From f92fb3bb8c70b5ca35f4f61a437b101bd5bae7e8 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Wed, 5 Feb 2025 11:16:39 -0500 Subject: move SkRateLimiterService to correct directory --- .../backend/src/server/SkRateLimiterService.ts | 280 +++++++++++++++++++++ 1 file changed, 280 insertions(+) create mode 100644 packages/backend/src/server/SkRateLimiterService.ts (limited to 'packages/backend/src/server/SkRateLimiterService.ts') diff --git a/packages/backend/src/server/SkRateLimiterService.ts b/packages/backend/src/server/SkRateLimiterService.ts new file mode 100644 index 0000000000..038f12cb25 --- /dev/null +++ b/packages/backend/src/server/SkRateLimiterService.ts @@ -0,0 +1,280 @@ +/* + * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Inject, Injectable } from '@nestjs/common'; +import Redis from 'ioredis'; +import type { TimeService } from '@/core/TimeService.js'; +import type { EnvService } from '@/core/EnvService.js'; +import { BucketRateLimit, LegacyRateLimit, LimitInfo, RateLimit, hasMinLimit, isLegacyRateLimit, Keyed, hasMaxLimit, disabledLimitInfo, MaxLegacyLimit, MinLegacyLimit } from '@/misc/rate-limit-utils.js'; +import { DI } from '@/di-symbols.js'; +import { MemoryKVCache } from '@/misc/cache.js'; +import type { MiUser } from '@/models/_.js'; +import type { RoleService } from '@/core/RoleService.js'; + +// Sentinel value used for caching the default role template. +// Required because MemoryKVCache doesn't support null keys. +const defaultUserKey = ''; + +@Injectable() +export class SkRateLimiterService { + // 1-minute cache interval + private readonly factorCache = new MemoryKVCache(1000 * 60); + private readonly disabled: boolean; + + constructor( + @Inject('TimeService') + private readonly timeService: TimeService, + + @Inject(DI.redis) + private readonly redisClient: Redis.Redis, + + @Inject('RoleService') + private readonly roleService: RoleService, + + @Inject('EnvService') + envService: EnvService, + ) { + this.disabled = envService.env.NODE_ENV === 'test'; + } + + /** + * Check & increment a rate limit for a client. + * + * If the client (actorOrUser) is passed as a string, then it uses the default rate limit factor from the role template. + * If the client (actorOrUser) is passed as an MiUser, then it queries the user's actual rate limit factor from their assigned roles. + * + * A factor of zero (0) will disable the limit, while any negative number will produce an error. + * A factor between zero (0) and one (1) will increase the limit from its default values (allowing more actions per time interval). + * A factor greater than one (1) will decrease the limit from its default values (allowing fewer actions per time interval). + * + * @param limit The limit definition + * @param actorOrUser authenticated client user or IP hash + */ + public async limit(limit: Keyed, actorOrUser: string | MiUser): Promise { + if (this.disabled) { + return disabledLimitInfo; + } + + const actor = typeof(actorOrUser) === 'object' ? actorOrUser.id : actorOrUser; + const userCacheKey = typeof(actorOrUser) === 'object' ? actorOrUser.id : defaultUserKey; + const userRoleKey = typeof(actorOrUser) === 'object' ? actorOrUser.id : null; + const factor = this.factorCache.get(userCacheKey) ?? await this.factorCache.fetch(userCacheKey, async () => { + const role = await this.roleService.getUserPolicies(userRoleKey); + return role.rateLimitFactor; + }); + + if (factor === 0) { + return disabledLimitInfo; + } + + if (factor < 0) { + throw new Error(`Rate limit factor is zero or negative: ${factor}`); + } + + if (isLegacyRateLimit(limit)) { + return await this.limitLegacy(limit, actor, factor); + } else { + return await this.limitBucket(limit, actor, factor); + } + } + + private async limitLegacy(limit: Keyed, actor: string, factor: number): Promise { + if (hasMaxLimit(limit)) { + return await this.limitLegacyMinMax(limit, actor, factor); + } else if (hasMinLimit(limit)) { + return await this.limitLegacyMinOnly(limit, actor, factor); + } else { + return disabledLimitInfo; + } + } + + private async limitLegacyMinMax(limit: Keyed, actor: string, factor: number): Promise { + if (limit.duration === 0) return disabledLimitInfo; + if (limit.duration < 0) throw new Error(`Invalid rate limit ${limit.key}: duration is negative (${limit.duration})`); + if (limit.max < 1) throw new Error(`Invalid rate limit ${limit.key}: max is less than 1 (${limit.max})`); + + // Derive initial dripRate from minInterval OR duration/max. + const initialDripRate = Math.max(limit.minInterval ?? Math.round(limit.duration / limit.max), 1); + + // Calculate dripSize to reach max at exactly duration + const dripSize = Math.max(Math.round(limit.max / (limit.duration / initialDripRate)), 1); + + // Calculate final dripRate from dripSize and duration/max + const dripRate = Math.max(Math.round(limit.duration / (limit.max / dripSize)), 1); + + const bucketLimit: Keyed = { + type: 'bucket', + key: limit.key, + size: limit.max, + dripRate, + dripSize, + }; + return await this.limitBucket(bucketLimit, actor, factor); + } + + private async limitLegacyMinOnly(limit: Keyed, actor: string, factor: number): Promise { + if (limit.minInterval === 0) return disabledLimitInfo; + if (limit.minInterval < 0) throw new Error(`Invalid rate limit ${limit.key}: minInterval is negative (${limit.minInterval})`); + + const dripRate = Math.max(Math.round(limit.minInterval), 1); + const bucketLimit: Keyed = { + type: 'bucket', + key: limit.key, + size: 1, + dripRate, + dripSize: 1, + }; + return await this.limitBucket(bucketLimit, actor, factor); + } + + /** + * Implementation of Leaky Bucket rate limiting - see SkRateLimiterService.md for details. + */ + private async limitBucket(limit: Keyed, actor: string, factor: number): Promise { + if (limit.size < 1) throw new Error(`Invalid rate limit ${limit.key}: size is less than 1 (${limit.size})`); + if (limit.dripRate != null && limit.dripRate < 1) throw new Error(`Invalid rate limit ${limit.key}: dripRate is less than 1 (${limit.dripRate})`); + if (limit.dripSize != null && limit.dripSize < 1) throw new Error(`Invalid rate limit ${limit.key}: dripSize is less than 1 (${limit.dripSize})`); + + // 0 - Calculate + const now = this.timeService.now; + const bucketSize = Math.max(Math.ceil(limit.size / factor), 1); + const dripRate = Math.ceil(limit.dripRate ?? 1000); + const dripSize = Math.ceil(limit.dripSize ?? 1); + const expirationSec = Math.max(Math.ceil((dripRate * Math.ceil(bucketSize / dripSize)) / 1000), 1); + + // 1 - Read + const counterKey = createLimitKey(limit, actor, 'c'); + const timestampKey = createLimitKey(limit, actor, 't'); + const counter = await this.getLimitCounter(counterKey, timestampKey); + + // 2 - Drip + const dripsSinceLastTick = Math.floor((now - counter.timestamp) / dripRate) * dripSize; + const deltaCounter = Math.min(dripsSinceLastTick, counter.counter); + const deltaTimestamp = dripsSinceLastTick * dripRate; + if (deltaCounter > 0) { + // Execute the next drip(s) + const results = await this.executeRedisMulti( + ['get', timestampKey], + ['incrby', timestampKey, deltaTimestamp], + ['expire', timestampKey, expirationSec], + ['get', timestampKey], + ['decrby', counterKey, deltaCounter], + ['expire', counterKey, expirationSec], + ['get', counterKey], + ); + const expectedTimestamp = counter.timestamp; + const canaryTimestamp = results[0] ? parseInt(results[0]) : 0; + counter.timestamp = results[3] ? parseInt(results[3]) : 0; + counter.counter = results[6] ? parseInt(results[6]) : 0; + + // Check for a data collision and rollback + if (canaryTimestamp !== expectedTimestamp) { + const rollbackResults = await this.executeRedisMulti( + ['decrby', timestampKey, deltaTimestamp], + ['get', timestampKey], + ['incrby', counterKey, deltaCounter], + ['get', counterKey], + ); + counter.timestamp = rollbackResults[1] ? parseInt(rollbackResults[1]) : 0; + counter.counter = rollbackResults[3] ? parseInt(rollbackResults[3]) : 0; + } + } + + // 3 - Check + const blocked = counter.counter >= bucketSize; + if (!blocked) { + if (counter.timestamp === 0) { + const results = await this.executeRedisMulti( + ['set', timestampKey, now], + ['expire', timestampKey, expirationSec], + ['incr', counterKey], + ['expire', counterKey, expirationSec], + ['get', counterKey], + ); + counter.timestamp = now; + counter.counter = results[4] ? parseInt(results[4]) : 0; + } else { + const results = await this.executeRedisMulti( + ['incr', counterKey], + ['expire', counterKey, expirationSec], + ['get', counterKey], + ); + counter.counter = results[2] ? parseInt(results[2]) : 0; + } + } + + // Calculate how much time is needed to free up a bucket slot + const overflow = Math.max((counter.counter + 1) - bucketSize, 0); + const dripsNeeded = Math.ceil(overflow / dripSize); + const timeNeeded = Math.max((dripRate * dripsNeeded) - (this.timeService.now - counter.timestamp), 0); + + // Calculate limit status + const remaining = Math.max(bucketSize - counter.counter, 0); + const resetMs = timeNeeded; + const resetSec = Math.ceil(resetMs / 1000); + const fullResetMs = Math.ceil(counter.counter / dripSize) * dripRate; + const fullResetSec = Math.ceil(fullResetMs / 1000); + return { blocked, remaining, resetSec, resetMs, fullResetSec, fullResetMs }; + } + + private async getLimitCounter(counterKey: string, timestampKey: string): Promise { + const [counter, timestamp] = await this.executeRedisMulti( + ['get', counterKey], + ['get', timestampKey], + ); + + return { + counter: counter ? parseInt(counter) : 0, + timestamp: timestamp ? parseInt(timestamp) : 0, + }; + } + + private async executeRedisMulti(...batch: RedisCommand[]): Promise { + const results = await this.redisClient.multi(batch).exec(); + + // Transaction conflict (retryable) + if (!results) { + throw new ConflictError('Redis error: transaction conflict'); + } + + // Transaction failed (fatal) + if (results.length !== batch.length) { + throw new Error('Redis error: failed to execute batch'); + } + + // Map responses + const errors: Error[] = []; + const responses: RedisResult[] = []; + for (const [error, response] of results) { + if (error) errors.push(error); + responses.push(response as RedisResult); + } + + // Command failed (fatal) + if (errors.length > 0) { + const errorMessages = errors + .map((e, i) => `Error in command ${i}: ${e}`) + .join('\', \''); + throw new AggregateError(errors, `Redis error: failed to execute command(s): '${errorMessages}'`); + } + + return responses; + } +} + +// Not correct, but good enough for the basic commands we use. +type RedisResult = string | null; +type RedisCommand = [command: string, ...args: unknown[]]; + +function createLimitKey(limit: Keyed, actor: string, value: string): string { + return `rl_${actor}_${limit.key}_${value}`; +} + +class ConflictError extends Error {} + +interface LimitCounter { + timestamp: number; + counter: number; +} -- cgit v1.2.3-freya From 788751d24d91afd1f97b92b633c2177ee38398f4 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Tue, 18 Feb 2025 10:36:29 -0500 Subject: implement redisForRateLimit --- .config/ci.yml | 10 ++++++++++ .config/cypress-devcontainer.yml | 10 ++++++++++ .config/docker_example.yml | 10 ++++++++++ .config/example.yml | 10 ++++++++++ packages/backend/scripts/check_connect.js | 1 + packages/backend/src/GlobalModule.ts | 14 ++++++++++++-- packages/backend/src/config.ts | 5 ++++- packages/backend/src/di-symbols.ts | 1 + packages/backend/src/server/SkRateLimiterService.md | 1 + packages/backend/src/server/SkRateLimiterService.ts | 2 +- 10 files changed, 60 insertions(+), 4 deletions(-) (limited to 'packages/backend/src/server/SkRateLimiterService.ts') diff --git a/.config/ci.yml b/.config/ci.yml index 311a98d8fb..d8c365a47e 100644 --- a/.config/ci.yml +++ b/.config/ci.yml @@ -103,6 +103,16 @@ redis: # #prefix: example-prefix # #db: 1 +#redisForRateLimit: +# host: localhost +# port: 6379 +# #family: 0 # 0=Both, 4=IPv4, 6=IPv6 +# #pass: example-pass +# #prefix: example-prefix +# #db: 1 +# # You can specify more ioredis options... +# #username: example-username + # ┌───────────────────────────────┐ #───┘ Fulltext search configuration └───────────────────────────── diff --git a/.config/cypress-devcontainer.yml b/.config/cypress-devcontainer.yml index 391bc9998c..02442360fe 100644 --- a/.config/cypress-devcontainer.yml +++ b/.config/cypress-devcontainer.yml @@ -124,6 +124,16 @@ redis: # #prefix: example-prefix # #db: 1 +#redisForRateLimit: +# host: localhost +# port: 6379 +# #family: 0 # 0=Both, 4=IPv4, 6=IPv6 +# #pass: example-pass +# #prefix: example-prefix +# #db: 1 +# # You can specify more ioredis options... +# #username: example-username + # ┌───────────────────────────────┐ #───┘ Fulltext search configuration └───────────────────────────── diff --git a/.config/docker_example.yml b/.config/docker_example.yml index 1e03e902bf..fee5e35b72 100644 --- a/.config/docker_example.yml +++ b/.config/docker_example.yml @@ -171,6 +171,16 @@ redis: # #prefix: example-prefix # #db: 1 +#redisForRateLimit: +# host: localhost +# port: 6379 +# #family: 0 # 0=Both, 4=IPv4, 6=IPv6 +# #pass: example-pass +# #prefix: example-prefix +# #db: 1 +# # You can specify more ioredis options... +# #username: example-username + # ┌───────────────────────────────┐ #───┘ Fulltext search configuration └───────────────────────────── diff --git a/.config/example.yml b/.config/example.yml index 7d4cd0c659..2a1afd8481 100644 --- a/.config/example.yml +++ b/.config/example.yml @@ -198,6 +198,16 @@ redis: # # You can specify more ioredis options... # #username: example-username +#redisForRateLimit: +# host: localhost +# port: 6379 +# #family: 0 # 0=Both, 4=IPv4, 6=IPv6 +# #pass: example-pass +# #prefix: example-prefix +# #db: 1 +# # You can specify more ioredis options... +# #username: example-username + # ┌───────────────────────────────┐ #───┘ Fulltext search configuration └───────────────────────────── diff --git a/packages/backend/scripts/check_connect.js b/packages/backend/scripts/check_connect.js index 17b198ef62..b15ce51ec8 100644 --- a/packages/backend/scripts/check_connect.js +++ b/packages/backend/scripts/check_connect.js @@ -51,6 +51,7 @@ const promises = Array config.redisForJobQueue, config.redisForTimelines, config.redisForReactions, + config.redisForRateLimit, ])) .map(connectToRedis) .concat([ diff --git a/packages/backend/src/GlobalModule.ts b/packages/backend/src/GlobalModule.ts index ace7f7841c..7ca566477d 100644 --- a/packages/backend/src/GlobalModule.ts +++ b/packages/backend/src/GlobalModule.ts @@ -92,6 +92,14 @@ const $redisForReactions: Provider = { inject: [DI.config], }; +const $redisForRateLimit: Provider = { + provide: DI.redisForRateLimit, + useFactory: (config: Config) => { + return new Redis.Redis(config.redisForRateLimit); + }, + inject: [DI.config], +}; + const $meta: Provider = { provide: DI.meta, useFactory: async (db: DataSource, redisForSub: Redis.Redis) => { @@ -152,8 +160,8 @@ const $meta: Provider = { @Global() @Module({ imports: [RepositoryModule], - providers: [$config, $db, $meta, $meilisearch, $redis, $redisForPub, $redisForSub, $redisForTimelines, $redisForReactions], - exports: [$config, $db, $meta, $meilisearch, $redis, $redisForPub, $redisForSub, $redisForTimelines, $redisForReactions, RepositoryModule], + providers: [$config, $db, $meta, $meilisearch, $redis, $redisForPub, $redisForSub, $redisForTimelines, $redisForReactions, $redisForRateLimit], + exports: [$config, $db, $meta, $meilisearch, $redis, $redisForPub, $redisForSub, $redisForTimelines, $redisForReactions, $redisForRateLimit, RepositoryModule], }) export class GlobalModule implements OnApplicationShutdown { constructor( @@ -163,6 +171,7 @@ export class GlobalModule implements OnApplicationShutdown { @Inject(DI.redisForSub) private redisForSub: Redis.Redis, @Inject(DI.redisForTimelines) private redisForTimelines: Redis.Redis, @Inject(DI.redisForReactions) private redisForReactions: Redis.Redis, + @Inject(DI.redisForRateLimit) private redisForRateLimit: Redis.Redis, ) { } public async dispose(): Promise { @@ -176,6 +185,7 @@ export class GlobalModule implements OnApplicationShutdown { this.redisForSub.disconnect(), this.redisForTimelines.disconnect(), this.redisForReactions.disconnect(), + this.redisForRateLimit.disconnect(), ]); } diff --git a/packages/backend/src/config.ts b/packages/backend/src/config.ts index d35befdc2b..45aa1c1e22 100644 --- a/packages/backend/src/config.ts +++ b/packages/backend/src/config.ts @@ -53,6 +53,7 @@ type Source = { redisForJobQueue?: RedisOptionsSource; redisForTimelines?: RedisOptionsSource; redisForReactions?: RedisOptionsSource; + redisForRateLimit?: RedisOptionsSource; fulltextSearch?: { provider?: FulltextSearchProvider; }; @@ -225,6 +226,7 @@ export type Config = { redisForJobQueue: RedisOptions & RedisOptionsSource; redisForTimelines: RedisOptions & RedisOptionsSource; redisForReactions: RedisOptions & RedisOptionsSource; + redisForRateLimit: RedisOptions & RedisOptionsSource; sentryForBackend: { options: Partial; enableNodeProfiling: boolean; } | undefined; sentryForFrontend: { options: Partial } | undefined; perChannelMaxNoteCacheCount: number; @@ -333,6 +335,7 @@ export function loadConfig(): Config { redisForJobQueue: config.redisForJobQueue ? convertRedisOptions(config.redisForJobQueue, host) : redis, redisForTimelines: config.redisForTimelines ? convertRedisOptions(config.redisForTimelines, host) : redis, redisForReactions: config.redisForReactions ? convertRedisOptions(config.redisForReactions, host) : redis, + redisForRateLimit: config.redisForRateLimit ? convertRedisOptions(config.redisForRateLimit, host) : redis, sentryForBackend: config.sentryForBackend, sentryForFrontend: config.sentryForFrontend, id: config.id, @@ -518,7 +521,7 @@ function applyEnvOverrides(config: Source) { _apply_top(['db', ['host', 'port', 'db', 'user', 'pass', 'disableCache']]); _apply_top(['dbSlaves', Array.from((config.dbSlaves ?? []).keys()), ['host', 'port', 'db', 'user', 'pass']]); _apply_top([ - ['redis', 'redisForPubsub', 'redisForJobQueue', 'redisForTimelines', 'redisForReactions'], + ['redis', 'redisForPubsub', 'redisForJobQueue', 'redisForTimelines', 'redisForReactions', 'redisForRateLimit'], ['host', 'port', 'username', 'pass', 'db', 'prefix'], ]); _apply_top(['fulltextSearch', 'provider']); diff --git a/packages/backend/src/di-symbols.ts b/packages/backend/src/di-symbols.ts index 296cc4815b..71f627dc4e 100644 --- a/packages/backend/src/di-symbols.ts +++ b/packages/backend/src/di-symbols.ts @@ -13,6 +13,7 @@ export const DI = { redisForSub: Symbol('redisForSub'), redisForTimelines: Symbol('redisForTimelines'), redisForReactions: Symbol('redisForReactions'), + redisForRateLimit: Symbol('redisForRateLimit'), //#region Repositories usersRepository: Symbol('usersRepository'), diff --git a/packages/backend/src/server/SkRateLimiterService.md b/packages/backend/src/server/SkRateLimiterService.md index fb007538fa..c8a2b4e85c 100644 --- a/packages/backend/src/server/SkRateLimiterService.md +++ b/packages/backend/src/server/SkRateLimiterService.md @@ -39,6 +39,7 @@ The first call is read-only, while the others perform at least one write operati Two integer keys are stored per client/subject, and both expire together after the maximum duration of the limit. While performance has not been formally tested, it's expected that SkRateLimiterService has an impact roughly on par with the legacy RateLimiterService. Redis memory usage should be notably lower due to the reduced number of keys and avoidance of set / array constructions. +If redis load does become a concern, then a dedicated node can be assigned via the `redisForRateLimit` config setting. ## Concurrency and Multi-Node Correctness diff --git a/packages/backend/src/server/SkRateLimiterService.ts b/packages/backend/src/server/SkRateLimiterService.ts index 038f12cb25..30bf092e4f 100644 --- a/packages/backend/src/server/SkRateLimiterService.ts +++ b/packages/backend/src/server/SkRateLimiterService.ts @@ -27,7 +27,7 @@ export class SkRateLimiterService { @Inject('TimeService') private readonly timeService: TimeService, - @Inject(DI.redis) + @Inject(DI.redisForRateLimit) private readonly redisClient: Redis.Redis, @Inject('RoleService') -- cgit v1.2.3-freya