From 18655386f3013512ae543e6cf161dcf471fa6a68 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Thu, 27 Mar 2025 10:55:22 -0400 Subject: convert streaming rate limit to bucket --- .../src/server/api/StreamingApiServerService.ts | 35 +++++++++------------- 1 file changed, 14 insertions(+), 21 deletions(-) (limited to 'packages/backend/src/server/api/StreamingApiServerService.ts') diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index 0954744f81..1c2569bf8d 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -10,7 +10,9 @@ import * as WebSocket from 'ws'; import proxyAddr from 'proxy-addr'; import ms from 'ms'; import { DI } from '@/di-symbols.js'; -import type { UsersRepository, MiAccessToken } from '@/models/_.js'; +import type { UsersRepository, MiAccessToken, MiUser } from '@/models/_.js'; +import type { Config } from '@/config.js'; +import type { Keyed, RateLimit } from '@/misc/rate-limit-utils.js'; import { NoteReadService } from '@/core/NoteReadService.js'; import { NotificationService } from '@/core/NotificationService.js'; import { bindThis } from '@/decorators.js'; @@ -25,8 +27,6 @@ import { AuthenticateService, AuthenticationError } from './AuthenticateService. import MainStreamConnection from './stream/Connection.js'; import { ChannelsService } from './stream/ChannelsService.js'; import type * as http from 'node:http'; -import type { IEndpointMeta } from './endpoints.js'; -import type { Config } from "@/config.js"; @Injectable() export class StreamingApiServerService { @@ -58,17 +58,9 @@ export class StreamingApiServerService { @bindThis private async rateLimitThis( - user: MiLocalUser | null | undefined, - requestIp: string, - limit: IEndpointMeta['limit'] & { key: NonNullable }, + limitActor: MiUser | string, + limit: Keyed, ) : Promise { - let limitActor: string | MiLocalUser; - if (user) { - limitActor = user; - } else { - limitActor = getIpHash(requestIp); - } - // Rate limit const rateLimit = await this.rateLimiterService.limit(limit, limitActor); return rateLimit.blocked; @@ -93,7 +85,8 @@ export class StreamingApiServerService { // so we do the same const requestIp = proxyAddr(request, () => { return true; } ); - if (await this.rateLimitThis(null, requestIp, { + const limitActor = getIpHash(requestIp); + if (await this.rateLimitThis(limitActor, { key: 'wsconnect', duration: ms('5min'), max: 32, @@ -141,14 +134,14 @@ export class StreamingApiServerService { } const rateLimiter = () => { - // rather high limit, because when catching up at the top of a - // timeline, the frontend may render many many notes, each of - // which causes a message via `useNoteCapture` to ask for - // realtime updates of that note - return this.rateLimitThis(user, requestIp, { + const limitActor = user ?? getIpHash(requestIp); + + // Rather high limit because when catching up at the top of a timeline, the frontend may render many many notes. + // Each of which causes a message via `useNoteCapture` to ask for realtime updates of that note. + return this.rateLimitThis(limitActor, { key: 'wsmessage', - duration: ms('2sec'), - max: 4096, + max: 4096, // Allow spikes of up to 4096 + dripRate: 50, // Then once every 50ms (20/second rate) }); }; -- cgit v1.2.3-freya From c41d617e6364d34021ea10f7ee9bc081b6d3a244 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Thu, 27 Mar 2025 19:33:32 -0400 Subject: limit the number of active connections per client, and limit upgrade requests by user --- .../src/server/api/StreamingApiServerService.ts | 70 ++++++++++++++++------ 1 file changed, 52 insertions(+), 18 deletions(-) (limited to 'packages/backend/src/server/api/StreamingApiServerService.ts') diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index 1c2569bf8d..c7db4549d3 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -28,10 +28,15 @@ import MainStreamConnection from './stream/Connection.js'; import { ChannelsService } from './stream/ChannelsService.js'; import type * as http from 'node:http'; +// Maximum number of simultaneous connections by client (user ID or IP address). +// Excess connections will be closed automatically. +const MAX_CONNECTIONS_PER_CLIENT = 32; + @Injectable() export class StreamingApiServerService { #wss: WebSocket.WebSocketServer; #connections = new Map(); + #connectionsByClient = new Map>(); // key: IP / user ID -> value: connection #cleanConnectionsIntervalId: NodeJS.Timeout | null = null; constructor( @@ -80,22 +85,6 @@ export class StreamingApiServerService { return; } - // ServerServices sets `trustProxy: true`, which inside - // fastify/request.js ends up calling `proxyAddr` in this way, - // so we do the same - const requestIp = proxyAddr(request, () => { return true; } ); - - const limitActor = getIpHash(requestIp); - if (await this.rateLimitThis(limitActor, { - key: 'wsconnect', - duration: ms('5min'), - max: 32, - })) { - socket.write('HTTP/1.1 429 Rate Limit Exceeded\r\n\r\n'); - socket.destroy(); - return; - } - const q = new URL(request.url, `http://${request.headers.host}`).searchParams; let user: MiLocalUser | null = null; @@ -133,9 +122,41 @@ export class StreamingApiServerService { return; } - const rateLimiter = () => { - const limitActor = user ?? getIpHash(requestIp); + // ServerServices sets `trustProxy: true`, which inside fastify/request.js ends up calling `proxyAddr` in this way, so we do the same. + const requestIp = proxyAddr(request, () => true ); + const limitActor = user?.id ?? getIpHash(requestIp); + if (await this.rateLimitThis(limitActor, { + key: 'wsconnect', + duration: ms('5min'), + max: 32, + })) { + socket.write('HTTP/1.1 429 Rate Limit Exceeded\r\n\r\n'); + socket.destroy(); + return; + } + + // For performance and code simplicity, obtain and hold this reference for the lifetime of the connection. + // This should be safe because the map entry should only be deleted after *all* connections close. + let connectionsForClient = this.#connectionsByClient.get(limitActor); + if (!connectionsForClient) { + connectionsForClient = new Set(); + this.#connectionsByClient.set(limitActor, connectionsForClient); + } + // Close excess connections + while (connectionsForClient.size >= MAX_CONNECTIONS_PER_CLIENT) { + // Set maintains insertion order, so first entry is the oldest. + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const oldestConnection = connectionsForClient.values().next().value!; + + // Technically, the close() handler should remove this entry. + // But if that ever fails, then we could enter an infinite loop. + // We manually remove the connection here just in case. + oldestConnection.close(1008, 'Disconnected - too many simultaneous connections'); + connectionsForClient.delete(oldestConnection); + } + + const rateLimiter = () => { // Rather high limit because when catching up at the top of a timeline, the frontend may render many many notes. // Each of which causes a message via `useNoteCapture` to ask for realtime updates of that note. return this.rateLimitThis(limitActor, { @@ -159,6 +180,19 @@ export class StreamingApiServerService { await stream.init(); this.#wss.handleUpgrade(request, socket, head, (ws) => { + connectionsForClient.add(ws); + + // Call before emit() in case it throws an error. + // We don't want to leave dangling references! + ws.once('close', () => { + connectionsForClient.delete(ws); + + // Make sure we don't leak the Set objects! + if (connectionsForClient.size < 1) { + this.#connectionsByClient.delete(limitActor); + } + }); + this.#wss.emit('connection', ws, request, { stream, user, app, }); -- cgit v1.2.3-freya From 47ea8527fd175c55a6d0128b91aced13ea442135 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Sat, 29 Mar 2025 09:44:38 -0400 Subject: fix wsmessage rate limit definition --- packages/backend/src/server/api/StreamingApiServerService.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'packages/backend/src/server/api/StreamingApiServerService.ts') diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index c7db4549d3..d86deef1d7 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -160,8 +160,9 @@ export class StreamingApiServerService { // Rather high limit because when catching up at the top of a timeline, the frontend may render many many notes. // Each of which causes a message via `useNoteCapture` to ask for realtime updates of that note. return this.rateLimitThis(limitActor, { + type: 'bucket', key: 'wsmessage', - max: 4096, // Allow spikes of up to 4096 + size: 4096, // Allow spikes of up to 4096 dripRate: 50, // Then once every 50ms (20/second rate) }); }; -- cgit v1.2.3-freya