diff options
Diffstat (limited to 'packages/backend/src/server/api/stream/channels')
18 files changed, 1320 insertions, 0 deletions
diff --git a/packages/backend/src/server/api/stream/channels/admin.ts b/packages/backend/src/server/api/stream/channels/admin.ts new file mode 100644 index 0000000000..1ff932d1dd --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/admin.ts @@ -0,0 +1,16 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; + +export default class extends Channel { + public readonly chName = 'admin'; + public static shouldShare = true; + public static requireCredential = true; + + @autobind + public async init(params: any) { + // Subscribe admin stream + this.subscriber.on(`adminStream:${this.user!.id}`, data => { + this.send(data); + }); + } +} diff --git a/packages/backend/src/server/api/stream/channels/antenna.ts b/packages/backend/src/server/api/stream/channels/antenna.ts new file mode 100644 index 0000000000..3cbdfebb43 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/antenna.ts @@ -0,0 +1,45 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; +import { Notes } from '@/models/index'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { StreamMessages } from '../types'; + +export default class extends Channel { + public readonly chName = 'antenna'; + public static shouldShare = false; + public static requireCredential = false; + private antennaId: string; + + @autobind + public async init(params: any) { + this.antennaId = params.antennaId as string; + + // Subscribe stream + this.subscriber.on(`antennaStream:${this.antennaId}`, this.onEvent); + } + + @autobind + private async onEvent(data: StreamMessages['antenna']['payload']) { + if (data.type === 'note') { + const note = await Notes.pack(data.body.id, this.user, { detail: true }); + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + this.connection.cacheNote(note); + + this.send('note', note); + } else { + this.send(data.type, data.body); + } + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off(`antennaStream:${this.antennaId}`, this.onEvent); + } +} diff --git a/packages/backend/src/server/api/stream/channels/channel.ts b/packages/backend/src/server/api/stream/channels/channel.ts new file mode 100644 index 0000000000..bf7942f522 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/channel.ts @@ -0,0 +1,92 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; +import { Notes, Users } from '@/models/index'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { User } from '@/models/entities/user'; +import { StreamMessages } from '../types'; +import { Packed } from '@/misc/schema'; + +export default class extends Channel { + public readonly chName = 'channel'; + public static shouldShare = false; + public static requireCredential = false; + private channelId: string; + private typers: Record<User['id'], Date> = {}; + private emitTypersIntervalId: ReturnType<typeof setInterval>; + + @autobind + public async init(params: any) { + this.channelId = params.channelId as string; + + // Subscribe stream + this.subscriber.on('notesStream', this.onNote); + this.subscriber.on(`channelStream:${this.channelId}`, this.onEvent); + this.emitTypersIntervalId = setInterval(this.emitTypers, 5000); + } + + @autobind + private async onNote(note: Packed<'Note'>) { + if (note.channelId !== this.channelId) return; + + // リプライなら再pack + if (note.replyId != null) { + note.reply = await Notes.pack(note.replyId, this.user, { + detail: true + }); + } + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await Notes.pack(note.renoteId, this.user, { + detail: true + }); + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + this.connection.cacheNote(note); + + this.send('note', note); + } + + @autobind + private onEvent(data: StreamMessages['channel']['payload']) { + if (data.type === 'typing') { + const id = data.body; + const begin = this.typers[id] == null; + this.typers[id] = new Date(); + if (begin) { + this.emitTypers(); + } + } + } + + @autobind + private async emitTypers() { + const now = new Date(); + + // Remove not typing users + for (const [userId, date] of Object.entries(this.typers)) { + if (now.getTime() - date.getTime() > 5000) delete this.typers[userId]; + } + + const users = await Users.packMany(Object.keys(this.typers), null, { detail: false }); + + this.send({ + type: 'typers', + body: users, + }); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off('notesStream', this.onNote); + this.subscriber.off(`channelStream:${this.channelId}`, this.onEvent); + + clearInterval(this.emitTypersIntervalId); + } +} diff --git a/packages/backend/src/server/api/stream/channels/drive.ts b/packages/backend/src/server/api/stream/channels/drive.ts new file mode 100644 index 0000000000..4112dd9b04 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/drive.ts @@ -0,0 +1,16 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; + +export default class extends Channel { + public readonly chName = 'drive'; + public static shouldShare = true; + public static requireCredential = true; + + @autobind + public async init(params: any) { + // Subscribe drive stream + this.subscriber.on(`driveStream:${this.user!.id}`, data => { + this.send(data); + }); + } +} diff --git a/packages/backend/src/server/api/stream/channels/games/reversi-game.ts b/packages/backend/src/server/api/stream/channels/games/reversi-game.ts new file mode 100644 index 0000000000..bfdbf1d266 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/games/reversi-game.ts @@ -0,0 +1,372 @@ +import autobind from 'autobind-decorator'; +import * as CRC32 from 'crc-32'; +import { publishReversiGameStream } from '@/services/stream'; +import Reversi from '../../../../../games/reversi/core'; +import * as maps from '../../../../../games/reversi/maps'; +import Channel from '../../channel'; +import { ReversiGame } from '@/models/entities/games/reversi/game'; +import { ReversiGames, Users } from '@/models/index'; +import { User } from '@/models/entities/user'; + +export default class extends Channel { + public readonly chName = 'gamesReversiGame'; + public static shouldShare = false; + public static requireCredential = false; + + private gameId: ReversiGame['id'] | null = null; + private watchers: Record<User['id'], Date> = {}; + private emitWatchersIntervalId: ReturnType<typeof setInterval>; + + @autobind + public async init(params: any) { + this.gameId = params.gameId; + + // Subscribe game stream + this.subscriber.on(`reversiGameStream:${this.gameId}`, this.onEvent); + this.emitWatchersIntervalId = setInterval(this.emitWatchers, 5000); + + const game = await ReversiGames.findOne(this.gameId!); + if (game == null) throw new Error('game not found'); + + // 観戦者イベント + this.watch(game); + } + + @autobind + private onEvent(data: any) { + if (data.type === 'watching') { + const id = data.body; + this.watchers[id] = new Date(); + } else { + this.send(data); + } + } + + @autobind + private async emitWatchers() { + const now = new Date(); + + // Remove not watching users + for (const [userId, date] of Object.entries(this.watchers)) { + if (now.getTime() - date.getTime() > 5000) delete this.watchers[userId]; + } + + const users = await Users.packMany(Object.keys(this.watchers), null, { detail: false }); + + this.send({ + type: 'watchers', + body: users, + }); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off(`reversiGameStream:${this.gameId}`, this.onEvent); + clearInterval(this.emitWatchersIntervalId); + } + + @autobind + public onMessage(type: string, body: any) { + switch (type) { + case 'accept': this.accept(true); break; + case 'cancelAccept': this.accept(false); break; + case 'updateSettings': this.updateSettings(body.key, body.value); break; + case 'initForm': this.initForm(body); break; + case 'updateForm': this.updateForm(body.id, body.value); break; + case 'message': this.message(body); break; + case 'set': this.set(body.pos); break; + case 'check': this.check(body.crc32); break; + } + } + + @autobind + private async updateSettings(key: string, value: any) { + if (this.user == null) return; + + const game = await ReversiGames.findOne(this.gameId!); + if (game == null) throw new Error('game not found'); + + if (game.isStarted) return; + if ((game.user1Id !== this.user.id) && (game.user2Id !== this.user.id)) return; + if ((game.user1Id === this.user.id) && game.user1Accepted) return; + if ((game.user2Id === this.user.id) && game.user2Accepted) return; + + if (!['map', 'bw', 'isLlotheo', 'canPutEverywhere', 'loopedBoard'].includes(key)) return; + + await ReversiGames.update(this.gameId!, { + [key]: value + }); + + publishReversiGameStream(this.gameId!, 'updateSettings', { + key: key, + value: value + }); + } + + @autobind + private async initForm(form: any) { + if (this.user == null) return; + + const game = await ReversiGames.findOne(this.gameId!); + if (game == null) throw new Error('game not found'); + + if (game.isStarted) return; + if ((game.user1Id !== this.user.id) && (game.user2Id !== this.user.id)) return; + + const set = game.user1Id === this.user.id ? { + form1: form + } : { + form2: form + }; + + await ReversiGames.update(this.gameId!, set); + + publishReversiGameStream(this.gameId!, 'initForm', { + userId: this.user.id, + form + }); + } + + @autobind + private async updateForm(id: string, value: any) { + if (this.user == null) return; + + const game = await ReversiGames.findOne(this.gameId!); + if (game == null) throw new Error('game not found'); + + if (game.isStarted) return; + if ((game.user1Id !== this.user.id) && (game.user2Id !== this.user.id)) return; + + const form = game.user1Id === this.user.id ? game.form2 : game.form1; + + const item = form.find((i: any) => i.id == id); + + if (item == null) return; + + item.value = value; + + const set = game.user1Id === this.user.id ? { + form2: form + } : { + form1: form + }; + + await ReversiGames.update(this.gameId!, set); + + publishReversiGameStream(this.gameId!, 'updateForm', { + userId: this.user.id, + id, + value + }); + } + + @autobind + private async message(message: any) { + if (this.user == null) return; + + message.id = Math.random(); + publishReversiGameStream(this.gameId!, 'message', { + userId: this.user.id, + message + }); + } + + @autobind + private async accept(accept: boolean) { + if (this.user == null) return; + + const game = await ReversiGames.findOne(this.gameId!); + if (game == null) throw new Error('game not found'); + + if (game.isStarted) return; + + let bothAccepted = false; + + if (game.user1Id === this.user.id) { + await ReversiGames.update(this.gameId!, { + user1Accepted: accept + }); + + publishReversiGameStream(this.gameId!, 'changeAccepts', { + user1: accept, + user2: game.user2Accepted + }); + + if (accept && game.user2Accepted) bothAccepted = true; + } else if (game.user2Id === this.user.id) { + await ReversiGames.update(this.gameId!, { + user2Accepted: accept + }); + + publishReversiGameStream(this.gameId!, 'changeAccepts', { + user1: game.user1Accepted, + user2: accept + }); + + if (accept && game.user1Accepted) bothAccepted = true; + } else { + return; + } + + if (bothAccepted) { + // 3秒後、まだacceptされていたらゲーム開始 + setTimeout(async () => { + const freshGame = await ReversiGames.findOne(this.gameId!); + if (freshGame == null || freshGame.isStarted || freshGame.isEnded) return; + if (!freshGame.user1Accepted || !freshGame.user2Accepted) return; + + let bw: number; + if (freshGame.bw == 'random') { + bw = Math.random() > 0.5 ? 1 : 2; + } else { + bw = parseInt(freshGame.bw, 10); + } + + function getRandomMap() { + const mapCount = Object.entries(maps).length; + const rnd = Math.floor(Math.random() * mapCount); + return Object.values(maps)[rnd].data; + } + + const map = freshGame.map != null ? freshGame.map : getRandomMap(); + + await ReversiGames.update(this.gameId!, { + startedAt: new Date(), + isStarted: true, + black: bw, + map: map + }); + + //#region 盤面に最初から石がないなどして始まった瞬間に勝敗が決定する場合があるのでその処理 + const o = new Reversi(map, { + isLlotheo: freshGame.isLlotheo, + canPutEverywhere: freshGame.canPutEverywhere, + loopedBoard: freshGame.loopedBoard + }); + + if (o.isEnded) { + let winner; + if (o.winner === true) { + winner = freshGame.black == 1 ? freshGame.user1Id : freshGame.user2Id; + } else if (o.winner === false) { + winner = freshGame.black == 1 ? freshGame.user2Id : freshGame.user1Id; + } else { + winner = null; + } + + await ReversiGames.update(this.gameId!, { + isEnded: true, + winnerId: winner + }); + + publishReversiGameStream(this.gameId!, 'ended', { + winnerId: winner, + game: await ReversiGames.pack(this.gameId!, this.user) + }); + } + //#endregion + + publishReversiGameStream(this.gameId!, 'started', + await ReversiGames.pack(this.gameId!, this.user)); + }, 3000); + } + } + + // 石を打つ + @autobind + private async set(pos: number) { + if (this.user == null) return; + + const game = await ReversiGames.findOne(this.gameId!); + if (game == null) throw new Error('game not found'); + + if (!game.isStarted) return; + if (game.isEnded) return; + if ((game.user1Id !== this.user.id) && (game.user2Id !== this.user.id)) return; + + const myColor = + ((game.user1Id === this.user.id) && game.black == 1) || ((game.user2Id === this.user.id) && game.black == 2) + ? true + : false; + + const o = new Reversi(game.map, { + isLlotheo: game.isLlotheo, + canPutEverywhere: game.canPutEverywhere, + loopedBoard: game.loopedBoard + }); + + // 盤面の状態を再生 + for (const log of game.logs) { + o.put(log.color, log.pos); + } + + if (o.turn !== myColor) return; + + if (!o.canPut(myColor, pos)) return; + o.put(myColor, pos); + + let winner; + if (o.isEnded) { + if (o.winner === true) { + winner = game.black == 1 ? game.user1Id : game.user2Id; + } else if (o.winner === false) { + winner = game.black == 1 ? game.user2Id : game.user1Id; + } else { + winner = null; + } + } + + const log = { + at: new Date(), + color: myColor, + pos + }; + + const crc32 = CRC32.str(game.logs.map(x => x.pos.toString()).join('') + pos.toString()).toString(); + + game.logs.push(log); + + await ReversiGames.update(this.gameId!, { + crc32, + isEnded: o.isEnded, + winnerId: winner, + logs: game.logs + }); + + publishReversiGameStream(this.gameId!, 'set', Object.assign(log, { + next: o.turn + })); + + if (o.isEnded) { + publishReversiGameStream(this.gameId!, 'ended', { + winnerId: winner, + game: await ReversiGames.pack(this.gameId!, this.user) + }); + } + } + + @autobind + private async check(crc32: string | number) { + const game = await ReversiGames.findOne(this.gameId!); + if (game == null) throw new Error('game not found'); + + if (!game.isStarted) return; + + if (crc32.toString() !== game.crc32) { + this.send('rescue', await ReversiGames.pack(game, this.user)); + } + + // ついでに観戦者イベントを発行 + this.watch(game); + } + + @autobind + private watch(game: ReversiGame) { + if (this.user != null) { + if ((game.user1Id !== this.user.id) && (game.user2Id !== this.user.id)) { + publishReversiGameStream(this.gameId!, 'watching', this.user.id); + } + } + } +} diff --git a/packages/backend/src/server/api/stream/channels/games/reversi.ts b/packages/backend/src/server/api/stream/channels/games/reversi.ts new file mode 100644 index 0000000000..3b89aac35c --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/games/reversi.ts @@ -0,0 +1,33 @@ +import autobind from 'autobind-decorator'; +import { publishMainStream } from '@/services/stream'; +import Channel from '../../channel'; +import { ReversiMatchings } from '@/models/index'; + +export default class extends Channel { + public readonly chName = 'gamesReversi'; + public static shouldShare = true; + public static requireCredential = true; + + @autobind + public async init(params: any) { + // Subscribe reversi stream + this.subscriber.on(`reversiStream:${this.user!.id}`, data => { + this.send(data); + }); + } + + @autobind + public async onMessage(type: string, body: any) { + switch (type) { + case 'ping': + if (body.id == null) return; + const matching = await ReversiMatchings.findOne({ + parentId: this.user!.id, + childId: body.id + }); + if (matching == null) return; + publishMainStream(matching.childId, 'reversiInvited', await ReversiMatchings.pack(matching, { id: matching.childId })); + break; + } + } +} diff --git a/packages/backend/src/server/api/stream/channels/global-timeline.ts b/packages/backend/src/server/api/stream/channels/global-timeline.ts new file mode 100644 index 0000000000..f5983ab472 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/global-timeline.ts @@ -0,0 +1,73 @@ +import autobind from 'autobind-decorator'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import Channel from '../channel'; +import { fetchMeta } from '@/misc/fetch-meta'; +import { Notes } from '@/models/index'; +import { checkWordMute } from '@/misc/check-word-mute'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { Packed } from '@/misc/schema'; + +export default class extends Channel { + public readonly chName = 'globalTimeline'; + public static shouldShare = true; + public static requireCredential = false; + + @autobind + public async init(params: any) { + const meta = await fetchMeta(); + if (meta.disableGlobalTimeline) { + if (this.user == null || (!this.user.isAdmin && !this.user.isModerator)) return; + } + + // Subscribe events + this.subscriber.on('notesStream', this.onNote); + } + + @autobind + private async onNote(note: Packed<'Note'>) { + if (note.visibility !== 'public') return; + if (note.channelId != null) return; + + // リプライなら再pack + if (note.replyId != null) { + note.reply = await Notes.pack(note.replyId, this.user, { + detail: true + }); + } + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await Notes.pack(note.renoteId, this.user, { + detail: true + }); + } + + // 関係ない返信は除外 + if (note.reply) { + const reply = note.reply; + // 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合 + if (reply.userId !== this.user!.id && note.userId !== this.user!.id && reply.userId !== note.userId) return; + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + // 流れてきたNoteがミュートすべきNoteだったら無視する + // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある) + // 現状では、ワードミュートにおけるMutedNoteレコードの追加処理はストリーミングに流す処理と並列で行われるため、 + // レコードが追加されるNoteでも追加されるより先にここのストリーミングの処理に到達することが起こる。 + // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる + if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; + + this.connection.cacheNote(note); + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off('notesStream', this.onNote); + } +} diff --git a/packages/backend/src/server/api/stream/channels/hashtag.ts b/packages/backend/src/server/api/stream/channels/hashtag.ts new file mode 100644 index 0000000000..281be4f2eb --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/hashtag.ts @@ -0,0 +1,53 @@ +import autobind from 'autobind-decorator'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import Channel from '../channel'; +import { Notes } from '@/models/index'; +import { normalizeForSearch } from '@/misc/normalize-for-search'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { Packed } from '@/misc/schema'; + +export default class extends Channel { + public readonly chName = 'hashtag'; + public static shouldShare = false; + public static requireCredential = false; + private q: string[][]; + + @autobind + public async init(params: any) { + this.q = params.q; + + if (this.q == null) return; + + // Subscribe stream + this.subscriber.on('notesStream', this.onNote); + } + + @autobind + private async onNote(note: Packed<'Note'>) { + const noteTags = note.tags ? note.tags.map((t: string) => t.toLowerCase()) : []; + const matched = this.q.some(tags => tags.every(tag => noteTags.includes(normalizeForSearch(tag)))); + if (!matched) return; + + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await Notes.pack(note.renoteId, this.user, { + detail: true + }); + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + this.connection.cacheNote(note); + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off('notesStream', this.onNote); + } +} diff --git a/packages/backend/src/server/api/stream/channels/home-timeline.ts b/packages/backend/src/server/api/stream/channels/home-timeline.ts new file mode 100644 index 0000000000..52e9aec250 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/home-timeline.ts @@ -0,0 +1,81 @@ +import autobind from 'autobind-decorator'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import Channel from '../channel'; +import { Notes } from '@/models/index'; +import { checkWordMute } from '@/misc/check-word-mute'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { Packed } from '@/misc/schema'; + +export default class extends Channel { + public readonly chName = 'homeTimeline'; + public static shouldShare = true; + public static requireCredential = true; + + @autobind + public async init(params: any) { + // Subscribe events + this.subscriber.on('notesStream', this.onNote); + } + + @autobind + private async onNote(note: Packed<'Note'>) { + if (note.channelId) { + if (!this.followingChannels.has(note.channelId)) return; + } else { + // その投稿のユーザーをフォローしていなかったら弾く + if ((this.user!.id !== note.userId) && !this.following.has(note.userId)) return; + } + + if (['followers', 'specified'].includes(note.visibility)) { + note = await Notes.pack(note.id, this.user!, { + detail: true + }); + + if (note.isHidden) { + return; + } + } else { + // リプライなら再pack + if (note.replyId != null) { + note.reply = await Notes.pack(note.replyId, this.user!, { + detail: true + }); + } + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await Notes.pack(note.renoteId, this.user!, { + detail: true + }); + } + } + + // 関係ない返信は除外 + if (note.reply) { + const reply = note.reply; + // 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合 + if (reply.userId !== this.user!.id && note.userId !== this.user!.id && reply.userId !== note.userId) return; + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + // 流れてきたNoteがミュートすべきNoteだったら無視する + // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある) + // 現状では、ワードミュートにおけるMutedNoteレコードの追加処理はストリーミングに流す処理と並列で行われるため、 + // レコードが追加されるNoteでも追加されるより先にここのストリーミングの処理に到達することが起こる。 + // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる + if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; + + this.connection.cacheNote(note); + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off('notesStream', this.onNote); + } +} diff --git a/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts b/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts new file mode 100644 index 0000000000..51f95fc0cd --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts @@ -0,0 +1,89 @@ +import autobind from 'autobind-decorator'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import Channel from '../channel'; +import { fetchMeta } from '@/misc/fetch-meta'; +import { Notes } from '@/models/index'; +import { checkWordMute } from '@/misc/check-word-mute'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { Packed } from '@/misc/schema'; + +export default class extends Channel { + public readonly chName = 'hybridTimeline'; + public static shouldShare = true; + public static requireCredential = true; + + @autobind + public async init(params: any) { + const meta = await fetchMeta(); + if (meta.disableLocalTimeline && !this.user!.isAdmin && !this.user!.isModerator) return; + + // Subscribe events + this.subscriber.on('notesStream', this.onNote); + } + + @autobind + private async onNote(note: Packed<'Note'>) { + // チャンネルの投稿ではなく、自分自身の投稿 または + // チャンネルの投稿ではなく、その投稿のユーザーをフォローしている または + // チャンネルの投稿ではなく、全体公開のローカルの投稿 または + // フォローしているチャンネルの投稿 の場合だけ + if (!( + (note.channelId == null && this.user!.id === note.userId) || + (note.channelId == null && this.following.has(note.userId)) || + (note.channelId == null && (note.user.host == null && note.visibility === 'public')) || + (note.channelId != null && this.followingChannels.has(note.channelId)) + )) return; + + if (['followers', 'specified'].includes(note.visibility)) { + note = await Notes.pack(note.id, this.user!, { + detail: true + }); + + if (note.isHidden) { + return; + } + } else { + // リプライなら再pack + if (note.replyId != null) { + note.reply = await Notes.pack(note.replyId, this.user!, { + detail: true + }); + } + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await Notes.pack(note.renoteId, this.user!, { + detail: true + }); + } + } + + // 関係ない返信は除外 + if (note.reply) { + const reply = note.reply; + // 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合 + if (reply.userId !== this.user!.id && note.userId !== this.user!.id && reply.userId !== note.userId) return; + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + // 流れてきたNoteがミュートすべきNoteだったら無視する + // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある) + // 現状では、ワードミュートにおけるMutedNoteレコードの追加処理はストリーミングに流す処理と並列で行われるため、 + // レコードが追加されるNoteでも追加されるより先にここのストリーミングの処理に到達することが起こる。 + // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる + if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; + + this.connection.cacheNote(note); + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off('notesStream', this.onNote); + } +} diff --git a/packages/backend/src/server/api/stream/channels/index.ts b/packages/backend/src/server/api/stream/channels/index.ts new file mode 100644 index 0000000000..1841573043 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/index.ts @@ -0,0 +1,37 @@ +import main from './main'; +import homeTimeline from './home-timeline'; +import localTimeline from './local-timeline'; +import hybridTimeline from './hybrid-timeline'; +import globalTimeline from './global-timeline'; +import serverStats from './server-stats'; +import queueStats from './queue-stats'; +import userList from './user-list'; +import antenna from './antenna'; +import messaging from './messaging'; +import messagingIndex from './messaging-index'; +import drive from './drive'; +import hashtag from './hashtag'; +import channel from './channel'; +import admin from './admin'; +import gamesReversi from './games/reversi'; +import gamesReversiGame from './games/reversi-game'; + +export default { + main, + homeTimeline, + localTimeline, + hybridTimeline, + globalTimeline, + serverStats, + queueStats, + userList, + antenna, + messaging, + messagingIndex, + drive, + hashtag, + channel, + admin, + gamesReversi, + gamesReversiGame +}; diff --git a/packages/backend/src/server/api/stream/channels/local-timeline.ts b/packages/backend/src/server/api/stream/channels/local-timeline.ts new file mode 100644 index 0000000000..a6166c2be2 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/local-timeline.ts @@ -0,0 +1,74 @@ +import autobind from 'autobind-decorator'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import Channel from '../channel'; +import { fetchMeta } from '@/misc/fetch-meta'; +import { Notes } from '@/models/index'; +import { checkWordMute } from '@/misc/check-word-mute'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { Packed } from '@/misc/schema'; + +export default class extends Channel { + public readonly chName = 'localTimeline'; + public static shouldShare = true; + public static requireCredential = false; + + @autobind + public async init(params: any) { + const meta = await fetchMeta(); + if (meta.disableLocalTimeline) { + if (this.user == null || (!this.user.isAdmin && !this.user.isModerator)) return; + } + + // Subscribe events + this.subscriber.on('notesStream', this.onNote); + } + + @autobind + private async onNote(note: Packed<'Note'>) { + if (note.user.host !== null) return; + if (note.visibility !== 'public') return; + if (note.channelId != null && !this.followingChannels.has(note.channelId)) return; + + // リプライなら再pack + if (note.replyId != null) { + note.reply = await Notes.pack(note.replyId, this.user, { + detail: true + }); + } + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await Notes.pack(note.renoteId, this.user, { + detail: true + }); + } + + // 関係ない返信は除外 + if (note.reply) { + const reply = note.reply; + // 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合 + if (reply.userId !== this.user!.id && note.userId !== this.user!.id && reply.userId !== note.userId) return; + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + // 流れてきたNoteがミュートすべきNoteだったら無視する + // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある) + // 現状では、ワードミュートにおけるMutedNoteレコードの追加処理はストリーミングに流す処理と並列で行われるため、 + // レコードが追加されるNoteでも追加されるより先にここのストリーミングの処理に到達することが起こる。 + // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる + if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; + + this.connection.cacheNote(note); + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off('notesStream', this.onNote); + } +} diff --git a/packages/backend/src/server/api/stream/channels/main.ts b/packages/backend/src/server/api/stream/channels/main.ts new file mode 100644 index 0000000000..131ac30472 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/main.ts @@ -0,0 +1,43 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; +import { Notes } from '@/models/index'; + +export default class extends Channel { + public readonly chName = 'main'; + public static shouldShare = true; + public static requireCredential = true; + + @autobind + public async init(params: any) { + // Subscribe main stream channel + this.subscriber.on(`mainStream:${this.user!.id}`, async data => { + switch (data.type) { + case 'notification': { + if (data.body.userId && this.muting.has(data.body.userId)) return; + + if (data.body.note && data.body.note.isHidden) { + const note = await Notes.pack(data.body.note.id, this.user, { + detail: true + }); + this.connection.cacheNote(note); + data.body.note = note; + } + break; + } + case 'mention': { + if (this.muting.has(data.body.userId)) return; + if (data.body.isHidden) { + const note = await Notes.pack(data.body.id, this.user, { + detail: true + }); + this.connection.cacheNote(note); + data.body = note; + } + break; + } + } + + this.send(data.type, data.body); + }); + } +} diff --git a/packages/backend/src/server/api/stream/channels/messaging-index.ts b/packages/backend/src/server/api/stream/channels/messaging-index.ts new file mode 100644 index 0000000000..0c495398ab --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/messaging-index.ts @@ -0,0 +1,16 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; + +export default class extends Channel { + public readonly chName = 'messagingIndex'; + public static shouldShare = true; + public static requireCredential = true; + + @autobind + public async init(params: any) { + // Subscribe messaging index stream + this.subscriber.on(`messagingIndexStream:${this.user!.id}`, data => { + this.send(data); + }); + } +} diff --git a/packages/backend/src/server/api/stream/channels/messaging.ts b/packages/backend/src/server/api/stream/channels/messaging.ts new file mode 100644 index 0000000000..c049e880b9 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/messaging.ts @@ -0,0 +1,106 @@ +import autobind from 'autobind-decorator'; +import { readUserMessagingMessage, readGroupMessagingMessage, deliverReadActivity } from '../../common/read-messaging-message'; +import Channel from '../channel'; +import { UserGroupJoinings, Users, MessagingMessages } from '@/models/index'; +import { User, ILocalUser, IRemoteUser } from '@/models/entities/user'; +import { UserGroup } from '@/models/entities/user-group'; +import { StreamMessages } from '../types'; + +export default class extends Channel { + public readonly chName = 'messaging'; + public static shouldShare = false; + public static requireCredential = true; + + private otherpartyId: string | null; + private otherparty: User | null; + private groupId: string | null; + private subCh: `messagingStream:${User['id']}-${User['id']}` | `messagingStream:${UserGroup['id']}`; + private typers: Record<User['id'], Date> = {}; + private emitTypersIntervalId: ReturnType<typeof setInterval>; + + @autobind + public async init(params: any) { + this.otherpartyId = params.otherparty; + this.otherparty = this.otherpartyId ? await Users.findOneOrFail({ id: this.otherpartyId }) : null; + this.groupId = params.group; + + // Check joining + if (this.groupId) { + const joining = await UserGroupJoinings.findOne({ + userId: this.user!.id, + userGroupId: this.groupId + }); + + if (joining == null) { + return; + } + } + + this.emitTypersIntervalId = setInterval(this.emitTypers, 5000); + + this.subCh = this.otherpartyId + ? `messagingStream:${this.user!.id}-${this.otherpartyId}` + : `messagingStream:${this.groupId}`; + + // Subscribe messaging stream + this.subscriber.on(this.subCh, this.onEvent); + } + + @autobind + private onEvent(data: StreamMessages['messaging']['payload'] | StreamMessages['groupMessaging']['payload']) { + if (data.type === 'typing') { + const id = data.body; + const begin = this.typers[id] == null; + this.typers[id] = new Date(); + if (begin) { + this.emitTypers(); + } + } else { + this.send(data); + } + } + + @autobind + public onMessage(type: string, body: any) { + switch (type) { + case 'read': + if (this.otherpartyId) { + readUserMessagingMessage(this.user!.id, this.otherpartyId, [body.id]); + + // リモートユーザーからのメッセージだったら既読配信 + if (Users.isLocalUser(this.user!) && Users.isRemoteUser(this.otherparty!)) { + MessagingMessages.findOne(body.id).then(message => { + if (message) deliverReadActivity(this.user as ILocalUser, this.otherparty as IRemoteUser, message); + }); + } + } else if (this.groupId) { + readGroupMessagingMessage(this.user!.id, this.groupId, [body.id]); + } + break; + } + } + + @autobind + private async emitTypers() { + const now = new Date(); + + // Remove not typing users + for (const [userId, date] of Object.entries(this.typers)) { + if (now.getTime() - date.getTime() > 5000) delete this.typers[userId]; + } + + const users = await Users.packMany(Object.keys(this.typers), null, { detail: false }); + + this.send({ + type: 'typers', + body: users, + }); + } + + @autobind + public dispose() { + this.subscriber.off(this.subCh, this.onEvent); + + clearInterval(this.emitTypersIntervalId); + } +} diff --git a/packages/backend/src/server/api/stream/channels/queue-stats.ts b/packages/backend/src/server/api/stream/channels/queue-stats.ts new file mode 100644 index 0000000000..0bda0cfcb9 --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/queue-stats.ts @@ -0,0 +1,41 @@ +import autobind from 'autobind-decorator'; +import Xev from 'xev'; +import Channel from '../channel'; + +const ev = new Xev(); + +export default class extends Channel { + public readonly chName = 'queueStats'; + public static shouldShare = true; + public static requireCredential = false; + + @autobind + public async init(params: any) { + ev.addListener('queueStats', this.onStats); + } + + @autobind + private onStats(stats: any) { + this.send('stats', stats); + } + + @autobind + public onMessage(type: string, body: any) { + switch (type) { + case 'requestLog': + ev.once(`queueStatsLog:${body.id}`, statsLog => { + this.send('statsLog', statsLog); + }); + ev.emit('requestQueueStatsLog', { + id: body.id, + length: body.length + }); + break; + } + } + + @autobind + public dispose() { + ev.removeListener('queueStats', this.onStats); + } +} diff --git a/packages/backend/src/server/api/stream/channels/server-stats.ts b/packages/backend/src/server/api/stream/channels/server-stats.ts new file mode 100644 index 0000000000..d245a7f70c --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/server-stats.ts @@ -0,0 +1,41 @@ +import autobind from 'autobind-decorator'; +import Xev from 'xev'; +import Channel from '../channel'; + +const ev = new Xev(); + +export default class extends Channel { + public readonly chName = 'serverStats'; + public static shouldShare = true; + public static requireCredential = false; + + @autobind + public async init(params: any) { + ev.addListener('serverStats', this.onStats); + } + + @autobind + private onStats(stats: any) { + this.send('stats', stats); + } + + @autobind + public onMessage(type: string, body: any) { + switch (type) { + case 'requestLog': + ev.once(`serverStatsLog:${body.id}`, statsLog => { + this.send('statsLog', statsLog); + }); + ev.emit('requestServerStatsLog', { + id: body.id, + length: body.length + }); + break; + } + } + + @autobind + public dispose() { + ev.removeListener('serverStats', this.onStats); + } +} diff --git a/packages/backend/src/server/api/stream/channels/user-list.ts b/packages/backend/src/server/api/stream/channels/user-list.ts new file mode 100644 index 0000000000..63b254605b --- /dev/null +++ b/packages/backend/src/server/api/stream/channels/user-list.ts @@ -0,0 +1,92 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; +import { Notes, UserListJoinings, UserLists } from '@/models/index'; +import { isMutedUserRelated } from '@/misc/is-muted-user-related'; +import { User } from '@/models/entities/user'; +import { isBlockerUserRelated } from '@/misc/is-blocker-user-related'; +import { Packed } from '@/misc/schema'; + +export default class extends Channel { + public readonly chName = 'userList'; + public static shouldShare = false; + public static requireCredential = false; + private listId: string; + public listUsers: User['id'][] = []; + private listUsersClock: NodeJS.Timer; + + @autobind + public async init(params: any) { + this.listId = params.listId as string; + + // Check existence and owner + const list = await UserLists.findOne({ + id: this.listId, + userId: this.user!.id + }); + if (!list) return; + + // Subscribe stream + this.subscriber.on(`userListStream:${this.listId}`, this.send); + + this.subscriber.on('notesStream', this.onNote); + + this.updateListUsers(); + this.listUsersClock = setInterval(this.updateListUsers, 5000); + } + + @autobind + private async updateListUsers() { + const users = await UserListJoinings.find({ + where: { + userListId: this.listId, + }, + select: ['userId'] + }); + + this.listUsers = users.map(x => x.userId); + } + + @autobind + private async onNote(note: Packed<'Note'>) { + if (!this.listUsers.includes(note.userId)) return; + + if (['followers', 'specified'].includes(note.visibility)) { + note = await Notes.pack(note.id, this.user, { + detail: true + }); + + if (note.isHidden) { + return; + } + } else { + // リプライなら再pack + if (note.replyId != null) { + note.reply = await Notes.pack(note.replyId, this.user, { + detail: true + }); + } + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await Notes.pack(note.renoteId, this.user, { + detail: true + }); + } + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (isMutedUserRelated(note, this.muting)) return; + // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する + if (isBlockerUserRelated(note, this.blocking)) return; + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off(`userListStream:${this.listId}`, this.send); + this.subscriber.off('notesStream', this.onNote); + + clearInterval(this.listUsersClock); + } +} |