summaryrefslogtreecommitdiff
path: root/packages/backend
diff options
context:
space:
mode:
authorHazelnoot <acomputerdog@gmail.com>2024-12-09 19:04:06 -0500
committerHazelnoot <acomputerdog@gmail.com>2024-12-09 19:04:06 -0500
commit9daafca155682281f567c9b4da8f3af3564aa281 (patch)
tree405cd7273c04407f929ef1ca4a9768513087fe0d /packages/backend
parentmerge: Implement new SkRateLimiterServer with Leaky Bucket rate limits (resol... (diff)
downloadsharkey-9daafca155682281f567c9b4da8f3af3564aa281.tar.gz
sharkey-9daafca155682281f567c9b4da8f3af3564aa281.tar.bz2
sharkey-9daafca155682281f567c9b4da8f3af3564aa281.zip
fix rate limits under multi-node environments
Diffstat (limited to 'packages/backend')
-rw-r--r--packages/backend/src/server/api/SkRateLimiterService.ts185
-rw-r--r--packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts544
2 files changed, 456 insertions, 273 deletions
diff --git a/packages/backend/src/server/api/SkRateLimiterService.ts b/packages/backend/src/server/api/SkRateLimiterService.ts
index 6415ee905c..05166ed93c 100644
--- a/packages/backend/src/server/api/SkRateLimiterService.ts
+++ b/packages/backend/src/server/api/SkRateLimiterService.ts
@@ -5,16 +5,13 @@
import { Inject, Injectable } from '@nestjs/common';
import Redis from 'ioredis';
-import { LoggerService } from '@/core/LoggerService.js';
import { TimeService } from '@/core/TimeService.js';
import { EnvService } from '@/core/EnvService.js';
import { DI } from '@/di-symbols.js';
-import type Logger from '@/logger.js';
import { BucketRateLimit, LegacyRateLimit, LimitInfo, RateLimit, hasMinLimit, isLegacyRateLimit, Keyed } from '@/misc/rate-limit-utils.js';
@Injectable()
export class SkRateLimiterService {
- private readonly logger: Logger;
private readonly disabled: boolean;
constructor(
@@ -24,14 +21,10 @@ export class SkRateLimiterService {
@Inject(DI.redis)
private readonly redisClient: Redis.Redis,
- @Inject(LoggerService)
- loggerService: LoggerService,
-
@Inject(EnvService)
envService: EnvService,
) {
- this.logger = loggerService.getLogger('limiter');
- this.disabled = envService.env.NODE_ENV !== 'production'; // TODO disable in TEST *only*
+ this.disabled = envService.env.NODE_ENV !== 'production';
}
public async limit(limit: Keyed<RateLimit>, actor: string, factor = 1): Promise<LimitInfo> {
@@ -50,10 +43,25 @@ export class SkRateLimiterService {
throw new Error(`Rate limit factor is zero or negative: ${factor}`);
}
- if (isLegacyRateLimit(limit)) {
- return await this.limitLegacy(limit, actor, factor);
- } else {
- return await this.limitBucket(limit, actor, factor);
+ return await this.tryLimit(limit, actor, factor);
+ }
+
+ private async tryLimit(limit: Keyed<RateLimit>, actor: string, factor: number, retry = 1): Promise<LimitInfo> {
+ try {
+ if (isLegacyRateLimit(limit)) {
+ return await this.limitLegacy(limit, actor, factor);
+ } else {
+ return await this.limitBucket(limit, actor, factor);
+ }
+ } catch (err) {
+ // We may experience collision errors from optimistic locking.
+ // This is expected, so we should retry a few times before giving up.
+ // https://redis.io/docs/latest/develop/interact/transactions/#optimistic-locking-using-check-and-set
+ if (err instanceof TransactionError && retry < 3) {
+ return await this.tryLimit(limit, actor, factor, retry + 1);
+ }
+
+ throw err;
}
}
@@ -94,36 +102,30 @@ export class SkRateLimiterService {
if (limit.minInterval === 0) return null;
if (limit.minInterval < 0) throw new Error(`Invalid rate limit ${limit.key}: minInterval is negative (${limit.minInterval})`);
- const counter = await this.getLimitCounter(limit, actor, 'min');
const minInterval = Math.max(Math.ceil(limit.minInterval * factor), 0);
+ const expirationSec = Math.max(Math.ceil(minInterval / 1000), 1);
- // Update expiration
- if (counter.c > 0) {
- const isCleared = this.timeService.now - counter.t >= minInterval;
+ // Check for window clear
+ const counter = await this.getLimitCounter(limit, actor, 'min');
+ if (counter.counter > 0) {
+ const isCleared = this.timeService.now - counter.timestamp >= minInterval;
if (isCleared) {
- counter.c = 0;
+ counter.counter = 0;
}
}
- const blocked = counter.c > 0;
+ // Increment the limit, then synchronize with redis
+ const blocked = counter.counter > 0;
if (!blocked) {
- counter.c++;
- counter.t = this.timeService.now;
+ counter.counter++;
+ counter.timestamp = this.timeService.now;
+ await this.updateLimitCounter(limit, actor, 'min', expirationSec, counter);
}
// Calculate limit status
- const resetMs = Math.max(Math.ceil(minInterval - (this.timeService.now - counter.t)), 0);
+ const resetMs = Math.max(minInterval - (this.timeService.now - counter.timestamp), 0);
const resetSec = Math.ceil(resetMs / 1000);
- const limitInfo: LimitInfo = { blocked, remaining: 0, resetSec, resetMs, fullResetSec: resetSec, fullResetMs: resetMs };
-
- // Update the limit counter, but not if blocked
- if (!blocked) {
- // Don't await, or we will slow down the API.
- this.setLimitCounter(limit, actor, counter, resetSec, 'min')
- .catch(err => this.logger.error(`Failed to update limit ${limit.key}:min for ${actor}:`, err));
- }
-
- return limitInfo;
+ return { blocked, remaining: 0, resetSec, resetMs, fullResetSec: resetSec, fullResetMs: resetMs };
}
private async limitBucket(limit: Keyed<BucketRateLimit>, actor: string, factor: number): Promise<LimitInfo> {
@@ -131,68 +133,113 @@ export class SkRateLimiterService {
if (limit.dripRate != null && limit.dripRate < 1) throw new Error(`Invalid rate limit ${limit.key}: dripRate is less than 1 (${limit.dripRate})`);
if (limit.dripSize != null && limit.dripSize < 1) throw new Error(`Invalid rate limit ${limit.key}: dripSize is less than 1 (${limit.dripSize})`);
- const counter = await this.getLimitCounter(limit, actor, 'bucket');
const bucketSize = Math.max(Math.ceil(limit.size / factor), 1);
const dripRate = Math.ceil(limit.dripRate ?? 1000);
const dripSize = Math.ceil(limit.dripSize ?? 1);
+ const expirationSec = Math.max(Math.ceil(bucketSize / dripRate), 1);
- // Update drips
- if (counter.c > 0) {
- const dripsSinceLastTick = Math.floor((this.timeService.now - counter.t) / dripRate) * dripSize;
- counter.c = Math.max(counter.c - dripsSinceLastTick, 0);
+ // Simulate bucket drips
+ const counter = await this.getLimitCounter(limit, actor, 'bucket');
+ if (counter.counter > 0) {
+ const dripsSinceLastTick = Math.floor((this.timeService.now - counter.timestamp) / dripRate) * dripSize;
+ counter.counter = Math.max(counter.counter - dripsSinceLastTick, 0);
}
- const blocked = counter.c >= bucketSize;
+ // Increment the limit, then synchronize with redis
+ const blocked = counter.counter >= bucketSize;
if (!blocked) {
- counter.c++;
- counter.t = this.timeService.now;
+ counter.counter++;
+ counter.timestamp = this.timeService.now;
+ await this.updateLimitCounter(limit, actor, 'bucket', expirationSec, counter);
}
+ // Calculate how much time is needed to free up a bucket slot
+ const overflow = Math.max((counter.counter + 1) - bucketSize, 0);
+ const dripsNeeded = Math.ceil(overflow / dripSize);
+ const timeNeeded = Math.max((dripRate * dripsNeeded) - (this.timeService.now - counter.timestamp), 0);
+
// Calculate limit status
- const remaining = Math.max(bucketSize - counter.c, 0);
- const resetMs = remaining > 0 ? 0 : Math.max(dripRate - (this.timeService.now - counter.t), 0);
+ const remaining = Math.max(bucketSize - counter.counter, 0);
+ const resetMs = timeNeeded;
const resetSec = Math.ceil(resetMs / 1000);
- const fullResetMs = Math.ceil(counter.c / dripSize) * dripRate;
+ const fullResetMs = Math.ceil(counter.counter / dripSize) * dripRate;
const fullResetSec = Math.ceil(fullResetMs / 1000);
- const limitInfo: LimitInfo = { blocked, remaining, resetSec, resetMs, fullResetSec, fullResetMs };
+ return { blocked, remaining, resetSec, resetMs, fullResetSec, fullResetMs };
+ }
- // Update the limit counter, but not if blocked
- if (!blocked) {
- // Don't await, or we will slow down the API.
- this.setLimitCounter(limit, actor, counter, fullResetSec, 'bucket')
- .catch(err => this.logger.error(`Failed to update limit ${limit.key} for ${actor}:`, err));
- }
+ private async getLimitCounter(limit: Keyed<RateLimit>, actor: string, subject: string): Promise<LimitCounter> {
+ const timestampKey = createLimitKey(limit, actor, subject, 't');
+ const counterKey = createLimitKey(limit, actor, subject, 'c');
+
+ const [timestamp, counter] = await this.executeRedis(
+ [
+ ['get', timestampKey],
+ ['get', counterKey],
+ ],
+ [
+ timestampKey,
+ counterKey,
+ ],
+ );
- return limitInfo;
+ return {
+ timestamp: timestamp ? parseInt(timestamp) : 0,
+ counter: counter ? parseInt(counter) : 0,
+ };
}
- private async getLimitCounter(limit: Keyed<RateLimit>, actor: string, subject: string): Promise<LimitCounter> {
- const key = createLimitKey(limit, actor, subject);
+ private async updateLimitCounter(limit: Keyed<RateLimit>, actor: string, subject: string, expirationSec: number, counter: LimitCounter): Promise<void> {
+ const timestampKey = createLimitKey(limit, actor, subject, 't');
+ const counterKey = createLimitKey(limit, actor, subject, 'c');
- const value = await this.redisClient.get(key);
- if (value == null) {
- return { t: 0, c: 0 };
+ await this.executeRedis(
+ [
+ ['set', timestampKey, counter.timestamp.toString(), 'EX', expirationSec],
+ ['set', counterKey, counter.counter.toString(), 'EX', expirationSec],
+ ],
+ [
+ timestampKey,
+ counterKey,
+ ],
+ );
+ }
+
+ private async executeRedis<Num extends number>(batch: RedisBatch<Num>, watch: string[]): Promise<RedisResults<Num>> {
+ const results = await this.redisClient
+ .multi(batch)
+ .watch(watch)
+ .exec();
+
+ // Transaction error
+ if (!results) {
+ throw new TransactionError('Redis error: transaction conflict');
}
- return JSON.parse(value);
- }
+ // The entire call failed
+ if (results.length !== batch.length) {
+ throw new Error('Redis error: failed to execute batch');
+ }
- private async setLimitCounter(limit: Keyed<RateLimit>, actor: string, counter: LimitCounter, expiration: number, subject: string): Promise<void> {
- const key = createLimitKey(limit, actor, subject);
- const value = JSON.stringify(counter);
- const expirationSec = Math.max(expiration, 1);
- await this.redisClient.set(key, value, 'EX', expirationSec);
+ // A particular command failed
+ const errors = results.map(r => r[0]).filter(e => e != null);
+ if (errors.length > 0) {
+ throw new AggregateError(errors, `Redis error: failed to execute command(s): '${errors.join('\', \'')}'`);
+ }
+
+ return results.map(r => r[1]) as RedisResults<Num>;
}
}
-function createLimitKey(limit: Keyed<RateLimit>, actor: string, subject: string): string {
- return `rl_${actor}_${limit.key}_${subject}`;
+type RedisBatch<Num extends number> = [string, ...unknown[]][] & { length: Num };
+type RedisResults<Num extends number> = (string | null)[] & { length: Num };
+
+function createLimitKey(limit: Keyed<RateLimit>, actor: string, subject: string, value: string): string {
+ return `rl_${actor}_${limit.key}_${subject}_${value}`;
}
-export interface LimitCounter {
- /** Timestamp */
- t: number;
+class TransactionError extends Error {}
- /** Counter */
- c: number;
+interface LimitCounter {
+ timestamp: number;
+ counter: number;
}
diff --git a/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts b/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts
index dbf7795fc6..871c9afa64 100644
--- a/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts
+++ b/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts
@@ -3,25 +3,19 @@
* SPDX-License-Identifier: AGPL-3.0-only
*/
-import { KEYWORD } from 'color-convert/conversions.js';
-import { jest } from '@jest/globals';
import type Redis from 'ioredis';
-import { LimitCounter, SkRateLimiterService } from '@/server/api/SkRateLimiterService.js';
-import { LoggerService } from '@/core/LoggerService.js';
+import { SkRateLimiterService } from '@/server/api/SkRateLimiterService.js';
import { BucketRateLimit, Keyed, LegacyRateLimit } from '@/misc/rate-limit-utils.js';
/* eslint-disable @typescript-eslint/no-non-null-assertion */
-/* eslint-disable @typescript-eslint/no-unnecessary-condition */
describe(SkRateLimiterService, () => {
let mockTimeService: { now: number, date: Date } = null!;
- let mockRedisGet: ((key: string) => string | null) | undefined = undefined;
- let mockRedisSet: ((args: unknown[]) => void) | undefined = undefined;
+ let mockRedis: Array<(command: [string, ...unknown[]]) => [Error | null, unknown] | null> = null!;
+ let mockRedisExec: (batch: [string, ...unknown[]][]) => Promise<[Error | null, unknown][] | null> = null!;
let mockEnvironment: Record<string, string | undefined> = null!;
let serviceUnderTest: () => SkRateLimiterService = null!;
- let loggedMessages: { level: string, data: unknown[] }[] = [];
-
beforeEach(() => {
mockTimeService = {
now: 0,
@@ -30,16 +24,26 @@ describe(SkRateLimiterService, () => {
},
};
- mockRedisGet = undefined;
- mockRedisSet = undefined;
+ mockRedis = [];
+ mockRedisExec = (batch) => {
+ const results: [Error | null, unknown][] = batch.map(command => {
+ const handlerResults = mockRedis.map(handler => handler(command));
+ const finalResult = handlerResults.findLast(result => result != null);
+ return finalResult ?? [new Error('test error: no handler'), null];
+ });
+ return Promise.resolve(results);
+ };
const mockRedisClient = {
- get(key: string) {
- if (mockRedisGet) return Promise.resolve(mockRedisGet(key));
- else return Promise.resolve(null);
- },
- set(...args: unknown[]): Promise<void> {
- if (mockRedisSet) mockRedisSet(args);
- return Promise.resolve();
+ multi(batch: [string, ...unknown[]][]) {
+ return {
+ watch() {
+ return {
+ exec() {
+ return mockRedisExec(batch);
+ },
+ };
+ },
+ };
},
} as unknown as Redis.Redis;
@@ -49,89 +53,77 @@ describe(SkRateLimiterService, () => {
env: mockEnvironment,
};
- loggedMessages = [];
- const mockLogService = {
- getLogger() {
- return {
- createSubLogger(context: string, color?: KEYWORD) {
- return mockLogService.getLogger(context, color);
- },
- error(...data: unknown[]) {
- loggedMessages.push({ level: 'error', data });
- },
- warn(...data: unknown[]) {
- loggedMessages.push({ level: 'warn', data });
- },
- succ(...data: unknown[]) {
- loggedMessages.push({ level: 'succ', data });
- },
- debug(...data: unknown[]) {
- loggedMessages.push({ level: 'debug', data });
- },
- info(...data: unknown[]) {
- loggedMessages.push({ level: 'info', data });
- },
- };
- },
- } as unknown as LoggerService;
-
let service: SkRateLimiterService | undefined = undefined;
serviceUnderTest = () => {
- return service ??= new SkRateLimiterService(mockTimeService, mockRedisClient, mockLogService, mockEnvService);
+ return service ??= new SkRateLimiterService(mockTimeService, mockRedisClient, mockEnvService);
};
});
- function expectNoUnhandledErrors() {
- const unhandledErrors = loggedMessages.filter(m => m.level === 'error');
- if (unhandledErrors.length > 0) {
- throw new Error(`Test failed: got unhandled errors ${unhandledErrors.join('\n')}`);
- }
- }
-
describe('limit', () => {
const actor = 'actor';
const key = 'test';
- let counter: LimitCounter | undefined = undefined;
- let minCounter: LimitCounter | undefined = undefined;
+ let limitCounter: number | undefined = undefined;
+ let limitTimestamp: number | undefined = undefined;
+ let minCounter: number | undefined = undefined;
+ let minTimestamp: number | undefined = undefined;
beforeEach(() => {
- counter = undefined;
+ limitCounter = undefined;
+ limitTimestamp = undefined;
minCounter = undefined;
+ minTimestamp = undefined;
- mockRedisGet = (key: string) => {
- if (key === 'rl_actor_test_bucket' && counter) {
- return JSON.stringify(counter);
+ mockRedis.push(([command, ...args]) => {
+ if (command === 'set' && args[0] === 'rl_actor_test_bucket_t') {
+ limitTimestamp = parseInt(args[1] as string);
+ return [null, args[1]];
}
-
- if (key === 'rl_actor_test_min' && minCounter) {
- return JSON.stringify(minCounter);
+ if (command === 'get' && args[0] === 'rl_actor_test_bucket_t') {
+ return [null, limitTimestamp?.toString() ?? null];
}
-
- return null;
- };
-
- mockRedisSet = (args: unknown[]) => {
- const [key, value] = args;
-
- if (key === 'rl_actor_test_bucket') {
- if (value == null) counter = undefined;
- else if (typeof(value) === 'string') counter = JSON.parse(value);
- else throw new Error('invalid redis call');
+ // if (command === 'incr' && args[0] === 'rl_actor_test_bucket_c') {
+ // limitCounter = (limitCounter ?? 0) + 1;
+ // return [null, null];
+ // }
+ if (command === 'set' && args[0] === 'rl_actor_test_bucket_c') {
+ limitCounter = parseInt(args[1] as string);
+ return [null, args[1]];
+ }
+ if (command === 'get' && args[0] === 'rl_actor_test_bucket_c') {
+ return [null, limitCounter?.toString() ?? null];
}
- if (key === 'rl_actor_test_min') {
- if (value == null) minCounter = undefined;
- else if (typeof(value) === 'string') minCounter = JSON.parse(value);
- else throw new Error('invalid redis call');
+ if (command === 'set' && args[0] === 'rl_actor_test_min_t') {
+ minTimestamp = parseInt(args[1] as string);
+ return [null, args[1]];
+ }
+ if (command === 'get' && args[0] === 'rl_actor_test_min_t') {
+ return [null, minTimestamp?.toString() ?? null];
+ }
+ // if (command === 'incr' && args[0] === 'rl_actor_test_min_c') {
+ // minCounter = (minCounter ?? 0) + 1;
+ // return [null, null];
+ // }
+ if (command === 'set' && args[0] === 'rl_actor_test_min_c') {
+ minCounter = parseInt(args[1] as string);
+ return [null, args[1]];
}
- };
+ if (command === 'get' && args[0] === 'rl_actor_test_min_c') {
+ return [null, minCounter?.toString() ?? null];
+ }
+ // if (command === 'expire') {
+ // return [null, null];
+ // }
+
+ return null;
+ });
});
it('should bypass in non-production', async () => {
mockEnvironment.NODE_ENV = 'test';
- const info = await serviceUnderTest().limit({ key: 'l', type: undefined, max: 0 }, 'actor');
+ const info = await serviceUnderTest().limit({ key: 'l', type: undefined, max: 0 }, actor);
expect(info.blocked).toBeFalsy();
expect(info.remaining).toBe(Number.MAX_SAFE_INTEGER);
@@ -158,15 +150,10 @@ describe(SkRateLimiterService, () => {
expect(info.blocked).toBeFalsy();
});
- it('should not error when allowed', async () => {
- await serviceUnderTest().limit(limit, actor);
-
- expectNoUnhandledErrors();
- });
-
it('should return correct info when allowed', async () => {
limit.size = 2;
- counter = { c: 1, t: 0 };
+ limitCounter = 1;
+ limitTimestamp = 0;
const info = await serviceUnderTest().limit(limit, actor);
@@ -180,8 +167,7 @@ describe(SkRateLimiterService, () => {
it('should increment counter when called', async () => {
await serviceUnderTest().limit(limit, actor);
- expect(counter).not.toBeUndefined();
- expect(counter?.c).toBe(1);
+ expect(limitCounter).toBe(1);
});
it('should set timestamp when called', async () => {
@@ -189,29 +175,28 @@ describe(SkRateLimiterService, () => {
await serviceUnderTest().limit(limit, actor);
- expect(counter).not.toBeUndefined();
- expect(counter?.t).toBe(1000);
+ expect(limitTimestamp).toBe(1000);
});
it('should decrement counter when dripRate has passed', async () => {
- counter = { c: 2, t: 0 };
+ limitCounter = 2;
+ limitTimestamp = 0;
mockTimeService.now = 2000;
await serviceUnderTest().limit(limit, actor);
- expect(counter).not.toBeUndefined();
- expect(counter?.c).toBe(1); // 2 (starting) - 2 (2x1 drip) + 1 (call) = 1
+ expect(limitCounter).toBe(1); // 2 (starting) - 2 (2x1 drip) + 1 (call) = 1
});
it('should decrement counter by dripSize', async () => {
- counter = { c: 2, t: 0 };
+ limitCounter = 2;
+ limitTimestamp = 0;
limit.dripSize = 2;
mockTimeService.now = 1000;
await serviceUnderTest().limit(limit, actor);
- expect(counter).not.toBeUndefined();
- expect(counter?.c).toBe(1); // 2 (starting) - 2 (1x2 drip) + 1 (call) = 1
+ expect(limitCounter).toBe(1); // 2 (starting) - 2 (1x2 drip) + 1 (call) = 1
});
it('should maintain counter between calls over time', async () => {
@@ -226,25 +211,13 @@ describe(SkRateLimiterService, () => {
mockTimeService.now += 1000; // 2 - 1 = 1
await serviceUnderTest().limit(limit, actor); // 1 + 1 = 2
- expect(counter?.c).toBe(2);
- expect(counter?.t).toBe(3000);
- });
-
- it('should log error and continue when update fails', async () => {
- mockRedisSet = () => {
- throw new Error('test error');
- };
-
- await serviceUnderTest().limit(limit, actor);
-
- const matchingError = loggedMessages
- .find(m => m.level === 'error' && m.data
- .some(d => typeof(d) === 'string' && d.includes('Failed to update limit')));
- expect(matchingError).toBeTruthy();
+ expect(limitCounter).toBe(2);
+ expect(limitTimestamp).toBe(3000);
});
it('should block when bucket is filled', async () => {
- counter = { c: 1, t: 0 };
+ limitCounter = 1;
+ limitTimestamp = 0;
const info = await serviceUnderTest().limit(limit, actor);
@@ -252,7 +225,8 @@ describe(SkRateLimiterService, () => {
});
it('should calculate correct info when blocked', async () => {
- counter = { c: 1, t: 0 };
+ limitCounter = 1;
+ limitTimestamp = 0;
const info = await serviceUnderTest().limit(limit, actor);
@@ -263,7 +237,8 @@ describe(SkRateLimiterService, () => {
});
it('should allow when bucket is filled but should drip', async () => {
- counter = { c: 1, t: 0 };
+ limitCounter = 1;
+ limitTimestamp = 0;
mockTimeService.now = 1000;
const info = await serviceUnderTest().limit(limit, actor);
@@ -272,7 +247,8 @@ describe(SkRateLimiterService, () => {
});
it('should scale limit by factor', async () => {
- counter = { c: 1, t: 0 };
+ limitCounter = 1;
+ limitTimestamp = 0;
const i1 = await serviceUnderTest().limit(limit, actor, 0.5); // 1 + 1 = 2
const i2 = await serviceUnderTest().limit(limit, actor, 0.5); // 2 + 1 = 3
@@ -281,23 +257,39 @@ describe(SkRateLimiterService, () => {
expect(i2.blocked).toBeTruthy();
});
- it('should set key expiration', async () => {
- const mock = jest.fn(mockRedisSet);
- mockRedisSet = mock;
+ it('should set counter expiration', async () => {
+ const commands: unknown[][] = [];
+ mockRedis.push(command => {
+ commands.push(command);
+ return null;
+ });
await serviceUnderTest().limit(limit, actor);
- expect(mock).toHaveBeenCalledWith(['rl_actor_test_bucket', '{"t":0,"c":1}', 'EX', 1]);
+ expect(commands).toContainEqual(['set', 'rl_actor_test_bucket_c', '1', 'EX', 1]);
+ });
+
+ it('should set timestamp expiration', async () => {
+ const commands: unknown[][] = [];
+ mockRedis.push(command => {
+ commands.push(command);
+ return null;
+ });
+
+ await serviceUnderTest().limit(limit, actor);
+
+ expect(commands).toContainEqual(['set', 'rl_actor_test_bucket_t', '0', 'EX', 1]);
});
it('should not increment when already blocked', async () => {
- counter = { c: 1, t: 0 };
+ limitCounter = 1;
+ limitTimestamp = 0;
mockTimeService.now += 100;
await serviceUnderTest().limit(limit, actor);
- expect(counter?.c).toBe(1);
- expect(counter?.t).toBe(0);
+ expect(limitCounter).toBe(1);
+ expect(limitTimestamp).toBe(0);
});
it('should skip if factor is zero', async () => {
@@ -384,6 +376,48 @@ describe(SkRateLimiterService, () => {
await expect(promise).rejects.toThrow(/dripSize is less than 1/);
});
+
+ it('should retry when redis conflicts', async () => {
+ let numCalls = 0;
+ const realMockRedisExec = mockRedisExec;
+ mockRedisExec = () => {
+ if (numCalls > 0) {
+ mockRedisExec = realMockRedisExec;
+ }
+ numCalls++;
+ return Promise.resolve(null);
+ };
+
+ await serviceUnderTest().limit(limit, actor);
+
+ expect(numCalls).toBe(2);
+ });
+
+ it('should bail out after 3 tries', async () => {
+ let numCalls = 0;
+ mockRedisExec = () => {
+ numCalls++;
+ return Promise.resolve(null);
+ };
+
+ const promise = serviceUnderTest().limit(limit, actor);
+
+ await expect(promise).rejects.toThrow(/transaction conflict/);
+ expect(numCalls).toBe(3);
+ });
+
+ it('should apply correction if extra calls slip through', async () => {
+ limitCounter = 2;
+
+ const info = await serviceUnderTest().limit(limit, actor);
+
+ expect(info.blocked).toBeTruthy();
+ expect(info.remaining).toBe(0);
+ expect(info.resetMs).toBe(2000);
+ expect(info.resetSec).toBe(2);
+ expect(info.fullResetMs).toBe(2000);
+ expect(info.fullResetSec).toBe(2);
+ });
});
describe('with min interval', () => {
@@ -403,12 +437,6 @@ describe(SkRateLimiterService, () => {
expect(info.blocked).toBeFalsy();
});
- it('should not error when allowed', async () => {
- await serviceUnderTest().limit(limit, actor);
-
- expectNoUnhandledErrors();
- });
-
it('should calculate correct info when allowed', async () => {
const info = await serviceUnderTest().limit(limit, actor);
@@ -423,7 +451,7 @@ describe(SkRateLimiterService, () => {
await serviceUnderTest().limit(limit, actor);
expect(minCounter).not.toBeUndefined();
- expect(minCounter?.c).toBe(1);
+ expect(minCounter).toBe(1);
});
it('should set timestamp when called', async () => {
@@ -432,27 +460,29 @@ describe(SkRateLimiterService, () => {
await serviceUnderTest().limit(limit, actor);
expect(minCounter).not.toBeUndefined();
- expect(minCounter?.t).toBe(1000);
+ expect(minTimestamp).toBe(1000);
});
it('should decrement counter when minInterval has passed', async () => {
- minCounter = { c: 1, t: 0 };
+ minCounter = 1;
+ minTimestamp = 0;
mockTimeService.now = 1000;
await serviceUnderTest().limit(limit, actor);
expect(minCounter).not.toBeUndefined();
- expect(minCounter?.c).toBe(1); // 1 (starting) - 1 (interval) + 1 (call) = 1
+ expect(minCounter).toBe(1); // 1 (starting) - 1 (interval) + 1 (call) = 1
});
it('should reset counter entirely', async () => {
- minCounter = { c: 2, t: 0 };
+ minCounter = 2;
+ minTimestamp = 0;
mockTimeService.now = 1000;
await serviceUnderTest().limit(limit, actor);
expect(minCounter).not.toBeUndefined();
- expect(minCounter?.c).toBe(1); // 2 (starting) - 2 (interval) + 1 (call) = 1
+ expect(minCounter).toBe(1); // 2 (starting) - 2 (interval) + 1 (call) = 1
});
it('should maintain counter between calls over time', async () => {
@@ -465,25 +495,13 @@ describe(SkRateLimiterService, () => {
mockTimeService.now += 1000; // 0 - 1 = 0
await serviceUnderTest().limit(limit, actor); // 0 + 1 = 1
- expect(minCounter?.c).toBe(1);
- expect(minCounter?.t).toBe(3000);
- });
-
- it('should log error and continue when update fails', async () => {
- mockRedisSet = () => {
- throw new Error('test error');
- };
-
- await serviceUnderTest().limit(limit, actor);
-
- const matchingError = loggedMessages
- .find(m => m.level === 'error' && m.data
- .some(d => typeof(d) === 'string' && d.includes('Failed to update limit')));
- expect(matchingError).toBeTruthy();
+ expect(minCounter).toBe(1);
+ expect(minTimestamp).toBe(3000);
});
it('should block when interval exceeded', async () => {
- minCounter = { c: 1, t: 0 };
+ minCounter = 1;
+ minTimestamp = 0;
const info = await serviceUnderTest().limit(limit, actor);
@@ -491,7 +509,8 @@ describe(SkRateLimiterService, () => {
});
it('should calculate correct info when blocked', async () => {
- minCounter = { c: 1, t: 0 };
+ minCounter = 1;
+ minTimestamp = 0;
const info = await serviceUnderTest().limit(limit, actor);
@@ -502,7 +521,8 @@ describe(SkRateLimiterService, () => {
});
it('should allow when bucket is filled but interval has passed', async () => {
- minCounter = { c: 1, t: 0 };
+ minCounter = 1;
+ minTimestamp = 0;
mockTimeService.now = 1000;
const info = await serviceUnderTest().limit(limit, actor);
@@ -511,7 +531,8 @@ describe(SkRateLimiterService, () => {
});
it('should scale interval by factor', async () => {
- minCounter = { c: 1, t: 0 };
+ minCounter = 1;
+ minTimestamp = 0;
mockTimeService.now += 500;
const info = await serviceUnderTest().limit(limit, actor, 0.5);
@@ -519,23 +540,39 @@ describe(SkRateLimiterService, () => {
expect(info.blocked).toBeFalsy();
});
- it('should set key expiration', async () => {
- const mock = jest.fn(mockRedisSet);
- mockRedisSet = mock;
+ it('should set counter expiration', async () => {
+ const commands: unknown[][] = [];
+ mockRedis.push(command => {
+ commands.push(command);
+ return null;
+ });
await serviceUnderTest().limit(limit, actor);
- expect(mock).toHaveBeenCalledWith(['rl_actor_test_min', '{"t":0,"c":1}', 'EX', 1]);
+ expect(commands).toContainEqual(['set', 'rl_actor_test_min_c', '1', 'EX', 1]);
+ });
+
+ it('should set timestamp expiration', async () => {
+ const commands: unknown[][] = [];
+ mockRedis.push(command => {
+ commands.push(command);
+ return null;
+ });
+
+ await serviceUnderTest().limit(limit, actor);
+
+ expect(commands).toContainEqual(['set', 'rl_actor_test_min_t', '0', 'EX', 1]);
});
it('should not increment when already blocked', async () => {
- minCounter = { c: 1, t: 0 };
+ minCounter = 1;
+ minTimestamp = 0;
mockTimeService.now += 100;
await serviceUnderTest().limit(limit, actor);
- expect(minCounter?.c).toBe(1);
- expect(minCounter?.t).toBe(0);
+ expect(minCounter).toBe(1);
+ expect(minTimestamp).toBe(0);
});
it('should skip if factor is zero', async () => {
@@ -567,6 +604,19 @@ describe(SkRateLimiterService, () => {
await expect(promise).rejects.toThrow(/minInterval is negative/);
});
+
+ it('should not apply correction to extra calls', async () => {
+ minCounter = 2;
+
+ const info = await serviceUnderTest().limit(limit, actor);
+
+ expect(info.blocked).toBeTruthy();
+ expect(info.remaining).toBe(0);
+ expect(info.resetMs).toBe(1000);
+ expect(info.resetSec).toBe(1);
+ expect(info.fullResetMs).toBe(1000);
+ expect(info.fullResetSec).toBe(1);
+ });
});
describe('with legacy limit', () => {
@@ -587,16 +637,11 @@ describe(SkRateLimiterService, () => {
expect(info.blocked).toBeFalsy();
});
- it('should not error when allowed', async () => {
- await serviceUnderTest().limit(limit, actor);
-
- expectNoUnhandledErrors();
- });
-
it('should infer dripRate from duration', async () => {
limit.max = 10;
limit.duration = 10000;
- counter = { c: 10, t: 0 };
+ limitCounter = 10;
+ limitTimestamp = 0;
const i1 = await serviceUnderTest().limit(limit, actor);
mockTimeService.now += 1000;
@@ -619,7 +664,8 @@ describe(SkRateLimiterService, () => {
it('should calculate correct info when allowed', async () => {
limit.max = 10;
limit.duration = 10000;
- counter = { c: 10, t: 0 };
+ limitCounter = 10;
+ limitTimestamp = 0;
mockTimeService.now += 2000;
const info = await serviceUnderTest().limit(limit, actor);
@@ -634,7 +680,8 @@ describe(SkRateLimiterService, () => {
it('should calculate correct info when blocked', async () => {
limit.max = 10;
limit.duration = 10000;
- counter = { c: 10, t: 0 };
+ limitCounter = 10;
+ limitTimestamp = 0;
const info = await serviceUnderTest().limit(limit, actor);
@@ -646,7 +693,8 @@ describe(SkRateLimiterService, () => {
});
it('should allow when bucket is filled but interval has passed', async () => {
- counter = { c: 10, t: 0 };
+ limitCounter = 10;
+ limitTimestamp = 0;
mockTimeService.now = 1000;
const info = await serviceUnderTest().limit(limit, actor);
@@ -655,37 +703,55 @@ describe(SkRateLimiterService, () => {
});
it('should scale limit by factor', async () => {
- counter = { c: 10, t: 0 };
+ limitCounter = 10;
+ limitTimestamp = 0;
const info = await serviceUnderTest().limit(limit, actor, 0.5); // 10 + 1 = 11
expect(info.blocked).toBeTruthy();
});
- it('should set key expiration', async () => {
- const mock = jest.fn(mockRedisSet);
- mockRedisSet = mock;
+ it('should set counter expiration', async () => {
+ const commands: unknown[][] = [];
+ mockRedis.push(command => {
+ commands.push(command);
+ return null;
+ });
+
+ await serviceUnderTest().limit(limit, actor);
+
+ expect(commands).toContainEqual(['set', 'rl_actor_test_bucket_c', '1', 'EX', 1]);
+ });
+
+ it('should set timestamp expiration', async () => {
+ const commands: unknown[][] = [];
+ mockRedis.push(command => {
+ commands.push(command);
+ return null;
+ });
await serviceUnderTest().limit(limit, actor);
- expect(mock).toHaveBeenCalledWith(['rl_actor_test_bucket', '{"t":0,"c":1}', 'EX', 1]);
+ expect(commands).toContainEqual(['set', 'rl_actor_test_bucket_t', '0', 'EX', 1]);
});
it('should not increment when already blocked', async () => {
- counter = { c: 1, t: 0 };
+ limitCounter = 1;
+ limitTimestamp = 0;
mockTimeService.now += 100;
await serviceUnderTest().limit(limit, actor);
- expect(counter?.c).toBe(1);
- expect(counter?.t).toBe(0);
+ expect(limitCounter).toBe(1);
+ expect(limitTimestamp).toBe(0);
});
it('should not allow dripRate to be lower than 0', async () => {
// real-world case; taken from StreamingApiServerService
limit.max = 4096;
limit.duration = 2000;
- counter = { c: 4096, t: 0 };
+ limitCounter = 4096;
+ limitTimestamp = 0;
const i1 = await serviceUnderTest().limit(limit, actor);
mockTimeService.now = 1;
@@ -723,6 +789,19 @@ describe(SkRateLimiterService, () => {
await expect(promise).rejects.toThrow(/size is less than 1/);
});
+
+ it('should apply correction if extra calls slip through', async () => {
+ limitCounter = 2;
+
+ const info = await serviceUnderTest().limit(limit, actor);
+
+ expect(info.blocked).toBeTruthy();
+ expect(info.remaining).toBe(0);
+ expect(info.resetMs).toBe(2000);
+ expect(info.resetSec).toBe(2);
+ expect(info.fullResetMs).toBe(2000);
+ expect(info.fullResetSec).toBe(2);
+ });
});
describe('with legacy limit and min interval', () => {
@@ -744,14 +823,9 @@ describe(SkRateLimiterService, () => {
expect(info.blocked).toBeFalsy();
});
- it('should not error when allowed', async () => {
- await serviceUnderTest().limit(limit, actor);
-
- expectNoUnhandledErrors();
- });
-
it('should block when limit exceeded', async () => {
- counter = { c: 5, t: 0 };
+ limitCounter = 5;
+ limitTimestamp = 0;
const info = await serviceUnderTest().limit(limit, actor);
@@ -759,7 +833,8 @@ describe(SkRateLimiterService, () => {
});
it('should block when minInterval exceeded', async () => {
- minCounter = { c: 1, t: 0 };
+ minCounter = 1;
+ minTimestamp = 0;
const info = await serviceUnderTest().limit(limit, actor);
@@ -767,7 +842,8 @@ describe(SkRateLimiterService, () => {
});
it('should calculate correct info when allowed', async () => {
- counter = { c: 1, t: 0 };
+ limitCounter = 1;
+ limitTimestamp = 0;
const info = await serviceUnderTest().limit(limit, actor);
@@ -779,7 +855,8 @@ describe(SkRateLimiterService, () => {
});
it('should calculate correct info when blocked by limit', async () => {
- counter = { c: 5, t: 0 };
+ limitCounter = 5;
+ limitTimestamp = 0;
const info = await serviceUnderTest().limit(limit, actor);
@@ -791,7 +868,8 @@ describe(SkRateLimiterService, () => {
});
it('should calculate correct info when blocked by minInterval', async () => {
- minCounter = { c: 1, t: 0 };
+ minCounter = 1;
+ minTimestamp = 0;
const info = await serviceUnderTest().limit(limit, actor);
@@ -803,7 +881,8 @@ describe(SkRateLimiterService, () => {
});
it('should allow when counter is filled but interval has passed', async () => {
- counter = { c: 5, t: 0 };
+ limitCounter = 5;
+ limitTimestamp = 0;
mockTimeService.now = 1000;
const info = await serviceUnderTest().limit(limit, actor);
@@ -812,7 +891,8 @@ describe(SkRateLimiterService, () => {
});
it('should allow when minCounter is filled but interval has passed', async () => {
- minCounter = { c: 1, t: 0 };
+ minCounter = 1;
+ minTimestamp = 0;
mockTimeService.now = 1000;
const info = await serviceUnderTest().limit(limit, actor);
@@ -821,8 +901,10 @@ describe(SkRateLimiterService, () => {
});
it('should scale limit and interval by factor', async () => {
- counter = { c: 5, t: 0 };
- minCounter = { c: 1, t: 0 };
+ limitCounter = 5;
+ limitTimestamp = 0;
+ minCounter = 1;
+ minTimestamp = 0;
mockTimeService.now += 500;
const info = await serviceUnderTest().limit(limit, actor, 0.5);
@@ -830,27 +912,81 @@ describe(SkRateLimiterService, () => {
expect(info.blocked).toBeFalsy();
});
- it('should set key expiration', async () => {
- const mock = jest.fn(mockRedisSet);
- mockRedisSet = mock;
+ it('should set bucket counter expiration', async () => {
+ const commands: unknown[][] = [];
+ mockRedis.push(command => {
+ commands.push(command);
+ return null;
+ });
+
+ await serviceUnderTest().limit(limit, actor);
+
+ expect(commands).toContainEqual(['set', 'rl_actor_test_bucket_c', '1', 'EX', 1]);
+ });
+
+ it('should set bucket timestamp expiration', async () => {
+ const commands: unknown[][] = [];
+ mockRedis.push(command => {
+ commands.push(command);
+ return null;
+ });
await serviceUnderTest().limit(limit, actor);
- expect(mock).toHaveBeenCalledWith(['rl_actor_test_bucket', '{"t":0,"c":1}', 'EX', 1]);
- expect(mock).toHaveBeenCalledWith(['rl_actor_test_min', '{"t":0,"c":1}', 'EX', 1]);
+ expect(commands).toContainEqual(['set', 'rl_actor_test_bucket_t', '0', 'EX', 1]);
+ });
+
+ it('should set min counter expiration', async () => {
+ const commands: unknown[][] = [];
+ mockRedis.push(command => {
+ commands.push(command);
+ return null;
+ });
+
+ await serviceUnderTest().limit(limit, actor);
+
+ expect(commands).toContainEqual(['set', 'rl_actor_test_min_c', '1', 'EX', 1]);
+ });
+
+ it('should set min timestamp expiration', async () => {
+ const commands: unknown[][] = [];
+ mockRedis.push(command => {
+ commands.push(command);
+ return null;
+ });
+
+ await serviceUnderTest().limit(limit, actor);
+
+ expect(commands).toContainEqual(['set', 'rl_actor_test_min_t', '0', 'EX', 1]);
});
it('should not increment when already blocked', async () => {
- counter = { c: 5, t: 0 };
- minCounter = { c: 1, t: 0 };
+ limitCounter = 5;
+ limitTimestamp = 0;
+ minCounter = 1;
+ minTimestamp = 0;
mockTimeService.now += 100;
await serviceUnderTest().limit(limit, actor);
- expect(counter?.c).toBe(5);
- expect(counter?.t).toBe(0);
- expect(minCounter?.c).toBe(1);
- expect(minCounter?.t).toBe(0);
+ expect(limitCounter).toBe(5);
+ expect(limitTimestamp).toBe(0);
+ expect(minCounter).toBe(1);
+ expect(minTimestamp).toBe(0);
+ });
+
+ it('should apply correction if extra calls slip through', async () => {
+ limitCounter = 6;
+ minCounter = 6;
+
+ const info = await serviceUnderTest().limit(limit, actor);
+
+ expect(info.blocked).toBeTruthy();
+ expect(info.remaining).toBe(0);
+ expect(info.resetMs).toBe(2000);
+ expect(info.resetSec).toBe(2);
+ expect(info.fullResetMs).toBe(6000);
+ expect(info.fullResetSec).toBe(6);
});
});
});