diff options
Diffstat (limited to 'src/server/api')
24 files changed, 115 insertions, 143 deletions
diff --git a/src/server/api/common/read-messaging-message.ts b/src/server/api/common/read-messaging-message.ts index a34fd8a703..005240a37c 100644 --- a/src/server/api/common/read-messaging-message.ts +++ b/src/server/api/common/read-messaging-message.ts @@ -1,7 +1,7 @@ import * as mongo from 'mongodb'; import Message from '../../../models/messaging-message'; import { IMessagingMessage as IMessage } from '../../../models/messaging-message'; -import publishUserStream from '../../../stream'; +import { publishUserStream } from '../../../stream'; import { publishMessagingStream } from '../../../stream'; import { publishMessagingIndexStream } from '../../../stream'; import User from '../../../models/user'; diff --git a/src/server/api/common/read-notification.ts b/src/server/api/common/read-notification.ts index 3a1f4cfbde..0b0f3e4e5a 100644 --- a/src/server/api/common/read-notification.ts +++ b/src/server/api/common/read-notification.ts @@ -1,6 +1,6 @@ import * as mongo from 'mongodb'; import { default as Notification, INotification } from '../../../models/notification'; -import publishUserStream from '../../../stream'; +import { publishUserStream } from '../../../stream'; import Mute from '../../../models/mute'; import User from '../../../models/user'; diff --git a/src/server/api/endpoints/games/reversi/match.ts b/src/server/api/endpoints/games/reversi/match.ts index 24746170ff..aba400af1d 100644 --- a/src/server/api/endpoints/games/reversi/match.ts +++ b/src/server/api/endpoints/games/reversi/match.ts @@ -2,7 +2,7 @@ import $ from 'cafy'; import ID from '../../../../../misc/cafy-id'; import Matching, { pack as packMatching } from '../../../../../models/games/reversi/matching'; import ReversiGame, { pack as packGame } from '../../../../../models/games/reversi/game'; import User, { ILocalUser } from '../../../../../models/user'; -import publishUserStream, { publishReversiStream } from '../../../../../stream'; +import { publishUserStream, publishReversiStream } from '../../../../../stream'; import { eighteight } from '../../../../../games/reversi/maps'; export const meta = { diff --git a/src/server/api/endpoints/i/regenerate_token.ts b/src/server/api/endpoints/i/regenerate_token.ts index 374861daaf..fe4a5cd118 100644 --- a/src/server/api/endpoints/i/regenerate_token.ts +++ b/src/server/api/endpoints/i/regenerate_token.ts @@ -1,7 +1,7 @@ import $ from 'cafy'; import * as bcrypt from 'bcryptjs'; import User, { ILocalUser } from '../../../../models/user'; -import event from '../../../../stream'; +import { publishUserStream } from '../../../../stream'; import generateUserToken from '../../common/generate-native-user-token'; export const meta = { @@ -33,5 +33,5 @@ export default async (params: any, user: ILocalUser) => new Promise(async (res, res(); // Publish event - event(user._id, 'my_token_regenerated'); + publishUserStream(user._id, 'my_token_regenerated'); }); diff --git a/src/server/api/endpoints/i/update.ts b/src/server/api/endpoints/i/update.ts index 019c8281a2..aa801b1b04 100644 --- a/src/server/api/endpoints/i/update.ts +++ b/src/server/api/endpoints/i/update.ts @@ -1,6 +1,6 @@ import $ from 'cafy'; import ID from '../../../../misc/cafy-id'; import User, { isValidName, isValidDescription, isValidLocation, isValidBirthday, pack, ILocalUser } from '../../../../models/user'; -import event from '../../../../stream'; +import { publishUserStream } from '../../../../stream'; import DriveFile from '../../../../models/drive-file'; import acceptAllFollowRequests from '../../../../services/following/requests/accept-all'; import { IApp } from '../../../../models/app'; @@ -133,7 +133,7 @@ export default async (params: any, user: ILocalUser, app: IApp) => new Promise(a res(iObj); // Publish meUpdated event - event(user._id, 'meUpdated', iObj); + publishUserStream(user._id, 'meUpdated', iObj); // 鍵垢を解除したとき、溜まっていたフォローリクエストがあるならすべて承認 if (user.isLocked && isLocked === false) { diff --git a/src/server/api/endpoints/i/update_client_setting.ts b/src/server/api/endpoints/i/update_client_setting.ts index 9342f5dadc..aed93c792f 100644 --- a/src/server/api/endpoints/i/update_client_setting.ts +++ b/src/server/api/endpoints/i/update_client_setting.ts @@ -1,6 +1,6 @@ import $ from 'cafy'; import User, { ILocalUser } from '../../../../models/user'; -import event from '../../../../stream'; +import { publishUserStream } from '../../../../stream'; export const meta = { requireCredential: true, @@ -26,7 +26,7 @@ export default async (params: any, user: ILocalUser) => new Promise(async (res, res(); // Publish event - event(user._id, 'clientSettingUpdated', { + publishUserStream(user._id, 'clientSettingUpdated', { key: name, value }); diff --git a/src/server/api/endpoints/i/update_home.ts b/src/server/api/endpoints/i/update_home.ts index 6f39854290..ffca9b90b3 100644 --- a/src/server/api/endpoints/i/update_home.ts +++ b/src/server/api/endpoints/i/update_home.ts @@ -1,6 +1,6 @@ import $ from 'cafy'; import User, { ILocalUser } from '../../../../models/user'; -import event from '../../../../stream'; +import { publishUserStream } from '../../../../stream'; export const meta = { requireCredential: true, @@ -25,5 +25,5 @@ export default async (params: any, user: ILocalUser) => new Promise(async (res, res(); - event(user._id, 'home_updated', home); + publishUserStream(user._id, 'home_updated', home); }); diff --git a/src/server/api/endpoints/i/update_mobile_home.ts b/src/server/api/endpoints/i/update_mobile_home.ts index 1babe409e9..0b72fbe2c1 100644 --- a/src/server/api/endpoints/i/update_mobile_home.ts +++ b/src/server/api/endpoints/i/update_mobile_home.ts @@ -1,6 +1,6 @@ import $ from 'cafy'; import User, { ILocalUser } from '../../../../models/user'; -import event from '../../../../stream'; +import { publishUserStream } from '../../../../stream'; export const meta = { requireCredential: true, @@ -24,5 +24,5 @@ export default async (params: any, user: ILocalUser) => new Promise(async (res, res(); - event(user._id, 'mobile_home_updated', home); + publishUserStream(user._id, 'mobile_home_updated', home); }); diff --git a/src/server/api/endpoints/i/update_widget.ts b/src/server/api/endpoints/i/update_widget.ts index 5bf9c23053..5cbe7c07a3 100644 --- a/src/server/api/endpoints/i/update_widget.ts +++ b/src/server/api/endpoints/i/update_widget.ts @@ -1,6 +1,6 @@ import $ from 'cafy'; import User, { ILocalUser } from '../../../../models/user'; -import event from '../../../../stream'; +import { publishUserStream } from '../../../../stream'; export const meta = { requireCredential: true, @@ -73,7 +73,7 @@ export default async (params: any, user: ILocalUser) => new Promise(async (res, //#endregion if (widget) { - event(user._id, 'widgetUpdated', { + publishUserStream(user._id, 'widgetUpdated', { id, data }); diff --git a/src/server/api/endpoints/messaging/messages/create.ts b/src/server/api/endpoints/messaging/messages/create.ts index 9b897b45e7..8ebf1a2a2b 100644 --- a/src/server/api/endpoints/messaging/messages/create.ts +++ b/src/server/api/endpoints/messaging/messages/create.ts @@ -6,7 +6,7 @@ import User, { ILocalUser } from '../../../../../models/user'; import Mute from '../../../../../models/mute'; import DriveFile from '../../../../../models/drive-file'; import { pack } from '../../../../../models/messaging-message'; -import publishUserStream from '../../../../../stream'; +import { publishUserStream } from '../../../../../stream'; import { publishMessagingStream, publishMessagingIndexStream } from '../../../../../stream'; import pushSw from '../../../../../push-sw'; import config from '../../../../../config'; diff --git a/src/server/api/endpoints/notifications/mark_all_as_read.ts b/src/server/api/endpoints/notifications/mark_all_as_read.ts index 91319d0553..a9875ebb01 100644 --- a/src/server/api/endpoints/notifications/mark_all_as_read.ts +++ b/src/server/api/endpoints/notifications/mark_all_as_read.ts @@ -1,5 +1,5 @@ import Notification from '../../../../models/notification'; -import event from '../../../../stream'; +import { publishUserStream } from '../../../../stream'; import User, { ILocalUser } from '../../../../models/user'; export const meta = { @@ -40,5 +40,5 @@ export default (params: any, user: ILocalUser) => new Promise(async (res, rej) = }); // 全ての通知を読みましたよというイベントを発行 - event(user._id, 'read_all_notifications'); + publishUserStream(user._id, 'read_all_notifications'); }); diff --git a/src/server/api/private/signin.ts b/src/server/api/private/signin.ts index 9719329ddb..65413208dd 100644 --- a/src/server/api/private/signin.ts +++ b/src/server/api/private/signin.ts @@ -3,7 +3,7 @@ import * as bcrypt from 'bcryptjs'; import * as speakeasy from 'speakeasy'; import User, { ILocalUser } from '../../../models/user'; import Signin, { pack } from '../../../models/signin'; -import event from '../../../stream'; +import { publishUserStream } from '../../../stream'; import signin from '../common/signin'; import config from '../../../config'; @@ -86,5 +86,5 @@ export default async (ctx: Koa.Context) => { }); // Publish signin event - event(user._id, 'signin', await pack(record)); + publishUserStream(user._id, 'signin', await pack(record)); }; diff --git a/src/server/api/service/twitter.ts b/src/server/api/service/twitter.ts index 080f5879a3..8c668e832a 100644 --- a/src/server/api/service/twitter.ts +++ b/src/server/api/service/twitter.ts @@ -4,7 +4,7 @@ import * as uuid from 'uuid'; import autwh from 'autwh'; import redis from '../../../db/redis'; import User, { pack, ILocalUser } from '../../../models/user'; -import event from '../../../stream'; +import { publishUserStream } from '../../../stream'; import config from '../../../config'; import signin from '../common/signin'; @@ -49,7 +49,7 @@ router.get('/disconnect/twitter', async ctx => { ctx.body = `Twitterの連携を解除しました :v:`; // Publish i updated event - event(user._id, 'meUpdated', await pack(user, user, { + publishUserStream(user._id, 'meUpdated', await pack(user, user, { detail: true, includeSecrets: true })); @@ -174,7 +174,7 @@ if (config.twitter == null) { ctx.body = `Twitter: @${result.screenName} を、Misskey: @${user.username} に接続しました!`; // Publish i updated event - event(user._id, 'meUpdated', await pack(user, user, { + publishUserStream(user._id, 'meUpdated', await pack(user, user, { detail: true, includeSecrets: true })); diff --git a/src/server/api/stream/drive.ts b/src/server/api/stream/drive.ts index c97ab80dcc..28c241e1bc 100644 --- a/src/server/api/stream/drive.ts +++ b/src/server/api/stream/drive.ts @@ -1,10 +1,9 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; -export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void { +export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user: any): void { // Subscribe drive stream - subscriber.subscribe(`misskey:drive-stream:${user._id}`); - subscriber.on('message', (_, data) => { - connection.send(data); + subscriber.on(`drive-stream:${user._id}`, data => { + connection.send(JSON.stringify(data)); }); } diff --git a/src/server/api/stream/games/reversi-game.ts b/src/server/api/stream/games/reversi-game.ts index da949e90ff..5cbbf42d59 100644 --- a/src/server/api/stream/games/reversi-game.ts +++ b/src/server/api/stream/games/reversi-game.ts @@ -1,5 +1,5 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import * as CRC32 from 'crc-32'; import ReversiGame, { pack } from '../../../../models/games/reversi/game'; import { publishReversiGameStream } from '../../../../stream'; @@ -7,14 +7,13 @@ import Reversi from '../../../../games/reversi/core'; import * as maps from '../../../../games/reversi/maps'; import { ParsedUrlQuery } from 'querystring'; -export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user?: any): void { +export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user?: any): void { const q = request.resourceURL.query as ParsedUrlQuery; - const gameId = q.game; + const gameId = q.game as string; // Subscribe game stream - subscriber.subscribe(`misskey:reversi-game-stream:${gameId}`); - subscriber.on('message', (_, data) => { - connection.send(data); + subscriber.on(`reversi-game-stream:${gameId}`, data => { + connection.send(JSON.stringify(data)); }); connection.on('message', async (data) => { diff --git a/src/server/api/stream/games/reversi.ts b/src/server/api/stream/games/reversi.ts index 3f23466520..f467613b21 100644 --- a/src/server/api/stream/games/reversi.ts +++ b/src/server/api/stream/games/reversi.ts @@ -1,14 +1,13 @@ import * as mongo from 'mongodb'; import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import Matching, { pack } from '../../../../models/games/reversi/matching'; -import publishUserStream from '../../../../stream'; +import { publishUserStream } from '../../../../stream'; -export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void { +export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user: any): void { // Subscribe reversi stream - subscriber.subscribe(`misskey:reversi-stream:${user._id}`); - subscriber.on('message', (_, data) => { - connection.send(data); + subscriber.on(`reversi-stream:${user._id}`, data => { + connection.send(JSON.stringify(data)); }); connection.on('message', async (data) => { diff --git a/src/server/api/stream/global-timeline.ts b/src/server/api/stream/global-timeline.ts index f31ce17752..4786450cbb 100644 --- a/src/server/api/stream/global-timeline.ts +++ b/src/server/api/stream/global-timeline.ts @@ -1,5 +1,5 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import { IUser } from '../../../models/user'; import Mute from '../../../models/mute'; @@ -7,18 +7,14 @@ import Mute from '../../../models/mute'; export default async function( request: websocket.request, connection: websocket.connection, - subscriber: redis.RedisClient, + subscriber: Xev, user: IUser ) { - // Subscribe stream - subscriber.subscribe(`misskey:global-timeline`); - const mute = await Mute.find({ muterId: user._id }); const mutedUserIds = mute.map(m => m.muteeId.toString()); - subscriber.on('message', async (_, data) => { - const note = JSON.parse(data); - + // Subscribe stream + subscriber.on('global-timeline', async note => { //#region 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する if (mutedUserIds.indexOf(note.userId) != -1) { return; diff --git a/src/server/api/stream/home.ts b/src/server/api/stream/home.ts index d9b8f7fb96..dc3ce9d19f 100644 --- a/src/server/api/stream/home.ts +++ b/src/server/api/stream/home.ts @@ -1,5 +1,5 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import * as debug from 'debug'; import User, { IUser } from '../../../models/user'; @@ -14,68 +14,54 @@ const log = debug('misskey'); export default async function( request: websocket.request, connection: websocket.connection, - subscriber: redis.RedisClient, + subscriber: Xev, user: IUser, app: IApp ) { - // Subscribe Home stream channel - subscriber.subscribe(`misskey:user-stream:${user._id}`); - const mute = await Mute.find({ muterId: user._id }); const mutedUserIds = mute.map(m => m.muteeId.toString()); - subscriber.on('message', async (channel, data) => { - switch (channel.split(':')[1]) { - case 'user-stream': - try { - const x = JSON.parse(data); - - //#region 流れてきたメッセージがミュートしているユーザーが関わるものだったら無視する - if (x.type == 'note') { - if (mutedUserIds.includes(x.body.userId)) { - return; - } - if (x.body.reply != null && mutedUserIds.includes(x.body.reply.userId)) { - return; - } - if (x.body.renote != null && mutedUserIds.includes(x.body.renote.userId)) { - return; - } - } else if (x.type == 'notification') { - if (mutedUserIds.includes(x.body.userId)) { - return; - } - } - //#endregion + async function onNoteStream(noteId: any) { + const note = await packNote(noteId, user, { + detail: true + }); - // Renoteなら再pack - if (x.type == 'note' && x.body.renoteId != null) { - x.body.renote = await pack(x.body.renoteId, user, { - detail: true - }); - data = JSON.stringify(x); - } + connection.send(JSON.stringify({ + type: 'note-updated', + body: { + note: note + } + })); + } - connection.send(data); - } catch (e) { - connection.send(data); - } - break; + // Subscribe Home stream channel + subscriber.on(`user-stream:${user._id}`, async x => { + //#region 流れてきたメッセージがミュートしているユーザーが関わるものだったら無視する + if (x.type == 'note') { + if (mutedUserIds.includes(x.body.userId)) { + return; + } + if (x.body.reply != null && mutedUserIds.includes(x.body.reply.userId)) { + return; + } + if (x.body.renote != null && mutedUserIds.includes(x.body.renote.userId)) { + return; + } + } else if (x.type == 'notification') { + if (mutedUserIds.includes(x.body.userId)) { + return; + } + } + //#endregion - case 'note-stream': - const noteId = channel.split(':')[2]; - log(`RECEIVED: ${noteId} ${data} by @${user.username}`); - const note = await packNote(noteId, user, { - detail: true - }); - connection.send(JSON.stringify({ - type: 'note-updated', - body: { - note: note - } - })); - break; + // Renoteなら再pack + if (x.type == 'note' && x.body.renoteId != null) { + x.body.renote = await pack(x.body.renoteId, user, { + detail: true + }); } + + connection.send(JSON.stringify(x)); }); connection.on('message', async data => { @@ -113,9 +99,14 @@ export default async function( case 'capture': if (!msg.id) return; - const noteId = msg.id; - log(`CAPTURE: ${noteId} by @${user.username}`); - subscriber.subscribe(`misskey:note-stream:${noteId}`); + log(`CAPTURE: ${msg.id} by @${user.username}`); + subscriber.on(`note-stream:${msg.id}`, onNoteStream); + break; + + case 'decapture': + if (!msg.id) return; + log(`DECAPTURE: ${msg.id} by @${user.username}`); + subscriber.off(`note-stream:${msg.id}`, onNoteStream); break; } }); diff --git a/src/server/api/stream/hybrid-timeline.ts b/src/server/api/stream/hybrid-timeline.ts index 513af9c1d4..5f411317c3 100644 --- a/src/server/api/stream/hybrid-timeline.ts +++ b/src/server/api/stream/hybrid-timeline.ts @@ -1,5 +1,5 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import { IUser } from '../../../models/user'; import Mute from '../../../models/mute'; @@ -8,18 +8,17 @@ import { pack } from '../../../models/note'; export default async function( request: websocket.request, connection: websocket.connection, - subscriber: redis.RedisClient, + subscriber: Xev, user: IUser ) { // Subscribe stream - subscriber.subscribe('misskey:hybrid-timeline', `misskey:hybrid-timeline:${user._id}`); + subscriber.on('hybrid-timeline', onEvent); + subscriber.on(`hybrid-timeline:${user._id}`, onEvent); const mute = await Mute.find({ muterId: user._id }); const mutedUserIds = mute.map(m => m.muteeId.toString()); - subscriber.on('message', async (_, data) => { - const note = JSON.parse(data); - + async function onEvent(note: any) { //#region 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する if (mutedUserIds.indexOf(note.userId) != -1) { return; @@ -43,5 +42,5 @@ export default async function( type: 'note', body: note })); - }); + } } diff --git a/src/server/api/stream/local-timeline.ts b/src/server/api/stream/local-timeline.ts index 32718810dc..82060a7aaa 100644 --- a/src/server/api/stream/local-timeline.ts +++ b/src/server/api/stream/local-timeline.ts @@ -1,5 +1,5 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import { IUser } from '../../../models/user'; import Mute from '../../../models/mute'; @@ -8,18 +8,14 @@ import { pack } from '../../../models/note'; export default async function( request: websocket.request, connection: websocket.connection, - subscriber: redis.RedisClient, + subscriber: Xev, user: IUser ) { - // Subscribe stream - subscriber.subscribe('misskey:local-timeline'); - const mute = await Mute.find({ muterId: user._id }); const mutedUserIds = mute.map(m => m.muteeId.toString()); - subscriber.on('message', async (_, data) => { - const note = JSON.parse(data); - + // Subscribe stream + subscriber.on('local-timeline', async note => { //#region 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する if (mutedUserIds.indexOf(note.userId) != -1) { return; diff --git a/src/server/api/stream/messaging-index.ts b/src/server/api/stream/messaging-index.ts index c1b2fbc806..9af63f2812 100644 --- a/src/server/api/stream/messaging-index.ts +++ b/src/server/api/stream/messaging-index.ts @@ -1,10 +1,9 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; -export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void { +export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user: any): void { // Subscribe messaging index stream - subscriber.subscribe(`misskey:messaging-index-stream:${user._id}`); - subscriber.on('message', (_, data) => { - connection.send(data); + subscriber.on(`messaging-index-stream:${user._id}`, data => { + connection.send(JSON.stringify(data)); }); } diff --git a/src/server/api/stream/messaging.ts b/src/server/api/stream/messaging.ts index 3e6c2cd509..8b352cea3c 100644 --- a/src/server/api/stream/messaging.ts +++ b/src/server/api/stream/messaging.ts @@ -1,16 +1,15 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import read from '../common/read-messaging-message'; import { ParsedUrlQuery } from 'querystring'; -export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void { +export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user: any): void { const q = request.resourceURL.query as ParsedUrlQuery; const otherparty = q.otherparty as string; // Subscribe messaging stream - subscriber.subscribe(`misskey:messaging-stream:${user._id}-${otherparty}`); - subscriber.on('message', (_, data) => { - connection.send(data); + subscriber.on(`messaging-stream:${user._id}-${otherparty}`, data => { + connection.send(JSON.stringify(data)); }); connection.on('message', async (data) => { diff --git a/src/server/api/stream/user-list.ts b/src/server/api/stream/user-list.ts index ba03b97860..33cc2a1ee1 100644 --- a/src/server/api/stream/user-list.ts +++ b/src/server/api/stream/user-list.ts @@ -1,14 +1,13 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import { ParsedUrlQuery } from 'querystring'; -export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void { +export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user: any): void { const q = request.resourceURL.query as ParsedUrlQuery; const listId = q.listId as string; // Subscribe stream - subscriber.subscribe(`misskey:user-list-stream:${listId}`); - subscriber.on('message', (_, data) => { + subscriber.on(`user-list-stream:${listId}`, data => { connection.send(data); }); } diff --git a/src/server/api/streaming.ts b/src/server/api/streaming.ts index afa0de2ce1..c8b2d4e0b9 100644 --- a/src/server/api/streaming.ts +++ b/src/server/api/streaming.ts @@ -1,7 +1,6 @@ import * as http from 'http'; import * as websocket from 'websocket'; -import * as redis from 'redis'; -import config from '../../config'; +import Xev from 'xev'; import homeStream from './stream/home'; import localTimelineStream from './stream/local-timeline'; @@ -39,20 +38,17 @@ module.exports = (server: http.Server) => { return; } - // Connect to Redis - const subscriber = redis.createClient( - config.redis.port, config.redis.host); + const ev = new Xev(); - connection.on('close', () => { - subscriber.unsubscribe(); - subscriber.quit(); + connection.once('close', () => { + ev.removeAllListeners(); }); const q = request.resourceURL.query as ParsedUrlQuery; const [user, app] = await authenticate(q.i as string); if (request.resourceURL.pathname === '/games/reversi-game') { - reversiGameStream(request, connection, subscriber, user); + reversiGameStream(request, connection, ev, user); return; } @@ -75,7 +71,7 @@ module.exports = (server: http.Server) => { null; if (channel !== null) { - channel(request, connection, subscriber, user, app); + channel(request, connection, ev, user, app); } else { connection.close(); } |