summaryrefslogtreecommitdiff
path: root/packages/backend/src/server/api/stream/Connection.ts
diff options
context:
space:
mode:
authorsyuilo <Syuilotan@yahoo.co.jp>2023-09-15 14:28:29 +0900
committerGitHub <noreply@github.com>2023-09-15 14:28:29 +0900
commit6cf466e5d1d423aec8ec06f65644c73c7e9a8ecd (patch)
treeb3623e27ae94471fa4d58dda9e9296b21afbd20e /packages/backend/src/server/api/stream/Connection.ts
parentenhance nodeinfo by prpoagate the standart keys for homepage and repository (... (diff)
downloadsharkey-6cf466e5d1d423aec8ec06f65644c73c7e9a8ecd.tar.gz
sharkey-6cf466e5d1d423aec8ec06f65644c73c7e9a8ecd.tar.bz2
sharkey-6cf466e5d1d423aec8ec06f65644c73c7e9a8ecd.zip
update deps (#11820)
* update deps * fix * wip * wip * wip * Update docker-compose.yml.example * Delete reviewer-lottery.yml * Update RepositoryModule.ts * wip * wip * clean up * update deps * wip * wip
Diffstat (limited to 'packages/backend/src/server/api/stream/Connection.ts')
-rw-r--r--packages/backend/src/server/api/stream/Connection.ts299
1 files changed, 299 insertions, 0 deletions
diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts
new file mode 100644
index 0000000000..9ada6c559e
--- /dev/null
+++ b/packages/backend/src/server/api/stream/Connection.ts
@@ -0,0 +1,299 @@
+/*
+ * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+import * as WebSocket from 'ws';
+import type { MiUser } from '@/models/entities/User.js';
+import type { MiAccessToken } from '@/models/entities/AccessToken.js';
+import type { Packed } from '@/misc/json-schema.js';
+import type { NoteReadService } from '@/core/NoteReadService.js';
+import type { NotificationService } from '@/core/NotificationService.js';
+import { bindThis } from '@/decorators.js';
+import { CacheService } from '@/core/CacheService.js';
+import { MiUserProfile } from '@/models/_.js';
+import type { ChannelsService } from './ChannelsService.js';
+import type { EventEmitter } from 'events';
+import type Channel from './channel.js';
+import type { StreamEventEmitter, StreamMessages } from './types.js';
+
+/**
+ * Main stream connection
+ */
+// eslint-disable-next-line import/no-default-export
+export default class Connection {
+ public user?: MiUser;
+ public token?: MiAccessToken;
+ private wsConnection: WebSocket.WebSocket;
+ public subscriber: StreamEventEmitter;
+ private channels: Channel[] = [];
+ private subscribingNotes: any = {};
+ private cachedNotes: Packed<'Note'>[] = [];
+ public userProfile: MiUserProfile | null = null;
+ public following: Set<string> = new Set();
+ public followingChannels: Set<string> = new Set();
+ public userIdsWhoMeMuting: Set<string> = new Set();
+ public userIdsWhoBlockingMe: Set<string> = new Set();
+ public userIdsWhoMeMutingRenotes: Set<string> = new Set();
+ private fetchIntervalId: NodeJS.Timeout | null = null;
+
+ constructor(
+ private channelsService: ChannelsService,
+ private noteReadService: NoteReadService,
+ private notificationService: NotificationService,
+ private cacheService: CacheService,
+
+ user: MiUser | null | undefined,
+ token: MiAccessToken | null | undefined,
+ ) {
+ if (user) this.user = user;
+ if (token) this.token = token;
+ }
+
+ @bindThis
+ public async fetch() {
+ if (this.user == null) return;
+ const [userProfile, following, followingChannels, userIdsWhoMeMuting, userIdsWhoBlockingMe, userIdsWhoMeMutingRenotes] = await Promise.all([
+ this.cacheService.userProfileCache.fetch(this.user.id),
+ this.cacheService.userFollowingsCache.fetch(this.user.id),
+ this.cacheService.userFollowingChannelsCache.fetch(this.user.id),
+ this.cacheService.userMutingsCache.fetch(this.user.id),
+ this.cacheService.userBlockedCache.fetch(this.user.id),
+ this.cacheService.renoteMutingsCache.fetch(this.user.id),
+ ]);
+ this.userProfile = userProfile;
+ this.following = following;
+ this.followingChannels = followingChannels;
+ this.userIdsWhoMeMuting = userIdsWhoMeMuting;
+ this.userIdsWhoBlockingMe = userIdsWhoBlockingMe;
+ this.userIdsWhoMeMutingRenotes = userIdsWhoMeMutingRenotes;
+ }
+
+ @bindThis
+ public async init() {
+ if (this.user != null) {
+ await this.fetch();
+
+ if (!this.fetchIntervalId) {
+ this.fetchIntervalId = setInterval(this.fetch, 1000 * 10);
+ }
+ }
+ }
+
+ @bindThis
+ public async listen(subscriber: EventEmitter, wsConnection: WebSocket.WebSocket) {
+ this.subscriber = subscriber;
+
+ this.wsConnection = wsConnection;
+ this.wsConnection.on('message', this.onWsConnectionMessage);
+
+ this.subscriber.on('broadcast', data => {
+ this.onBroadcastMessage(data);
+ });
+ }
+
+ /**
+ * クライアントからメッセージ受信時
+ */
+ @bindThis
+ private async onWsConnectionMessage(data: WebSocket.RawData) {
+ let obj: Record<string, any>;
+
+ try {
+ obj = JSON.parse(data.toString());
+ } catch (e) {
+ return;
+ }
+
+ const { type, body } = obj;
+
+ switch (type) {
+ case 'readNotification': this.onReadNotification(body); break;
+ case 'subNote': this.onSubscribeNote(body); break;
+ case 's': this.onSubscribeNote(body); break; // alias
+ case 'sr': this.onSubscribeNote(body); this.readNote(body); break;
+ case 'unsubNote': this.onUnsubscribeNote(body); break;
+ case 'un': this.onUnsubscribeNote(body); break; // alias
+ case 'connect': this.onChannelConnectRequested(body); break;
+ case 'disconnect': this.onChannelDisconnectRequested(body); break;
+ case 'channel': this.onChannelMessageRequested(body); break;
+ case 'ch': this.onChannelMessageRequested(body); break; // alias
+ }
+ }
+
+ @bindThis
+ private onBroadcastMessage(data: StreamMessages['broadcast']['payload']) {
+ this.sendMessageToWs(data.type, data.body);
+ }
+
+ @bindThis
+ public cacheNote(note: Packed<'Note'>) {
+ const add = (note: Packed<'Note'>) => {
+ const existIndex = this.cachedNotes.findIndex(n => n.id === note.id);
+ if (existIndex > -1) {
+ this.cachedNotes[existIndex] = note;
+ return;
+ }
+
+ this.cachedNotes.unshift(note);
+ if (this.cachedNotes.length > 32) {
+ this.cachedNotes.splice(32);
+ }
+ };
+
+ add(note);
+ if (note.reply) add(note.reply);
+ if (note.renote) add(note.renote);
+ }
+
+ @bindThis
+ private readNote(body: any) {
+ const id = body.id;
+
+ const note = this.cachedNotes.find(n => n.id === id);
+ if (note == null) return;
+
+ if (this.user && (note.userId !== this.user.id)) {
+ this.noteReadService.read(this.user.id, [note]);
+ }
+ }
+
+ @bindThis
+ private onReadNotification(payload: any) {
+ this.notificationService.readAllNotification(this.user!.id);
+ }
+
+ /**
+ * 投稿購読要求時
+ */
+ @bindThis
+ private onSubscribeNote(payload: any) {
+ if (!payload.id) return;
+
+ if (this.subscribingNotes[payload.id] == null) {
+ this.subscribingNotes[payload.id] = 0;
+ }
+
+ this.subscribingNotes[payload.id]++;
+
+ if (this.subscribingNotes[payload.id] === 1) {
+ this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage);
+ }
+ }
+
+ /**
+ * 投稿購読解除要求時
+ */
+ @bindThis
+ private onUnsubscribeNote(payload: any) {
+ if (!payload.id) return;
+
+ this.subscribingNotes[payload.id]--;
+ if (this.subscribingNotes[payload.id] <= 0) {
+ delete this.subscribingNotes[payload.id];
+ this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage);
+ }
+ }
+
+ @bindThis
+ private async onNoteStreamMessage(data: StreamMessages['note']['payload']) {
+ this.sendMessageToWs('noteUpdated', {
+ id: data.body.id,
+ type: data.type,
+ body: data.body.body,
+ });
+ }
+
+ /**
+ * チャンネル接続要求時
+ */
+ @bindThis
+ private onChannelConnectRequested(payload: any) {
+ const { channel, id, params, pong } = payload;
+ this.connectChannel(id, params, channel, pong);
+ }
+
+ /**
+ * チャンネル切断要求時
+ */
+ @bindThis
+ private onChannelDisconnectRequested(payload: any) {
+ const { id } = payload;
+ this.disconnectChannel(id);
+ }
+
+ /**
+ * クライアントにメッセージ送信
+ */
+ @bindThis
+ public sendMessageToWs(type: string, payload: any) {
+ this.wsConnection.send(JSON.stringify({
+ type: type,
+ body: payload,
+ }));
+ }
+
+ /**
+ * チャンネルに接続
+ */
+ @bindThis
+ public connectChannel(id: string, params: any, channel: string, pong = false) {
+ const channelService = this.channelsService.getChannelService(channel);
+
+ if (channelService.requireCredential && this.user == null) {
+ return;
+ }
+
+ // 共有可能チャンネルに接続しようとしていて、かつそのチャンネルに既に接続していたら無意味なので無視
+ if (channelService.shouldShare && this.channels.some(c => c.chName === channel)) {
+ return;
+ }
+
+ const ch: Channel = channelService.create(id, this);
+ this.channels.push(ch);
+ ch.init(params ?? {});
+
+ if (pong) {
+ this.sendMessageToWs('connected', {
+ id: id,
+ });
+ }
+ }
+
+ /**
+ * チャンネルから切断
+ * @param id チャンネルコネクションID
+ */
+ @bindThis
+ public disconnectChannel(id: string) {
+ const channel = this.channels.find(c => c.id === id);
+
+ if (channel) {
+ if (channel.dispose) channel.dispose();
+ this.channels = this.channels.filter(c => c.id !== id);
+ }
+ }
+
+ /**
+ * チャンネルへメッセージ送信要求時
+ * @param data メッセージ
+ */
+ @bindThis
+ private onChannelMessageRequested(data: any) {
+ const channel = this.channels.find(c => c.id === data.id);
+ if (channel != null && channel.onMessage != null) {
+ channel.onMessage(data.type, data.body);
+ }
+ }
+
+ /**
+ * ストリームが切れたとき
+ */
+ @bindThis
+ public dispose() {
+ if (this.fetchIntervalId) clearInterval(this.fetchIntervalId);
+ for (const c of this.channels.filter(c => c.dispose)) {
+ if (c.dispose) c.dispose();
+ }
+ }
+}