From 0e4a111f81cceed275d9bec2695f6e401fb654d8 Mon Sep 17 00:00:00 2001 From: syuilo Date: Fri, 12 Nov 2021 02:02:25 +0900 Subject: refactoring Resolve #7779 --- packages/backend/src/server/api/stream/channel.ts | 62 +++ .../src/server/api/stream/channels/admin.ts | 16 + .../src/server/api/stream/channels/antenna.ts | 45 +++ .../src/server/api/stream/channels/channel.ts | 92 +++++ .../src/server/api/stream/channels/drive.ts | 16 + .../api/stream/channels/games/reversi-game.ts | 372 ++++++++++++++++++ .../server/api/stream/channels/games/reversi.ts | 33 ++ .../server/api/stream/channels/global-timeline.ts | 73 ++++ .../src/server/api/stream/channels/hashtag.ts | 53 +++ .../server/api/stream/channels/home-timeline.ts | 81 ++++ .../server/api/stream/channels/hybrid-timeline.ts | 89 +++++ .../src/server/api/stream/channels/index.ts | 37 ++ .../server/api/stream/channels/local-timeline.ts | 74 ++++ .../backend/src/server/api/stream/channels/main.ts | 43 +++ .../server/api/stream/channels/messaging-index.ts | 16 + .../src/server/api/stream/channels/messaging.ts | 106 ++++++ .../src/server/api/stream/channels/queue-stats.ts | 41 ++ .../src/server/api/stream/channels/server-stats.ts | 41 ++ .../src/server/api/stream/channels/user-list.ts | 92 +++++ packages/backend/src/server/api/stream/index.ts | 421 +++++++++++++++++++++ packages/backend/src/server/api/stream/types.ts | 299 +++++++++++++++ 21 files changed, 2102 insertions(+) create mode 100644 packages/backend/src/server/api/stream/channel.ts create mode 100644 packages/backend/src/server/api/stream/channels/admin.ts create mode 100644 packages/backend/src/server/api/stream/channels/antenna.ts create mode 100644 packages/backend/src/server/api/stream/channels/channel.ts create mode 100644 packages/backend/src/server/api/stream/channels/drive.ts create mode 100644 packages/backend/src/server/api/stream/channels/games/reversi-game.ts create mode 100644 packages/backend/src/server/api/stream/channels/games/reversi.ts create mode 100644 packages/backend/src/server/api/stream/channels/global-timeline.ts create mode 100644 packages/backend/src/server/api/stream/channels/hashtag.ts create mode 100644 packages/backend/src/server/api/stream/channels/home-timeline.ts create mode 100644 packages/backend/src/server/api/stream/channels/hybrid-timeline.ts create mode 100644 packages/backend/src/server/api/stream/channels/index.ts create mode 100644 packages/backend/src/server/api/stream/channels/local-timeline.ts create mode 100644 packages/backend/src/server/api/stream/channels/main.ts create mode 100644 packages/backend/src/server/api/stream/channels/messaging-index.ts create mode 100644 packages/backend/src/server/api/stream/channels/messaging.ts create mode 100644 packages/backend/src/server/api/stream/channels/queue-stats.ts create mode 100644 packages/backend/src/server/api/stream/channels/server-stats.ts create mode 100644 packages/backend/src/server/api/stream/channels/user-list.ts create mode 100644 packages/backend/src/server/api/stream/index.ts create mode 100644 packages/backend/src/server/api/stream/types.ts (limited to 'packages/backend/src/server/api/stream') diff --git a/packages/backend/src/server/api/stream/channel.ts b/packages/backend/src/server/api/stream/channel.ts new file mode 100644 index 0000000000..2824d7d1b8 --- /dev/null +++ b/packages/backend/src/server/api/stream/channel.ts @@ -0,0 +1,62 @@ +import autobind from 'autobind-decorator'; +import Connection from '.'; + +/** + * Stream channel + */ +export default abstract class Channel { + protected connection: Connection; + public id: string; + public abstract readonly chName: string; + public static readonly shouldShare: boolean; + public static readonly requireCredential: boolean; + + protected get user() { + return this.connection.user; + } + + protected get userProfile() { + return this.connection.userProfile; + } + + protected get following() { + return this.connection.following; + } + + protected get muting() { + return this.connection.muting; + } + + protected get blocking() { + return this.connection.blocking; + } + + protected get followingChannels() { + return this.connection.followingChannels; + } + + protected get subscriber() { + return this.connection.subscriber; + } + + constructor(id: string, connection: Connection) { + this.id = id; + this.connection = connection; + } + + @autobind + public send(typeOrPayload: any, payload?: any) { + const type = payload === undefined ? typeOrPayload.type : typeOrPayload; + const body = payload === undefined ? typeOrPayload.body : payload; + + this.connection.sendMessageToWs('channel', { + id: this.id, + type: type, + body: body + }); + } + + public abstract init(params: any): void; + public dispose?(): void; + public onMessage?(type: string, body: any): void; +} diff --git a/packages/backend/src/server/api/stream/channels/admin.ts b/packages/backend/src/server/api/stream/channels/admin.ts new file mode 100644 index 0000000000..1ff932d1dd --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/admin.ts @@ -0,0 +1,16 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; + +export default class extends Channel { + public readonly chName = 'admin'; + public static shouldShare = true; + public static requireCredential = true; + + @autobind + public async init(params: any) { + // Subscribe admin stream + this.subscriber.on(`adminStream:${this.user!.id}`, data => { + this.send(data); + }); + } +} diff --git a/packages/backend/src/server/api/stream/channels/antenna.ts b/packages/backend/src/server/api/stream/channels/antenna.ts new file mode 100644 index 0000000000..3cbdfebb43 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/antenna.ts @@ -0,0 +1,45 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; +import { Notes } from '@/models/index'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { StreamMessages } from '../types'; + +export default class extends Channel { + public readonly chName = 'antenna'; + public static shouldShare = false; + public static requireCredential = false; + private antennaId: string; + + @autobind + public async init(params: any) { + this.antennaId = params.antennaId as string; + + // Subscribe stream + this.subscriber.on(`antennaStream:${this.antennaId}`, this.onEvent); + } + + @autobind + private async onEvent(data: StreamMessages['antenna']['payload']) { + if (data.type === 'note') { + const note = await Notes.pack(data.body.id, this.user, { detail: true }); + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + this.connection.cacheNote(note); + + this.send('note', note); + } else { + this.send(data.type, data.body); + } + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off(`antennaStream:${this.antennaId}`, this.onEvent); + } +} diff --git a/packages/backend/src/server/api/stream/channels/channel.ts b/packages/backend/src/server/api/stream/channels/channel.ts new file mode 100644 index 0000000000..bf7942f522 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/channel.ts @@ -0,0 +1,92 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; +import { Notes, Users } from '@/models/index'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { User } from '@/models/entities/user'; +import { StreamMessages } from '../types'; +import { Packed } from '@/misc/schema'; + +export default class extends Channel { + public readonly chName = 'channel'; + public static shouldShare = false; + public static requireCredential = false; + private channelId: string; + private typers: Record = {}; + private emitTypersIntervalId: ReturnType; + + @autobind + public async init(params: any) { + this.channelId = params.channelId as string; + + // Subscribe stream + this.subscriber.on('notesStream', this.onNote); + this.subscriber.on(`channelStream:${this.channelId}`, this.onEvent); + this.emitTypersIntervalId = setInterval(this.emitTypers, 5000); + } + + @autobind + private async onNote(note: Packed<'Note'>) { + if (note.channelId !== this.channelId) return; + + // リプライなら再pack + if (note.replyId != null) { + note.reply = await Notes.pack(note.replyId, this.user, { + detail: true + }); + } + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await Notes.pack(note.renoteId, this.user, { + detail: true + }); + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + this.connection.cacheNote(note); + + this.send('note', note); + } + + @autobind + private onEvent(data: StreamMessages['channel']['payload']) { + if (data.type === 'typing') { + const id = data.body; + const begin = this.typers[id] == null; + this.typers[id] = new Date(); + if (begin) { + this.emitTypers(); + } + } + } + + @autobind + private async emitTypers() { + const now = new Date(); + + // Remove not typing users + for (const [userId, date] of Object.entries(this.typers)) { + if (now.getTime() - date.getTime() > 5000) delete this.typers[userId]; + } + + const users = await Users.packMany(Object.keys(this.typers), null, { detail: false }); + + this.send({ + type: 'typers', + body: users, + }); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off('notesStream', this.onNote); + this.subscriber.off(`channelStream:${this.channelId}`, this.onEvent); + + clearInterval(this.emitTypersIntervalId); + } +} diff --git a/packages/backend/src/server/api/stream/channels/drive.ts b/packages/backend/src/server/api/stream/channels/drive.ts new file mode 100644 index 0000000000..4112dd9b04 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/drive.ts @@ -0,0 +1,16 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; + +export default class extends Channel { + public readonly chName = 'drive'; + public static shouldShare = true; + public static requireCredential = true; + + @autobind + public async init(params: any) { + // Subscribe drive stream + this.subscriber.on(`driveStream:${this.user!.id}`, data => { + this.send(data); + }); + } +} diff --git a/packages/backend/src/server/api/stream/channels/games/reversi-game.ts b/packages/backend/src/server/api/stream/channels/games/reversi-game.ts new file mode 100644 index 0000000000..bfdbf1d266 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/games/reversi-game.ts @@ -0,0 +1,372 @@ +import autobind from 'autobind-decorator'; +import * as CRC32 from 'crc-32'; +import { publishReversiGameStream } from '@/services/stream'; +import Reversi from '../../../../../games/reversi/core'; +import * as maps from '../../../../../games/reversi/maps'; +import Channel from '../../channel'; +import { ReversiGame } from '@/models/entities/games/reversi/game'; +import { ReversiGames, Users } from '@/models/index'; +import { User } from '@/models/entities/user'; + +export default class extends Channel { + public readonly chName = 'gamesReversiGame'; + public static shouldShare = false; + public static requireCredential = false; + + private gameId: ReversiGame['id'] | null = null; + private watchers: Record = {}; + private emitWatchersIntervalId: ReturnType; + + @autobind + public async init(params: any) { + this.gameId = params.gameId; + + // Subscribe game stream + this.subscriber.on(`reversiGameStream:${this.gameId}`, this.onEvent); + this.emitWatchersIntervalId = setInterval(this.emitWatchers, 5000); + + const game = await ReversiGames.findOne(this.gameId!); + if (game == null) throw new Error('game not found'); + + // 観戦者イベント + this.watch(game); + } + + @autobind + private onEvent(data: any) { + if (data.type === 'watching') { + const id = data.body; + this.watchers[id] = new Date(); + } else { + this.send(data); + } + } + + @autobind + private async emitWatchers() { + const now = new Date(); + + // Remove not watching users + for (const [userId, date] of Object.entries(this.watchers)) { + if (now.getTime() - date.getTime() > 5000) delete this.watchers[userId]; + } + + const users = await Users.packMany(Object.keys(this.watchers), null, { detail: false }); + + this.send({ + type: 'watchers', + body: users, + }); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off(`reversiGameStream:${this.gameId}`, this.onEvent); + clearInterval(this.emitWatchersIntervalId); + } + + @autobind + public onMessage(type: string, body: any) { + switch (type) { + case 'accept': this.accept(true); break; + case 'cancelAccept': this.accept(false); break; + case 'updateSettings': this.updateSettings(body.key, body.value); break; + case 'initForm': this.initForm(body); break; + case 'updateForm': this.updateForm(body.id, body.value); break; + case 'message': this.message(body); break; + case 'set': this.set(body.pos); break; + case 'check': this.check(body.crc32); break; + } + } + + @autobind + private async updateSettings(key: string, value: any) { + if (this.user == null) return; + + const game = await ReversiGames.findOne(this.gameId!); + if (game == null) throw new Error('game not found'); + + if (game.isStarted) return; + if ((game.user1Id !== this.user.id) && (game.user2Id !== this.user.id)) return; + if ((game.user1Id === this.user.id) && game.user1Accepted) return; + if ((game.user2Id === this.user.id) && game.user2Accepted) return; + + if (!['map', 'bw', 'isLlotheo', 'canPutEverywhere', 'loopedBoard'].includes(key)) return; + + await ReversiGames.update(this.gameId!, { + [key]: value + }); + + publishReversiGameStream(this.gameId!, 'updateSettings', { + key: key, + value: value + }); + } + + @autobind + private async initForm(form: any) { + if (this.user == null) return; + + const game = await ReversiGames.findOne(this.gameId!); + if (game == null) throw new Error('game not found'); + + if (game.isStarted) return; + if ((game.user1Id !== this.user.id) && (game.user2Id !== this.user.id)) return; + + const set = game.user1Id === this.user.id ? { + form1: form + } : { + form2: form + }; + + await ReversiGames.update(this.gameId!, set); + + publishReversiGameStream(this.gameId!, 'initForm', { + userId: this.user.id, + form + }); + } + + @autobind + private async updateForm(id: string, value: any) { + if (this.user == null) return; + + const game = await ReversiGames.findOne(this.gameId!); + if (game == null) throw new Error('game not found'); + + if (game.isStarted) return; + if ((game.user1Id !== this.user.id) && (game.user2Id !== this.user.id)) return; + + const form = game.user1Id === this.user.id ? game.form2 : game.form1; + + const item = form.find((i: any) => i.id == id); + + if (item == null) return; + + item.value = value; + + const set = game.user1Id === this.user.id ? { + form2: form + } : { + form1: form + }; + + await ReversiGames.update(this.gameId!, set); + + publishReversiGameStream(this.gameId!, 'updateForm', { + userId: this.user.id, + id, + value + }); + } + + @autobind + private async message(message: any) { + if (this.user == null) return; + + message.id = Math.random(); + publishReversiGameStream(this.gameId!, 'message', { + userId: this.user.id, + message + }); + } + + @autobind + private async accept(accept: boolean) { + if (this.user == null) return; + + const game = await ReversiGames.findOne(this.gameId!); + if (game == null) throw new Error('game not found'); + + if (game.isStarted) return; + + let bothAccepted = false; + + if (game.user1Id === this.user.id) { + await ReversiGames.update(this.gameId!, { + user1Accepted: accept + }); + + publishReversiGameStream(this.gameId!, 'changeAccepts', { + user1: accept, + user2: game.user2Accepted + }); + + if (accept && game.user2Accepted) bothAccepted = true; + } else if (game.user2Id === this.user.id) { + await ReversiGames.update(this.gameId!, { + user2Accepted: accept + }); + + publishReversiGameStream(this.gameId!, 'changeAccepts', { + user1: game.user1Accepted, + user2: accept + }); + + if (accept && game.user1Accepted) bothAccepted = true; + } else { + return; + } + + if (bothAccepted) { + // 3秒後、まだacceptされていたらゲーム開始 + setTimeout(async () => { + const freshGame = await ReversiGames.findOne(this.gameId!); + if (freshGame == null || freshGame.isStarted || freshGame.isEnded) return; + if (!freshGame.user1Accepted || !freshGame.user2Accepted) return; + + let bw: number; + if (freshGame.bw == 'random') { + bw = Math.random() > 0.5 ? 1 : 2; + } else { + bw = parseInt(freshGame.bw, 10); + } + + function getRandomMap() { + const mapCount = Object.entries(maps).length; + const rnd = Math.floor(Math.random() * mapCount); + return Object.values(maps)[rnd].data; + } + + const map = freshGame.map != null ? freshGame.map : getRandomMap(); + + await ReversiGames.update(this.gameId!, { + startedAt: new Date(), + isStarted: true, + black: bw, + map: map + }); + + //#region 盤面に最初から石がないなどして始まった瞬間に勝敗が決定する場合があるのでその処理 + const o = new Reversi(map, { + isLlotheo: freshGame.isLlotheo, + canPutEverywhere: freshGame.canPutEverywhere, + loopedBoard: freshGame.loopedBoard + }); + + if (o.isEnded) { + let winner; + if (o.winner === true) { + winner = freshGame.black == 1 ? freshGame.user1Id : freshGame.user2Id; + } else if (o.winner === false) { + winner = freshGame.black == 1 ? freshGame.user2Id : freshGame.user1Id; + } else { + winner = null; + } + + await ReversiGames.update(this.gameId!, { + isEnded: true, + winnerId: winner + }); + + publishReversiGameStream(this.gameId!, 'ended', { + winnerId: winner, + game: await ReversiGames.pack(this.gameId!, this.user) + }); + } + //#endregion + + publishReversiGameStream(this.gameId!, 'started', + await ReversiGames.pack(this.gameId!, this.user)); + }, 3000); + } + } + + // 石を打つ + @autobind + private async set(pos: number) { + if (this.user == null) return; + + const game = await ReversiGames.findOne(this.gameId!); + if (game == null) throw new Error('game not found'); + + if (!game.isStarted) return; + if (game.isEnded) return; + if ((game.user1Id !== this.user.id) && (game.user2Id !== this.user.id)) return; + + const myColor = + ((game.user1Id === this.user.id) && game.black == 1) || ((game.user2Id === this.user.id) && game.black == 2) + ? true + : false; + + const o = new Reversi(game.map, { + isLlotheo: game.isLlotheo, + canPutEverywhere: game.canPutEverywhere, + loopedBoard: game.loopedBoard + }); + + // 盤面の状態を再生 + for (const log of game.logs) { + o.put(log.color, log.pos); + } + + if (o.turn !== myColor) return; + + if (!o.canPut(myColor, pos)) return; + o.put(myColor, pos); + + let winner; + if (o.isEnded) { + if (o.winner === true) { + winner = game.black == 1 ? game.user1Id : game.user2Id; + } else if (o.winner === false) { + winner = game.black == 1 ? game.user2Id : game.user1Id; + } else { + winner = null; + } + } + + const log = { + at: new Date(), + color: myColor, + pos + }; + + const crc32 = CRC32.str(game.logs.map(x => x.pos.toString()).join('') + pos.toString()).toString(); + + game.logs.push(log); + + await ReversiGames.update(this.gameId!, { + crc32, + isEnded: o.isEnded, + winnerId: winner, + logs: game.logs + }); + + publishReversiGameStream(this.gameId!, 'set', Object.assign(log, { + next: o.turn + })); + + if (o.isEnded) { + publishReversiGameStream(this.gameId!, 'ended', { + winnerId: winner, + game: await ReversiGames.pack(this.gameId!, this.user) + }); + } + } + + @autobind + private async check(crc32: string | number) { + const game = await ReversiGames.findOne(this.gameId!); + if (game == null) throw new Error('game not found'); + + if (!game.isStarted) return; + + if (crc32.toString() !== game.crc32) { + this.send('rescue', await ReversiGames.pack(game, this.user)); + } + + // ついでに観戦者イベントを発行 + this.watch(game); + } + + @autobind + private watch(game: ReversiGame) { + if (this.user != null) { + if ((game.user1Id !== this.user.id) && (game.user2Id !== this.user.id)) { + publishReversiGameStream(this.gameId!, 'watching', this.user.id); + } + } + } +} diff --git a/packages/backend/src/server/api/stream/channels/games/reversi.ts b/packages/backend/src/server/api/stream/channels/games/reversi.ts new file mode 100644 index 0000000000..3b89aac35c --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/games/reversi.ts @@ -0,0 +1,33 @@ +import autobind from 'autobind-decorator'; +import { publishMainStream } from '@/services/stream'; +import Channel from '../../channel'; +import { ReversiMatchings } from '@/models/index'; + +export default class extends Channel { + public readonly chName = 'gamesReversi'; + public static shouldShare = true; + public static requireCredential = true; + + @autobind + public async init(params: any) { + // Subscribe reversi stream + this.subscriber.on(`reversiStream:${this.user!.id}`, data => { + this.send(data); + }); + } + + @autobind + public async onMessage(type: string, body: any) { + switch (type) { + case 'ping': + if (body.id == null) return; + const matching = await ReversiMatchings.findOne({ + parentId: this.user!.id, + childId: body.id + }); + if (matching == null) return; + publishMainStream(matching.childId, 'reversiInvited', await ReversiMatchings.pack(matching, { id: matching.childId })); + break; + } + } +} diff --git a/packages/backend/src/server/api/stream/channels/global-timeline.ts b/packages/backend/src/server/api/stream/channels/global-timeline.ts new file mode 100644 index 0000000000..f5983ab472 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/global-timeline.ts @@ -0,0 +1,73 @@ +import autobind from 'autobind-decorator'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import Channel from '../channel'; +import { fetchMeta } from '@/misc/fetch-meta'; +import { Notes } from '@/models/index'; +import { checkWordMute } from '@/misc/check-word-mute'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { Packed } from '@/misc/schema'; + +export default class extends Channel { + public readonly chName = 'globalTimeline'; + public static shouldShare = true; + public static requireCredential = false; + + @autobind + public async init(params: any) { + const meta = await fetchMeta(); + if (meta.disableGlobalTimeline) { + if (this.user == null || (!this.user.isAdmin && !this.user.isModerator)) return; + } + + // Subscribe events + this.subscriber.on('notesStream', this.onNote); + } + + @autobind + private async onNote(note: Packed<'Note'>) { + if (note.visibility !== 'public') return; + if (note.channelId != null) return; + + // リプライなら再pack + if (note.replyId != null) { + note.reply = await Notes.pack(note.replyId, this.user, { + detail: true + }); + } + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await Notes.pack(note.renoteId, this.user, { + detail: true + }); + } + + // 関係ない返信は除外 + if (note.reply) { + const reply = note.reply; + // 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合 + if (reply.userId !== this.user!.id && note.userId !== this.user!.id && reply.userId !== note.userId) return; + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + // 流れてきたNoteがミュートすべきNoteだったら無視する + // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある) + // 現状では、ワードミュートにおけるMutedNoteレコードの追加処理はストリーミングに流す処理と並列で行われるため、 + // レコードが追加されるNoteでも追加されるより先にここのストリーミングの処理に到達することが起こる。 + // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる + if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; + + this.connection.cacheNote(note); + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off('notesStream', this.onNote); + } +} diff --git a/packages/backend/src/server/api/stream/channels/hashtag.ts b/packages/backend/src/server/api/stream/channels/hashtag.ts new file mode 100644 index 0000000000..281be4f2eb --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/hashtag.ts @@ -0,0 +1,53 @@ +import autobind from 'autobind-decorator'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import Channel from '../channel'; +import { Notes } from '@/models/index'; +import { normalizeForSearch } from '@/misc/normalize-for-search'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { Packed } from '@/misc/schema'; + +export default class extends Channel { + public readonly chName = 'hashtag'; + public static shouldShare = false; + public static requireCredential = false; + private q: string[][]; + + @autobind + public async init(params: any) { + this.q = params.q; + + if (this.q == null) return; + + // Subscribe stream + this.subscriber.on('notesStream', this.onNote); + } + + @autobind + private async onNote(note: Packed<'Note'>) { + const noteTags = note.tags ? note.tags.map((t: string) => t.toLowerCase()) : []; + const matched = this.q.some(tags => tags.every(tag => noteTags.includes(normalizeForSearch(tag)))); + if (!matched) return; + + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await Notes.pack(note.renoteId, this.user, { + detail: true + }); + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + this.connection.cacheNote(note); + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off('notesStream', this.onNote); + } +} diff --git a/packages/backend/src/server/api/stream/channels/home-timeline.ts b/packages/backend/src/server/api/stream/channels/home-timeline.ts new file mode 100644 index 0000000000..52e9aec250 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/home-timeline.ts @@ -0,0 +1,81 @@ +import autobind from 'autobind-decorator'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import Channel from '../channel'; +import { Notes } from '@/models/index'; +import { checkWordMute } from '@/misc/check-word-mute'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { Packed } from '@/misc/schema'; + +export default class extends Channel { + public readonly chName = 'homeTimeline'; + public static shouldShare = true; + public static requireCredential = true; + + @autobind + public async init(params: any) { + // Subscribe events + this.subscriber.on('notesStream', this.onNote); + } + + @autobind + private async onNote(note: Packed<'Note'>) { + if (note.channelId) { + if (!this.followingChannels.has(note.channelId)) return; + } else { + // その投稿のユーザーをフォローしていなかったら弾く + if ((this.user!.id !== note.userId) && !this.following.has(note.userId)) return; + } + + if (['followers', 'specified'].includes(note.visibility)) { + note = await Notes.pack(note.id, this.user!, { + detail: true + }); + + if (note.isHidden) { + return; + } + } else { + // リプライなら再pack + if (note.replyId != null) { + note.reply = await Notes.pack(note.replyId, this.user!, { + detail: true + }); + } + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await Notes.pack(note.renoteId, this.user!, { + detail: true + }); + } + } + + // 関係ない返信は除外 + if (note.reply) { + const reply = note.reply; + // 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合 + if (reply.userId !== this.user!.id && note.userId !== this.user!.id && reply.userId !== note.userId) return; + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + // 流れてきたNoteがミュートすべきNoteだったら無視する + // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある) + // 現状では、ワードミュートにおけるMutedNoteレコードの追加処理はストリーミングに流す処理と並列で行われるため、 + // レコードが追加されるNoteでも追加されるより先にここのストリーミングの処理に到達することが起こる。 + // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる + if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; + + this.connection.cacheNote(note); + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off('notesStream', this.onNote); + } +} diff --git a/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts b/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts new file mode 100644 index 0000000000..51f95fc0cd --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts @@ -0,0 +1,89 @@ +import autobind from 'autobind-decorator'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import Channel from '../channel'; +import { fetchMeta } from '@/misc/fetch-meta'; +import { Notes } from '@/models/index'; +import { checkWordMute } from '@/misc/check-word-mute'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { Packed } from '@/misc/schema'; + +export default class extends Channel { + public readonly chName = 'hybridTimeline'; + public static shouldShare = true; + public static requireCredential = true; + + @autobind + public async init(params: any) { + const meta = await fetchMeta(); + if (meta.disableLocalTimeline && !this.user!.isAdmin && !this.user!.isModerator) return; + + // Subscribe events + this.subscriber.on('notesStream', this.onNote); + } + + @autobind + private async onNote(note: Packed<'Note'>) { + // チャンネルの投稿ではなく、自分自身の投稿 または + // チャンネルの投稿ではなく、その投稿のユーザーをフォローしている または + // チャンネルの投稿ではなく、全体公開のローカルの投稿 または + // フォローしているチャンネルの投稿 の場合だけ + if (!( + (note.channelId == null && this.user!.id === note.userId) || + (note.channelId == null && this.following.has(note.userId)) || + (note.channelId == null && (note.user.host == null && note.visibility === 'public')) || + (note.channelId != null && this.followingChannels.has(note.channelId)) + )) return; + + if (['followers', 'specified'].includes(note.visibility)) { + note = await Notes.pack(note.id, this.user!, { + detail: true + }); + + if (note.isHidden) { + return; + } + } else { + // リプライなら再pack + if (note.replyId != null) { + note.reply = await Notes.pack(note.replyId, this.user!, { + detail: true + }); + } + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await Notes.pack(note.renoteId, this.user!, { + detail: true + }); + } + } + + // 関係ない返信は除外 + if (note.reply) { + const reply = note.reply; + // 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合 + if (reply.userId !== this.user!.id && note.userId !== this.user!.id && reply.userId !== note.userId) return; + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + // 流れてきたNoteがミュートすべきNoteだったら無視する + // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある) + // 現状では、ワードミュートにおけるMutedNoteレコードの追加処理はストリーミングに流す処理と並列で行われるため、 + // レコードが追加されるNoteでも追加されるより先にここのストリーミングの処理に到達することが起こる。 + // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる + if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; + + this.connection.cacheNote(note); + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off('notesStream', this.onNote); + } +} diff --git a/packages/backend/src/server/api/stream/channels/index.ts b/packages/backend/src/server/api/stream/channels/index.ts new file mode 100644 index 0000000000..1841573043 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/index.ts @@ -0,0 +1,37 @@ +import main from './main'; +import homeTimeline from './home-timeline'; +import localTimeline from './local-timeline'; +import hybridTimeline from './hybrid-timeline'; +import globalTimeline from './global-timeline'; +import serverStats from './server-stats'; +import queueStats from './queue-stats'; +import userList from './user-list'; +import antenna from './antenna'; +import messaging from './messaging'; +import messagingIndex from './messaging-index'; +import drive from './drive'; +import hashtag from './hashtag'; +import channel from './channel'; +import admin from './admin'; +import gamesReversi from './games/reversi'; +import gamesReversiGame from './games/reversi-game'; + +export default { + main, + homeTimeline, + localTimeline, + hybridTimeline, + globalTimeline, + serverStats, + queueStats, + userList, + antenna, + messaging, + messagingIndex, + drive, + hashtag, + channel, + admin, + gamesReversi, + gamesReversiGame +}; diff --git a/packages/backend/src/server/api/stream/channels/local-timeline.ts b/packages/backend/src/server/api/stream/channels/local-timeline.ts new file mode 100644 index 0000000000..a6166c2be2 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/local-timeline.ts @@ -0,0 +1,74 @@ +import autobind from 'autobind-decorator'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import Channel from '../channel'; +import { fetchMeta } from '@/misc/fetch-meta'; +import { Notes } from '@/models/index'; +import { checkWordMute } from '@/misc/check-word-mute'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { Packed } from '@/misc/schema'; + +export default class extends Channel { + public readonly chName = 'localTimeline'; + public static shouldShare = true; + public static requireCredential = false; + + @autobind + public async init(params: any) { + const meta = await fetchMeta(); + if (meta.disableLocalTimeline) { + if (this.user == null || (!this.user.isAdmin && !this.user.isModerator)) return; + } + + // Subscribe events + this.subscriber.on('notesStream', this.onNote); + } + + @autobind + private async onNote(note: Packed<'Note'>) { + if (note.user.host !== null) return; + if (note.visibility !== 'public') return; + if (note.channelId != null && !this.followingChannels.has(note.channelId)) return; + + // リプライなら再pack + if (note.replyId != null) { + note.reply = await Notes.pack(note.replyId, this.user, { + detail: true + }); + } + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await Notes.pack(note.renoteId, this.user, { + detail: true + }); + } + + // 関係ない返信は除外 + if (note.reply) { + const reply = note.reply; + // 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合 + if (reply.userId !== this.user!.id && note.userId !== this.user!.id && reply.userId !== note.userId) return; + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + // 流れてきたNoteがミュートすべきNoteだったら無視する + // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある) + // 現状では、ワードミュートにおけるMutedNoteレコードの追加処理はストリーミングに流す処理と並列で行われるため、 + // レコードが追加されるNoteでも追加されるより先にここのストリーミングの処理に到達することが起こる。 + // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる + if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; + + this.connection.cacheNote(note); + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off('notesStream', this.onNote); + } +} diff --git a/packages/backend/src/server/api/stream/channels/main.ts b/packages/backend/src/server/api/stream/channels/main.ts new file mode 100644 index 0000000000..131ac30472 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/main.ts @@ -0,0 +1,43 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; +import { Notes } from '@/models/index'; + +export default class extends Channel { + public readonly chName = 'main'; + public static shouldShare = true; + public static requireCredential = true; + + @autobind + public async init(params: any) { + // Subscribe main stream channel + this.subscriber.on(`mainStream:${this.user!.id}`, async data => { + switch (data.type) { + case 'notification': { + if (data.body.userId && this.muting.has(data.body.userId)) return; + + if (data.body.note && data.body.note.isHidden) { + const note = await Notes.pack(data.body.note.id, this.user, { + detail: true + }); + this.connection.cacheNote(note); + data.body.note = note; + } + break; + } + case 'mention': { + if (this.muting.has(data.body.userId)) return; + if (data.body.isHidden) { + const note = await Notes.pack(data.body.id, this.user, { + detail: true + }); + this.connection.cacheNote(note); + data.body = note; + } + break; + } + } + + this.send(data.type, data.body); + }); + } +} diff --git a/packages/backend/src/server/api/stream/channels/messaging-index.ts b/packages/backend/src/server/api/stream/channels/messaging-index.ts new file mode 100644 index 0000000000..0c495398ab --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/messaging-index.ts @@ -0,0 +1,16 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; + +export default class extends Channel { + public readonly chName = 'messagingIndex'; + public static shouldShare = true; + public static requireCredential = true; + + @autobind + public async init(params: any) { + // Subscribe messaging index stream + this.subscriber.on(`messagingIndexStream:${this.user!.id}`, data => { + this.send(data); + }); + } +} diff --git a/packages/backend/src/server/api/stream/channels/messaging.ts b/packages/backend/src/server/api/stream/channels/messaging.ts new file mode 100644 index 0000000000..c049e880b9 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/messaging.ts @@ -0,0 +1,106 @@ +import autobind from 'autobind-decorator'; +import { readUserMessagingMessage, readGroupMessagingMessage, deliverReadActivity } from '../../common/read-messaging-message'; +import Channel from '../channel'; +import { UserGroupJoinings, Users, MessagingMessages } from '@/models/index'; +import { User, ILocalUser, IRemoteUser } from '@/models/entities/user'; +import { UserGroup } from '@/models/entities/user-group'; +import { StreamMessages } from '../types'; + +export default class extends Channel { + public readonly chName = 'messaging'; + public static shouldShare = false; + public static requireCredential = true; + + private otherpartyId: string | null; + private otherparty: User | null; + private groupId: string | null; + private subCh: `messagingStream:${User['id']}-${User['id']}` | `messagingStream:${UserGroup['id']}`; + private typers: Record = {}; + private emitTypersIntervalId: ReturnType; + + @autobind + public async init(params: any) { + this.otherpartyId = params.otherparty; + this.otherparty = this.otherpartyId ? await Users.findOneOrFail({ id: this.otherpartyId }) : null; + this.groupId = params.group; + + // Check joining + if (this.groupId) { + const joining = await UserGroupJoinings.findOne({ + userId: this.user!.id, + userGroupId: this.groupId + }); + + if (joining == null) { + return; + } + } + + this.emitTypersIntervalId = setInterval(this.emitTypers, 5000); + + this.subCh = this.otherpartyId + ? `messagingStream:${this.user!.id}-${this.otherpartyId}` + : `messagingStream:${this.groupId}`; + + // Subscribe messaging stream + this.subscriber.on(this.subCh, this.onEvent); + } + + @autobind + private onEvent(data: StreamMessages['messaging']['payload'] | StreamMessages['groupMessaging']['payload']) { + if (data.type === 'typing') { + const id = data.body; + const begin = this.typers[id] == null; + this.typers[id] = new Date(); + if (begin) { + this.emitTypers(); + } + } else { + this.send(data); + } + } + + @autobind + public onMessage(type: string, body: any) { + switch (type) { + case 'read': + if (this.otherpartyId) { + readUserMessagingMessage(this.user!.id, this.otherpartyId, [body.id]); + + // リモートユーザーからのメッセージだったら既読配信 + if (Users.isLocalUser(this.user!) && Users.isRemoteUser(this.otherparty!)) { + MessagingMessages.findOne(body.id).then(message => { + if (message) deliverReadActivity(this.user as ILocalUser, this.otherparty as IRemoteUser, message); + }); + } + } else if (this.groupId) { + readGroupMessagingMessage(this.user!.id, this.groupId, [body.id]); + } + break; + } + } + + @autobind + private async emitTypers() { + const now = new Date(); + + // Remove not typing users + for (const [userId, date] of Object.entries(this.typers)) { + if (now.getTime() - date.getTime() > 5000) delete this.typers[userId]; + } + + const users = await Users.packMany(Object.keys(this.typers), null, { detail: false }); + + this.send({ + type: 'typers', + body: users, + }); + } + + @autobind + public dispose() { + this.subscriber.off(this.subCh, this.onEvent); + + clearInterval(this.emitTypersIntervalId); + } +} diff --git a/packages/backend/src/server/api/stream/channels/queue-stats.ts b/packages/backend/src/server/api/stream/channels/queue-stats.ts new file mode 100644 index 0000000000..0bda0cfcb9 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/queue-stats.ts @@ -0,0 +1,41 @@ +import autobind from 'autobind-decorator'; +import Xev from 'xev'; +import Channel from '../channel'; + +const ev = new Xev(); + +export default class extends Channel { + public readonly chName = 'queueStats'; + public static shouldShare = true; + public static requireCredential = false; + + @autobind + public async init(params: any) { + ev.addListener('queueStats', this.onStats); + } + + @autobind + private onStats(stats: any) { + this.send('stats', stats); + } + + @autobind + public onMessage(type: string, body: any) { + switch (type) { + case 'requestLog': + ev.once(`queueStatsLog:${body.id}`, statsLog => { + this.send('statsLog', statsLog); + }); + ev.emit('requestQueueStatsLog', { + id: body.id, + length: body.length + }); + break; + } + } + + @autobind + public dispose() { + ev.removeListener('queueStats', this.onStats); + } +} diff --git a/packages/backend/src/server/api/stream/channels/server-stats.ts b/packages/backend/src/server/api/stream/channels/server-stats.ts new file mode 100644 index 0000000000..d245a7f70c --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/server-stats.ts @@ -0,0 +1,41 @@ +import autobind from 'autobind-decorator'; +import Xev from 'xev'; +import Channel from '../channel'; + +const ev = new Xev(); + +export default class extends Channel { + public readonly chName = 'serverStats'; + public static shouldShare = true; + public static requireCredential = false; + + @autobind + public async init(params: any) { + ev.addListener('serverStats', this.onStats); + } + + @autobind + private onStats(stats: any) { + this.send('stats', stats); + } + + @autobind + public onMessage(type: string, body: any) { + switch (type) { + case 'requestLog': + ev.once(`serverStatsLog:${body.id}`, statsLog => { + this.send('statsLog', statsLog); + }); + ev.emit('requestServerStatsLog', { + id: body.id, + length: body.length + }); + break; + } + } + + @autobind + public dispose() { + ev.removeListener('serverStats', this.onStats); + } +} diff --git a/packages/backend/src/server/api/stream/channels/user-list.ts b/packages/backend/src/server/api/stream/channels/user-list.ts new file mode 100644 index 0000000000..63b254605b --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/user-list.ts @@ -0,0 +1,92 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; +import { Notes, UserListJoinings, UserLists } from '@/models/index'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import { User } from '@/models/entities/user'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { Packed } from '@/misc/schema'; + +export default class extends Channel { + public readonly chName = 'userList'; + public static shouldShare = false; + public static requireCredential = false; + private listId: string; + public listUsers: User['id'][] = []; + private listUsersClock: NodeJS.Timer; + + @autobind + public async init(params: any) { + this.listId = params.listId as string; + + // Check existence and owner + const list = await UserLists.findOne({ + id: this.listId, + userId: this.user!.id + }); + if (!list) return; + + // Subscribe stream + this.subscriber.on(`userListStream:${this.listId}`, this.send); + + this.subscriber.on('notesStream', this.onNote); + + this.updateListUsers(); + this.listUsersClock = setInterval(this.updateListUsers, 5000); + } + + @autobind + private async updateListUsers() { + const users = await UserListJoinings.find({ + where: { + userListId: this.listId, + }, + select: ['userId'] + }); + + this.listUsers = users.map(x => x.userId); + } + + @autobind + private async onNote(note: Packed<'Note'>) { + if (!this.listUsers.includes(note.userId)) return; + + if (['followers', 'specified'].includes(note.visibility)) { + note = await Notes.pack(note.id, this.user, { + detail: true + }); + + if (note.isHidden) { + return; + } + } else { + // リプライなら再pack + if (note.replyId != null) { + note.reply = await Notes.pack(note.replyId, this.user, { + detail: true + }); + } + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await Notes.pack(note.renoteId, this.user, { + detail: true + }); + } + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off(`userListStream:${this.listId}`, this.send); + this.subscriber.off('notesStream', this.onNote); + + clearInterval(this.listUsersClock); + } +} diff --git a/packages/backend/src/server/api/stream/index.ts b/packages/backend/src/server/api/stream/index.ts new file mode 100644 index 0000000000..da4ea5ec99 --- /dev/null +++ b/packages/backend/src/server/api/stream/index.ts @@ -0,0 +1,421 @@ +import autobind from 'autobind-decorator'; +import * as websocket from 'websocket'; +import { readNotification } from '../common/read-notification'; +import call from '../call'; +import readNote from '@/services/note/read'; +import Channel from './channel'; +import channels from './channels/index'; +import { EventEmitter } from 'events'; +import { User } from '@/models/entities/user'; +import { Channel as ChannelModel } from '@/models/entities/channel'; +import { Users, Followings, Mutings, UserProfiles, ChannelFollowings, Blockings } from '@/models/index'; +import { ApiError } from '../error'; +import { AccessToken } from '@/models/entities/access-token'; +import { UserProfile } from '@/models/entities/user-profile'; +import { publishChannelStream, publishGroupMessagingStream, publishMessagingStream } from '@/services/stream'; +import { UserGroup } from '@/models/entities/user-group'; +import { StreamEventEmitter, StreamMessages } from './types'; +import { Packed } from '@/misc/schema'; + +/** + * Main stream connection + */ +export default class Connection { + public user?: User; + public userProfile?: UserProfile; + public following: Set = new Set(); + public muting: Set = new Set(); + public blocking: Set = new Set(); // "被"blocking + public followingChannels: Set = new Set(); + public token?: AccessToken; + private wsConnection: websocket.connection; + public subscriber: StreamEventEmitter; + private channels: Channel[] = []; + private subscribingNotes: any = {}; + private cachedNotes: Packed<'Note'>[] = []; + + constructor( + wsConnection: websocket.connection, + subscriber: EventEmitter, + user: User | null | undefined, + token: AccessToken | null | undefined + ) { + this.wsConnection = wsConnection; + this.subscriber = subscriber; + if (user) this.user = user; + if (token) this.token = token; + + this.wsConnection.on('message', this.onWsConnectionMessage); + + this.subscriber.on('broadcast', data => { + this.onBroadcastMessage(data); + }); + + if (this.user) { + this.updateFollowing(); + this.updateMuting(); + this.updateBlocking(); + this.updateFollowingChannels(); + this.updateUserProfile(); + + this.subscriber.on(`user:${this.user.id}`, this.onUserEvent); + } + } + + @autobind + private onUserEvent(data: StreamMessages['user']['payload']) { // { type, body }と展開するとそれぞれ型が分離してしまう + switch (data.type) { + case 'follow': + this.following.add(data.body.id); + break; + + case 'unfollow': + this.following.delete(data.body.id); + break; + + case 'mute': + this.muting.add(data.body.id); + break; + + case 'unmute': + this.muting.delete(data.body.id); + break; + + // TODO: block events + + case 'followChannel': + this.followingChannels.add(data.body.id); + break; + + case 'unfollowChannel': + this.followingChannels.delete(data.body.id); + break; + + case 'updateUserProfile': + this.userProfile = data.body; + break; + + case 'terminate': + this.wsConnection.close(); + this.dispose(); + break; + + default: + break; + } + } + + /** + * クライアントからメッセージ受信時 + */ + @autobind + private async onWsConnectionMessage(data: websocket.IMessage) { + if (data.utf8Data == null) return; + + let obj: Record; + + try { + obj = JSON.parse(data.utf8Data); + } catch (e) { + return; + } + + const { type, body } = obj; + + switch (type) { + case 'api': this.onApiRequest(body); break; + case 'readNotification': this.onReadNotification(body); break; + case 'subNote': this.onSubscribeNote(body); break; + case 's': this.onSubscribeNote(body); break; // alias + case 'sr': this.onSubscribeNote(body); this.readNote(body); break; + case 'unsubNote': this.onUnsubscribeNote(body); break; + case 'un': this.onUnsubscribeNote(body); break; // alias + case 'connect': this.onChannelConnectRequested(body); break; + case 'disconnect': this.onChannelDisconnectRequested(body); break; + case 'channel': this.onChannelMessageRequested(body); break; + case 'ch': this.onChannelMessageRequested(body); break; // alias + + // 個々のチャンネルではなくルートレベルでこれらのメッセージを受け取る理由は、 + // クライアントの事情を考慮したとき、入力フォームはノートチャンネルやメッセージのメインコンポーネントとは別 + // なこともあるため、それらのコンポーネントがそれぞれ各チャンネルに接続するようにするのは面倒なため。 + case 'typingOnChannel': this.typingOnChannel(body.channel); break; + case 'typingOnMessaging': this.typingOnMessaging(body); break; + } + } + + @autobind + private onBroadcastMessage(data: StreamMessages['broadcast']['payload']) { + this.sendMessageToWs(data.type, data.body); + } + + @autobind + public cacheNote(note: Packed<'Note'>) { + const add = (note: Packed<'Note'>) => { + const existIndex = this.cachedNotes.findIndex(n => n.id === note.id); + if (existIndex > -1) { + this.cachedNotes[existIndex] = note; + return; + } + + this.cachedNotes.unshift(note); + if (this.cachedNotes.length > 32) { + this.cachedNotes.splice(32); + } + }; + + add(note); + if (note.reply) add(note.reply); + if (note.renote) add(note.renote); + } + + @autobind + private readNote(body: any) { + const id = body.id; + + const note = this.cachedNotes.find(n => n.id === id); + if (note == null) return; + + if (this.user && (note.userId !== this.user.id)) { + readNote(this.user.id, [note], { + following: this.following, + followingChannels: this.followingChannels, + }); + } + } + + /** + * APIリクエスト要求時 + */ + @autobind + private async onApiRequest(payload: any) { + // 新鮮なデータを利用するためにユーザーをフェッチ + const user = this.user ? await Users.findOne(this.user.id) : null; + + const endpoint = payload.endpoint || payload.ep; // alias + + // 呼び出し + call(endpoint, user, this.token, payload.data).then(res => { + this.sendMessageToWs(`api:${payload.id}`, { res }); + }).catch((e: ApiError) => { + this.sendMessageToWs(`api:${payload.id}`, { + error: { + message: e.message, + code: e.code, + id: e.id, + kind: e.kind, + ...(e.info ? { info: e.info } : {}) + } + }); + }); + } + + @autobind + private onReadNotification(payload: any) { + if (!payload.id) return; + readNotification(this.user!.id, [payload.id]); + } + + /** + * 投稿購読要求時 + */ + @autobind + private onSubscribeNote(payload: any) { + if (!payload.id) return; + + if (this.subscribingNotes[payload.id] == null) { + this.subscribingNotes[payload.id] = 0; + } + + this.subscribingNotes[payload.id]++; + + if (this.subscribingNotes[payload.id] === 1) { + this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage); + } + } + + /** + * 投稿購読解除要求時 + */ + @autobind + private onUnsubscribeNote(payload: any) { + if (!payload.id) return; + + this.subscribingNotes[payload.id]--; + if (this.subscribingNotes[payload.id] <= 0) { + delete this.subscribingNotes[payload.id]; + this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage); + } + } + + @autobind + private async onNoteStreamMessage(data: StreamMessages['note']['payload']) { + this.sendMessageToWs('noteUpdated', { + id: data.body.id, + type: data.type, + body: data.body.body, + }); + } + + /** + * チャンネル接続要求時 + */ + @autobind + private onChannelConnectRequested(payload: any) { + const { channel, id, params, pong } = payload; + this.connectChannel(id, params, channel, pong); + } + + /** + * チャンネル切断要求時 + */ + @autobind + private onChannelDisconnectRequested(payload: any) { + const { id } = payload; + this.disconnectChannel(id); + } + + /** + * クライアントにメッセージ送信 + */ + @autobind + public sendMessageToWs(type: string, payload: any) { + this.wsConnection.send(JSON.stringify({ + type: type, + body: payload + })); + } + + /** + * チャンネルに接続 + */ + @autobind + public connectChannel(id: string, params: any, channel: string, pong = false) { + if ((channels as any)[channel].requireCredential && this.user == null) { + return; + } + + // 共有可能チャンネルに接続しようとしていて、かつそのチャンネルに既に接続していたら無意味なので無視 + if ((channels as any)[channel].shouldShare && this.channels.some(c => c.chName === channel)) { + return; + } + + const ch: Channel = new (channels as any)[channel](id, this); + this.channels.push(ch); + ch.init(params); + + if (pong) { + this.sendMessageToWs('connected', { + id: id + }); + } + } + + /** + * チャンネルから切断 + * @param id チャンネルコネクションID + */ + @autobind + public disconnectChannel(id: string) { + const channel = this.channels.find(c => c.id === id); + + if (channel) { + if (channel.dispose) channel.dispose(); + this.channels = this.channels.filter(c => c.id !== id); + } + } + + /** + * チャンネルへメッセージ送信要求時 + * @param data メッセージ + */ + @autobind + private onChannelMessageRequested(data: any) { + const channel = this.channels.find(c => c.id === data.id); + if (channel != null && channel.onMessage != null) { + channel.onMessage(data.type, data.body); + } + } + + @autobind + private typingOnChannel(channel: ChannelModel['id']) { + if (this.user) { + publishChannelStream(channel, 'typing', this.user.id); + } + } + + @autobind + private typingOnMessaging(param: { partner?: User['id']; group?: UserGroup['id']; }) { + if (this.user) { + if (param.partner) { + publishMessagingStream(param.partner, this.user.id, 'typing', this.user.id); + } else if (param.group) { + publishGroupMessagingStream(param.group, 'typing', this.user.id); + } + } + } + + @autobind + private async updateFollowing() { + const followings = await Followings.find({ + where: { + followerId: this.user!.id + }, + select: ['followeeId'] + }); + + this.following = new Set(followings.map(x => x.followeeId)); + } + + @autobind + private async updateMuting() { + const mutings = await Mutings.find({ + where: { + muterId: this.user!.id + }, + select: ['muteeId'] + }); + + this.muting = new Set(mutings.map(x => x.muteeId)); + } + + @autobind + private async updateBlocking() { // ここでいうBlockingは被Blockingの意 + const blockings = await Blockings.find({ + where: { + blockeeId: this.user!.id + }, + select: ['blockerId'] + }); + + this.blocking = new Set(blockings.map(x => x.blockerId)); + } + + @autobind + private async updateFollowingChannels() { + const followings = await ChannelFollowings.find({ + where: { + followerId: this.user!.id + }, + select: ['followeeId'] + }); + + this.followingChannels = new Set(followings.map(x => x.followeeId)); + } + + @autobind + private async updateUserProfile() { + this.userProfile = await UserProfiles.findOne({ + userId: this.user!.id + }); + } + + /** + * ストリームが切れたとき + */ + @autobind + public dispose() { + for (const c of this.channels.filter(c => c.dispose)) { + if (c.dispose) c.dispose(); + } + } +} diff --git a/packages/backend/src/server/api/stream/types.ts b/packages/backend/src/server/api/stream/types.ts new file mode 100644 index 0000000000..70eb5c5ce5 --- /dev/null +++ b/packages/backend/src/server/api/stream/types.ts @@ -0,0 +1,299 @@ +import { EventEmitter } from 'events'; +import Emitter from 'strict-event-emitter-types'; +import { Channel } from '@/models/entities/channel'; +import { User } from '@/models/entities/user'; +import { UserProfile } from '@/models/entities/user-profile'; +import { Note } from '@/models/entities/note'; +import { Antenna } from '@/models/entities/antenna'; +import { DriveFile } from '@/models/entities/drive-file'; +import { DriveFolder } from '@/models/entities/drive-folder'; +import { Emoji } from '@/models/entities/emoji'; +import { UserList } from '@/models/entities/user-list'; +import { MessagingMessage } from '@/models/entities/messaging-message'; +import { UserGroup } from '@/models/entities/user-group'; +import { ReversiGame } from '@/models/entities/games/reversi/game'; +import { AbuseUserReport } from '@/models/entities/abuse-user-report'; +import { Signin } from '@/models/entities/signin'; +import { Page } from '@/models/entities/page'; +import { Packed } from '@/misc/schema'; + +//#region Stream type-body definitions +export interface InternalStreamTypes { + antennaCreated: Antenna; + antennaDeleted: Antenna; + antennaUpdated: Antenna; +} + +export interface BroadcastTypes { + emojiAdded: { + emoji: Packed<'Emoji'>; + }; +} + +export interface UserStreamTypes { + terminate: {}; + followChannel: Channel; + unfollowChannel: Channel; + updateUserProfile: UserProfile; + mute: User; + unmute: User; + follow: Packed<'User'>; + unfollow: Packed<'User'>; + userAdded: Packed<'User'>; +} + +export interface MainStreamTypes { + notification: Packed<'Notification'>; + mention: Packed<'Note'>; + reply: Packed<'Note'>; + renote: Packed<'Note'>; + follow: Packed<'User'>; + followed: Packed<'User'>; + unfollow: Packed<'User'>; + meUpdated: Packed<'User'>; + pageEvent: { + pageId: Page['id']; + event: string; + var: any; + userId: User['id']; + user: Packed<'User'>; + }; + urlUploadFinished: { + marker?: string | null; + file: Packed<'DriveFile'>; + }; + readAllNotifications: undefined; + unreadNotification: Packed<'Notification'>; + unreadMention: Note['id']; + readAllUnreadMentions: undefined; + unreadSpecifiedNote: Note['id']; + readAllUnreadSpecifiedNotes: undefined; + readAllMessagingMessages: undefined; + messagingMessage: Packed<'MessagingMessage'>; + unreadMessagingMessage: Packed<'MessagingMessage'>; + readAllAntennas: undefined; + unreadAntenna: Antenna; + readAllAnnouncements: undefined; + readAllChannels: undefined; + unreadChannel: Note['id']; + myTokenRegenerated: undefined; + reversiNoInvites: undefined; + reversiInvited: Packed<'ReversiMatching'>; + signin: Signin; + registryUpdated: { + scope?: string[]; + key: string; + value: any | null; + }; + driveFileCreated: Packed<'DriveFile'>; + readAntenna: Antenna; +} + +export interface DriveStreamTypes { + fileCreated: Packed<'DriveFile'>; + fileDeleted: DriveFile['id']; + fileUpdated: Packed<'DriveFile'>; + folderCreated: Packed<'DriveFolder'>; + folderDeleted: DriveFolder['id']; + folderUpdated: Packed<'DriveFolder'>; +} + +export interface NoteStreamTypes { + pollVoted: { + choice: number; + userId: User['id']; + }; + deleted: { + deletedAt: Date; + }; + reacted: { + reaction: string; + emoji?: Emoji; + userId: User['id']; + }; + unreacted: { + reaction: string; + userId: User['id']; + }; +} +type NoteStreamEventTypes = { + [key in keyof NoteStreamTypes]: { + id: Note['id']; + body: NoteStreamTypes[key]; + }; +}; + +export interface ChannelStreamTypes { + typing: User['id']; +} + +export interface UserListStreamTypes { + userAdded: Packed<'User'>; + userRemoved: Packed<'User'>; +} + +export interface AntennaStreamTypes { + note: Note; +} + +export interface MessagingStreamTypes { + read: MessagingMessage['id'][]; + typing: User['id']; + message: Packed<'MessagingMessage'>; + deleted: MessagingMessage['id']; +} + +export interface GroupMessagingStreamTypes { + read: { + ids: MessagingMessage['id'][]; + userId: User['id']; + }; + typing: User['id']; + message: Packed<'MessagingMessage'>; + deleted: MessagingMessage['id']; +} + +export interface MessagingIndexStreamTypes { + read: MessagingMessage['id'][]; + message: Packed<'MessagingMessage'>; +} + +export interface ReversiStreamTypes { + matched: Packed<'ReversiGame'>; + invited: Packed<'ReversiMatching'>; +} + +export interface ReversiGameStreamTypes { + started: Packed<'ReversiGame'>; + ended: { + winnerId?: User['id'] | null, + game: Packed<'ReversiGame'>; + }; + updateSettings: { + key: string; + value: FIXME; + }; + initForm: { + userId: User['id']; + form: FIXME; + }; + updateForm: { + userId: User['id']; + id: string; + value: FIXME; + }; + message: { + userId: User['id']; + message: FIXME; + }; + changeAccepts: { + user1: boolean; + user2: boolean; + }; + set: { + at: Date; + color: boolean; + pos: number; + next: boolean; + }; + watching: User['id']; +} + +export interface AdminStreamTypes { + newAbuseUserReport: { + id: AbuseUserReport['id']; + targetUserId: User['id'], + reporterId: User['id'], + comment: string; + }; +} +//#endregion + +// 辞書(interface or type)から{ type, body }ユニオンを定義 +// https://stackoverflow.com/questions/49311989/can-i-infer-the-type-of-a-value-using-extends-keyof-type +// VS Codeの展開を防止するためにEvents型を定義 +type Events = { [K in keyof T]: { type: K; body: T[K]; } }; +type EventUnionFromDictionary< + T extends object, + U = Events +> = U[keyof U]; + +// name/messages(spec) pairs dictionary +export type StreamMessages = { + internal: { + name: 'internal'; + payload: EventUnionFromDictionary; + }; + broadcast: { + name: 'broadcast'; + payload: EventUnionFromDictionary; + }; + user: { + name: `user:${User['id']}`; + payload: EventUnionFromDictionary; + }; + main: { + name: `mainStream:${User['id']}`; + payload: EventUnionFromDictionary; + }; + drive: { + name: `driveStream:${User['id']}`; + payload: EventUnionFromDictionary; + }; + note: { + name: `noteStream:${Note['id']}`; + payload: EventUnionFromDictionary; + }; + channel: { + name: `channelStream:${Channel['id']}`; + payload: EventUnionFromDictionary; + }; + userList: { + name: `userListStream:${UserList['id']}`; + payload: EventUnionFromDictionary; + }; + antenna: { + name: `antennaStream:${Antenna['id']}`; + payload: EventUnionFromDictionary; + }; + messaging: { + name: `messagingStream:${User['id']}-${User['id']}`; + payload: EventUnionFromDictionary; + }; + groupMessaging: { + name: `messagingStream:${UserGroup['id']}`; + payload: EventUnionFromDictionary; + }; + messagingIndex: { + name: `messagingIndexStream:${User['id']}`; + payload: EventUnionFromDictionary; + }; + reversi: { + name: `reversiStream:${User['id']}`; + payload: EventUnionFromDictionary; + }; + reversiGame: { + name: `reversiGameStream:${ReversiGame['id']}`; + payload: EventUnionFromDictionary; + }; + admin: { + name: `adminStream:${User['id']}`; + payload: EventUnionFromDictionary; + }; + notes: { + name: 'notesStream'; + payload: Packed<'Note'>; + }; +}; + +// API event definitions +// ストリームごとのEmitterの辞書を用意 +type EventEmitterDictionary = { [x in keyof StreamMessages]: Emitter void }> }; +// 共用体型を交差型にする型 https://stackoverflow.com/questions/54938141/typescript-convert-union-to-intersection +type UnionToIntersection = (U extends any ? (k: U) => void : never) extends ((k: infer I) => void) ? I : never; +// Emitter辞書から共用体型を作り、UnionToIntersectionで交差型にする +export type StreamEventEmitter = UnionToIntersection; +// { [y in name]: (e: spec) => void }をまとめてその交差型をEmitterにかけるとts(2590)にひっかかる + +// provide stream channels union +export type StreamChannels = StreamMessages[keyof StreamMessages]['name']; -- cgit v1.2.3-freya From b9eaf906e7b7202d06c9fea72b6d3c422a03f81e Mon Sep 17 00:00:00 2001 From: syuilo Date: Fri, 12 Nov 2021 10:52:10 +0900 Subject: fix lint errors --- packages/backend/.eslintrc.js | 3 ++ packages/backend/@types/jsrsasign.d.ts | 2 +- packages/backend/src/mfm/from-html.ts | 3 +- packages/backend/src/misc/cafy-id.ts | 1 + packages/backend/src/misc/gen-avatar.ts | 2 +- packages/backend/src/prelude/array.ts | 2 +- packages/backend/src/prelude/url.ts | 2 +- .../object-storage/clean-remote-files.ts | 2 +- .../backend/src/queue/processors/system/index.ts | 4 +-- .../src/queue/processors/system/resync-charts.ts | 2 +- packages/backend/src/queue/queues.ts | 2 +- packages/backend/src/queue/types.ts | 2 +- .../src/remote/activitypub/models/person.ts | 2 +- .../activitypub/renderer/ordered-collection.ts | 2 +- packages/backend/src/server/api/define.ts | 6 ++-- .../src/server/api/endpoints/channels/update.ts | 2 +- .../src/server/api/endpoints/i/2fa/key-done.ts | 2 +- packages/backend/src/server/api/limiter.ts | 8 ++--- .../server/api/stream/channels/games/reversi.ts | 3 +- packages/backend/src/server/api/stream/types.ts | 2 +- packages/backend/src/services/chart/core.ts | 16 +++++----- packages/backend/src/services/stream.ts | 34 +++++++++++----------- 22 files changed, 55 insertions(+), 49 deletions(-) (limited to 'packages/backend/src/server/api/stream') diff --git a/packages/backend/.eslintrc.js b/packages/backend/.eslintrc.js index 952a2ee9ef..bafd0c9b63 100644 --- a/packages/backend/.eslintrc.js +++ b/packages/backend/.eslintrc.js @@ -22,6 +22,7 @@ module.exports = { 'eol-last': ['error', 'always'], 'semi': ['error', 'always'], 'quotes': ['warn', 'single'], + 'comma-dangle': ['warn', 'always-multiline'], 'keyword-spacing': ['error', { 'before': true, 'after': true, @@ -44,6 +45,8 @@ module.exports = { 'no-multi-spaces': ['warn'], 'no-control-regex': ['warn'], 'no-empty': ['warn'], + 'no-inner-declarations': ['off'], + 'no-sparse-arrays': ['off'], '@typescript-eslint/no-var-requires': ['warn'], '@typescript-eslint/no-inferrable-types': ['warn'], '@typescript-eslint/no-empty-function': ['off'], diff --git a/packages/backend/@types/jsrsasign.d.ts b/packages/backend/@types/jsrsasign.d.ts index bc9d746f7e..bb52f8f64e 100644 --- a/packages/backend/@types/jsrsasign.d.ts +++ b/packages/backend/@types/jsrsasign.d.ts @@ -171,7 +171,7 @@ declare module 'jsrsasign' { public static getTLVbyList(h: ASN1S, currentIndex: Idx, nthList: Mutable, checkingTag?: string): ASN1TLV; - // tslint:disable-next-line:bool-param-default + // eslint:disable-next-line:bool-param-default public static getVbyList(h: ASN1S, currentIndex: Idx, nthList: Mutable, checkingTag?: string, removeUnusedbits?: boolean): ASN1V; public static hextooidstr(hex: ASN1OIDV): OID; diff --git a/packages/backend/src/mfm/from-html.ts b/packages/backend/src/mfm/from-html.ts index de6aa3d0cc..43e16d80c5 100644 --- a/packages/backend/src/mfm/from-html.ts +++ b/packages/backend/src/mfm/from-html.ts @@ -48,9 +48,10 @@ export function fromHtml(html: string, hashtagNames?: string[]): string | null { if (!treeAdapter.isElementNode(node)) return; switch (node.nodeName) { - case 'br': + case 'br': { text += '\n'; break; + } case 'a': { diff --git a/packages/backend/src/misc/cafy-id.ts b/packages/backend/src/misc/cafy-id.ts index 39886611e1..dd81c5c4cf 100644 --- a/packages/backend/src/misc/cafy-id.ts +++ b/packages/backend/src/misc/cafy-id.ts @@ -1,5 +1,6 @@ import { Context } from 'cafy'; +// eslint-disable-next-line @typescript-eslint/ban-types export class ID extends Context { public readonly name = 'ID'; diff --git a/packages/backend/src/misc/gen-avatar.ts b/packages/backend/src/misc/gen-avatar.ts index f03ca9f96d..8838ec8d15 100644 --- a/packages/backend/src/misc/gen-avatar.ts +++ b/packages/backend/src/misc/gen-avatar.ts @@ -56,7 +56,7 @@ export function genAvatar(seed: string, stream: WriteStream): Promise { // 1*n (filled by false) const center: boolean[] = new Array(n).fill(false); - // tslint:disable-next-line:prefer-for-of + // eslint:disable-next-line:prefer-for-of for (let x = 0; x < side.length; x++) { for (let y = 0; y < side[x].length; y++) { side[x][y] = rand(3) === 0; diff --git a/packages/backend/src/prelude/array.ts b/packages/backend/src/prelude/array.ts index d63f0475d0..1e9e62b895 100644 --- a/packages/backend/src/prelude/array.ts +++ b/packages/backend/src/prelude/array.ts @@ -87,7 +87,7 @@ export function groupOn(f: (x: T) => S, xs: T[]): T[][] { export function groupByX(collections: T[], keySelector: (x: T) => string) { return collections.reduce((obj: Record, item: T) => { const key = keySelector(item); - if (!obj.hasOwnProperty(key)) { + if (!Object.prototype.hasOwnProperty.call(obj, key)) { obj[key] = []; } diff --git a/packages/backend/src/prelude/url.ts b/packages/backend/src/prelude/url.ts index c7f2b7c1e7..a4f2f7f5a8 100644 --- a/packages/backend/src/prelude/url.ts +++ b/packages/backend/src/prelude/url.ts @@ -1,4 +1,4 @@ -export function query(obj: {}): string { +export function query(obj: Record): string { const params = Object.entries(obj) .filter(([, v]) => Array.isArray(v) ? v.length : v !== undefined) .reduce((a, [k, v]) => (a[k] = v, a), {} as Record); diff --git a/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts b/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts index 3b2e4ea939..a094c39d5d 100644 --- a/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts +++ b/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts @@ -7,7 +7,7 @@ import { MoreThan, Not, IsNull } from 'typeorm'; const logger = queueLogger.createSubLogger('clean-remote-files'); -export default async function cleanRemoteFiles(job: Bull.Job<{}>, done: any): Promise { +export default async function cleanRemoteFiles(job: Bull.Job>, done: any): Promise { logger.info(`Deleting cached remote files...`); let deletedCount = 0; diff --git a/packages/backend/src/queue/processors/system/index.ts b/packages/backend/src/queue/processors/system/index.ts index 52b7868105..8460ea0a9b 100644 --- a/packages/backend/src/queue/processors/system/index.ts +++ b/packages/backend/src/queue/processors/system/index.ts @@ -3,9 +3,9 @@ import { resyncCharts } from './resync-charts'; const jobs = { resyncCharts, -} as Record | Bull.ProcessPromiseFunction<{}>>; +} as Record> | Bull.ProcessPromiseFunction>>; -export default function(dbQueue: Bull.Queue<{}>) { +export default function(dbQueue: Bull.Queue>) { for (const [k, v] of Object.entries(jobs)) { dbQueue.process(k, v); } diff --git a/packages/backend/src/queue/processors/system/resync-charts.ts b/packages/backend/src/queue/processors/system/resync-charts.ts index b36b024cfb..78a70bb981 100644 --- a/packages/backend/src/queue/processors/system/resync-charts.ts +++ b/packages/backend/src/queue/processors/system/resync-charts.ts @@ -5,7 +5,7 @@ import { driveChart, notesChart, usersChart } from '@/services/chart/index'; const logger = queueLogger.createSubLogger('resync-charts'); -export default async function resyncCharts(job: Bull.Job<{}>, done: any): Promise { +export async function resyncCharts(job: Bull.Job>, done: any): Promise { logger.info(`Resync charts...`); // TODO: ユーザーごとのチャートも更新する diff --git a/packages/backend/src/queue/queues.ts b/packages/backend/src/queue/queues.ts index a66a7ca451..b1d790fcb1 100644 --- a/packages/backend/src/queue/queues.ts +++ b/packages/backend/src/queue/queues.ts @@ -2,7 +2,7 @@ import config from '@/config/index'; import { initialize as initializeQueue } from './initialize'; import { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData } from './types'; -export const systemQueue = initializeQueue<{}>('system'); +export const systemQueue = initializeQueue>('system'); export const deliverQueue = initializeQueue('deliver', config.deliverJobPerSec || 128); export const inboxQueue = initializeQueue('inbox', config.inboxJobPerSec || 16); export const dbQueue = initializeQueue('db'); diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index 39cab29966..c8c7147152 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -33,7 +33,7 @@ export type DbUserImportJobData = { fileId: DriveFile['id']; }; -export type ObjectStorageJobData = ObjectStorageFileJobData | {}; +export type ObjectStorageJobData = ObjectStorageFileJobData | Record; export type ObjectStorageFileJobData = { key: string; diff --git a/packages/backend/src/remote/activitypub/models/person.ts b/packages/backend/src/remote/activitypub/models/person.ts index eb8c00a10b..95db46bff2 100644 --- a/packages/backend/src/remote/activitypub/models/person.ts +++ b/packages/backend/src/remote/activitypub/models/person.ts @@ -274,7 +274,7 @@ export async function createPerson(uri: string, resolver?: Resolver): Promise { +export async function updatePerson(uri: string, resolver?: Resolver | null, hint?: Record): Promise { if (typeof uri !== 'string') throw new Error('uri is not string'); // URIがこのサーバーを指しているならスキップ diff --git a/packages/backend/src/remote/activitypub/renderer/ordered-collection.ts b/packages/backend/src/remote/activitypub/renderer/ordered-collection.ts index 68870a0ecd..c4b4337af8 100644 --- a/packages/backend/src/remote/activitypub/renderer/ordered-collection.ts +++ b/packages/backend/src/remote/activitypub/renderer/ordered-collection.ts @@ -6,7 +6,7 @@ * @param last URL of last page (optional) * @param orderedItems attached objects (optional) */ -export default function(id: string | null, totalItems: any, first?: string, last?: string, orderedItems?: object) { +export default function(id: string | null, totalItems: any, first?: string, last?: string, orderedItems?: Record) { const page: any = { id, type: 'OrderedCollection', diff --git a/packages/backend/src/server/api/define.ts b/packages/backend/src/server/api/define.ts index 4bd8f95e31..48253e78e0 100644 --- a/packages/backend/src/server/api/define.ts +++ b/packages/backend/src/server/api/define.ts @@ -20,7 +20,7 @@ type SimpleUserInfo = { }; type Params = { - [P in keyof T['params']]: NonNullable[P]['transform'] extends Function + [P in keyof T['params']]: NonNullable[P]['transform'] extends () => any ? ReturnType[P]['transform']> : NonNullable[P]['default'] extends null | number | string ? NonOptional[P]['validator']['get']>[0]> @@ -30,7 +30,7 @@ type Params = { export type Response = Record | void; type executor = - (params: Params, user: T['requireCredential'] extends true ? SimpleUserInfo : SimpleUserInfo | null, token: AccessToken | null, file?: any, cleanup?: Function) => + (params: Params, user: T['requireCredential'] extends true ? SimpleUserInfo : SimpleUserInfo | null, token: AccessToken | null, file?: any, cleanup?: () => any) => Promise>>; export default function (meta: T, cb: executor) @@ -74,7 +74,7 @@ function getParams(defs: T, params: any): [Params, A }); return true; } else { - if (v === undefined && def.hasOwnProperty('default')) { + if (v === undefined && Object.prototype.hasOwnProperty.call(def, 'default')) { x[k] = def.default; } else { x[k] = v; diff --git a/packages/backend/src/server/api/endpoints/channels/update.ts b/packages/backend/src/server/api/endpoints/channels/update.ts index 9b447bd04b..05f279d6ac 100644 --- a/packages/backend/src/server/api/endpoints/channels/update.ts +++ b/packages/backend/src/server/api/endpoints/channels/update.ts @@ -69,7 +69,7 @@ export default define(meta, async (ps, me) => { throw new ApiError(meta.errors.accessDenied); } - // tslint:disable-next-line:no-unnecessary-initializer + // eslint:disable-next-line:no-unnecessary-initializer let banner = undefined; if (ps.bannerId != null) { banner = await DriveFiles.findOne({ diff --git a/packages/backend/src/server/api/endpoints/i/2fa/key-done.ts b/packages/backend/src/server/api/endpoints/i/2fa/key-done.ts index b4d3af235a..e06d0a9f68 100644 --- a/packages/backend/src/server/api/endpoints/i/2fa/key-done.ts +++ b/packages/backend/src/server/api/endpoints/i/2fa/key-done.ts @@ -75,7 +75,7 @@ export default define(meta, async (ps, user) => { const flags = attestation.authData[32]; - // tslint:disable-next-line:no-bitwise + // eslint:disable-next-line:no-bitwise if (!(flags & 1)) { throw new Error('user not present'); } diff --git a/packages/backend/src/server/api/limiter.ts b/packages/backend/src/server/api/limiter.ts index 1e2fe5bcb3..82a8613c90 100644 --- a/packages/backend/src/server/api/limiter.ts +++ b/packages/backend/src/server/api/limiter.ts @@ -10,16 +10,16 @@ const logger = new Logger('limiter'); export default (endpoint: IEndpoint, user: User) => new Promise((ok, reject) => { const limitation = endpoint.meta.limit!; - const key = limitation.hasOwnProperty('key') + const key = Object.prototype.hasOwnProperty.call(limitation, 'key') ? limitation.key : endpoint.name; const hasShortTermLimit = - limitation.hasOwnProperty('minInterval'); + Object.prototype.hasOwnProperty.call(limitation, 'minInterval'); const hasLongTermLimit = - limitation.hasOwnProperty('duration') && - limitation.hasOwnProperty('max'); + Object.prototype.hasOwnProperty.call(limitation, 'duration') && + Object.prototype.hasOwnProperty.call(limitation, 'max'); if (hasShortTermLimit) { min(); diff --git a/packages/backend/src/server/api/stream/channels/games/reversi.ts b/packages/backend/src/server/api/stream/channels/games/reversi.ts index 3b89aac35c..399750c26a 100644 --- a/packages/backend/src/server/api/stream/channels/games/reversi.ts +++ b/packages/backend/src/server/api/stream/channels/games/reversi.ts @@ -19,7 +19,7 @@ export default class extends Channel { @autobind public async onMessage(type: string, body: any) { switch (type) { - case 'ping': + case 'ping': { if (body.id == null) return; const matching = await ReversiMatchings.findOne({ parentId: this.user!.id, @@ -28,6 +28,7 @@ export default class extends Channel { if (matching == null) return; publishMainStream(matching.childId, 'reversiInvited', await ReversiMatchings.pack(matching, { id: matching.childId })); break; + } } } } diff --git a/packages/backend/src/server/api/stream/types.ts b/packages/backend/src/server/api/stream/types.ts index 70eb5c5ce5..f4302f64a0 100644 --- a/packages/backend/src/server/api/stream/types.ts +++ b/packages/backend/src/server/api/stream/types.ts @@ -31,7 +31,7 @@ export interface BroadcastTypes { } export interface UserStreamTypes { - terminate: {}; + terminate: Record; followChannel: Channel; unfollowChannel: Channel; updateUserProfile: UserProfile; diff --git a/packages/backend/src/services/chart/core.ts b/packages/backend/src/services/chart/core.ts index c0d3280c2b..78b7dd1359 100644 --- a/packages/backend/src/services/chart/core.ts +++ b/packages/backend/src/services/chart/core.ts @@ -70,7 +70,7 @@ export default abstract class Chart> { @autobind private static convertSchemaToFlatColumnDefinitions(schema: SimpleSchema) { - const columns = {} as any; + const columns = {} as Record; const flatColumns = (x: Obj, path?: string) => { for (const [k, v] of Object.entries(x)) { const p = path ? `${path}${this.columnDot}${k}` : k; @@ -93,8 +93,8 @@ export default abstract class Chart> { } @autobind - private static convertFlattenColumnsToObject(x: Record): Record { - const obj = {} as any; + private static convertFlattenColumnsToObject(x: Record): Record { + const obj = {} as Record; for (const k of Object.keys(x).filter(k => k.startsWith(Chart.columnPrefix))) { // now k is ___x_y_z const path = k.substr(Chart.columnPrefix.length).split(Chart.columnDot).join('.'); @@ -104,7 +104,7 @@ export default abstract class Chart> { } @autobind - private static convertObjectToFlattenColumns(x: Record) { + private static convertObjectToFlattenColumns(x: Record) { const columns = {} as Record; const flatten = (x: Obj, path?: string) => { for (const [k, v] of Object.entries(x)) { @@ -121,9 +121,9 @@ export default abstract class Chart> { } @autobind - private static countUniqueFields(x: Record) { + private static countUniqueFields(x: Record) { const exec = (x: Obj) => { - const res = {} as Record; + const res = {} as Record; for (const [k, v] of Object.entries(x)) { if (typeof v === 'object' && !Array.isArray(v)) { res[k] = exec(v); @@ -140,7 +140,7 @@ export default abstract class Chart> { @autobind private static convertQuery(diff: Record) { - const query: Record = {}; + const query: Record string> = {}; for (const [k, v] of Object.entries(diff)) { if (typeof v === 'number') { @@ -337,7 +337,7 @@ export default abstract class Chart> { } @autobind - public async save() { + public async save(): Promise { if (this.buffer.length === 0) { logger.info(`${this.name}: Write skipped`); return; diff --git a/packages/backend/src/services/stream.ts b/packages/backend/src/services/stream.ts index 2c308a1b54..0901857c33 100644 --- a/packages/backend/src/services/stream.ts +++ b/packages/backend/src/services/stream.ts @@ -37,74 +37,74 @@ class Publisher { channel: channel, message: message })); - } + }; public publishInternalEvent = (type: K, value?: InternalStreamTypes[K]): void => { this.publish('internal', type, typeof value === 'undefined' ? null : value); - } + }; public publishUserEvent = (userId: User['id'], type: K, value?: UserStreamTypes[K]): void => { this.publish(`user:${userId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishBroadcastStream = (type: K, value?: BroadcastTypes[K]): void => { this.publish('broadcast', type, typeof value === 'undefined' ? null : value); - } + }; public publishMainStream = (userId: User['id'], type: K, value?: MainStreamTypes[K]): void => { this.publish(`mainStream:${userId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishDriveStream = (userId: User['id'], type: K, value?: DriveStreamTypes[K]): void => { this.publish(`driveStream:${userId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishNoteStream = (noteId: Note['id'], type: K, value?: NoteStreamTypes[K]): void => { this.publish(`noteStream:${noteId}`, type, { id: noteId, body: value }); - } + }; public publishChannelStream = (channelId: Channel['id'], type: K, value?: ChannelStreamTypes[K]): void => { this.publish(`channelStream:${channelId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishUserListStream = (listId: UserList['id'], type: K, value?: UserListStreamTypes[K]): void => { this.publish(`userListStream:${listId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishAntennaStream = (antennaId: Antenna['id'], type: K, value?: AntennaStreamTypes[K]): void => { this.publish(`antennaStream:${antennaId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishMessagingStream = (userId: User['id'], otherpartyId: User['id'], type: K, value?: MessagingStreamTypes[K]): void => { this.publish(`messagingStream:${userId}-${otherpartyId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishGroupMessagingStream = (groupId: UserGroup['id'], type: K, value?: GroupMessagingStreamTypes[K]): void => { this.publish(`messagingStream:${groupId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishMessagingIndexStream = (userId: User['id'], type: K, value?: MessagingIndexStreamTypes[K]): void => { this.publish(`messagingIndexStream:${userId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishReversiStream = (userId: User['id'], type: K, value?: ReversiStreamTypes[K]): void => { this.publish(`reversiStream:${userId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishReversiGameStream = (gameId: ReversiGame['id'], type: K, value?: ReversiGameStreamTypes[K]): void => { this.publish(`reversiGameStream:${gameId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishNotesStream = (note: Packed<'Note'>): void => { this.publish('notesStream', null, note); - } + }; public publishAdminStream = (userId: User['id'], type: K, value?: AdminStreamTypes[K]): void => { this.publish(`adminStream:${userId}`, type, typeof value === 'undefined' ? null : value); - } + }; } const publisher = new Publisher(); -- cgit v1.2.3-freya