diff options
| author | syuilo <Syuilotan@yahoo.co.jp> | 2021-11-12 02:02:25 +0900 |
|---|---|---|
| committer | syuilo <Syuilotan@yahoo.co.jp> | 2021-11-12 02:02:25 +0900 |
| commit | 0e4a111f81cceed275d9bec2695f6e401fb654d8 (patch) | |
| tree | 40874799472fa07416f17b50a398ac33b7771905 /packages/backend/src/server/api/streaming.ts | |
| parent | update deps (diff) | |
| download | sharkey-0e4a111f81cceed275d9bec2695f6e401fb654d8.tar.gz sharkey-0e4a111f81cceed275d9bec2695f6e401fb654d8.tar.bz2 sharkey-0e4a111f81cceed275d9bec2695f6e401fb654d8.zip | |
refactoring
Resolve #7779
Diffstat (limited to 'packages/backend/src/server/api/streaming.ts')
| -rw-r--r-- | packages/backend/src/server/api/streaming.ts | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/packages/backend/src/server/api/streaming.ts b/packages/backend/src/server/api/streaming.ts new file mode 100644 index 0000000000..8808bc9860 --- /dev/null +++ b/packages/backend/src/server/api/streaming.ts @@ -0,0 +1,67 @@ +import * as http from 'http'; +import * as websocket from 'websocket'; + +import MainStreamConnection from './stream/index'; +import { ParsedUrlQuery } from 'querystring'; +import authenticate from './authenticate'; +import { EventEmitter } from 'events'; +import { subsdcriber as redisClient } from '../../db/redis'; +import { Users } from '@/models/index'; + +module.exports = (server: http.Server) => { + // Init websocket server + const ws = new websocket.server({ + httpServer: server + }); + + ws.on('request', async (request) => { + const q = request.resourceURL.query as ParsedUrlQuery; + + // TODO: トークンが間違ってるなどしてauthenticateに失敗したら + // コネクション切断するなりエラーメッセージ返すなりする + // (現状はエラーがキャッチされておらずサーバーのログに流れて邪魔なので) + const [user, app] = await authenticate(q.i as string); + + if (user?.isSuspended) { + request.reject(400); + return; + } + + const connection = request.accept(); + + const ev = new EventEmitter(); + + async function onRedisMessage(_: string, data: string) { + const parsed = JSON.parse(data); + ev.emit(parsed.channel, parsed.message); + } + + redisClient.on('message', onRedisMessage); + + const main = new MainStreamConnection(connection, ev, user, app); + + const intervalId = user ? setInterval(() => { + Users.update(user.id, { + lastActiveDate: new Date(), + }); + }, 1000 * 60 * 5) : null; + if (user) { + Users.update(user.id, { + lastActiveDate: new Date(), + }); + } + + connection.once('close', () => { + ev.removeAllListeners(); + main.dispose(); + redisClient.off('message', onRedisMessage); + if (intervalId) clearInterval(intervalId); + }); + + connection.on('message', async (data) => { + if (data.utf8Data === 'ping') { + connection.send('pong'); + } + }); + }); +}; |