summaryrefslogtreecommitdiff
path: root/packages/backend/src/server/api/StreamingApiServerService.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/server/api/StreamingApiServerService.ts')
-rw-r--r--packages/backend/src/server/api/StreamingApiServerService.ts23
1 files changed, 21 insertions, 2 deletions
diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts
index fdda581ada..893dfe956e 100644
--- a/packages/backend/src/server/api/StreamingApiServerService.ts
+++ b/packages/backend/src/server/api/StreamingApiServerService.ts
@@ -19,6 +19,8 @@ import type * as http from 'node:http';
@Injectable()
export class StreamingApiServerService {
#wss: WebSocket.WebSocketServer;
+ #connections = new Map<WebSocket.WebSocket, number>();
+ #cleanConnectionsIntervalId: NodeJS.Timeout | null = null;
constructor(
@Inject(DI.config)
@@ -109,7 +111,9 @@ export class StreamingApiServerService {
await stream.listen(ev, connection);
- const intervalId = user ? setInterval(() => {
+ this.#connections.set(connection, Date.now());
+
+ const userUpdateIntervalId = user ? setInterval(() => {
this.usersRepository.update(user.id, {
lastActiveDate: new Date(),
});
@@ -124,19 +128,34 @@ export class StreamingApiServerService {
ev.removeAllListeners();
stream.dispose();
this.redisForSub.off('message', onRedisMessage);
- if (intervalId) clearInterval(intervalId);
+ if (userUpdateIntervalId) clearInterval(userUpdateIntervalId);
});
connection.on('message', async (data) => {
+ this.#connections.set(connection, Date.now());
if (data.toString() === 'ping') {
connection.send('pong');
}
});
});
+
+ this.#cleanConnectionsIntervalId = setInterval(() => {
+ const now = Date.now();
+ for (const [connection, lastActive] of this.#connections.entries()) {
+ if (now - lastActive > 1000 * 60 * 5) {
+ connection.terminate();
+ this.#connections.delete(connection);
+ }
+ }
+ }, 1000 * 60 * 5);
}
@bindThis
public detach(): Promise<void> {
+ if (this.#cleanConnectionsIntervalId) {
+ clearInterval(this.#cleanConnectionsIntervalId);
+ this.#cleanConnectionsIntervalId = null;
+ }
return new Promise((resolve) => {
this.#wss.close(() => resolve());
});