diff options
| author | Julia <julia@insertdomain.name> | 2025-05-29 00:07:38 +0000 |
|---|---|---|
| committer | Julia <julia@insertdomain.name> | 2025-05-29 00:07:38 +0000 |
| commit | 6b554c178b81f13f83a69b19d44b72b282a0c119 (patch) | |
| tree | f5537f1a56323a4dd57ba150b3cb84a2d8b5dc63 /packages/backend/src/server/api/StreamingApiServerService.ts | |
| parent | merge: Security fixes (!970) (diff) | |
| parent | bump version for release (diff) | |
| download | sharkey-6b554c178b81f13f83a69b19d44b72b282a0c119.tar.gz sharkey-6b554c178b81f13f83a69b19d44b72b282a0c119.tar.bz2 sharkey-6b554c178b81f13f83a69b19d44b72b282a0c119.zip | |
merge: release 2025.4.2 (!1051)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/1051
Approved-by: Hazelnoot <acomputerdog@gmail.com>
Approved-by: Marie <github@yuugi.dev>
Approved-by: Julia <julia@insertdomain.name>
Diffstat (limited to 'packages/backend/src/server/api/StreamingApiServerService.ts')
| -rw-r--r-- | packages/backend/src/server/api/StreamingApiServerService.ts | 106 |
1 files changed, 69 insertions, 37 deletions
diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index 6e7abcfae6..eaeaecb1c2 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -10,8 +10,9 @@ import * as WebSocket from 'ws'; import proxyAddr from 'proxy-addr'; import ms from 'ms'; import { DI } from '@/di-symbols.js'; -import type { UsersRepository, MiAccessToken } from '@/models/_.js'; -import { NoteReadService } from '@/core/NoteReadService.js'; +import type { UsersRepository, MiAccessToken, MiUser } from '@/models/_.js'; +import type { Config } from '@/config.js'; +import type { Keyed, RateLimit } from '@/misc/rate-limit-utils.js'; import { NotificationService } from '@/core/NotificationService.js'; import { bindThis } from '@/decorators.js'; import { CacheService } from '@/core/CacheService.js'; @@ -25,12 +26,16 @@ import { AuthenticateService, AuthenticationError } from './AuthenticateService. import MainStreamConnection from './stream/Connection.js'; import { ChannelsService } from './stream/ChannelsService.js'; import type * as http from 'node:http'; -import type { IEndpointMeta } from './endpoints.js'; + +// Maximum number of simultaneous connections by client (user ID or IP address). +// Excess connections will be closed automatically. +const MAX_CONNECTIONS_PER_CLIENT = 32; @Injectable() export class StreamingApiServerService { #wss: WebSocket.WebSocketServer; #connections = new Map<WebSocket.WebSocket, number>(); + #connectionsByClient = new Map<string, Set<WebSocket.WebSocket>>(); // key: IP / user ID -> value: connection #cleanConnectionsIntervalId: NodeJS.Timeout | null = null; constructor( @@ -41,7 +46,6 @@ export class StreamingApiServerService { private usersRepository: UsersRepository, private cacheService: CacheService, - private noteReadService: NoteReadService, private authenticateService: AuthenticateService, private channelsService: ChannelsService, private notificationService: NotificationService, @@ -49,22 +53,17 @@ export class StreamingApiServerService { private channelFollowingService: ChannelFollowingService, private rateLimiterService: SkRateLimiterService, private loggerService: LoggerService, + + @Inject(DI.config) + private config: Config, ) { } @bindThis private async rateLimitThis( - user: MiLocalUser | null | undefined, - requestIp: string, - limit: IEndpointMeta['limit'] & { key: NonNullable<string> }, + limitActor: MiUser | string, + limit: Keyed<RateLimit>, ) : Promise<boolean> { - let limitActor: string | MiLocalUser; - if (user) { - limitActor = user; - } else { - limitActor = getIpHash(requestIp); - } - // Rate limit const rateLimit = await this.rateLimiterService.limit(limit, limitActor); return rateLimit.blocked; @@ -74,6 +73,7 @@ export class StreamingApiServerService { public attach(server: http.Server): void { this.#wss = new WebSocket.WebSocketServer({ noServer: true, + perMessageDeflate: this.config.websocketCompression, }); server.on('upgrade', async (request, socket, head) => { @@ -83,21 +83,6 @@ export class StreamingApiServerService { return; } - // ServerServices sets `trustProxy: true`, which inside - // fastify/request.js ends up calling `proxyAddr` in this way, - // so we do the same - const requestIp = proxyAddr(request, () => { return true; } ); - - if (await this.rateLimitThis(null, requestIp, { - key: 'wsconnect', - duration: ms('5min'), - max: 32, - })) { - socket.write('HTTP/1.1 429 Rate Limit Exceeded\r\n\r\n'); - socket.destroy(); - return; - } - const q = new URL(request.url, `http://${request.headers.host}`).searchParams; let user: MiLocalUser | null = null; @@ -135,21 +120,55 @@ export class StreamingApiServerService { return; } + // ServerServices sets `trustProxy: true`, which inside fastify/request.js ends up calling `proxyAddr` in this way, so we do the same. + const requestIp = proxyAddr(request, () => true ); + const limitActor = user?.id ?? getIpHash(requestIp); + if (await this.rateLimitThis(limitActor, { + // Up to 32 connections, then 1 every 10 seconds + type: 'bucket', + key: 'wsconnect', + size: 32, + dripRate: 10 * 1000, + })) { + socket.write('HTTP/1.1 429 Rate Limit Exceeded\r\n\r\n'); + socket.destroy(); + return; + } + + // For performance and code simplicity, obtain and hold this reference for the lifetime of the connection. + // This should be safe because the map entry should only be deleted after *all* connections close. + let connectionsForClient = this.#connectionsByClient.get(limitActor); + if (!connectionsForClient) { + connectionsForClient = new Set(); + this.#connectionsByClient.set(limitActor, connectionsForClient); + } + + // Close excess connections + while (connectionsForClient.size >= MAX_CONNECTIONS_PER_CLIENT) { + // Set maintains insertion order, so first entry is the oldest. + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const oldestConnection = connectionsForClient.values().next().value!; + + // Technically, the close() handler should remove this entry. + // But if that ever fails, then we could enter an infinite loop. + // We manually remove the connection here just in case. + oldestConnection.close(1008, 'Disconnected - too many simultaneous connections'); + connectionsForClient.delete(oldestConnection); + } + const rateLimiter = () => { - // rather high limit, because when catching up at the top of a - // timeline, the frontend may render many many notes, each of - // which causes a message via `useNoteCapture` to ask for - // realtime updates of that note - return this.rateLimitThis(user, requestIp, { + // Rather high limit because when catching up at the top of a timeline, the frontend may render many many notes. + // Each of which causes a message via `useNoteCapture` to ask for realtime updates of that note. + return this.rateLimitThis(limitActor, { + type: 'bucket', key: 'wsmessage', - duration: ms('2sec'), - max: 4096, + size: 4096, // Allow spikes of up to 4096 + dripRate: 50, // Then once every 50ms (20/second rate) }); }; const stream = new MainStreamConnection( this.channelsService, - this.noteReadService, this.notificationService, this.cacheService, this.channelFollowingService, @@ -161,6 +180,19 @@ export class StreamingApiServerService { await stream.init(); this.#wss.handleUpgrade(request, socket, head, (ws) => { + connectionsForClient.add(ws); + + // Call before emit() in case it throws an error. + // We don't want to leave dangling references! + ws.once('close', () => { + connectionsForClient.delete(ws); + + // Make sure we don't leak the Set objects! + if (connectionsForClient.size < 1) { + this.#connectionsByClient.delete(limitActor); + } + }); + this.#wss.emit('connection', ws, request, { stream, user, app, }); |