summaryrefslogtreecommitdiff
path: root/packages/backend/src/misc
diff options
context:
space:
mode:
authorHazelnoot <acomputerdog@gmail.com>2025-06-06 12:26:43 -0400
committerHazelnoot <acomputerdog@gmail.com>2025-06-09 11:02:36 -0400
commit0c84d73294cb85a2126696abadb37003f3c08d7b (patch)
treeddd9986c9010d79f2e5aa9520bfe67f8f20c38c2 /packages/backend/src/misc
parentdisable caches in unit tests (diff)
downloadsharkey-0c84d73294cb85a2126696abadb37003f3c08d7b.tar.gz
sharkey-0c84d73294cb85a2126696abadb37003f3c08d7b.tar.bz2
sharkey-0c84d73294cb85a2126696abadb37003f3c08d7b.zip
move QuantumKVCache to a separate file
Diffstat (limited to 'packages/backend/src/misc')
-rw-r--r--packages/backend/src/misc/QuantumKVCache.ts318
-rw-r--r--packages/backend/src/misc/cache.ts311
2 files changed, 318 insertions, 311 deletions
diff --git a/packages/backend/src/misc/QuantumKVCache.ts b/packages/backend/src/misc/QuantumKVCache.ts
new file mode 100644
index 0000000000..6b36789f5e
--- /dev/null
+++ b/packages/backend/src/misc/QuantumKVCache.ts
@@ -0,0 +1,318 @@
+/*
+ * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+import { InternalEventService } from '@/core/InternalEventService.js';
+import { bindThis } from '@/decorators.js';
+import { InternalEventTypes } from '@/core/GlobalEventService.js';
+import { MemoryKVCache } from '@/misc/cache.js';
+
+export interface QuantumKVOpts<T> {
+ /**
+ * Memory cache lifetime in milliseconds.
+ */
+ lifetime: number;
+
+ /**
+ * Callback to fetch the value for a key that wasn't found in the cache.
+ * May be synchronous or async.
+ */
+ fetcher: (key: string, cache: QuantumKVCache<T>) => T | Promise<T>;
+
+ /**
+ * Optional callback when a value is created or changed in the cache, either locally or elsewhere in the cluster.
+ * This is called *after* the cache state is updated.
+ * May be synchronous or async.
+ */
+ onSet?: (key: string, cache: QuantumKVCache<T>) => void | Promise<void>;
+
+ /**
+ * Optional callback when a value is deleted from the cache, either locally or elsewhere in the cluster.
+ * This is called *after* the cache state is updated.
+ * May be synchronous or async.
+ */
+ onDelete?: (key: string, cache: QuantumKVCache<T>) => void | Promise<void>;
+}
+
+/**
+ * QuantumKVCache is a lifetime-bounded memory cache (like MemoryKVCache) with automatic cross-cluster synchronization via Redis.
+ * All nodes in the cluster are guaranteed to have a *subset* view of the current accurate state, though individual processes may have different items in their local cache.
+ * This ensures that a call to get() will never return stale data.
+ */
+export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> {
+ private readonly memoryCache: MemoryKVCache<T>;
+
+ public readonly fetcher: QuantumKVOpts<T>['fetcher'];
+ public readonly onSet: QuantumKVOpts<T>['onSet'];
+ public readonly onDelete: QuantumKVOpts<T>['onDelete'];
+
+ /**
+ * @param internalEventService Service bus to synchronize events.
+ * @param name Unique name of the cache - must be the same in all processes.
+ * @param opts Cache options
+ */
+ constructor(
+ private readonly internalEventService: InternalEventService,
+ private readonly name: string,
+ opts: QuantumKVOpts<T>,
+ ) {
+ this.memoryCache = new MemoryKVCache(opts.lifetime);
+ this.fetcher = opts.fetcher;
+ this.onSet = opts.onSet;
+ this.onDelete = opts.onDelete;
+
+ this.internalEventService.on('quantumCacheUpdated', this.onQuantumCacheUpdated, {
+ // Ignore our own events, otherwise we'll immediately erase any set value.
+ ignoreLocal: true,
+ });
+ }
+
+ /**
+ * The number of items currently in memory.
+ * This applies to the local subset view, not the cross-cluster cache state.
+ */
+ public get size() {
+ return this.memoryCache.size;
+ }
+
+ /**
+ * Iterates all [key, value] pairs in memory.
+ * This applies to the local subset view, not the cross-cluster cache state.
+ */
+ @bindThis
+ public *entries(): Generator<[key: string, value: T]> {
+ for (const entry of this.memoryCache.entries) {
+ yield [entry[0], entry[1].value];
+ }
+ }
+
+ /**
+ * Iterates all keys in memory.
+ * This applies to the local subset view, not the cross-cluster cache state.
+ */
+ @bindThis
+ public *keys() {
+ for (const entry of this.memoryCache.entries) {
+ yield entry[0];
+ }
+ }
+
+ /**
+ * Iterates all values pairs in memory.
+ * This applies to the local subset view, not the cross-cluster cache state.
+ */
+ @bindThis
+ public *values() {
+ for (const entry of this.memoryCache.entries) {
+ yield entry[1].value;
+ }
+ }
+
+ /**
+ * Creates or updates a value in the cache, and erases any stale caches across the cluster.
+ * Fires an onSet event after the cache has been updated in all processes.
+ * Skips if the value is unchanged.
+ */
+ @bindThis
+ public async set(key: string, value: T): Promise<void> {
+ if (this.memoryCache.get(key) === value) {
+ return;
+ }
+
+ this.memoryCache.set(key, value);
+
+ await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', keys: [key] });
+
+ if (this.onSet) {
+ await this.onSet(key, this);
+ }
+ }
+
+ /**
+ * Creates or updates multiple value in the cache, and erases any stale caches across the cluster.
+ * Fires an onSet for each changed item event after the cache has been updated in all processes.
+ * Skips if all values are unchanged.
+ */
+ @bindThis
+ public async setMany(items: Iterable<[key: string, value: T]>): Promise<void> {
+ const changedKeys: string[] = [];
+
+ for (const item of items) {
+ if (this.memoryCache.get(item[0]) !== item[1]) {
+ changedKeys.push(item[0]);
+ this.memoryCache.set(item[0], item[1]);
+ }
+ }
+
+ if (changedKeys.length > 0) {
+ await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', keys: changedKeys });
+
+ if (this.onSet) {
+ for (const key of changedKeys) {
+ await this.onSet(key, this);
+ }
+ }
+ }
+ }
+
+ /**
+ * Adds a value to the local memory cache without notifying other process.
+ * Neither a Redis event nor onSet callback will be fired, as the value has not actually changed.
+ * This should only be used when the value is known to be current, like after fetching from the database.
+ */
+ @bindThis
+ public add(key: string, value: T): void {
+ this.memoryCache.set(key, value);
+ }
+
+ /**
+ * Adds multiple values to the local memory cache without notifying other process.
+ * Neither a Redis event nor onSet callback will be fired, as the value has not actually changed.
+ * This should only be used when the value is known to be current, like after fetching from the database.
+ */
+ @bindThis
+ public addMany(items: Iterable<[key: string, value: T]>): void {
+ for (const [key, value] of items) {
+ this.memoryCache.set(key, value);
+ }
+ }
+
+ /**
+ * Gets a value from the local memory cache, or returns undefined if not found.
+ */
+ @bindThis
+ public get(key: string): T | undefined {
+ return this.memoryCache.get(key);
+ }
+
+ /**
+ * Gets or fetches a value from the cache.
+ * Fires an onSet event, but does not emit an update event to other processes.
+ */
+ @bindThis
+ public async fetch(key: string): Promise<T> {
+ let value = this.memoryCache.get(key);
+ if (value === undefined) {
+ value = await this.fetcher(key, this);
+ this.memoryCache.set(key, value);
+
+ if (this.onSet) {
+ await this.onSet(key, this);
+ }
+ }
+ return value;
+ }
+
+ /**
+ * Returns true is a key exists in memory.
+ * This applies to the local subset view, not the cross-cluster cache state.
+ */
+ @bindThis
+ public has(key: string): boolean {
+ return this.memoryCache.get(key) !== undefined;
+ }
+
+ /**
+ * Deletes a value from the cache, and erases any stale caches across the cluster.
+ * Fires an onDelete event after the cache has been updated in all processes.
+ */
+ @bindThis
+ public async delete(key: string): Promise<void> {
+ this.memoryCache.delete(key);
+
+ await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', keys: [key] });
+
+ if (this.onDelete) {
+ await this.onDelete(key, this);
+ }
+ }
+ /**
+ * Deletes multiple values from the cache, and erases any stale caches across the cluster.
+ * Fires an onDelete event for each key after the cache has been updated in all processes.
+ * Skips if the input is empty.
+ */
+ @bindThis
+ public async deleteMany(keys: string[]): Promise<void> {
+ if (keys.length === 0) {
+ return;
+ }
+
+ for (const key of keys) {
+ this.memoryCache.delete(key);
+ }
+
+ await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', keys });
+
+ if (this.onDelete) {
+ for (const key of keys) {
+ await this.onDelete(key, this);
+ }
+ }
+ }
+
+ /**
+ * Refreshes the value of a key from the fetcher, and erases any stale caches across the cluster.
+ * Fires an onSet event after the cache has been updated in all processes.
+ */
+ @bindThis
+ public async refresh(key: string): Promise<T> {
+ const value = await this.fetcher(key, this);
+ await this.set(key, value);
+ return value;
+ }
+
+ /**
+ * Erases all entries from the local memory cache.
+ * Does not send any events or update other processes.
+ */
+ @bindThis
+ public clear() {
+ this.memoryCache.clear();
+ }
+
+ /**
+ * Removes expired cache entries from the local view.
+ * Does not send any events or update other processes.
+ */
+ @bindThis
+ public gc() {
+ this.memoryCache.gc();
+ }
+
+ /**
+ * Erases all data and disconnects from the cluster.
+ * This *must* be called when shutting down to prevent memory leaks!
+ */
+ @bindThis
+ public dispose() {
+ this.internalEventService.off('quantumCacheUpdated', this.onQuantumCacheUpdated);
+
+ this.memoryCache.dispose();
+ }
+
+ @bindThis
+ private async onQuantumCacheUpdated(data: InternalEventTypes['quantumCacheUpdated']): Promise<void> {
+ if (data.name === this.name) {
+ for (const key of data.keys) {
+ this.memoryCache.delete(key);
+
+ if (data.op === 's' && this.onSet) {
+ await this.onSet(key, this);
+ }
+
+ if (data.op === 'd' && this.onDelete) {
+ await this.onDelete(key, this);
+ }
+ }
+ }
+ }
+
+ /**
+ * Iterates all [key, value] pairs in memory.
+ * This applies to the local subset view, not the cross-cluster cache state.
+ */
+ [Symbol.iterator](): Iterator<[key: string, value: T]> {
+ return this.entries();
+ }
+}
diff --git a/packages/backend/src/misc/cache.ts b/packages/backend/src/misc/cache.ts
index 0a1cf6adb4..932c0b409a 100644
--- a/packages/backend/src/misc/cache.ts
+++ b/packages/backend/src/misc/cache.ts
@@ -422,314 +422,3 @@ export class MemorySingleCache<T> {
return value;
}
}
-
-// TODO move to separate file
-
-export interface QuantumKVOpts<T> {
- /**
- * Memory cache lifetime in milliseconds.
- */
- lifetime: number;
-
- /**
- * Callback to fetch the value for a key that wasn't found in the cache.
- * May be synchronous or async.
- */
- fetcher: (key: string, cache: QuantumKVCache<T>) => T | Promise<T>;
-
- /**
- * Optional callback when a value is created or changed in the cache, either locally or elsewhere in the cluster.
- * This is called *after* the cache state is updated.
- * May be synchronous or async.
- */
- onSet?: (key: string, cache: QuantumKVCache<T>) => void | Promise<void>;
-
- /**
- * Optional callback when a value is deleted from the cache, either locally or elsewhere in the cluster.
- * This is called *after* the cache state is updated.
- * May be synchronous or async.
- */
- onDelete?: (key: string, cache: QuantumKVCache<T>) => void | Promise<void>;
-}
-
-/**
- * QuantumKVCache is a lifetime-bounded memory cache (like MemoryKVCache) with automatic cross-cluster synchronization via Redis.
- * All nodes in the cluster are guaranteed to have a *subset* view of the current accurate state, though individual processes may have different items in their local cache.
- * This ensures that a call to get() will never return stale data.
- */
-export class QuantumKVCache<T> implements Iterable<[key: string, value: T]> {
- private readonly memoryCache: MemoryKVCache<T>;
-
- public readonly fetcher: QuantumKVOpts<T>['fetcher'];
- public readonly onSet: QuantumKVOpts<T>['onSet'];
- public readonly onDelete: QuantumKVOpts<T>['onDelete'];
-
- /**
- * @param internalEventService Service bus to synchronize events.
- * @param name Unique name of the cache - must be the same in all processes.
- * @param opts Cache options
- */
- constructor(
- private readonly internalEventService: InternalEventService,
- private readonly name: string,
- opts: QuantumKVOpts<T>,
- ) {
- this.memoryCache = new MemoryKVCache(opts.lifetime);
- this.fetcher = opts.fetcher;
- this.onSet = opts.onSet;
- this.onDelete = opts.onDelete;
-
- this.internalEventService.on('quantumCacheUpdated', this.onQuantumCacheUpdated, {
- // Ignore our own events, otherwise we'll immediately erase any set value.
- ignoreLocal: true,
- });
- }
-
- /**
- * The number of items currently in memory.
- * This applies to the local subset view, not the cross-cluster cache state.
- */
- public get size() {
- return this.memoryCache.size;
- }
-
- /**
- * Iterates all [key, value] pairs in memory.
- * This applies to the local subset view, not the cross-cluster cache state.
- */
- @bindThis
- public *entries(): Generator<[key: string, value: T]> {
- for (const entry of this.memoryCache.entries) {
- yield [entry[0], entry[1].value];
- }
- }
-
- /**
- * Iterates all keys in memory.
- * This applies to the local subset view, not the cross-cluster cache state.
- */
- @bindThis
- public *keys() {
- for (const entry of this.memoryCache.entries) {
- yield entry[0];
- }
- }
-
- /**
- * Iterates all values pairs in memory.
- * This applies to the local subset view, not the cross-cluster cache state.
- */
- @bindThis
- public *values() {
- for (const entry of this.memoryCache.entries) {
- yield entry[1].value;
- }
- }
-
- /**
- * Creates or updates a value in the cache, and erases any stale caches across the cluster.
- * Fires an onSet event after the cache has been updated in all processes.
- * Skips if the value is unchanged.
- */
- @bindThis
- public async set(key: string, value: T): Promise<void> {
- if (this.memoryCache.get(key) === value) {
- return;
- }
-
- this.memoryCache.set(key, value);
-
- await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', keys: [key] });
-
- if (this.onSet) {
- await this.onSet(key, this);
- }
- }
-
- /**
- * Creates or updates multiple value in the cache, and erases any stale caches across the cluster.
- * Fires an onSet for each changed item event after the cache has been updated in all processes.
- * Skips if all values are unchanged.
- */
- @bindThis
- public async setMany(items: Iterable<[key: string, value: T]>): Promise<void> {
- const changedKeys: string[] = [];
-
- for (const item of items) {
- if (this.memoryCache.get(item[0]) !== item[1]) {
- changedKeys.push(item[0]);
- this.memoryCache.set(item[0], item[1]);
- }
- }
-
- if (changedKeys.length > 0) {
- await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', keys: changedKeys });
-
- if (this.onSet) {
- for (const key of changedKeys) {
- await this.onSet(key, this);
- }
- }
- }
- }
-
- /**
- * Adds a value to the local memory cache without notifying other process.
- * Neither a Redis event nor onSet callback will be fired, as the value has not actually changed.
- * This should only be used when the value is known to be current, like after fetching from the database.
- */
- @bindThis
- public add(key: string, value: T): void {
- this.memoryCache.set(key, value);
- }
-
- /**
- * Adds multiple values to the local memory cache without notifying other process.
- * Neither a Redis event nor onSet callback will be fired, as the value has not actually changed.
- * This should only be used when the value is known to be current, like after fetching from the database.
- */
- @bindThis
- public addMany(items: Iterable<[key: string, value: T]>): void {
- for (const [key, value] of items) {
- this.memoryCache.set(key, value);
- }
- }
-
- /**
- * Gets a value from the local memory cache, or returns undefined if not found.
- */
- @bindThis
- public get(key: string): T | undefined {
- return this.memoryCache.get(key);
- }
-
- /**
- * Gets or fetches a value from the cache.
- * Fires an onSet event, but does not emit an update event to other processes.
- */
- @bindThis
- public async fetch(key: string): Promise<T> {
- let value = this.memoryCache.get(key);
- if (value === undefined) {
- value = await this.fetcher(key, this);
- this.memoryCache.set(key, value);
-
- if (this.onSet) {
- await this.onSet(key, this);
- }
- }
- return value;
- }
-
- /**
- * Returns true is a key exists in memory.
- * This applies to the local subset view, not the cross-cluster cache state.
- */
- @bindThis
- public has(key: string): boolean {
- return this.memoryCache.get(key) !== undefined;
- }
-
- /**
- * Deletes a value from the cache, and erases any stale caches across the cluster.
- * Fires an onDelete event after the cache has been updated in all processes.
- */
- @bindThis
- public async delete(key: string): Promise<void> {
- this.memoryCache.delete(key);
-
- await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', keys: [key] });
-
- if (this.onDelete) {
- await this.onDelete(key, this);
- }
- }
- /**
- * Deletes multiple values from the cache, and erases any stale caches across the cluster.
- * Fires an onDelete event for each key after the cache has been updated in all processes.
- * Skips if the input is empty.
- */
- @bindThis
- public async deleteMany(keys: string[]): Promise<void> {
- if (keys.length === 0) {
- return;
- }
-
- for (const key of keys) {
- this.memoryCache.delete(key);
- }
-
- await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', keys });
-
- if (this.onDelete) {
- for (const key of keys) {
- await this.onDelete(key, this);
- }
- }
- }
-
- /**
- * Refreshes the value of a key from the fetcher, and erases any stale caches across the cluster.
- * Fires an onSet event after the cache has been updated in all processes.
- */
- @bindThis
- public async refresh(key: string): Promise<T> {
- const value = await this.fetcher(key, this);
- await this.set(key, value);
- return value;
- }
-
- /**
- * Erases all entries from the local memory cache.
- * Does not send any events or update other processes.
- */
- @bindThis
- public clear() {
- this.memoryCache.clear();
- }
-
- /**
- * Removes expired cache entries from the local view.
- * Does not send any events or update other processes.
- */
- @bindThis
- public gc() {
- this.memoryCache.gc();
- }
-
- /**
- * Erases all data and disconnects from the cluster.
- * This *must* be called when shutting down to prevent memory leaks!
- */
- @bindThis
- public dispose() {
- this.internalEventService.off('quantumCacheUpdated', this.onQuantumCacheUpdated);
-
- this.memoryCache.dispose();
- }
-
- @bindThis
- private async onQuantumCacheUpdated(data: InternalEventTypes['quantumCacheUpdated']): Promise<void> {
- if (data.name === this.name) {
- for (const key of data.keys) {
- this.memoryCache.delete(key);
-
- if (data.op === 's' && this.onSet) {
- await this.onSet(key, this);
- }
-
- if (data.op === 'd' && this.onDelete) {
- await this.onDelete(key, this);
- }
- }
- }
- }
-
- /**
- * Iterates all [key, value] pairs in memory.
- * This applies to the local subset view, not the cross-cluster cache state.
- */
- [Symbol.iterator](): Iterator<[key: string, value: T]> {
- return this.entries();
- }
-}