From 6cf466e5d1d423aec8ec06f65644c73c7e9a8ecd Mon Sep 17 00:00:00 2001 From: syuilo Date: Fri, 15 Sep 2023 14:28:29 +0900 Subject: update deps (#11820) * update deps * fix * wip * wip * wip * Update docker-compose.yml.example * Delete reviewer-lottery.yml * Update RepositoryModule.ts * wip * wip * clean up * update deps * wip * wip --- .../backend/src/server/api/stream/Connection.ts | 299 +++++++++++++++++++++ packages/backend/src/server/api/stream/channel.ts | 2 +- .../src/server/api/stream/channels/user-list.ts | 2 +- packages/backend/src/server/api/stream/index.ts | 299 --------------------- packages/backend/src/server/api/stream/types.ts | 2 +- 5 files changed, 302 insertions(+), 302 deletions(-) create mode 100644 packages/backend/src/server/api/stream/Connection.ts delete mode 100644 packages/backend/src/server/api/stream/index.ts (limited to 'packages/backend/src/server/api/stream') diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts new file mode 100644 index 0000000000..9ada6c559e --- /dev/null +++ b/packages/backend/src/server/api/stream/Connection.ts @@ -0,0 +1,299 @@ +/* + * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import * as WebSocket from 'ws'; +import type { MiUser } from '@/models/entities/User.js'; +import type { MiAccessToken } from '@/models/entities/AccessToken.js'; +import type { Packed } from '@/misc/json-schema.js'; +import type { NoteReadService } from '@/core/NoteReadService.js'; +import type { NotificationService } from '@/core/NotificationService.js'; +import { bindThis } from '@/decorators.js'; +import { CacheService } from '@/core/CacheService.js'; +import { MiUserProfile } from '@/models/_.js'; +import type { ChannelsService } from './ChannelsService.js'; +import type { EventEmitter } from 'events'; +import type Channel from './channel.js'; +import type { StreamEventEmitter, StreamMessages } from './types.js'; + +/** + * Main stream connection + */ +// eslint-disable-next-line import/no-default-export +export default class Connection { + public user?: MiUser; + public token?: MiAccessToken; + private wsConnection: WebSocket.WebSocket; + public subscriber: StreamEventEmitter; + private channels: Channel[] = []; + private subscribingNotes: any = {}; + private cachedNotes: Packed<'Note'>[] = []; + public userProfile: MiUserProfile | null = null; + public following: Set = new Set(); + public followingChannels: Set = new Set(); + public userIdsWhoMeMuting: Set = new Set(); + public userIdsWhoBlockingMe: Set = new Set(); + public userIdsWhoMeMutingRenotes: Set = new Set(); + private fetchIntervalId: NodeJS.Timeout | null = null; + + constructor( + private channelsService: ChannelsService, + private noteReadService: NoteReadService, + private notificationService: NotificationService, + private cacheService: CacheService, + + user: MiUser | null | undefined, + token: MiAccessToken | null | undefined, + ) { + if (user) this.user = user; + if (token) this.token = token; + } + + @bindThis + public async fetch() { + if (this.user == null) return; + const [userProfile, following, followingChannels, userIdsWhoMeMuting, userIdsWhoBlockingMe, userIdsWhoMeMutingRenotes] = await Promise.all([ + this.cacheService.userProfileCache.fetch(this.user.id), + this.cacheService.userFollowingsCache.fetch(this.user.id), + this.cacheService.userFollowingChannelsCache.fetch(this.user.id), + this.cacheService.userMutingsCache.fetch(this.user.id), + this.cacheService.userBlockedCache.fetch(this.user.id), + this.cacheService.renoteMutingsCache.fetch(this.user.id), + ]); + this.userProfile = userProfile; + this.following = following; + this.followingChannels = followingChannels; + this.userIdsWhoMeMuting = userIdsWhoMeMuting; + this.userIdsWhoBlockingMe = userIdsWhoBlockingMe; + this.userIdsWhoMeMutingRenotes = userIdsWhoMeMutingRenotes; + } + + @bindThis + public async init() { + if (this.user != null) { + await this.fetch(); + + if (!this.fetchIntervalId) { + this.fetchIntervalId = setInterval(this.fetch, 1000 * 10); + } + } + } + + @bindThis + public async listen(subscriber: EventEmitter, wsConnection: WebSocket.WebSocket) { + this.subscriber = subscriber; + + this.wsConnection = wsConnection; + this.wsConnection.on('message', this.onWsConnectionMessage); + + this.subscriber.on('broadcast', data => { + this.onBroadcastMessage(data); + }); + } + + /** + * クライアントからメッセージ受信時 + */ + @bindThis + private async onWsConnectionMessage(data: WebSocket.RawData) { + let obj: Record; + + try { + obj = JSON.parse(data.toString()); + } catch (e) { + return; + } + + const { type, body } = obj; + + switch (type) { + case 'readNotification': this.onReadNotification(body); break; + case 'subNote': this.onSubscribeNote(body); break; + case 's': this.onSubscribeNote(body); break; // alias + case 'sr': this.onSubscribeNote(body); this.readNote(body); break; + case 'unsubNote': this.onUnsubscribeNote(body); break; + case 'un': this.onUnsubscribeNote(body); break; // alias + case 'connect': this.onChannelConnectRequested(body); break; + case 'disconnect': this.onChannelDisconnectRequested(body); break; + case 'channel': this.onChannelMessageRequested(body); break; + case 'ch': this.onChannelMessageRequested(body); break; // alias + } + } + + @bindThis + private onBroadcastMessage(data: StreamMessages['broadcast']['payload']) { + this.sendMessageToWs(data.type, data.body); + } + + @bindThis + public cacheNote(note: Packed<'Note'>) { + const add = (note: Packed<'Note'>) => { + const existIndex = this.cachedNotes.findIndex(n => n.id === note.id); + if (existIndex > -1) { + this.cachedNotes[existIndex] = note; + return; + } + + this.cachedNotes.unshift(note); + if (this.cachedNotes.length > 32) { + this.cachedNotes.splice(32); + } + }; + + add(note); + if (note.reply) add(note.reply); + if (note.renote) add(note.renote); + } + + @bindThis + private readNote(body: any) { + const id = body.id; + + const note = this.cachedNotes.find(n => n.id === id); + if (note == null) return; + + if (this.user && (note.userId !== this.user.id)) { + this.noteReadService.read(this.user.id, [note]); + } + } + + @bindThis + private onReadNotification(payload: any) { + this.notificationService.readAllNotification(this.user!.id); + } + + /** + * 投稿購読要求時 + */ + @bindThis + private onSubscribeNote(payload: any) { + if (!payload.id) return; + + if (this.subscribingNotes[payload.id] == null) { + this.subscribingNotes[payload.id] = 0; + } + + this.subscribingNotes[payload.id]++; + + if (this.subscribingNotes[payload.id] === 1) { + this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage); + } + } + + /** + * 投稿購読解除要求時 + */ + @bindThis + private onUnsubscribeNote(payload: any) { + if (!payload.id) return; + + this.subscribingNotes[payload.id]--; + if (this.subscribingNotes[payload.id] <= 0) { + delete this.subscribingNotes[payload.id]; + this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage); + } + } + + @bindThis + private async onNoteStreamMessage(data: StreamMessages['note']['payload']) { + this.sendMessageToWs('noteUpdated', { + id: data.body.id, + type: data.type, + body: data.body.body, + }); + } + + /** + * チャンネル接続要求時 + */ + @bindThis + private onChannelConnectRequested(payload: any) { + const { channel, id, params, pong } = payload; + this.connectChannel(id, params, channel, pong); + } + + /** + * チャンネル切断要求時 + */ + @bindThis + private onChannelDisconnectRequested(payload: any) { + const { id } = payload; + this.disconnectChannel(id); + } + + /** + * クライアントにメッセージ送信 + */ + @bindThis + public sendMessageToWs(type: string, payload: any) { + this.wsConnection.send(JSON.stringify({ + type: type, + body: payload, + })); + } + + /** + * チャンネルに接続 + */ + @bindThis + public connectChannel(id: string, params: any, channel: string, pong = false) { + const channelService = this.channelsService.getChannelService(channel); + + if (channelService.requireCredential && this.user == null) { + return; + } + + // 共有可能チャンネルに接続しようとしていて、かつそのチャンネルに既に接続していたら無意味なので無視 + if (channelService.shouldShare && this.channels.some(c => c.chName === channel)) { + return; + } + + const ch: Channel = channelService.create(id, this); + this.channels.push(ch); + ch.init(params ?? {}); + + if (pong) { + this.sendMessageToWs('connected', { + id: id, + }); + } + } + + /** + * チャンネルから切断 + * @param id チャンネルコネクションID + */ + @bindThis + public disconnectChannel(id: string) { + const channel = this.channels.find(c => c.id === id); + + if (channel) { + if (channel.dispose) channel.dispose(); + this.channels = this.channels.filter(c => c.id !== id); + } + } + + /** + * チャンネルへメッセージ送信要求時 + * @param data メッセージ + */ + @bindThis + private onChannelMessageRequested(data: any) { + const channel = this.channels.find(c => c.id === data.id); + if (channel != null && channel.onMessage != null) { + channel.onMessage(data.type, data.body); + } + } + + /** + * ストリームが切れたとき + */ + @bindThis + public dispose() { + if (this.fetchIntervalId) clearInterval(this.fetchIntervalId); + for (const c of this.channels.filter(c => c.dispose)) { + if (c.dispose) c.dispose(); + } + } +} diff --git a/packages/backend/src/server/api/stream/channel.ts b/packages/backend/src/server/api/stream/channel.ts index 93c673838f..ad32d08fee 100644 --- a/packages/backend/src/server/api/stream/channel.ts +++ b/packages/backend/src/server/api/stream/channel.ts @@ -4,7 +4,7 @@ */ import { bindThis } from '@/decorators.js'; -import type Connection from './index.js'; +import type Connection from './Connection.js'; /** * Stream channel diff --git a/packages/backend/src/server/api/stream/channels/user-list.ts b/packages/backend/src/server/api/stream/channels/user-list.ts index f7001e41cd..051b0bf502 100644 --- a/packages/backend/src/server/api/stream/channels/user-list.ts +++ b/packages/backend/src/server/api/stream/channels/user-list.ts @@ -4,7 +4,7 @@ */ import { Inject, Injectable } from '@nestjs/common'; -import type { UserListJoiningsRepository, UserListsRepository } from '@/models/index.js'; +import type { UserListJoiningsRepository, UserListsRepository } from '@/models/_.js'; import type { MiUser } from '@/models/entities/User.js'; import { isUserRelated } from '@/misc/is-user-related.js'; import type { Packed } from '@/misc/json-schema.js'; diff --git a/packages/backend/src/server/api/stream/index.ts b/packages/backend/src/server/api/stream/index.ts deleted file mode 100644 index 232ec5700d..0000000000 --- a/packages/backend/src/server/api/stream/index.ts +++ /dev/null @@ -1,299 +0,0 @@ -/* - * SPDX-FileCopyrightText: syuilo and other misskey contributors - * SPDX-License-Identifier: AGPL-3.0-only - */ - -import * as WebSocket from 'ws'; -import type { MiUser } from '@/models/entities/User.js'; -import type { MiAccessToken } from '@/models/entities/AccessToken.js'; -import type { Packed } from '@/misc/json-schema.js'; -import type { NoteReadService } from '@/core/NoteReadService.js'; -import type { NotificationService } from '@/core/NotificationService.js'; -import { bindThis } from '@/decorators.js'; -import { CacheService } from '@/core/CacheService.js'; -import { MiUserProfile } from '@/models/index.js'; -import type { ChannelsService } from './ChannelsService.js'; -import type { EventEmitter } from 'events'; -import type Channel from './channel.js'; -import type { StreamEventEmitter, StreamMessages } from './types.js'; - -/** - * Main stream connection - */ -// eslint-disable-next-line import/no-default-export -export default class Connection { - public user?: MiUser; - public token?: MiAccessToken; - private wsConnection: WebSocket.WebSocket; - public subscriber: StreamEventEmitter; - private channels: Channel[] = []; - private subscribingNotes: any = {}; - private cachedNotes: Packed<'Note'>[] = []; - public userProfile: MiUserProfile | null = null; - public following: Set = new Set(); - public followingChannels: Set = new Set(); - public userIdsWhoMeMuting: Set = new Set(); - public userIdsWhoBlockingMe: Set = new Set(); - public userIdsWhoMeMutingRenotes: Set = new Set(); - private fetchIntervalId: NodeJS.Timeout | null = null; - - constructor( - private channelsService: ChannelsService, - private noteReadService: NoteReadService, - private notificationService: NotificationService, - private cacheService: CacheService, - - user: MiUser | null | undefined, - token: MiAccessToken | null | undefined, - ) { - if (user) this.user = user; - if (token) this.token = token; - } - - @bindThis - public async fetch() { - if (this.user == null) return; - const [userProfile, following, followingChannels, userIdsWhoMeMuting, userIdsWhoBlockingMe, userIdsWhoMeMutingRenotes] = await Promise.all([ - this.cacheService.userProfileCache.fetch(this.user.id), - this.cacheService.userFollowingsCache.fetch(this.user.id), - this.cacheService.userFollowingChannelsCache.fetch(this.user.id), - this.cacheService.userMutingsCache.fetch(this.user.id), - this.cacheService.userBlockedCache.fetch(this.user.id), - this.cacheService.renoteMutingsCache.fetch(this.user.id), - ]); - this.userProfile = userProfile; - this.following = following; - this.followingChannels = followingChannels; - this.userIdsWhoMeMuting = userIdsWhoMeMuting; - this.userIdsWhoBlockingMe = userIdsWhoBlockingMe; - this.userIdsWhoMeMutingRenotes = userIdsWhoMeMutingRenotes; - } - - @bindThis - public async init() { - if (this.user != null) { - await this.fetch(); - - if (!this.fetchIntervalId) { - this.fetchIntervalId = setInterval(this.fetch, 1000 * 10); - } - } - } - - @bindThis - public async listen(subscriber: EventEmitter, wsConnection: WebSocket.WebSocket) { - this.subscriber = subscriber; - - this.wsConnection = wsConnection; - this.wsConnection.on('message', this.onWsConnectionMessage); - - this.subscriber.on('broadcast', data => { - this.onBroadcastMessage(data); - }); - } - - /** - * クライアントからメッセージ受信時 - */ - @bindThis - private async onWsConnectionMessage(data: WebSocket.RawData) { - let obj: Record; - - try { - obj = JSON.parse(data.toString()); - } catch (e) { - return; - } - - const { type, body } = obj; - - switch (type) { - case 'readNotification': this.onReadNotification(body); break; - case 'subNote': this.onSubscribeNote(body); break; - case 's': this.onSubscribeNote(body); break; // alias - case 'sr': this.onSubscribeNote(body); this.readNote(body); break; - case 'unsubNote': this.onUnsubscribeNote(body); break; - case 'un': this.onUnsubscribeNote(body); break; // alias - case 'connect': this.onChannelConnectRequested(body); break; - case 'disconnect': this.onChannelDisconnectRequested(body); break; - case 'channel': this.onChannelMessageRequested(body); break; - case 'ch': this.onChannelMessageRequested(body); break; // alias - } - } - - @bindThis - private onBroadcastMessage(data: StreamMessages['broadcast']['payload']) { - this.sendMessageToWs(data.type, data.body); - } - - @bindThis - public cacheNote(note: Packed<'Note'>) { - const add = (note: Packed<'Note'>) => { - const existIndex = this.cachedNotes.findIndex(n => n.id === note.id); - if (existIndex > -1) { - this.cachedNotes[existIndex] = note; - return; - } - - this.cachedNotes.unshift(note); - if (this.cachedNotes.length > 32) { - this.cachedNotes.splice(32); - } - }; - - add(note); - if (note.reply) add(note.reply); - if (note.renote) add(note.renote); - } - - @bindThis - private readNote(body: any) { - const id = body.id; - - const note = this.cachedNotes.find(n => n.id === id); - if (note == null) return; - - if (this.user && (note.userId !== this.user.id)) { - this.noteReadService.read(this.user.id, [note]); - } - } - - @bindThis - private onReadNotification(payload: any) { - this.notificationService.readAllNotification(this.user!.id); - } - - /** - * 投稿購読要求時 - */ - @bindThis - private onSubscribeNote(payload: any) { - if (!payload.id) return; - - if (this.subscribingNotes[payload.id] == null) { - this.subscribingNotes[payload.id] = 0; - } - - this.subscribingNotes[payload.id]++; - - if (this.subscribingNotes[payload.id] === 1) { - this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage); - } - } - - /** - * 投稿購読解除要求時 - */ - @bindThis - private onUnsubscribeNote(payload: any) { - if (!payload.id) return; - - this.subscribingNotes[payload.id]--; - if (this.subscribingNotes[payload.id] <= 0) { - delete this.subscribingNotes[payload.id]; - this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage); - } - } - - @bindThis - private async onNoteStreamMessage(data: StreamMessages['note']['payload']) { - this.sendMessageToWs('noteUpdated', { - id: data.body.id, - type: data.type, - body: data.body.body, - }); - } - - /** - * チャンネル接続要求時 - */ - @bindThis - private onChannelConnectRequested(payload: any) { - const { channel, id, params, pong } = payload; - this.connectChannel(id, params, channel, pong); - } - - /** - * チャンネル切断要求時 - */ - @bindThis - private onChannelDisconnectRequested(payload: any) { - const { id } = payload; - this.disconnectChannel(id); - } - - /** - * クライアントにメッセージ送信 - */ - @bindThis - public sendMessageToWs(type: string, payload: any) { - this.wsConnection.send(JSON.stringify({ - type: type, - body: payload, - })); - } - - /** - * チャンネルに接続 - */ - @bindThis - public connectChannel(id: string, params: any, channel: string, pong = false) { - const channelService = this.channelsService.getChannelService(channel); - - if (channelService.requireCredential && this.user == null) { - return; - } - - // 共有可能チャンネルに接続しようとしていて、かつそのチャンネルに既に接続していたら無意味なので無視 - if (channelService.shouldShare && this.channels.some(c => c.chName === channel)) { - return; - } - - const ch: Channel = channelService.create(id, this); - this.channels.push(ch); - ch.init(params ?? {}); - - if (pong) { - this.sendMessageToWs('connected', { - id: id, - }); - } - } - - /** - * チャンネルから切断 - * @param id チャンネルコネクションID - */ - @bindThis - public disconnectChannel(id: string) { - const channel = this.channels.find(c => c.id === id); - - if (channel) { - if (channel.dispose) channel.dispose(); - this.channels = this.channels.filter(c => c.id !== id); - } - } - - /** - * チャンネルへメッセージ送信要求時 - * @param data メッセージ - */ - @bindThis - private onChannelMessageRequested(data: any) { - const channel = this.channels.find(c => c.id === data.id); - if (channel != null && channel.onMessage != null) { - channel.onMessage(data.type, data.body); - } - } - - /** - * ストリームが切れたとき - */ - @bindThis - public dispose() { - if (this.fetchIntervalId) clearInterval(this.fetchIntervalId); - for (const c of this.channels.filter(c => c.dispose)) { - if (c.dispose) c.dispose(); - } - } -} diff --git a/packages/backend/src/server/api/stream/types.ts b/packages/backend/src/server/api/stream/types.ts index 531c00bdf6..c8e168519a 100644 --- a/packages/backend/src/server/api/stream/types.ts +++ b/packages/backend/src/server/api/stream/types.ts @@ -17,7 +17,7 @@ import type { MiPage } from '@/models/entities/Page.js'; import type { Packed } from '@/misc/json-schema.js'; import type { MiWebhook } from '@/models/entities/Webhook.js'; import type { MiMeta } from '@/models/entities/Meta.js'; -import { MiRole, MiRoleAssignment } from '@/models/index.js'; +import { MiRole, MiRoleAssignment } from '@/models/_.js'; import type Emitter from 'strict-event-emitter-types'; import type { EventEmitter } from 'events'; -- cgit v1.2.3-freya