diff options
| author | syuilo <Syuilotan@yahoo.co.jp> | 2023-01-16 15:21:43 +0900 |
|---|---|---|
| committer | syuilo <Syuilotan@yahoo.co.jp> | 2023-01-16 15:21:43 +0900 |
| commit | d56fc4186529bf41fe840cb3497f1a363ac84475 (patch) | |
| tree | e4da77fc7544fb8d5619e9799ee9ce3494ccd80b /packages/backend/src/server/api/stream | |
| parent | masterブランチをmaster_securityとマージ (#9260) (diff) | |
| parent | 13.0.0 (diff) | |
| download | misskey-d56fc4186529bf41fe840cb3497f1a363ac84475.tar.gz misskey-d56fc4186529bf41fe840cb3497f1a363ac84475.tar.bz2 misskey-d56fc4186529bf41fe840cb3497f1a363ac84475.zip | |
Merge branch 'develop'
Diffstat (limited to 'packages/backend/src/server/api/stream')
20 files changed, 777 insertions, 215 deletions
diff --git a/packages/backend/src/server/api/stream/ChannelsService.ts b/packages/backend/src/server/api/stream/ChannelsService.ts new file mode 100644 index 0000000000..198fc190d4 --- /dev/null +++ b/packages/backend/src/server/api/stream/ChannelsService.ts @@ -0,0 +1,64 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { DI } from '@/di-symbols.js'; +import { HybridTimelineChannelService } from './channels/hybrid-timeline.js'; +import { LocalTimelineChannelService } from './channels/local-timeline.js'; +import { HomeTimelineChannelService } from './channels/home-timeline.js'; +import { GlobalTimelineChannelService } from './channels/global-timeline.js'; +import { MainChannelService } from './channels/main.js'; +import { ChannelChannelService } from './channels/channel.js'; +import { AdminChannelService } from './channels/admin.js'; +import { ServerStatsChannelService } from './channels/server-stats.js'; +import { QueueStatsChannelService } from './channels/queue-stats.js'; +import { UserListChannelService } from './channels/user-list.js'; +import { AntennaChannelService } from './channels/antenna.js'; +import { MessagingChannelService } from './channels/messaging.js'; +import { MessagingIndexChannelService } from './channels/messaging-index.js'; +import { DriveChannelService } from './channels/drive.js'; +import { HashtagChannelService } from './channels/hashtag.js'; +import { bindThis } from '@/decorators.js'; + +@Injectable() +export class ChannelsService { + constructor( + private mainChannelService: MainChannelService, + private homeTimelineChannelService: HomeTimelineChannelService, + private localTimelineChannelService: LocalTimelineChannelService, + private hybridTimelineChannelService: HybridTimelineChannelService, + private globalTimelineChannelService: GlobalTimelineChannelService, + private userListChannelService: UserListChannelService, + private hashtagChannelService: HashtagChannelService, + private antennaChannelService: AntennaChannelService, + private channelChannelService: ChannelChannelService, + private messagingChannelService: MessagingChannelService, + private messagingIndexChannelService: MessagingIndexChannelService, + private driveChannelService: DriveChannelService, + private serverStatsChannelService: ServerStatsChannelService, + private queueStatsChannelService: QueueStatsChannelService, + private adminChannelService: AdminChannelService, + ) { + } + + @bindThis + public getChannelService(name: string) { + switch (name) { + case 'main': return this.mainChannelService; + case 'homeTimeline': return this.homeTimelineChannelService; + case 'localTimeline': return this.localTimelineChannelService; + case 'hybridTimeline': return this.hybridTimelineChannelService; + case 'globalTimeline': return this.globalTimelineChannelService; + case 'userList': return this.userListChannelService; + case 'hashtag': return this.hashtagChannelService; + case 'antenna': return this.antennaChannelService; + case 'channel': return this.channelChannelService; + case 'messaging': return this.messagingChannelService; + case 'messagingIndex': return this.messagingIndexChannelService; + case 'drive': return this.driveChannelService; + case 'serverStats': return this.serverStatsChannelService; + case 'queueStats': return this.queueStatsChannelService; + case 'admin': return this.adminChannelService; + + default: + throw new Error(`no such channel: ${name}`); + } + } +} diff --git a/packages/backend/src/server/api/stream/channel.ts b/packages/backend/src/server/api/stream/channel.ts index d2cc5122d5..3e67880b45 100644 --- a/packages/backend/src/server/api/stream/channel.ts +++ b/packages/backend/src/server/api/stream/channel.ts @@ -1,4 +1,5 @@ -import Connection from '.'; +import { bindThis } from '@/decorators.js'; +import type Connection from '.'; /** * Stream channel @@ -43,6 +44,7 @@ export default abstract class Channel { this.connection = connection; } + @bindThis public send(typeOrPayload: any, payload?: any) { const type = payload === undefined ? typeOrPayload.type : typeOrPayload; const body = payload === undefined ? typeOrPayload.body : payload; diff --git a/packages/backend/src/server/api/stream/channels/admin.ts b/packages/backend/src/server/api/stream/channels/admin.ts index 945182ea10..210e016a7e 100644 --- a/packages/backend/src/server/api/stream/channels/admin.ts +++ b/packages/backend/src/server/api/stream/channels/admin.ts @@ -1,10 +1,13 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { bindThis } from '@/decorators.js'; import Channel from '../channel.js'; -export default class extends Channel { +class AdminChannel extends Channel { public readonly chName = 'admin'; public static shouldShare = true; public static requireCredential = true; + @bindThis public async init(params: any) { // Subscribe admin stream this.subscriber.on(`adminStream:${this.user!.id}`, data => { @@ -12,3 +15,21 @@ export default class extends Channel { }); } } + +@Injectable() +export class AdminChannelService { + public readonly shouldShare = AdminChannel.shouldShare; + public readonly requireCredential = AdminChannel.requireCredential; + + constructor( + ) { + } + + @bindThis + public create(id: string, connection: Channel['connection']): AdminChannel { + return new AdminChannel( + id, + connection, + ); + } +} diff --git a/packages/backend/src/server/api/stream/channels/antenna.ts b/packages/backend/src/server/api/stream/channels/antenna.ts index d28320d928..44beef2da2 100644 --- a/packages/backend/src/server/api/stream/channels/antenna.ts +++ b/packages/backend/src/server/api/stream/channels/antenna.ts @@ -1,19 +1,28 @@ -import Channel from '../channel.js'; -import { Notes } from '@/models/index.js'; +import { Inject, Injectable } from '@nestjs/common'; +import type { NotesRepository } from '@/models/index.js'; import { isUserRelated } from '@/misc/is-user-related.js'; -import { StreamMessages } from '../types.js'; +import { NoteEntityService } from '@/core/entities/NoteEntityService.js'; +import { bindThis } from '@/decorators.js'; +import Channel from '../channel.js'; +import type { StreamMessages } from '../types.js'; -export default class extends Channel { +class AntennaChannel extends Channel { public readonly chName = 'antenna'; public static shouldShare = false; public static requireCredential = false; private antennaId: string; - constructor(id: string, connection: Channel['connection']) { + constructor( + private noteEntityService: NoteEntityService, + + id: string, + connection: Channel['connection'], + ) { super(id, connection); - this.onEvent = this.onEvent.bind(this); + //this.onEvent = this.onEvent.bind(this); } + @bindThis public async init(params: any) { this.antennaId = params.antennaId as string; @@ -21,9 +30,10 @@ export default class extends Channel { this.subscriber.on(`antennaStream:${this.antennaId}`, this.onEvent); } + @bindThis private async onEvent(data: StreamMessages['antenna']['payload']) { if (data.type === 'note') { - const note = await Notes.pack(data.body.id, this.user, { detail: true }); + const note = await this.noteEntityService.pack(data.body.id, this.user, { detail: true }); // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する if (isUserRelated(note, this.muting)) return; @@ -38,8 +48,29 @@ export default class extends Channel { } } + @bindThis public dispose() { // Unsubscribe events this.subscriber.off(`antennaStream:${this.antennaId}`, this.onEvent); } } + +@Injectable() +export class AntennaChannelService { + public readonly shouldShare = AntennaChannel.shouldShare; + public readonly requireCredential = AntennaChannel.requireCredential; + + constructor( + private noteEntityService: NoteEntityService, + ) { + } + + @bindThis + public create(id: string, connection: Channel['connection']): AntennaChannel { + return new AntennaChannel( + this.noteEntityService, + id, + connection, + ); + } +} diff --git a/packages/backend/src/server/api/stream/channels/channel.ts b/packages/backend/src/server/api/stream/channels/channel.ts index 3cdd89a8b3..5ba84e43c4 100644 --- a/packages/backend/src/server/api/stream/channels/channel.ts +++ b/packages/backend/src/server/api/stream/channels/channel.ts @@ -1,11 +1,15 @@ -import Channel from '../channel.js'; -import { Notes, Users } from '@/models/index.js'; +import { Inject, Injectable } from '@nestjs/common'; +import type { NotesRepository, UsersRepository } from '@/models/index.js'; import { isUserRelated } from '@/misc/is-user-related.js'; -import { User } from '@/models/entities/user.js'; -import { StreamMessages } from '../types.js'; -import { Packed } from '@/misc/schema.js'; +import type { User } from '@/models/entities/User.js'; +import type { Packed } from '@/misc/schema.js'; +import { NoteEntityService } from '@/core/entities/NoteEntityService.js'; +import { UserEntityService } from '@/core/entities/UserEntityService.js'; +import { bindThis } from '@/decorators.js'; +import Channel from '../channel.js'; +import type { StreamMessages } from '../types.js'; -export default class extends Channel { +class ChannelChannel extends Channel { public readonly chName = 'channel'; public static shouldShare = false; public static requireCredential = false; @@ -13,12 +17,19 @@ export default class extends Channel { private typers: Record<User['id'], Date> = {}; private emitTypersIntervalId: ReturnType<typeof setInterval>; - constructor(id: string, connection: Channel['connection']) { + constructor( + private noteEntityService: NoteEntityService, + private userEntityService: UserEntityService, + + id: string, + connection: Channel['connection'], + ) { super(id, connection); - this.onNote = this.onNote.bind(this); - this.emitTypers = this.emitTypers.bind(this); + //this.onNote = this.onNote.bind(this); + //this.emitTypers = this.emitTypers.bind(this); } + @bindThis public async init(params: any) { this.channelId = params.channelId as string; @@ -28,18 +39,19 @@ export default class extends Channel { this.emitTypersIntervalId = setInterval(this.emitTypers, 5000); } + @bindThis private async onNote(note: Packed<'Note'>) { if (note.channelId !== this.channelId) return; // リプライなら再pack if (note.replyId != null) { - note.reply = await Notes.pack(note.replyId, this.user, { + note.reply = await this.noteEntityService.pack(note.replyId, this.user, { detail: true, }); } // Renoteなら再pack if (note.renoteId != null) { - note.renote = await Notes.pack(note.renoteId, this.user, { + note.renote = await this.noteEntityService.pack(note.renoteId, this.user, { detail: true, }); } @@ -54,6 +66,7 @@ export default class extends Channel { this.send('note', note); } + @bindThis private onEvent(data: StreamMessages['channel']['payload']) { if (data.type === 'typing') { const id = data.body; @@ -65,6 +78,7 @@ export default class extends Channel { } } + @bindThis private async emitTypers() { const now = new Date(); @@ -73,7 +87,7 @@ export default class extends Channel { if (now.getTime() - date.getTime() > 5000) delete this.typers[userId]; } - const users = await Users.packMany(Object.keys(this.typers), null, { detail: false }); + const users = await this.userEntityService.packMany(Object.keys(this.typers), null, { detail: false }); this.send({ type: 'typers', @@ -81,6 +95,7 @@ export default class extends Channel { }); } + @bindThis public dispose() { // Unsubscribe events this.subscriber.off('notesStream', this.onNote); @@ -89,3 +104,25 @@ export default class extends Channel { clearInterval(this.emitTypersIntervalId); } } + +@Injectable() +export class ChannelChannelService { + public readonly shouldShare = ChannelChannel.shouldShare; + public readonly requireCredential = ChannelChannel.requireCredential; + + constructor( + private noteEntityService: NoteEntityService, + private userEntityService: UserEntityService, + ) { + } + + @bindThis + public create(id: string, connection: Channel['connection']): ChannelChannel { + return new ChannelChannel( + this.noteEntityService, + this.userEntityService, + id, + connection, + ); + } +} diff --git a/packages/backend/src/server/api/stream/channels/drive.ts b/packages/backend/src/server/api/stream/channels/drive.ts index 140255acd1..cfcb125b6b 100644 --- a/packages/backend/src/server/api/stream/channels/drive.ts +++ b/packages/backend/src/server/api/stream/channels/drive.ts @@ -1,10 +1,13 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { bindThis } from '@/decorators.js'; import Channel from '../channel.js'; -export default class extends Channel { +class DriveChannel extends Channel { public readonly chName = 'drive'; public static shouldShare = true; public static requireCredential = true; + @bindThis public async init(params: any) { // Subscribe drive stream this.subscriber.on(`driveStream:${this.user!.id}`, data => { @@ -12,3 +15,21 @@ export default class extends Channel { }); } } + +@Injectable() +export class DriveChannelService { + public readonly shouldShare = DriveChannel.shouldShare; + public readonly requireCredential = DriveChannel.requireCredential; + + constructor( + ) { + } + + @bindThis + public create(id: string, connection: Channel['connection']): DriveChannel { + return new DriveChannel( + id, + connection, + ); + } +} diff --git a/packages/backend/src/server/api/stream/channels/global-timeline.ts b/packages/backend/src/server/api/stream/channels/global-timeline.ts index 5b4ae850ec..43d8907fc9 100644 --- a/packages/backend/src/server/api/stream/channels/global-timeline.ts +++ b/packages/backend/src/server/api/stream/channels/global-timeline.ts @@ -1,44 +1,55 @@ -import Channel from '../channel.js'; -import { fetchMeta } from '@/misc/fetch-meta.js'; -import { Notes } from '@/models/index.js'; +import { Inject, Injectable } from '@nestjs/common'; +import type { NotesRepository } from '@/models/index.js'; import { checkWordMute } from '@/misc/check-word-mute.js'; import { isInstanceMuted } from '@/misc/is-instance-muted.js'; import { isUserRelated } from '@/misc/is-user-related.js'; -import { Packed } from '@/misc/schema.js'; +import type { Packed } from '@/misc/schema.js'; +import { MetaService } from '@/core/MetaService.js'; +import { NoteEntityService } from '@/core/entities/NoteEntityService.js'; +import { bindThis } from '@/decorators.js'; +import { RoleService } from '@/core/RoleService.js'; +import Channel from '../channel.js'; -export default class extends Channel { +class GlobalTimelineChannel extends Channel { public readonly chName = 'globalTimeline'; public static shouldShare = true; public static requireCredential = false; - constructor(id: string, connection: Channel['connection']) { + constructor( + private metaService: MetaService, + private roleService: RoleService, + private noteEntityService: NoteEntityService, + + id: string, + connection: Channel['connection'], + ) { super(id, connection); - this.onNote = this.onNote.bind(this); + //this.onNote = this.onNote.bind(this); } + @bindThis public async init(params: any) { - const meta = await fetchMeta(); - if (meta.disableGlobalTimeline) { - if (this.user == null || (!this.user.isAdmin && !this.user.isModerator)) return; - } + const policies = await this.roleService.getUserPolicies(this.user ? this.user.id : null); + if (!policies.gtlAvailable) return; // Subscribe events this.subscriber.on('notesStream', this.onNote); } + @bindThis private async onNote(note: Packed<'Note'>) { if (note.visibility !== 'public') return; if (note.channelId != null) return; // リプライなら再pack if (note.replyId != null) { - note.reply = await Notes.pack(note.replyId, this.user, { + note.reply = await this.noteEntityService.pack(note.replyId, this.user, { detail: true, }); } // Renoteなら再pack if (note.renoteId != null) { - note.renote = await Notes.pack(note.renoteId, this.user, { + note.renote = await this.noteEntityService.pack(note.renoteId, this.user, { detail: true, }); } @@ -70,8 +81,33 @@ export default class extends Channel { this.send('note', note); } + @bindThis public dispose() { // Unsubscribe events this.subscriber.off('notesStream', this.onNote); } } + +@Injectable() +export class GlobalTimelineChannelService { + public readonly shouldShare = GlobalTimelineChannel.shouldShare; + public readonly requireCredential = GlobalTimelineChannel.requireCredential; + + constructor( + private metaService: MetaService, + private roleService: RoleService, + private noteEntityService: NoteEntityService, + ) { + } + + @bindThis + public create(id: string, connection: Channel['connection']): GlobalTimelineChannel { + return new GlobalTimelineChannel( + this.metaService, + this.roleService, + this.noteEntityService, + id, + connection, + ); + } +} diff --git a/packages/backend/src/server/api/stream/channels/hashtag.ts b/packages/backend/src/server/api/stream/channels/hashtag.ts index 741db447e6..073b737079 100644 --- a/packages/backend/src/server/api/stream/channels/hashtag.ts +++ b/packages/backend/src/server/api/stream/channels/hashtag.ts @@ -1,20 +1,29 @@ -import Channel from '../channel.js'; -import { Notes } from '@/models/index.js'; +import { Inject, Injectable } from '@nestjs/common'; +import type { NotesRepository } from '@/models/index.js'; import { normalizeForSearch } from '@/misc/normalize-for-search.js'; import { isUserRelated } from '@/misc/is-user-related.js'; -import { Packed } from '@/misc/schema.js'; +import type { Packed } from '@/misc/schema.js'; +import { NoteEntityService } from '@/core/entities/NoteEntityService.js'; +import { bindThis } from '@/decorators.js'; +import Channel from '../channel.js'; -export default class extends Channel { +class HashtagChannel extends Channel { public readonly chName = 'hashtag'; public static shouldShare = false; public static requireCredential = false; private q: string[][]; - constructor(id: string, connection: Channel['connection']) { + constructor( + private noteEntityService: NoteEntityService, + + id: string, + connection: Channel['connection'], + ) { super(id, connection); - this.onNote = this.onNote.bind(this); + //this.onNote = this.onNote.bind(this); } + @bindThis public async init(params: any) { this.q = params.q; @@ -24,6 +33,7 @@ export default class extends Channel { this.subscriber.on('notesStream', this.onNote); } + @bindThis private async onNote(note: Packed<'Note'>) { const noteTags = note.tags ? note.tags.map((t: string) => t.toLowerCase()) : []; const matched = this.q.some(tags => tags.every(tag => noteTags.includes(normalizeForSearch(tag)))); @@ -31,7 +41,7 @@ export default class extends Channel { // Renoteなら再pack if (note.renoteId != null) { - note.renote = await Notes.pack(note.renoteId, this.user, { + note.renote = await this.noteEntityService.pack(note.renoteId, this.user, { detail: true, }); } @@ -46,8 +56,29 @@ export default class extends Channel { this.send('note', note); } + @bindThis public dispose() { // Unsubscribe events this.subscriber.off('notesStream', this.onNote); } } + +@Injectable() +export class HashtagChannelService { + public readonly shouldShare = HashtagChannel.shouldShare; + public readonly requireCredential = HashtagChannel.requireCredential; + + constructor( + private noteEntityService: NoteEntityService, + ) { + } + + @bindThis + public create(id: string, connection: Channel['connection']): HashtagChannel { + return new HashtagChannel( + this.noteEntityService, + id, + connection, + ); + } +} diff --git a/packages/backend/src/server/api/stream/channels/home-timeline.ts b/packages/backend/src/server/api/stream/channels/home-timeline.ts index 075a242ef0..5707ddd821 100644 --- a/packages/backend/src/server/api/stream/channels/home-timeline.ts +++ b/packages/backend/src/server/api/stream/channels/home-timeline.ts @@ -1,25 +1,35 @@ -import Channel from '../channel.js'; -import { Notes } from '@/models/index.js'; +import { Inject, Injectable } from '@nestjs/common'; +import type { NotesRepository } from '@/models/index.js'; import { checkWordMute } from '@/misc/check-word-mute.js'; import { isUserRelated } from '@/misc/is-user-related.js'; import { isInstanceMuted } from '@/misc/is-instance-muted.js'; -import { Packed } from '@/misc/schema.js'; +import type { Packed } from '@/misc/schema.js'; +import { NoteEntityService } from '@/core/entities/NoteEntityService.js'; +import { bindThis } from '@/decorators.js'; +import Channel from '../channel.js'; -export default class extends Channel { +class HomeTimelineChannel extends Channel { public readonly chName = 'homeTimeline'; public static shouldShare = true; public static requireCredential = true; - constructor(id: string, connection: Channel['connection']) { + constructor( + private noteEntityService: NoteEntityService, + + id: string, + connection: Channel['connection'], + ) { super(id, connection); - this.onNote = this.onNote.bind(this); + //this.onNote = this.onNote.bind(this); } + @bindThis public async init(params: any) { // Subscribe events this.subscriber.on('notesStream', this.onNote); } + @bindThis private async onNote(note: Packed<'Note'>) { if (note.channelId) { if (!this.followingChannels.has(note.channelId)) return; @@ -32,7 +42,7 @@ export default class extends Channel { if (isInstanceMuted(note, new Set<string>(this.userProfile?.mutedInstances ?? []))) return; if (['followers', 'specified'].includes(note.visibility)) { - note = await Notes.pack(note.id, this.user!, { + note = await this.noteEntityService.pack(note.id, this.user!, { detail: true, }); @@ -42,13 +52,13 @@ export default class extends Channel { } else { // リプライなら再pack if (note.replyId != null) { - note.reply = await Notes.pack(note.replyId, this.user!, { + note.reply = await this.noteEntityService.pack(note.replyId, this.user!, { detail: true, }); } // Renoteなら再pack if (note.renoteId != null) { - note.renote = await Notes.pack(note.renoteId, this.user!, { + note.renote = await this.noteEntityService.pack(note.renoteId, this.user!, { detail: true, }); } @@ -78,8 +88,29 @@ export default class extends Channel { this.send('note', note); } + @bindThis public dispose() { // Unsubscribe events this.subscriber.off('notesStream', this.onNote); } } + +@Injectable() +export class HomeTimelineChannelService { + public readonly shouldShare = HomeTimelineChannel.shouldShare; + public readonly requireCredential = HomeTimelineChannel.requireCredential; + + constructor( + private noteEntityService: NoteEntityService, + ) { + } + + @bindThis + public create(id: string, connection: Channel['connection']): HomeTimelineChannel { + return new HomeTimelineChannel( + this.noteEntityService, + id, + connection, + ); + } +} diff --git a/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts b/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts index f5dedf77ce..340f677815 100644 --- a/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts +++ b/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts @@ -1,29 +1,43 @@ -import Channel from '../channel.js'; -import { fetchMeta } from '@/misc/fetch-meta.js'; -import { Notes } from '@/models/index.js'; +import { Inject, Injectable } from '@nestjs/common'; +import type { NotesRepository } from '@/models/index.js'; import { checkWordMute } from '@/misc/check-word-mute.js'; import { isUserRelated } from '@/misc/is-user-related.js'; import { isInstanceMuted } from '@/misc/is-instance-muted.js'; -import { Packed } from '@/misc/schema.js'; +import type { Packed } from '@/misc/schema.js'; +import { DI } from '@/di-symbols.js'; +import { MetaService } from '@/core/MetaService.js'; +import { NoteEntityService } from '@/core/entities/NoteEntityService.js'; +import { bindThis } from '@/decorators.js'; +import { RoleService } from '@/core/RoleService.js'; +import Channel from '../channel.js'; -export default class extends Channel { +class HybridTimelineChannel extends Channel { public readonly chName = 'hybridTimeline'; public static shouldShare = true; public static requireCredential = true; - constructor(id: string, connection: Channel['connection']) { + constructor( + private metaService: MetaService, + private roleService: RoleService, + private noteEntityService: NoteEntityService, + + id: string, + connection: Channel['connection'], + ) { super(id, connection); - this.onNote = this.onNote.bind(this); + //this.onNote = this.onNote.bind(this); } - public async init(params: any) { - const meta = await fetchMeta(); - if (meta.disableLocalTimeline && !this.user!.isAdmin && !this.user!.isModerator) return; + @bindThis + public async init(params: any): Promise<void> { + const policies = await this.roleService.getUserPolicies(this.user ? this.user.id : null); + if (!policies.ltlAvailable) return; // Subscribe events this.subscriber.on('notesStream', this.onNote); } + @bindThis private async onNote(note: Packed<'Note'>) { // チャンネルの投稿ではなく、自分自身の投稿 または // チャンネルの投稿ではなく、その投稿のユーザーをフォローしている または @@ -37,7 +51,7 @@ export default class extends Channel { )) return; if (['followers', 'specified'].includes(note.visibility)) { - note = await Notes.pack(note.id, this.user!, { + note = await this.noteEntityService.pack(note.id, this.user!, { detail: true, }); @@ -47,13 +61,13 @@ export default class extends Channel { } else { // リプライなら再pack if (note.replyId != null) { - note.reply = await Notes.pack(note.replyId, this.user!, { + note.reply = await this.noteEntityService.pack(note.replyId, this.user!, { detail: true, }); } // Renoteなら再pack if (note.renoteId != null) { - note.renote = await Notes.pack(note.renoteId, this.user!, { + note.renote = await this.noteEntityService.pack(note.renoteId, this.user!, { detail: true, }); } @@ -86,8 +100,33 @@ export default class extends Channel { this.send('note', note); } - public dispose() { + @bindThis + public dispose(): void { // Unsubscribe events this.subscriber.off('notesStream', this.onNote); } } + +@Injectable() +export class HybridTimelineChannelService { + public readonly shouldShare = HybridTimelineChannel.shouldShare; + public readonly requireCredential = HybridTimelineChannel.requireCredential; + + constructor( + private metaService: MetaService, + private roleService: RoleService, + private noteEntityService: NoteEntityService, + ) { + } + + @bindThis + public create(id: string, connection: Channel['connection']): HybridTimelineChannel { + return new HybridTimelineChannel( + this.metaService, + this.roleService, + this.noteEntityService, + id, + connection, + ); + } +} diff --git a/packages/backend/src/server/api/stream/channels/index.ts b/packages/backend/src/server/api/stream/channels/index.ts deleted file mode 100644 index d422edde87..0000000000 --- a/packages/backend/src/server/api/stream/channels/index.ts +++ /dev/null @@ -1,33 +0,0 @@ -import main from './main.js'; -import homeTimeline from './home-timeline.js'; -import localTimeline from './local-timeline.js'; -import hybridTimeline from './hybrid-timeline.js'; -import globalTimeline from './global-timeline.js'; -import serverStats from './server-stats.js'; -import queueStats from './queue-stats.js'; -import userList from './user-list.js'; -import antenna from './antenna.js'; -import messaging from './messaging.js'; -import messagingIndex from './messaging-index.js'; -import drive from './drive.js'; -import hashtag from './hashtag.js'; -import channel from './channel.js'; -import admin from './admin.js'; - -export default { - main, - homeTimeline, - localTimeline, - hybridTimeline, - globalTimeline, - serverStats, - queueStats, - userList, - antenna, - messaging, - messagingIndex, - drive, - hashtag, - channel, - admin, -}; diff --git a/packages/backend/src/server/api/stream/channels/local-timeline.ts b/packages/backend/src/server/api/stream/channels/local-timeline.ts index f01f477238..ea29e30d63 100644 --- a/packages/backend/src/server/api/stream/channels/local-timeline.ts +++ b/packages/backend/src/server/api/stream/channels/local-timeline.ts @@ -1,30 +1,41 @@ -import Channel from '../channel.js'; -import { fetchMeta } from '@/misc/fetch-meta.js'; -import { Notes } from '@/models/index.js'; +import { Inject, Injectable } from '@nestjs/common'; +import type { NotesRepository } from '@/models/index.js'; import { checkWordMute } from '@/misc/check-word-mute.js'; import { isUserRelated } from '@/misc/is-user-related.js'; -import { Packed } from '@/misc/schema.js'; +import type { Packed } from '@/misc/schema.js'; +import { MetaService } from '@/core/MetaService.js'; +import { NoteEntityService } from '@/core/entities/NoteEntityService.js'; +import { bindThis } from '@/decorators.js'; +import { RoleService } from '@/core/RoleService.js'; +import Channel from '../channel.js'; -export default class extends Channel { +class LocalTimelineChannel extends Channel { public readonly chName = 'localTimeline'; public static shouldShare = true; public static requireCredential = false; - constructor(id: string, connection: Channel['connection']) { + constructor( + private metaService: MetaService, + private roleService: RoleService, + private noteEntityService: NoteEntityService, + + id: string, + connection: Channel['connection'], + ) { super(id, connection); - this.onNote = this.onNote.bind(this); + //this.onNote = this.onNote.bind(this); } + @bindThis public async init(params: any) { - const meta = await fetchMeta(); - if (meta.disableLocalTimeline) { - if (this.user == null || (!this.user.isAdmin && !this.user.isModerator)) return; - } + const policies = await this.roleService.getUserPolicies(this.user ? this.user.id : null); + if (!policies.ltlAvailable) return; // Subscribe events this.subscriber.on('notesStream', this.onNote); } + @bindThis private async onNote(note: Packed<'Note'>) { if (note.user.host !== null) return; if (note.visibility !== 'public') return; @@ -32,13 +43,13 @@ export default class extends Channel { // リプライなら再pack if (note.replyId != null) { - note.reply = await Notes.pack(note.replyId, this.user, { + note.reply = await this.noteEntityService.pack(note.replyId, this.user, { detail: true, }); } // Renoteなら再pack if (note.renoteId != null) { - note.renote = await Notes.pack(note.renoteId, this.user, { + note.renote = await this.noteEntityService.pack(note.renoteId, this.user, { detail: true, }); } @@ -67,8 +78,33 @@ export default class extends Channel { this.send('note', note); } + @bindThis public dispose() { // Unsubscribe events this.subscriber.off('notesStream', this.onNote); } } + +@Injectable() +export class LocalTimelineChannelService { + public readonly shouldShare = LocalTimelineChannel.shouldShare; + public readonly requireCredential = LocalTimelineChannel.requireCredential; + + constructor( + private metaService: MetaService, + private roleService: RoleService, + private noteEntityService: NoteEntityService, + ) { + } + + @bindThis + public create(id: string, connection: Channel['connection']): LocalTimelineChannel { + return new LocalTimelineChannel( + this.metaService, + this.roleService, + this.noteEntityService, + id, + connection, + ); + } +} diff --git a/packages/backend/src/server/api/stream/channels/main.ts b/packages/backend/src/server/api/stream/channels/main.ts index 9cfea0bfc4..42f255b8fe 100644 --- a/packages/backend/src/server/api/stream/channels/main.ts +++ b/packages/backend/src/server/api/stream/channels/main.ts @@ -1,12 +1,25 @@ -import Channel from '../channel.js'; -import { Notes } from '@/models/index.js'; +import { Inject, Injectable } from '@nestjs/common'; +import type { NotesRepository } from '@/models/index.js'; import { isInstanceMuted, isUserFromMutedInstance } from '@/misc/is-instance-muted.js'; +import { NoteEntityService } from '@/core/entities/NoteEntityService.js'; +import { bindThis } from '@/decorators.js'; +import Channel from '../channel.js'; -export default class extends Channel { +class MainChannel extends Channel { public readonly chName = 'main'; public static shouldShare = true; public static requireCredential = true; + constructor( + private noteEntityService: NoteEntityService, + + id: string, + connection: Channel['connection'], + ) { + super(id, connection); + } + + @bindThis public async init(params: any) { // Subscribe main stream channel this.subscriber.on(`mainStream:${this.user!.id}`, async data => { @@ -17,7 +30,7 @@ export default class extends Channel { if (data.body.userId && this.muting.has(data.body.userId)) return; if (data.body.note && data.body.note.isHidden) { - const note = await Notes.pack(data.body.note.id, this.user, { + const note = await this.noteEntityService.pack(data.body.note.id, this.user, { detail: true, }); this.connection.cacheNote(note); @@ -30,7 +43,7 @@ export default class extends Channel { if (this.muting.has(data.body.userId)) return; if (data.body.isHidden) { - const note = await Notes.pack(data.body.id, this.user, { + const note = await this.noteEntityService.pack(data.body.id, this.user, { detail: true, }); this.connection.cacheNote(note); @@ -44,3 +57,23 @@ export default class extends Channel { }); } } + +@Injectable() +export class MainChannelService { + public readonly shouldShare = MainChannel.shouldShare; + public readonly requireCredential = MainChannel.requireCredential; + + constructor( + private noteEntityService: NoteEntityService, + ) { + } + + @bindThis + public create(id: string, connection: Channel['connection']): MainChannel { + return new MainChannel( + this.noteEntityService, + id, + connection, + ); + } +} diff --git a/packages/backend/src/server/api/stream/channels/messaging-index.ts b/packages/backend/src/server/api/stream/channels/messaging-index.ts index b930785d20..66cb79f7a7 100644 --- a/packages/backend/src/server/api/stream/channels/messaging-index.ts +++ b/packages/backend/src/server/api/stream/channels/messaging-index.ts @@ -1,10 +1,13 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { bindThis } from '@/decorators.js'; import Channel from '../channel.js'; -export default class extends Channel { +class MessagingIndexChannel extends Channel { public readonly chName = 'messagingIndex'; public static shouldShare = true; public static requireCredential = true; + @bindThis public async init(params: any) { // Subscribe messaging index stream this.subscriber.on(`messagingIndexStream:${this.user!.id}`, data => { @@ -12,3 +15,21 @@ export default class extends Channel { }); } } + +@Injectable() +export class MessagingIndexChannelService { + public readonly shouldShare = MessagingIndexChannel.shouldShare; + public readonly requireCredential = MessagingIndexChannel.requireCredential; + + constructor( + ) { + } + + @bindThis + public create(id: string, connection: Channel['connection']): MessagingIndexChannel { + return new MessagingIndexChannel( + id, + connection, + ); + } +} diff --git a/packages/backend/src/server/api/stream/channels/messaging.ts b/packages/backend/src/server/api/stream/channels/messaging.ts index 877d44c38e..92af6b591c 100644 --- a/packages/backend/src/server/api/stream/channels/messaging.ts +++ b/packages/backend/src/server/api/stream/channels/messaging.ts @@ -1,11 +1,15 @@ -import { readUserMessagingMessage, readGroupMessagingMessage, deliverReadActivity } from '../../common/read-messaging-message.js'; +import { Inject, Injectable } from '@nestjs/common'; +import type { UserGroupJoiningsRepository, UsersRepository, MessagingMessagesRepository } from '@/models/index.js'; +import type { User, ILocalUser, IRemoteUser } from '@/models/entities/User.js'; +import type { UserGroup } from '@/models/entities/UserGroup.js'; +import { MessagingService } from '@/core/MessagingService.js'; +import { UserEntityService } from '@/core/entities/UserEntityService.js'; +import { DI } from '@/di-symbols.js'; +import { bindThis } from '@/decorators.js'; import Channel from '../channel.js'; -import { UserGroupJoinings, Users, MessagingMessages } from '@/models/index.js'; -import { User, ILocalUser, IRemoteUser } from '@/models/entities/user.js'; -import { UserGroup } from '@/models/entities/user-group.js'; -import { StreamMessages } from '../types.js'; +import type { StreamMessages } from '../types.js'; -export default class extends Channel { +class MessagingChannel extends Channel { public readonly chName = 'messaging'; public static shouldShare = false; public static requireCredential = true; @@ -17,21 +21,31 @@ export default class extends Channel { private typers: Record<User['id'], Date> = {}; private emitTypersIntervalId: ReturnType<typeof setInterval>; - constructor(id: string, connection: Channel['connection']) { + constructor( + private usersRepository: UsersRepository, + private userGroupJoiningsRepository: UserGroupJoiningsRepository, + private messagingMessagesRepository: MessagingMessagesRepository, + private userEntityService: UserEntityService, + private messagingService: MessagingService, + + id: string, + connection: Channel['connection'], + ) { super(id, connection); - this.onEvent = this.onEvent.bind(this); - this.onMessage = this.onMessage.bind(this); - this.emitTypers = this.emitTypers.bind(this); + //this.onEvent = this.onEvent.bind(this); + //this.onMessage = this.onMessage.bind(this); + //this.emitTypers = this.emitTypers.bind(this); } + @bindThis public async init(params: any) { this.otherpartyId = params.otherparty; - this.otherparty = this.otherpartyId ? await Users.findOneByOrFail({ id: this.otherpartyId }) : null; + this.otherparty = this.otherpartyId ? await this.usersRepository.findOneByOrFail({ id: this.otherpartyId }) : null; this.groupId = params.group; // Check joining if (this.groupId) { - const joining = await UserGroupJoinings.findOneBy({ + const joining = await this.userGroupJoiningsRepository.findOneBy({ userId: this.user!.id, userGroupId: this.groupId, }); @@ -51,6 +65,7 @@ export default class extends Channel { this.subscriber.on(this.subCh, this.onEvent); } + @bindThis private onEvent(data: StreamMessages['messaging']['payload'] | StreamMessages['groupMessaging']['payload']) { if (data.type === 'typing') { const id = data.body; @@ -64,25 +79,27 @@ export default class extends Channel { } } + @bindThis public onMessage(type: string, body: any) { switch (type) { case 'read': if (this.otherpartyId) { - readUserMessagingMessage(this.user!.id, this.otherpartyId, [body.id]); + this.messagingService.readUserMessagingMessage(this.user!.id, this.otherpartyId, [body.id]); // リモートユーザーからのメッセージだったら既読配信 - if (Users.isLocalUser(this.user!) && Users.isRemoteUser(this.otherparty!)) { - MessagingMessages.findOneBy({ id: body.id }).then(message => { - if (message) deliverReadActivity(this.user as ILocalUser, this.otherparty as IRemoteUser, message); + if (this.userEntityService.isLocalUser(this.user!) && this.userEntityService.isRemoteUser(this.otherparty!)) { + this.messagingMessagesRepository.findOneBy({ id: body.id }).then(message => { + if (message) this.messagingService.deliverReadActivity(this.user as ILocalUser, this.otherparty as IRemoteUser, message); }); } } else if (this.groupId) { - readGroupMessagingMessage(this.user!.id, this.groupId, [body.id]); + this.messagingService.readGroupMessagingMessage(this.user!.id, this.groupId, [body.id]); } break; } } + @bindThis private async emitTypers() { const now = new Date(); @@ -91,7 +108,7 @@ export default class extends Channel { if (now.getTime() - date.getTime() > 5000) delete this.typers[userId]; } - const users = await Users.packMany(Object.keys(this.typers), null, { detail: false }); + const users = await this.userEntityService.packMany(Object.keys(this.typers), null, { detail: false }); this.send({ type: 'typers', @@ -99,9 +116,44 @@ export default class extends Channel { }); } + @bindThis public dispose() { this.subscriber.off(this.subCh, this.onEvent); clearInterval(this.emitTypersIntervalId); } } + +@Injectable() +export class MessagingChannelService { + public readonly shouldShare = MessagingChannel.shouldShare; + public readonly requireCredential = MessagingChannel.requireCredential; + + constructor( + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + + @Inject(DI.userGroupJoiningsRepository) + private userGroupJoiningsRepository: UserGroupJoiningsRepository, + + @Inject(DI.messagingMessagesRepository) + private messagingMessagesRepository: MessagingMessagesRepository, + + private userEntityService: UserEntityService, + private messagingService: MessagingService, + ) { + } + + @bindThis + public create(id: string, connection: Channel['connection']): MessagingChannel { + return new MessagingChannel( + this.usersRepository, + this.userGroupJoiningsRepository, + this.messagingMessagesRepository, + this.userEntityService, + this.messagingService, + id, + connection, + ); + } +} diff --git a/packages/backend/src/server/api/stream/channels/queue-stats.ts b/packages/backend/src/server/api/stream/channels/queue-stats.ts index b67600474b..c773916103 100644 --- a/packages/backend/src/server/api/stream/channels/queue-stats.ts +++ b/packages/backend/src/server/api/stream/channels/queue-stats.ts @@ -1,27 +1,32 @@ import Xev from 'xev'; +import { Inject, Injectable } from '@nestjs/common'; +import { bindThis } from '@/decorators.js'; import Channel from '../channel.js'; const ev = new Xev(); -export default class extends Channel { +class QueueStatsChannel extends Channel { public readonly chName = 'queueStats'; public static shouldShare = true; public static requireCredential = false; constructor(id: string, connection: Channel['connection']) { super(id, connection); - this.onStats = this.onStats.bind(this); - this.onMessage = this.onMessage.bind(this); + //this.onStats = this.onStats.bind(this); + //this.onMessage = this.onMessage.bind(this); } + @bindThis public async init(params: any) { ev.addListener('queueStats', this.onStats); } + @bindThis private onStats(stats: any) { this.send('stats', stats); } + @bindThis public onMessage(type: string, body: any) { switch (type) { case 'requestLog': @@ -36,7 +41,26 @@ export default class extends Channel { } } + @bindThis public dispose() { ev.removeListener('queueStats', this.onStats); } } + +@Injectable() +export class QueueStatsChannelService { + public readonly shouldShare = QueueStatsChannel.shouldShare; + public readonly requireCredential = QueueStatsChannel.requireCredential; + + constructor( + ) { + } + + @bindThis + public create(id: string, connection: Channel['connection']): QueueStatsChannel { + return new QueueStatsChannel( + id, + connection, + ); + } +} diff --git a/packages/backend/src/server/api/stream/channels/server-stats.ts b/packages/backend/src/server/api/stream/channels/server-stats.ts index db75a6fa38..492912dbe6 100644 --- a/packages/backend/src/server/api/stream/channels/server-stats.ts +++ b/packages/backend/src/server/api/stream/channels/server-stats.ts @@ -1,27 +1,32 @@ import Xev from 'xev'; +import { Inject, Injectable } from '@nestjs/common'; +import { bindThis } from '@/decorators.js'; import Channel from '../channel.js'; const ev = new Xev(); -export default class extends Channel { +class ServerStatsChannel extends Channel { public readonly chName = 'serverStats'; public static shouldShare = true; public static requireCredential = false; constructor(id: string, connection: Channel['connection']) { super(id, connection); - this.onStats = this.onStats.bind(this); - this.onMessage = this.onMessage.bind(this); + //this.onStats = this.onStats.bind(this); + //this.onMessage = this.onMessage.bind(this); } + @bindThis public async init(params: any) { ev.addListener('serverStats', this.onStats); } + @bindThis private onStats(stats: any) { this.send('stats', stats); } + @bindThis public onMessage(type: string, body: any) { switch (type) { case 'requestLog': @@ -36,7 +41,26 @@ export default class extends Channel { } } + @bindThis public dispose() { ev.removeListener('serverStats', this.onStats); } } + +@Injectable() +export class ServerStatsChannelService { + public readonly shouldShare = ServerStatsChannel.shouldShare; + public readonly requireCredential = ServerStatsChannel.requireCredential; + + constructor( + ) { + } + + @bindThis + public create(id: string, connection: Channel['connection']): ServerStatsChannel { + return new ServerStatsChannel( + id, + connection, + ); + } +} diff --git a/packages/backend/src/server/api/stream/channels/user-list.ts b/packages/backend/src/server/api/stream/channels/user-list.ts index 97ad2983c5..16af32868c 100644 --- a/packages/backend/src/server/api/stream/channels/user-list.ts +++ b/packages/backend/src/server/api/stream/channels/user-list.ts @@ -1,10 +1,14 @@ -import Channel from '../channel.js'; -import { Notes, UserListJoinings, UserLists } from '@/models/index.js'; -import { User } from '@/models/entities/user.js'; +import { Inject, Injectable } from '@nestjs/common'; +import type { UserListJoiningsRepository, UserListsRepository, NotesRepository } from '@/models/index.js'; +import type { User } from '@/models/entities/User.js'; import { isUserRelated } from '@/misc/is-user-related.js'; -import { Packed } from '@/misc/schema.js'; +import type { Packed } from '@/misc/schema.js'; +import { NoteEntityService } from '@/core/entities/NoteEntityService.js'; +import { DI } from '@/di-symbols.js'; +import { bindThis } from '@/decorators.js'; +import Channel from '../channel.js'; -export default class extends Channel { +class UserListChannel extends Channel { public readonly chName = 'userList'; public static shouldShare = false; public static requireCredential = false; @@ -12,17 +16,25 @@ export default class extends Channel { public listUsers: User['id'][] = []; private listUsersClock: NodeJS.Timer; - constructor(id: string, connection: Channel['connection']) { + constructor( + private userListsRepository: UserListsRepository, + private userListJoiningsRepository: UserListJoiningsRepository, + private noteEntityService: NoteEntityService, + + id: string, + connection: Channel['connection'], + ) { super(id, connection); - this.updateListUsers = this.updateListUsers.bind(this); - this.onNote = this.onNote.bind(this); + //this.updateListUsers = this.updateListUsers.bind(this); + //this.onNote = this.onNote.bind(this); } + @bindThis public async init(params: any) { this.listId = params.listId as string; // Check existence and owner - const list = await UserLists.findOneBy({ + const list = await this.userListsRepository.findOneBy({ id: this.listId, userId: this.user!.id, }); @@ -37,8 +49,9 @@ export default class extends Channel { this.listUsersClock = setInterval(this.updateListUsers, 5000); } + @bindThis private async updateListUsers() { - const users = await UserListJoinings.find({ + const users = await this.userListJoiningsRepository.find({ where: { userListId: this.listId, }, @@ -48,11 +61,12 @@ export default class extends Channel { this.listUsers = users.map(x => x.userId); } + @bindThis private async onNote(note: Packed<'Note'>) { if (!this.listUsers.includes(note.userId)) return; if (['followers', 'specified'].includes(note.visibility)) { - note = await Notes.pack(note.id, this.user, { + note = await this.noteEntityService.pack(note.id, this.user, { detail: true, }); @@ -62,13 +76,13 @@ export default class extends Channel { } else { // リプライなら再pack if (note.replyId != null) { - note.reply = await Notes.pack(note.replyId, this.user, { + note.reply = await this.noteEntityService.pack(note.replyId, this.user, { detail: true, }); } // Renoteなら再pack if (note.renoteId != null) { - note.renote = await Notes.pack(note.renoteId, this.user, { + note.renote = await this.noteEntityService.pack(note.renoteId, this.user, { detail: true, }); } @@ -82,6 +96,7 @@ export default class extends Channel { this.send('note', note); } + @bindThis public dispose() { // Unsubscribe events this.subscriber.off(`userListStream:${this.listId}`, this.send); @@ -90,3 +105,31 @@ export default class extends Channel { clearInterval(this.listUsersClock); } } + +@Injectable() +export class UserListChannelService { + public readonly shouldShare = UserListChannel.shouldShare; + public readonly requireCredential = UserListChannel.requireCredential; + + constructor( + @Inject(DI.userListsRepository) + private userListsRepository: UserListsRepository, + + @Inject(DI.userListJoiningsRepository) + private userListJoiningsRepository: UserListJoiningsRepository, + + private noteEntityService: NoteEntityService, + ) { + } + + @bindThis + public create(id: string, connection: Channel['connection']): UserListChannel { + return new UserListChannel( + this.userListsRepository, + this.userListJoiningsRepository, + this.noteEntityService, + id, + connection, + ); + } +} diff --git a/packages/backend/src/server/api/stream/index.ts b/packages/backend/src/server/api/stream/index.ts index 2d23145f14..6763953f9d 100644 --- a/packages/backend/src/server/api/stream/index.ts +++ b/packages/backend/src/server/api/stream/index.ts @@ -1,18 +1,19 @@ -import { EventEmitter } from 'events'; -import * as websocket from 'websocket'; -import readNote from '@/services/note/read.js'; -import { User } from '@/models/entities/user.js'; -import { Channel as ChannelModel } from '@/models/entities/channel.js'; -import { Users, Followings, Mutings, UserProfiles, ChannelFollowings, Blockings } from '@/models/index.js'; -import { AccessToken } from '@/models/entities/access-token.js'; -import { UserProfile } from '@/models/entities/user-profile.js'; -import { publishChannelStream, publishGroupMessagingStream, publishMessagingStream } from '@/services/stream.js'; -import { UserGroup } from '@/models/entities/user-group.js'; -import { Packed } from '@/misc/schema.js'; -import { readNotification } from '../common/read-notification.js'; -import channels from './channels/index.js'; -import Channel from './channel.js'; -import { StreamEventEmitter, StreamMessages } from './types.js'; +import type { User } from '@/models/entities/User.js'; +import type { Channel as ChannelModel } from '@/models/entities/Channel.js'; +import type { FollowingsRepository, MutingsRepository, UserProfilesRepository, ChannelFollowingsRepository, BlockingsRepository } from '@/models/index.js'; +import type { AccessToken } from '@/models/entities/AccessToken.js'; +import type { UserProfile } from '@/models/entities/UserProfile.js'; +import type { UserGroup } from '@/models/entities/UserGroup.js'; +import type { Packed } from '@/misc/schema.js'; +import type { GlobalEventService } from '@/core/GlobalEventService.js'; +import type { NoteReadService } from '@/core/NoteReadService.js'; +import type { NotificationService } from '@/core/NotificationService.js'; +import { bindThis } from '@/decorators.js'; +import type { ChannelsService } from './ChannelsService.js'; +import type * as websocket from 'websocket'; +import type { EventEmitter } from 'events'; +import type Channel from './channel.js'; +import type { StreamEventEmitter, StreamMessages } from './types.js'; /** * Main stream connection @@ -32,6 +33,16 @@ export default class Connection { private cachedNotes: Packed<'Note'>[] = []; constructor( + private followingsRepository: FollowingsRepository, + private mutingsRepository: MutingsRepository, + private blockingsRepository: BlockingsRepository, + private channelFollowingsRepository: ChannelFollowingsRepository, + private userProfilesRepository: UserProfilesRepository, + private channelsService: ChannelsService, + private globalEventService: GlobalEventService, + private noteReadService: NoteReadService, + private notificationService: NotificationService, + wsConnection: websocket.connection, subscriber: EventEmitter, user: User | null | undefined, @@ -42,10 +53,10 @@ export default class Connection { if (user) this.user = user; if (token) this.token = token; - this.onWsConnectionMessage = this.onWsConnectionMessage.bind(this); - this.onUserEvent = this.onUserEvent.bind(this); - this.onNoteStreamMessage = this.onNoteStreamMessage.bind(this); - this.onBroadcastMessage = this.onBroadcastMessage.bind(this); + //this.onWsConnectionMessage = this.onWsConnectionMessage.bind(this); + //this.onUserEvent = this.onUserEvent.bind(this); + //this.onNoteStreamMessage = this.onNoteStreamMessage.bind(this); + //this.onBroadcastMessage = this.onBroadcastMessage.bind(this); this.wsConnection.on('message', this.onWsConnectionMessage); @@ -64,6 +75,7 @@ export default class Connection { } } + @bindThis private onUserEvent(data: StreamMessages['user']['payload']) { // { type, body }と展開するとそれぞれ型が分離してしまう switch (data.type) { case 'follow': @@ -109,6 +121,7 @@ export default class Connection { /** * クライアントからメッセージ受信時 */ + @bindThis private async onWsConnectionMessage(data: websocket.Message) { if (data.type !== 'utf8') return; if (data.utf8Data == null) return; @@ -143,10 +156,12 @@ export default class Connection { } } + @bindThis private onBroadcastMessage(data: StreamMessages['broadcast']['payload']) { this.sendMessageToWs(data.type, data.body); } + @bindThis public cacheNote(note: Packed<'Note'>) { const add = (note: Packed<'Note'>) => { const existIndex = this.cachedNotes.findIndex(n => n.id === note.id); @@ -166,6 +181,7 @@ export default class Connection { if (note.renote) add(note.renote); } + @bindThis private readNote(body: any) { const id = body.id; @@ -173,21 +189,23 @@ export default class Connection { if (note == null) return; if (this.user && (note.userId !== this.user.id)) { - readNote(this.user.id, [note], { + this.noteReadService.read(this.user.id, [note], { following: this.following, followingChannels: this.followingChannels, }); } } + @bindThis private onReadNotification(payload: any) { if (!payload.id) return; - readNotification(this.user!.id, [payload.id]); + this.notificationService.readNotification(this.user!.id, [payload.id]); } /** * 投稿購読要求時 */ + @bindThis private onSubscribeNote(payload: any) { if (!payload.id) return; @@ -205,6 +223,7 @@ export default class Connection { /** * 投稿購読解除要求時 */ + @bindThis private onUnsubscribeNote(payload: any) { if (!payload.id) return; @@ -215,6 +234,7 @@ export default class Connection { } } + @bindThis private async onNoteStreamMessage(data: StreamMessages['note']['payload']) { this.sendMessageToWs('noteUpdated', { id: data.body.id, @@ -226,6 +246,7 @@ export default class Connection { /** * チャンネル接続要求時 */ + @bindThis private onChannelConnectRequested(payload: any) { const { channel, id, params, pong } = payload; this.connectChannel(id, params, channel, pong); @@ -234,6 +255,7 @@ export default class Connection { /** * チャンネル切断要求時 */ + @bindThis private onChannelDisconnectRequested(payload: any) { const { id } = payload; this.disconnectChannel(id); @@ -242,6 +264,7 @@ export default class Connection { /** * クライアントにメッセージ送信 */ + @bindThis public sendMessageToWs(type: string, payload: any) { this.wsConnection.send(JSON.stringify({ type: type, @@ -252,17 +275,20 @@ export default class Connection { /** * チャンネルに接続 */ + @bindThis public connectChannel(id: string, params: any, channel: string, pong = false) { - if ((channels as any)[channel].requireCredential && this.user == null) { + const channelService = this.channelsService.getChannelService(channel); + + if (channelService.requireCredential && this.user == null) { return; } // 共有可能チャンネルに接続しようとしていて、かつそのチャンネルに既に接続していたら無意味なので無視 - if ((channels as any)[channel].shouldShare && this.channels.some(c => c.chName === channel)) { + if (channelService.shouldShare && this.channels.some(c => c.chName === channel)) { return; } - const ch: Channel = new (channels as any)[channel](id, this); + const ch: Channel = channelService.create(id, this); this.channels.push(ch); ch.init(params); @@ -277,6 +303,7 @@ export default class Connection { * チャンネルから切断 * @param id チャンネルコネクションID */ + @bindThis public disconnectChannel(id: string) { const channel = this.channels.find(c => c.id === id); @@ -290,6 +317,7 @@ export default class Connection { * チャンネルへメッセージ送信要求時 * @param data メッセージ */ + @bindThis private onChannelMessageRequested(data: any) { const channel = this.channels.find(c => c.id === data.id); if (channel != null && channel.onMessage != null) { @@ -297,24 +325,27 @@ export default class Connection { } } + @bindThis private typingOnChannel(channel: ChannelModel['id']) { if (this.user) { - publishChannelStream(channel, 'typing', this.user.id); + this.globalEventService.publishChannelStream(channel, 'typing', this.user.id); } } + @bindThis private typingOnMessaging(param: { partner?: User['id']; group?: UserGroup['id']; }) { if (this.user) { if (param.partner) { - publishMessagingStream(param.partner, this.user.id, 'typing', this.user.id); + this.globalEventService.publishMessagingStream(param.partner, this.user.id, 'typing', this.user.id); } else if (param.group) { - publishGroupMessagingStream(param.group, 'typing', this.user.id); + this.globalEventService.publishGroupMessagingStream(param.group, 'typing', this.user.id); } } } + @bindThis private async updateFollowing() { - const followings = await Followings.find({ + const followings = await this.followingsRepository.find({ where: { followerId: this.user!.id, }, @@ -324,8 +355,9 @@ export default class Connection { this.following = new Set<string>(followings.map(x => x.followeeId)); } + @bindThis private async updateMuting() { - const mutings = await Mutings.find({ + const mutings = await this.mutingsRepository.find({ where: { muterId: this.user!.id, }, @@ -335,8 +367,9 @@ export default class Connection { this.muting = new Set<string>(mutings.map(x => x.muteeId)); } + @bindThis private async updateBlocking() { // ここでいうBlockingは被Blockingの意 - const blockings = await Blockings.find({ + const blockings = await this.blockingsRepository.find({ where: { blockeeId: this.user!.id, }, @@ -346,8 +379,9 @@ export default class Connection { this.blocking = new Set<string>(blockings.map(x => x.blockerId)); } + @bindThis private async updateFollowingChannels() { - const followings = await ChannelFollowings.find({ + const followings = await this.channelFollowingsRepository.find({ where: { followerId: this.user!.id, }, @@ -357,8 +391,9 @@ export default class Connection { this.followingChannels = new Set<string>(followings.map(x => x.followeeId)); } + @bindThis private async updateUserProfile() { - this.userProfile = await UserProfiles.findOneBy({ + this.userProfile = await this.userProfilesRepository.findOneBy({ userId: this.user!.id, }); } @@ -366,6 +401,7 @@ export default class Connection { /** * ストリームが切れたとき */ + @bindThis public dispose() { for (const c of this.channels.filter(c => c.dispose)) { if (c.dispose) c.dispose(); diff --git a/packages/backend/src/server/api/stream/types.ts b/packages/backend/src/server/api/stream/types.ts index 3b0a75d793..a442529bb3 100644 --- a/packages/backend/src/server/api/stream/types.ts +++ b/packages/backend/src/server/api/stream/types.ts @@ -1,35 +1,48 @@ -import { EventEmitter } from 'events'; -import Emitter from 'strict-event-emitter-types'; -import { Channel } from '@/models/entities/channel.js'; -import { User } from '@/models/entities/user.js'; -import { UserProfile } from '@/models/entities/user-profile.js'; -import { Note } from '@/models/entities/note.js'; -import { Antenna } from '@/models/entities/antenna.js'; -import { DriveFile } from '@/models/entities/drive-file.js'; -import { DriveFolder } from '@/models/entities/drive-folder.js'; -import { Emoji } from '@/models/entities/emoji.js'; -import { UserList } from '@/models/entities/user-list.js'; -import { MessagingMessage } from '@/models/entities/messaging-message.js'; -import { UserGroup } from '@/models/entities/user-group.js'; -import { AbuseUserReport } from '@/models/entities/abuse-user-report.js'; -import { Signin } from '@/models/entities/signin.js'; -import { Page } from '@/models/entities/page.js'; -import { Packed } from '@/misc/schema.js'; -import { Webhook } from '@/models/entities/webhook'; +import type { Channel } from '@/models/entities/Channel.js'; +import type { User } from '@/models/entities/User.js'; +import type { UserProfile } from '@/models/entities/UserProfile.js'; +import type { Note } from '@/models/entities/Note.js'; +import type { Antenna } from '@/models/entities/Antenna.js'; +import type { DriveFile } from '@/models/entities/DriveFile.js'; +import type { DriveFolder } from '@/models/entities/DriveFolder.js'; +import type { UserList } from '@/models/entities/UserList.js'; +import type { MessagingMessage } from '@/models/entities/MessagingMessage.js'; +import type { UserGroup } from '@/models/entities/UserGroup.js'; +import type { AbuseUserReport } from '@/models/entities/AbuseUserReport.js'; +import type { Signin } from '@/models/entities/Signin.js'; +import type { Page } from '@/models/entities/Page.js'; +import type { Packed } from '@/misc/schema.js'; +import type { Webhook } from '@/models/entities/Webhook.js'; +import type { Meta } from '@/models/entities/Meta.js'; +import { Following, Role, RoleAssignment } from '@/models'; +import type Emitter from 'strict-event-emitter-types'; +import type { EventEmitter } from 'events'; + +// redis通すとDateのインスタンスはstringに変換されるので +type Serialized<T> = { + [K in keyof T]: T[K] extends Date ? string : T[K]; +}; //#region Stream type-body definitions export interface InternalStreamTypes { - userChangeSuspendedState: { id: User['id']; isSuspended: User['isSuspended']; }; - userChangeSilencedState: { id: User['id']; isSilenced: User['isSilenced']; }; - userChangeModeratorState: { id: User['id']; isModerator: User['isModerator']; }; - userTokenRegenerated: { id: User['id']; oldToken: User['token']; newToken: User['token']; }; - remoteUserUpdated: { id: User['id']; }; - webhookCreated: Webhook; - webhookDeleted: Webhook; - webhookUpdated: Webhook; - antennaCreated: Antenna; - antennaDeleted: Antenna; - antennaUpdated: Antenna; + userChangeSuspendedState: Serialized<{ id: User['id']; isSuspended: User['isSuspended']; }>; + userTokenRegenerated: Serialized<{ id: User['id']; oldToken: User['token']; newToken: User['token']; }>; + remoteUserUpdated: Serialized<{ id: User['id']; }>; + follow: Serialized<{ followerId: User['id']; followeeId: User['id']; }>; + unfollow: Serialized<{ followerId: User['id']; followeeId: User['id']; }>; + policiesUpdated: Serialized<Role['options']>; + roleCreated: Serialized<Role>; + roleDeleted: Serialized<Role>; + roleUpdated: Serialized<Role>; + userRoleAssigned: Serialized<RoleAssignment>; + userRoleUnassigned: Serialized<RoleAssignment>; + webhookCreated: Serialized<Webhook>; + webhookDeleted: Serialized<Webhook>; + webhookUpdated: Serialized<Webhook>; + antennaCreated: Serialized<Antenna>; + antennaDeleted: Serialized<Antenna>; + antennaUpdated: Serialized<Antenna>; + metaUpdated: Serialized<Meta>; } export interface BroadcastTypes { |