From 05f9ad11bb51ca932cba7716302924dfaa7aafdd Mon Sep 17 00:00:00 2001 From: syuilo Date: Thu, 11 Oct 2018 18:09:41 +0900 Subject: Redisがインストールされているときはイベントの共有にRedisのpub/subを使うように MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/api/stream/index.ts | 6 +++--- src/server/api/streaming.ts | 32 ++++++++++++++++++++++++++++++-- 2 files changed, 33 insertions(+), 5 deletions(-) (limited to 'src/server/api') diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts index 838d847004..afedd4362c 100644 --- a/src/server/api/stream/index.ts +++ b/src/server/api/stream/index.ts @@ -1,6 +1,5 @@ import autobind from 'autobind-decorator'; import * as websocket from 'websocket'; -import Xev from 'xev'; import * as debug from 'debug'; import User, { IUser } from '../../../models/user'; @@ -11,6 +10,7 @@ import readNote from '../../../services/note/read'; import Channel from './channel'; import channels from './channels'; +import { EventEmitter } from 'events'; const log = debug('misskey'); @@ -21,14 +21,14 @@ export default class Connection { public user?: IUser; public app: IApp; private wsConnection: websocket.connection; - public subscriber: Xev; + public subscriber: EventEmitter; private channels: Channel[] = []; private subscribingNotes: any = {}; public sendMessageToWsOverride: any = null; // 後方互換性のため constructor( wsConnection: websocket.connection, - subscriber: Xev, + subscriber: EventEmitter, user: IUser, app: IApp ) { diff --git a/src/server/api/streaming.ts b/src/server/api/streaming.ts index a0a219a317..249157e222 100644 --- a/src/server/api/streaming.ts +++ b/src/server/api/streaming.ts @@ -1,11 +1,14 @@ import * as http from 'http'; import * as websocket from 'websocket'; +import * as redis from 'redis'; import Xev from 'xev'; import MainStreamConnection from './stream'; import { ParsedUrlQuery } from 'querystring'; import authenticate from './authenticate'; import channels from './stream/channels'; +import { EventEmitter } from 'events'; +import config from '../../config'; module.exports = (server: http.Server) => { // Init websocket server @@ -16,11 +19,36 @@ module.exports = (server: http.Server) => { ws.on('request', async (request) => { const connection = request.accept(); - const ev = new Xev(); - const q = request.resourceURL.query as ParsedUrlQuery; const [user, app] = await authenticate(q.i as string); + let ev: EventEmitter; + + if (config.redis) { + // Connect to Redis + const subscriber = redis.createClient( + config.redis.port, config.redis.host); + + subscriber.subscribe('misskey'); + + ev = new EventEmitter(); + + subscriber.on('message', async (_, data) => { + const obj = JSON.parse(data); + + console.log(obj); + + ev.emit(obj.channel, obj.message); + }); + + connection.once('close', () => { + subscriber.unsubscribe(); + subscriber.quit(); + }); + } else { + ev = new Xev(); + } + const main = new MainStreamConnection(connection, ev, user, app); // 後方互換性のため -- cgit v1.2.3-freya