diff options
| author | syuilo <Syuilotan@yahoo.co.jp> | 2023-09-15 14:28:29 +0900 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-09-15 14:28:29 +0900 |
| commit | 6cf466e5d1d423aec8ec06f65644c73c7e9a8ecd (patch) | |
| tree | b3623e27ae94471fa4d58dda9e9296b21afbd20e /packages/backend/src/server/api/stream/Connection.ts | |
| parent | enhance nodeinfo by prpoagate the standart keys for homepage and repository (... (diff) | |
| download | sharkey-6cf466e5d1d423aec8ec06f65644c73c7e9a8ecd.tar.gz sharkey-6cf466e5d1d423aec8ec06f65644c73c7e9a8ecd.tar.bz2 sharkey-6cf466e5d1d423aec8ec06f65644c73c7e9a8ecd.zip | |
update deps (#11820)
* update deps
* fix
* wip
* wip
* wip
* Update docker-compose.yml.example
* Delete reviewer-lottery.yml
* Update RepositoryModule.ts
* wip
* wip
* clean up
* update deps
* wip
* wip
Diffstat (limited to 'packages/backend/src/server/api/stream/Connection.ts')
| -rw-r--r-- | packages/backend/src/server/api/stream/Connection.ts | 299 |
1 files changed, 299 insertions, 0 deletions
diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts new file mode 100644 index 0000000000..9ada6c559e --- /dev/null +++ b/packages/backend/src/server/api/stream/Connection.ts @@ -0,0 +1,299 @@ +/* + * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import * as WebSocket from 'ws'; +import type { MiUser } from '@/models/entities/User.js'; +import type { MiAccessToken } from '@/models/entities/AccessToken.js'; +import type { Packed } from '@/misc/json-schema.js'; +import type { NoteReadService } from '@/core/NoteReadService.js'; +import type { NotificationService } from '@/core/NotificationService.js'; +import { bindThis } from '@/decorators.js'; +import { CacheService } from '@/core/CacheService.js'; +import { MiUserProfile } from '@/models/_.js'; +import type { ChannelsService } from './ChannelsService.js'; +import type { EventEmitter } from 'events'; +import type Channel from './channel.js'; +import type { StreamEventEmitter, StreamMessages } from './types.js'; + +/** + * Main stream connection + */ +// eslint-disable-next-line import/no-default-export +export default class Connection { + public user?: MiUser; + public token?: MiAccessToken; + private wsConnection: WebSocket.WebSocket; + public subscriber: StreamEventEmitter; + private channels: Channel[] = []; + private subscribingNotes: any = {}; + private cachedNotes: Packed<'Note'>[] = []; + public userProfile: MiUserProfile | null = null; + public following: Set<string> = new Set(); + public followingChannels: Set<string> = new Set(); + public userIdsWhoMeMuting: Set<string> = new Set(); + public userIdsWhoBlockingMe: Set<string> = new Set(); + public userIdsWhoMeMutingRenotes: Set<string> = new Set(); + private fetchIntervalId: NodeJS.Timeout | null = null; + + constructor( + private channelsService: ChannelsService, + private noteReadService: NoteReadService, + private notificationService: NotificationService, + private cacheService: CacheService, + + user: MiUser | null | undefined, + token: MiAccessToken | null | undefined, + ) { + if (user) this.user = user; + if (token) this.token = token; + } + + @bindThis + public async fetch() { + if (this.user == null) return; + const [userProfile, following, followingChannels, userIdsWhoMeMuting, userIdsWhoBlockingMe, userIdsWhoMeMutingRenotes] = await Promise.all([ + this.cacheService.userProfileCache.fetch(this.user.id), + this.cacheService.userFollowingsCache.fetch(this.user.id), + this.cacheService.userFollowingChannelsCache.fetch(this.user.id), + this.cacheService.userMutingsCache.fetch(this.user.id), + this.cacheService.userBlockedCache.fetch(this.user.id), + this.cacheService.renoteMutingsCache.fetch(this.user.id), + ]); + this.userProfile = userProfile; + this.following = following; + this.followingChannels = followingChannels; + this.userIdsWhoMeMuting = userIdsWhoMeMuting; + this.userIdsWhoBlockingMe = userIdsWhoBlockingMe; + this.userIdsWhoMeMutingRenotes = userIdsWhoMeMutingRenotes; + } + + @bindThis + public async init() { + if (this.user != null) { + await this.fetch(); + + if (!this.fetchIntervalId) { + this.fetchIntervalId = setInterval(this.fetch, 1000 * 10); + } + } + } + + @bindThis + public async listen(subscriber: EventEmitter, wsConnection: WebSocket.WebSocket) { + this.subscriber = subscriber; + + this.wsConnection = wsConnection; + this.wsConnection.on('message', this.onWsConnectionMessage); + + this.subscriber.on('broadcast', data => { + this.onBroadcastMessage(data); + }); + } + + /** + * クライアントからメッセージ受信時 + */ + @bindThis + private async onWsConnectionMessage(data: WebSocket.RawData) { + let obj: Record<string, any>; + + try { + obj = JSON.parse(data.toString()); + } catch (e) { + return; + } + + const { type, body } = obj; + + switch (type) { + case 'readNotification': this.onReadNotification(body); break; + case 'subNote': this.onSubscribeNote(body); break; + case 's': this.onSubscribeNote(body); break; // alias + case 'sr': this.onSubscribeNote(body); this.readNote(body); break; + case 'unsubNote': this.onUnsubscribeNote(body); break; + case 'un': this.onUnsubscribeNote(body); break; // alias + case 'connect': this.onChannelConnectRequested(body); break; + case 'disconnect': this.onChannelDisconnectRequested(body); break; + case 'channel': this.onChannelMessageRequested(body); break; + case 'ch': this.onChannelMessageRequested(body); break; // alias + } + } + + @bindThis + private onBroadcastMessage(data: StreamMessages['broadcast']['payload']) { + this.sendMessageToWs(data.type, data.body); + } + + @bindThis + public cacheNote(note: Packed<'Note'>) { + const add = (note: Packed<'Note'>) => { + const existIndex = this.cachedNotes.findIndex(n => n.id === note.id); + if (existIndex > -1) { + this.cachedNotes[existIndex] = note; + return; + } + + this.cachedNotes.unshift(note); + if (this.cachedNotes.length > 32) { + this.cachedNotes.splice(32); + } + }; + + add(note); + if (note.reply) add(note.reply); + if (note.renote) add(note.renote); + } + + @bindThis + private readNote(body: any) { + const id = body.id; + + const note = this.cachedNotes.find(n => n.id === id); + if (note == null) return; + + if (this.user && (note.userId !== this.user.id)) { + this.noteReadService.read(this.user.id, [note]); + } + } + + @bindThis + private onReadNotification(payload: any) { + this.notificationService.readAllNotification(this.user!.id); + } + + /** + * 投稿購読要求時 + */ + @bindThis + private onSubscribeNote(payload: any) { + if (!payload.id) return; + + if (this.subscribingNotes[payload.id] == null) { + this.subscribingNotes[payload.id] = 0; + } + + this.subscribingNotes[payload.id]++; + + if (this.subscribingNotes[payload.id] === 1) { + this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage); + } + } + + /** + * 投稿購読解除要求時 + */ + @bindThis + private onUnsubscribeNote(payload: any) { + if (!payload.id) return; + + this.subscribingNotes[payload.id]--; + if (this.subscribingNotes[payload.id] <= 0) { + delete this.subscribingNotes[payload.id]; + this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage); + } + } + + @bindThis + private async onNoteStreamMessage(data: StreamMessages['note']['payload']) { + this.sendMessageToWs('noteUpdated', { + id: data.body.id, + type: data.type, + body: data.body.body, + }); + } + + /** + * チャンネル接続要求時 + */ + @bindThis + private onChannelConnectRequested(payload: any) { + const { channel, id, params, pong } = payload; + this.connectChannel(id, params, channel, pong); + } + + /** + * チャンネル切断要求時 + */ + @bindThis + private onChannelDisconnectRequested(payload: any) { + const { id } = payload; + this.disconnectChannel(id); + } + + /** + * クライアントにメッセージ送信 + */ + @bindThis + public sendMessageToWs(type: string, payload: any) { + this.wsConnection.send(JSON.stringify({ + type: type, + body: payload, + })); + } + + /** + * チャンネルに接続 + */ + @bindThis + public connectChannel(id: string, params: any, channel: string, pong = false) { + const channelService = this.channelsService.getChannelService(channel); + + if (channelService.requireCredential && this.user == null) { + return; + } + + // 共有可能チャンネルに接続しようとしていて、かつそのチャンネルに既に接続していたら無意味なので無視 + if (channelService.shouldShare && this.channels.some(c => c.chName === channel)) { + return; + } + + const ch: Channel = channelService.create(id, this); + this.channels.push(ch); + ch.init(params ?? {}); + + if (pong) { + this.sendMessageToWs('connected', { + id: id, + }); + } + } + + /** + * チャンネルから切断 + * @param id チャンネルコネクションID + */ + @bindThis + public disconnectChannel(id: string) { + const channel = this.channels.find(c => c.id === id); + + if (channel) { + if (channel.dispose) channel.dispose(); + this.channels = this.channels.filter(c => c.id !== id); + } + } + + /** + * チャンネルへメッセージ送信要求時 + * @param data メッセージ + */ + @bindThis + private onChannelMessageRequested(data: any) { + const channel = this.channels.find(c => c.id === data.id); + if (channel != null && channel.onMessage != null) { + channel.onMessage(data.type, data.body); + } + } + + /** + * ストリームが切れたとき + */ + @bindThis + public dispose() { + if (this.fetchIntervalId) clearInterval(this.fetchIntervalId); + for (const c of this.channels.filter(c => c.dispose)) { + if (c.dispose) c.dispose(); + } + } +} |