diff options
Diffstat (limited to 'packages/backend/src/server/api/stream/Connection.ts')
| -rw-r--r-- | packages/backend/src/server/api/stream/Connection.ts | 39 |
1 files changed, 39 insertions, 0 deletions
diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts index c24459c1e1..8254dcb84f 100644 --- a/packages/backend/src/server/api/stream/Connection.ts +++ b/packages/backend/src/server/api/stream/Connection.ts @@ -18,6 +18,10 @@ import type { JsonObject } from '@/misc/json-value.js'; import type { ChannelsService } from './ChannelsService.js'; import type { EventEmitter } from 'events'; import type Channel from './channel.js'; +import { LoggerService } from '@/core/LoggerService.js'; +import type Logger from '@/logger.js'; + +const MAX_CHANNELS_PER_CONNECTION = 32; /** * Main stream connection @@ -26,6 +30,7 @@ import type Channel from './channel.js'; export default class Connection { public user?: MiUser; public token?: MiAccessToken; + private rateLimiter?: () => Promise<boolean>; private wsConnection: WebSocket.WebSocket; public subscriber: StreamEventEmitter; private channels: Channel[] = []; @@ -39,6 +44,9 @@ export default class Connection { public userIdsWhoMeMutingRenotes: Set<string> = new Set(); public userMutedInstances: Set<string> = new Set(); private fetchIntervalId: NodeJS.Timeout | null = null; + private activeRateLimitRequests: number = 0; + private closingConnection: boolean = false; + private logger: Logger; constructor( private channelsService: ChannelsService, @@ -46,12 +54,18 @@ export default class Connection { private notificationService: NotificationService, private cacheService: CacheService, private channelFollowingService: ChannelFollowingService, + loggerService: LoggerService, user: MiUser | null | undefined, token: MiAccessToken | null | undefined, + private ip: string, + rateLimiter: () => Promise<boolean>, ) { if (user) this.user = user; if (token) this.token = token; + if (rateLimiter) this.rateLimiter = rateLimiter; + + this.logger = loggerService.getLogger('streaming', 'coral'); } @bindThis @@ -104,6 +118,27 @@ export default class Connection { private async onWsConnectionMessage(data: WebSocket.RawData) { let obj: JsonObject; + if (this.closingConnection) return; + + if (this.rateLimiter) { + if (this.activeRateLimitRequests <= 128) { + this.activeRateLimitRequests++; + const shouldRateLimit = await this.rateLimiter(); + this.activeRateLimitRequests--; + + if (shouldRateLimit) return; + if (this.closingConnection) return; + } else { + let connectionInfo = `IP ${this.ip}`; + if (this.user) connectionInfo += `, user ID ${this.user.id}`; + + this.logger.warn(`Closing a connection (${connectionInfo}) due to an excessive influx of messages.`); + this.closingConnection = true; + this.wsConnection.close(1008, 'Please stop spamming the streaming API.'); + return; + } + } + try { obj = JSON.parse(data.toString()); } catch (e) { @@ -263,6 +298,10 @@ export default class Connection { */ @bindThis public connectChannel(id: string, params: JsonObject | undefined, channel: string, pong = false) { + if (this.channels.length >= MAX_CHANNELS_PER_CONNECTION) { + return; + } + const channelService = this.channelsService.getChannelService(channel); if (channelService.requireCredential && this.user == null) { |