From 83d9730d93b976c2477808e1a6275937defff300 Mon Sep 17 00:00:00 2001 From: syuilo Date: Mon, 30 Jul 2018 07:20:27 +0900 Subject: #2020 --- src/server/api/stream/drive.ts | 9 ++- src/server/api/stream/games/reversi-game.ts | 11 ++- src/server/api/stream/games/reversi.ts | 11 ++- src/server/api/stream/global-timeline.ts | 12 ++-- src/server/api/stream/home.ts | 103 +++++++++++++--------------- src/server/api/stream/hybrid-timeline.ts | 13 ++-- src/server/api/stream/local-timeline.ts | 12 ++-- src/server/api/stream/messaging-index.ts | 9 ++- src/server/api/stream/messaging.ts | 9 ++- src/server/api/stream/user-list.ts | 7 +- 10 files changed, 86 insertions(+), 110 deletions(-) (limited to 'src/server/api/stream') diff --git a/src/server/api/stream/drive.ts b/src/server/api/stream/drive.ts index c97ab80dcc..28c241e1bc 100644 --- a/src/server/api/stream/drive.ts +++ b/src/server/api/stream/drive.ts @@ -1,10 +1,9 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; -export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void { +export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user: any): void { // Subscribe drive stream - subscriber.subscribe(`misskey:drive-stream:${user._id}`); - subscriber.on('message', (_, data) => { - connection.send(data); + subscriber.on(`drive-stream:${user._id}`, data => { + connection.send(JSON.stringify(data)); }); } diff --git a/src/server/api/stream/games/reversi-game.ts b/src/server/api/stream/games/reversi-game.ts index da949e90ff..5cbbf42d59 100644 --- a/src/server/api/stream/games/reversi-game.ts +++ b/src/server/api/stream/games/reversi-game.ts @@ -1,5 +1,5 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import * as CRC32 from 'crc-32'; import ReversiGame, { pack } from '../../../../models/games/reversi/game'; import { publishReversiGameStream } from '../../../../stream'; @@ -7,14 +7,13 @@ import Reversi from '../../../../games/reversi/core'; import * as maps from '../../../../games/reversi/maps'; import { ParsedUrlQuery } from 'querystring'; -export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user?: any): void { +export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user?: any): void { const q = request.resourceURL.query as ParsedUrlQuery; - const gameId = q.game; + const gameId = q.game as string; // Subscribe game stream - subscriber.subscribe(`misskey:reversi-game-stream:${gameId}`); - subscriber.on('message', (_, data) => { - connection.send(data); + subscriber.on(`reversi-game-stream:${gameId}`, data => { + connection.send(JSON.stringify(data)); }); connection.on('message', async (data) => { diff --git a/src/server/api/stream/games/reversi.ts b/src/server/api/stream/games/reversi.ts index 3f23466520..f467613b21 100644 --- a/src/server/api/stream/games/reversi.ts +++ b/src/server/api/stream/games/reversi.ts @@ -1,14 +1,13 @@ import * as mongo from 'mongodb'; import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import Matching, { pack } from '../../../../models/games/reversi/matching'; -import publishUserStream from '../../../../stream'; +import { publishUserStream } from '../../../../stream'; -export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void { +export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user: any): void { // Subscribe reversi stream - subscriber.subscribe(`misskey:reversi-stream:${user._id}`); - subscriber.on('message', (_, data) => { - connection.send(data); + subscriber.on(`reversi-stream:${user._id}`, data => { + connection.send(JSON.stringify(data)); }); connection.on('message', async (data) => { diff --git a/src/server/api/stream/global-timeline.ts b/src/server/api/stream/global-timeline.ts index f31ce17752..4786450cbb 100644 --- a/src/server/api/stream/global-timeline.ts +++ b/src/server/api/stream/global-timeline.ts @@ -1,5 +1,5 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import { IUser } from '../../../models/user'; import Mute from '../../../models/mute'; @@ -7,18 +7,14 @@ import Mute from '../../../models/mute'; export default async function( request: websocket.request, connection: websocket.connection, - subscriber: redis.RedisClient, + subscriber: Xev, user: IUser ) { - // Subscribe stream - subscriber.subscribe(`misskey:global-timeline`); - const mute = await Mute.find({ muterId: user._id }); const mutedUserIds = mute.map(m => m.muteeId.toString()); - subscriber.on('message', async (_, data) => { - const note = JSON.parse(data); - + // Subscribe stream + subscriber.on('global-timeline', async note => { //#region 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する if (mutedUserIds.indexOf(note.userId) != -1) { return; diff --git a/src/server/api/stream/home.ts b/src/server/api/stream/home.ts index d9b8f7fb96..dc3ce9d19f 100644 --- a/src/server/api/stream/home.ts +++ b/src/server/api/stream/home.ts @@ -1,5 +1,5 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import * as debug from 'debug'; import User, { IUser } from '../../../models/user'; @@ -14,68 +14,54 @@ const log = debug('misskey'); export default async function( request: websocket.request, connection: websocket.connection, - subscriber: redis.RedisClient, + subscriber: Xev, user: IUser, app: IApp ) { - // Subscribe Home stream channel - subscriber.subscribe(`misskey:user-stream:${user._id}`); - const mute = await Mute.find({ muterId: user._id }); const mutedUserIds = mute.map(m => m.muteeId.toString()); - subscriber.on('message', async (channel, data) => { - switch (channel.split(':')[1]) { - case 'user-stream': - try { - const x = JSON.parse(data); - - //#region 流れてきたメッセージがミュートしているユーザーが関わるものだったら無視する - if (x.type == 'note') { - if (mutedUserIds.includes(x.body.userId)) { - return; - } - if (x.body.reply != null && mutedUserIds.includes(x.body.reply.userId)) { - return; - } - if (x.body.renote != null && mutedUserIds.includes(x.body.renote.userId)) { - return; - } - } else if (x.type == 'notification') { - if (mutedUserIds.includes(x.body.userId)) { - return; - } - } - //#endregion + async function onNoteStream(noteId: any) { + const note = await packNote(noteId, user, { + detail: true + }); - // Renoteなら再pack - if (x.type == 'note' && x.body.renoteId != null) { - x.body.renote = await pack(x.body.renoteId, user, { - detail: true - }); - data = JSON.stringify(x); - } + connection.send(JSON.stringify({ + type: 'note-updated', + body: { + note: note + } + })); + } - connection.send(data); - } catch (e) { - connection.send(data); - } - break; + // Subscribe Home stream channel + subscriber.on(`user-stream:${user._id}`, async x => { + //#region 流れてきたメッセージがミュートしているユーザーが関わるものだったら無視する + if (x.type == 'note') { + if (mutedUserIds.includes(x.body.userId)) { + return; + } + if (x.body.reply != null && mutedUserIds.includes(x.body.reply.userId)) { + return; + } + if (x.body.renote != null && mutedUserIds.includes(x.body.renote.userId)) { + return; + } + } else if (x.type == 'notification') { + if (mutedUserIds.includes(x.body.userId)) { + return; + } + } + //#endregion - case 'note-stream': - const noteId = channel.split(':')[2]; - log(`RECEIVED: ${noteId} ${data} by @${user.username}`); - const note = await packNote(noteId, user, { - detail: true - }); - connection.send(JSON.stringify({ - type: 'note-updated', - body: { - note: note - } - })); - break; + // Renoteなら再pack + if (x.type == 'note' && x.body.renoteId != null) { + x.body.renote = await pack(x.body.renoteId, user, { + detail: true + }); } + + connection.send(JSON.stringify(x)); }); connection.on('message', async data => { @@ -113,9 +99,14 @@ export default async function( case 'capture': if (!msg.id) return; - const noteId = msg.id; - log(`CAPTURE: ${noteId} by @${user.username}`); - subscriber.subscribe(`misskey:note-stream:${noteId}`); + log(`CAPTURE: ${msg.id} by @${user.username}`); + subscriber.on(`note-stream:${msg.id}`, onNoteStream); + break; + + case 'decapture': + if (!msg.id) return; + log(`DECAPTURE: ${msg.id} by @${user.username}`); + subscriber.off(`note-stream:${msg.id}`, onNoteStream); break; } }); diff --git a/src/server/api/stream/hybrid-timeline.ts b/src/server/api/stream/hybrid-timeline.ts index 513af9c1d4..5f411317c3 100644 --- a/src/server/api/stream/hybrid-timeline.ts +++ b/src/server/api/stream/hybrid-timeline.ts @@ -1,5 +1,5 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import { IUser } from '../../../models/user'; import Mute from '../../../models/mute'; @@ -8,18 +8,17 @@ import { pack } from '../../../models/note'; export default async function( request: websocket.request, connection: websocket.connection, - subscriber: redis.RedisClient, + subscriber: Xev, user: IUser ) { // Subscribe stream - subscriber.subscribe('misskey:hybrid-timeline', `misskey:hybrid-timeline:${user._id}`); + subscriber.on('hybrid-timeline', onEvent); + subscriber.on(`hybrid-timeline:${user._id}`, onEvent); const mute = await Mute.find({ muterId: user._id }); const mutedUserIds = mute.map(m => m.muteeId.toString()); - subscriber.on('message', async (_, data) => { - const note = JSON.parse(data); - + async function onEvent(note: any) { //#region 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する if (mutedUserIds.indexOf(note.userId) != -1) { return; @@ -43,5 +42,5 @@ export default async function( type: 'note', body: note })); - }); + } } diff --git a/src/server/api/stream/local-timeline.ts b/src/server/api/stream/local-timeline.ts index 32718810dc..82060a7aaa 100644 --- a/src/server/api/stream/local-timeline.ts +++ b/src/server/api/stream/local-timeline.ts @@ -1,5 +1,5 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import { IUser } from '../../../models/user'; import Mute from '../../../models/mute'; @@ -8,18 +8,14 @@ import { pack } from '../../../models/note'; export default async function( request: websocket.request, connection: websocket.connection, - subscriber: redis.RedisClient, + subscriber: Xev, user: IUser ) { - // Subscribe stream - subscriber.subscribe('misskey:local-timeline'); - const mute = await Mute.find({ muterId: user._id }); const mutedUserIds = mute.map(m => m.muteeId.toString()); - subscriber.on('message', async (_, data) => { - const note = JSON.parse(data); - + // Subscribe stream + subscriber.on('local-timeline', async note => { //#region 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する if (mutedUserIds.indexOf(note.userId) != -1) { return; diff --git a/src/server/api/stream/messaging-index.ts b/src/server/api/stream/messaging-index.ts index c1b2fbc806..9af63f2812 100644 --- a/src/server/api/stream/messaging-index.ts +++ b/src/server/api/stream/messaging-index.ts @@ -1,10 +1,9 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; -export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void { +export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user: any): void { // Subscribe messaging index stream - subscriber.subscribe(`misskey:messaging-index-stream:${user._id}`); - subscriber.on('message', (_, data) => { - connection.send(data); + subscriber.on(`messaging-index-stream:${user._id}`, data => { + connection.send(JSON.stringify(data)); }); } diff --git a/src/server/api/stream/messaging.ts b/src/server/api/stream/messaging.ts index 3e6c2cd509..8b352cea3c 100644 --- a/src/server/api/stream/messaging.ts +++ b/src/server/api/stream/messaging.ts @@ -1,16 +1,15 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import read from '../common/read-messaging-message'; import { ParsedUrlQuery } from 'querystring'; -export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void { +export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user: any): void { const q = request.resourceURL.query as ParsedUrlQuery; const otherparty = q.otherparty as string; // Subscribe messaging stream - subscriber.subscribe(`misskey:messaging-stream:${user._id}-${otherparty}`); - subscriber.on('message', (_, data) => { - connection.send(data); + subscriber.on(`messaging-stream:${user._id}-${otherparty}`, data => { + connection.send(JSON.stringify(data)); }); connection.on('message', async (data) => { diff --git a/src/server/api/stream/user-list.ts b/src/server/api/stream/user-list.ts index ba03b97860..33cc2a1ee1 100644 --- a/src/server/api/stream/user-list.ts +++ b/src/server/api/stream/user-list.ts @@ -1,14 +1,13 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import { ParsedUrlQuery } from 'querystring'; -export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void { +export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user: any): void { const q = request.resourceURL.query as ParsedUrlQuery; const listId = q.listId as string; // Subscribe stream - subscriber.subscribe(`misskey:user-list-stream:${listId}`); - subscriber.on('message', (_, data) => { + subscriber.on(`user-list-stream:${listId}`, data => { connection.send(data); }); } -- cgit v1.2.3-freya