summaryrefslogtreecommitdiff
path: root/packages/misskey-js/src/streaming.ts
diff options
context:
space:
mode:
authorKagami Sascha Rosylight <saschanaz@outlook.com>2023-03-30 02:33:19 +0200
committerGitHub <noreply@github.com>2023-03-30 09:33:19 +0900
commitcee1d5e2d01359e6d762a10fc67d4e7c1cc830eb (patch)
tree45eab3096b1983ae5267caab4aa4c5eff77b6e5d /packages/misskey-js/src/streaming.ts
parentNew Crowdin updates (#10407) (diff)
downloadsharkey-cee1d5e2d01359e6d762a10fc67d4e7c1cc830eb.tar.gz
sharkey-cee1d5e2d01359e6d762a10fc67d4e7c1cc830eb.tar.bz2
sharkey-cee1d5e2d01359e6d762a10fc67d4e7c1cc830eb.zip
chore: integrate misskey-js as a workspace item (git subtree) (#10409)
* Additional changes for the merge * api-misskey-js
Diffstat (limited to 'packages/misskey-js/src/streaming.ts')
-rw-r--r--packages/misskey-js/src/streaming.ts340
1 files changed, 340 insertions, 0 deletions
diff --git a/packages/misskey-js/src/streaming.ts b/packages/misskey-js/src/streaming.ts
new file mode 100644
index 0000000000..6388828689
--- /dev/null
+++ b/packages/misskey-js/src/streaming.ts
@@ -0,0 +1,340 @@
+import autobind from 'autobind-decorator';
+import { EventEmitter } from 'eventemitter3';
+import ReconnectingWebsocket from 'reconnecting-websocket';
+import type { BroadcastEvents, Channels } from './streaming.types';
+
+export function urlQuery(obj: Record<string, string | number | boolean | undefined>): string {
+ const params = Object.entries(obj)
+ .filter(([, v]) => Array.isArray(v) ? v.length : v !== undefined)
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ .reduce((a, [k, v]) => (a[k] = v!, a), {} as Record<string, string | number | boolean>);
+
+ return Object.entries(params)
+ .map((e) => `${e[0]}=${encodeURIComponent(e[1])}`)
+ .join('&');
+}
+
+type AnyOf<T extends Record<any, any>> = T[keyof T];
+
+type StreamEvents = {
+ _connected_: void;
+ _disconnected_: void;
+} & BroadcastEvents;
+
+/**
+ * Misskey stream connection
+ */
+export default class Stream extends EventEmitter<StreamEvents> {
+ private stream: ReconnectingWebsocket;
+ public state: 'initializing' | 'reconnecting' | 'connected' = 'initializing';
+ private sharedConnectionPools: Pool[] = [];
+ private sharedConnections: SharedConnection[] = [];
+ private nonSharedConnections: NonSharedConnection[] = [];
+ private idCounter = 0;
+
+ constructor(origin: string, user: { token: string; } | null, options?: {
+ WebSocket?: any;
+ }) {
+ super();
+ options = options || { };
+
+ const query = urlQuery({
+ i: user?.token,
+
+ // To prevent cache of an HTML such as error screen
+ _t: Date.now(),
+ });
+
+ const wsOrigin = origin.replace('http://', 'ws://').replace('https://', 'wss://');
+
+ this.stream = new ReconnectingWebsocket(`${wsOrigin}/streaming?${query}`, '', {
+ minReconnectionDelay: 1, // https://github.com/pladaria/reconnecting-websocket/issues/91
+ WebSocket: options.WebSocket,
+ });
+ this.stream.addEventListener('open', this.onOpen);
+ this.stream.addEventListener('close', this.onClose);
+ this.stream.addEventListener('message', this.onMessage);
+ }
+
+ @autobind
+ private genId(): string {
+ return (++this.idCounter).toString();
+ }
+
+ @autobind
+ public useChannel<C extends keyof Channels>(channel: C, params?: Channels[C]['params'], name?: string): Connection<Channels[C]> {
+ if (params) {
+ return this.connectToChannel(channel, params);
+ } else {
+ return this.useSharedConnection(channel, name);
+ }
+ }
+
+ @autobind
+ private useSharedConnection<C extends keyof Channels>(channel: C, name?: string): SharedConnection<Channels[C]> {
+ let pool = this.sharedConnectionPools.find(p => p.channel === channel);
+
+ if (pool == null) {
+ pool = new Pool(this, channel, this.genId());
+ this.sharedConnectionPools.push(pool);
+ }
+
+ const connection = new SharedConnection(this, channel, pool, name);
+ this.sharedConnections.push(connection);
+ return connection;
+ }
+
+ @autobind
+ public removeSharedConnection(connection: SharedConnection): void {
+ this.sharedConnections = this.sharedConnections.filter(c => c !== connection);
+ }
+
+ @autobind
+ public removeSharedConnectionPool(pool: Pool): void {
+ this.sharedConnectionPools = this.sharedConnectionPools.filter(p => p !== pool);
+ }
+
+ @autobind
+ private connectToChannel<C extends keyof Channels>(channel: C, params: Channels[C]['params']): NonSharedConnection<Channels[C]> {
+ const connection = new NonSharedConnection(this, channel, this.genId(), params);
+ this.nonSharedConnections.push(connection);
+ return connection;
+ }
+
+ @autobind
+ public disconnectToChannel(connection: NonSharedConnection): void {
+ this.nonSharedConnections = this.nonSharedConnections.filter(c => c !== connection);
+ }
+
+ /**
+ * Callback of when open connection
+ */
+ @autobind
+ private onOpen(): void {
+ const isReconnect = this.state === 'reconnecting';
+
+ this.state = 'connected';
+ this.emit('_connected_');
+
+ // チャンネル再接続
+ if (isReconnect) {
+ for (const p of this.sharedConnectionPools) p.connect();
+ for (const c of this.nonSharedConnections) c.connect();
+ }
+ }
+
+ /**
+ * Callback of when close connection
+ */
+ @autobind
+ private onClose(): void {
+ if (this.state === 'connected') {
+ this.state = 'reconnecting';
+ this.emit('_disconnected_');
+ }
+ }
+
+ /**
+ * Callback of when received a message from connection
+ */
+ @autobind
+ private onMessage(message: { data: string; }): void {
+ const { type, body } = JSON.parse(message.data);
+
+ if (type === 'channel') {
+ const id = body.id;
+
+ let connections: Connection[];
+
+ connections = this.sharedConnections.filter(c => c.id === id);
+
+ if (connections.length === 0) {
+ const found = this.nonSharedConnections.find(c => c.id === id);
+ if (found) {
+ connections = [found];
+ }
+ }
+
+ for (const c of connections) {
+ c.emit(body.type, body.body);
+ c.inCount++;
+ }
+ } else {
+ this.emit(type, body);
+ }
+ }
+
+ /**
+ * Send a message to connection
+ */
+ @autobind
+ public send(typeOrPayload: any, payload?: any): void {
+ const data = payload === undefined ? typeOrPayload : {
+ type: typeOrPayload,
+ body: payload,
+ };
+
+ this.stream.send(JSON.stringify(data));
+ }
+
+ /**
+ * Close this connection
+ */
+ @autobind
+ public close(): void {
+ this.stream.close();
+ }
+}
+
+// TODO: これらのクラスを Stream クラスの内部クラスにすれば余計なメンバをpublicにしないで済むかも?
+// もしくは @internal を使う? https://www.typescriptlang.org/tsconfig#stripInternal
+class Pool {
+ public channel: string;
+ public id: string;
+ protected stream: Stream;
+ public users = 0;
+ private disposeTimerId: any;
+ private isConnected = false;
+
+ constructor(stream: Stream, channel: string, id: string) {
+ this.channel = channel;
+ this.stream = stream;
+ this.id = id;
+
+ this.stream.on('_disconnected_', this.onStreamDisconnected);
+ }
+
+ @autobind
+ private onStreamDisconnected(): void {
+ this.isConnected = false;
+ }
+
+ @autobind
+ public inc(): void {
+ if (this.users === 0 && !this.isConnected) {
+ this.connect();
+ }
+
+ this.users++;
+
+ // タイマー解除
+ if (this.disposeTimerId) {
+ clearTimeout(this.disposeTimerId);
+ this.disposeTimerId = null;
+ }
+ }
+
+ @autobind
+ public dec(): void {
+ this.users--;
+
+ // そのコネクションの利用者が誰もいなくなったら
+ if (this.users === 0) {
+ // また直ぐに再利用される可能性があるので、一定時間待ち、
+ // 新たな利用者が現れなければコネクションを切断する
+ this.disposeTimerId = setTimeout(() => {
+ this.disconnect();
+ }, 3000);
+ }
+ }
+
+ @autobind
+ public connect(): void {
+ if (this.isConnected) return;
+ this.isConnected = true;
+ this.stream.send('connect', {
+ channel: this.channel,
+ id: this.id,
+ });
+ }
+
+ @autobind
+ private disconnect(): void {
+ this.stream.off('_disconnected_', this.onStreamDisconnected);
+ this.stream.send('disconnect', { id: this.id });
+ this.stream.removeSharedConnectionPool(this);
+ }
+}
+
+export abstract class Connection<Channel extends AnyOf<Channels> = any> extends EventEmitter<Channel['events']> {
+ public channel: string;
+ protected stream: Stream;
+ public abstract id: string;
+
+ public name?: string; // for debug
+ public inCount = 0; // for debug
+ public outCount = 0; // for debug
+
+ constructor(stream: Stream, channel: string, name?: string) {
+ super();
+
+ this.stream = stream;
+ this.channel = channel;
+ this.name = name;
+ }
+
+ @autobind
+ public send<T extends keyof Channel['receives']>(type: T, body: Channel['receives'][T]): void {
+ this.stream.send('ch', {
+ id: this.id,
+ type: type,
+ body: body,
+ });
+
+ this.outCount++;
+ }
+
+ public abstract dispose(): void;
+}
+
+class SharedConnection<Channel extends AnyOf<Channels> = any> extends Connection<Channel> {
+ private pool: Pool;
+
+ public get id(): string {
+ return this.pool.id;
+ }
+
+ constructor(stream: Stream, channel: string, pool: Pool, name?: string) {
+ super(stream, channel, name);
+
+ this.pool = pool;
+ this.pool.inc();
+ }
+
+ @autobind
+ public dispose(): void {
+ this.pool.dec();
+ this.removeAllListeners();
+ this.stream.removeSharedConnection(this);
+ }
+}
+
+class NonSharedConnection<Channel extends AnyOf<Channels> = any> extends Connection<Channel> {
+ public id: string;
+ protected params: Channel['params'];
+
+ constructor(stream: Stream, channel: string, id: string, params: Channel['params']) {
+ super(stream, channel);
+
+ this.params = params;
+ this.id = id;
+
+ this.connect();
+ }
+
+ @autobind
+ public connect(): void {
+ this.stream.send('connect', {
+ channel: this.channel,
+ id: this.id,
+ params: this.params,
+ });
+ }
+
+ @autobind
+ public dispose(): void {
+ this.removeAllListeners();
+ this.stream.send('disconnect', { id: this.id });
+ this.stream.disconnectToChannel(this);
+ }
+}