diff options
Diffstat (limited to 'packages/backend/src/misc')
| -rw-r--r-- | packages/backend/src/misc/QuantumKVCache.ts | 145 | ||||
| -rw-r--r-- | packages/backend/src/misc/cache.ts | 18 |
2 files changed, 119 insertions, 44 deletions
diff --git a/packages/backend/src/misc/QuantumKVCache.ts b/packages/backend/src/misc/QuantumKVCache.ts index 6b36789f5e..b96937d6f2 100644 --- a/packages/backend/src/misc/QuantumKVCache.ts +++ b/packages/backend/src/misc/QuantumKVCache.ts @@ -21,18 +21,18 @@ export interface QuantumKVOpts<T> { fetcher: (key: string, cache: QuantumKVCache<T>) => T | Promise<T>; /** - * Optional callback when a value is created or changed in the cache, either locally or elsewhere in the cluster. - * This is called *after* the cache state is updated. + * Optional callback to fetch the value for multiple keys that weren't found in the cache. * May be synchronous or async. + * If not provided, then the implementation will fall back on repeated calls to fetcher(). */ - onSet?: (key: string, cache: QuantumKVCache<T>) => void | Promise<void>; + bulkFetcher?: (keys: string[], cache: QuantumKVCache<T>) => Iterable<[key: string, value: T]> | Promise<Iterable<[key: string, value: T]>>; /** - * Optional callback when a value is deleted from the cache, either locally or elsewhere in the cluster. + * Optional callback when one or more values are changed (created, updated, or deleted) in the cache, either locally or elsewhere in the cluster. * This is called *after* the cache state is updated. - * May be synchronous or async. + * Implementations may be synchronous or async. */ - onDelete?: (key: string, cache: QuantumKVCache<T>) => void | Promise<void>; + onChanged?: (keys: string[], cache: QuantumKVCache<T>) => void | Promise<void>; } /** @@ -44,8 +44,8 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> { private readonly memoryCache: MemoryKVCache<T>; public readonly fetcher: QuantumKVOpts<T>['fetcher']; - public readonly onSet: QuantumKVOpts<T>['onSet']; - public readonly onDelete: QuantumKVOpts<T>['onDelete']; + public readonly bulkFetcher: QuantumKVOpts<T>['bulkFetcher']; + public readonly onChanged: QuantumKVOpts<T>['onChanged']; /** * @param internalEventService Service bus to synchronize events. @@ -59,8 +59,8 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> { ) { this.memoryCache = new MemoryKVCache(opts.lifetime); this.fetcher = opts.fetcher; - this.onSet = opts.onSet; - this.onDelete = opts.onDelete; + this.bulkFetcher = opts.bulkFetcher; + this.onChanged = opts.onChanged; this.internalEventService.on('quantumCacheUpdated', this.onQuantumCacheUpdated, { // Ignore our own events, otherwise we'll immediately erase any set value. @@ -122,10 +122,10 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> { this.memoryCache.set(key, value); - await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', keys: [key] }); + await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, keys: [key] }); - if (this.onSet) { - await this.onSet(key, this); + if (this.onChanged) { + await this.onChanged([key], this); } } @@ -146,12 +146,10 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> { } if (changedKeys.length > 0) { - await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', keys: changedKeys }); + await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, keys: changedKeys }); - if (this.onSet) { - for (const key of changedKeys) { - await this.onSet(key, this); - } + if (this.onChanged) { + await this.onChanged(changedKeys, this); } } } @@ -180,6 +178,7 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> { /** * Gets a value from the local memory cache, or returns undefined if not found. + * Returns cached data only - does not make any fetches. */ @bindThis public get(key: string): T | undefined { @@ -187,6 +186,19 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> { } /** + * Gets multiple values from the local memory cache; returning undefined for any missing keys. + * Returns cached data only - does not make any fetches. + */ + @bindThis + public getMany(keys: Iterable<string>): [key: string, value: T | undefined][] { + const results: [key: string, value: T | undefined][] = []; + for (const key of keys) { + results.push([key, this.get(key)]); + } + return results; + } + + /** * Gets or fetches a value from the cache. * Fires an onSet event, but does not emit an update event to other processes. */ @@ -197,14 +209,50 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> { value = await this.fetcher(key, this); this.memoryCache.set(key, value); - if (this.onSet) { - await this.onSet(key, this); + if (this.onChanged) { + await this.onChanged([key], this); } } return value; } /** + * Gets or fetches multiple values from the cache. + * Fires onSet events, but does not emit any update events to other processes. + */ + @bindThis + public async fetchMany(keys: Iterable<string>): Promise<[key: string, value: T][]> { + const results: [key: string, value: T][] = []; + const toFetch: string[] = []; + + // Spliterate into cached results / uncached keys. + for (const key of keys) { + const fromCache = this.get(key); + if (fromCache) { + results.push([key, fromCache]); + } else { + toFetch.push(key); + } + } + + // Fetch any uncached keys + if (toFetch.length > 0) { + const fetched = await this.bulkFetch(toFetch); + + // Add to cache and return set + this.addMany(fetched); + results.push(...fetched); + + // Emit event + if (this.onChanged) { + await this.onChanged(toFetch, this); + } + } + + return results; + } + + /** * Returns true is a key exists in memory. * This applies to the local subset view, not the cross-cluster cache state. */ @@ -221,10 +269,10 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> { public async delete(key: string): Promise<void> { this.memoryCache.delete(key); - await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', keys: [key] }); + await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, keys: [key] }); - if (this.onDelete) { - await this.onDelete(key, this); + if (this.onChanged) { + await this.onChanged([key], this); } } /** @@ -233,21 +281,22 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> { * Skips if the input is empty. */ @bindThis - public async deleteMany(keys: string[]): Promise<void> { - if (keys.length === 0) { - return; - } + public async deleteMany(keys: Iterable<string>): Promise<void> { + const deleted: string[] = []; for (const key of keys) { this.memoryCache.delete(key); + deleted.push(key); } - await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', keys }); + if (deleted.length === 0) { + return; + } - if (this.onDelete) { - for (const key of keys) { - await this.onDelete(key, this); - } + await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, keys: deleted }); + + if (this.onChanged) { + await this.onChanged(deleted, this); } } @@ -262,6 +311,13 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> { return value; } + @bindThis + public async refreshMany(keys: Iterable<string>): Promise<[key: string, value: T][]> { + const values = await this.bulkFetch(keys); + await this.setMany(values); + return values; + } + /** * Erases all entries from the local memory cache. * Does not send any events or update other processes. @@ -292,18 +348,29 @@ export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> { } @bindThis + private async bulkFetch(keys: Iterable<string>): Promise<[key: string, value: T][]> { + if (this.bulkFetcher) { + const results = await this.bulkFetcher(Array.from(keys), this); + return Array.from(results); + } + + const results: [key: string, value: T][] = []; + for (const key of keys) { + const value = await this.fetcher(key, this); + results.push([key, value]); + } + return results; + } + + @bindThis private async onQuantumCacheUpdated(data: InternalEventTypes['quantumCacheUpdated']): Promise<void> { if (data.name === this.name) { for (const key of data.keys) { this.memoryCache.delete(key); + } - if (data.op === 's' && this.onSet) { - await this.onSet(key, this); - } - - if (data.op === 'd' && this.onDelete) { - await this.onDelete(key, this); - } + if (this.onChanged) { + await this.onChanged(data.keys, this); } } } diff --git a/packages/backend/src/misc/cache.ts b/packages/backend/src/misc/cache.ts index 932c0b409a..666e684c1c 100644 --- a/packages/backend/src/misc/cache.ts +++ b/packages/backend/src/misc/cache.ts @@ -5,8 +5,6 @@ import * as Redis from 'ioredis'; import { bindThis } from '@/decorators.js'; -import { InternalEventService } from '@/core/InternalEventService.js'; -import { InternalEventTypes } from '@/core/GlobalEventService.js'; export class RedisKVCache<T> { private readonly lifetime: number; @@ -120,9 +118,9 @@ export class RedisKVCache<T> { export class RedisSingleCache<T> { private readonly lifetime: number; private readonly memoryCache: MemorySingleCache<T>; - private readonly fetcher: () => Promise<T>; - private readonly toRedisConverter: (value: T) => string; - private readonly fromRedisConverter: (value: string) => T | undefined; + public readonly fetcher: () => Promise<T>; + public readonly toRedisConverter: (value: T) => string; + public readonly fromRedisConverter: (value: string) => T | undefined; constructor( private redisClient: Redis.Redis, @@ -245,6 +243,16 @@ export class MemoryKVCache<T> { return cached.value; } + public has(key: string): boolean { + const cached = this.cache.get(key); + if (cached == null) return false; + if ((Date.now() - cached.date) > this.lifetime) { + this.cache.delete(key); + return false; + } + return true; + } + @bindThis public delete(key: string): void { this.cache.delete(key); |