diff options
| author | syuilo <Syuilotan@yahoo.co.jp> | 2018-10-07 11:06:17 +0900 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-10-07 11:06:17 +0900 |
| commit | d0570d7fe3a3bf3c6b0312dece74bacc04c3534a (patch) | |
| tree | 698218279a38f9c78b0350e81b8ac77ae52e4a0d /src/server/api | |
| parent | Fix お知らせが確認中...のままになる(Announcement Fetching...) (... (diff) | |
| download | sharkey-d0570d7fe3a3bf3c6b0312dece74bacc04c3534a.tar.gz sharkey-d0570d7fe3a3bf3c6b0312dece74bacc04c3534a.tar.bz2 sharkey-d0570d7fe3a3bf3c6b0312dece74bacc04c3534a.zip | |
V10 (#2826)
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* Update CHANGELOG.md
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* Update CHANGELOG.md
* Update CHANGELOG.md
* wip
* Update CHANGELOG.md
* wip
* wip
* wip
* wip
Diffstat (limited to 'src/server/api')
46 files changed, 1019 insertions, 837 deletions
diff --git a/src/server/api/call.ts b/src/server/api/call.ts index ee79e0a13c..7419bdc95d 100644 --- a/src/server/api/call.ts +++ b/src/server/api/call.ts @@ -9,6 +9,10 @@ export default (endpoint: string, user: IUser, app: IApp, data: any, file?: any) const ep = endpoints.find(e => e.name === endpoint); + if (ep == null) { + return rej('ENDPOINT_NOT_FOUND'); + } + if (ep.meta.secure && !isSecure) { return rej('ACCESS_DENIED'); } diff --git a/src/server/api/common/read-messaging-message.ts b/src/server/api/common/read-messaging-message.ts index 005240a37c..075e369832 100644 --- a/src/server/api/common/read-messaging-message.ts +++ b/src/server/api/common/read-messaging-message.ts @@ -1,7 +1,7 @@ import * as mongo from 'mongodb'; import Message from '../../../models/messaging-message'; import { IMessagingMessage as IMessage } from '../../../models/messaging-message'; -import { publishUserStream } from '../../../stream'; +import { publishMainStream } from '../../../stream'; import { publishMessagingStream } from '../../../stream'; import { publishMessagingIndexStream } from '../../../stream'; import User from '../../../models/user'; @@ -71,6 +71,6 @@ export default ( }); // 全ての(いままで未読だった)自分宛てのメッセージを(これで)読みましたよというイベントを発行 - publishUserStream(userId, 'read_all_messaging_messages'); + publishMainStream(userId, 'readAllMessagingMessages'); } }); diff --git a/src/server/api/common/read-notification.ts b/src/server/api/common/read-notification.ts index 0b0f3e4e5a..2d58ada4ce 100644 --- a/src/server/api/common/read-notification.ts +++ b/src/server/api/common/read-notification.ts @@ -1,6 +1,6 @@ import * as mongo from 'mongodb'; import { default as Notification, INotification } from '../../../models/notification'; -import { publishUserStream } from '../../../stream'; +import { publishMainStream } from '../../../stream'; import Mute from '../../../models/mute'; import User from '../../../models/user'; @@ -66,6 +66,6 @@ export default ( }); // 全ての(いままで未読だった)通知を(これで)読みましたよというイベントを発行 - publishUserStream(userId, 'read_all_notifications'); + publishMainStream(userId, 'readAllNotifications'); } }); diff --git a/src/server/api/endpoints/games/reversi/match.ts b/src/server/api/endpoints/games/reversi/match.ts index aba400af1d..d7483a0bfd 100644 --- a/src/server/api/endpoints/games/reversi/match.ts +++ b/src/server/api/endpoints/games/reversi/match.ts @@ -2,7 +2,7 @@ import $ from 'cafy'; import ID from '../../../../../misc/cafy-id'; import Matching, { pack as packMatching } from '../../../../../models/games/reversi/matching'; import ReversiGame, { pack as packGame } from '../../../../../models/games/reversi/game'; import User, { ILocalUser } from '../../../../../models/user'; -import { publishUserStream, publishReversiStream } from '../../../../../stream'; +import { publishMainStream, publishReversiStream } from '../../../../../stream'; import { eighteight } from '../../../../../games/reversi/maps'; export const meta = { @@ -58,7 +58,7 @@ export default (params: any, user: ILocalUser) => new Promise(async (res, rej) = }); if (other == 0) { - publishUserStream(user._id, 'reversi_no_invites'); + publishMainStream(user._id, 'reversi_no_invites'); } } else { // Fetch child @@ -94,6 +94,6 @@ export default (params: any, user: ILocalUser) => new Promise(async (res, rej) = // 招待 publishReversiStream(child._id, 'invited', packed); - publishUserStream(child._id, 'reversi_invited', packed); + publishMainStream(child._id, 'reversiInvited', packed); } }); diff --git a/src/server/api/endpoints/i/regenerate_token.ts b/src/server/api/endpoints/i/regenerate_token.ts index fe4a5cd118..2d85f06cfa 100644 --- a/src/server/api/endpoints/i/regenerate_token.ts +++ b/src/server/api/endpoints/i/regenerate_token.ts @@ -1,7 +1,7 @@ import $ from 'cafy'; import * as bcrypt from 'bcryptjs'; import User, { ILocalUser } from '../../../../models/user'; -import { publishUserStream } from '../../../../stream'; +import { publishMainStream } from '../../../../stream'; import generateUserToken from '../../common/generate-native-user-token'; export const meta = { @@ -33,5 +33,5 @@ export default async (params: any, user: ILocalUser) => new Promise(async (res, res(); // Publish event - publishUserStream(user._id, 'my_token_regenerated'); + publishMainStream(user._id, 'myTokenRegenerated'); }); diff --git a/src/server/api/endpoints/i/update.ts b/src/server/api/endpoints/i/update.ts index c1be0b6ebc..548ce5cadb 100644 --- a/src/server/api/endpoints/i/update.ts +++ b/src/server/api/endpoints/i/update.ts @@ -1,6 +1,6 @@ import $ from 'cafy'; import ID from '../../../../misc/cafy-id'; import User, { isValidName, isValidDescription, isValidLocation, isValidBirthday, pack, ILocalUser } from '../../../../models/user'; -import { publishUserStream } from '../../../../stream'; +import { publishMainStream } from '../../../../stream'; import DriveFile from '../../../../models/drive-file'; import acceptAllFollowRequests from '../../../../services/following/requests/accept-all'; import { IApp } from '../../../../models/app'; @@ -177,7 +177,7 @@ export default async (params: any, user: ILocalUser, app: IApp) => new Promise(a res(iObj); // Publish meUpdated event - publishUserStream(user._id, 'meUpdated', iObj); + publishMainStream(user._id, 'meUpdated', iObj); // 鍵垢を解除したとき、溜まっていたフォローリクエストがあるならすべて承認 if (user.isLocked && ps.isLocked === false) { diff --git a/src/server/api/endpoints/i/update_client_setting.ts b/src/server/api/endpoints/i/update_client_setting.ts index aed93c792f..2c05299dff 100644 --- a/src/server/api/endpoints/i/update_client_setting.ts +++ b/src/server/api/endpoints/i/update_client_setting.ts @@ -1,6 +1,6 @@ import $ from 'cafy'; import User, { ILocalUser } from '../../../../models/user'; -import { publishUserStream } from '../../../../stream'; +import { publishMainStream } from '../../../../stream'; export const meta = { requireCredential: true, @@ -26,7 +26,7 @@ export default async (params: any, user: ILocalUser) => new Promise(async (res, res(); // Publish event - publishUserStream(user._id, 'clientSettingUpdated', { + publishMainStream(user._id, 'clientSettingUpdated', { key: name, value }); diff --git a/src/server/api/endpoints/i/update_home.ts b/src/server/api/endpoints/i/update_home.ts index ffca9b90b3..27afc9fe5a 100644 --- a/src/server/api/endpoints/i/update_home.ts +++ b/src/server/api/endpoints/i/update_home.ts @@ -1,6 +1,6 @@ import $ from 'cafy'; import User, { ILocalUser } from '../../../../models/user'; -import { publishUserStream } from '../../../../stream'; +import { publishMainStream } from '../../../../stream'; export const meta = { requireCredential: true, @@ -25,5 +25,5 @@ export default async (params: any, user: ILocalUser) => new Promise(async (res, res(); - publishUserStream(user._id, 'home_updated', home); + publishMainStream(user._id, 'homeUpdated', home); }); diff --git a/src/server/api/endpoints/i/update_mobile_home.ts b/src/server/api/endpoints/i/update_mobile_home.ts index 0b72fbe2c1..1d4df389e4 100644 --- a/src/server/api/endpoints/i/update_mobile_home.ts +++ b/src/server/api/endpoints/i/update_mobile_home.ts @@ -1,6 +1,6 @@ import $ from 'cafy'; import User, { ILocalUser } from '../../../../models/user'; -import { publishUserStream } from '../../../../stream'; +import { publishMainStream } from '../../../../stream'; export const meta = { requireCredential: true, @@ -24,5 +24,5 @@ export default async (params: any, user: ILocalUser) => new Promise(async (res, res(); - publishUserStream(user._id, 'mobile_home_updated', home); + publishMainStream(user._id, 'mobileHomeUpdated', home); }); diff --git a/src/server/api/endpoints/i/update_widget.ts b/src/server/api/endpoints/i/update_widget.ts index 5cbe7c07a3..92499493eb 100644 --- a/src/server/api/endpoints/i/update_widget.ts +++ b/src/server/api/endpoints/i/update_widget.ts @@ -1,6 +1,6 @@ import $ from 'cafy'; import User, { ILocalUser } from '../../../../models/user'; -import { publishUserStream } from '../../../../stream'; +import { publishMainStream } from '../../../../stream'; export const meta = { requireCredential: true, @@ -73,7 +73,7 @@ export default async (params: any, user: ILocalUser) => new Promise(async (res, //#endregion if (widget) { - publishUserStream(user._id, 'widgetUpdated', { + publishMainStream(user._id, 'widgetUpdated', { id, data }); diff --git a/src/server/api/endpoints/messaging/messages/create.ts b/src/server/api/endpoints/messaging/messages/create.ts index 9a49e09248..f504f92326 100644 --- a/src/server/api/endpoints/messaging/messages/create.ts +++ b/src/server/api/endpoints/messaging/messages/create.ts @@ -6,7 +6,7 @@ import User, { ILocalUser } from '../../../../../models/user'; import Mute from '../../../../../models/mute'; import DriveFile from '../../../../../models/drive-file'; import { pack } from '../../../../../models/messaging-message'; -import { publishUserStream } from '../../../../../stream'; +import { publishMainStream } from '../../../../../stream'; import { publishMessagingStream, publishMessagingIndexStream } from '../../../../../stream'; import pushSw from '../../../../../push-sw'; @@ -88,12 +88,12 @@ export default (params: any, user: ILocalUser) => new Promise(async (res, rej) = // 自分のストリーム publishMessagingStream(message.userId, message.recipientId, 'message', messageObj); publishMessagingIndexStream(message.userId, 'message', messageObj); - publishUserStream(message.userId, 'messaging_message', messageObj); + publishMainStream(message.userId, 'messagingMessage', messageObj); // 相手のストリーム publishMessagingStream(message.recipientId, message.userId, 'message', messageObj); publishMessagingIndexStream(message.recipientId, 'message', messageObj); - publishUserStream(message.recipientId, 'messaging_message', messageObj); + publishMainStream(message.recipientId, 'messagingMessage', messageObj); // Update flag User.update({ _id: recipient._id }, { @@ -117,8 +117,8 @@ export default (params: any, user: ILocalUser) => new Promise(async (res, rej) = } //#endregion - publishUserStream(message.recipientId, 'unread_messaging_message', messageObj); - pushSw(message.recipientId, 'unread_messaging_message', messageObj); + publishMainStream(message.recipientId, 'unreadMessagingMessage', messageObj); + pushSw(message.recipientId, 'unreadMessagingMessage', messageObj); } }, 3000); diff --git a/src/server/api/endpoints/notes/polls/vote.ts b/src/server/api/endpoints/notes/polls/vote.ts index ab80e7f5d0..3b78d62fd3 100644 --- a/src/server/api/endpoints/notes/polls/vote.ts +++ b/src/server/api/endpoints/notes/polls/vote.ts @@ -72,7 +72,10 @@ export default (params: any, user: ILocalUser) => new Promise(async (res, rej) = $inc: inc }); - publishNoteStream(note._id, 'poll_voted'); + publishNoteStream(note._id, 'pollVoted', { + choice: choice, + userId: user._id.toHexString() + }); // Notify notify(note.userId, user._id, 'poll_vote', { diff --git a/src/server/api/endpoints/notifications/mark_all_as_read.ts b/src/server/api/endpoints/notifications/mark_all_as_read.ts index e2bde777b3..6487cd8b48 100644 --- a/src/server/api/endpoints/notifications/mark_all_as_read.ts +++ b/src/server/api/endpoints/notifications/mark_all_as_read.ts @@ -1,5 +1,5 @@ import Notification from '../../../../models/notification'; -import { publishUserStream } from '../../../../stream'; +import { publishMainStream } from '../../../../stream'; import User, { ILocalUser } from '../../../../models/user'; export const meta = { @@ -40,5 +40,5 @@ export default (params: any, user: ILocalUser) => new Promise(async (res, rej) = }); // 全ての通知を読みましたよというイベントを発行 - publishUserStream(user._id, 'read_all_notifications'); + publishMainStream(user._id, 'readAllNotifications'); }); diff --git a/src/server/api/private/signin.ts b/src/server/api/private/signin.ts index c42fb7bd8c..0e44c2ddd6 100644 --- a/src/server/api/private/signin.ts +++ b/src/server/api/private/signin.ts @@ -3,7 +3,7 @@ import * as bcrypt from 'bcryptjs'; import * as speakeasy from 'speakeasy'; import User, { ILocalUser } from '../../../models/user'; import Signin, { pack } from '../../../models/signin'; -import { publishUserStream } from '../../../stream'; +import { publishMainStream } from '../../../stream'; import signin from '../common/signin'; import config from '../../../config'; @@ -87,5 +87,5 @@ export default async (ctx: Koa.Context) => { }); // Publish signin event - publishUserStream(user._id, 'signin', await pack(record)); + publishMainStream(user._id, 'signin', await pack(record)); }; diff --git a/src/server/api/service/twitter.ts b/src/server/api/service/twitter.ts index aad2846bb4..f71e588628 100644 --- a/src/server/api/service/twitter.ts +++ b/src/server/api/service/twitter.ts @@ -4,7 +4,7 @@ import * as uuid from 'uuid'; import autwh from 'autwh'; import redis from '../../../db/redis'; import User, { pack, ILocalUser } from '../../../models/user'; -import { publishUserStream } from '../../../stream'; +import { publishMainStream } from '../../../stream'; import config from '../../../config'; import signin from '../common/signin'; @@ -49,7 +49,7 @@ router.get('/disconnect/twitter', async ctx => { ctx.body = `Twitterの連携を解除しました :v:`; // Publish i updated event - publishUserStream(user._id, 'meUpdated', await pack(user, user, { + publishMainStream(user._id, 'meUpdated', await pack(user, user, { detail: true, includeSecrets: true })); @@ -174,7 +174,7 @@ if (config.twitter == null) { ctx.body = `Twitter: @${result.screenName} を、Misskey: @${user.username} に接続しました!`; // Publish i updated event - publishUserStream(user._id, 'meUpdated', await pack(user, user, { + publishMainStream(user._id, 'meUpdated', await pack(user, user, { detail: true, includeSecrets: true })); diff --git a/src/server/api/stream/channel.ts b/src/server/api/stream/channel.ts new file mode 100644 index 0000000000..e2726060dc --- /dev/null +++ b/src/server/api/stream/channel.ts @@ -0,0 +1,39 @@ +import autobind from 'autobind-decorator'; +import Connection from '.'; + +/** + * Stream channel + */ +export default abstract class Channel { + protected connection: Connection; + public id: string; + + protected get user() { + return this.connection.user; + } + + protected get subscriber() { + return this.connection.subscriber; + } + + constructor(id: string, connection: Connection) { + this.id = id; + this.connection = connection; + } + + @autobind + public send(typeOrPayload: any, payload?: any) { + const type = payload === undefined ? typeOrPayload.type : typeOrPayload; + const body = payload === undefined ? typeOrPayload.body : payload; + + this.connection.sendMessageToWs('channel', { + id: this.id, + type: type, + body: body + }); + } + + public abstract init(params: any): void; + public dispose?(): void; + public onMessage?(type: string, body: any): void; +} diff --git a/src/server/api/stream/channels/drive.ts b/src/server/api/stream/channels/drive.ts new file mode 100644 index 0000000000..807fc93cd0 --- /dev/null +++ b/src/server/api/stream/channels/drive.ts @@ -0,0 +1,12 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; + +export default class extends Channel { + @autobind + public async init(params: any) { + // Subscribe drive stream + this.subscriber.on(`driveStream:${this.user._id}`, data => { + this.send(data); + }); + } +} diff --git a/src/server/api/stream/channels/games/reversi-game.ts b/src/server/api/stream/channels/games/reversi-game.ts new file mode 100644 index 0000000000..11f1fb1feb --- /dev/null +++ b/src/server/api/stream/channels/games/reversi-game.ts @@ -0,0 +1,309 @@ +import autobind from 'autobind-decorator'; +import * as CRC32 from 'crc-32'; +import ReversiGame, { pack } from '../../../../../models/games/reversi/game'; +import { publishReversiGameStream } from '../../../../../stream'; +import Reversi from '../../../../../games/reversi/core'; +import * as maps from '../../../../../games/reversi/maps'; +import Channel from '../../channel'; + +export default class extends Channel { + private gameId: string; + + @autobind + public async init(params: any) { + this.gameId = params.gameId as string; + + // Subscribe game stream + this.subscriber.on(`reversiGameStream:${this.gameId}`, data => { + this.send(data); + }); + } + + @autobind + public onMessage(type: string, body: any) { + switch (type) { + case 'accept': this.accept(true); break; + case 'cancel-accept': this.accept(false); break; + case 'update-settings': this.updateSettings(body.settings); break; + case 'init-form': this.initForm(body); break; + case 'update-form': this.updateForm(body.id, body.value); break; + case 'message': this.message(body); break; + case 'set': this.set(body.pos); break; + case 'check': this.check(body.crc32); break; + } + } + + @autobind + private async updateSettings(settings: any) { + const game = await ReversiGame.findOne({ _id: this.gameId }); + + if (game.isStarted) return; + if (!game.user1Id.equals(this.user._id) && !game.user2Id.equals(this.user._id)) return; + if (game.user1Id.equals(this.user._id) && game.user1Accepted) return; + if (game.user2Id.equals(this.user._id) && game.user2Accepted) return; + + await ReversiGame.update({ _id: this.gameId }, { + $set: { + settings + } + }); + + publishReversiGameStream(this.gameId, 'updateSettings', settings); + } + + @autobind + private async initForm(form: any) { + const game = await ReversiGame.findOne({ _id: this.gameId }); + + if (game.isStarted) return; + if (!game.user1Id.equals(this.user._id) && !game.user2Id.equals(this.user._id)) return; + + const set = game.user1Id.equals(this.user._id) ? { + form1: form + } : { + form2: form + }; + + await ReversiGame.update({ _id: this.gameId }, { + $set: set + }); + + publishReversiGameStream(this.gameId, 'initForm', { + userId: this.user._id, + form + }); + } + + @autobind + private async updateForm(id: string, value: any) { + const game = await ReversiGame.findOne({ _id: this.gameId }); + + if (game.isStarted) return; + if (!game.user1Id.equals(this.user._id) && !game.user2Id.equals(this.user._id)) return; + + const form = game.user1Id.equals(this.user._id) ? game.form2 : game.form1; + + const item = form.find((i: any) => i.id == id); + + if (item == null) return; + + item.value = value; + + const set = game.user1Id.equals(this.user._id) ? { + form2: form + } : { + form1: form + }; + + await ReversiGame.update({ _id: this.gameId }, { + $set: set + }); + + publishReversiGameStream(this.gameId, 'updateForm', { + userId: this.user._id, + id, + value + }); + } + + @autobind + private async message(message: any) { + message.id = Math.random(); + publishReversiGameStream(this.gameId, 'message', { + userId: this.user._id, + message + }); + } + + @autobind + private async accept(accept: boolean) { + const game = await ReversiGame.findOne({ _id: this.gameId }); + + if (game.isStarted) return; + + let bothAccepted = false; + + if (game.user1Id.equals(this.user._id)) { + await ReversiGame.update({ _id: this.gameId }, { + $set: { + user1Accepted: accept + } + }); + + publishReversiGameStream(this.gameId, 'changeAccepts', { + user1: accept, + user2: game.user2Accepted + }); + + if (accept && game.user2Accepted) bothAccepted = true; + } else if (game.user2Id.equals(this.user._id)) { + await ReversiGame.update({ _id: this.gameId }, { + $set: { + user2Accepted: accept + } + }); + + publishReversiGameStream(this.gameId, 'changeAccepts', { + user1: game.user1Accepted, + user2: accept + }); + + if (accept && game.user1Accepted) bothAccepted = true; + } else { + return; + } + + if (bothAccepted) { + // 3秒後、まだacceptされていたらゲーム開始 + setTimeout(async () => { + const freshGame = await ReversiGame.findOne({ _id: this.gameId }); + if (freshGame == null || freshGame.isStarted || freshGame.isEnded) return; + if (!freshGame.user1Accepted || !freshGame.user2Accepted) return; + + let bw: number; + if (freshGame.settings.bw == 'random') { + bw = Math.random() > 0.5 ? 1 : 2; + } else { + bw = freshGame.settings.bw as number; + } + + function getRandomMap() { + const mapCount = Object.entries(maps).length; + const rnd = Math.floor(Math.random() * mapCount); + return Object.values(maps)[rnd].data; + } + + const map = freshGame.settings.map != null ? freshGame.settings.map : getRandomMap(); + + await ReversiGame.update({ _id: this.gameId }, { + $set: { + startedAt: new Date(), + isStarted: true, + black: bw, + 'settings.map': map + } + }); + + //#region 盤面に最初から石がないなどして始まった瞬間に勝敗が決定する場合があるのでその処理 + const o = new Reversi(map, { + isLlotheo: freshGame.settings.isLlotheo, + canPutEverywhere: freshGame.settings.canPutEverywhere, + loopedBoard: freshGame.settings.loopedBoard + }); + + if (o.isEnded) { + let winner; + if (o.winner === true) { + winner = freshGame.black == 1 ? freshGame.user1Id : freshGame.user2Id; + } else if (o.winner === false) { + winner = freshGame.black == 1 ? freshGame.user2Id : freshGame.user1Id; + } else { + winner = null; + } + + await ReversiGame.update({ + _id: this.gameId + }, { + $set: { + isEnded: true, + winnerId: winner + } + }); + + publishReversiGameStream(this.gameId, 'ended', { + winnerId: winner, + game: await pack(this.gameId, this.user) + }); + } + //#endregion + + publishReversiGameStream(this.gameId, 'started', await pack(this.gameId, this.user)); + }, 3000); + } + } + + // 石を打つ + @autobind + private async set(pos: number) { + const game = await ReversiGame.findOne({ _id: this.gameId }); + + if (!game.isStarted) return; + if (game.isEnded) return; + if (!game.user1Id.equals(this.user._id) && !game.user2Id.equals(this.user._id)) return; + + const o = new Reversi(game.settings.map, { + isLlotheo: game.settings.isLlotheo, + canPutEverywhere: game.settings.canPutEverywhere, + loopedBoard: game.settings.loopedBoard + }); + + game.logs.forEach(log => { + o.put(log.color, log.pos); + }); + + const myColor = + (game.user1Id.equals(this.user._id) && game.black == 1) || (game.user2Id.equals(this.user._id) && game.black == 2) + ? true + : false; + + if (!o.canPut(myColor, pos)) return; + o.put(myColor, pos); + + let winner; + if (o.isEnded) { + if (o.winner === true) { + winner = game.black == 1 ? game.user1Id : game.user2Id; + } else if (o.winner === false) { + winner = game.black == 1 ? game.user2Id : game.user1Id; + } else { + winner = null; + } + } + + const log = { + at: new Date(), + color: myColor, + pos + }; + + const crc32 = CRC32.str(game.logs.map(x => x.pos.toString()).join('') + pos.toString()); + + await ReversiGame.update({ + _id: this.gameId + }, { + $set: { + crc32, + isEnded: o.isEnded, + winnerId: winner + }, + $push: { + logs: log + } + }); + + publishReversiGameStream(this.gameId, 'set', Object.assign(log, { + next: o.turn + })); + + if (o.isEnded) { + publishReversiGameStream(this.gameId, 'ended', { + winnerId: winner, + game: await pack(this.gameId, this.user) + }); + } + } + + @autobind + private async check(crc32: string) { + const game = await ReversiGame.findOne({ _id: this.gameId }); + + if (!game.isStarted) return; + + // 互換性のため + if (game.crc32 == null) return; + + if (crc32 !== game.crc32) { + this.send('rescue', await pack(game, this.user)); + } + } +} diff --git a/src/server/api/stream/channels/games/reversi.ts b/src/server/api/stream/channels/games/reversi.ts new file mode 100644 index 0000000000..d75025c944 --- /dev/null +++ b/src/server/api/stream/channels/games/reversi.ts @@ -0,0 +1,30 @@ +import autobind from 'autobind-decorator'; +import * as mongo from 'mongodb'; +import Matching, { pack } from '../../../../../models/games/reversi/matching'; +import { publishMainStream } from '../../../../../stream'; +import Channel from '../../channel'; + +export default class extends Channel { + @autobind + public async init(params: any) { + // Subscribe reversi stream + this.subscriber.on(`reversiStream:${this.user._id}`, data => { + this.send(data); + }); + } + + @autobind + public async onMessage(type: string, body: any) { + switch (type) { + case 'ping': + if (body.id == null) return; + const matching = await Matching.findOne({ + parentId: this.user._id, + childId: new mongo.ObjectID(body.id) + }); + if (matching == null) return; + publishMainStream(matching.childId, 'reversiInvited', await pack(matching, matching.childId)); + break; + } + } +} diff --git a/src/server/api/stream/channels/global-timeline.ts b/src/server/api/stream/channels/global-timeline.ts new file mode 100644 index 0000000000..ab0fe5d094 --- /dev/null +++ b/src/server/api/stream/channels/global-timeline.ts @@ -0,0 +1,39 @@ +import autobind from 'autobind-decorator'; +import Mute from '../../../../models/mute'; +import { pack } from '../../../../models/note'; +import shouldMuteThisNote from '../../../../misc/should-mute-this-note'; +import Channel from '../channel'; + +export default class extends Channel { + private mutedUserIds: string[] = []; + + @autobind + public async init(params: any) { + // Subscribe events + this.subscriber.on('globalTimeline', this.onNote); + + const mute = await Mute.find({ muterId: this.user._id }); + this.mutedUserIds = mute.map(m => m.muteeId.toString()); + } + + @autobind + private async onNote(note: any) { + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await pack(note.renoteId, this.user, { + detail: true + }); + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (shouldMuteThisNote(note, this.mutedUserIds)) return; + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off('globalTimeline', this.onNote); + } +} diff --git a/src/server/api/stream/channels/hashtag.ts b/src/server/api/stream/channels/hashtag.ts new file mode 100644 index 0000000000..652b0caa5b --- /dev/null +++ b/src/server/api/stream/channels/hashtag.ts @@ -0,0 +1,33 @@ +import autobind from 'autobind-decorator'; +import Mute from '../../../../models/mute'; +import { pack } from '../../../../models/note'; +import shouldMuteThisNote from '../../../../misc/should-mute-this-note'; +import Channel from '../channel'; + +export default class extends Channel { + @autobind + public async init(params: any) { + const mute = this.user ? await Mute.find({ muterId: this.user._id }) : null; + const mutedUserIds = mute ? mute.map(m => m.muteeId.toString()) : []; + + const q: Array<string[]> = params.q; + + // Subscribe stream + this.subscriber.on('hashtag', async note => { + const matched = q.some(tags => tags.every(tag => note.tags.map((t: string) => t.toLowerCase()).includes(tag.toLowerCase()))); + if (!matched) return; + + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await pack(note.renoteId, this.user, { + detail: true + }); + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (shouldMuteThisNote(note, mutedUserIds)) return; + + this.send('note', note); + }); + } +} diff --git a/src/server/api/stream/channels/home-timeline.ts b/src/server/api/stream/channels/home-timeline.ts new file mode 100644 index 0000000000..4c674e75ef --- /dev/null +++ b/src/server/api/stream/channels/home-timeline.ts @@ -0,0 +1,39 @@ +import autobind from 'autobind-decorator'; +import Mute from '../../../../models/mute'; +import { pack } from '../../../../models/note'; +import shouldMuteThisNote from '../../../../misc/should-mute-this-note'; +import Channel from '../channel'; + +export default class extends Channel { + private mutedUserIds: string[] = []; + + @autobind + public async init(params: any) { + // Subscribe events + this.subscriber.on(`homeTimeline:${this.user._id}`, this.onNote); + + const mute = await Mute.find({ muterId: this.user._id }); + this.mutedUserIds = mute.map(m => m.muteeId.toString()); + } + + @autobind + private async onNote(note: any) { + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await pack(note.renoteId, this.user, { + detail: true + }); + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (shouldMuteThisNote(note, this.mutedUserIds)) return; + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off(`homeTimeline:${this.user._id}`, this.onNote); + } +} diff --git a/src/server/api/stream/channels/hybrid-timeline.ts b/src/server/api/stream/channels/hybrid-timeline.ts new file mode 100644 index 0000000000..0b12ab3a8f --- /dev/null +++ b/src/server/api/stream/channels/hybrid-timeline.ts @@ -0,0 +1,41 @@ +import autobind from 'autobind-decorator'; +import Mute from '../../../../models/mute'; +import { pack } from '../../../../models/note'; +import shouldMuteThisNote from '../../../../misc/should-mute-this-note'; +import Channel from '../channel'; + +export default class extends Channel { + private mutedUserIds: string[] = []; + + @autobind + public async init(params: any) { + // Subscribe events + this.subscriber.on('hybridTimeline', this.onNewNote); + this.subscriber.on(`hybridTimeline:${this.user._id}`, this.onNewNote); + + const mute = await Mute.find({ muterId: this.user._id }); + this.mutedUserIds = mute.map(m => m.muteeId.toString()); + } + + @autobind + private async onNewNote(note: any) { + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await pack(note.renoteId, this.user, { + detail: true + }); + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (shouldMuteThisNote(note, this.mutedUserIds)) return; + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off('hybridTimeline', this.onNewNote); + this.subscriber.off(`hybridTimeline:${this.user._id}`, this.onNewNote); + } +} diff --git a/src/server/api/stream/channels/index.ts b/src/server/api/stream/channels/index.ts new file mode 100644 index 0000000000..7e71590d00 --- /dev/null +++ b/src/server/api/stream/channels/index.ts @@ -0,0 +1,31 @@ +import main from './main'; +import homeTimeline from './home-timeline'; +import localTimeline from './local-timeline'; +import hybridTimeline from './hybrid-timeline'; +import globalTimeline from './global-timeline'; +import notesStats from './notes-stats'; +import serverStats from './server-stats'; +import userList from './user-list'; +import messaging from './messaging'; +import messagingIndex from './messaging-index'; +import drive from './drive'; +import hashtag from './hashtag'; +import gamesReversi from './games/reversi'; +import gamesReversiGame from './games/reversi-game'; + +export default { + main, + homeTimeline, + localTimeline, + hybridTimeline, + globalTimeline, + notesStats, + serverStats, + userList, + messaging, + messagingIndex, + drive, + hashtag, + gamesReversi, + gamesReversiGame +}; diff --git a/src/server/api/stream/channels/local-timeline.ts b/src/server/api/stream/channels/local-timeline.ts new file mode 100644 index 0000000000..769ec6392f --- /dev/null +++ b/src/server/api/stream/channels/local-timeline.ts @@ -0,0 +1,39 @@ +import autobind from 'autobind-decorator'; +import Mute from '../../../../models/mute'; +import { pack } from '../../../../models/note'; +import shouldMuteThisNote from '../../../../misc/should-mute-this-note'; +import Channel from '../channel'; + +export default class extends Channel { + private mutedUserIds: string[] = []; + + @autobind + public async init(params: any) { + // Subscribe events + this.subscriber.on('localTimeline', this.onNote); + + const mute = this.user ? await Mute.find({ muterId: this.user._id }) : null; + this.mutedUserIds = mute ? mute.map(m => m.muteeId.toString()) : []; + } + + @autobind + private async onNote(note: any) { + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await pack(note.renoteId, this.user, { + detail: true + }); + } + + // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (shouldMuteThisNote(note, this.mutedUserIds)) return; + + this.send('note', note); + } + + @autobind + public dispose() { + // Unsubscribe events + this.subscriber.off('localTimeline', this.onNote); + } +} diff --git a/src/server/api/stream/channels/main.ts b/src/server/api/stream/channels/main.ts new file mode 100644 index 0000000000..a6c5b12760 --- /dev/null +++ b/src/server/api/stream/channels/main.ts @@ -0,0 +1,25 @@ +import autobind from 'autobind-decorator'; +import Mute from '../../../../models/mute'; +import Channel from '../channel'; + +export default class extends Channel { + @autobind + public async init(params: any) { + const mute = await Mute.find({ muterId: this.user._id }); + const mutedUserIds = mute.map(m => m.muteeId.toString()); + + // Subscribe main stream channel + this.subscriber.on(`mainStream:${this.user._id}`, async data => { + const { type, body } = data; + + switch (type) { + case 'notification': { + if (!mutedUserIds.includes(body.userId)) { + this.send('notification', body); + } + break; + } + } + }); + } +} diff --git a/src/server/api/stream/channels/messaging-index.ts b/src/server/api/stream/channels/messaging-index.ts new file mode 100644 index 0000000000..6e87cca7f4 --- /dev/null +++ b/src/server/api/stream/channels/messaging-index.ts @@ -0,0 +1,12 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; + +export default class extends Channel { + @autobind + public async init(params: any) { + // Subscribe messaging index stream + this.subscriber.on(`messagingIndexStream:${this.user._id}`, data => { + this.send(data); + }); + } +} diff --git a/src/server/api/stream/channels/messaging.ts b/src/server/api/stream/channels/messaging.ts new file mode 100644 index 0000000000..e1a78c8678 --- /dev/null +++ b/src/server/api/stream/channels/messaging.ts @@ -0,0 +1,26 @@ +import autobind from 'autobind-decorator'; +import read from '../../common/read-messaging-message'; +import Channel from '../channel'; + +export default class extends Channel { + private otherpartyId: string; + + @autobind + public async init(params: any) { + this.otherpartyId = params.otherparty as string; + + // Subscribe messaging stream + this.subscriber.on(`messagingStream:${this.user._id}-${this.otherpartyId}`, data => { + this.send(data); + }); + } + + @autobind + public onMessage(type: string, body: any) { + switch (type) { + case 'read': + read(this.user._id, this.otherpartyId, body.id); + break; + } + } +} diff --git a/src/server/api/stream/channels/notes-stats.ts b/src/server/api/stream/channels/notes-stats.ts new file mode 100644 index 0000000000..cc68d9886d --- /dev/null +++ b/src/server/api/stream/channels/notes-stats.ts @@ -0,0 +1,34 @@ +import autobind from 'autobind-decorator'; +import Xev from 'xev'; +import Channel from '../channel'; + +const ev = new Xev(); + +export default class extends Channel { + @autobind + public async init(params: any) { + ev.addListener('notesStats', this.onStats); + } + + @autobind + private onStats(stats: any) { + this.send('stats', stats); + } + + @autobind + public onMessage(type: string, body: any) { + switch (type) { + case 'requestLog': + ev.once(`notesStatsLog:${body.id}`, statsLog => { + this.send('statsLog', statsLog); + }); + ev.emit('requestNotesStatsLog', body.id); + break; + } + } + + @autobind + public dispose() { + ev.removeListener('notesStats', this.onStats); + } +} diff --git a/src/server/api/stream/channels/server-stats.ts b/src/server/api/stream/channels/server-stats.ts new file mode 100644 index 0000000000..28a566e8ae --- /dev/null +++ b/src/server/api/stream/channels/server-stats.ts @@ -0,0 +1,37 @@ +import autobind from 'autobind-decorator'; +import Xev from 'xev'; +import Channel from '../channel'; + +const ev = new Xev(); + +export default class extends Channel { + @autobind + public async init(params: any) { + ev.addListener('serverStats', this.onStats); + } + + @autobind + private onStats(stats: any) { + this.send('stats', stats); + } + + @autobind + public onMessage(type: string, body: any) { + switch (type) { + case 'requestLog': + ev.once(`serverStatsLog:${body.id}`, statsLog => { + this.send('statsLog', statsLog); + }); + ev.emit('requestServerStatsLog', { + id: body.id, + length: body.length + }); + break; + } + } + + @autobind + public dispose() { + ev.removeListener('serverStats', this.onStats); + } +} diff --git a/src/server/api/stream/channels/user-list.ts b/src/server/api/stream/channels/user-list.ts new file mode 100644 index 0000000000..4ace308923 --- /dev/null +++ b/src/server/api/stream/channels/user-list.ts @@ -0,0 +1,14 @@ +import autobind from 'autobind-decorator'; +import Channel from '../channel'; + +export default class extends Channel { + @autobind + public async init(params: any) { + const listId = params.listId as string; + + // Subscribe stream + this.subscriber.on(`userListStream:${listId}`, data => { + this.send(data); + }); + } +} diff --git a/src/server/api/stream/drive.ts b/src/server/api/stream/drive.ts deleted file mode 100644 index 28c241e1bc..0000000000 --- a/src/server/api/stream/drive.ts +++ /dev/null @@ -1,9 +0,0 @@ -import * as websocket from 'websocket'; -import Xev from 'xev'; - -export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user: any): void { - // Subscribe drive stream - subscriber.on(`drive-stream:${user._id}`, data => { - connection.send(JSON.stringify(data)); - }); -} diff --git a/src/server/api/stream/games/reversi-game.ts b/src/server/api/stream/games/reversi-game.ts deleted file mode 100644 index 5cbbf42d59..0000000000 --- a/src/server/api/stream/games/reversi-game.ts +++ /dev/null @@ -1,332 +0,0 @@ -import * as websocket from 'websocket'; -import Xev from 'xev'; -import * as CRC32 from 'crc-32'; -import ReversiGame, { pack } from '../../../../models/games/reversi/game'; -import { publishReversiGameStream } from '../../../../stream'; -import Reversi from '../../../../games/reversi/core'; -import * as maps from '../../../../games/reversi/maps'; -import { ParsedUrlQuery } from 'querystring'; - -export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user?: any): void { - const q = request.resourceURL.query as ParsedUrlQuery; - const gameId = q.game as string; - - // Subscribe game stream - subscriber.on(`reversi-game-stream:${gameId}`, data => { - connection.send(JSON.stringify(data)); - }); - - connection.on('message', async (data) => { - const msg = JSON.parse(data.utf8Data); - - switch (msg.type) { - case 'accept': - accept(true); - break; - - case 'cancel-accept': - accept(false); - break; - - case 'update-settings': - if (msg.settings == null) return; - updateSettings(msg.settings); - break; - - case 'init-form': - if (msg.body == null) return; - initForm(msg.body); - break; - - case 'update-form': - if (msg.id == null || msg.value === undefined) return; - updateForm(msg.id, msg.value); - break; - - case 'message': - if (msg.body == null) return; - message(msg.body); - break; - - case 'set': - if (msg.pos == null) return; - set(msg.pos); - break; - - case 'check': - if (msg.crc32 == null) return; - check(msg.crc32); - break; - } - }); - - async function updateSettings(settings: any) { - const game = await ReversiGame.findOne({ _id: gameId }); - - if (game.isStarted) return; - if (!game.user1Id.equals(user._id) && !game.user2Id.equals(user._id)) return; - if (game.user1Id.equals(user._id) && game.user1Accepted) return; - if (game.user2Id.equals(user._id) && game.user2Accepted) return; - - await ReversiGame.update({ _id: gameId }, { - $set: { - settings - } - }); - - publishReversiGameStream(gameId, 'update-settings', settings); - } - - async function initForm(form: any) { - const game = await ReversiGame.findOne({ _id: gameId }); - - if (game.isStarted) return; - if (!game.user1Id.equals(user._id) && !game.user2Id.equals(user._id)) return; - - const set = game.user1Id.equals(user._id) ? { - form1: form - } : { - form2: form - }; - - await ReversiGame.update({ _id: gameId }, { - $set: set - }); - - publishReversiGameStream(gameId, 'init-form', { - userId: user._id, - form - }); - } - - async function updateForm(id: string, value: any) { - const game = await ReversiGame.findOne({ _id: gameId }); - - if (game.isStarted) return; - if (!game.user1Id.equals(user._id) && !game.user2Id.equals(user._id)) return; - - const form = game.user1Id.equals(user._id) ? game.form2 : game.form1; - - const item = form.find((i: any) => i.id == id); - - if (item == null) return; - - item.value = value; - - const set = game.user1Id.equals(user._id) ? { - form2: form - } : { - form1: form - }; - - await ReversiGame.update({ _id: gameId }, { - $set: set - }); - - publishReversiGameStream(gameId, 'update-form', { - userId: user._id, - id, - value - }); - } - - async function message(message: any) { - message.id = Math.random(); - publishReversiGameStream(gameId, 'message', { - userId: user._id, - message - }); - } - - async function accept(accept: boolean) { - const game = await ReversiGame.findOne({ _id: gameId }); - - if (game.isStarted) return; - - let bothAccepted = false; - - if (game.user1Id.equals(user._id)) { - await ReversiGame.update({ _id: gameId }, { - $set: { - user1Accepted: accept - } - }); - - publishReversiGameStream(gameId, 'change-accepts', { - user1: accept, - user2: game.user2Accepted - }); - - if (accept && game.user2Accepted) bothAccepted = true; - } else if (game.user2Id.equals(user._id)) { - await ReversiGame.update({ _id: gameId }, { - $set: { - user2Accepted: accept - } - }); - - publishReversiGameStream(gameId, 'change-accepts', { - user1: game.user1Accepted, - user2: accept - }); - - if (accept && game.user1Accepted) bothAccepted = true; - } else { - return; - } - - if (bothAccepted) { - // 3秒後、まだacceptされていたらゲーム開始 - setTimeout(async () => { - const freshGame = await ReversiGame.findOne({ _id: gameId }); - if (freshGame == null || freshGame.isStarted || freshGame.isEnded) return; - if (!freshGame.user1Accepted || !freshGame.user2Accepted) return; - - let bw: number; - if (freshGame.settings.bw == 'random') { - bw = Math.random() > 0.5 ? 1 : 2; - } else { - bw = freshGame.settings.bw as number; - } - - function getRandomMap() { - const mapCount = Object.entries(maps).length; - const rnd = Math.floor(Math.random() * mapCount); - return Object.values(maps)[rnd].data; - } - - const map = freshGame.settings.map != null ? freshGame.settings.map : getRandomMap(); - - await ReversiGame.update({ _id: gameId }, { - $set: { - startedAt: new Date(), - isStarted: true, - black: bw, - 'settings.map': map - } - }); - - //#region 盤面に最初から石がないなどして始まった瞬間に勝敗が決定する場合があるのでその処理 - const o = new Reversi(map, { - isLlotheo: freshGame.settings.isLlotheo, - canPutEverywhere: freshGame.settings.canPutEverywhere, - loopedBoard: freshGame.settings.loopedBoard - }); - - if (o.isEnded) { - let winner; - if (o.winner === true) { - winner = freshGame.black == 1 ? freshGame.user1Id : freshGame.user2Id; - } else if (o.winner === false) { - winner = freshGame.black == 1 ? freshGame.user2Id : freshGame.user1Id; - } else { - winner = null; - } - - await ReversiGame.update({ - _id: gameId - }, { - $set: { - isEnded: true, - winnerId: winner - } - }); - - publishReversiGameStream(gameId, 'ended', { - winnerId: winner, - game: await pack(gameId, user) - }); - } - //#endregion - - publishReversiGameStream(gameId, 'started', await pack(gameId, user)); - }, 3000); - } - } - - // 石を打つ - async function set(pos: number) { - const game = await ReversiGame.findOne({ _id: gameId }); - - if (!game.isStarted) return; - if (game.isEnded) return; - if (!game.user1Id.equals(user._id) && !game.user2Id.equals(user._id)) return; - - const o = new Reversi(game.settings.map, { - isLlotheo: game.settings.isLlotheo, - canPutEverywhere: game.settings.canPutEverywhere, - loopedBoard: game.settings.loopedBoard - }); - - game.logs.forEach(log => { - o.put(log.color, log.pos); - }); - - const myColor = - (game.user1Id.equals(user._id) && game.black == 1) || (game.user2Id.equals(user._id) && game.black == 2) - ? true - : false; - - if (!o.canPut(myColor, pos)) return; - o.put(myColor, pos); - - let winner; - if (o.isEnded) { - if (o.winner === true) { - winner = game.black == 1 ? game.user1Id : game.user2Id; - } else if (o.winner === false) { - winner = game.black == 1 ? game.user2Id : game.user1Id; - } else { - winner = null; - } - } - - const log = { - at: new Date(), - color: myColor, - pos - }; - - const crc32 = CRC32.str(game.logs.map(x => x.pos.toString()).join('') + pos.toString()); - - await ReversiGame.update({ - _id: gameId - }, { - $set: { - crc32, - isEnded: o.isEnded, - winnerId: winner - }, - $push: { - logs: log - } - }); - - publishReversiGameStream(gameId, 'set', Object.assign(log, { - next: o.turn - })); - - if (o.isEnded) { - publishReversiGameStream(gameId, 'ended', { - winnerId: winner, - game: await pack(gameId, user) - }); - } - } - - async function check(crc32: string) { - const game = await ReversiGame.findOne({ _id: gameId }); - - if (!game.isStarted) return; - - // 互換性のため - if (game.crc32 == null) return; - - if (crc32 !== game.crc32) { - connection.send(JSON.stringify({ - type: 'rescue', - body: await pack(game, user) - })); - } - } -} diff --git a/src/server/api/stream/games/reversi.ts b/src/server/api/stream/games/reversi.ts deleted file mode 100644 index f467613b21..0000000000 --- a/src/server/api/stream/games/reversi.ts +++ /dev/null @@ -1,28 +0,0 @@ -import * as mongo from 'mongodb'; -import * as websocket from 'websocket'; -import Xev from 'xev'; -import Matching, { pack } from '../../../../models/games/reversi/matching'; -import { publishUserStream } from '../../../../stream'; - -export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user: any): void { - // Subscribe reversi stream - subscriber.on(`reversi-stream:${user._id}`, data => { - connection.send(JSON.stringify(data)); - }); - - connection.on('message', async (data) => { - const msg = JSON.parse(data.utf8Data); - - switch (msg.type) { - case 'ping': - if (msg.id == null) return; - const matching = await Matching.findOne({ - parentId: user._id, - childId: new mongo.ObjectID(msg.id) - }); - if (matching == null) return; - publishUserStream(matching.childId, 'reversi_invited', await pack(matching, matching.childId)); - break; - } - }); -} diff --git a/src/server/api/stream/global-timeline.ts b/src/server/api/stream/global-timeline.ts deleted file mode 100644 index 03852fb181..0000000000 --- a/src/server/api/stream/global-timeline.ts +++ /dev/null @@ -1,27 +0,0 @@ -import * as websocket from 'websocket'; -import Xev from 'xev'; - -import { IUser } from '../../../models/user'; -import Mute from '../../../models/mute'; -import shouldMuteThisNote from '../../../misc/should-mute-this-note'; - -export default async function( - request: websocket.request, - connection: websocket.connection, - subscriber: Xev, - user: IUser -) { - const mute = await Mute.find({ muterId: user._id }); - const mutedUserIds = mute.map(m => m.muteeId.toString()); - - // Subscribe stream - subscriber.on('global-timeline', async note => { - // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する - if (shouldMuteThisNote(note, mutedUserIds)) return; - - connection.send(JSON.stringify({ - type: 'note', - body: note - })); - }); -} diff --git a/src/server/api/stream/hashtag.ts b/src/server/api/stream/hashtag.ts deleted file mode 100644 index 54da4f9ad9..0000000000 --- a/src/server/api/stream/hashtag.ts +++ /dev/null @@ -1,40 +0,0 @@ -import * as websocket from 'websocket'; -import Xev from 'xev'; - -import { IUser } from '../../../models/user'; -import Mute from '../../../models/mute'; -import { pack } from '../../../models/note'; -import shouldMuteThisNote from '../../../misc/should-mute-this-note'; - -export default async function( - request: websocket.request, - connection: websocket.connection, - subscriber: Xev, - user?: IUser -) { - const mute = user ? await Mute.find({ muterId: user._id }) : null; - const mutedUserIds = mute ? mute.map(m => m.muteeId.toString()) : []; - - const q: Array<string[]> = JSON.parse((request.resourceURL.query as any).q); - - // Subscribe stream - subscriber.on('hashtag', async note => { - const matched = q.some(tags => tags.every(tag => note.tags.map((t: string) => t.toLowerCase()).includes(tag.toLowerCase()))); - if (!matched) return; - - // Renoteなら再pack - if (note.renoteId != null) { - note.renote = await pack(note.renoteId, user, { - detail: true - }); - } - - // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する - if (shouldMuteThisNote(note, mutedUserIds)) return; - - connection.send(JSON.stringify({ - type: 'note', - body: note - })); - }); -} diff --git a/src/server/api/stream/home.ts b/src/server/api/stream/home.ts deleted file mode 100644 index 5575d0d523..0000000000 --- a/src/server/api/stream/home.ts +++ /dev/null @@ -1,110 +0,0 @@ -import * as websocket from 'websocket'; -import Xev from 'xev'; -import * as debug from 'debug'; - -import User, { IUser } from '../../../models/user'; -import Mute from '../../../models/mute'; -import { pack as packNote, pack } from '../../../models/note'; -import readNotification from '../common/read-notification'; -import call from '../call'; -import { IApp } from '../../../models/app'; -import shouldMuteThisNote from '../../../misc/should-mute-this-note'; -import readNote from '../../../services/note/read'; - -const log = debug('misskey'); - -export default async function( - request: websocket.request, - connection: websocket.connection, - subscriber: Xev, - user: IUser, - app: IApp -) { - const mute = await Mute.find({ muterId: user._id }); - const mutedUserIds = mute.map(m => m.muteeId.toString()); - - async function onNoteStream(noteId: any) { - const note = await packNote(noteId, user, { - detail: true - }); - - connection.send(JSON.stringify({ - type: 'note-updated', - body: { - note: note - } - })); - } - - // Subscribe Home stream channel - subscriber.on(`user-stream:${user._id}`, async x => { - // Renoteなら再pack - if (x.type == 'note' && x.body.renoteId != null) { - x.body.renote = await pack(x.body.renoteId, user, { - detail: true - }); - } - - //#region 流れてきたメッセージがミュートしているユーザーが関わるものだったら無視する - if (x.type == 'note') { - if (shouldMuteThisNote(x.body, mutedUserIds)) return; - } else if (x.type == 'notification') { - if (mutedUserIds.includes(x.body.userId)) { - return; - } - } - //#endregion - - connection.send(JSON.stringify(x)); - }); - - connection.on('message', async data => { - const msg = JSON.parse(data.utf8Data); - - switch (msg.type) { - case 'api': - // 新鮮なデータを利用するためにユーザーをフェッチ - call(msg.endpoint, await User.findOne({ _id: user._id }), app, msg.data).then(res => { - connection.send(JSON.stringify({ - type: `api-res:${msg.id}`, - body: { res } - })); - }).catch(e => { - connection.send(JSON.stringify({ - type: `api-res:${msg.id}`, - body: { e } - })); - }); - break; - - case 'alive': - // Update lastUsedAt - User.update({ _id: user._id }, { - $set: { - 'lastUsedAt': new Date() - } - }); - break; - - case 'read_notification': - if (!msg.id) return; - readNotification(user._id, msg.id); - break; - - case 'capture': - if (!msg.id) return; - log(`CAPTURE: ${msg.id} by @${user.username}`); - subscriber.on(`note-stream:${msg.id}`, onNoteStream); - if (msg.read) { - readNote(user._id, msg.id); - } - break; - - case 'decapture': - if (!msg.id) return; - log(`DECAPTURE: ${msg.id} by @${user.username}`); - subscriber.off(`note-stream:${msg.id}`, onNoteStream); - break; - } - }); -} diff --git a/src/server/api/stream/hybrid-timeline.ts b/src/server/api/stream/hybrid-timeline.ts deleted file mode 100644 index 045b822783..0000000000 --- a/src/server/api/stream/hybrid-timeline.ts +++ /dev/null @@ -1,38 +0,0 @@ -import * as websocket from 'websocket'; -import Xev from 'xev'; - -import { IUser } from '../../../models/user'; -import Mute from '../../../models/mute'; -import { pack } from '../../../models/note'; -import shouldMuteThisNote from '../../../misc/should-mute-this-note'; - -export default async function( - request: websocket.request, - connection: websocket.connection, - subscriber: Xev, - user: IUser -) { - const mute = await Mute.find({ muterId: user._id }); - const mutedUserIds = mute.map(m => m.muteeId.toString()); - - // Subscribe stream - subscriber.on('hybrid-timeline', onEvent); - subscriber.on(`hybrid-timeline:${user._id}`, onEvent); - - async function onEvent(note: any) { - // Renoteなら再pack - if (note.renoteId != null) { - note.renote = await pack(note.renoteId, user, { - detail: true - }); - } - - // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する - if (shouldMuteThisNote(note, mutedUserIds)) return; - - connection.send(JSON.stringify({ - type: 'note', - body: note - })); - } -} diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts new file mode 100644 index 0000000000..bd99f2755e --- /dev/null +++ b/src/server/api/stream/index.ts @@ -0,0 +1,213 @@ +import autobind from 'autobind-decorator'; +import * as websocket from 'websocket'; +import Xev from 'xev'; +import * as debug from 'debug'; + +import User, { IUser } from '../../../models/user'; +import readNotification from '../common/read-notification'; +import call from '../call'; +import { IApp } from '../../../models/app'; +import readNote from '../../../services/note/read'; + +import Channel from './channel'; +import channels from './channels'; + +const log = debug('misskey'); + +/** + * Main stream connection + */ +export default class Connection { + public user?: IUser; + public app: IApp; + private wsConnection: websocket.connection; + public subscriber: Xev; + private channels: Channel[] = []; + private subscribingNotes: any = {}; + + constructor( + wsConnection: websocket.connection, + subscriber: Xev, + user: IUser, + app: IApp + ) { + this.wsConnection = wsConnection; + this.user = user; + this.app = app; + this.subscriber = subscriber; + + this.wsConnection.on('message', this.onWsConnectionMessage); + } + + /** + * クライアントからメッセージ受信時 + */ + @autobind + private async onWsConnectionMessage(data: websocket.IMessage) { + const { type, body } = JSON.parse(data.utf8Data); + + switch (type) { + case 'api': this.onApiRequest(body); break; + case 'alive': this.onAlive(); break; + case 'readNotification': this.onReadNotification(body); break; + case 'subNote': this.onSubscribeNote(body); break; + case 'sn': this.onSubscribeNote(body); break; // alias + case 'unsubNote': this.onUnsubscribeNote(body); break; + case 'un': this.onUnsubscribeNote(body); break; // alias + case 'connect': this.onChannelConnectRequested(body); break; + case 'disconnect': this.onChannelDisconnectRequested(body); break; + case 'channel': this.onChannelMessageRequested(body); break; + } + } + + /** + * APIリクエスト要求時 + */ + @autobind + private async onApiRequest(payload: any) { + // 新鮮なデータを利用するためにユーザーをフェッチ + const user = this.user ? await User.findOne({ _id: this.user._id }) : null; + + const endpoint = payload.endpoint || payload.ep; // alias + + // 呼び出し + call(endpoint, user, this.app, payload.data).then(res => { + this.sendMessageToWs(`api:${payload.id}`, { res }); + }).catch(e => { + this.sendMessageToWs(`api:${payload.id}`, { e }); + }); + } + + @autobind + private onAlive() { + // Update lastUsedAt + User.update({ _id: this.user._id }, { + $set: { + 'lastUsedAt': new Date() + } + }); + } + + @autobind + private onReadNotification(payload: any) { + if (!payload.id) return; + readNotification(this.user._id, payload.id); + } + + /** + * 投稿購読要求時 + */ + @autobind + private onSubscribeNote(payload: any) { + if (!payload.id) return; + + if (this.subscribingNotes[payload.id] == null) { + this.subscribingNotes[payload.id] = 0; + } + + this.subscribingNotes[payload.id]++; + + if (this.subscribingNotes[payload.id] == 1) { + this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage); + } + + if (payload.read) { + readNote(this.user._id, payload.id); + } + } + + /** + * 投稿購読解除要求時 + */ + @autobind + private onUnsubscribeNote(payload: any) { + if (!payload.id) return; + + this.subscribingNotes[payload.id]--; + if (this.subscribingNotes[payload.id] <= 0) { + delete this.subscribingNotes[payload.id]; + this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage); + } + } + + @autobind + private async onNoteStreamMessage(data: any) { + this.sendMessageToWs('noteUpdated', { + id: data.body.id, + type: data.type, + body: data.body.body, + }); + } + + /** + * チャンネル接続要求時 + */ + @autobind + private onChannelConnectRequested(payload: any) { + const { channel, id, params } = payload; + log(`CH CONNECT: ${id} ${channel} by @${this.user.username}`); + this.connectChannel(id, params, (channels as any)[channel]); + } + + /** + * チャンネル切断要求時 + */ + @autobind + private onChannelDisconnectRequested(payload: any) { + const { id } = payload; + log(`CH DISCONNECT: ${id} by @${this.user.username}`); + this.disconnectChannel(id); + } + + /** + * クライアントにメッセージ送信 + */ + @autobind + public sendMessageToWs(type: string, payload: any) { + this.wsConnection.send(JSON.stringify({ + type: type, + body: payload + })); + } + + /** + * チャンネルに接続 + */ + @autobind + private connectChannel(id: string, params: any, channelClass: { new(id: string, connection: Connection): Channel }) { + const channel = new channelClass(id, this); + this.channels.push(channel); + channel.init(params); + } + + /** + * チャンネルから切断 + */ + @autobind + private disconnectChannel(id: string) { + const channel = this.channels.find(c => c.id === id); + + if (channel) { + if (channel.dispose) channel.dispose(); + this.channels = this.channels.filter(c => c.id !== id); + } + } + + @autobind + private onChannelMessageRequested(data: any) { + const channel = this.channels.find(c => c.id === data.id); + if (channel != null && channel.onMessage != null) { + channel.onMessage(data.type, data.body); + } + } + + /** + * ストリームが切れたとき + */ + @autobind + public dispose() { + this.channels.forEach(c => { + if (c.dispose) c.dispose(); + }); + } +} diff --git a/src/server/api/stream/local-timeline.ts b/src/server/api/stream/local-timeline.ts deleted file mode 100644 index ae054a5f9f..0000000000 --- a/src/server/api/stream/local-timeline.ts +++ /dev/null @@ -1,35 +0,0 @@ -import * as websocket from 'websocket'; -import Xev from 'xev'; - -import { IUser } from '../../../models/user'; -import Mute from '../../../models/mute'; -import { pack } from '../../../models/note'; -import shouldMuteThisNote from '../../../misc/should-mute-this-note'; - -export default async function( - request: websocket.request, - connection: websocket.connection, - subscriber: Xev, - user?: IUser -) { - const mute = user ? await Mute.find({ muterId: user._id }) : null; - const mutedUserIds = mute ? mute.map(m => m.muteeId.toString()) : []; - - // Subscribe stream - subscriber.on('local-timeline', async note => { - // Renoteなら再pack - if (note.renoteId != null) { - note.renote = await pack(note.renoteId, user, { - detail: true - }); - } - - // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する - if (shouldMuteThisNote(note, mutedUserIds)) return; - - connection.send(JSON.stringify({ - type: 'note', - body: note - })); - }); -} diff --git a/src/server/api/stream/messaging-index.ts b/src/server/api/stream/messaging-index.ts deleted file mode 100644 index 9af63f2812..0000000000 --- a/src/server/api/stream/messaging-index.ts +++ /dev/null @@ -1,9 +0,0 @@ -import * as websocket from 'websocket'; -import Xev from 'xev'; - -export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user: any): void { - // Subscribe messaging index stream - subscriber.on(`messaging-index-stream:${user._id}`, data => { - connection.send(JSON.stringify(data)); - }); -} diff --git a/src/server/api/stream/messaging.ts b/src/server/api/stream/messaging.ts deleted file mode 100644 index 8b352cea3c..0000000000 --- a/src/server/api/stream/messaging.ts +++ /dev/null @@ -1,25 +0,0 @@ -import * as websocket from 'websocket'; -import Xev from 'xev'; -import read from '../common/read-messaging-message'; -import { ParsedUrlQuery } from 'querystring'; - -export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user: any): void { - const q = request.resourceURL.query as ParsedUrlQuery; - const otherparty = q.otherparty as string; - - // Subscribe messaging stream - subscriber.on(`messaging-stream:${user._id}-${otherparty}`, data => { - connection.send(JSON.stringify(data)); - }); - - connection.on('message', async (data) => { - const msg = JSON.parse(data.utf8Data); - - switch (msg.type) { - case 'read': - if (!msg.id) return; - read(user._id, otherparty, msg.id); - break; - } - }); -} diff --git a/src/server/api/stream/notes-stats.ts b/src/server/api/stream/notes-stats.ts deleted file mode 100644 index ba99403226..0000000000 --- a/src/server/api/stream/notes-stats.ts +++ /dev/null @@ -1,35 +0,0 @@ -import * as websocket from 'websocket'; -import Xev from 'xev'; - -const ev = new Xev(); - -export default function(request: websocket.request, connection: websocket.connection): void { - const onStats = (stats: any) => { - connection.send(JSON.stringify({ - type: 'stats', - body: stats - })); - }; - - connection.on('message', async data => { - const msg = JSON.parse(data.utf8Data); - - switch (msg.type) { - case 'requestLog': - ev.once(`notesStatsLog:${msg.id}`, statsLog => { - connection.send(JSON.stringify({ - type: 'statsLog', - body: statsLog - })); - }); - ev.emit('requestNotesStatsLog', msg.id); - break; - } - }); - - ev.addListener('notesStats', onStats); - - connection.on('close', () => { - ev.removeListener('notesStats', onStats); - }); -} diff --git a/src/server/api/stream/server-stats.ts b/src/server/api/stream/server-stats.ts deleted file mode 100644 index d4fbeefa04..0000000000 --- a/src/server/api/stream/server-stats.ts +++ /dev/null @@ -1,38 +0,0 @@ -import * as websocket from 'websocket'; -import Xev from 'xev'; - -const ev = new Xev(); - -export default function(request: websocket.request, connection: websocket.connection): void { - const onStats = (stats: any) => { - connection.send(JSON.stringify({ - type: 'stats', - body: stats - })); - }; - - connection.on('message', async data => { - const msg = JSON.parse(data.utf8Data); - - switch (msg.type) { - case 'requestLog': - ev.once(`serverStatsLog:${msg.id}`, statsLog => { - connection.send(JSON.stringify({ - type: 'statsLog', - body: statsLog - })); - }); - ev.emit('requestServerStatsLog', { - id: msg.id, - length: msg.length - }); - break; - } - }); - - ev.addListener('serverStats', onStats); - - connection.on('close', () => { - ev.removeListener('serverStats', onStats); - }); -} diff --git a/src/server/api/stream/user-list.ts b/src/server/api/stream/user-list.ts deleted file mode 100644 index 30f94d5251..0000000000 --- a/src/server/api/stream/user-list.ts +++ /dev/null @@ -1,13 +0,0 @@ -import * as websocket from 'websocket'; -import Xev from 'xev'; -import { ParsedUrlQuery } from 'querystring'; - -export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user: any): void { - const q = request.resourceURL.query as ParsedUrlQuery; - const listId = q.listId as string; - - // Subscribe stream - subscriber.on(`user-list-stream:${listId}`, data => { - connection.send(JSON.stringify(data)); - }); -} diff --git a/src/server/api/streaming.ts b/src/server/api/streaming.ts index 09ec23a743..4518d21c3f 100644 --- a/src/server/api/streaming.ts +++ b/src/server/api/streaming.ts @@ -2,26 +2,12 @@ import * as http from 'http'; import * as websocket from 'websocket'; import Xev from 'xev'; -import homeStream from './stream/home'; -import localTimelineStream from './stream/local-timeline'; -import hybridTimelineStream from './stream/hybrid-timeline'; -import globalTimelineStream from './stream/global-timeline'; -import userListStream from './stream/user-list'; -import driveStream from './stream/drive'; -import messagingStream from './stream/messaging'; -import messagingIndexStream from './stream/messaging-index'; -import reversiGameStream from './stream/games/reversi-game'; -import reversiStream from './stream/games/reversi'; -import serverStatsStream from './stream/server-stats'; -import notesStatsStream from './stream/notes-stats'; -import hashtagStream from './stream/hashtag'; +import MainStreamConnection from './stream'; import { ParsedUrlQuery } from 'querystring'; import authenticate from './authenticate'; module.exports = (server: http.Server) => { - /** - * Init websocket server - */ + // Init websocket server const ws = new websocket.server({ httpServer: server }); @@ -29,20 +15,16 @@ module.exports = (server: http.Server) => { ws.on('request', async (request) => { const connection = request.accept(); - if (request.resourceURL.pathname === '/server-stats') { - serverStatsStream(request, connection); - return; - } + const ev = new Xev(); - if (request.resourceURL.pathname === '/notes-stats') { - notesStatsStream(request, connection); - return; - } + const q = request.resourceURL.query as ParsedUrlQuery; + const [user, app] = await authenticate(q.i as string); - const ev = new Xev(); + const main = new MainStreamConnection(connection, ev, user, app); connection.once('close', () => { ev.removeAllListeners(); + main.dispose(); }); connection.on('message', async (data) => { @@ -50,46 +32,5 @@ module.exports = (server: http.Server) => { connection.send('pong'); } }); - - const q = request.resourceURL.query as ParsedUrlQuery; - const [user, app] = await authenticate(q.i as string); - - if (request.resourceURL.pathname === '/games/reversi-game') { - reversiGameStream(request, connection, ev, user); - return; - } - - if (request.resourceURL.pathname === '/local-timeline') { - localTimelineStream(request, connection, ev, user); - return; - } - - if (request.resourceURL.pathname === '/hashtag') { - hashtagStream(request, connection, ev, user); - return; - } - - if (user == null) { - connection.send('authentication-failed'); - connection.close(); - return; - } - - const channel: any = - request.resourceURL.pathname === '/' ? homeStream : - request.resourceURL.pathname === '/hybrid-timeline' ? hybridTimelineStream : - request.resourceURL.pathname === '/global-timeline' ? globalTimelineStream : - request.resourceURL.pathname === '/user-list' ? userListStream : - request.resourceURL.pathname === '/drive' ? driveStream : - request.resourceURL.pathname === '/messaging' ? messagingStream : - request.resourceURL.pathname === '/messaging-index' ? messagingIndexStream : - request.resourceURL.pathname === '/games/reversi' ? reversiStream : - null; - - if (channel !== null) { - channel(request, connection, ev, user, app); - } else { - connection.close(); - } }); }; |