summaryrefslogtreecommitdiff
path: root/packages/backend/src/server/api
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/server/api')
-rw-r--r--packages/backend/src/server/api/StreamingApiServerService.ts62
-rw-r--r--packages/backend/src/server/api/stream/Connection.ts39
2 files changed, 100 insertions, 1 deletions
diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts
index b8f448477b..2070ab6106 100644
--- a/packages/backend/src/server/api/StreamingApiServerService.ts
+++ b/packages/backend/src/server/api/StreamingApiServerService.ts
@@ -19,7 +19,15 @@ import { ChannelFollowingService } from '@/core/ChannelFollowingService.js';
import { AuthenticateService, AuthenticationError } from './AuthenticateService.js';
import MainStreamConnection from './stream/Connection.js';
import { ChannelsService } from './stream/ChannelsService.js';
+import { RateLimiterService } from './RateLimiterService.js';
+import { RoleService } from '@/core/RoleService.js';
+import { getIpHash } from '@/misc/get-ip-hash.js';
+import proxyAddr from 'proxy-addr';
+import ms from 'ms';
import type * as http from 'node:http';
+import type { IEndpointMeta } from './endpoints.js';
+import { LoggerService } from '@/core/LoggerService.js';
+import type Logger from '@/logger.js';
@Injectable()
export class StreamingApiServerService {
@@ -41,10 +49,36 @@ export class StreamingApiServerService {
private notificationService: NotificationService,
private usersService: UserService,
private channelFollowingService: ChannelFollowingService,
+ private rateLimiterService: RateLimiterService,
+ private roleService: RoleService,
+ private loggerService: LoggerService,
) {
}
@bindThis
+ private async rateLimitThis(
+ user: MiLocalUser | null | undefined,
+ requestIp: string | undefined,
+ limit: IEndpointMeta['limit'] & { key: NonNullable<string> },
+ ) : Promise<boolean> {
+ let limitActor: string;
+ if (user) {
+ limitActor = user.id;
+ } else {
+ limitActor = getIpHash(requestIp || 'wtf');
+ }
+
+ const factor = user ? (await this.roleService.getUserPolicies(user.id)).rateLimitFactor : 1;
+
+ if (factor <= 0) return false;
+
+ // Rate limit
+ return await this.rateLimiterService.limit(limit, limitActor, factor)
+ .then(() => { return false; })
+ .catch(err => { return true; });
+ }
+
+ @bindThis
public attach(server: http.Server): void {
this.#wss = new WebSocket.WebSocketServer({
noServer: true,
@@ -57,6 +91,22 @@ export class StreamingApiServerService {
return;
}
+ // ServerServices sets `trustProxy: true`, which inside
+ // fastify/request.js ends up calling `proxyAddr` in this way,
+ // so we do the same
+ const requestIp = proxyAddr(request, () => { return true; } );
+
+ if (await this.rateLimitThis(null, requestIp, {
+ key: 'wsconnect',
+ duration: ms('5min'),
+ max: 32,
+ minInterval: ms('1sec'),
+ })) {
+ socket.write('HTTP/1.1 429 Rate Limit Exceeded\r\n\r\n');
+ socket.destroy();
+ return;
+ }
+
const q = new URL(request.url, `http://${request.headers.host}`).searchParams;
let user: MiLocalUser | null = null;
@@ -94,13 +144,23 @@ export class StreamingApiServerService {
return;
}
+ const rateLimiter = () => {
+ return this.rateLimitThis(user, requestIp, {
+ key: 'wsmessage',
+ duration: ms('5sec'),
+ max: 256,
+ });
+ };
+
const stream = new MainStreamConnection(
this.channelsService,
this.noteReadService,
this.notificationService,
this.cacheService,
this.channelFollowingService,
- user, app,
+ this.loggerService,
+ user, app, requestIp,
+ rateLimiter,
);
await stream.init();
diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts
index c24459c1e1..8254dcb84f 100644
--- a/packages/backend/src/server/api/stream/Connection.ts
+++ b/packages/backend/src/server/api/stream/Connection.ts
@@ -18,6 +18,10 @@ import type { JsonObject } from '@/misc/json-value.js';
import type { ChannelsService } from './ChannelsService.js';
import type { EventEmitter } from 'events';
import type Channel from './channel.js';
+import { LoggerService } from '@/core/LoggerService.js';
+import type Logger from '@/logger.js';
+
+const MAX_CHANNELS_PER_CONNECTION = 32;
/**
* Main stream connection
@@ -26,6 +30,7 @@ import type Channel from './channel.js';
export default class Connection {
public user?: MiUser;
public token?: MiAccessToken;
+ private rateLimiter?: () => Promise<boolean>;
private wsConnection: WebSocket.WebSocket;
public subscriber: StreamEventEmitter;
private channels: Channel[] = [];
@@ -39,6 +44,9 @@ export default class Connection {
public userIdsWhoMeMutingRenotes: Set<string> = new Set();
public userMutedInstances: Set<string> = new Set();
private fetchIntervalId: NodeJS.Timeout | null = null;
+ private activeRateLimitRequests: number = 0;
+ private closingConnection: boolean = false;
+ private logger: Logger;
constructor(
private channelsService: ChannelsService,
@@ -46,12 +54,18 @@ export default class Connection {
private notificationService: NotificationService,
private cacheService: CacheService,
private channelFollowingService: ChannelFollowingService,
+ loggerService: LoggerService,
user: MiUser | null | undefined,
token: MiAccessToken | null | undefined,
+ private ip: string,
+ rateLimiter: () => Promise<boolean>,
) {
if (user) this.user = user;
if (token) this.token = token;
+ if (rateLimiter) this.rateLimiter = rateLimiter;
+
+ this.logger = loggerService.getLogger('streaming', 'coral');
}
@bindThis
@@ -104,6 +118,27 @@ export default class Connection {
private async onWsConnectionMessage(data: WebSocket.RawData) {
let obj: JsonObject;
+ if (this.closingConnection) return;
+
+ if (this.rateLimiter) {
+ if (this.activeRateLimitRequests <= 128) {
+ this.activeRateLimitRequests++;
+ const shouldRateLimit = await this.rateLimiter();
+ this.activeRateLimitRequests--;
+
+ if (shouldRateLimit) return;
+ if (this.closingConnection) return;
+ } else {
+ let connectionInfo = `IP ${this.ip}`;
+ if (this.user) connectionInfo += `, user ID ${this.user.id}`;
+
+ this.logger.warn(`Closing a connection (${connectionInfo}) due to an excessive influx of messages.`);
+ this.closingConnection = true;
+ this.wsConnection.close(1008, 'Please stop spamming the streaming API.');
+ return;
+ }
+ }
+
try {
obj = JSON.parse(data.toString());
} catch (e) {
@@ -263,6 +298,10 @@ export default class Connection {
*/
@bindThis
public connectChannel(id: string, params: JsonObject | undefined, channel: string, pong = false) {
+ if (this.channels.length >= MAX_CHANNELS_PER_CONNECTION) {
+ return;
+ }
+
const channelService = this.channelsService.getChannelService(channel);
if (channelService.requireCredential && this.user == null) {