summaryrefslogtreecommitdiff
path: root/packages/backend/src/server/api
diff options
context:
space:
mode:
authorJulia Johannesen <julia@insertdomain.name>2024-08-16 17:13:20 -0400
committerJulia Johannesen <julia@insertdomain.name>2024-08-16 17:13:20 -0400
commit6d3f9503ed1fd04718396b248cc5a753245c0f67 (patch)
tree54746985d2704c6ee089f25bbc5b6da7c15e0d35 /packages/backend/src/server/api
parentuse the correct remote address (diff)
downloadsharkey-6d3f9503ed1fd04718396b248cc5a753245c0f67.tar.gz
sharkey-6d3f9503ed1fd04718396b248cc5a753245c0f67.tar.bz2
sharkey-6d3f9503ed1fd04718396b248cc5a753245c0f67.zip
Limit number of rate limit requests
Diffstat (limited to 'packages/backend/src/server/api')
-rw-r--r--packages/backend/src/server/api/StreamingApiServerService.ts5
-rw-r--r--packages/backend/src/server/api/stream/Connection.ts26
2 files changed, 29 insertions, 2 deletions
diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts
index 1435169812..f48af45fb1 100644
--- a/packages/backend/src/server/api/StreamingApiServerService.ts
+++ b/packages/backend/src/server/api/StreamingApiServerService.ts
@@ -26,12 +26,15 @@ import proxyAddr from 'proxy-addr';
import ms from 'ms';
import type * as http from 'node:http';
import type { IEndpointMeta } from './endpoints.js';
+import { LoggerService } from '@/core/LoggerService.js';
+import type Logger from '@/logger.js';
@Injectable()
export class StreamingApiServerService {
#wss: WebSocket.WebSocketServer;
#connections = new Map<WebSocket.WebSocket, number>();
#cleanConnectionsIntervalId: NodeJS.Timeout | null = null;
+ #logger: Logger;
constructor(
@Inject(DI.redisForSub)
@@ -49,6 +52,7 @@ export class StreamingApiServerService {
private channelFollowingService: ChannelFollowingService,
private rateLimiterService: RateLimiterService,
private roleService: RoleService,
+ private loggerService: LoggerService,
) {
}
@@ -155,6 +159,7 @@ export class StreamingApiServerService {
this.notificationService,
this.cacheService,
this.channelFollowingService,
+ this.loggerService,
user, app,
rateLimiter,
);
diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts
index dfc6f0d298..0a7828d163 100644
--- a/packages/backend/src/server/api/stream/Connection.ts
+++ b/packages/backend/src/server/api/stream/Connection.ts
@@ -17,6 +17,8 @@ import { ChannelFollowingService } from '@/core/ChannelFollowingService.js';
import type { ChannelsService } from './ChannelsService.js';
import type { EventEmitter } from 'events';
import type Channel from './channel.js';
+import { LoggerService } from '@/core/LoggerService.js';
+import type Logger from '@/logger.js';
/**
* Main stream connection
@@ -39,6 +41,9 @@ export default class Connection {
public userIdsWhoMeMutingRenotes: Set<string> = new Set();
public userMutedInstances: Set<string> = new Set();
private fetchIntervalId: NodeJS.Timeout | null = null;
+ private activeRateLimitRequests: number = 0;
+ private closingConnection: boolean = false;
+ private logger: Logger;
constructor(
private channelsService: ChannelsService,
@@ -46,6 +51,7 @@ export default class Connection {
private notificationService: NotificationService,
private cacheService: CacheService,
private channelFollowingService: ChannelFollowingService,
+ private loggerService: LoggerService,
user: MiUser | null | undefined,
token: MiAccessToken | null | undefined,
@@ -54,6 +60,8 @@ export default class Connection {
if (user) this.user = user;
if (token) this.token = token;
if (rateLimiter) this.rateLimiter = rateLimiter;
+
+ this.logger = loggerService.getLogger('streaming', 'coral', false);
}
@bindThis
@@ -106,8 +114,22 @@ export default class Connection {
private async onWsConnectionMessage(data: WebSocket.RawData) {
let obj: Record<string, any>;
- if (this.rateLimiter && await this.rateLimiter()) {
- return;
+ if (this.closingConnection) return;
+
+ if (this.rateLimiter) {
+ if (this.activeRateLimitRequests <= 128) {
+ this.activeRateLimitRequests++;
+ const shouldRateLimit = await this.rateLimiter();
+ this.activeRateLimitRequests--;
+
+ if (shouldRateLimit) return;
+ if (this.closingConnection) return;
+ } else {
+ this.logger.warn('Closing a connection due to an excessive influx of messages.');
+ this.closingConnection = true;
+ this.wsConnection.close(1008, 'Please stop spamming the streaming API.');
+ return;
+ }
}
try {