From 0e4a111f81cceed275d9bec2695f6e401fb654d8 Mon Sep 17 00:00:00 2001 From: syuilo Date: Fri, 12 Nov 2021 02:02:25 +0900 Subject: refactoring Resolve #7779 --- packages/backend/src/server/api/stream/channel.ts | 62 +++ .../src/server/api/stream/channels/admin.ts | 16 + .../src/server/api/stream/channels/antenna.ts | 45 +++ .../src/server/api/stream/channels/channel.ts | 92 +++++ .../src/server/api/stream/channels/drive.ts | 16 + .../api/stream/channels/games/reversi-game.ts | 372 ++++++++++++++++++ .../server/api/stream/channels/games/reversi.ts | 33 ++ .../server/api/stream/channels/global-timeline.ts | 73 ++++ .../src/server/api/stream/channels/hashtag.ts | 53 +++ .../server/api/stream/channels/home-timeline.ts | 81 ++++ .../server/api/stream/channels/hybrid-timeline.ts | 89 +++++ .../src/server/api/stream/channels/index.ts | 37 ++ .../server/api/stream/channels/local-timeline.ts | 74 ++++ .../backend/src/server/api/stream/channels/main.ts | 43 +++ .../server/api/stream/channels/messaging-index.ts | 16 + .../src/server/api/stream/channels/messaging.ts | 106 ++++++ .../src/server/api/stream/channels/queue-stats.ts | 41 ++ .../src/server/api/stream/channels/server-stats.ts | 41 ++ .../src/server/api/stream/channels/user-list.ts | 92 +++++ packages/backend/src/server/api/stream/index.ts | 421 +++++++++++++++++++++ packages/backend/src/server/api/stream/types.ts | 299 +++++++++++++++ 21 files changed, 2102 insertions(+) create mode 100644 packages/backend/src/server/api/stream/channel.ts create mode 100644 packages/backend/src/server/api/stream/channels/admin.ts create mode 100644 packages/backend/src/server/api/stream/channels/antenna.ts create mode 100644 packages/backend/src/server/api/stream/channels/channel.ts create mode 100644 packages/backend/src/server/api/stream/channels/drive.ts create mode 100644 packages/backend/src/server/api/stream/channels/games/reversi-game.ts create mode 100644 packages/backend/src/server/api/stream/channels/games/reversi.ts create mode 100644 packages/backend/src/server/api/stream/channels/global-timeline.ts create mode 100644 packages/backend/src/server/api/stream/channels/hashtag.ts create mode 100644 packages/backend/src/server/api/stream/channels/home-timeline.ts create mode 100644 packages/backend/src/server/api/stream/channels/hybrid-timeline.ts create mode 100644 packages/backend/src/server/api/stream/channels/index.ts create mode 100644 packages/backend/src/server/api/stream/channels/local-timeline.ts create mode 100644 packages/backend/src/server/api/stream/channels/main.ts create mode 100644 packages/backend/src/server/api/stream/channels/messaging-index.ts create mode 100644 packages/backend/src/server/api/stream/channels/messaging.ts create mode 100644 packages/backend/src/server/api/stream/channels/queue-stats.ts create mode 100644 packages/backend/src/server/api/stream/channels/server-stats.ts create mode 100644 packages/backend/src/server/api/stream/channels/user-list.ts create mode 100644 packages/backend/src/server/api/stream/index.ts create mode 100644 packages/backend/src/server/api/stream/types.ts (limited to 'packages/backend/src/server/api/stream') diff --git a/packages/backend/src/server/api/stream/channel.ts b/packages/backend/src/server/api/stream/channel.ts new file mode 100644 index 0000000000..2824d7d1b8 --- /dev/null +++ b/packages/backend/src/server/api/stream/channel.ts @@ -0,0 +1,62 @@ +import autobind from 'autobind-decorator'; +import Connection from '.'; + +/** + * Stream channel + */ +export default abstract class Channel { + protected connection: Connection; + public id: string; + public abstract readonly chName: string; + public static readonly shouldShare: boolean; + public static readonly requireCredential: boolean; + + protected get user() { + return this.connection.user; + } + + protected get userProfile() { + return this.connection.userProfile; + } + + protected get following() { + return this.connection.following; + } + + protected get muting() { + return this.connection.muting; + } + + protected get blocking() { + return this.connection.blocking; + } + + protected get followingChannels() { + return this.connection.followingChannels; + } + + protected get subscriber() { + return this.connection.subscriber; + } + + constructor(id: string, connection: Connection) { + this.id = id; + this.connection = connection; + } + + @autobind + public send(typeOrPayload: any, payload?: any) { + const type = payload === undefined ? typeOrPayload.type : typeOrPayload; + const body = payload === undefined ? typeOrPayload.body : payload; + + this.connection.sendMessageToWs('channel', { + id: this.id, + type: type, + body: body + }); + } + + public abstract init(params: any): void; + public dispose?(): void; + public onMessage?(type: string, body: any): void; +} diff --git a/packages/backend/src/server/api/stream/channels/admin.ts b/packages/backend/src/server/api/stream/channels/admin.ts new file mode 100644 index 0000000000..1ff932d1dd --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/admin.ts @@ -0,0 +1,16 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; + +export default class extends Channel { + public readonly chName = 'admin'; + public static shouldShare = true; + public static requireCredential = true; + + @autobind + public async init(params: any) { + // Subscribe admin stream + this.subscriber.on(`adminStream:${this.user!.id}`, data => { + this.send(data); + }); + } +} diff --git a/packages/backend/src/server/api/stream/channels/antenna.ts b/packages/backend/src/server/api/stream/channels/antenna.ts new file mode 100644 index 0000000000..3cbdfebb43 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/antenna.ts @@ -0,0 +1,45 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; +import { Notes } from '@/models/index'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { StreamMessages } from '../types'; + +export default class extends Channel { + public readonly chName = 'antenna'; + public static shouldShare = false; + public static requireCredential = false; + private antennaId: string; + + @autobind + public async init(params: any) { + this.antennaId = params.antennaId as string; + + // Subscribe stream + this.subscriber.on(`antennaStream:${this.antennaId}`, this.onEvent); + } + + @autobind + private async onEvent(data: StreamMessages['antenna']['payload']) { + if (data.type === 'note') { + const note = await Notes.pack(data.body.id, this.user, { detail: true }); + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + this.connection.cacheNote(note); + + this.send('note', note); + } else { + this.send(data.type, data.body); + } + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off(`antennaStream:${this.antennaId}`, this.onEvent); + } +} diff --git a/packages/backend/src/server/api/stream/channels/channel.ts b/packages/backend/src/server/api/stream/channels/channel.ts new file mode 100644 index 0000000000..bf7942f522 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/channel.ts @@ -0,0 +1,92 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; +import { Notes, Users } from '@/models/index'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { User } from '@/models/entities/user'; +import { StreamMessages } from '../types'; +import { Packed } from '@/misc/schema'; + +export default class extends Channel { + public readonly chName = 'channel'; + public static shouldShare = false; + public static requireCredential = false; + private channelId: string; + private typers: Record = {}; + private emitTypersIntervalId: ReturnType; + + @autobind + public async init(params: any) { + this.channelId = params.channelId as string; + + // Subscribe stream + this.subscriber.on('notesStream', this.onNote); + this.subscriber.on(`channelStream:${this.channelId}`, this.onEvent); + this.emitTypersIntervalId = setInterval(this.emitTypers, 5000); + } + + @autobind + private async onNote(note: Packed<'Note'>) { + if (note.channelId !== this.channelId) return; + + // リプライなら再pack + if (note.replyId != null) { + note.reply = await Notes.pack(note.replyId, this.user, { + detail: true + }); + } + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await Notes.pack(note.renoteId, this.user, { + detail: true + }); + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + this.connection.cacheNote(note); + + this.send('note', note); + } + + @autobind + private onEvent(data: StreamMessages['channel']['payload']) { + if (data.type === 'typing') { + const id = data.body; + const begin = this.typers[id] == null; + this.typers[id] = new Date(); + if (begin) { + this.emitTypers(); + } + } + } + + @autobind + private async emitTypers() { + const now = new Date(); + + // Remove not typing users + for (const [userId, date] of Object.entries(this.typers)) { + if (now.getTime() - date.getTime() > 5000) delete this.typers[userId]; + } + + const users = await Users.packMany(Object.keys(this.typers), null, { detail: false }); + + this.send({ + type: 'typers', + body: users, + }); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off('notesStream', this.onNote); + this.subscriber.off(`channelStream:${this.channelId}`, this.onEvent); + + clearInterval(this.emitTypersIntervalId); + } +} diff --git a/packages/backend/src/server/api/stream/channels/drive.ts b/packages/backend/src/server/api/stream/channels/drive.ts new file mode 100644 index 0000000000..4112dd9b04 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/drive.ts @@ -0,0 +1,16 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; + +export default class extends Channel { + public readonly chName = 'drive'; + public static shouldShare = true; + public static requireCredential = true; + + @autobind + public async init(params: any) { + // Subscribe drive stream + this.subscriber.on(`driveStream:${this.user!.id}`, data => { + this.send(data); + }); + } +} diff --git a/packages/backend/src/server/api/stream/channels/games/reversi-game.ts b/packages/backend/src/server/api/stream/channels/games/reversi-game.ts new file mode 100644 index 0000000000..bfdbf1d266 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/games/reversi-game.ts @@ -0,0 +1,372 @@ +import autobind from 'autobind-decorator'; +import * as CRC32 from 'crc-32'; +import { publishReversiGameStream } from '@/services/stream'; +import Reversi from '../../../../../games/reversi/core'; +import * as maps from '../../../../../games/reversi/maps'; +import Channel from '../../channel'; +import { ReversiGame } from '@/models/entities/games/reversi/game'; +import { ReversiGames, Users } from '@/models/index'; +import { User } from '@/models/entities/user'; + +export default class extends Channel { + public readonly chName = 'gamesReversiGame'; + public static shouldShare = false; + public static requireCredential = false; + + private gameId: ReversiGame['id'] | null = null; + private watchers: Record = {}; + private emitWatchersIntervalId: ReturnType; + + @autobind + public async init(params: any) { + this.gameId = params.gameId; + + // Subscribe game stream + this.subscriber.on(`reversiGameStream:${this.gameId}`, this.onEvent); + this.emitWatchersIntervalId = setInterval(this.emitWatchers, 5000); + + const game = await ReversiGames.findOne(this.gameId!); + if (game == null) throw new Error('game not found'); + + // 観戦者イベント + this.watch(game); + } + + @autobind + private onEvent(data: any) { + if (data.type === 'watching') { + const id = data.body; + this.watchers[id] = new Date(); + } else { + this.send(data); + } + } + + @autobind + private async emitWatchers() { + const now = new Date(); + + // Remove not watching users + for (const [userId, date] of Object.entries(this.watchers)) { + if (now.getTime() - date.getTime() > 5000) delete this.watchers[userId]; + } + + const users = await Users.packMany(Object.keys(this.watchers), null, { detail: false }); + + this.send({ + type: 'watchers', + body: users, + }); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off(`reversiGameStream:${this.gameId}`, this.onEvent); + clearInterval(this.emitWatchersIntervalId); + } + + @autobind + public onMessage(type: string, body: any) { + switch (type) { + case 'accept': this.accept(true); break; + case 'cancelAccept': this.accept(false); break; + case 'updateSettings': this.updateSettings(body.key, body.value); break; + case 'initForm': this.initForm(body); break; + case 'updateForm': this.updateForm(body.id, body.value); break; + case 'message': this.message(body); break; + case 'set': this.set(body.pos); break; + case 'check': this.check(body.crc32); break; + } + } + + @autobind + private async updateSettings(key: string, value: any) { + if (this.user == null) return; + + const game = await ReversiGames.findOne(this.gameId!); + if (game == null) throw new Error('game not found'); + + if (game.isStarted) return; + if ((game.user1Id !== this.user.id) && (game.user2Id !== this.user.id)) return; + if ((game.user1Id === this.user.id) && game.user1Accepted) return; + if ((game.user2Id === this.user.id) && game.user2Accepted) return; + + if (!['map', 'bw', 'isLlotheo', 'canPutEverywhere', 'loopedBoard'].includes(key)) return; + + await ReversiGames.update(this.gameId!, { + [key]: value + }); + + publishReversiGameStream(this.gameId!, 'updateSettings', { + key: key, + value: value + }); + } + + @autobind + private async initForm(form: any) { + if (this.user == null) return; + + const game = await ReversiGames.findOne(this.gameId!); + if (game == null) throw new Error('game not found'); + + if (game.isStarted) return; + if ((game.user1Id !== this.user.id) && (game.user2Id !== this.user.id)) return; + + const set = game.user1Id === this.user.id ? { + form1: form + } : { + form2: form + }; + + await ReversiGames.update(this.gameId!, set); + + publishReversiGameStream(this.gameId!, 'initForm', { + userId: this.user.id, + form + }); + } + + @autobind + private async updateForm(id: string, value: any) { + if (this.user == null) return; + + const game = await ReversiGames.findOne(this.gameId!); + if (game == null) throw new Error('game not found'); + + if (game.isStarted) return; + if ((game.user1Id !== this.user.id) && (game.user2Id !== this.user.id)) return; + + const form = game.user1Id === this.user.id ? game.form2 : game.form1; + + const item = form.find((i: any) => i.id == id); + + if (item == null) return; + + item.value = value; + + const set = game.user1Id === this.user.id ? { + form2: form + } : { + form1: form + }; + + await ReversiGames.update(this.gameId!, set); + + publishReversiGameStream(this.gameId!, 'updateForm', { + userId: this.user.id, + id, + value + }); + } + + @autobind + private async message(message: any) { + if (this.user == null) return; + + message.id = Math.random(); + publishReversiGameStream(this.gameId!, 'message', { + userId: this.user.id, + message + }); + } + + @autobind + private async accept(accept: boolean) { + if (this.user == null) return; + + const game = await ReversiGames.findOne(this.gameId!); + if (game == null) throw new Error('game not found'); + + if (game.isStarted) return; + + let bothAccepted = false; + + if (game.user1Id === this.user.id) { + await ReversiGames.update(this.gameId!, { + user1Accepted: accept + }); + + publishReversiGameStream(this.gameId!, 'changeAccepts', { + user1: accept, + user2: game.user2Accepted + }); + + if (accept && game.user2Accepted) bothAccepted = true; + } else if (game.user2Id === this.user.id) { + await ReversiGames.update(this.gameId!, { + user2Accepted: accept + }); + + publishReversiGameStream(this.gameId!, 'changeAccepts', { + user1: game.user1Accepted, + user2: accept + }); + + if (accept && game.user1Accepted) bothAccepted = true; + } else { + return; + } + + if (bothAccepted) { + // 3秒後、まだacceptされていたらゲーム開始 + setTimeout(async () => { + const freshGame = await ReversiGames.findOne(this.gameId!); + if (freshGame == null || freshGame.isStarted || freshGame.isEnded) return; + if (!freshGame.user1Accepted || !freshGame.user2Accepted) return; + + let bw: number; + if (freshGame.bw == 'random') { + bw = Math.random() > 0.5 ? 1 : 2; + } else { + bw = parseInt(freshGame.bw, 10); + } + + function getRandomMap() { + const mapCount = Object.entries(maps).length; + const rnd = Math.floor(Math.random() * mapCount); + return Object.values(maps)[rnd].data; + } + + const map = freshGame.map != null ? freshGame.map : getRandomMap(); + + await ReversiGames.update(this.gameId!, { + startedAt: new Date(), + isStarted: true, + black: bw, + map: map + }); + + //#region 盤面に最初から石がないなどして始まった瞬間に勝敗が決定する場合があるのでその処理 + const o = new Reversi(map, { + isLlotheo: freshGame.isLlotheo, + canPutEverywhere: freshGame.canPutEverywhere, + loopedBoard: freshGame.loopedBoard + }); + + if (o.isEnded) { + let winner; + if (o.winner === true) { + winner = freshGame.black == 1 ? freshGame.user1Id : freshGame.user2Id; + } else if (o.winner === false) { + winner = freshGame.black == 1 ? freshGame.user2Id : freshGame.user1Id; + } else { + winner = null; + } + + await ReversiGames.update(this.gameId!, { + isEnded: true, + winnerId: winner + }); + + publishReversiGameStream(this.gameId!, 'ended', { + winnerId: winner, + game: await ReversiGames.pack(this.gameId!, this.user) + }); + } + //#endregion + + publishReversiGameStream(this.gameId!, 'started', + await ReversiGames.pack(this.gameId!, this.user)); + }, 3000); + } + } + + // 石を打つ + @autobind + private async set(pos: number) { + if (this.user == null) return; + + const game = await ReversiGames.findOne(this.gameId!); + if (game == null) throw new Error('game not found'); + + if (!game.isStarted) return; + if (game.isEnded) return; + if ((game.user1Id !== this.user.id) && (game.user2Id !== this.user.id)) return; + + const myColor = + ((game.user1Id === this.user.id) && game.black == 1) || ((game.user2Id === this.user.id) && game.black == 2) + ? true + : false; + + const o = new Reversi(game.map, { + isLlotheo: game.isLlotheo, + canPutEverywhere: game.canPutEverywhere, + loopedBoard: game.loopedBoard + }); + + // 盤面の状態を再生 + for (const log of game.logs) { + o.put(log.color, log.pos); + } + + if (o.turn !== myColor) return; + + if (!o.canPut(myColor, pos)) return; + o.put(myColor, pos); + + let winner; + if (o.isEnded) { + if (o.winner === true) { + winner = game.black == 1 ? game.user1Id : game.user2Id; + } else if (o.winner === false) { + winner = game.black == 1 ? game.user2Id : game.user1Id; + } else { + winner = null; + } + } + + const log = { + at: new Date(), + color: myColor, + pos + }; + + const crc32 = CRC32.str(game.logs.map(x => x.pos.toString()).join('') + pos.toString()).toString(); + + game.logs.push(log); + + await ReversiGames.update(this.gameId!, { + crc32, + isEnded: o.isEnded, + winnerId: winner, + logs: game.logs + }); + + publishReversiGameStream(this.gameId!, 'set', Object.assign(log, { + next: o.turn + })); + + if (o.isEnded) { + publishReversiGameStream(this.gameId!, 'ended', { + winnerId: winner, + game: await ReversiGames.pack(this.gameId!, this.user) + }); + } + } + + @autobind + private async check(crc32: string | number) { + const game = await ReversiGames.findOne(this.gameId!); + if (game == null) throw new Error('game not found'); + + if (!game.isStarted) return; + + if (crc32.toString() !== game.crc32) { + this.send('rescue', await ReversiGames.pack(game, this.user)); + } + + // ついでに観戦者イベントを発行 + this.watch(game); + } + + @autobind + private watch(game: ReversiGame) { + if (this.user != null) { + if ((game.user1Id !== this.user.id) && (game.user2Id !== this.user.id)) { + publishReversiGameStream(this.gameId!, 'watching', this.user.id); + } + } + } +} diff --git a/packages/backend/src/server/api/stream/channels/games/reversi.ts b/packages/backend/src/server/api/stream/channels/games/reversi.ts new file mode 100644 index 0000000000..3b89aac35c --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/games/reversi.ts @@ -0,0 +1,33 @@ +import autobind from 'autobind-decorator'; +import { publishMainStream } from '@/services/stream'; +import Channel from '../../channel'; +import { ReversiMatchings } from '@/models/index'; + +export default class extends Channel { + public readonly chName = 'gamesReversi'; + public static shouldShare = true; + public static requireCredential = true; + + @autobind + public async init(params: any) { + // Subscribe reversi stream + this.subscriber.on(`reversiStream:${this.user!.id}`, data => { + this.send(data); + }); + } + + @autobind + public async onMessage(type: string, body: any) { + switch (type) { + case 'ping': + if (body.id == null) return; + const matching = await ReversiMatchings.findOne({ + parentId: this.user!.id, + childId: body.id + }); + if (matching == null) return; + publishMainStream(matching.childId, 'reversiInvited', await ReversiMatchings.pack(matching, { id: matching.childId })); + break; + } + } +} diff --git a/packages/backend/src/server/api/stream/channels/global-timeline.ts b/packages/backend/src/server/api/stream/channels/global-timeline.ts new file mode 100644 index 0000000000..f5983ab472 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/global-timeline.ts @@ -0,0 +1,73 @@ +import autobind from 'autobind-decorator'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import Channel from '../channel'; +import { fetchMeta } from '@/misc/fetch-meta'; +import { Notes } from '@/models/index'; +import { checkWordMute } from '@/misc/check-word-mute'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { Packed } from '@/misc/schema'; + +export default class extends Channel { + public readonly chName = 'globalTimeline'; + public static shouldShare = true; + public static requireCredential = false; + + @autobind + public async init(params: any) { + const meta = await fetchMeta(); + if (meta.disableGlobalTimeline) { + if (this.user == null || (!this.user.isAdmin && !this.user.isModerator)) return; + } + + // Subscribe events + this.subscriber.on('notesStream', this.onNote); + } + + @autobind + private async onNote(note: Packed<'Note'>) { + if (note.visibility !== 'public') return; + if (note.channelId != null) return; + + // リプライなら再pack + if (note.replyId != null) { + note.reply = await Notes.pack(note.replyId, this.user, { + detail: true + }); + } + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await Notes.pack(note.renoteId, this.user, { + detail: true + }); + } + + // 関係ない返信は除外 + if (note.reply) { + const reply = note.reply; + // 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合 + if (reply.userId !== this.user!.id && note.userId !== this.user!.id && reply.userId !== note.userId) return; + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + // 流れてきたNoteがミュートすべきNoteだったら無視する + // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある) + // 現状では、ワードミュートにおけるMutedNoteレコードの追加処理はストリーミングに流す処理と並列で行われるため、 + // レコードが追加されるNoteでも追加されるより先にここのストリーミングの処理に到達することが起こる。 + // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる + if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; + + this.connection.cacheNote(note); + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off('notesStream', this.onNote); + } +} diff --git a/packages/backend/src/server/api/stream/channels/hashtag.ts b/packages/backend/src/server/api/stream/channels/hashtag.ts new file mode 100644 index 0000000000..281be4f2eb --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/hashtag.ts @@ -0,0 +1,53 @@ +import autobind from 'autobind-decorator'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import Channel from '../channel'; +import { Notes } from '@/models/index'; +import { normalizeForSearch } from '@/misc/normalize-for-search'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { Packed } from '@/misc/schema'; + +export default class extends Channel { + public readonly chName = 'hashtag'; + public static shouldShare = false; + public static requireCredential = false; + private q: string[][]; + + @autobind + public async init(params: any) { + this.q = params.q; + + if (this.q == null) return; + + // Subscribe stream + this.subscriber.on('notesStream', this.onNote); + } + + @autobind + private async onNote(note: Packed<'Note'>) { + const noteTags = note.tags ? note.tags.map((t: string) => t.toLowerCase()) : []; + const matched = this.q.some(tags => tags.every(tag => noteTags.includes(normalizeForSearch(tag)))); + if (!matched) return; + + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await Notes.pack(note.renoteId, this.user, { + detail: true + }); + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + this.connection.cacheNote(note); + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off('notesStream', this.onNote); + } +} diff --git a/packages/backend/src/server/api/stream/channels/home-timeline.ts b/packages/backend/src/server/api/stream/channels/home-timeline.ts new file mode 100644 index 0000000000..52e9aec250 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/home-timeline.ts @@ -0,0 +1,81 @@ +import autobind from 'autobind-decorator'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import Channel from '../channel'; +import { Notes } from '@/models/index'; +import { checkWordMute } from '@/misc/check-word-mute'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { Packed } from '@/misc/schema'; + +export default class extends Channel { + public readonly chName = 'homeTimeline'; + public static shouldShare = true; + public static requireCredential = true; + + @autobind + public async init(params: any) { + // Subscribe events + this.subscriber.on('notesStream', this.onNote); + } + + @autobind + private async onNote(note: Packed<'Note'>) { + if (note.channelId) { + if (!this.followingChannels.has(note.channelId)) return; + } else { + // その投稿のユーザーをフォローしていなかったら弾く + if ((this.user!.id !== note.userId) && !this.following.has(note.userId)) return; + } + + if (['followers', 'specified'].includes(note.visibility)) { + note = await Notes.pack(note.id, this.user!, { + detail: true + }); + + if (note.isHidden) { + return; + } + } else { + // リプライなら再pack + if (note.replyId != null) { + note.reply = await Notes.pack(note.replyId, this.user!, { + detail: true + }); + } + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await Notes.pack(note.renoteId, this.user!, { + detail: true + }); + } + } + + // 関係ない返信は除外 + if (note.reply) { + const reply = note.reply; + // 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合 + if (reply.userId !== this.user!.id && note.userId !== this.user!.id && reply.userId !== note.userId) return; + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + // 流れてきたNoteがミュートすべきNoteだったら無視する + // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある) + // 現状では、ワードミュートにおけるMutedNoteレコードの追加処理はストリーミングに流す処理と並列で行われるため、 + // レコードが追加されるNoteでも追加されるより先にここのストリーミングの処理に到達することが起こる。 + // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる + if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; + + this.connection.cacheNote(note); + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off('notesStream', this.onNote); + } +} diff --git a/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts b/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts new file mode 100644 index 0000000000..51f95fc0cd --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts @@ -0,0 +1,89 @@ +import autobind from 'autobind-decorator'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import Channel from '../channel'; +import { fetchMeta } from '@/misc/fetch-meta'; +import { Notes } from '@/models/index'; +import { checkWordMute } from '@/misc/check-word-mute'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { Packed } from '@/misc/schema'; + +export default class extends Channel { + public readonly chName = 'hybridTimeline'; + public static shouldShare = true; + public static requireCredential = true; + + @autobind + public async init(params: any) { + const meta = await fetchMeta(); + if (meta.disableLocalTimeline && !this.user!.isAdmin && !this.user!.isModerator) return; + + // Subscribe events + this.subscriber.on('notesStream', this.onNote); + } + + @autobind + private async onNote(note: Packed<'Note'>) { + // チャンネルの投稿ではなく、自分自身の投稿 または + // チャンネルの投稿ではなく、その投稿のユーザーをフォローしている または + // チャンネルの投稿ではなく、全体公開のローカルの投稿 または + // フォローしているチャンネルの投稿 の場合だけ + if (!( + (note.channelId == null && this.user!.id === note.userId) || + (note.channelId == null && this.following.has(note.userId)) || + (note.channelId == null && (note.user.host == null && note.visibility === 'public')) || + (note.channelId != null && this.followingChannels.has(note.channelId)) + )) return; + + if (['followers', 'specified'].includes(note.visibility)) { + note = await Notes.pack(note.id, this.user!, { + detail: true + }); + + if (note.isHidden) { + return; + } + } else { + // リプライなら再pack + if (note.replyId != null) { + note.reply = await Notes.pack(note.replyId, this.user!, { + detail: true + }); + } + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await Notes.pack(note.renoteId, this.user!, { + detail: true + }); + } + } + + // 関係ない返信は除外 + if (note.reply) { + const reply = note.reply; + // 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合 + if (reply.userId !== this.user!.id && note.userId !== this.user!.id && reply.userId !== note.userId) return; + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + // 流れてきたNoteがミュートすべきNoteだったら無視する + // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある) + // 現状では、ワードミュートにおけるMutedNoteレコードの追加処理はストリーミングに流す処理と並列で行われるため、 + // レコードが追加されるNoteでも追加されるより先にここのストリーミングの処理に到達することが起こる。 + // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる + if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; + + this.connection.cacheNote(note); + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off('notesStream', this.onNote); + } +} diff --git a/packages/backend/src/server/api/stream/channels/index.ts b/packages/backend/src/server/api/stream/channels/index.ts new file mode 100644 index 0000000000..1841573043 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/index.ts @@ -0,0 +1,37 @@ +import main from './main'; +import homeTimeline from './home-timeline'; +import localTimeline from './local-timeline'; +import hybridTimeline from './hybrid-timeline'; +import globalTimeline from './global-timeline'; +import serverStats from './server-stats'; +import queueStats from './queue-stats'; +import userList from './user-list'; +import antenna from './antenna'; +import messaging from './messaging'; +import messagingIndex from './messaging-index'; +import drive from './drive'; +import hashtag from './hashtag'; +import channel from './channel'; +import admin from './admin'; +import gamesReversi from './games/reversi'; +import gamesReversiGame from './games/reversi-game'; + +export default { + main, + homeTimeline, + localTimeline, + hybridTimeline, + globalTimeline, + serverStats, + queueStats, + userList, + antenna, + messaging, + messagingIndex, + drive, + hashtag, + channel, + admin, + gamesReversi, + gamesReversiGame +}; diff --git a/packages/backend/src/server/api/stream/channels/local-timeline.ts b/packages/backend/src/server/api/stream/channels/local-timeline.ts new file mode 100644 index 0000000000..a6166c2be2 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/local-timeline.ts @@ -0,0 +1,74 @@ +import autobind from 'autobind-decorator'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import Channel from '../channel'; +import { fetchMeta } from '@/misc/fetch-meta'; +import { Notes } from '@/models/index'; +import { checkWordMute } from '@/misc/check-word-mute'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { Packed } from '@/misc/schema'; + +export default class extends Channel { + public readonly chName = 'localTimeline'; + public static shouldShare = true; + public static requireCredential = false; + + @autobind + public async init(params: any) { + const meta = await fetchMeta(); + if (meta.disableLocalTimeline) { + if (this.user == null || (!this.user.isAdmin && !this.user.isModerator)) return; + } + + // Subscribe events + this.subscriber.on('notesStream', this.onNote); + } + + @autobind + private async onNote(note: Packed<'Note'>) { + if (note.user.host !== null) return; + if (note.visibility !== 'public') return; + if (note.channelId != null && !this.followingChannels.has(note.channelId)) return; + + // リプライなら再pack + if (note.replyId != null) { + note.reply = await Notes.pack(note.replyId, this.user, { + detail: true + }); + } + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await Notes.pack(note.renoteId, this.user, { + detail: true + }); + } + + // 関係ない返信は除外 + if (note.reply) { + const reply = note.reply; + // 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合 + if (reply.userId !== this.user!.id && note.userId !== this.user!.id && reply.userId !== note.userId) return; + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + // 流れてきたNoteがミュートすべきNoteだったら無視する + // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある) + // 現状では、ワードミュートにおけるMutedNoteレコードの追加処理はストリーミングに流す処理と並列で行われるため、 + // レコードが追加されるNoteでも追加されるより先にここのストリーミングの処理に到達することが起こる。 + // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる + if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; + + this.connection.cacheNote(note); + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off('notesStream', this.onNote); + } +} diff --git a/packages/backend/src/server/api/stream/channels/main.ts b/packages/backend/src/server/api/stream/channels/main.ts new file mode 100644 index 0000000000..131ac30472 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/main.ts @@ -0,0 +1,43 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; +import { Notes } from '@/models/index'; + +export default class extends Channel { + public readonly chName = 'main'; + public static shouldShare = true; + public static requireCredential = true; + + @autobind + public async init(params: any) { + // Subscribe main stream channel + this.subscriber.on(`mainStream:${this.user!.id}`, async data => { + switch (data.type) { + case 'notification': { + if (data.body.userId && this.muting.has(data.body.userId)) return; + + if (data.body.note && data.body.note.isHidden) { + const note = await Notes.pack(data.body.note.id, this.user, { + detail: true + }); + this.connection.cacheNote(note); + data.body.note = note; + } + break; + } + case 'mention': { + if (this.muting.has(data.body.userId)) return; + if (data.body.isHidden) { + const note = await Notes.pack(data.body.id, this.user, { + detail: true + }); + this.connection.cacheNote(note); + data.body = note; + } + break; + } + } + + this.send(data.type, data.body); + }); + } +} diff --git a/packages/backend/src/server/api/stream/channels/messaging-index.ts b/packages/backend/src/server/api/stream/channels/messaging-index.ts new file mode 100644 index 0000000000..0c495398ab --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/messaging-index.ts @@ -0,0 +1,16 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; + +export default class extends Channel { + public readonly chName = 'messagingIndex'; + public static shouldShare = true; + public static requireCredential = true; + + @autobind + public async init(params: any) { + // Subscribe messaging index stream + this.subscriber.on(`messagingIndexStream:${this.user!.id}`, data => { + this.send(data); + }); + } +} diff --git a/packages/backend/src/server/api/stream/channels/messaging.ts b/packages/backend/src/server/api/stream/channels/messaging.ts new file mode 100644 index 0000000000..c049e880b9 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/messaging.ts @@ -0,0 +1,106 @@ +import autobind from 'autobind-decorator'; +import { readUserMessagingMessage, readGroupMessagingMessage, deliverReadActivity } from '../../common/read-messaging-message'; +import Channel from '../channel'; +import { UserGroupJoinings, Users, MessagingMessages } from '@/models/index'; +import { User, ILocalUser, IRemoteUser } from '@/models/entities/user'; +import { UserGroup } from '@/models/entities/user-group'; +import { StreamMessages } from '../types'; + +export default class extends Channel { + public readonly chName = 'messaging'; + public static shouldShare = false; + public static requireCredential = true; + + private otherpartyId: string | null; + private otherparty: User | null; + private groupId: string | null; + private subCh: `messagingStream:${User['id']}-${User['id']}` | `messagingStream:${UserGroup['id']}`; + private typers: Record = {}; + private emitTypersIntervalId: ReturnType; + + @autobind + public async init(params: any) { + this.otherpartyId = params.otherparty; + this.otherparty = this.otherpartyId ? await Users.findOneOrFail({ id: this.otherpartyId }) : null; + this.groupId = params.group; + + // Check joining + if (this.groupId) { + const joining = await UserGroupJoinings.findOne({ + userId: this.user!.id, + userGroupId: this.groupId + }); + + if (joining == null) { + return; + } + } + + this.emitTypersIntervalId = setInterval(this.emitTypers, 5000); + + this.subCh = this.otherpartyId + ? `messagingStream:${this.user!.id}-${this.otherpartyId}` + : `messagingStream:${this.groupId}`; + + // Subscribe messaging stream + this.subscriber.on(this.subCh, this.onEvent); + } + + @autobind + private onEvent(data: StreamMessages['messaging']['payload'] | StreamMessages['groupMessaging']['payload']) { + if (data.type === 'typing') { + const id = data.body; + const begin = this.typers[id] == null; + this.typers[id] = new Date(); + if (begin) { + this.emitTypers(); + } + } else { + this.send(data); + } + } + + @autobind + public onMessage(type: string, body: any) { + switch (type) { + case 'read': + if (this.otherpartyId) { + readUserMessagingMessage(this.user!.id, this.otherpartyId, [body.id]); + + // リモートユーザーからのメッセージだったら既読配信 + if (Users.isLocalUser(this.user!) && Users.isRemoteUser(this.otherparty!)) { + MessagingMessages.findOne(body.id).then(message => { + if (message) deliverReadActivity(this.user as ILocalUser, this.otherparty as IRemoteUser, message); + }); + } + } else if (this.groupId) { + readGroupMessagingMessage(this.user!.id, this.groupId, [body.id]); + } + break; + } + } + + @autobind + private async emitTypers() { + const now = new Date(); + + // Remove not typing users + for (const [userId, date] of Object.entries(this.typers)) { + if (now.getTime() - date.getTime() > 5000) delete this.typers[userId]; + } + + const users = await Users.packMany(Object.keys(this.typers), null, { detail: false }); + + this.send({ + type: 'typers', + body: users, + }); + } + + @autobind + public dispose() { + this.subscriber.off(this.subCh, this.onEvent); + + clearInterval(this.emitTypersIntervalId); + } +} diff --git a/packages/backend/src/server/api/stream/channels/queue-stats.ts b/packages/backend/src/server/api/stream/channels/queue-stats.ts new file mode 100644 index 0000000000..0bda0cfcb9 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/queue-stats.ts @@ -0,0 +1,41 @@ +import autobind from 'autobind-decorator'; +import Xev from 'xev'; +import Channel from '../channel'; + +const ev = new Xev(); + +export default class extends Channel { + public readonly chName = 'queueStats'; + public static shouldShare = true; + public static requireCredential = false; + + @autobind + public async init(params: any) { + ev.addListener('queueStats', this.onStats); + } + + @autobind + private onStats(stats: any) { + this.send('stats', stats); + } + + @autobind + public onMessage(type: string, body: any) { + switch (type) { + case 'requestLog': + ev.once(`queueStatsLog:${body.id}`, statsLog => { + this.send('statsLog', statsLog); + }); + ev.emit('requestQueueStatsLog', { + id: body.id, + length: body.length + }); + break; + } + } + + @autobind + public dispose() { + ev.removeListener('queueStats', this.onStats); + } +} diff --git a/packages/backend/src/server/api/stream/channels/server-stats.ts b/packages/backend/src/server/api/stream/channels/server-stats.ts new file mode 100644 index 0000000000..d245a7f70c --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/server-stats.ts @@ -0,0 +1,41 @@ +import autobind from 'autobind-decorator'; +import Xev from 'xev'; +import Channel from '../channel'; + +const ev = new Xev(); + +export default class extends Channel { + public readonly chName = 'serverStats'; + public static shouldShare = true; + public static requireCredential = false; + + @autobind + public async init(params: any) { + ev.addListener('serverStats', this.onStats); + } + + @autobind + private onStats(stats: any) { + this.send('stats', stats); + } + + @autobind + public onMessage(type: string, body: any) { + switch (type) { + case 'requestLog': + ev.once(`serverStatsLog:${body.id}`, statsLog => { + this.send('statsLog', statsLog); + }); + ev.emit('requestServerStatsLog', { + id: body.id, + length: body.length + }); + break; + } + } + + @autobind + public dispose() { + ev.removeListener('serverStats', this.onStats); + } +} diff --git a/packages/backend/src/server/api/stream/channels/user-list.ts b/packages/backend/src/server/api/stream/channels/user-list.ts new file mode 100644 index 0000000000..63b254605b --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/user-list.ts @@ -0,0 +1,92 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; +import { Notes, UserListJoinings, UserLists } from '@/models/index'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import { User } from '@/models/entities/user'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { Packed } from '@/misc/schema'; + +export default class extends Channel { + public readonly chName = 'userList'; + public static shouldShare = false; + public static requireCredential = false; + private listId: string; + public listUsers: User['id'][] = []; + private listUsersClock: NodeJS.Timer; + + @autobind + public async init(params: any) { + this.listId = params.listId as string; + + // Check existence and owner + const list = await UserLists.findOne({ + id: this.listId, + userId: this.user!.id + }); + if (!list) return; + + // Subscribe stream + this.subscriber.on(`userListStream:${this.listId}`, this.send); + + this.subscriber.on('notesStream', this.onNote); + + this.updateListUsers(); + this.listUsersClock = setInterval(this.updateListUsers, 5000); + } + + @autobind + private async updateListUsers() { + const users = await UserListJoinings.find({ + where: { + userListId: this.listId, + }, + select: ['userId'] + }); + + this.listUsers = users.map(x => x.userId); + } + + @autobind + private async onNote(note: Packed<'Note'>) { + if (!this.listUsers.includes(note.userId)) return; + + if (['followers', 'specified'].includes(note.visibility)) { + note = await Notes.pack(note.id, this.user, { + detail: true + }); + + if (note.isHidden) { + return; + } + } else { + // リプライなら再pack + if (note.replyId != null) { + note.reply = await Notes.pack(note.replyId, this.user, { + detail: true + }); + } + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await Notes.pack(note.renoteId, this.user, { + detail: true + }); + } + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off(`userListStream:${this.listId}`, this.send); + this.subscriber.off('notesStream', this.onNote); + + clearInterval(this.listUsersClock); + } +} diff --git a/packages/backend/src/server/api/stream/index.ts b/packages/backend/src/server/api/stream/index.ts new file mode 100644 index 0000000000..da4ea5ec99 --- /dev/null +++ b/packages/backend/src/server/api/stream/index.ts @@ -0,0 +1,421 @@ +import autobind from 'autobind-decorator'; +import * as websocket from 'websocket'; +import { readNotification } from '../common/read-notification'; +import call from '../call'; +import readNote from '@/services/note/read'; +import Channel from './channel'; +import channels from './channels/index'; +import { EventEmitter } from 'events'; +import { User } from '@/models/entities/user'; +import { Channel as ChannelModel } from '@/models/entities/channel'; +import { Users, Followings, Mutings, UserProfiles, ChannelFollowings, Blockings } from '@/models/index'; +import { ApiError } from '../error'; +import { AccessToken } from '@/models/entities/access-token'; +import { UserProfile } from '@/models/entities/user-profile'; +import { publishChannelStream, publishGroupMessagingStream, publishMessagingStream } from '@/services/stream'; +import { UserGroup } from '@/models/entities/user-group'; +import { StreamEventEmitter, StreamMessages } from './types'; +import { Packed } from '@/misc/schema'; + +/** + * Main stream connection + */ +export default class Connection { + public user?: User; + public userProfile?: UserProfile; + public following: Set = new Set(); + public muting: Set = new Set(); + public blocking: Set = new Set(); // "被"blocking + public followingChannels: Set = new Set(); + public token?: AccessToken; + private wsConnection: websocket.connection; + public subscriber: StreamEventEmitter; + private channels: Channel[] = []; + private subscribingNotes: any = {}; + private cachedNotes: Packed<'Note'>[] = []; + + constructor( + wsConnection: websocket.connection, + subscriber: EventEmitter, + user: User | null | undefined, + token: AccessToken | null | undefined + ) { + this.wsConnection = wsConnection; + this.subscriber = subscriber; + if (user) this.user = user; + if (token) this.token = token; + + this.wsConnection.on('message', this.onWsConnectionMessage); + + this.subscriber.on('broadcast', data => { + this.onBroadcastMessage(data); + }); + + if (this.user) { + this.updateFollowing(); + this.updateMuting(); + this.updateBlocking(); + this.updateFollowingChannels(); + this.updateUserProfile(); + + this.subscriber.on(`user:${this.user.id}`, this.onUserEvent); + } + } + + @autobind + private onUserEvent(data: StreamMessages['user']['payload']) { // { type, body }と展開するとそれぞれ型が分離してしまう + switch (data.type) { + case 'follow': + this.following.add(data.body.id); + break; + + case 'unfollow': + this.following.delete(data.body.id); + break; + + case 'mute': + this.muting.add(data.body.id); + break; + + case 'unmute': + this.muting.delete(data.body.id); + break; + + // TODO: block events + + case 'followChannel': + this.followingChannels.add(data.body.id); + break; + + case 'unfollowChannel': + this.followingChannels.delete(data.body.id); + break; + + case 'updateUserProfile': + this.userProfile = data.body; + break; + + case 'terminate': + this.wsConnection.close(); + this.dispose(); + break; + + default: + break; + } + } + + /** + * クライアントからメッセージ受信時 + */ + @autobind + private async onWsConnectionMessage(data: websocket.IMessage) { + if (data.utf8Data == null) return; + + let obj: Record; + + try { + obj = JSON.parse(data.utf8Data); + } catch (e) { + return; + } + + const { type, body } = obj; + + switch (type) { + case 'api': this.onApiRequest(body); break; + case 'readNotification': this.onReadNotification(body); break; + case 'subNote': this.onSubscribeNote(body); break; + case 's': this.onSubscribeNote(body); break; // alias + case 'sr': this.onSubscribeNote(body); this.readNote(body); break; + case 'unsubNote': this.onUnsubscribeNote(body); break; + case 'un': this.onUnsubscribeNote(body); break; // alias + case 'connect': this.onChannelConnectRequested(body); break; + case 'disconnect': this.onChannelDisconnectRequested(body); break; + case 'channel': this.onChannelMessageRequested(body); break; + case 'ch': this.onChannelMessageRequested(body); break; // alias + + // 個々のチャンネルではなくルートレベルでこれらのメッセージを受け取る理由は、 + // クライアントの事情を考慮したとき、入力フォームはノートチャンネルやメッセージのメインコンポーネントとは別 + // なこともあるため、それらのコンポーネントがそれぞれ各チャンネルに接続するようにするのは面倒なため。 + case 'typingOnChannel': this.typingOnChannel(body.channel); break; + case 'typingOnMessaging': this.typingOnMessaging(body); break; + } + } + + @autobind + private onBroadcastMessage(data: StreamMessages['broadcast']['payload']) { + this.sendMessageToWs(data.type, data.body); + } + + @autobind + public cacheNote(note: Packed<'Note'>) { + const add = (note: Packed<'Note'>) => { + const existIndex = this.cachedNotes.findIndex(n => n.id === note.id); + if (existIndex > -1) { + this.cachedNotes[existIndex] = note; + return; + } + + this.cachedNotes.unshift(note); + if (this.cachedNotes.length > 32) { + this.cachedNotes.splice(32); + } + }; + + add(note); + if (note.reply) add(note.reply); + if (note.renote) add(note.renote); + } + + @autobind + private readNote(body: any) { + const id = body.id; + + const note = this.cachedNotes.find(n => n.id === id); + if (note == null) return; + + if (this.user && (note.userId !== this.user.id)) { + readNote(this.user.id, [note], { + following: this.following, + followingChannels: this.followingChannels, + }); + } + } + + /** + * APIリクエスト要求時 + */ + @autobind + private async onApiRequest(payload: any) { + // 新鮮なデータを利用するためにユーザーをフェッチ + const user = this.user ? await Users.findOne(this.user.id) : null; + + const endpoint = payload.endpoint || payload.ep; // alias + + // 呼び出し + call(endpoint, user, this.token, payload.data).then(res => { + this.sendMessageToWs(`api:${payload.id}`, { res }); + }).catch((e: ApiError) => { + this.sendMessageToWs(`api:${payload.id}`, { + error: { + message: e.message, + code: e.code, + id: e.id, + kind: e.kind, + ...(e.info ? { info: e.info } : {}) + } + }); + }); + } + + @autobind + private onReadNotification(payload: any) { + if (!payload.id) return; + readNotification(this.user!.id, [payload.id]); + } + + /** + * 投稿購読要求時 + */ + @autobind + private onSubscribeNote(payload: any) { + if (!payload.id) return; + + if (this.subscribingNotes[payload.id] == null) { + this.subscribingNotes[payload.id] = 0; + } + + this.subscribingNotes[payload.id]++; + + if (this.subscribingNotes[payload.id] === 1) { + this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage); + } + } + + /** + * 投稿購読解除要求時 + */ + @autobind + private onUnsubscribeNote(payload: any) { + if (!payload.id) return; + + this.subscribingNotes[payload.id]--; + if (this.subscribingNotes[payload.id] <= 0) { + delete this.subscribingNotes[payload.id]; + this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage); + } + } + + @autobind + private async onNoteStreamMessage(data: StreamMessages['note']['payload']) { + this.sendMessageToWs('noteUpdated', { + id: data.body.id, + type: data.type, + body: data.body.body, + }); + } + + /** + * チャンネル接続要求時 + */ + @autobind + private onChannelConnectRequested(payload: any) { + const { channel, id, params, pong } = payload; + this.connectChannel(id, params, channel, pong); + } + + /** + * チャンネル切断要求時 + */ + @autobind + private onChannelDisconnectRequested(payload: any) { + const { id } = payload; + this.disconnectChannel(id); + } + + /** + * クライアントにメッセージ送信 + */ + @autobind + public sendMessageToWs(type: string, payload: any) { + this.wsConnection.send(JSON.stringify({ + type: type, + body: payload + })); + } + + /** + * チャンネルに接続 + */ + @autobind + public connectChannel(id: string, params: any, channel: string, pong = false) { + if ((channels as any)[channel].requireCredential && this.user == null) { + return; + } + + // 共有可能チャンネルに接続しようとしていて、かつそのチャンネルに既に接続していたら無意味なので無視 + if ((channels as any)[channel].shouldShare && this.channels.some(c => c.chName === channel)) { + return; + } + + const ch: Channel = new (channels as any)[channel](id, this); + this.channels.push(ch); + ch.init(params); + + if (pong) { + this.sendMessageToWs('connected', { + id: id + }); + } + } + + /** + * チャンネルから切断 + * @param id チャンネルコネクションID + */ + @autobind + public disconnectChannel(id: string) { + const channel = this.channels.find(c => c.id === id); + + if (channel) { + if (channel.dispose) channel.dispose(); + this.channels = this.channels.filter(c => c.id !== id); + } + } + + /** + * チャンネルへメッセージ送信要求時 + * @param data メッセージ + */ + @autobind + private onChannelMessageRequested(data: any) { + const channel = this.channels.find(c => c.id === data.id); + if (channel != null && channel.onMessage != null) { + channel.onMessage(data.type, data.body); + } + } + + @autobind + private typingOnChannel(channel: ChannelModel['id']) { + if (this.user) { + publishChannelStream(channel, 'typing', this.user.id); + } + } + + @autobind + private typingOnMessaging(param: { partner?: User['id']; group?: UserGroup['id']; }) { + if (this.user) { + if (param.partner) { + publishMessagingStream(param.partner, this.user.id, 'typing', this.user.id); + } else if (param.group) { + publishGroupMessagingStream(param.group, 'typing', this.user.id); + } + } + } + + @autobind + private async updateFollowing() { + const followings = await Followings.find({ + where: { + followerId: this.user!.id + }, + select: ['followeeId'] + }); + + this.following = new Set(followings.map(x => x.followeeId)); + } + + @autobind + private async updateMuting() { + const mutings = await Mutings.find({ + where: { + muterId: this.user!.id + }, + select: ['muteeId'] + }); + + this.muting = new Set(mutings.map(x => x.muteeId)); + } + + @autobind + private async updateBlocking() { // ここでいうBlockingは被Blockingの意 + const blockings = await Blockings.find({ + where: { + blockeeId: this.user!.id + }, + select: ['blockerId'] + }); + + this.blocking = new Set(blockings.map(x => x.blockerId)); + } + + @autobind + private async updateFollowingChannels() { + const followings = await ChannelFollowings.find({ + where: { + followerId: this.user!.id + }, + select: ['followeeId'] + }); + + this.followingChannels = new Set(followings.map(x => x.followeeId)); + } + + @autobind + private async updateUserProfile() { + this.userProfile = await UserProfiles.findOne({ + userId: this.user!.id + }); + } + + /** + * ストリームが切れたとき + */ + @autobind + public dispose() { + for (const c of this.channels.filter(c => c.dispose)) { + if (c.dispose) c.dispose(); + } + } +} diff --git a/packages/backend/src/server/api/stream/types.ts b/packages/backend/src/server/api/stream/types.ts new file mode 100644 index 0000000000..70eb5c5ce5 --- /dev/null +++ b/packages/backend/src/server/api/stream/types.ts @@ -0,0 +1,299 @@ +import { EventEmitter } from 'events'; +import Emitter from 'strict-event-emitter-types'; +import { Channel } from '@/models/entities/channel'; +import { User } from '@/models/entities/user'; +import { UserProfile } from '@/models/entities/user-profile'; +import { Note } from '@/models/entities/note'; +import { Antenna } from '@/models/entities/antenna'; +import { DriveFile } from '@/models/entities/drive-file'; +import { DriveFolder } from '@/models/entities/drive-folder'; +import { Emoji } from '@/models/entities/emoji'; +import { UserList } from '@/models/entities/user-list'; +import { MessagingMessage } from '@/models/entities/messaging-message'; +import { UserGroup } from '@/models/entities/user-group'; +import { ReversiGame } from '@/models/entities/games/reversi/game'; +import { AbuseUserReport } from '@/models/entities/abuse-user-report'; +import { Signin } from '@/models/entities/signin'; +import { Page } from '@/models/entities/page'; +import { Packed } from '@/misc/schema'; + +//#region Stream type-body definitions +export interface InternalStreamTypes { + antennaCreated: Antenna; + antennaDeleted: Antenna; + antennaUpdated: Antenna; +} + +export interface BroadcastTypes { + emojiAdded: { + emoji: Packed<'Emoji'>; + }; +} + +export interface UserStreamTypes { + terminate: {}; + followChannel: Channel; + unfollowChannel: Channel; + updateUserProfile: UserProfile; + mute: User; + unmute: User; + follow: Packed<'User'>; + unfollow: Packed<'User'>; + userAdded: Packed<'User'>; +} + +export interface MainStreamTypes { + notification: Packed<'Notification'>; + mention: Packed<'Note'>; + reply: Packed<'Note'>; + renote: Packed<'Note'>; + follow: Packed<'User'>; + followed: Packed<'User'>; + unfollow: Packed<'User'>; + meUpdated: Packed<'User'>; + pageEvent: { + pageId: Page['id']; + event: string; + var: any; + userId: User['id']; + user: Packed<'User'>; + }; + urlUploadFinished: { + marker?: string | null; + file: Packed<'DriveFile'>; + }; + readAllNotifications: undefined; + unreadNotification: Packed<'Notification'>; + unreadMention: Note['id']; + readAllUnreadMentions: undefined; + unreadSpecifiedNote: Note['id']; + readAllUnreadSpecifiedNotes: undefined; + readAllMessagingMessages: undefined; + messagingMessage: Packed<'MessagingMessage'>; + unreadMessagingMessage: Packed<'MessagingMessage'>; + readAllAntennas: undefined; + unreadAntenna: Antenna; + readAllAnnouncements: undefined; + readAllChannels: undefined; + unreadChannel: Note['id']; + myTokenRegenerated: undefined; + reversiNoInvites: undefined; + reversiInvited: Packed<'ReversiMatching'>; + signin: Signin; + registryUpdated: { + scope?: string[]; + key: string; + value: any | null; + }; + driveFileCreated: Packed<'DriveFile'>; + readAntenna: Antenna; +} + +export interface DriveStreamTypes { + fileCreated: Packed<'DriveFile'>; + fileDeleted: DriveFile['id']; + fileUpdated: Packed<'DriveFile'>; + folderCreated: Packed<'DriveFolder'>; + folderDeleted: DriveFolder['id']; + folderUpdated: Packed<'DriveFolder'>; +} + +export interface NoteStreamTypes { + pollVoted: { + choice: number; + userId: User['id']; + }; + deleted: { + deletedAt: Date; + }; + reacted: { + reaction: string; + emoji?: Emoji; + userId: User['id']; + }; + unreacted: { + reaction: string; + userId: User['id']; + }; +} +type NoteStreamEventTypes = { + [key in keyof NoteStreamTypes]: { + id: Note['id']; + body: NoteStreamTypes[key]; + }; +}; + +export interface ChannelStreamTypes { + typing: User['id']; +} + +export interface UserListStreamTypes { + userAdded: Packed<'User'>; + userRemoved: Packed<'User'>; +} + +export interface AntennaStreamTypes { + note: Note; +} + +export interface MessagingStreamTypes { + read: MessagingMessage['id'][]; + typing: User['id']; + message: Packed<'MessagingMessage'>; + deleted: MessagingMessage['id']; +} + +export interface GroupMessagingStreamTypes { + read: { + ids: MessagingMessage['id'][]; + userId: User['id']; + }; + typing: User['id']; + message: Packed<'MessagingMessage'>; + deleted: MessagingMessage['id']; +} + +export interface MessagingIndexStreamTypes { + read: MessagingMessage['id'][]; + message: Packed<'MessagingMessage'>; +} + +export interface ReversiStreamTypes { + matched: Packed<'ReversiGame'>; + invited: Packed<'ReversiMatching'>; +} + +export interface ReversiGameStreamTypes { + started: Packed<'ReversiGame'>; + ended: { + winnerId?: User['id'] | null, + game: Packed<'ReversiGame'>; + }; + updateSettings: { + key: string; + value: FIXME; + }; + initForm: { + userId: User['id']; + form: FIXME; + }; + updateForm: { + userId: User['id']; + id: string; + value: FIXME; + }; + message: { + userId: User['id']; + message: FIXME; + }; + changeAccepts: { + user1: boolean; + user2: boolean; + }; + set: { + at: Date; + color: boolean; + pos: number; + next: boolean; + }; + watching: User['id']; +} + +export interface AdminStreamTypes { + newAbuseUserReport: { + id: AbuseUserReport['id']; + targetUserId: User['id'], + reporterId: User['id'], + comment: string; + }; +} +//#endregion + +// 辞書(interface or type)から{ type, body }ユニオンを定義 +// https://stackoverflow.com/questions/49311989/can-i-infer-the-type-of-a-value-using-extends-keyof-type +// VS Codeの展開を防止するためにEvents型を定義 +type Events = { [K in keyof T]: { type: K; body: T[K]; } }; +type EventUnionFromDictionary< + T extends object, + U = Events +> = U[keyof U]; + +// name/messages(spec) pairs dictionary +export type StreamMessages = { + internal: { + name: 'internal'; + payload: EventUnionFromDictionary; + }; + broadcast: { + name: 'broadcast'; + payload: EventUnionFromDictionary; + }; + user: { + name: `user:${User['id']}`; + payload: EventUnionFromDictionary; + }; + main: { + name: `mainStream:${User['id']}`; + payload: EventUnionFromDictionary; + }; + drive: { + name: `driveStream:${User['id']}`; + payload: EventUnionFromDictionary; + }; + note: { + name: `noteStream:${Note['id']}`; + payload: EventUnionFromDictionary; + }; + channel: { + name: `channelStream:${Channel['id']}`; + payload: EventUnionFromDictionary; + }; + userList: { + name: `userListStream:${UserList['id']}`; + payload: EventUnionFromDictionary; + }; + antenna: { + name: `antennaStream:${Antenna['id']}`; + payload: EventUnionFromDictionary; + }; + messaging: { + name: `messagingStream:${User['id']}-${User['id']}`; + payload: EventUnionFromDictionary; + }; + groupMessaging: { + name: `messagingStream:${UserGroup['id']}`; + payload: EventUnionFromDictionary; + }; + messagingIndex: { + name: `messagingIndexStream:${User['id']}`; + payload: EventUnionFromDictionary; + }; + reversi: { + name: `reversiStream:${User['id']}`; + payload: EventUnionFromDictionary; + }; + reversiGame: { + name: `reversiGameStream:${ReversiGame['id']}`; + payload: EventUnionFromDictionary; + }; + admin: { + name: `adminStream:${User['id']}`; + payload: EventUnionFromDictionary; + }; + notes: { + name: 'notesStream'; + payload: Packed<'Note'>; + }; +}; + +// API event definitions +// ストリームごとのEmitterの辞書を用意 +type EventEmitterDictionary = { [x in keyof StreamMessages]: Emitter void }> }; +// 共用体型を交差型にする型 https://stackoverflow.com/questions/54938141/typescript-convert-union-to-intersection +type UnionToIntersection = (U extends any ? (k: U) => void : never) extends ((k: infer I) => void) ? I : never; +// Emitter辞書から共用体型を作り、UnionToIntersectionで交差型にする +export type StreamEventEmitter = UnionToIntersection; +// { [y in name]: (e: spec) => void }をまとめてその交差型をEmitterにかけるとts(2590)にひっかかる + +// provide stream channels union +export type StreamChannels = StreamMessages[keyof StreamMessages]['name']; -- cgit v1.2.3-freya