From 0e4a111f81cceed275d9bec2695f6e401fb654d8 Mon Sep 17 00:00:00 2001 From: syuilo Date: Fri, 12 Nov 2021 02:02:25 +0900 Subject: refactoring Resolve #7779 --- packages/backend/src/server/api/streaming.ts | 67 ++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 packages/backend/src/server/api/streaming.ts (limited to 'packages/backend/src/server/api/streaming.ts') diff --git a/packages/backend/src/server/api/streaming.ts b/packages/backend/src/server/api/streaming.ts new file mode 100644 index 0000000000..8808bc9860 --- /dev/null +++ b/packages/backend/src/server/api/streaming.ts @@ -0,0 +1,67 @@ +import * as http from 'http'; +import * as websocket from 'websocket'; + +import MainStreamConnection from './stream/index'; +import { ParsedUrlQuery } from 'querystring'; +import authenticate from './authenticate'; +import { EventEmitter } from 'events'; +import { subsdcriber as redisClient } from '../../db/redis'; +import { Users } from '@/models/index'; + +module.exports = (server: http.Server) => { + // Init websocket server + const ws = new websocket.server({ + httpServer: server + }); + + ws.on('request', async (request) => { + const q = request.resourceURL.query as ParsedUrlQuery; + + // TODO: トークンが間違ってるなどしてauthenticateに失敗したら + // コネクション切断するなりエラーメッセージ返すなりする + // (現状はエラーがキャッチされておらずサーバーのログに流れて邪魔なので) + const [user, app] = await authenticate(q.i as string); + + if (user?.isSuspended) { + request.reject(400); + return; + } + + const connection = request.accept(); + + const ev = new EventEmitter(); + + async function onRedisMessage(_: string, data: string) { + const parsed = JSON.parse(data); + ev.emit(parsed.channel, parsed.message); + } + + redisClient.on('message', onRedisMessage); + + const main = new MainStreamConnection(connection, ev, user, app); + + const intervalId = user ? setInterval(() => { + Users.update(user.id, { + lastActiveDate: new Date(), + }); + }, 1000 * 60 * 5) : null; + if (user) { + Users.update(user.id, { + lastActiveDate: new Date(), + }); + } + + connection.once('close', () => { + ev.removeAllListeners(); + main.dispose(); + redisClient.off('message', onRedisMessage); + if (intervalId) clearInterval(intervalId); + }); + + connection.on('message', async (data) => { + if (data.utf8Data === 'ping') { + connection.send('pong'); + } + }); + }); +}; -- cgit v1.2.3-freya