diff options
| author | syuilo <Syuilotan@yahoo.co.jp> | 2023-09-15 14:28:29 +0900 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-09-15 14:28:29 +0900 |
| commit | 6cf466e5d1d423aec8ec06f65644c73c7e9a8ecd (patch) | |
| tree | b3623e27ae94471fa4d58dda9e9296b21afbd20e /packages/backend/src/server/api/stream/index.ts | |
| parent | enhance nodeinfo by prpoagate the standart keys for homepage and repository (... (diff) | |
| download | sharkey-6cf466e5d1d423aec8ec06f65644c73c7e9a8ecd.tar.gz sharkey-6cf466e5d1d423aec8ec06f65644c73c7e9a8ecd.tar.bz2 sharkey-6cf466e5d1d423aec8ec06f65644c73c7e9a8ecd.zip | |
update deps (#11820)
* update deps
* fix
* wip
* wip
* wip
* Update docker-compose.yml.example
* Delete reviewer-lottery.yml
* Update RepositoryModule.ts
* wip
* wip
* clean up
* update deps
* wip
* wip
Diffstat (limited to 'packages/backend/src/server/api/stream/index.ts')
| -rw-r--r-- | packages/backend/src/server/api/stream/index.ts | 299 |
1 files changed, 0 insertions, 299 deletions
diff --git a/packages/backend/src/server/api/stream/index.ts b/packages/backend/src/server/api/stream/index.ts deleted file mode 100644 index 232ec5700d..0000000000 --- a/packages/backend/src/server/api/stream/index.ts +++ /dev/null @@ -1,299 +0,0 @@ -/* - * SPDX-FileCopyrightText: syuilo and other misskey contributors - * SPDX-License-Identifier: AGPL-3.0-only - */ - -import * as WebSocket from 'ws'; -import type { MiUser } from '@/models/entities/User.js'; -import type { MiAccessToken } from '@/models/entities/AccessToken.js'; -import type { Packed } from '@/misc/json-schema.js'; -import type { NoteReadService } from '@/core/NoteReadService.js'; -import type { NotificationService } from '@/core/NotificationService.js'; -import { bindThis } from '@/decorators.js'; -import { CacheService } from '@/core/CacheService.js'; -import { MiUserProfile } from '@/models/index.js'; -import type { ChannelsService } from './ChannelsService.js'; -import type { EventEmitter } from 'events'; -import type Channel from './channel.js'; -import type { StreamEventEmitter, StreamMessages } from './types.js'; - -/** - * Main stream connection - */ -// eslint-disable-next-line import/no-default-export -export default class Connection { - public user?: MiUser; - public token?: MiAccessToken; - private wsConnection: WebSocket.WebSocket; - public subscriber: StreamEventEmitter; - private channels: Channel[] = []; - private subscribingNotes: any = {}; - private cachedNotes: Packed<'Note'>[] = []; - public userProfile: MiUserProfile | null = null; - public following: Set<string> = new Set(); - public followingChannels: Set<string> = new Set(); - public userIdsWhoMeMuting: Set<string> = new Set(); - public userIdsWhoBlockingMe: Set<string> = new Set(); - public userIdsWhoMeMutingRenotes: Set<string> = new Set(); - private fetchIntervalId: NodeJS.Timeout | null = null; - - constructor( - private channelsService: ChannelsService, - private noteReadService: NoteReadService, - private notificationService: NotificationService, - private cacheService: CacheService, - - user: MiUser | null | undefined, - token: MiAccessToken | null | undefined, - ) { - if (user) this.user = user; - if (token) this.token = token; - } - - @bindThis - public async fetch() { - if (this.user == null) return; - const [userProfile, following, followingChannels, userIdsWhoMeMuting, userIdsWhoBlockingMe, userIdsWhoMeMutingRenotes] = await Promise.all([ - this.cacheService.userProfileCache.fetch(this.user.id), - this.cacheService.userFollowingsCache.fetch(this.user.id), - this.cacheService.userFollowingChannelsCache.fetch(this.user.id), - this.cacheService.userMutingsCache.fetch(this.user.id), - this.cacheService.userBlockedCache.fetch(this.user.id), - this.cacheService.renoteMutingsCache.fetch(this.user.id), - ]); - this.userProfile = userProfile; - this.following = following; - this.followingChannels = followingChannels; - this.userIdsWhoMeMuting = userIdsWhoMeMuting; - this.userIdsWhoBlockingMe = userIdsWhoBlockingMe; - this.userIdsWhoMeMutingRenotes = userIdsWhoMeMutingRenotes; - } - - @bindThis - public async init() { - if (this.user != null) { - await this.fetch(); - - if (!this.fetchIntervalId) { - this.fetchIntervalId = setInterval(this.fetch, 1000 * 10); - } - } - } - - @bindThis - public async listen(subscriber: EventEmitter, wsConnection: WebSocket.WebSocket) { - this.subscriber = subscriber; - - this.wsConnection = wsConnection; - this.wsConnection.on('message', this.onWsConnectionMessage); - - this.subscriber.on('broadcast', data => { - this.onBroadcastMessage(data); - }); - } - - /** - * クライアントからメッセージ受信時 - */ - @bindThis - private async onWsConnectionMessage(data: WebSocket.RawData) { - let obj: Record<string, any>; - - try { - obj = JSON.parse(data.toString()); - } catch (e) { - return; - } - - const { type, body } = obj; - - switch (type) { - case 'readNotification': this.onReadNotification(body); break; - case 'subNote': this.onSubscribeNote(body); break; - case 's': this.onSubscribeNote(body); break; // alias - case 'sr': this.onSubscribeNote(body); this.readNote(body); break; - case 'unsubNote': this.onUnsubscribeNote(body); break; - case 'un': this.onUnsubscribeNote(body); break; // alias - case 'connect': this.onChannelConnectRequested(body); break; - case 'disconnect': this.onChannelDisconnectRequested(body); break; - case 'channel': this.onChannelMessageRequested(body); break; - case 'ch': this.onChannelMessageRequested(body); break; // alias - } - } - - @bindThis - private onBroadcastMessage(data: StreamMessages['broadcast']['payload']) { - this.sendMessageToWs(data.type, data.body); - } - - @bindThis - public cacheNote(note: Packed<'Note'>) { - const add = (note: Packed<'Note'>) => { - const existIndex = this.cachedNotes.findIndex(n => n.id === note.id); - if (existIndex > -1) { - this.cachedNotes[existIndex] = note; - return; - } - - this.cachedNotes.unshift(note); - if (this.cachedNotes.length > 32) { - this.cachedNotes.splice(32); - } - }; - - add(note); - if (note.reply) add(note.reply); - if (note.renote) add(note.renote); - } - - @bindThis - private readNote(body: any) { - const id = body.id; - - const note = this.cachedNotes.find(n => n.id === id); - if (note == null) return; - - if (this.user && (note.userId !== this.user.id)) { - this.noteReadService.read(this.user.id, [note]); - } - } - - @bindThis - private onReadNotification(payload: any) { - this.notificationService.readAllNotification(this.user!.id); - } - - /** - * 投稿購読要求時 - */ - @bindThis - private onSubscribeNote(payload: any) { - if (!payload.id) return; - - if (this.subscribingNotes[payload.id] == null) { - this.subscribingNotes[payload.id] = 0; - } - - this.subscribingNotes[payload.id]++; - - if (this.subscribingNotes[payload.id] === 1) { - this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage); - } - } - - /** - * 投稿購読解除要求時 - */ - @bindThis - private onUnsubscribeNote(payload: any) { - if (!payload.id) return; - - this.subscribingNotes[payload.id]--; - if (this.subscribingNotes[payload.id] <= 0) { - delete this.subscribingNotes[payload.id]; - this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage); - } - } - - @bindThis - private async onNoteStreamMessage(data: StreamMessages['note']['payload']) { - this.sendMessageToWs('noteUpdated', { - id: data.body.id, - type: data.type, - body: data.body.body, - }); - } - - /** - * チャンネル接続要求時 - */ - @bindThis - private onChannelConnectRequested(payload: any) { - const { channel, id, params, pong } = payload; - this.connectChannel(id, params, channel, pong); - } - - /** - * チャンネル切断要求時 - */ - @bindThis - private onChannelDisconnectRequested(payload: any) { - const { id } = payload; - this.disconnectChannel(id); - } - - /** - * クライアントにメッセージ送信 - */ - @bindThis - public sendMessageToWs(type: string, payload: any) { - this.wsConnection.send(JSON.stringify({ - type: type, - body: payload, - })); - } - - /** - * チャンネルに接続 - */ - @bindThis - public connectChannel(id: string, params: any, channel: string, pong = false) { - const channelService = this.channelsService.getChannelService(channel); - - if (channelService.requireCredential && this.user == null) { - return; - } - - // 共有可能チャンネルに接続しようとしていて、かつそのチャンネルに既に接続していたら無意味なので無視 - if (channelService.shouldShare && this.channels.some(c => c.chName === channel)) { - return; - } - - const ch: Channel = channelService.create(id, this); - this.channels.push(ch); - ch.init(params ?? {}); - - if (pong) { - this.sendMessageToWs('connected', { - id: id, - }); - } - } - - /** - * チャンネルから切断 - * @param id チャンネルコネクションID - */ - @bindThis - public disconnectChannel(id: string) { - const channel = this.channels.find(c => c.id === id); - - if (channel) { - if (channel.dispose) channel.dispose(); - this.channels = this.channels.filter(c => c.id !== id); - } - } - - /** - * チャンネルへメッセージ送信要求時 - * @param data メッセージ - */ - @bindThis - private onChannelMessageRequested(data: any) { - const channel = this.channels.find(c => c.id === data.id); - if (channel != null && channel.onMessage != null) { - channel.onMessage(data.type, data.body); - } - } - - /** - * ストリームが切れたとき - */ - @bindThis - public dispose() { - if (this.fetchIntervalId) clearInterval(this.fetchIntervalId); - for (const c of this.channels.filter(c => c.dispose)) { - if (c.dispose) c.dispose(); - } - } -} |