From 311a31da58ebfc079653f860ea4cf4ed9a051d42 Mon Sep 17 00:00:00 2001 From: dakkar Date: Thu, 15 Aug 2024 11:35:51 +0100 Subject: rough rate limiting for websockets --- .../src/server/api/StreamingApiServerService.ts | 48 ++++++++++++++++++++++ 1 file changed, 48 insertions(+) (limited to 'packages/backend/src/server/api/StreamingApiServerService.ts') diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index b8f448477b..7ac1bcf469 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -19,7 +19,12 @@ import { ChannelFollowingService } from '@/core/ChannelFollowingService.js'; import { AuthenticateService, AuthenticationError } from './AuthenticateService.js'; import MainStreamConnection from './stream/Connection.js'; import { ChannelsService } from './stream/ChannelsService.js'; +import { RateLimiterService } from './RateLimiterService.js'; +import { RoleService } from '@/core/RoleService.js'; +import { getIpHash } from '@/misc/get-ip-hash.js'; +import ms from 'ms'; import type * as http from 'node:http'; +import type { IEndpointMeta } from './endpoints.js'; @Injectable() export class StreamingApiServerService { @@ -41,9 +46,32 @@ export class StreamingApiServerService { private notificationService: NotificationService, private usersService: UserService, private channelFollowingService: ChannelFollowingService, + private rateLimiterService: RateLimiterService, + private roleService: RoleService, ) { } + @bindThis + private async rateLimitThis( + user: MiLocalUser | null | undefined, + requestIp: string | undefined, + limit: IEndpointMeta['limit'] & { key: NonNullable }, + ) : Promise { + let limitActor: string; + if (user) { + limitActor = user.id; + } else { + limitActor = getIpHash(requestIp || 'wtf'); + } + + const factor = user ? (await this.roleService.getUserPolicies(user.id)).rateLimitFactor : 1; + + if (factor <= 0) return false; + + // Rate limit + return await this.rateLimiterService.limit(limit, limitActor, factor).then(() => { return false }).catch(err => { return true }); + } + @bindThis public attach(server: http.Server): void { this.#wss = new WebSocket.WebSocketServer({ @@ -57,6 +85,17 @@ export class StreamingApiServerService { return; } + if (await this.rateLimitThis(null, request.socket.remoteAddress, { + key: 'wsconnect', + duration: ms('1min'), + max: 20, + minInterval: ms('1sec'), + })) { + socket.write('HTTP/1.1 429 Rate Limit Exceeded\r\n\r\n'); + socket.destroy(); + return; + } + const q = new URL(request.url, `http://${request.headers.host}`).searchParams; let user: MiLocalUser | null = null; @@ -94,6 +133,14 @@ export class StreamingApiServerService { return; } + const rateLimiter = () => { + return this.rateLimitThis(user, request.socket.remoteAddress, { + key: 'wsmessage', + duration: ms('1sec'), + max: 100, + }); + }; + const stream = new MainStreamConnection( this.channelsService, this.noteReadService, @@ -101,6 +148,7 @@ export class StreamingApiServerService { this.cacheService, this.channelFollowingService, user, app, + rateLimiter, ); await stream.init(); -- cgit v1.2.3-freya From 4cd44130e0abd47f1f9c4b7fd74c5c49c16bd79c Mon Sep 17 00:00:00 2001 From: dakkar Date: Fri, 16 Aug 2024 18:00:50 +0100 Subject: use the correct remote address we're doing the same thing that Fastify does in the non-streaming ServerService --- packages/backend/package.json | 1 + .../backend/src/server/api/StreamingApiServerService.ts | 14 +++++++++++--- pnpm-lock.yaml | 3 +++ 3 files changed, 15 insertions(+), 3 deletions(-) (limited to 'packages/backend/src/server/api/StreamingApiServerService.ts') diff --git a/packages/backend/package.json b/packages/backend/package.json index 8e8d76bf23..65eda6153c 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -156,6 +156,7 @@ "pkce-challenge": "4.1.0", "probe-image-size": "7.2.3", "promise-limit": "2.7.0", + "proxy-addr": "^2.0.7", "pug": "3.0.2", "punycode": "2.3.1", "qrcode": "1.5.3", diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index 7ac1bcf469..1435169812 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -22,6 +22,7 @@ import { ChannelsService } from './stream/ChannelsService.js'; import { RateLimiterService } from './RateLimiterService.js'; import { RoleService } from '@/core/RoleService.js'; import { getIpHash } from '@/misc/get-ip-hash.js'; +import proxyAddr from 'proxy-addr'; import ms from 'ms'; import type * as http from 'node:http'; import type { IEndpointMeta } from './endpoints.js'; @@ -69,7 +70,9 @@ export class StreamingApiServerService { if (factor <= 0) return false; // Rate limit - return await this.rateLimiterService.limit(limit, limitActor, factor).then(() => { return false }).catch(err => { return true }); + return await this.rateLimiterService.limit(limit, limitActor, factor) + .then(() => { return false; }) + .catch(err => { return true; }); } @bindThis @@ -85,7 +88,12 @@ export class StreamingApiServerService { return; } - if (await this.rateLimitThis(null, request.socket.remoteAddress, { + // ServerServices sets `trustProxy: true`, which inside + // fastify/request.js ends up calling `proxyAddr` in this way, + // so we do the same + const requestIp = proxyAddr(request, () => { return true; } ); + + if (await this.rateLimitThis(null, requestIp, { key: 'wsconnect', duration: ms('1min'), max: 20, @@ -134,7 +142,7 @@ export class StreamingApiServerService { } const rateLimiter = () => { - return this.rateLimitThis(user, request.socket.remoteAddress, { + return this.rateLimitThis(user, requestIp, { key: 'wsmessage', duration: ms('1sec'), max: 100, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1f3cd8216f..d6e9f1196a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -352,6 +352,9 @@ importers: promise-limit: specifier: 2.7.0 version: 2.7.0 + proxy-addr: + specifier: ^2.0.7 + version: 2.0.7 pug: specifier: 3.0.2 version: 3.0.2 -- cgit v1.2.3-freya From 6d3f9503ed1fd04718396b248cc5a753245c0f67 Mon Sep 17 00:00:00 2001 From: Julia Johannesen Date: Fri, 16 Aug 2024 17:13:20 -0400 Subject: Limit number of rate limit requests --- .../src/server/api/StreamingApiServerService.ts | 5 +++++ .../backend/src/server/api/stream/Connection.ts | 26 ++++++++++++++++++++-- 2 files changed, 29 insertions(+), 2 deletions(-) (limited to 'packages/backend/src/server/api/StreamingApiServerService.ts') diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index 1435169812..f48af45fb1 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -26,12 +26,15 @@ import proxyAddr from 'proxy-addr'; import ms from 'ms'; import type * as http from 'node:http'; import type { IEndpointMeta } from './endpoints.js'; +import { LoggerService } from '@/core/LoggerService.js'; +import type Logger from '@/logger.js'; @Injectable() export class StreamingApiServerService { #wss: WebSocket.WebSocketServer; #connections = new Map(); #cleanConnectionsIntervalId: NodeJS.Timeout | null = null; + #logger: Logger; constructor( @Inject(DI.redisForSub) @@ -49,6 +52,7 @@ export class StreamingApiServerService { private channelFollowingService: ChannelFollowingService, private rateLimiterService: RateLimiterService, private roleService: RoleService, + private loggerService: LoggerService, ) { } @@ -155,6 +159,7 @@ export class StreamingApiServerService { this.notificationService, this.cacheService, this.channelFollowingService, + this.loggerService, user, app, rateLimiter, ); diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts index dfc6f0d298..0a7828d163 100644 --- a/packages/backend/src/server/api/stream/Connection.ts +++ b/packages/backend/src/server/api/stream/Connection.ts @@ -17,6 +17,8 @@ import { ChannelFollowingService } from '@/core/ChannelFollowingService.js'; import type { ChannelsService } from './ChannelsService.js'; import type { EventEmitter } from 'events'; import type Channel from './channel.js'; +import { LoggerService } from '@/core/LoggerService.js'; +import type Logger from '@/logger.js'; /** * Main stream connection @@ -39,6 +41,9 @@ export default class Connection { public userIdsWhoMeMutingRenotes: Set = new Set(); public userMutedInstances: Set = new Set(); private fetchIntervalId: NodeJS.Timeout | null = null; + private activeRateLimitRequests: number = 0; + private closingConnection: boolean = false; + private logger: Logger; constructor( private channelsService: ChannelsService, @@ -46,6 +51,7 @@ export default class Connection { private notificationService: NotificationService, private cacheService: CacheService, private channelFollowingService: ChannelFollowingService, + private loggerService: LoggerService, user: MiUser | null | undefined, token: MiAccessToken | null | undefined, @@ -54,6 +60,8 @@ export default class Connection { if (user) this.user = user; if (token) this.token = token; if (rateLimiter) this.rateLimiter = rateLimiter; + + this.logger = loggerService.getLogger('streaming', 'coral', false); } @bindThis @@ -106,8 +114,22 @@ export default class Connection { private async onWsConnectionMessage(data: WebSocket.RawData) { let obj: Record; - if (this.rateLimiter && await this.rateLimiter()) { - return; + if (this.closingConnection) return; + + if (this.rateLimiter) { + if (this.activeRateLimitRequests <= 128) { + this.activeRateLimitRequests++; + const shouldRateLimit = await this.rateLimiter(); + this.activeRateLimitRequests--; + + if (shouldRateLimit) return; + if (this.closingConnection) return; + } else { + this.logger.warn('Closing a connection due to an excessive influx of messages.'); + this.closingConnection = true; + this.wsConnection.close(1008, 'Please stop spamming the streaming API.'); + return; + } } try { -- cgit v1.2.3-freya From 9c1c1e9f099c25cf33f5804fdf3fba547ba73c92 Mon Sep 17 00:00:00 2001 From: Julia Johannesen Date: Sat, 17 Aug 2024 13:08:46 -0400 Subject: Fix logging stuff --- packages/backend/src/server/api/StreamingApiServerService.ts | 1 - packages/backend/src/server/api/stream/Connection.ts | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) (limited to 'packages/backend/src/server/api/StreamingApiServerService.ts') diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index f48af45fb1..db948122bf 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -34,7 +34,6 @@ export class StreamingApiServerService { #wss: WebSocket.WebSocketServer; #connections = new Map(); #cleanConnectionsIntervalId: NodeJS.Timeout | null = null; - #logger: Logger; constructor( @Inject(DI.redisForSub) diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts index e8cd557c1c..0914cdbb22 100644 --- a/packages/backend/src/server/api/stream/Connection.ts +++ b/packages/backend/src/server/api/stream/Connection.ts @@ -53,7 +53,7 @@ export default class Connection { private notificationService: NotificationService, private cacheService: CacheService, private channelFollowingService: ChannelFollowingService, - private loggerService: LoggerService, + loggerService: LoggerService, user: MiUser | null | undefined, token: MiAccessToken | null | undefined, -- cgit v1.2.3-freya From c5f7dcbb7e4be0c8d73e69df56df1c05e3413e24 Mon Sep 17 00:00:00 2001 From: Julia Johannesen Date: Sat, 17 Aug 2024 14:17:58 -0400 Subject: Come up with better limits --- packages/backend/src/server/api/StreamingApiServerService.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'packages/backend/src/server/api/StreamingApiServerService.ts') diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index db948122bf..19c78fd4d1 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -98,8 +98,8 @@ export class StreamingApiServerService { if (await this.rateLimitThis(null, requestIp, { key: 'wsconnect', - duration: ms('1min'), - max: 20, + duration: ms('5min'), + max: 32, minInterval: ms('1sec'), })) { socket.write('HTTP/1.1 429 Rate Limit Exceeded\r\n\r\n'); @@ -147,8 +147,8 @@ export class StreamingApiServerService { const rateLimiter = () => { return this.rateLimitThis(user, requestIp, { key: 'wsmessage', - duration: ms('1sec'), - max: 100, + duration: ms('5sec'), + max: 256, }); }; -- cgit v1.2.3-freya From 3dd993a76a5e5d87a0b31e1eff5093958f239021 Mon Sep 17 00:00:00 2001 From: Julia Johannesen Date: Sat, 17 Aug 2024 14:27:43 -0400 Subject: Add IP and user ID to connection close message --- packages/backend/src/server/api/StreamingApiServerService.ts | 2 +- packages/backend/src/server/api/stream/Connection.ts | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) (limited to 'packages/backend/src/server/api/StreamingApiServerService.ts') diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index 19c78fd4d1..2070ab6106 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -159,7 +159,7 @@ export class StreamingApiServerService { this.cacheService, this.channelFollowingService, this.loggerService, - user, app, + user, app, requestIp, rateLimiter, ); diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts index 0914cdbb22..b71a99b89e 100644 --- a/packages/backend/src/server/api/stream/Connection.ts +++ b/packages/backend/src/server/api/stream/Connection.ts @@ -57,6 +57,7 @@ export default class Connection { user: MiUser | null | undefined, token: MiAccessToken | null | undefined, + private ip: string, rateLimiter: () => Promise, ) { if (user) this.user = user; @@ -127,7 +128,10 @@ export default class Connection { if (shouldRateLimit) return; if (this.closingConnection) return; } else { - this.logger.warn('Closing a connection due to an excessive influx of messages.'); + let connectionInfo = `IP ${this.ip}`; + if (this.user) connectionInfo += `, user ID ${this.user.id}`; + + this.logger.warn(`Closing a connection (${connectionInfo}) due to an excessive influx of messages.`); this.closingConnection = true; this.wsConnection.close(1008, 'Please stop spamming the streaming API.'); return; -- cgit v1.2.3-freya From caa0fecdc98e7d1dd7e5c7efb15b86fc2659a80d Mon Sep 17 00:00:00 2001 From: dakkar Date: Sun, 18 Aug 2024 15:23:45 +0100 Subject: relax websocket rate limits * the frontend opens 2 websockets at startup (I'm not completely clear why), and that `minInterval:1sec` was breaking the second connection * as the comment says, "catching up" generates many noteSubscribe messages --- packages/backend/src/server/api/StreamingApiServerService.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'packages/backend/src/server/api/StreamingApiServerService.ts') diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index 2070ab6106..a2dafb2ebd 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -100,7 +100,6 @@ export class StreamingApiServerService { key: 'wsconnect', duration: ms('5min'), max: 32, - minInterval: ms('1sec'), })) { socket.write('HTTP/1.1 429 Rate Limit Exceeded\r\n\r\n'); socket.destroy(); @@ -145,10 +144,14 @@ export class StreamingApiServerService { } const rateLimiter = () => { + // rather high limit, because when catching up at the top of a + // timeline, the frontend may render many many notes, each of + // which causes a message via `useNoteCapture` to ask for + // realtime updates of that note return this.rateLimitThis(user, requestIp, { key: 'wsmessage', - duration: ms('5sec'), - max: 256, + duration: ms('2sec'), + max: 4090, }); }; -- cgit v1.2.3-freya From 77144b058c48fc7b5f51ad992151c20447cf28d9 Mon Sep 17 00:00:00 2001 From: dakkar Date: Sun, 18 Aug 2024 17:57:51 +0100 Subject: make the cap of `activeRateLimitRequests` match the rate limit It's trivial to have more than 128 requests in flight: open a busy timeline, scroll a bit down, wait for many notes to arrive, scroll to the top. The frontend will send "subscribe to note" messages for each new note that it accumulated, all at once. We don't want to shut down the connection in those common cases! --- packages/backend/src/server/api/StreamingApiServerService.ts | 2 +- packages/backend/src/server/api/stream/Connection.ts | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'packages/backend/src/server/api/StreamingApiServerService.ts') diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index a2dafb2ebd..9b8464f705 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -151,7 +151,7 @@ export class StreamingApiServerService { return this.rateLimitThis(user, requestIp, { key: 'wsmessage', duration: ms('2sec'), - max: 4090, + max: 4096, }); }; diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts index b71a99b89e..7ea92eb797 100644 --- a/packages/backend/src/server/api/stream/Connection.ts +++ b/packages/backend/src/server/api/stream/Connection.ts @@ -120,7 +120,9 @@ export default class Connection { if (this.closingConnection) return; if (this.rateLimiter) { - if (this.activeRateLimitRequests <= 128) { + // this 4096 should match the `max` of the `rateLimiter`, see + // StreamingApiServerService + if (this.activeRateLimitRequests <= 4096) { this.activeRateLimitRequests++; const shouldRateLimit = await this.rateLimiter(); this.activeRateLimitRequests--; -- cgit v1.2.3-freya