summaryrefslogtreecommitdiff
path: root/packages/megalodon/src/misskey/web_socket.ts
diff options
context:
space:
mode:
authorMar0xy <marie@kaifa.ch>2023-09-24 01:44:53 +0200
committerMar0xy <marie@kaifa.ch>2023-09-24 01:44:53 +0200
commitafda15260f4f97ec00b3e7fdf63bd13013daae40 (patch)
tree8f7869ddb0fb48f096648d3765f0c25561606b10 /packages/megalodon/src/misskey/web_socket.ts
parentupd: add new endpoints to Masto API (diff)
downloadsharkey-afda15260f4f97ec00b3e7fdf63bd13013daae40.tar.gz
sharkey-afda15260f4f97ec00b3e7fdf63bd13013daae40.tar.bz2
sharkey-afda15260f4f97ec00b3e7fdf63bd13013daae40.zip
upd: megalodon to v7
Diffstat (limited to 'packages/megalodon/src/misskey/web_socket.ts')
-rw-r--r--packages/megalodon/src/misskey/web_socket.ts803
1 files changed, 379 insertions, 424 deletions
diff --git a/packages/megalodon/src/misskey/web_socket.ts b/packages/megalodon/src/misskey/web_socket.ts
index 0cbfc2bfeb..181fb1c903 100644
--- a/packages/megalodon/src/misskey/web_socket.ts
+++ b/packages/megalodon/src/misskey/web_socket.ts
@@ -1,365 +1,328 @@
-import WS from "ws";
-import dayjs, { Dayjs } from "dayjs";
-import { v4 as uuid } from "uuid";
-import { EventEmitter } from "events";
-import { WebSocketInterface } from "../megalodon";
-import proxyAgent, { ProxyConfig } from "../proxy_config";
-import MisskeyAPI from "./api_client";
+import WS from 'ws'
+import dayjs, { Dayjs } from 'dayjs'
+import { v4 as uuid } from 'uuid'
+import { EventEmitter } from 'events'
+import { WebSocketInterface } from '../megalodon'
+import proxyAgent, { ProxyConfig } from '../proxy_config'
+import MisskeyAPI from './api_client'
+import { UnknownNotificationTypeError } from '../notification'
/**
* WebSocket
* Misskey is not support http streaming. It supports websocket instead of streaming.
* So this class connect to Misskey server with WebSocket.
*/
-export default class WebSocket
- extends EventEmitter
- implements WebSocketInterface
-{
- public url: string;
- public channel:
- | "user"
- | "localTimeline"
- | "hybridTimeline"
- | "globalTimeline"
- | "conversation"
- | "list";
- public parser: any;
- public headers: { [key: string]: string };
- public proxyConfig: ProxyConfig | false = false;
- public listId: string | null = null;
- private _converter: MisskeyAPI.Converter;
- private _accessToken: string;
- private _reconnectInterval: number;
- private _reconnectMaxAttempts: number;
- private _reconnectCurrentAttempts: number;
- private _connectionClosed: boolean;
- private _client: WS | null = null;
- private _channelID: string;
- private _pongReceivedTimestamp: Dayjs;
- private _heartbeatInterval = 60000;
- private _pongWaiting = false;
+export default class WebSocket extends EventEmitter implements WebSocketInterface {
+ public url: string
+ public channel: 'user' | 'localTimeline' | 'hybridTimeline' | 'globalTimeline' | 'conversation' | 'list'
+ public parser: Parser
+ public headers: { [key: string]: string }
+ public proxyConfig: ProxyConfig | false = false
+ public listId: string | null = null
+ private _accessToken: string
+ private _reconnectInterval: number
+ private _reconnectMaxAttempts: number
+ private _reconnectCurrentAttempts: number
+ private _connectionClosed: boolean
+ private _client: WS | null = null
+ private _channelID: string
+ private _pongReceivedTimestamp: Dayjs
+ private _heartbeatInterval: number = 60000
+ private _pongWaiting: boolean = false
- /**
- * @param url Full url of websocket: e.g. wss://misskey.io/streaming
- * @param channel Channel name is user, localTimeline, hybridTimeline, globalTimeline, conversation or list.
- * @param accessToken The access token.
- * @param listId This parameter is required when you specify list as channel.
- */
- constructor(
- url: string,
- channel:
- | "user"
- | "localTimeline"
- | "hybridTimeline"
- | "globalTimeline"
- | "conversation"
- | "list",
- accessToken: string,
- listId: string | undefined,
- userAgent: string,
- proxyConfig: ProxyConfig | false = false,
- converter: MisskeyAPI.Converter,
- ) {
- super();
- this.url = url;
- this.parser = new Parser();
- this.channel = channel;
- this.headers = {
- "User-Agent": userAgent,
- };
- if (listId === undefined) {
- this.listId = null;
- } else {
- this.listId = listId;
- }
- this.proxyConfig = proxyConfig;
- this._accessToken = accessToken;
- this._reconnectInterval = 10000;
- this._reconnectMaxAttempts = Infinity;
- this._reconnectCurrentAttempts = 0;
- this._connectionClosed = false;
- this._channelID = uuid();
- this._pongReceivedTimestamp = dayjs();
- this._converter = converter;
- }
+ /**
+ * @param url Full url of websocket: e.g. wss://misskey.io/streaming
+ * @param channel Channel name is user, localTimeline, hybridTimeline, globalTimeline, conversation or list.
+ * @param accessToken The access token.
+ * @param listId This parameter is required when you specify list as channel.
+ */
+ constructor(
+ url: string,
+ channel: 'user' | 'localTimeline' | 'hybridTimeline' | 'globalTimeline' | 'conversation' | 'list',
+ accessToken: string,
+ listId: string | undefined,
+ userAgent: string,
+ proxyConfig: ProxyConfig | false = false
+ ) {
+ super()
+ this.url = url
+ this.parser = new Parser()
+ this.channel = channel
+ this.headers = {
+ 'User-Agent': userAgent
+ }
+ if (listId === undefined) {
+ this.listId = null
+ } else {
+ this.listId = listId
+ }
+ this.proxyConfig = proxyConfig
+ this._accessToken = accessToken
+ this._reconnectInterval = 10000
+ this._reconnectMaxAttempts = Infinity
+ this._reconnectCurrentAttempts = 0
+ this._connectionClosed = false
+ this._channelID = uuid()
+ this._pongReceivedTimestamp = dayjs()
+ }
- /**
- * Start websocket connection.
- */
- public start() {
- this._connectionClosed = false;
- this._resetRetryParams();
- this._startWebSocketConnection();
- }
+ /**
+ * Start websocket connection.
+ */
+ public start() {
+ this._connectionClosed = false
+ this._resetRetryParams()
+ this._startWebSocketConnection()
+ }
- private baseUrlToHost(baseUrl: string): string {
- return baseUrl.replace("https://", "");
- }
+ /**
+ * Reset connection and start new websocket connection.
+ */
+ private _startWebSocketConnection() {
+ this._resetConnection()
+ this._setupParser()
+ this._client = this._connect()
+ this._bindSocket(this._client)
+ }
- /**
- * Reset connection and start new websocket connection.
- */
- private _startWebSocketConnection() {
- this._resetConnection();
- this._setupParser();
- this._client = this._connect();
- this._bindSocket(this._client);
- }
+ /**
+ * Stop current connection.
+ */
+ public stop() {
+ this._connectionClosed = true
+ this._resetConnection()
+ this._resetRetryParams()
+ }
- /**
- * Stop current connection.
- */
- public stop() {
- this._connectionClosed = true;
- this._resetConnection();
- this._resetRetryParams();
- }
+ /**
+ * Clean up current connection, and listeners.
+ */
+ private _resetConnection() {
+ if (this._client) {
+ this._client.close(1000)
+ this._client.removeAllListeners()
+ this._client = null
+ }
- /**
- * Clean up current connection, and listeners.
- */
- private _resetConnection() {
- if (this._client) {
- this._client.close(1000);
- this._client.removeAllListeners();
- this._client = null;
- }
+ if (this.parser) {
+ this.parser.removeAllListeners()
+ }
+ }
- if (this.parser) {
- this.parser.removeAllListeners();
- }
- }
+ /**
+ * Resets the parameters used in reconnect.
+ */
+ private _resetRetryParams() {
+ this._reconnectCurrentAttempts = 0
+ }
- /**
- * Resets the parameters used in reconnect.
- */
- private _resetRetryParams() {
- this._reconnectCurrentAttempts = 0;
- }
+ /**
+ * Connect to the endpoint.
+ */
+ private _connect(): WS {
+ let options: WS.ClientOptions = {
+ headers: this.headers
+ }
+ if (this.proxyConfig) {
+ options = Object.assign(options, {
+ agent: proxyAgent(this.proxyConfig)
+ })
+ }
+ const cli: WS = new WS(`${this.url}?i=${this._accessToken}`, options)
+ return cli
+ }
- /**
- * Connect to the endpoint.
- */
- private _connect(): WS {
- let options: WS.ClientOptions = {
- headers: this.headers,
- };
- if (this.proxyConfig) {
- options = Object.assign(options, {
- agent: proxyAgent(this.proxyConfig),
- });
- }
- const cli: WS = new WS(`${this.url}?i=${this._accessToken}`, options);
- return cli;
- }
+ /**
+ * Connect specified channels in websocket.
+ */
+ private _channel() {
+ if (!this._client) {
+ return
+ }
+ switch (this.channel) {
+ case 'conversation':
+ this._client.send(
+ JSON.stringify({
+ type: 'connect',
+ body: {
+ channel: 'main',
+ id: this._channelID
+ }
+ })
+ )
+ break
+ case 'user':
+ this._client.send(
+ JSON.stringify({
+ type: 'connect',
+ body: {
+ channel: 'main',
+ id: this._channelID
+ }
+ })
+ )
+ this._client.send(
+ JSON.stringify({
+ type: 'connect',
+ body: {
+ channel: 'homeTimeline',
+ id: this._channelID
+ }
+ })
+ )
+ break
+ case 'list':
+ this._client.send(
+ JSON.stringify({
+ type: 'connect',
+ body: {
+ channel: 'userList',
+ id: this._channelID,
+ params: {
+ listId: this.listId
+ }
+ }
+ })
+ )
+ break
+ default:
+ this._client.send(
+ JSON.stringify({
+ type: 'connect',
+ body: {
+ channel: this.channel,
+ id: this._channelID
+ }
+ })
+ )
+ break
+ }
+ }
- /**
- * Connect specified channels in websocket.
- */
- private _channel() {
- if (!this._client) {
- return;
- }
- switch (this.channel) {
- case "conversation":
- this._client.send(
- JSON.stringify({
- type: "connect",
- body: {
- channel: "main",
- id: this._channelID,
- },
- }),
- );
- break;
- case "user":
- this._client.send(
- JSON.stringify({
- type: "connect",
- body: {
- channel: "main",
- id: this._channelID,
- },
- }),
- );
- this._client.send(
- JSON.stringify({
- type: "connect",
- body: {
- channel: "homeTimeline",
- id: this._channelID,
- },
- }),
- );
- break;
- case "list":
- this._client.send(
- JSON.stringify({
- type: "connect",
- body: {
- channel: "userList",
- id: this._channelID,
- params: {
- listId: this.listId,
- },
- },
- }),
- );
- break;
- default:
- this._client.send(
- JSON.stringify({
- type: "connect",
- body: {
- channel: this.channel,
- id: this._channelID,
- },
- }),
- );
- break;
- }
- }
+ /**
+ * Reconnects to the same endpoint.
+ */
- /**
- * Reconnects to the same endpoint.
- */
+ private _reconnect() {
+ setTimeout(() => {
+ // Skip reconnect when client is connecting.
+ // https://github.com/websockets/ws/blob/7.2.1/lib/websocket.js#L365
+ if (this._client && this._client.readyState === WS.CONNECTING) {
+ return
+ }
- private _reconnect() {
- setTimeout(() => {
- // Skip reconnect when client is connecting.
- // https://github.com/websockets/ws/blob/7.2.1/lib/websocket.js#L365
- if (this._client && this._client.readyState === WS.CONNECTING) {
- return;
- }
+ if (this._reconnectCurrentAttempts < this._reconnectMaxAttempts) {
+ this._reconnectCurrentAttempts++
+ this._clearBinding()
+ if (this._client) {
+ // In reconnect, we want to close the connection immediately,
+ // because recoonect is necessary when some problems occur.
+ this._client.terminate()
+ }
+ // Call connect methods
+ console.log('Reconnecting')
+ this._client = this._connect()
+ this._bindSocket(this._client)
+ }
+ }, this._reconnectInterval)
+ }
- if (this._reconnectCurrentAttempts < this._reconnectMaxAttempts) {
- this._reconnectCurrentAttempts++;
- this._clearBinding();
- if (this._client) {
- // In reconnect, we want to close the connection immediately,
- // because recoonect is necessary when some problems occur.
- this._client.terminate();
- }
- // Call connect methods
- console.log("Reconnecting");
- this._client = this._connect();
- this._bindSocket(this._client);
- }
- }, this._reconnectInterval);
- }
+ /**
+ * Clear binding event for websocket client.
+ */
+ private _clearBinding() {
+ if (this._client) {
+ this._client.removeAllListeners('close')
+ this._client.removeAllListeners('pong')
+ this._client.removeAllListeners('open')
+ this._client.removeAllListeners('message')
+ this._client.removeAllListeners('error')
+ }
+ }
- /**
- * Clear binding event for websocket client.
- */
- private _clearBinding() {
- if (this._client) {
- this._client.removeAllListeners("close");
- this._client.removeAllListeners("pong");
- this._client.removeAllListeners("open");
- this._client.removeAllListeners("message");
- this._client.removeAllListeners("error");
- }
- }
+ /**
+ * Bind event for web socket client.
+ * @param client A WebSocket instance.
+ */
+ private _bindSocket(client: WS) {
+ client.on('close', (code: number, _reason: Buffer) => {
+ if (code === 1000) {
+ this.emit('close', {})
+ } else {
+ console.log(`Closed connection with ${code}`)
+ if (!this._connectionClosed) {
+ this._reconnect()
+ }
+ }
+ })
+ client.on('pong', () => {
+ this._pongWaiting = false
+ this.emit('pong', {})
+ this._pongReceivedTimestamp = dayjs()
+ // It is required to anonymous function since get this scope in checkAlive.
+ setTimeout(() => this._checkAlive(this._pongReceivedTimestamp), this._heartbeatInterval)
+ })
+ client.on('open', () => {
+ this.emit('connect', {})
+ this._channel()
+ // Call first ping event.
+ setTimeout(() => {
+ client.ping('')
+ }, 10000)
+ })
+ client.on('message', (data: WS.Data, isBinary: boolean) => {
+ this.parser.parse(data, isBinary, this._channelID)
+ })
+ client.on('error', (err: Error) => {
+ this.emit('error', err)
+ })
+ }
- /**
- * Bind event for web socket client.
- * @param client A WebSocket instance.
- */
- private _bindSocket(client: WS) {
- client.on("close", (code: number, _reason: Buffer) => {
- if (code === 1000) {
- this.emit("close", {});
- } else {
- console.log(`Closed connection with ${code}`);
- if (!this._connectionClosed) {
- this._reconnect();
- }
- }
- });
- client.on("pong", () => {
- this._pongWaiting = false;
- this.emit("pong", {});
- this._pongReceivedTimestamp = dayjs();
- // It is required to anonymous function since get this scope in checkAlive.
- setTimeout(
- () => this._checkAlive(this._pongReceivedTimestamp),
- this._heartbeatInterval,
- );
- });
- client.on("open", () => {
- this.emit("connect", {});
- this._channel();
- // Call first ping event.
- setTimeout(() => {
- client.ping("");
- }, 10000);
- });
- client.on("message", (data: WS.Data, isBinary: boolean) => {
- this.parser.parse(data, isBinary, this._channelID);
- });
- client.on("error", (err: Error) => {
- this.emit("error", err);
- });
- }
+ /**
+ * Set up parser when receive message.
+ */
+ private _setupParser() {
+ this.parser.on('update', (note: MisskeyAPI.Entity.Note) => {
+ this.emit('update', MisskeyAPI.Converter.note(note))
+ })
+ this.parser.on('notification', (notification: MisskeyAPI.Entity.Notification) => {
+ const n = MisskeyAPI.Converter.notification(notification)
+ if (n instanceof UnknownNotificationTypeError) {
+ console.warn(`Unknown notification event has received: ${notification}`)
+ } else {
+ this.emit('notification', n)
+ }
+ })
+ this.parser.on('conversation', (note: MisskeyAPI.Entity.Note) => {
+ this.emit('conversation', MisskeyAPI.Converter.noteToConversation(note))
+ })
+ this.parser.on('error', (err: Error) => {
+ this.emit('parser-error', err)
+ })
+ }
- /**
- * Set up parser when receive message.
- */
- private _setupParser() {
- this.parser.on("update", (note: MisskeyAPI.Entity.Note) => {
- this.emit(
- "update",
- this._converter.note(note, this.baseUrlToHost(this.url)),
- );
- });
- this.parser.on(
- "notification",
- (notification: MisskeyAPI.Entity.Notification) => {
- this.emit(
- "notification",
- this._converter.notification(
- notification,
- this.baseUrlToHost(this.url),
- ),
- );
- },
- );
- this.parser.on("conversation", (note: MisskeyAPI.Entity.Note) => {
- this.emit(
- "conversation",
- this._converter.noteToConversation(note, this.baseUrlToHost(this.url)),
- );
- });
- this.parser.on("error", (err: Error) => {
- this.emit("parser-error", err);
- });
- }
-
- /**
- * Call ping and wait to pong.
- */
- private _checkAlive(timestamp: Dayjs) {
- const now: Dayjs = dayjs();
- // Block multiple calling, if multiple pong event occur.
- // It the duration is less than interval, through ping.
- if (
- now.diff(timestamp) > this._heartbeatInterval - 1000 &&
- !this._connectionClosed
- ) {
- // Skip ping when client is connecting.
- // https://github.com/websockets/ws/blob/7.2.1/lib/websocket.js#L289
- if (this._client && this._client.readyState !== WS.CONNECTING) {
- this._pongWaiting = true;
- this._client.ping("");
- setTimeout(() => {
- if (this._pongWaiting) {
- this._pongWaiting = false;
- this._reconnect();
- }
- }, 10000);
- }
- }
- }
+ /**
+ * Call ping and wait to pong.
+ */
+ private _checkAlive(timestamp: Dayjs) {
+ const now: Dayjs = dayjs()
+ // Block multiple calling, if multiple pong event occur.
+ // It the duration is less than interval, through ping.
+ if (now.diff(timestamp) > this._heartbeatInterval - 1000 && !this._connectionClosed) {
+ // Skip ping when client is connecting.
+ // https://github.com/websockets/ws/blob/7.2.1/lib/websocket.js#L289
+ if (this._client && this._client.readyState !== WS.CONNECTING) {
+ this._pongWaiting = true
+ this._client.ping('')
+ setTimeout(() => {
+ if (this._pongWaiting) {
+ this._pongWaiting = false
+ this._reconnect()
+ }
+ }, 10000)
+ }
+ }
+ }
}
/**
@@ -367,92 +330,84 @@ export default class WebSocket
* This class provides parser for websocket message.
*/
export class Parser extends EventEmitter {
- /**
- * @param message Message body of websocket.
- * @param channelID Parse only messages which has same channelID.
- */
- public parse(data: WS.Data, isBinary: boolean, channelID: string) {
- const message = isBinary ? data : data.toString();
- if (typeof message !== "string") {
- this.emit("heartbeat", {});
- return;
- }
+ /**
+ * @param message Message body of websocket.
+ * @param channelID Parse only messages which has same channelID.
+ */
+ public parse(data: WS.Data, isBinary: boolean, channelID: string) {
+ const message = isBinary ? data : data.toString()
+ if (typeof message !== 'string') {
+ this.emit('heartbeat', {})
+ return
+ }
- if (message === "") {
- this.emit("heartbeat", {});
- return;
- }
+ if (message === '') {
+ this.emit('heartbeat', {})
+ return
+ }
- let obj: {
- type: string;
- body: {
- id: string;
- type: string;
- body: any;
- };
- };
- let body: {
- id: string;
- type: string;
- body: any;
- };
+ let obj: {
+ type: string
+ body: {
+ id: string
+ type: string
+ body: any
+ }
+ }
+ let body: {
+ id: string
+ type: string
+ body: any
+ }
- try {
- obj = JSON.parse(message);
- if (obj.type !== "channel") {
- return;
- }
- if (!obj.body) {
- return;
- }
- body = obj.body;
- if (body.id !== channelID) {
- return;
- }
- } catch (err) {
- this.emit(
- "error",
- new Error(
- `Error parsing websocket reply: ${message}, error message: ${err}`,
- ),
- );
- return;
- }
+ try {
+ obj = JSON.parse(message)
+ if (obj.type !== 'channel') {
+ return
+ }
+ if (!obj.body) {
+ return
+ }
+ body = obj.body
+ if (body.id !== channelID) {
+ return
+ }
+ } catch (err) {
+ this.emit('error', new Error(`Error parsing websocket reply: ${message}, error message: ${err}`))
+ return
+ }
- switch (body.type) {
- case "note":
- this.emit("update", body.body as MisskeyAPI.Entity.Note);
- break;
- case "notification":
- this.emit("notification", body.body as MisskeyAPI.Entity.Notification);
- break;
- case "mention": {
- const note = body.body as MisskeyAPI.Entity.Note;
- if (note.visibility === "specified") {
- this.emit("conversation", note);
- }
- break;
- }
- // When renote and followed event, the same notification will be received.
- case "renote":
- case "followed":
- case "follow":
- case "unfollow":
- case "receiveFollowRequest":
- case "meUpdated":
- case "readAllNotifications":
- case "readAllUnreadSpecifiedNotes":
- case "readAllAntennas":
- case "readAllUnreadMentions":
- case "unreadNotification":
- // Ignore these events
- break;
- default:
- this.emit(
- "error",
- new Error(`Unknown event has received: ${JSON.stringify(body)}`),
- );
- break;
- }
- }
+ switch (body.type) {
+ case 'note':
+ this.emit('update', body.body as MisskeyAPI.Entity.Note)
+ break
+ case 'notification':
+ this.emit('notification', body.body as MisskeyAPI.Entity.Notification)
+ break
+ case 'mention': {
+ const note = body.body as MisskeyAPI.Entity.Note
+ if (note.visibility === 'specified') {
+ this.emit('conversation', note)
+ }
+ break
+ }
+ // When renote and followed event, the same notification will be received.
+ case 'renote':
+ case 'followed':
+ case 'follow':
+ case 'unfollow':
+ case 'receiveFollowRequest':
+ case 'meUpdated':
+ case 'readAllNotifications':
+ case 'readAllUnreadSpecifiedNotes':
+ case 'readAllAntennas':
+ case 'readAllUnreadMentions':
+ case 'unreadNotification':
+ // Ignore these events
+ break
+ default:
+ this.emit('error', new Error(`Unknown event has received: ${JSON.stringify(body)}`))
+ break
+ }
+ }
}