summaryrefslogtreecommitdiff
path: root/packages/backend/src/misc
diff options
context:
space:
mode:
authorHazelnoot <acomputerdog@gmail.com>2025-06-05 10:49:16 -0400
committerHazelnoot <acomputerdog@gmail.com>2025-06-09 11:02:36 -0400
commitf446d77cb51af1f66a0042feec2f0907537a16ce (patch)
treeba2ea3f4aba144345466beab1a10c61360267abc /packages/backend/src/misc
parentimplement InternalEventService (diff)
downloadsharkey-f446d77cb51af1f66a0042feec2f0907537a16ce.tar.gz
sharkey-f446d77cb51af1f66a0042feec2f0907537a16ce.tar.bz2
sharkey-f446d77cb51af1f66a0042feec2f0907537a16ce.zip
implement QuantumKVCache
Diffstat (limited to 'packages/backend/src/misc')
-rw-r--r--packages/backend/src/misc/cache.ts233
1 files changed, 233 insertions, 0 deletions
diff --git a/packages/backend/src/misc/cache.ts b/packages/backend/src/misc/cache.ts
index a6ab96c189..31e6f126b8 100644
--- a/packages/backend/src/misc/cache.ts
+++ b/packages/backend/src/misc/cache.ts
@@ -5,6 +5,8 @@
import * as Redis from 'ioredis';
import { bindThis } from '@/decorators.js';
+import { InternalEventService } from '@/core/InternalEventService.js';
+import { InternalEventTypes } from '@/core/GlobalEventService.js';
export class RedisKVCache<T> {
private readonly lifetime: number;
@@ -322,6 +324,10 @@ export class MemoryKVCache<T> {
clearInterval(this.gcIntervalHandle);
}
+ public get size() {
+ return this.cache.size;
+ }
+
public get entries() {
return this.cache.entries();
}
@@ -410,3 +416,230 @@ export class MemorySingleCache<T> {
return value;
}
}
+
+export interface QuantumKVOpts<T> {
+ /**
+ * Memory cache lifetime in milliseconds.
+ */
+ lifetime: number;
+
+ /**
+ * Callback to fetch the value for a key that wasn't found in the cache.
+ * May be synchronous or async.
+ */
+ fetcher: (key: string, cache: QuantumKVCache<T>) => T | Promise<T>;
+
+ /**
+ * Optional callback when a value is created or changed in the cache, either locally or elsewhere in the cluster.
+ * This is called *after* the cache state is updated.
+ * May be synchronous or async.
+ */
+ onSet?: (key: string, cache: QuantumKVCache<T>) => void | Promise<void>;
+
+ /**
+ * Optional callback when a value is deleted from the cache, either locally or elsewhere in the cluster.
+ * This is called *after* the cache state is updated.
+ * May be synchronous or async.
+ */
+ onDelete?: (key: string, cache: QuantumKVCache<T>) => void | Promise<void>;
+}
+
+/**
+ * QuantumKVCache is a lifetime-bounded memory cache (like MemoryKVCache) with automatic cross-cluster synchronization via Redis.
+ * All nodes in the cluster are guaranteed to have a *subset* view of the current accurate state, though individual processes may have different items in their local cache.
+ * This ensures that a call to get() will never return stale data.
+ */
+export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> {
+ private readonly memoryCache: MemoryKVCache<T>;
+
+ private readonly fetcher: QuantumKVOpts<T>['fetcher'];
+ private readonly onSet: QuantumKVOpts<T>['onSet'];
+ private readonly onDelete: QuantumKVOpts<T>['onDelete'];
+
+ /**
+ * @param internalEventService Service bus to synchronize events.
+ * @param name Unique name of the cache - must be the same in all processes.
+ * @param opts Cache options
+ */
+ constructor(
+ private readonly internalEventService: InternalEventService,
+ private readonly name: string,
+ opts: QuantumKVOpts<T>,
+ ) {
+ this.memoryCache = new MemoryKVCache(opts.lifetime);
+ this.fetcher = opts.fetcher;
+ this.onSet = opts.onSet;
+ this.onDelete = opts.onDelete;
+
+ this.internalEventService.on('quantumCacheUpdated', this.onQuantumCacheUpdated, {
+ // Ignore our own events, otherwise we'll immediately erase any set value.
+ ignoreLocal: true,
+ });
+ }
+
+ /**
+ * The number of items currently in memory.
+ * This applies to the local subset view, not the cross-cluster cache state.
+ */
+ public get size() {
+ return this.memoryCache.size;
+ }
+
+ /**
+ * Iterates all [key, value] pairs in memory.
+ * This applies to the local subset view, not the cross-cluster cache state.
+ */
+ @bindThis
+ public *entries(): Generator<[key: string, value: T]> {
+ for (const entry of this.memoryCache.entries) {
+ yield [entry[0], entry[1].value];
+ }
+ }
+
+ /**
+ * Iterates all keys in memory.
+ * This applies to the local subset view, not the cross-cluster cache state.
+ */
+ @bindThis
+ public *keys() {
+ for (const entry of this.memoryCache.entries) {
+ yield entry[0];
+ }
+ }
+
+ /**
+ * Iterates all values pairs in memory.
+ * This applies to the local subset view, not the cross-cluster cache state.
+ */
+ @bindThis
+ public *values() {
+ for (const entry of this.memoryCache.entries) {
+ yield entry[1].value;
+ }
+ }
+
+ /**
+ * Creates or updates a value in the cache, and erases any stale caches across the cluster.
+ * Fires an onSet event after the cache has been updated in all processes.
+ * Skips if the value is unchanged.
+ */
+ @bindThis
+ public async set(key: string, value: T): Promise<void> {
+ if (this.memoryCache.get(key) === value) {
+ return;
+ }
+
+ this.memoryCache.set(key, value);
+
+ await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', key });
+
+ if (this.onSet) {
+ await this.onSet(key, this);
+ }
+ }
+
+ /**
+ * Gets or fetches a value from the cache.
+ * Fires an onSet event, but does not emit an update event to other processes.
+ */
+ @bindThis
+ public async get(key: string): Promise<T> {
+ let value = this.memoryCache.get(key);
+ if (value === undefined) {
+ value = await this.fetcher(key, this);
+ this.memoryCache.set(key, value);
+
+ if (this.onSet) {
+ await this.onSet(key, this);
+ }
+ }
+ return value;
+ }
+
+ /**
+ * Alias to get(), included for backwards-compatibility with RedisKVCache.
+ * @deprecated use get() instead
+ */
+ @bindThis
+ public async fetch(key: string): Promise<T> {
+ return await this.get(key);
+ }
+
+ /**
+ * Returns true is a key exists in memory.
+ * This applies to the local subset view, not the cross-cluster cache state.
+ */
+ @bindThis
+ public has(key: string): boolean {
+ return this.memoryCache.get(key) !== undefined;
+ }
+
+ /**
+ * Deletes a value from the cache, and erases any stale caches across the cluster.
+ * Fires an onDelete event after the cache has been updated in all processes.
+ */
+ @bindThis
+ public async delete(key: string): Promise<void> {
+ this.memoryCache.delete(key);
+
+ await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', key });
+
+ if (this.onDelete) {
+ await this.onDelete(key, this);
+ }
+ }
+
+ /**
+ * Refreshes the value of a key from the fetcher, and erases any stale caches across the cluster.
+ * Fires an onSet event after the cache has been updated in all processes.
+ */
+ @bindThis
+ public async refresh(key: string): Promise<T> {
+ const value = await this.fetcher(key, this);
+ await this.set(key, value);
+ return value;
+ }
+
+ /**
+ * Erases all entries from the local memory cache.
+ * Does not send any events or update other processes.
+ */
+ @bindThis
+ public gc() {
+ this.memoryCache.gc();
+ }
+
+ /**
+ * Erases all data and disconnects from the cluster.
+ * This *must* be called when shutting down to prevent memory leaks!
+ */
+ @bindThis
+ public dispose() {
+ this.internalEventService.off('quantumCacheUpdated', this.onQuantumCacheUpdated);
+
+ this.memoryCache.dispose();
+ }
+
+ @bindThis
+ private async onQuantumCacheUpdated(data: InternalEventTypes['quantumCacheUpdated']): Promise<void> {
+ if (data.name === this.name) {
+ this.memoryCache.delete(data.key);
+
+ if (data.op === 's' && this.onSet) {
+ await this.onSet(data.key, this);
+ }
+
+ if (data.op === 'd' && this.onDelete) {
+ await this.onDelete(data.key, this);
+ }
+ }
+ }
+
+ /**
+ * Iterates all [key, value] pairs in memory.
+ * This applies to the local subset view, not the cross-cluster cache state.
+ */
+ [Symbol.iterator](): Iterator<[key: string, value: T]> {
+ return this.entries();
+ }
+}