From 46a6612dc0e5eaa470170031012ae247f7a5eec5 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Thu, 5 Jun 2025 13:16:23 -0400 Subject: convert many RedisKVCaches to QuantumKVCache or MemoryKVCache --- packages/backend/src/core/CacheService.ts | 104 +++++++++++++++--------------- 1 file changed, 53 insertions(+), 51 deletions(-) (limited to 'packages/backend/src/core/CacheService.ts') diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts index 1cf63221f9..f04b18c02b 100644 --- a/packages/backend/src/core/CacheService.ts +++ b/packages/backend/src/core/CacheService.ts @@ -7,12 +7,13 @@ import { Inject, Injectable } from '@nestjs/common'; import * as Redis from 'ioredis'; import { IsNull } from 'typeorm'; import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiFollowing, MiNote } from '@/models/_.js'; -import { MemoryKVCache, RedisKVCache } from '@/misc/cache.js'; +import { MemoryKVCache, QuantumKVCache, RedisKVCache } from '@/misc/cache.js'; import type { MiLocalUser, MiUser } from '@/models/User.js'; import { DI } from '@/di-symbols.js'; import { UserEntityService } from '@/core/entities/UserEntityService.js'; import { bindThis } from '@/decorators.js'; -import type { GlobalEvents } from '@/core/GlobalEventService.js'; +import type { GlobalEvents, InternalEventTypes } from '@/core/GlobalEventService.js'; +import { InternalEventService } from '@/core/InternalEventService.js'; import type { OnApplicationShutdown } from '@nestjs/common'; export interface FollowStats { @@ -39,12 +40,12 @@ export class CacheService implements OnApplicationShutdown { public localUserByNativeTokenCache: MemoryKVCache; public localUserByIdCache: MemoryKVCache; public uriPersonCache: MemoryKVCache; - public userProfileCache: RedisKVCache; - public userMutingsCache: RedisKVCache>; - public userBlockingCache: RedisKVCache>; - public userBlockedCache: RedisKVCache>; // NOTE: 「被」Blockキャッシュ - public renoteMutingsCache: RedisKVCache>; - public userFollowingsCache: RedisKVCache | undefined>>; + public userProfileCache: QuantumKVCache; + public userMutingsCache: QuantumKVCache>; + public userBlockingCache: QuantumKVCache>; + public userBlockedCache: QuantumKVCache>; // NOTE: 「被」Blockキャッシュ + public renoteMutingsCache: QuantumKVCache>; + public userFollowingsCache: QuantumKVCache | undefined>>; private readonly userFollowStatsCache = new MemoryKVCache(1000 * 60 * 10); // 10 minutes private readonly translationsCache: RedisKVCache; @@ -74,6 +75,7 @@ export class CacheService implements OnApplicationShutdown { private followingsRepository: FollowingsRepository, private userEntityService: UserEntityService, + private readonly internalEventService: InternalEventService, ) { //this.onMessage = this.onMessage.bind(this); @@ -82,49 +84,33 @@ export class CacheService implements OnApplicationShutdown { this.localUserByIdCache = new MemoryKVCache(1000 * 60 * 5); // 5m this.uriPersonCache = new MemoryKVCache(1000 * 60 * 5); // 5m - this.userProfileCache = new RedisKVCache(this.redisClient, 'userProfile', { + this.userProfileCache = new QuantumKVCache(this.internalEventService, 'userProfile', { lifetime: 1000 * 60 * 30, // 30m - memoryCacheLifetime: 1000 * 60, // 1m fetcher: (key) => this.userProfilesRepository.findOneByOrFail({ userId: key }), - toRedisConverter: (value) => JSON.stringify(value), - fromRedisConverter: (value) => JSON.parse(value), // TODO: date型の考慮 }); - this.userMutingsCache = new RedisKVCache>(this.redisClient, 'userMutings', { + this.userMutingsCache = new QuantumKVCache>(this.internalEventService, 'userMutings', { lifetime: 1000 * 60 * 30, // 30m - memoryCacheLifetime: 1000 * 60, // 1m fetcher: (key) => this.mutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))), - toRedisConverter: (value) => JSON.stringify(Array.from(value)), - fromRedisConverter: (value) => new Set(JSON.parse(value)), }); - this.userBlockingCache = new RedisKVCache>(this.redisClient, 'userBlocking', { + this.userBlockingCache = new QuantumKVCache>(this.internalEventService, 'userBlocking', { lifetime: 1000 * 60 * 30, // 30m - memoryCacheLifetime: 1000 * 60, // 1m fetcher: (key) => this.blockingsRepository.find({ where: { blockerId: key }, select: ['blockeeId'] }).then(xs => new Set(xs.map(x => x.blockeeId))), - toRedisConverter: (value) => JSON.stringify(Array.from(value)), - fromRedisConverter: (value) => new Set(JSON.parse(value)), }); - this.userBlockedCache = new RedisKVCache>(this.redisClient, 'userBlocked', { + this.userBlockedCache = new QuantumKVCache>(this.internalEventService, 'userBlocked', { lifetime: 1000 * 60 * 30, // 30m - memoryCacheLifetime: 1000 * 60, // 1m fetcher: (key) => this.blockingsRepository.find({ where: { blockeeId: key }, select: ['blockerId'] }).then(xs => new Set(xs.map(x => x.blockerId))), - toRedisConverter: (value) => JSON.stringify(Array.from(value)), - fromRedisConverter: (value) => new Set(JSON.parse(value)), }); - this.renoteMutingsCache = new RedisKVCache>(this.redisClient, 'renoteMutings', { + this.renoteMutingsCache = new QuantumKVCache>(this.internalEventService, 'renoteMutings', { lifetime: 1000 * 60 * 30, // 30m - memoryCacheLifetime: 1000 * 60, // 1m fetcher: (key) => this.renoteMutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))), - toRedisConverter: (value) => JSON.stringify(Array.from(value)), - fromRedisConverter: (value) => new Set(JSON.parse(value)), }); - this.userFollowingsCache = new RedisKVCache | undefined>>(this.redisClient, 'userFollowings', { + this.userFollowingsCache = new QuantumKVCache | undefined>>(this.internalEventService, 'userFollowings', { lifetime: 1000 * 60 * 30, // 30m - memoryCacheLifetime: 1000 * 60, // 1m fetcher: (key) => this.followingsRepository.find({ where: { followerId: key }, select: ['followeeId', 'withReplies'] }).then(xs => { const obj: Record | undefined> = {}; for (const x of xs) { @@ -132,8 +118,6 @@ export class CacheService implements OnApplicationShutdown { } return obj; }), - toRedisConverter: (value) => JSON.stringify(value), - fromRedisConverter: (value) => JSON.parse(value), }); this.translationsCache = new RedisKVCache(this.redisClient, 'translations', { @@ -143,20 +127,21 @@ export class CacheService implements OnApplicationShutdown { // NOTE: チャンネルのフォロー状況キャッシュはChannelFollowingServiceで行っている - this.redisForSub.on('message', this.onMessage); + this.internalEventService.on('userChangeSuspendedState', this.onUserEvent); + this.internalEventService.on('userChangeDeletedState', this.onUserEvent); + this.internalEventService.on('remoteUserUpdated', this.onUserEvent); + this.internalEventService.on('localUserUpdated', this.onUserEvent); + this.internalEventService.on('userChangeSuspendedState', this.onUserEvent); + this.internalEventService.on('userTokenRegenerated', this.onTokenEvent); + this.internalEventService.on('follow', this.onFollowEvent); + this.internalEventService.on('unfollow', this.onFollowEvent); } @bindThis - private async onMessage(_: string, data: string): Promise { - const obj = JSON.parse(data); - - if (obj.channel === 'internal') { - const { type, body } = obj.message as GlobalEvents['internal']['payload']; - switch (type) { - case 'userChangeSuspendedState': - case 'userChangeDeletedState': - case 'remoteUserUpdated': - case 'localUserUpdated': { + private async onUserEvent(body: InternalEventTypes[E]): Promise { + { + { + { const user = await this.usersRepository.findOneBy({ id: body.id }); if (user == null) { this.userByIdCache.delete(body.id); @@ -178,20 +163,32 @@ export class CacheService implements OnApplicationShutdown { this.localUserByIdCache.set(user.id, user); } } - break; } - case 'userTokenRegenerated': { + } + } + } + + private async onTokenEvent(body: InternalEventTypes[E]): Promise { + { + { + { const user = await this.usersRepository.findOneByOrFail({ id: body.id }) as MiLocalUser; this.localUserByNativeTokenCache.delete(body.oldToken); this.localUserByNativeTokenCache.set(body.newToken, user); - break; } + } + } + } + + private async onFollowEvent(body: InternalEventTypes[E], type: E): Promise { + { + switch (type) { case 'follow': { const follower = this.userByIdCache.get(body.followerId); if (follower) follower.followingCount++; const followee = this.userByIdCache.get(body.followeeId); if (followee) followee.followersCount++; - this.userFollowingsCache.delete(body.followerId); + await this.userFollowingsCache.delete(body.followerId); this.userFollowStatsCache.delete(body.followerId); this.userFollowStatsCache.delete(body.followeeId); break; @@ -201,13 +198,11 @@ export class CacheService implements OnApplicationShutdown { if (follower) follower.followingCount--; const followee = this.userByIdCache.get(body.followeeId); if (followee) followee.followersCount--; - this.userFollowingsCache.delete(body.followerId); + await this.userFollowingsCache.delete(body.followerId); this.userFollowStatsCache.delete(body.followerId); this.userFollowStatsCache.delete(body.followeeId); break; } - default: - break; } } } @@ -300,7 +295,14 @@ export class CacheService implements OnApplicationShutdown { @bindThis public dispose(): void { - this.redisForSub.off('message', this.onMessage); + this.internalEventService.off('userChangeSuspendedState', this.onUserEvent); + this.internalEventService.off('userChangeDeletedState', this.onUserEvent); + this.internalEventService.off('remoteUserUpdated', this.onUserEvent); + this.internalEventService.off('localUserUpdated', this.onUserEvent); + this.internalEventService.off('userChangeSuspendedState', this.onUserEvent); + this.internalEventService.off('userTokenRegenerated', this.onTokenEvent); + this.internalEventService.off('follow', this.onFollowEvent); + this.internalEventService.off('unfollow', this.onFollowEvent); this.userByIdCache.dispose(); this.localUserByNativeTokenCache.dispose(); this.localUserByIdCache.dispose(); -- cgit v1.2.3-freya From bf1156426eea0bd11a5926ba7883c870bad2e144 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Fri, 6 Jun 2025 00:08:34 -0400 Subject: add CacheService.getUserFollowings and CacheService.getUserBlockers --- packages/backend/src/core/CacheService.ts | 110 +++++++++++++++++++++++++++++- 1 file changed, 109 insertions(+), 1 deletion(-) (limited to 'packages/backend/src/core/CacheService.ts') diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts index f04b18c02b..6e979130a0 100644 --- a/packages/backend/src/core/CacheService.ts +++ b/packages/backend/src/core/CacheService.ts @@ -5,7 +5,7 @@ import { Inject, Injectable } from '@nestjs/common'; import * as Redis from 'ioredis'; -import { IsNull } from 'typeorm'; +import { In, IsNull } from 'typeorm'; import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiFollowing, MiNote } from '@/models/_.js'; import { MemoryKVCache, QuantumKVCache, RedisKVCache } from '@/misc/cache.js'; import type { MiLocalUser, MiUser } from '@/models/User.js'; @@ -293,6 +293,114 @@ export class CacheService implements OnApplicationShutdown { }); } + @bindThis + public async getUserFollowings(userIds: Iterable): Promise>> { + const followings = new Map>(); + + const toFetch: string[] = []; + for (const userId of userIds) { + const fromCache = this.userFollowingsCache.get(userId); + if (fromCache) { + followings.set(userId, new Set(Object.keys(fromCache))); + } else { + toFetch.push(userId); + } + } + + if (toFetch.length > 0) { + const fetchedFollowings = await this.followingsRepository + .createQueryBuilder('following') + .select([ + 'following.followerId', + 'following.followeeId', + 'following.withReplies', + ]) + .where({ + followerId: In(toFetch), + }) + .getMany(); + + const toCache = new Map | undefined>>(); + + // Pivot to a map + for (const { followerId, followeeId, withReplies } of fetchedFollowings) { + // Queue for cache + let cacheSet = toCache.get(followerId); + if (!cacheSet) { + cacheSet = {}; + toCache.set(followerId, cacheSet); + } + cacheSet[followeeId] = { withReplies }; + + // Queue for return + let returnSet = followings.get(followerId); + if (!returnSet) { + returnSet = new Set(); + followings.set(followerId, returnSet); + } + returnSet.add(followeeId); + } + + // Update cache to speed up future calls + await this.userFollowingsCache.setMany(toCache.entries()); + } + + return followings; + } + + @bindThis + public async getUserBlockers(userIds: Iterable): Promise>> { + const blockers = new Map>(); + + const toFetch: string[] = []; + for (const userId of userIds) { + const fromCache = this.userBlockedCache.get(userId); + if (fromCache) { + blockers.set(userId, fromCache); + } else { + toFetch.push(userId); + } + } + + if (toFetch.length > 0) { + const fetchedBlockers = await this.blockingsRepository.createQueryBuilder('blocking') + .select([ + 'blocking.blockerId', + 'blocking.blockeeId', + ]) + .where({ + blockeeId: In(toFetch), + }) + .getMany(); + + const toCache = new Map>(); + + // Pivot to a map + for (const { blockerId, blockeeId } of fetchedBlockers) { + // Queue for cache + let cacheSet = toCache.get(blockeeId); + if (!cacheSet) { + cacheSet = new Set(); + toCache.set(blockeeId, cacheSet); + } + cacheSet.add(blockerId); + + // Queue for return + let returnSet = blockers.get(blockeeId); + if (!returnSet) { + returnSet = new Set(); + blockers.set(blockeeId, returnSet); + } + returnSet.add(blockerId); + } + + // Update cache to speed up future calls + await this.userBlockedCache.setMany(toCache.entries()); + } + + return blockers; + } + @bindThis public dispose(): void { this.internalEventService.off('userChangeSuspendedState', this.onUserEvent); -- cgit v1.2.3-freya From 9853a4f3bdf39e2e7b05765a5099e4d2554b1d2c Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Fri, 6 Jun 2025 02:14:20 -0400 Subject: use addMany instead of setMany when populating quantum caches from DB --- packages/backend/src/core/CacheService.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'packages/backend/src/core/CacheService.ts') diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts index 6e979130a0..00d97c7e1d 100644 --- a/packages/backend/src/core/CacheService.ts +++ b/packages/backend/src/core/CacheService.ts @@ -342,7 +342,7 @@ export class CacheService implements OnApplicationShutdown { } // Update cache to speed up future calls - await this.userFollowingsCache.setMany(toCache.entries()); + this.userFollowingsCache.addMany(toCache); } return followings; @@ -395,7 +395,7 @@ export class CacheService implements OnApplicationShutdown { } // Update cache to speed up future calls - await this.userBlockedCache.setMany(toCache.entries()); + this.userBlockedCache.addMany(toCache); } return blockers; -- cgit v1.2.3-freya From 3d13860ec8fea79062ba0c498302cb19143e01f8 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Fri, 6 Jun 2025 02:15:31 -0400 Subject: update quantum caches when a user is deleted --- packages/backend/src/core/CacheService.ts | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) (limited to 'packages/backend/src/core/CacheService.ts') diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts index 00d97c7e1d..1fba4f32d0 100644 --- a/packages/backend/src/core/CacheService.ts +++ b/packages/backend/src/core/CacheService.ts @@ -138,7 +138,7 @@ export class CacheService implements OnApplicationShutdown { } @bindThis - private async onUserEvent(body: InternalEventTypes[E]): Promise { + private async onUserEvent(body: InternalEventTypes[E], _: E, isLocal: boolean): Promise { { { { @@ -151,6 +151,16 @@ export class CacheService implements OnApplicationShutdown { this.uriPersonCache.delete(k); } } + if (isLocal) { + await Promise.all([ + this.userProfileCache.delete(body.id), + this.userMutingsCache.delete(body.id), + this.userBlockingCache.delete(body.id), + this.userBlockedCache.delete(body.id), + this.renoteMutingsCache.delete(body.id), + this.userFollowingsCache.delete(body.id), + ]); + } } else { this.userByIdCache.set(user.id, user); for (const [k, v] of this.uriPersonCache.entries) { -- cgit v1.2.3-freya From 68b84b28dd23cd1afea286946c1ada7efc25fed0 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Fri, 6 Jun 2025 02:15:59 -0400 Subject: implement CacheService.getUsers and CacheService.getUserProfiles --- packages/backend/src/core/CacheService.ts | 56 +++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) (limited to 'packages/backend/src/core/CacheService.ts') diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts index 1fba4f32d0..ae24a9721f 100644 --- a/packages/backend/src/core/CacheService.ts +++ b/packages/backend/src/core/CacheService.ts @@ -411,6 +411,62 @@ export class CacheService implements OnApplicationShutdown { return blockers; } + public async getUserProfiles(userIds: Iterable): Promise> { + const profiles = new Map; + + const toFetch: string[] = []; + for (const userId of userIds) { + const fromCache = this.userProfileCache.get(userId); + if (fromCache) { + profiles.set(userId, fromCache); + } else { + toFetch.push(userId); + } + } + + if (toFetch.length > 0) { + const fetched = await this.userProfilesRepository.findBy({ + userId: In(toFetch), + }); + + for (const profile of fetched) { + profiles.set(profile.userId, profile); + } + + const toCache = new Map(fetched.map(p => [p.userId, p])); + this.userProfileCache.addMany(toCache); + } + + return profiles; + } + + public async getUsers(userIds: Iterable): Promise> { + const users = new Map; + + const toFetch: string[] = []; + for (const userId of userIds) { + const fromCache = this.userByIdCache.get(userId); + if (fromCache) { + users.set(userId, fromCache); + } else { + toFetch.push(userId); + } + } + + if (toFetch.length > 0) { + const fetched = await this.usersRepository.findBy({ + id: In(toFetch), + }); + + for (const user of fetched) { + users.set(user.id, user); + this.userByIdCache.set(user.id, user); + } + } + + return users; + } + @bindThis public dispose(): void { this.internalEventService.off('userChangeSuspendedState', this.onUserEvent); -- cgit v1.2.3-freya From 2e486f02ffb83192fd46fd231fb3604e1b982416 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Fri, 6 Jun 2025 12:17:04 -0400 Subject: implement no-op caches for testing --- packages/backend/src/core/CacheService.ts | 22 +++- packages/backend/src/misc/cache.ts | 41 +++++-- packages/backend/test/misc/noOpCaches.ts | 196 ++++++++++++++++++++++++++++++ 3 files changed, 244 insertions(+), 15 deletions(-) create mode 100644 packages/backend/test/misc/noOpCaches.ts (limited to 'packages/backend/src/core/CacheService.ts') diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts index ae24a9721f..2c136eac2b 100644 --- a/packages/backend/src/core/CacheService.ts +++ b/packages/backend/src/core/CacheService.ts @@ -28,7 +28,7 @@ export interface CachedTranslation { text: string | undefined; } -interface CachedTranslationEntity { +export interface CachedTranslationEntity { l?: string; t?: string; u?: number; @@ -46,8 +46,8 @@ export class CacheService implements OnApplicationShutdown { public userBlockedCache: QuantumKVCache>; // NOTE: 「被」Blockキャッシュ public renoteMutingsCache: QuantumKVCache>; public userFollowingsCache: QuantumKVCache | undefined>>; - private readonly userFollowStatsCache = new MemoryKVCache(1000 * 60 * 10); // 10 minutes - private readonly translationsCache: RedisKVCache; + protected userFollowStatsCache = new MemoryKVCache(1000 * 60 * 10); // 10 minutes + protected translationsCache: RedisKVCache; constructor( @Inject(DI.redis) @@ -467,6 +467,22 @@ export class CacheService implements OnApplicationShutdown { return users; } + @bindThis + public clear(): void { + this.userByIdCache.clear(); + this.localUserByNativeTokenCache.clear(); + this.localUserByIdCache.clear(); + this.uriPersonCache.clear(); + this.userProfileCache.clear(); + this.userMutingsCache.clear(); + this.userBlockingCache.clear(); + this.userBlockedCache.clear(); + this.renoteMutingsCache.clear(); + this.userFollowingsCache.clear(); + this.userFollowStatsCache.clear(); + this.translationsCache.clear(); + } + @bindThis public dispose(): void { this.internalEventService.off('userChangeSuspendedState', this.onUserEvent); diff --git a/packages/backend/src/misc/cache.ts b/packages/backend/src/misc/cache.ts index 3145550a44..0a1cf6adb4 100644 --- a/packages/backend/src/misc/cache.ts +++ b/packages/backend/src/misc/cache.ts @@ -11,9 +11,9 @@ import { InternalEventTypes } from '@/core/GlobalEventService.js'; export class RedisKVCache { private readonly lifetime: number; private readonly memoryCache: MemoryKVCache; - private readonly fetcher: (key: string) => Promise; - private readonly toRedisConverter: (value: T) => string; - private readonly fromRedisConverter: (value: string) => T | undefined; + public readonly fetcher: (key: string) => Promise; + public readonly toRedisConverter: (value: T) => string; + public readonly fromRedisConverter: (value: string) => T | undefined; constructor( private redisClient: Redis.Redis, @@ -101,6 +101,11 @@ export class RedisKVCache { // TODO: イベント発行して他プロセスのメモリキャッシュも更新できるようにする } + @bindThis + public clear() { + this.memoryCache.clear(); + } + @bindThis public gc() { this.memoryCache.gc(); @@ -125,16 +130,17 @@ export class RedisSingleCache { opts: { lifetime: number; memoryCacheLifetime: number; - fetcher: RedisSingleCache['fetcher']; - toRedisConverter: RedisSingleCache['toRedisConverter']; - fromRedisConverter: RedisSingleCache['fromRedisConverter']; + fetcher?: RedisSingleCache['fetcher']; + toRedisConverter?: RedisSingleCache['toRedisConverter']; + fromRedisConverter?: RedisSingleCache['fromRedisConverter']; }, ) { this.lifetime = opts.lifetime; this.memoryCache = new MemorySingleCache(opts.memoryCacheLifetime); - this.fetcher = opts.fetcher; - this.toRedisConverter = opts.toRedisConverter; - this.fromRedisConverter = opts.fromRedisConverter; + + this.fetcher = opts.fetcher ?? (() => { throw new Error('fetch not supported - use get/set directly'); }); + this.toRedisConverter = opts.toRedisConverter ?? ((value) => JSON.stringify(value)); + this.fromRedisConverter = opts.fromRedisConverter ?? ((value) => JSON.parse(value)); } @bindThis @@ -417,6 +423,8 @@ export class MemorySingleCache { } } +// TODO move to separate file + export interface QuantumKVOpts { /** * Memory cache lifetime in milliseconds. @@ -452,9 +460,9 @@ export interface QuantumKVOpts { export class QuantumKVCache implements Iterable<[key: string, value: T]> { private readonly memoryCache: MemoryKVCache; - private readonly fetcher: QuantumKVOpts['fetcher']; - private readonly onSet: QuantumKVOpts['onSet']; - private readonly onDelete: QuantumKVOpts['onDelete']; + public readonly fetcher: QuantumKVOpts['fetcher']; + public readonly onSet: QuantumKVOpts['onSet']; + public readonly onDelete: QuantumKVOpts['onDelete']; /** * @param internalEventService Service bus to synchronize events. @@ -676,6 +684,15 @@ export class QuantumKVCache implements Iterable<[key: string, value: T]> { * Does not send any events or update other processes. */ @bindThis + public clear() { + this.memoryCache.clear(); + } + + /** + * Removes expired cache entries from the local view. + * Does not send any events or update other processes. + */ + @bindThis public gc() { this.memoryCache.gc(); } diff --git a/packages/backend/test/misc/noOpCaches.ts b/packages/backend/test/misc/noOpCaches.ts new file mode 100644 index 0000000000..373c7bddcc --- /dev/null +++ b/packages/backend/test/misc/noOpCaches.ts @@ -0,0 +1,196 @@ +/* + * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import * as Redis from 'ioredis'; +import { Inject } from '@nestjs/common'; +import { FakeInternalEventService } from './FakeInternalEventService.js'; +import type { BlockingsRepository, FollowingsRepository, MiFollowing, MiUser, MiUserProfile, MutingsRepository, RenoteMutingsRepository, UserProfilesRepository, UsersRepository } from '@/models/_.js'; +import type { MiLocalUser } from '@/models/User.js'; +import { MemoryKVCache, MemorySingleCache, QuantumKVCache, QuantumKVOpts, RedisKVCache, RedisSingleCache } from '@/misc/cache.js'; +import { CacheService, CachedTranslationEntity, FollowStats } from '@/core/CacheService.js'; +import { DI } from '@/di-symbols.js'; +import { UserEntityService } from '@/core/entities/UserEntityService.js'; + +export function noOpRedis() { + return { + set: () => Promise.resolve(), + get: () => Promise.resolve(null), + del: () => Promise.resolve(), + on: () => {}, + off: () => {}, + } as unknown as Redis.Redis; +} + +export class NoOpCacheService extends CacheService { + public readonly fakeRedis: { + [K in keyof Redis.Redis]: Redis.Redis[K]; + }; + public readonly fakeInternalEventService: FakeInternalEventService; + + constructor( + @Inject(DI.usersRepository) + usersRepository: UsersRepository, + + @Inject(DI.userProfilesRepository) + userProfilesRepository: UserProfilesRepository, + + @Inject(DI.mutingsRepository) + mutingsRepository: MutingsRepository, + + @Inject(DI.blockingsRepository) + blockingsRepository: BlockingsRepository, + + @Inject(DI.renoteMutingsRepository) + renoteMutingsRepository: RenoteMutingsRepository, + + @Inject(DI.followingsRepository) + followingsRepository: FollowingsRepository, + + @Inject(UserEntityService) + userEntityService: UserEntityService, + ) { + const fakeRedis = noOpRedis(); + const fakeInternalEventService = new FakeInternalEventService(); + + super( + fakeRedis, + fakeRedis, + usersRepository, + userProfilesRepository, + mutingsRepository, + blockingsRepository, + renoteMutingsRepository, + followingsRepository, + userEntityService, + fakeInternalEventService, + ); + + this.fakeRedis = fakeRedis; + this.fakeInternalEventService = fakeInternalEventService; + + // Override caches + this.userByIdCache = new NoOpMemoryKVCache(); + this.localUserByNativeTokenCache = new NoOpMemoryKVCache(); + this.localUserByIdCache = new NoOpMemoryKVCache(); + this.uriPersonCache = new NoOpMemoryKVCache(); + this.userProfileCache = new NoOpQuantumKVCache({ + internalEventService: fakeInternalEventService, + fetcher: this.userProfileCache.fetcher, + onSet: this.userProfileCache.onSet, + onDelete: this.userProfileCache.onDelete, + }); + this.userMutingsCache = new NoOpQuantumKVCache>({ + internalEventService: fakeInternalEventService, + fetcher: this.userMutingsCache.fetcher, + onSet: this.userMutingsCache.onSet, + onDelete: this.userMutingsCache.onDelete, + }); + this.userBlockingCache = new NoOpQuantumKVCache>({ + internalEventService: fakeInternalEventService, + fetcher: this.userBlockingCache.fetcher, + onSet: this.userBlockingCache.onSet, + onDelete: this.userBlockingCache.onDelete, + }); + this.userBlockedCache = new NoOpQuantumKVCache>({ + internalEventService: fakeInternalEventService, + fetcher: this.userBlockedCache.fetcher, + onSet: this.userBlockedCache.onSet, + onDelete: this.userBlockedCache.onDelete, + }); + this.renoteMutingsCache = new NoOpQuantumKVCache>({ + internalEventService: fakeInternalEventService, + fetcher: this.renoteMutingsCache.fetcher, + onSet: this.renoteMutingsCache.onSet, + onDelete: this.renoteMutingsCache.onDelete, + }); + this.userFollowingsCache = new NoOpQuantumKVCache | undefined>>({ + internalEventService: fakeInternalEventService, + fetcher: this.userFollowingsCache.fetcher, + onSet: this.userFollowingsCache.onSet, + onDelete: this.userFollowingsCache.onDelete, + }); + this.userFollowStatsCache = new NoOpMemoryKVCache(); + this.translationsCache = new NoOpRedisKVCache({ + redis: fakeRedis, + fetcher: this.translationsCache.fetcher, + toRedisConverter: this.translationsCache.toRedisConverter, + fromRedisConverter: this.translationsCache.fromRedisConverter, + }); + } +} + +export class NoOpMemoryKVCache extends MemoryKVCache { + constructor() { + super(-1); + } +} + +export class NoOpMemorySingleCache extends MemorySingleCache { + constructor() { + super(-1); + } +} + +export class NoOpRedisKVCache extends RedisKVCache { + constructor(opts?: { + redis?: Redis.Redis; + fetcher?: RedisKVCache['fetcher']; + toRedisConverter?: RedisKVCache['toRedisConverter']; + fromRedisConverter?: RedisKVCache['fromRedisConverter']; + }) { + super( + opts?.redis ?? noOpRedis(), + 'no-op', + { + lifetime: -1, + memoryCacheLifetime: -1, + fetcher: opts?.fetcher, + toRedisConverter: opts?.toRedisConverter, + fromRedisConverter: opts?.fromRedisConverter, + }, + ); + } +} + +export class NoOpRedisSingleCache extends RedisSingleCache { + constructor(opts?: { + fakeRedis?: Redis.Redis; + fetcher?: RedisSingleCache['fetcher']; + toRedisConverter?: RedisSingleCache['toRedisConverter']; + fromRedisConverter?: RedisSingleCache['fromRedisConverter']; + }) { + super( + opts?.fakeRedis ?? noOpRedis(), + 'no-op', + { + lifetime: -1, + memoryCacheLifetime: -1, + fetcher: opts?.fetcher, + toRedisConverter: opts?.toRedisConverter, + fromRedisConverter: opts?.fromRedisConverter, + }, + ); + } +} + +export class NoOpQuantumKVCache extends QuantumKVCache { + constructor(opts: { + internalEventService?: FakeInternalEventService, + fetcher: QuantumKVOpts['fetcher'], + onSet?: QuantumKVOpts['onSet'], + onDelete?: QuantumKVOpts['onDelete'], + }) { + super( + opts.internalEventService ?? new FakeInternalEventService(), + 'no-op', + { + lifetime: -1, + fetcher: opts.fetcher, + onSet: opts.onSet, + onDelete: opts.onDelete, + }, + ); + } +} -- cgit v1.2.3-freya From 0c84d73294cb85a2126696abadb37003f3c08d7b Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Fri, 6 Jun 2025 12:26:43 -0400 Subject: move QuantumKVCache to a separate file --- packages/backend/src/core/CacheService.ts | 3 +- .../backend/src/core/ChannelFollowingService.ts | 2 +- .../backend/src/core/PushNotificationService.ts | 2 +- packages/backend/src/core/UserListService.ts | 2 +- packages/backend/src/misc/QuantumKVCache.ts | 318 +++++++++++ packages/backend/src/misc/cache.ts | 311 ----------- packages/backend/test/misc/noOpCaches.ts | 3 +- packages/backend/test/unit/misc/QuantumKVCache.ts | 596 ++++++++++++++++++++ packages/backend/test/unit/misc/cache.ts | 597 --------------------- 9 files changed, 921 insertions(+), 913 deletions(-) create mode 100644 packages/backend/src/misc/QuantumKVCache.ts create mode 100644 packages/backend/test/unit/misc/QuantumKVCache.ts delete mode 100644 packages/backend/test/unit/misc/cache.ts (limited to 'packages/backend/src/core/CacheService.ts') diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts index 2c136eac2b..e59857b4ce 100644 --- a/packages/backend/src/core/CacheService.ts +++ b/packages/backend/src/core/CacheService.ts @@ -7,7 +7,8 @@ import { Inject, Injectable } from '@nestjs/common'; import * as Redis from 'ioredis'; import { In, IsNull } from 'typeorm'; import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiFollowing, MiNote } from '@/models/_.js'; -import { MemoryKVCache, QuantumKVCache, RedisKVCache } from '@/misc/cache.js'; +import { MemoryKVCache, RedisKVCache } from '@/misc/cache.js'; +import { QuantumKVCache } from '@/misc/QuantumKVCache.js'; import type { MiLocalUser, MiUser } from '@/models/User.js'; import { DI } from '@/di-symbols.js'; import { UserEntityService } from '@/core/entities/UserEntityService.js'; diff --git a/packages/backend/src/core/ChannelFollowingService.ts b/packages/backend/src/core/ChannelFollowingService.ts index 26b023179c..430711fef1 100644 --- a/packages/backend/src/core/ChannelFollowingService.ts +++ b/packages/backend/src/core/ChannelFollowingService.ts @@ -12,7 +12,7 @@ import { IdService } from '@/core/IdService.js'; import { GlobalEvents, GlobalEventService, InternalEventTypes } from '@/core/GlobalEventService.js'; import { bindThis } from '@/decorators.js'; import type { MiLocalUser } from '@/models/User.js'; -import { QuantumKVCache, RedisKVCache } from '@/misc/cache.js'; +import { QuantumKVCache } from '@/misc/QuantumKVCache.js'; import { InternalEventService } from './InternalEventService.js'; @Injectable() diff --git a/packages/backend/src/core/PushNotificationService.ts b/packages/backend/src/core/PushNotificationService.ts index 38bc5e3901..e3f10d4504 100644 --- a/packages/backend/src/core/PushNotificationService.ts +++ b/packages/backend/src/core/PushNotificationService.ts @@ -12,7 +12,7 @@ import type { Packed } from '@/misc/json-schema.js'; import { getNoteSummary } from '@/misc/get-note-summary.js'; import type { MiMeta, MiSwSubscription, SwSubscriptionsRepository } from '@/models/_.js'; import { bindThis } from '@/decorators.js'; -import { QuantumKVCache, RedisKVCache } from '@/misc/cache.js'; +import { QuantumKVCache } from '@/misc/QuantumKVCache.js'; import { InternalEventService } from '@/core/InternalEventService.js'; // Defined also packages/sw/types.ts#L13 diff --git a/packages/backend/src/core/UserListService.ts b/packages/backend/src/core/UserListService.ts index 0d2220049a..b4486b9808 100644 --- a/packages/backend/src/core/UserListService.ts +++ b/packages/backend/src/core/UserListService.ts @@ -17,7 +17,7 @@ import { DI } from '@/di-symbols.js'; import { UserEntityService } from '@/core/entities/UserEntityService.js'; import { bindThis } from '@/decorators.js'; import { QueueService } from '@/core/QueueService.js'; -import { QuantumKVCache, RedisKVCache } from '@/misc/cache.js'; +import { QuantumKVCache } from '@/misc/QuantumKVCache.js'; import { RoleService } from '@/core/RoleService.js'; import { SystemAccountService } from '@/core/SystemAccountService.js'; import { InternalEventService } from '@/core/InternalEventService.js'; diff --git a/packages/backend/src/misc/QuantumKVCache.ts b/packages/backend/src/misc/QuantumKVCache.ts new file mode 100644 index 0000000000..6b36789f5e --- /dev/null +++ b/packages/backend/src/misc/QuantumKVCache.ts @@ -0,0 +1,318 @@ +/* + * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { InternalEventService } from '@/core/InternalEventService.js'; +import { bindThis } from '@/decorators.js'; +import { InternalEventTypes } from '@/core/GlobalEventService.js'; +import { MemoryKVCache } from '@/misc/cache.js'; + +export interface QuantumKVOpts { + /** + * Memory cache lifetime in milliseconds. + */ + lifetime: number; + + /** + * Callback to fetch the value for a key that wasn't found in the cache. + * May be synchronous or async. + */ + fetcher: (key: string, cache: QuantumKVCache) => T | Promise; + + /** + * Optional callback when a value is created or changed in the cache, either locally or elsewhere in the cluster. + * This is called *after* the cache state is updated. + * May be synchronous or async. + */ + onSet?: (key: string, cache: QuantumKVCache) => void | Promise; + + /** + * Optional callback when a value is deleted from the cache, either locally or elsewhere in the cluster. + * This is called *after* the cache state is updated. + * May be synchronous or async. + */ + onDelete?: (key: string, cache: QuantumKVCache) => void | Promise; +} + +/** + * QuantumKVCache is a lifetime-bounded memory cache (like MemoryKVCache) with automatic cross-cluster synchronization via Redis. + * All nodes in the cluster are guaranteed to have a *subset* view of the current accurate state, though individual processes may have different items in their local cache. + * This ensures that a call to get() will never return stale data. + */ +export class QuantumKVCache implements Iterable<[key: string, value: T]> { + private readonly memoryCache: MemoryKVCache; + + public readonly fetcher: QuantumKVOpts['fetcher']; + public readonly onSet: QuantumKVOpts['onSet']; + public readonly onDelete: QuantumKVOpts['onDelete']; + + /** + * @param internalEventService Service bus to synchronize events. + * @param name Unique name of the cache - must be the same in all processes. + * @param opts Cache options + */ + constructor( + private readonly internalEventService: InternalEventService, + private readonly name: string, + opts: QuantumKVOpts, + ) { + this.memoryCache = new MemoryKVCache(opts.lifetime); + this.fetcher = opts.fetcher; + this.onSet = opts.onSet; + this.onDelete = opts.onDelete; + + this.internalEventService.on('quantumCacheUpdated', this.onQuantumCacheUpdated, { + // Ignore our own events, otherwise we'll immediately erase any set value. + ignoreLocal: true, + }); + } + + /** + * The number of items currently in memory. + * This applies to the local subset view, not the cross-cluster cache state. + */ + public get size() { + return this.memoryCache.size; + } + + /** + * Iterates all [key, value] pairs in memory. + * This applies to the local subset view, not the cross-cluster cache state. + */ + @bindThis + public *entries(): Generator<[key: string, value: T]> { + for (const entry of this.memoryCache.entries) { + yield [entry[0], entry[1].value]; + } + } + + /** + * Iterates all keys in memory. + * This applies to the local subset view, not the cross-cluster cache state. + */ + @bindThis + public *keys() { + for (const entry of this.memoryCache.entries) { + yield entry[0]; + } + } + + /** + * Iterates all values pairs in memory. + * This applies to the local subset view, not the cross-cluster cache state. + */ + @bindThis + public *values() { + for (const entry of this.memoryCache.entries) { + yield entry[1].value; + } + } + + /** + * Creates or updates a value in the cache, and erases any stale caches across the cluster. + * Fires an onSet event after the cache has been updated in all processes. + * Skips if the value is unchanged. + */ + @bindThis + public async set(key: string, value: T): Promise { + if (this.memoryCache.get(key) === value) { + return; + } + + this.memoryCache.set(key, value); + + await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', keys: [key] }); + + if (this.onSet) { + await this.onSet(key, this); + } + } + + /** + * Creates or updates multiple value in the cache, and erases any stale caches across the cluster. + * Fires an onSet for each changed item event after the cache has been updated in all processes. + * Skips if all values are unchanged. + */ + @bindThis + public async setMany(items: Iterable<[key: string, value: T]>): Promise { + const changedKeys: string[] = []; + + for (const item of items) { + if (this.memoryCache.get(item[0]) !== item[1]) { + changedKeys.push(item[0]); + this.memoryCache.set(item[0], item[1]); + } + } + + if (changedKeys.length > 0) { + await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', keys: changedKeys }); + + if (this.onSet) { + for (const key of changedKeys) { + await this.onSet(key, this); + } + } + } + } + + /** + * Adds a value to the local memory cache without notifying other process. + * Neither a Redis event nor onSet callback will be fired, as the value has not actually changed. + * This should only be used when the value is known to be current, like after fetching from the database. + */ + @bindThis + public add(key: string, value: T): void { + this.memoryCache.set(key, value); + } + + /** + * Adds multiple values to the local memory cache without notifying other process. + * Neither a Redis event nor onSet callback will be fired, as the value has not actually changed. + * This should only be used when the value is known to be current, like after fetching from the database. + */ + @bindThis + public addMany(items: Iterable<[key: string, value: T]>): void { + for (const [key, value] of items) { + this.memoryCache.set(key, value); + } + } + + /** + * Gets a value from the local memory cache, or returns undefined if not found. + */ + @bindThis + public get(key: string): T | undefined { + return this.memoryCache.get(key); + } + + /** + * Gets or fetches a value from the cache. + * Fires an onSet event, but does not emit an update event to other processes. + */ + @bindThis + public async fetch(key: string): Promise { + let value = this.memoryCache.get(key); + if (value === undefined) { + value = await this.fetcher(key, this); + this.memoryCache.set(key, value); + + if (this.onSet) { + await this.onSet(key, this); + } + } + return value; + } + + /** + * Returns true is a key exists in memory. + * This applies to the local subset view, not the cross-cluster cache state. + */ + @bindThis + public has(key: string): boolean { + return this.memoryCache.get(key) !== undefined; + } + + /** + * Deletes a value from the cache, and erases any stale caches across the cluster. + * Fires an onDelete event after the cache has been updated in all processes. + */ + @bindThis + public async delete(key: string): Promise { + this.memoryCache.delete(key); + + await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', keys: [key] }); + + if (this.onDelete) { + await this.onDelete(key, this); + } + } + /** + * Deletes multiple values from the cache, and erases any stale caches across the cluster. + * Fires an onDelete event for each key after the cache has been updated in all processes. + * Skips if the input is empty. + */ + @bindThis + public async deleteMany(keys: string[]): Promise { + if (keys.length === 0) { + return; + } + + for (const key of keys) { + this.memoryCache.delete(key); + } + + await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', keys }); + + if (this.onDelete) { + for (const key of keys) { + await this.onDelete(key, this); + } + } + } + + /** + * Refreshes the value of a key from the fetcher, and erases any stale caches across the cluster. + * Fires an onSet event after the cache has been updated in all processes. + */ + @bindThis + public async refresh(key: string): Promise { + const value = await this.fetcher(key, this); + await this.set(key, value); + return value; + } + + /** + * Erases all entries from the local memory cache. + * Does not send any events or update other processes. + */ + @bindThis + public clear() { + this.memoryCache.clear(); + } + + /** + * Removes expired cache entries from the local view. + * Does not send any events or update other processes. + */ + @bindThis + public gc() { + this.memoryCache.gc(); + } + + /** + * Erases all data and disconnects from the cluster. + * This *must* be called when shutting down to prevent memory leaks! + */ + @bindThis + public dispose() { + this.internalEventService.off('quantumCacheUpdated', this.onQuantumCacheUpdated); + + this.memoryCache.dispose(); + } + + @bindThis + private async onQuantumCacheUpdated(data: InternalEventTypes['quantumCacheUpdated']): Promise { + if (data.name === this.name) { + for (const key of data.keys) { + this.memoryCache.delete(key); + + if (data.op === 's' && this.onSet) { + await this.onSet(key, this); + } + + if (data.op === 'd' && this.onDelete) { + await this.onDelete(key, this); + } + } + } + } + + /** + * Iterates all [key, value] pairs in memory. + * This applies to the local subset view, not the cross-cluster cache state. + */ + [Symbol.iterator](): Iterator<[key: string, value: T]> { + return this.entries(); + } +} diff --git a/packages/backend/src/misc/cache.ts b/packages/backend/src/misc/cache.ts index 0a1cf6adb4..932c0b409a 100644 --- a/packages/backend/src/misc/cache.ts +++ b/packages/backend/src/misc/cache.ts @@ -422,314 +422,3 @@ export class MemorySingleCache { return value; } } - -// TODO move to separate file - -export interface QuantumKVOpts { - /** - * Memory cache lifetime in milliseconds. - */ - lifetime: number; - - /** - * Callback to fetch the value for a key that wasn't found in the cache. - * May be synchronous or async. - */ - fetcher: (key: string, cache: QuantumKVCache) => T | Promise; - - /** - * Optional callback when a value is created or changed in the cache, either locally or elsewhere in the cluster. - * This is called *after* the cache state is updated. - * May be synchronous or async. - */ - onSet?: (key: string, cache: QuantumKVCache) => void | Promise; - - /** - * Optional callback when a value is deleted from the cache, either locally or elsewhere in the cluster. - * This is called *after* the cache state is updated. - * May be synchronous or async. - */ - onDelete?: (key: string, cache: QuantumKVCache) => void | Promise; -} - -/** - * QuantumKVCache is a lifetime-bounded memory cache (like MemoryKVCache) with automatic cross-cluster synchronization via Redis. - * All nodes in the cluster are guaranteed to have a *subset* view of the current accurate state, though individual processes may have different items in their local cache. - * This ensures that a call to get() will never return stale data. - */ -export class QuantumKVCache implements Iterable<[key: string, value: T]> { - private readonly memoryCache: MemoryKVCache; - - public readonly fetcher: QuantumKVOpts['fetcher']; - public readonly onSet: QuantumKVOpts['onSet']; - public readonly onDelete: QuantumKVOpts['onDelete']; - - /** - * @param internalEventService Service bus to synchronize events. - * @param name Unique name of the cache - must be the same in all processes. - * @param opts Cache options - */ - constructor( - private readonly internalEventService: InternalEventService, - private readonly name: string, - opts: QuantumKVOpts, - ) { - this.memoryCache = new MemoryKVCache(opts.lifetime); - this.fetcher = opts.fetcher; - this.onSet = opts.onSet; - this.onDelete = opts.onDelete; - - this.internalEventService.on('quantumCacheUpdated', this.onQuantumCacheUpdated, { - // Ignore our own events, otherwise we'll immediately erase any set value. - ignoreLocal: true, - }); - } - - /** - * The number of items currently in memory. - * This applies to the local subset view, not the cross-cluster cache state. - */ - public get size() { - return this.memoryCache.size; - } - - /** - * Iterates all [key, value] pairs in memory. - * This applies to the local subset view, not the cross-cluster cache state. - */ - @bindThis - public *entries(): Generator<[key: string, value: T]> { - for (const entry of this.memoryCache.entries) { - yield [entry[0], entry[1].value]; - } - } - - /** - * Iterates all keys in memory. - * This applies to the local subset view, not the cross-cluster cache state. - */ - @bindThis - public *keys() { - for (const entry of this.memoryCache.entries) { - yield entry[0]; - } - } - - /** - * Iterates all values pairs in memory. - * This applies to the local subset view, not the cross-cluster cache state. - */ - @bindThis - public *values() { - for (const entry of this.memoryCache.entries) { - yield entry[1].value; - } - } - - /** - * Creates or updates a value in the cache, and erases any stale caches across the cluster. - * Fires an onSet event after the cache has been updated in all processes. - * Skips if the value is unchanged. - */ - @bindThis - public async set(key: string, value: T): Promise { - if (this.memoryCache.get(key) === value) { - return; - } - - this.memoryCache.set(key, value); - - await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', keys: [key] }); - - if (this.onSet) { - await this.onSet(key, this); - } - } - - /** - * Creates or updates multiple value in the cache, and erases any stale caches across the cluster. - * Fires an onSet for each changed item event after the cache has been updated in all processes. - * Skips if all values are unchanged. - */ - @bindThis - public async setMany(items: Iterable<[key: string, value: T]>): Promise { - const changedKeys: string[] = []; - - for (const item of items) { - if (this.memoryCache.get(item[0]) !== item[1]) { - changedKeys.push(item[0]); - this.memoryCache.set(item[0], item[1]); - } - } - - if (changedKeys.length > 0) { - await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', keys: changedKeys }); - - if (this.onSet) { - for (const key of changedKeys) { - await this.onSet(key, this); - } - } - } - } - - /** - * Adds a value to the local memory cache without notifying other process. - * Neither a Redis event nor onSet callback will be fired, as the value has not actually changed. - * This should only be used when the value is known to be current, like after fetching from the database. - */ - @bindThis - public add(key: string, value: T): void { - this.memoryCache.set(key, value); - } - - /** - * Adds multiple values to the local memory cache without notifying other process. - * Neither a Redis event nor onSet callback will be fired, as the value has not actually changed. - * This should only be used when the value is known to be current, like after fetching from the database. - */ - @bindThis - public addMany(items: Iterable<[key: string, value: T]>): void { - for (const [key, value] of items) { - this.memoryCache.set(key, value); - } - } - - /** - * Gets a value from the local memory cache, or returns undefined if not found. - */ - @bindThis - public get(key: string): T | undefined { - return this.memoryCache.get(key); - } - - /** - * Gets or fetches a value from the cache. - * Fires an onSet event, but does not emit an update event to other processes. - */ - @bindThis - public async fetch(key: string): Promise { - let value = this.memoryCache.get(key); - if (value === undefined) { - value = await this.fetcher(key, this); - this.memoryCache.set(key, value); - - if (this.onSet) { - await this.onSet(key, this); - } - } - return value; - } - - /** - * Returns true is a key exists in memory. - * This applies to the local subset view, not the cross-cluster cache state. - */ - @bindThis - public has(key: string): boolean { - return this.memoryCache.get(key) !== undefined; - } - - /** - * Deletes a value from the cache, and erases any stale caches across the cluster. - * Fires an onDelete event after the cache has been updated in all processes. - */ - @bindThis - public async delete(key: string): Promise { - this.memoryCache.delete(key); - - await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', keys: [key] }); - - if (this.onDelete) { - await this.onDelete(key, this); - } - } - /** - * Deletes multiple values from the cache, and erases any stale caches across the cluster. - * Fires an onDelete event for each key after the cache has been updated in all processes. - * Skips if the input is empty. - */ - @bindThis - public async deleteMany(keys: string[]): Promise { - if (keys.length === 0) { - return; - } - - for (const key of keys) { - this.memoryCache.delete(key); - } - - await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', keys }); - - if (this.onDelete) { - for (const key of keys) { - await this.onDelete(key, this); - } - } - } - - /** - * Refreshes the value of a key from the fetcher, and erases any stale caches across the cluster. - * Fires an onSet event after the cache has been updated in all processes. - */ - @bindThis - public async refresh(key: string): Promise { - const value = await this.fetcher(key, this); - await this.set(key, value); - return value; - } - - /** - * Erases all entries from the local memory cache. - * Does not send any events or update other processes. - */ - @bindThis - public clear() { - this.memoryCache.clear(); - } - - /** - * Removes expired cache entries from the local view. - * Does not send any events or update other processes. - */ - @bindThis - public gc() { - this.memoryCache.gc(); - } - - /** - * Erases all data and disconnects from the cluster. - * This *must* be called when shutting down to prevent memory leaks! - */ - @bindThis - public dispose() { - this.internalEventService.off('quantumCacheUpdated', this.onQuantumCacheUpdated); - - this.memoryCache.dispose(); - } - - @bindThis - private async onQuantumCacheUpdated(data: InternalEventTypes['quantumCacheUpdated']): Promise { - if (data.name === this.name) { - for (const key of data.keys) { - this.memoryCache.delete(key); - - if (data.op === 's' && this.onSet) { - await this.onSet(key, this); - } - - if (data.op === 'd' && this.onDelete) { - await this.onDelete(key, this); - } - } - } - } - - /** - * Iterates all [key, value] pairs in memory. - * This applies to the local subset view, not the cross-cluster cache state. - */ - [Symbol.iterator](): Iterator<[key: string, value: T]> { - return this.entries(); - } -} diff --git a/packages/backend/test/misc/noOpCaches.ts b/packages/backend/test/misc/noOpCaches.ts index 373c7bddcc..c05632239b 100644 --- a/packages/backend/test/misc/noOpCaches.ts +++ b/packages/backend/test/misc/noOpCaches.ts @@ -8,7 +8,8 @@ import { Inject } from '@nestjs/common'; import { FakeInternalEventService } from './FakeInternalEventService.js'; import type { BlockingsRepository, FollowingsRepository, MiFollowing, MiUser, MiUserProfile, MutingsRepository, RenoteMutingsRepository, UserProfilesRepository, UsersRepository } from '@/models/_.js'; import type { MiLocalUser } from '@/models/User.js'; -import { MemoryKVCache, MemorySingleCache, QuantumKVCache, QuantumKVOpts, RedisKVCache, RedisSingleCache } from '@/misc/cache.js'; +import { MemoryKVCache, MemorySingleCache, RedisKVCache, RedisSingleCache } from '@/misc/cache.js'; +import { QuantumKVCache, QuantumKVOpts } from '@/misc/QuantumKVCache.js'; import { CacheService, CachedTranslationEntity, FollowStats } from '@/core/CacheService.js'; import { DI } from '@/di-symbols.js'; import { UserEntityService } from '@/core/entities/UserEntityService.js'; diff --git a/packages/backend/test/unit/misc/QuantumKVCache.ts b/packages/backend/test/unit/misc/QuantumKVCache.ts new file mode 100644 index 0000000000..72997494ce --- /dev/null +++ b/packages/backend/test/unit/misc/QuantumKVCache.ts @@ -0,0 +1,596 @@ +/* + * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { jest } from '@jest/globals'; +import { FakeInternalEventService } from '../../misc/FakeInternalEventService.js'; +import { QuantumKVCache, QuantumKVOpts } from '@/misc/QuantumKVCache.js'; + +describe(QuantumKVCache, () => { + let fakeInternalEventService: FakeInternalEventService; + let madeCaches: { dispose: () => void }[]; + + function makeCache(opts?: Partial> & { name?: string }): QuantumKVCache { + const _opts = { + name: 'test', + lifetime: Infinity, + fetcher: () => { throw new Error('not implemented'); }, + } satisfies QuantumKVOpts & { name: string }; + + if (opts) { + Object.assign(_opts, opts); + } + + const cache = new QuantumKVCache(fakeInternalEventService, _opts.name, _opts); + madeCaches.push(cache); + return cache; + } + + beforeEach(() => { + madeCaches = []; + fakeInternalEventService = new FakeInternalEventService(); + }); + + afterEach(() => { + madeCaches.forEach(cache => { + cache.dispose(); + }); + }); + + it('should connect on construct', () => { + makeCache(); + + expect(fakeInternalEventService._calls).toContainEqual(['on', ['quantumCacheUpdated', expect.anything(), { ignoreLocal: true }]]); + }); + + it('should disconnect on dispose', () => { + const cache = makeCache(); + + cache.dispose(); + + const callback = fakeInternalEventService._calls + .find(c => c[0] === 'on' && c[1][0] === 'quantumCacheUpdated') + ?.[1][1]; + expect(fakeInternalEventService._calls).toContainEqual(['off', ['quantumCacheUpdated', callback]]); + }); + + it('should store in memory cache', async () => { + const cache = makeCache(); + + await cache.set('foo', 'bar'); + await cache.set('alpha', 'omega'); + + const result1 = await cache.get('foo'); + const result2 = await cache.get('alpha'); + + expect(result1).toBe('bar'); + expect(result2).toBe('omega'); + }); + + it('should emit event when storing', async () => { + const cache = makeCache({ name: 'fake' }); + + await cache.set('foo', 'bar'); + + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo'] }]]); + }); + + it('should call onSet when storing', async () => { + const fakeOnSet = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + onSet: fakeOnSet, + }); + + await cache.set('foo', 'bar'); + + expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); + }); + + it('should not emit event when storing unchanged value', async () => { + const cache = makeCache({ name: 'fake' }); + + await cache.set('foo', 'bar'); + await cache.set('foo', 'bar'); + + expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); + }); + + it('should not call onSet when storing unchanged value', async () => { + const fakeOnSet = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + onSet: fakeOnSet, + }); + + await cache.set('foo', 'bar'); + await cache.set('foo', 'bar'); + + expect(fakeOnSet).toHaveBeenCalledTimes(1); + }); + + it('should fetch an unknown value', async () => { + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + }); + + const result = await cache.fetch('foo'); + + expect(result).toBe('value#foo'); + }); + + it('should store fetched value in memory cache', async () => { + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + }); + + await cache.fetch('foo'); + + const result = cache.has('foo'); + expect(result).toBe(true); + }); + + it('should call onSet when fetching', async () => { + const fakeOnSet = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + onSet: fakeOnSet, + }); + + await cache.fetch('foo'); + + expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); + }); + + it('should not emit event when fetching', async () => { + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + }); + + await cache.fetch('foo'); + + expect(fakeInternalEventService._calls).not.toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo'] }]]); + }); + + it('should delete from memory cache', async () => { + const cache = makeCache(); + + await cache.set('foo', 'bar'); + await cache.delete('foo'); + + const result = cache.has('foo'); + expect(result).toBe(false); + }); + + it('should call onDelete when deleting', async () => { + const fakeOnDelete = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + onDelete: fakeOnDelete, + }); + + await cache.set('foo', 'bar'); + await cache.delete('foo'); + + expect(fakeOnDelete).toHaveBeenCalledWith('foo', cache); + }); + + it('should emit event when deleting', async () => { + const cache = makeCache({ name: 'fake' }); + + await cache.set('foo', 'bar'); + await cache.delete('foo'); + + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 'd', keys: ['foo'] }]]); + }); + + it('should delete when receiving set event', async () => { + const cache = makeCache({ name: 'fake' }); + await cache.set('foo', 'bar'); + + await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo'] }); + + const result = cache.has('foo'); + expect(result).toBe(false); + }); + + it('should call onSet when receiving set event', async () => { + const fakeOnSet = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + onSet: fakeOnSet, + }); + + await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo'] }); + + expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); + }); + + it('should delete when receiving delete event', async () => { + const cache = makeCache({ name: 'fake' }); + await cache.set('foo', 'bar'); + + await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 'd', keys: ['foo'] }); + + const result = cache.has('foo'); + expect(result).toBe(false); + }); + + it('should call onDelete when receiving delete event', async () => { + const fakeOnDelete = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + onDelete: fakeOnDelete, + }); + await cache.set('foo', 'bar'); + + await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 'd', keys: ['foo'] }); + + expect(fakeOnDelete).toHaveBeenCalledWith('foo', cache); + }); + + describe('get', () => { + it('should return value if present', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + const result = cache.get('foo'); + + expect(result).toBe('bar'); + }); + it('should return undefined if missing', () => { + const cache = makeCache(); + + const result = cache.get('foo'); + + expect(result).toBe(undefined); + }); + }); + + describe('setMany', () => { + it('should populate all values', async () => { + const cache = makeCache(); + + await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); + + expect(cache.has('foo')).toBe(true); + expect(cache.has('alpha')).toBe(true); + }); + + it('should emit one event', async () => { + const cache = makeCache({ + name: 'fake', + }); + + await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); + + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo', 'alpha'] }]]); + expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); + }); + + it('should call onSet for each item', async () => { + const fakeOnSet = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + onSet: fakeOnSet, + }); + + await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); + + expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); + expect(fakeOnSet).toHaveBeenCalledWith('alpha', cache); + }); + + it('should emit events only for changed items', async () => { + const fakeOnSet = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + onSet: fakeOnSet, + }); + + await cache.set('foo', 'bar'); + fakeOnSet.mockClear(); + fakeInternalEventService._reset(); + + await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); + + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', keys: ['alpha'] }]]); + expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); + expect(fakeOnSet).toHaveBeenCalledWith('alpha', cache); + expect(fakeOnSet).toHaveBeenCalledTimes(1); + }); + }); + + describe('deleteMany', () => { + it('should remove keys from memory cache', async () => { + const cache = makeCache(); + + await cache.set('foo', 'bar'); + await cache.set('alpha', 'omega'); + await cache.deleteMany(['foo', 'alpha']); + + expect(cache.has('foo')).toBe(false); + expect(cache.has('alpha')).toBe(false); + }); + + it('should emit only one event', async () => { + const cache = makeCache({ + name: 'fake', + }); + + await cache.deleteMany(['foo', 'alpha']); + + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 'd', keys: ['foo', 'alpha'] }]]); + expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); + }); + + it('should call onDelete for each key', async () => { + const fakeOnDelete = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + onDelete: fakeOnDelete, + }); + + await cache.deleteMany(['foo', 'alpha']); + + expect(fakeOnDelete).toHaveBeenCalledWith('foo', cache); + expect(fakeOnDelete).toHaveBeenCalledWith('alpha', cache); + }); + + it('should do nothing if no keys are provided', async () => { + const fakeOnDelete = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + onDelete: fakeOnDelete, + }); + + await cache.deleteMany([]); + + expect(fakeOnDelete).not.toHaveBeenCalled(); + expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); + }); + }); + + describe('refresh', () => { + it('should populate the value', async () => { + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + }); + + await cache.refresh('foo'); + + const result = cache.has('foo'); + expect(result).toBe(true); + }); + + it('should return the value', async () => { + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + }); + + const result = await cache.refresh('foo'); + + expect(result).toBe('value#foo'); + }); + + it('should replace the value if it exists', async () => { + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + }); + + await cache.set('foo', 'bar'); + const result = await cache.refresh('foo'); + + expect(result).toBe('value#foo'); + }); + + it('should call onSet', async () => { + const fakeOnSet = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + onSet: fakeOnSet, + }); + + await cache.refresh('foo'); + + expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); + }); + + it('should emit event', async () => { + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + }); + + await cache.refresh('foo'); + + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo'] }]]); + }); + }); + + describe('add', () => { + it('should add the item', () => { + const cache = makeCache(); + cache.add('foo', 'bar'); + expect(cache.has('foo')).toBe(true); + }); + + it('should not emit event', () => { + const cache = makeCache({ + name: 'fake', + }); + + cache.add('foo', 'bar'); + + expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); + }); + + it('should not call onSet', () => { + const fakeOnSet = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + onSet: fakeOnSet, + }); + + cache.add('foo', 'bar'); + + expect(fakeOnSet).not.toHaveBeenCalled(); + }); + }); + + describe('addMany', () => { + it('should add all items', () => { + const cache = makeCache(); + + cache.addMany([['foo', 'bar'], ['alpha', 'omega']]); + + expect(cache.has('foo')).toBe(true); + expect(cache.has('alpha')).toBe(true); + }); + + it('should not emit event', () => { + const cache = makeCache({ + name: 'fake', + }); + + cache.addMany([['foo', 'bar'], ['alpha', 'omega']]); + + expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); + }); + + it('should not call onSet', () => { + const fakeOnSet = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + onSet: fakeOnSet, + }); + + cache.addMany([['foo', 'bar'], ['alpha', 'omega']]); + + expect(fakeOnSet).not.toHaveBeenCalled(); + }); + }); + + describe('has', () => { + it('should return false when empty', () => { + const cache = makeCache(); + const result = cache.has('foo'); + expect(result).toBe(false); + }); + + it('should return false when value is not in memory', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + const result = cache.has('alpha'); + + expect(result).toBe(false); + }); + + it('should return true when value is in memory', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + const result = cache.has('foo'); + + expect(result).toBe(true); + }); + }); + + describe('size', () => { + it('should return 0 when empty', () => { + const cache = makeCache(); + expect(cache.size).toBe(0); + }); + + it('should return correct size when populated', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + expect(cache.size).toBe(1); + }); + }); + + describe('entries', () => { + it('should return empty when empty', () => { + const cache = makeCache(); + + const result = Array.from(cache.entries()); + + expect(result).toHaveLength(0); + }); + + it('should return all entries when populated', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + const result = Array.from(cache.entries()); + + expect(result).toEqual([['foo', 'bar']]); + }); + }); + + describe('keys', () => { + it('should return empty when empty', () => { + const cache = makeCache(); + + const result = Array.from(cache.keys()); + + expect(result).toHaveLength(0); + }); + + it('should return all keys when populated', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + const result = Array.from(cache.keys()); + + expect(result).toEqual(['foo']); + }); + }); + + describe('values', () => { + it('should return empty when empty', () => { + const cache = makeCache(); + + const result = Array.from(cache.values()); + + expect(result).toHaveLength(0); + }); + + it('should return all values when populated', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + const result = Array.from(cache.values()); + + expect(result).toEqual(['bar']); + }); + }); + + describe('[Symbol.iterator]', () => { + it('should return empty when empty', () => { + const cache = makeCache(); + + const result = Array.from(cache); + + expect(result).toHaveLength(0); + }); + + it('should return all entries when populated', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + const result = Array.from(cache); + + expect(result).toEqual([['foo', 'bar']]); + }); + }); +}); diff --git a/packages/backend/test/unit/misc/cache.ts b/packages/backend/test/unit/misc/cache.ts deleted file mode 100644 index e24f6d4dcc..0000000000 --- a/packages/backend/test/unit/misc/cache.ts +++ /dev/null @@ -1,597 +0,0 @@ -/* - * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors - * SPDX-License-Identifier: AGPL-3.0-only - */ - -import { jest } from '@jest/globals'; -import { FakeInternalEventService } from '../../misc/FakeInternalEventService.js'; -import { QuantumKVCache, QuantumKVOpts } from '@/misc/cache.js'; - -describe(QuantumKVCache, () => { - let fakeInternalEventService: FakeInternalEventService; - let madeCaches: { dispose: () => void }[]; - - function makeCache(opts?: Partial> & { name?: string }): QuantumKVCache { - const _opts = { - name: 'test', - lifetime: Infinity, - fetcher: () => { throw new Error('not implemented'); }, - } satisfies QuantumKVOpts & { name: string }; - - if (opts) { - Object.assign(_opts, opts); - } - - const cache = new QuantumKVCache(fakeInternalEventService, _opts.name, _opts); - madeCaches.push(cache); - return cache; - } - - beforeEach(() => { - madeCaches = []; - fakeInternalEventService = new FakeInternalEventService(); - }); - - afterEach(() => { - madeCaches.forEach(cache => { - cache.dispose(); - }); - }); - - it('should connect on construct', () => { - makeCache(); - - expect(fakeInternalEventService._calls).toContainEqual(['on', ['quantumCacheUpdated', expect.anything(), { ignoreLocal: true }]]); - }); - - it('should disconnect on dispose', () => { - const cache = makeCache(); - - cache.dispose(); - - const callback = fakeInternalEventService._calls - .find(c => c[0] === 'on' && c[1][0] === 'quantumCacheUpdated') - ?.[1][1]; - expect(fakeInternalEventService._calls).toContainEqual(['off', ['quantumCacheUpdated', callback]]); - }); - - it('should store in memory cache', async () => { - const cache = makeCache(); - - await cache.set('foo', 'bar'); - await cache.set('alpha', 'omega'); - - const result1 = await cache.get('foo'); - const result2 = await cache.get('alpha'); - - expect(result1).toBe('bar'); - expect(result2).toBe('omega'); - }); - - it('should emit event when storing', async () => { - const cache = makeCache({ name: 'fake' }); - - await cache.set('foo', 'bar'); - - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo'] }]]); - }); - - it('should call onSet when storing', async () => { - const fakeOnSet = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - onSet: fakeOnSet, - }); - - await cache.set('foo', 'bar'); - - expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); - }); - - it('should not emit event when storing unchanged value', async () => { - const cache = makeCache({ name: 'fake' }); - - await cache.set('foo', 'bar'); - await cache.set('foo', 'bar'); - - expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); - }); - - it('should not call onSet when storing unchanged value', async () => { - const fakeOnSet = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - onSet: fakeOnSet, - }); - - await cache.set('foo', 'bar'); - await cache.set('foo', 'bar'); - - expect(fakeOnSet).toHaveBeenCalledTimes(1); - }); - - it('should fetch an unknown value', async () => { - const cache = makeCache({ - name: 'fake', - fetcher: key => `value#${key}`, - }); - - const result = await cache.fetch('foo'); - - expect(result).toBe('value#foo'); - }); - - it('should store fetched value in memory cache', async () => { - const cache = makeCache({ - name: 'fake', - fetcher: key => `value#${key}`, - }); - - await cache.fetch('foo'); - - const result = cache.has('foo'); - expect(result).toBe(true); - }); - - it('should call onSet when fetching', async () => { - const fakeOnSet = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - fetcher: key => `value#${key}`, - onSet: fakeOnSet, - }); - - await cache.fetch('foo'); - - expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); - }); - - it('should not emit event when fetching', async () => { - const cache = makeCache({ - name: 'fake', - fetcher: key => `value#${key}`, - }); - - await cache.fetch('foo'); - - expect(fakeInternalEventService._calls).not.toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo'] }]]); - }); - - it('should delete from memory cache', async () => { - const cache = makeCache(); - - await cache.set('foo', 'bar'); - await cache.delete('foo'); - - const result = cache.has('foo'); - expect(result).toBe(false); - }); - - it('should call onDelete when deleting', async () => { - const fakeOnDelete = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - onDelete: fakeOnDelete, - }); - - await cache.set('foo', 'bar'); - await cache.delete('foo'); - - expect(fakeOnDelete).toHaveBeenCalledWith('foo', cache); - }); - - it('should emit event when deleting', async () => { - const cache = makeCache({ name: 'fake' }); - - await cache.set('foo', 'bar'); - await cache.delete('foo'); - - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 'd', keys: ['foo'] }]]); - }); - - it('should delete when receiving set event', async () => { - const cache = makeCache({ name: 'fake' }); - await cache.set('foo', 'bar'); - - await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo'] }); - - const result = cache.has('foo'); - expect(result).toBe(false); - }); - - it('should call onSet when receiving set event', async () => { - const fakeOnSet = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - onSet: fakeOnSet, - }); - - await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo'] }); - - expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); - }); - - it('should delete when receiving delete event', async () => { - const cache = makeCache({ name: 'fake' }); - await cache.set('foo', 'bar'); - - await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 'd', keys: ['foo'] }); - - const result = cache.has('foo'); - expect(result).toBe(false); - }); - - it('should call onDelete when receiving delete event', async () => { - const fakeOnDelete = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - onDelete: fakeOnDelete, - }); - await cache.set('foo', 'bar'); - - await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 'd', keys: ['foo'] }); - - expect(fakeOnDelete).toHaveBeenCalledWith('foo', cache); - }); - - describe('get', () => { - it('should return value if present', async () => { - const cache = makeCache(); - await cache.set('foo', 'bar'); - - const result = cache.get('foo'); - - expect(result).toBe('bar'); - }); - it('should return undefined if missing', () => { - const cache = makeCache(); - - const result = cache.get('foo'); - - expect(result).toBe(undefined); - }); - }); - - describe('setMany', () => { - it('should populate all values', async () => { - const cache = makeCache(); - - await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); - - expect(cache.has('foo')).toBe(true); - expect(cache.has('alpha')).toBe(true); - }); - - it('should emit one event', async () => { - const cache = makeCache({ - name: 'fake', - }); - - await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); - - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo', 'alpha'] }]]); - expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); - }); - - it('should call onSet for each item', async () => { - const fakeOnSet = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - onSet: fakeOnSet, - }); - - await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); - - expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); - expect(fakeOnSet).toHaveBeenCalledWith('alpha', cache); - }); - - it('should emit events only for changed items', async () => { - const fakeOnSet = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - onSet: fakeOnSet, - }); - - await cache.set('foo', 'bar'); - fakeOnSet.mockClear(); - fakeInternalEventService._reset(); - - await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); - - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', keys: ['alpha'] }]]); - expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); - expect(fakeOnSet).toHaveBeenCalledWith('alpha', cache); - expect(fakeOnSet).toHaveBeenCalledTimes(1); - }); - }); - - describe('deleteMany', () => { - it('should remove keys from memory cache', async () => { - const cache = makeCache(); - - await cache.set('foo', 'bar'); - await cache.set('alpha', 'omega'); - await cache.deleteMany(['foo', 'alpha']); - - expect(cache.has('foo')).toBe(false); - expect(cache.has('alpha')).toBe(false); - }); - - it('should emit only one event', async () => { - const cache = makeCache({ - name: 'fake', - }); - - await cache.deleteMany(['foo', 'alpha']); - - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 'd', keys: ['foo', 'alpha'] }]]); - expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); - }); - - it('should call onDelete for each key', async () => { - const fakeOnDelete = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - onDelete: fakeOnDelete, - }); - - await cache.deleteMany(['foo', 'alpha']); - - expect(fakeOnDelete).toHaveBeenCalledWith('foo', cache); - expect(fakeOnDelete).toHaveBeenCalledWith('alpha', cache); - }); - - it('should do nothing if no keys are provided', async () => { - const fakeOnDelete = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - onDelete: fakeOnDelete, - }); - - await cache.deleteMany([]); - - expect(fakeOnDelete).not.toHaveBeenCalled(); - expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); - }); - }); - - describe('refresh', () => { - it('should populate the value', async () => { - const cache = makeCache({ - name: 'fake', - fetcher: key => `value#${key}`, - }); - - await cache.refresh('foo'); - - const result = cache.has('foo'); - expect(result).toBe(true); - }); - - it('should return the value', async () => { - const cache = makeCache({ - name: 'fake', - fetcher: key => `value#${key}`, - }); - - const result = await cache.refresh('foo'); - - expect(result).toBe('value#foo'); - }); - - it('should replace the value if it exists', async () => { - const cache = makeCache({ - name: 'fake', - fetcher: key => `value#${key}`, - }); - - await cache.set('foo', 'bar'); - const result = await cache.refresh('foo'); - - expect(result).toBe('value#foo'); - }); - - it('should call onSet', async () => { - const fakeOnSet = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - name: 'fake', - fetcher: key => `value#${key}`, - onSet: fakeOnSet, - }); - - await cache.refresh('foo'); - - expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); - }); - - it('should emit event', async () => { - const cache = makeCache({ - name: 'fake', - fetcher: key => `value#${key}`, - }); - - await cache.refresh('foo'); - - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo'] }]]); - }); - }); - - describe('add', () => { - it('should add the item', () => { - const cache = makeCache(); - cache.add('foo', 'bar'); - expect(cache.has('foo')).toBe(true); - }); - - it('should not emit event', () => { - const cache = makeCache({ - name: 'fake', - }); - - cache.add('foo', 'bar'); - - expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); - }); - - it('should not call onSet', () => { - const fakeOnSet = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - onSet: fakeOnSet, - }); - - cache.add('foo', 'bar'); - - expect(fakeOnSet).not.toHaveBeenCalled(); - }); - }); - - describe('addMany', () => { - it('should add all items', () => { - const cache = makeCache(); - - cache.addMany([['foo', 'bar'], ['alpha', 'omega']]); - - expect(cache.has('foo')).toBe(true); - expect(cache.has('alpha')).toBe(true); - }); - - - it('should not emit event', () => { - const cache = makeCache({ - name: 'fake', - }); - - cache.addMany([['foo', 'bar'], ['alpha', 'omega']]); - - expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); - }); - - it('should not call onSet', () => { - const fakeOnSet = jest.fn(() => Promise.resolve()); - const cache = makeCache({ - onSet: fakeOnSet, - }); - - cache.addMany([['foo', 'bar'], ['alpha', 'omega']]); - - expect(fakeOnSet).not.toHaveBeenCalled(); - }); - }); - - describe('has', () => { - it('should return false when empty', () => { - const cache = makeCache(); - const result = cache.has('foo'); - expect(result).toBe(false); - }); - - it('should return false when value is not in memory', async () => { - const cache = makeCache(); - await cache.set('foo', 'bar'); - - const result = cache.has('alpha'); - - expect(result).toBe(false); - }); - - it('should return true when value is in memory', async () => { - const cache = makeCache(); - await cache.set('foo', 'bar'); - - const result = cache.has('foo'); - - expect(result).toBe(true); - }); - }); - - describe('size', () => { - it('should return 0 when empty', () => { - const cache = makeCache(); - expect(cache.size).toBe(0); - }); - - it('should return correct size when populated', async () => { - const cache = makeCache(); - await cache.set('foo', 'bar'); - - expect(cache.size).toBe(1); - }); - }); - - describe('entries', () => { - it('should return empty when empty', () => { - const cache = makeCache(); - - const result = Array.from(cache.entries()); - - expect(result).toHaveLength(0); - }); - - it('should return all entries when populated', async () => { - const cache = makeCache(); - await cache.set('foo', 'bar'); - - const result = Array.from(cache.entries()); - - expect(result).toEqual([['foo', 'bar']]); - }); - }); - - describe('keys', () => { - it('should return empty when empty', () => { - const cache = makeCache(); - - const result = Array.from(cache.keys()); - - expect(result).toHaveLength(0); - }); - - it('should return all keys when populated', async () => { - const cache = makeCache(); - await cache.set('foo', 'bar'); - - const result = Array.from(cache.keys()); - - expect(result).toEqual(['foo']); - }); - }); - - describe('values', () => { - it('should return empty when empty', () => { - const cache = makeCache(); - - const result = Array.from(cache.values()); - - expect(result).toHaveLength(0); - }); - - it('should return all values when populated', async () => { - const cache = makeCache(); - await cache.set('foo', 'bar'); - - const result = Array.from(cache.values()); - - expect(result).toEqual(['bar']); - }); - }); - - describe('[Symbol.iterator]', () => { - it('should return empty when empty', () => { - const cache = makeCache(); - - const result = Array.from(cache); - - expect(result).toHaveLength(0); - }); - - it('should return all entries when populated', async () => { - const cache = makeCache(); - await cache.set('foo', 'bar'); - - const result = Array.from(cache); - - expect(result).toEqual([['foo', 'bar']]); - }); - }); -}); -- cgit v1.2.3-freya From 853b548a4369051b8fdaabbda80d7d6ed52adb77 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Sat, 7 Jun 2025 21:27:25 -0400 Subject: re-type userFollowingsCache to match the others --- packages/backend/src/core/AntennaService.ts | 3 +- packages/backend/src/core/CacheService.ts | 38 +++++++++------------- packages/backend/src/core/NotificationService.ts | 12 +++---- .../backend/src/core/entities/NoteEntityService.ts | 14 +++----- .../server/api/endpoints/notes/hybrid-timeline.ts | 2 +- .../src/server/api/endpoints/notes/timeline.ts | 2 +- .../src/server/api/endpoints/users/notes.ts | 2 +- .../backend/src/server/api/stream/Connection.ts | 2 +- packages/backend/src/server/api/stream/channel.ts | 6 ++-- .../server/api/stream/channels/bubble-timeline.ts | 2 +- .../server/api/stream/channels/global-timeline.ts | 2 +- .../server/api/stream/channels/home-timeline.ts | 4 +-- .../server/api/stream/channels/hybrid-timeline.ts | 4 +-- .../server/api/stream/channels/local-timeline.ts | 2 +- .../server/api/stream/channels/role-timeline.ts | 2 +- .../src/server/api/stream/channels/user-list.ts | 2 +- packages/backend/test/misc/noOpCaches.ts | 4 +-- 17 files changed, 47 insertions(+), 56 deletions(-) (limited to 'packages/backend/src/core/CacheService.ts') diff --git a/packages/backend/src/core/AntennaService.ts b/packages/backend/src/core/AntennaService.ts index cf696e3599..667df57943 100644 --- a/packages/backend/src/core/AntennaService.ts +++ b/packages/backend/src/core/AntennaService.ts @@ -130,7 +130,8 @@ export class AntennaService implements OnApplicationShutdown { } if (note.visibility === 'followers') { - const isFollowing = Object.hasOwn(await this.cacheService.userFollowingsCache.fetch(antenna.userId), note.userId); + const followings = await this.cacheService.userFollowingsCache.fetch(antenna.userId); + const isFollowing = followings.has(note.userId); if (!isFollowing && antenna.userId !== note.userId) return false; } diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts index e59857b4ce..38a93e57f4 100644 --- a/packages/backend/src/core/CacheService.ts +++ b/packages/backend/src/core/CacheService.ts @@ -6,14 +6,14 @@ import { Inject, Injectable } from '@nestjs/common'; import * as Redis from 'ioredis'; import { In, IsNull } from 'typeorm'; -import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiFollowing, MiNote } from '@/models/_.js'; +import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiNote } from '@/models/_.js'; import { MemoryKVCache, RedisKVCache } from '@/misc/cache.js'; import { QuantumKVCache } from '@/misc/QuantumKVCache.js'; import type { MiLocalUser, MiUser } from '@/models/User.js'; import { DI } from '@/di-symbols.js'; import { UserEntityService } from '@/core/entities/UserEntityService.js'; import { bindThis } from '@/decorators.js'; -import type { GlobalEvents, InternalEventTypes } from '@/core/GlobalEventService.js'; +import type { InternalEventTypes } from '@/core/GlobalEventService.js'; import { InternalEventService } from '@/core/InternalEventService.js'; import type { OnApplicationShutdown } from '@nestjs/common'; @@ -46,7 +46,7 @@ export class CacheService implements OnApplicationShutdown { public userBlockingCache: QuantumKVCache>; public userBlockedCache: QuantumKVCache>; // NOTE: 「被」Blockキャッシュ public renoteMutingsCache: QuantumKVCache>; - public userFollowingsCache: QuantumKVCache | undefined>>; + public userFollowingsCache: QuantumKVCache>; protected userFollowStatsCache = new MemoryKVCache(1000 * 60 * 10); // 10 minutes protected translationsCache: RedisKVCache; @@ -110,15 +110,9 @@ export class CacheService implements OnApplicationShutdown { fetcher: (key) => this.renoteMutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))), }); - this.userFollowingsCache = new QuantumKVCache | undefined>>(this.internalEventService, 'userFollowings', { + this.userFollowingsCache = new QuantumKVCache>(this.internalEventService, 'userFollowings', { lifetime: 1000 * 60 * 30, // 30m - fetcher: (key) => this.followingsRepository.find({ where: { followerId: key }, select: ['followeeId', 'withReplies'] }).then(xs => { - const obj: Record | undefined> = {}; - for (const x of xs) { - obj[x.followeeId] = { withReplies: x.withReplies }; - } - return obj; - }), + fetcher: (key) => this.followingsRepository.find({ where: { followerId: key }, select: ['followeeId', 'withReplies'] }).then(xs => new Map(xs.map(f => [f.followeeId, { withReplies: f.withReplies }]))), }); this.translationsCache = new RedisKVCache(this.redisClient, 'translations', { @@ -305,14 +299,14 @@ export class CacheService implements OnApplicationShutdown { } @bindThis - public async getUserFollowings(userIds: Iterable): Promise>> { - const followings = new Map>(); + public async getUserFollowings(userIds: Iterable): Promise>> { + const followings = new Map>(); const toFetch: string[] = []; for (const userId of userIds) { const fromCache = this.userFollowingsCache.get(userId); if (fromCache) { - followings.set(userId, new Set(Object.keys(fromCache))); + followings.set(userId, fromCache); } else { toFetch.push(userId); } @@ -331,25 +325,25 @@ export class CacheService implements OnApplicationShutdown { }) .getMany(); - const toCache = new Map | undefined>>(); + const toCache = new Map>(); // Pivot to a map for (const { followerId, followeeId, withReplies } of fetchedFollowings) { // Queue for cache - let cacheSet = toCache.get(followerId); - if (!cacheSet) { - cacheSet = {}; - toCache.set(followerId, cacheSet); + let cacheMap = toCache.get(followerId); + if (!cacheMap) { + cacheMap = new Map(); + toCache.set(followerId, cacheMap); } - cacheSet[followeeId] = { withReplies }; + cacheMap.set(followeeId, { withReplies }); // Queue for return let returnSet = followings.get(followerId); if (!returnSet) { - returnSet = new Set(); + returnSet = new Map(); followings.set(followerId, returnSet); } - returnSet.add(followeeId); + returnSet.set(followeeId, { withReplies }); } // Update cache to speed up future calls diff --git a/packages/backend/src/core/NotificationService.ts b/packages/backend/src/core/NotificationService.ts index 0f05f5425d..2ce7bdb5a9 100644 --- a/packages/backend/src/core/NotificationService.ts +++ b/packages/backend/src/core/NotificationService.ts @@ -113,27 +113,27 @@ export class NotificationService implements OnApplicationShutdown { } if (recieveConfig?.type === 'following') { - const isFollowing = await this.cacheService.userFollowingsCache.fetch(notifieeId).then(followings => Object.hasOwn(followings, notifierId)); + const isFollowing = await this.cacheService.userFollowingsCache.fetch(notifieeId).then(followings => followings.has(notifierId)); if (!isFollowing) { return null; } } else if (recieveConfig?.type === 'follower') { - const isFollower = await this.cacheService.userFollowingsCache.fetch(notifierId).then(followings => Object.hasOwn(followings, notifieeId)); + const isFollower = await this.cacheService.userFollowingsCache.fetch(notifierId).then(followings => followings.has(notifieeId)); if (!isFollower) { return null; } } else if (recieveConfig?.type === 'mutualFollow') { const [isFollowing, isFollower] = await Promise.all([ - this.cacheService.userFollowingsCache.fetch(notifieeId).then(followings => Object.hasOwn(followings, notifierId)), - this.cacheService.userFollowingsCache.fetch(notifierId).then(followings => Object.hasOwn(followings, notifieeId)), + this.cacheService.userFollowingsCache.fetch(notifieeId).then(followings => followings.has(notifierId)), + this.cacheService.userFollowingsCache.fetch(notifierId).then(followings => followings.has(notifieeId)), ]); if (!(isFollowing && isFollower)) { return null; } } else if (recieveConfig?.type === 'followingOrFollower') { const [isFollowing, isFollower] = await Promise.all([ - this.cacheService.userFollowingsCache.fetch(notifieeId).then(followings => Object.hasOwn(followings, notifierId)), - this.cacheService.userFollowingsCache.fetch(notifierId).then(followings => Object.hasOwn(followings, notifieeId)), + this.cacheService.userFollowingsCache.fetch(notifieeId).then(followings => followings.has(notifierId)), + this.cacheService.userFollowingsCache.fetch(notifierId).then(followings => followings.has(notifieeId)), ]); if (!isFollowing && !isFollower) { return null; diff --git a/packages/backend/src/core/entities/NoteEntityService.ts b/packages/backend/src/core/entities/NoteEntityService.ts index 1b3920e13f..3af66b220d 100644 --- a/packages/backend/src/core/entities/NoteEntityService.ts +++ b/packages/backend/src/core/entities/NoteEntityService.ts @@ -133,7 +133,7 @@ export class NoteEntityService implements OnModuleInit { @bindThis public async hideNote(packedNote: Packed<'Note'>, meId: MiUser['id'] | null, hint?: { - myFollowing?: ReadonlySet, + myFollowing?: ReadonlyMap, myBlockers?: ReadonlySet, }): Promise { if (meId === packedNote.userId) return; @@ -193,7 +193,7 @@ export class NoteEntityService implements OnModuleInit { } else { const isFollowing = hint?.myFollowing ? hint.myFollowing.has(packedNote.userId) - : (await this.cacheService.userFollowingsCache.fetch(meId))[packedNote.userId] != null; + : (await this.cacheService.userFollowingsCache.fetch(meId)).has(packedNote.userId); hide = !isFollowing; } @@ -358,14 +358,10 @@ export class NoteEntityService implements OnModuleInit { : this.cacheService.userBlockingCache.fetch(meId).then((ids) => ids.has(note.userId)), hint?.myFollowing ? hint.myFollowing.has(note.userId) - : this.followingsRepository.existsBy({ - followeeId: note.userId, - followerId: meId, - }), + : this.cacheService.userFollowingsCache.fetch(meId).then(ids => ids.has(note.userId)), hint?.me !== undefined ? (hint.me?.host ?? null) - : this.cacheService.userByIdCache.fetch(meId, () => this.usersRepository.findOneByOrFail({ id: meId })) - .then(me => me.host), + : this.cacheService.findUserById(meId).then(me => me.host), ]); if (blocked) return false; @@ -420,7 +416,7 @@ export class NoteEntityService implements OnModuleInit { packedFiles: Map | null>; packedUsers: Map>; mentionHandles: Record; - userFollowings: Map>; + userFollowings: Map>; userBlockers: Map>; polls: Map; pollVotes: Map>; diff --git a/packages/backend/src/server/api/endpoints/notes/hybrid-timeline.ts b/packages/backend/src/server/api/endpoints/notes/hybrid-timeline.ts index a7b104e198..a5623d1f03 100644 --- a/packages/backend/src/server/api/endpoints/notes/hybrid-timeline.ts +++ b/packages/backend/src/server/api/endpoints/notes/hybrid-timeline.ts @@ -164,7 +164,7 @@ export default class extends Endpoint { // eslint- excludeBots: !ps.withBots, noteFilter: note => { if (note.reply && note.reply.visibility === 'followers') { - if (!Object.hasOwn(followings, note.reply.userId) && note.reply.userId !== me.id) return false; + if (!followings.has(note.reply.userId) && note.reply.userId !== me.id) return false; } return true; diff --git a/packages/backend/src/server/api/endpoints/notes/timeline.ts b/packages/backend/src/server/api/endpoints/notes/timeline.ts index 8cf7bb5795..44c539eaad 100644 --- a/packages/backend/src/server/api/endpoints/notes/timeline.ts +++ b/packages/backend/src/server/api/endpoints/notes/timeline.ts @@ -115,7 +115,7 @@ export default class extends Endpoint { // eslint- excludePureRenotes: !ps.withRenotes, noteFilter: note => { if (note.reply && note.reply.visibility === 'followers') { - if (!Object.hasOwn(followings, note.reply.userId) && note.reply.userId !== me.id) return false; + if (!followings.has(note.reply.userId) && note.reply.userId !== me.id) return false; } if (!ps.withBots && note.user?.isBot) return false; diff --git a/packages/backend/src/server/api/endpoints/users/notes.ts b/packages/backend/src/server/api/endpoints/users/notes.ts index 66b50e0633..4602709067 100644 --- a/packages/backend/src/server/api/endpoints/users/notes.ts +++ b/packages/backend/src/server/api/endpoints/users/notes.ts @@ -134,7 +134,7 @@ export default class extends Endpoint { // eslint- if (ps.withReplies) redisTimelines.push(`userTimelineWithReplies:${ps.userId}`); if (ps.withChannelNotes) redisTimelines.push(`userTimelineWithChannel:${ps.userId}`); - const isFollowing = me && Object.hasOwn(await this.cacheService.userFollowingsCache.fetch(me.id), ps.userId); + const isFollowing = me && (await this.cacheService.userFollowingsCache.fetch(me.id)).has(ps.userId); const timeline = await this.fanoutTimelineEndpointService.timeline({ untilId, diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts index e0535a2f14..21437850d3 100644 --- a/packages/backend/src/server/api/stream/Connection.ts +++ b/packages/backend/src/server/api/stream/Connection.ts @@ -36,7 +36,7 @@ export default class Connection { private channels = new Map(); private subscribingNotes = new Map(); public userProfile: MiUserProfile | null = null; - public following: Record | undefined> = {}; + public following: Map = new Map(); public followingChannels: Set = new Set(); public userIdsWhoMeMuting: Set = new Set(); public userIdsWhoBlockingMe: Set = new Set(); diff --git a/packages/backend/src/server/api/stream/channel.ts b/packages/backend/src/server/api/stream/channel.ts index 3a82865577..40ad454adb 100644 --- a/packages/backend/src/server/api/stream/channel.ts +++ b/packages/backend/src/server/api/stream/channel.ts @@ -70,7 +70,7 @@ export default abstract class Channel { if (!this.user) return false; if (this.user.id === note.userId) return true; if (note.visibility === 'followers') { - return this.following[note.userId] != null; + return this.following.has(note.userId); } if (!note.visibleUserIds) return false; return note.visibleUserIds.includes(this.user.id); @@ -84,7 +84,7 @@ export default abstract class Channel { if (note.user.requireSigninToViewContents && !this.user) return true; // 流れてきたNoteがインスタンスミュートしたインスタンスが関わる - if (isInstanceMuted(note, this.userMutedInstances) && !this.following[note.userId]) return true; + if (isInstanceMuted(note, this.userMutedInstances) && !this.following.has(note.userId)) return true; // 流れてきたNoteがミュートしているユーザーが関わる if (isUserRelated(note, this.userIdsWhoMeMuting)) return true; @@ -101,7 +101,7 @@ export default abstract class Channel { if (note.user.isSilenced || note.user.instance?.isSilenced) { if (this.user == null) return true; if (this.user.id === note.userId) return false; - if (this.following[note.userId] == null) return true; + if (!this.following.has(note.userId)) return true; } // TODO muted threads diff --git a/packages/backend/src/server/api/stream/channels/bubble-timeline.ts b/packages/backend/src/server/api/stream/channels/bubble-timeline.ts index 393fe3883c..72f719b411 100644 --- a/packages/backend/src/server/api/stream/channels/bubble-timeline.ts +++ b/packages/backend/src/server/api/stream/channels/bubble-timeline.ts @@ -62,7 +62,7 @@ class BubbleTimelineChannel extends Channel { const reply = note.reply; // 自分のフォローしていないユーザーの visibility: followers な投稿への返信は弾く if (!this.isNoteVisibleToMe(reply)) return; - if (!this.following[note.userId]?.withReplies) { + if (!this.following.get(note.userId)?.withReplies) { // 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合 if (reply.userId !== this.user?.id && !isMe && reply.userId !== note.userId) return; } diff --git a/packages/backend/src/server/api/stream/channels/global-timeline.ts b/packages/backend/src/server/api/stream/channels/global-timeline.ts index bac0277538..5c73f637c7 100644 --- a/packages/backend/src/server/api/stream/channels/global-timeline.ts +++ b/packages/backend/src/server/api/stream/channels/global-timeline.ts @@ -63,7 +63,7 @@ class GlobalTimelineChannel extends Channel { const reply = note.reply; // 自分のフォローしていないユーザーの visibility: followers な投稿への返信は弾く if (!this.isNoteVisibleToMe(reply)) return; - if (!this.following[note.userId]?.withReplies) { + if (!this.following.get(note.userId)?.withReplies) { // 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合 if (reply.userId !== this.user?.id && !isMe && reply.userId !== note.userId) return; } diff --git a/packages/backend/src/server/api/stream/channels/home-timeline.ts b/packages/backend/src/server/api/stream/channels/home-timeline.ts index d1dcbd07e5..c7062c0394 100644 --- a/packages/backend/src/server/api/stream/channels/home-timeline.ts +++ b/packages/backend/src/server/api/stream/channels/home-timeline.ts @@ -47,7 +47,7 @@ class HomeTimelineChannel extends Channel { if (!this.followingChannels.has(note.channelId)) return; } else { // その投稿のユーザーをフォローしていなかったら弾く - if (!isMe && !Object.hasOwn(this.following, note.userId)) return; + if (!isMe && !this.following.has(note.userId)) return; } if (this.isNoteMutedOrBlocked(note)) return; @@ -57,7 +57,7 @@ class HomeTimelineChannel extends Channel { const reply = note.reply; // 自分のフォローしていないユーザーの visibility: followers な投稿への返信は弾く if (!this.isNoteVisibleToMe(reply)) return; - if (!this.following[note.userId]?.withReplies) { + if (!this.following.get(note.userId)?.withReplies) { // 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合 if (reply.userId !== this.user!.id && !isMe && reply.userId !== note.userId) return; } diff --git a/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts b/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts index d923167e04..7cb64c9f89 100644 --- a/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts +++ b/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts @@ -62,7 +62,7 @@ class HybridTimelineChannel extends Channel { // フォローしているチャンネルの投稿 の場合だけ if (!( (note.channelId == null && isMe) || - (note.channelId == null && Object.hasOwn(this.following, note.userId)) || + (note.channelId == null && this.following.has(note.userId)) || (note.channelId == null && (note.user.host == null && note.visibility === 'public')) || (note.channelId != null && this.followingChannels.has(note.channelId)) )) return; @@ -74,7 +74,7 @@ class HybridTimelineChannel extends Channel { const reply = note.reply; // 自分のフォローしていないユーザーの visibility: followers な投稿への返信は弾く if (!this.isNoteVisibleToMe(reply)) return; - if (!this.following[note.userId]?.withReplies && !this.withReplies) { + if (!this.following.get(note.userId)?.withReplies && !this.withReplies) { // 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合 if (reply.userId !== this.user!.id && !isMe && reply.userId !== note.userId) return; } diff --git a/packages/backend/src/server/api/stream/channels/local-timeline.ts b/packages/backend/src/server/api/stream/channels/local-timeline.ts index 2eb3460efa..4869d871d6 100644 --- a/packages/backend/src/server/api/stream/channels/local-timeline.ts +++ b/packages/backend/src/server/api/stream/channels/local-timeline.ts @@ -67,7 +67,7 @@ class LocalTimelineChannel extends Channel { const reply = note.reply; // 自分のフォローしていないユーザーの visibility: followers な投稿への返信は弾く if (!this.isNoteVisibleToMe(reply)) return; - if (!this.following[note.userId]?.withReplies) { + if (!this.following.get(note.userId)?.withReplies) { // 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合 if (reply.userId !== this.user?.id && !isMe && reply.userId !== note.userId) return; } diff --git a/packages/backend/src/server/api/stream/channels/role-timeline.ts b/packages/backend/src/server/api/stream/channels/role-timeline.ts index f5984b5ae9..a3886618f1 100644 --- a/packages/backend/src/server/api/stream/channels/role-timeline.ts +++ b/packages/backend/src/server/api/stream/channels/role-timeline.ts @@ -55,7 +55,7 @@ class RoleTimelineChannel extends Channel { const reply = note.reply; // 自分のフォローしていないユーザーの visibility: followers な投稿への返信は弾く if (!this.isNoteVisibleToMe(reply)) return; - if (!this.following[note.userId]?.withReplies) { + if (!this.following.get(note.userId)?.withReplies) { // 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合 if (reply.userId !== this.user?.id && !isMe && reply.userId !== note.userId) return; } diff --git a/packages/backend/src/server/api/stream/channels/user-list.ts b/packages/backend/src/server/api/stream/channels/user-list.ts index 3f1a5a8f8f..4dae24a696 100644 --- a/packages/backend/src/server/api/stream/channels/user-list.ts +++ b/packages/backend/src/server/api/stream/channels/user-list.ts @@ -98,7 +98,7 @@ class UserListChannel extends Channel { const reply = note.reply; // 自分のフォローしていないユーザーの visibility: followers な投稿への返信は弾く if (!this.isNoteVisibleToMe(reply)) return; - if (!this.following[note.userId]?.withReplies) { + if (!this.following.get(note.userId)?.withReplies) { // 「チャンネル接続主への返信」でもなければ、「チャンネル接続主が行った返信」でもなければ、「投稿者の投稿者自身への返信」でもない場合 if (reply.userId !== this.user!.id && !isMe && reply.userId !== note.userId) return; } diff --git a/packages/backend/test/misc/noOpCaches.ts b/packages/backend/test/misc/noOpCaches.ts index c05632239b..7e8c27503c 100644 --- a/packages/backend/test/misc/noOpCaches.ts +++ b/packages/backend/test/misc/noOpCaches.ts @@ -6,7 +6,7 @@ import * as Redis from 'ioredis'; import { Inject } from '@nestjs/common'; import { FakeInternalEventService } from './FakeInternalEventService.js'; -import type { BlockingsRepository, FollowingsRepository, MiFollowing, MiUser, MiUserProfile, MutingsRepository, RenoteMutingsRepository, UserProfilesRepository, UsersRepository } from '@/models/_.js'; +import type { BlockingsRepository, FollowingsRepository, MiUser, MiUserProfile, MutingsRepository, RenoteMutingsRepository, UserProfilesRepository, UsersRepository } from '@/models/_.js'; import type { MiLocalUser } from '@/models/User.js'; import { MemoryKVCache, MemorySingleCache, RedisKVCache, RedisSingleCache } from '@/misc/cache.js'; import { QuantumKVCache, QuantumKVOpts } from '@/misc/QuantumKVCache.js'; @@ -106,7 +106,7 @@ export class NoOpCacheService extends CacheService { onSet: this.renoteMutingsCache.onSet, onDelete: this.renoteMutingsCache.onDelete, }); - this.userFollowingsCache = new NoOpQuantumKVCache | undefined>>({ + this.userFollowingsCache = new NoOpQuantumKVCache>({ internalEventService: fakeInternalEventService, fetcher: this.userFollowingsCache.fetcher, onSet: this.userFollowingsCache.onSet, -- cgit v1.2.3-freya From 372714c9b603e6e06ac1f0792c96d4066b7413d5 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Sat, 7 Jun 2025 21:32:55 -0400 Subject: implement userFollowersCache --- packages/backend/src/core/CacheService.ts | 17 +++++++++++++++-- packages/backend/test/misc/noOpCaches.ts | 6 ++++++ 2 files changed, 21 insertions(+), 2 deletions(-) (limited to 'packages/backend/src/core/CacheService.ts') diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts index 38a93e57f4..e8b26f8b9b 100644 --- a/packages/backend/src/core/CacheService.ts +++ b/packages/backend/src/core/CacheService.ts @@ -47,6 +47,7 @@ export class CacheService implements OnApplicationShutdown { public userBlockedCache: QuantumKVCache>; // NOTE: 「被」Blockキャッシュ public renoteMutingsCache: QuantumKVCache>; public userFollowingsCache: QuantumKVCache>; + public userFollowersCache: QuantumKVCache>; protected userFollowStatsCache = new MemoryKVCache(1000 * 60 * 10); // 10 minutes protected translationsCache: RedisKVCache; @@ -115,6 +116,11 @@ export class CacheService implements OnApplicationShutdown { fetcher: (key) => this.followingsRepository.find({ where: { followerId: key }, select: ['followeeId', 'withReplies'] }).then(xs => new Map(xs.map(f => [f.followeeId, { withReplies: f.withReplies }]))), }); + this.userFollowersCache = new QuantumKVCache>(this.internalEventService, 'userFollowers', { + lifetime: 1000 * 60 * 30, // 30m + fetcher: (key) => this.followingsRepository.find({ where: { followeeId: key }, select: ['followerId'] }).then(xs => new Set(xs.map(x => x.followerId))), + }); + this.translationsCache = new RedisKVCache(this.redisClient, 'translations', { lifetime: 1000 * 60 * 60 * 24 * 7, // 1 week, memoryCacheLifetime: 1000 * 60, // 1 minute @@ -154,6 +160,7 @@ export class CacheService implements OnApplicationShutdown { this.userBlockedCache.delete(body.id), this.renoteMutingsCache.delete(body.id), this.userFollowingsCache.delete(body.id), + this.userFollowersCache.delete(body.id), ]); } } else { @@ -193,7 +200,10 @@ export class CacheService implements OnApplicationShutdown { if (follower) follower.followingCount++; const followee = this.userByIdCache.get(body.followeeId); if (followee) followee.followersCount++; - await this.userFollowingsCache.delete(body.followerId); + await Promise.all([ + this.userFollowingsCache.delete(body.followerId), + this.userFollowersCache.delete(body.followeeId), + ]); this.userFollowStatsCache.delete(body.followerId); this.userFollowStatsCache.delete(body.followeeId); break; @@ -203,7 +213,10 @@ export class CacheService implements OnApplicationShutdown { if (follower) follower.followingCount--; const followee = this.userByIdCache.get(body.followeeId); if (followee) followee.followersCount--; - await this.userFollowingsCache.delete(body.followerId); + await Promise.all([ + this.userFollowingsCache.delete(body.followerId), + this.userFollowersCache.delete(body.followeeId), + ]); this.userFollowStatsCache.delete(body.followerId); this.userFollowStatsCache.delete(body.followeeId); break; diff --git a/packages/backend/test/misc/noOpCaches.ts b/packages/backend/test/misc/noOpCaches.ts index 7e8c27503c..40c5d2dc65 100644 --- a/packages/backend/test/misc/noOpCaches.ts +++ b/packages/backend/test/misc/noOpCaches.ts @@ -112,6 +112,12 @@ export class NoOpCacheService extends CacheService { onSet: this.userFollowingsCache.onSet, onDelete: this.userFollowingsCache.onDelete, }); + this.userFollowersCache = new NoOpQuantumKVCache>({ + internalEventService: fakeInternalEventService, + fetcher: this.userFollowersCache.fetcher, + onSet: this.userFollowersCache.onSet, + onDelete: this.userFollowersCache.onDelete, + }); this.userFollowStatsCache = new NoOpMemoryKVCache(); this.translationsCache = new NoOpRedisKVCache({ redis: fakeRedis, -- cgit v1.2.3-freya From fa68751a19877474bf78a80ef7204102296f0f17 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Sun, 8 Jun 2025 19:52:59 -0400 Subject: normalize userFollowingsCache / userFollowersCache and add hibernatedUserCache to reduce the number of cache-clears and allow use of caching in many more places --- packages/backend/src/core/AccountMoveService.ts | 24 +- packages/backend/src/core/CacheService.ts | 316 ++++++++++--------- packages/backend/src/core/GlobalEventService.ts | 2 +- packages/backend/src/core/NoteCreateService.ts | 43 ++- packages/backend/src/core/NoteEditService.ts | 33 +- packages/backend/src/core/UserFollowingService.ts | 121 +++----- packages/backend/src/core/UserService.ts | 21 +- packages/backend/src/core/UserSuspendService.ts | 10 +- .../core/activitypub/ApDeliverManagerService.ts | 54 ++-- .../backend/src/core/activitypub/ApInboxService.ts | 16 +- .../src/core/activitypub/models/ApPersonService.ts | 15 +- .../backend/src/core/chart/charts/federation.ts | 1 + .../src/core/chart/charts/per-user-following.ts | 19 +- .../backend/src/core/entities/NoteEntityService.ts | 10 +- .../backend/src/core/entities/UserEntityService.ts | 44 +-- packages/backend/src/misc/QuantumKVCache.ts | 145 ++++++--- packages/backend/src/misc/cache.ts | 18 +- .../processors/DeleteAccountProcessorService.ts | 18 ++ .../src/server/api/endpoints/following/delete.ts | 9 +- .../server/api/endpoints/following/invalidate.ts | 9 +- .../server/api/endpoints/following/update-all.ts | 4 + .../src/server/api/endpoints/following/update.ts | 9 +- .../src/server/api/endpoints/users/followers.ts | 9 +- .../src/server/api/endpoints/users/following.ts | 9 +- .../server/api/endpoints/users/recommendation.ts | 1 + .../backend/src/server/api/stream/Connection.ts | 2 +- packages/backend/test/misc/noOpCaches.ts | 104 +++---- packages/backend/test/unit/misc/QuantumKVCache.ts | 333 +++++++++++++++++---- 28 files changed, 817 insertions(+), 582 deletions(-) (limited to 'packages/backend/src/core/CacheService.ts') diff --git a/packages/backend/src/core/AccountMoveService.ts b/packages/backend/src/core/AccountMoveService.ts index 738026f753..e107f02796 100644 --- a/packages/backend/src/core/AccountMoveService.ts +++ b/packages/backend/src/core/AccountMoveService.ts @@ -26,6 +26,7 @@ import PerUserFollowingChart from '@/core/chart/charts/per-user-following.js'; import { SystemAccountService } from '@/core/SystemAccountService.js'; import { RoleService } from '@/core/RoleService.js'; import { AntennaService } from '@/core/AntennaService.js'; +import { CacheService } from '@/core/CacheService.js'; @Injectable() export class AccountMoveService { @@ -68,6 +69,7 @@ export class AccountMoveService { private systemAccountService: SystemAccountService, private roleService: RoleService, private antennaService: AntennaService, + private readonly cacheService: CacheService, ) { } @@ -107,12 +109,10 @@ export class AccountMoveService { this.globalEventService.publishMainStream(src.id, 'meUpdated', iObj); // Unfollow after 24 hours - const followings = await this.followingsRepository.findBy({ - followerId: src.id, - }); - this.queueService.createDelayedUnfollowJob(followings.map(following => ({ + const followings = await this.cacheService.userFollowingsCache.fetch(src.id); + this.queueService.createDelayedUnfollowJob(Array.from(followings.keys()).map(followeeId => ({ from: { id: src.id }, - to: { id: following.followeeId }, + to: { id: followeeId }, })), process.env.NODE_ENV === 'test' ? 10000 : 1000 * 60 * 60 * 24); await this.postMoveProcess(src, dst); @@ -138,11 +138,9 @@ export class AccountMoveService { // follow the new account const proxy = await this.systemAccountService.fetch('proxy'); - const followings = await this.followingsRepository.findBy({ - followeeId: src.id, - followerHost: IsNull(), // follower is local - followerId: Not(proxy.id), - }); + const followings = await this.cacheService.userFollowersCache.fetch(src.id) + .then(fs => Array.from(fs.values()) + .filter(f => f.followerHost == null && f.followerId !== proxy.id)); const followJobs = followings.map(following => ({ from: { id: following.followerId }, to: { id: dst.id }, @@ -318,9 +316,9 @@ export class AccountMoveService { await this.usersRepository.decrement({ id: In(localFollowerIds) }, 'followingCount', 1); // Decrease follower counts of local followees by 1. - const oldFollowings = await this.followingsRepository.findBy({ followerId: oldAccount.id }); - if (oldFollowings.length > 0) { - await this.usersRepository.decrement({ id: In(oldFollowings.map(following => following.followeeId)) }, 'followersCount', 1); + const oldFollowings = await this.cacheService.userFollowingsCache.fetch(oldAccount.id); + if (oldFollowings.size > 0) { + await this.usersRepository.decrement({ id: In(Array.from(oldFollowings.keys())) }, 'followersCount', 1); } // Update instance stats by decreasing remote followers count by the number of local followers who were following the old account. diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts index e8b26f8b9b..9c68597441 100644 --- a/packages/backend/src/core/CacheService.ts +++ b/packages/backend/src/core/CacheService.ts @@ -6,7 +6,7 @@ import { Inject, Injectable } from '@nestjs/common'; import * as Redis from 'ioredis'; import { In, IsNull } from 'typeorm'; -import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiNote } from '@/models/_.js'; +import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiNote, MiFollowing } from '@/models/_.js'; import { MemoryKVCache, RedisKVCache } from '@/misc/cache.js'; import { QuantumKVCache } from '@/misc/QuantumKVCache.js'; import type { MiLocalUser, MiUser } from '@/models/User.js'; @@ -46,8 +46,9 @@ export class CacheService implements OnApplicationShutdown { public userBlockingCache: QuantumKVCache>; public userBlockedCache: QuantumKVCache>; // NOTE: 「被」Blockキャッシュ public renoteMutingsCache: QuantumKVCache>; - public userFollowingsCache: QuantumKVCache>; - public userFollowersCache: QuantumKVCache>; + public userFollowingsCache: QuantumKVCache>>; + public userFollowersCache: QuantumKVCache>>; + public hibernatedUserCache: QuantumKVCache; protected userFollowStatsCache = new MemoryKVCache(1000 * 60 * 10); // 10 minutes protected translationsCache: RedisKVCache; @@ -89,36 +90,145 @@ export class CacheService implements OnApplicationShutdown { this.userProfileCache = new QuantumKVCache(this.internalEventService, 'userProfile', { lifetime: 1000 * 60 * 30, // 30m fetcher: (key) => this.userProfilesRepository.findOneByOrFail({ userId: key }), + bulkFetcher: userIds => this.userProfilesRepository.findBy({ userId: In(userIds) }).then(ps => ps.map(p => [p.userId, p])), }); this.userMutingsCache = new QuantumKVCache>(this.internalEventService, 'userMutings', { lifetime: 1000 * 60 * 30, // 30m fetcher: (key) => this.mutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))), + bulkFetcher: muterIds => this.mutingsRepository + .createQueryBuilder('muting') + .select('"muting"."muterId"', 'muterId') + .addSelect('array_agg("muting"."muteeId")', 'muteeIds') + .where({ muterId: In(muterIds) }) + .groupBy('muting.muterId') + .getRawMany<{ muterId: string, muteeIds: string[] }>() + .then(ms => ms.map(m => [m.muterId, new Set(m.muteeIds)])), }); this.userBlockingCache = new QuantumKVCache>(this.internalEventService, 'userBlocking', { lifetime: 1000 * 60 * 30, // 30m fetcher: (key) => this.blockingsRepository.find({ where: { blockerId: key }, select: ['blockeeId'] }).then(xs => new Set(xs.map(x => x.blockeeId))), + bulkFetcher: blockerIds => this.blockingsRepository + .createQueryBuilder('blocking') + .select('"blocking"."blockerId"', 'blockerId') + .addSelect('array_agg("blocking"."blockeeId")', 'blockeeIds') + .where({ blockerId: In(blockerIds) }) + .groupBy('blocking.blockerId') + .getRawMany<{ blockerId: string, blockeeIds: string[] }>() + .then(ms => ms.map(m => [m.blockerId, new Set(m.blockeeIds)])), }); this.userBlockedCache = new QuantumKVCache>(this.internalEventService, 'userBlocked', { lifetime: 1000 * 60 * 30, // 30m fetcher: (key) => this.blockingsRepository.find({ where: { blockeeId: key }, select: ['blockerId'] }).then(xs => new Set(xs.map(x => x.blockerId))), + bulkFetcher: blockeeIds => this.blockingsRepository + .createQueryBuilder('blocking') + .select('"blocking"."blockeeId"', 'blockeeId') + .addSelect('array_agg("blocking"."blockeeId")', 'blockeeIds') + .where({ blockeeId: In(blockeeIds) }) + .groupBy('blocking.blockeeId') + .getRawMany<{ blockeeId: string, blockerIds: string[] }>() + .then(ms => ms.map(m => [m.blockeeId, new Set(m.blockerIds)])), }); this.renoteMutingsCache = new QuantumKVCache>(this.internalEventService, 'renoteMutings', { lifetime: 1000 * 60 * 30, // 30m fetcher: (key) => this.renoteMutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))), + bulkFetcher: muterIds => this.renoteMutingsRepository + .createQueryBuilder('muting') + .select('"muting"."muterId"', 'muterId') + .addSelect('array_agg("muting"."muteeId")', 'muteeIds') + .where({ muterId: In(muterIds) }) + .groupBy('muting.muterId') + .getRawMany<{ muterId: string, muteeIds: string[] }>() + .then(ms => ms.map(m => [m.muterId, new Set(m.muteeIds)])), }); - this.userFollowingsCache = new QuantumKVCache>(this.internalEventService, 'userFollowings', { + this.userFollowingsCache = new QuantumKVCache>>(this.internalEventService, 'userFollowings', { lifetime: 1000 * 60 * 30, // 30m - fetcher: (key) => this.followingsRepository.find({ where: { followerId: key }, select: ['followeeId', 'withReplies'] }).then(xs => new Map(xs.map(f => [f.followeeId, { withReplies: f.withReplies }]))), + fetcher: (key) => this.followingsRepository.findBy({ followerId: key }).then(xs => new Map(xs.map(f => [f.followeeId, f]))), + bulkFetcher: followerIds => this.followingsRepository + .findBy({ followerId: In(followerIds) }) + .then(fs => fs + .reduce((groups, f) => { + let group = groups.get(f.followerId); + if (!group) { + group = new Map(); + groups.set(f.followerId, group); + } + group.set(f.followeeId, f); + return groups; + }, {} as Map>>)), + }); + + this.userFollowersCache = new QuantumKVCache>>(this.internalEventService, 'userFollowers', { + lifetime: 1000 * 60 * 30, // 30m + fetcher: followeeId => this.followingsRepository.findBy({ followeeId: followeeId }).then(xs => new Map(xs.map(x => [x.followerId, x]))), + bulkFetcher: followeeIds => this.followingsRepository + .findBy({ followeeId: In(followeeIds) }) + .then(fs => fs + .reduce((groups, f) => { + let group = groups.get(f.followeeId); + if (!group) { + group = new Map(); + groups.set(f.followeeId, group); + } + group.set(f.followerId, f); + return groups; + }, {} as Map>>)), }); - this.userFollowersCache = new QuantumKVCache>(this.internalEventService, 'userFollowers', { + this.hibernatedUserCache = new QuantumKVCache(this.internalEventService, 'hibernatedUsers', { lifetime: 1000 * 60 * 30, // 30m - fetcher: (key) => this.followingsRepository.find({ where: { followeeId: key }, select: ['followerId'] }).then(xs => new Set(xs.map(x => x.followerId))), + fetcher: async userId => { + const { isHibernated } = await this.usersRepository.findOneOrFail({ + where: { id: userId }, + select: { isHibernated: true }, + }); + return isHibernated; + }, + bulkFetcher: async userIds => { + const results = await this.usersRepository.find({ + where: { id: In(userIds) }, + select: { id: true, isHibernated: true }, + }); + return results.map(({ id, isHibernated }) => [id, isHibernated]); + }, + onChanged: async userIds => { + // We only update local copies since each process will get this event, but we can have user objects in multiple different caches. + // Before doing anything else we must "find" all the objects to update. + const userObjects = new Map(); + const toUpdate: string[] = []; + for (const uid of userIds) { + const toAdd: MiUser[] = []; + + const localUserById = this.localUserByIdCache.get(uid); + if (localUserById) toAdd.push(localUserById); + + const userById = this.userByIdCache.get(uid); + if (userById) toAdd.push(userById); + + if (toAdd.length > 0) { + toUpdate.push(uid); + userObjects.set(uid, toAdd); + } + } + + // In many cases, we won't have to do anything. + // Skipping the DB fetch ensures that this remains a single-step synchronous process. + if (toUpdate.length > 0) { + const hibernations = await this.usersRepository.find({ where: { id: In(toUpdate) }, select: { id: true, isHibernated: true } }); + for (const { id, isHibernated } of hibernations) { + const users = userObjects.get(id); + if (users) { + for (const u of users) { + u.isHibernated = isHibernated; + } + } + } + } + }, }); this.translationsCache = new RedisKVCache(this.redisClient, 'translations', { @@ -161,6 +271,7 @@ export class CacheService implements OnApplicationShutdown { this.renoteMutingsCache.delete(body.id), this.userFollowingsCache.delete(body.id), this.userFollowersCache.delete(body.id), + this.hibernatedUserCache.delete(body.id), ]); } } else { @@ -312,142 +423,6 @@ export class CacheService implements OnApplicationShutdown { } @bindThis - public async getUserFollowings(userIds: Iterable): Promise>> { - const followings = new Map>(); - - const toFetch: string[] = []; - for (const userId of userIds) { - const fromCache = this.userFollowingsCache.get(userId); - if (fromCache) { - followings.set(userId, fromCache); - } else { - toFetch.push(userId); - } - } - - if (toFetch.length > 0) { - const fetchedFollowings = await this.followingsRepository - .createQueryBuilder('following') - .select([ - 'following.followerId', - 'following.followeeId', - 'following.withReplies', - ]) - .where({ - followerId: In(toFetch), - }) - .getMany(); - - const toCache = new Map>(); - - // Pivot to a map - for (const { followerId, followeeId, withReplies } of fetchedFollowings) { - // Queue for cache - let cacheMap = toCache.get(followerId); - if (!cacheMap) { - cacheMap = new Map(); - toCache.set(followerId, cacheMap); - } - cacheMap.set(followeeId, { withReplies }); - - // Queue for return - let returnSet = followings.get(followerId); - if (!returnSet) { - returnSet = new Map(); - followings.set(followerId, returnSet); - } - returnSet.set(followeeId, { withReplies }); - } - - // Update cache to speed up future calls - this.userFollowingsCache.addMany(toCache); - } - - return followings; - } - - @bindThis - public async getUserBlockers(userIds: Iterable): Promise>> { - const blockers = new Map>(); - - const toFetch: string[] = []; - for (const userId of userIds) { - const fromCache = this.userBlockedCache.get(userId); - if (fromCache) { - blockers.set(userId, fromCache); - } else { - toFetch.push(userId); - } - } - - if (toFetch.length > 0) { - const fetchedBlockers = await this.blockingsRepository.createQueryBuilder('blocking') - .select([ - 'blocking.blockerId', - 'blocking.blockeeId', - ]) - .where({ - blockeeId: In(toFetch), - }) - .getMany(); - - const toCache = new Map>(); - - // Pivot to a map - for (const { blockerId, blockeeId } of fetchedBlockers) { - // Queue for cache - let cacheSet = toCache.get(blockeeId); - if (!cacheSet) { - cacheSet = new Set(); - toCache.set(blockeeId, cacheSet); - } - cacheSet.add(blockerId); - - // Queue for return - let returnSet = blockers.get(blockeeId); - if (!returnSet) { - returnSet = new Set(); - blockers.set(blockeeId, returnSet); - } - returnSet.add(blockerId); - } - - // Update cache to speed up future calls - this.userBlockedCache.addMany(toCache); - } - - return blockers; - } - - public async getUserProfiles(userIds: Iterable): Promise> { - const profiles = new Map; - - const toFetch: string[] = []; - for (const userId of userIds) { - const fromCache = this.userProfileCache.get(userId); - if (fromCache) { - profiles.set(userId, fromCache); - } else { - toFetch.push(userId); - } - } - - if (toFetch.length > 0) { - const fetched = await this.userProfilesRepository.findBy({ - userId: In(toFetch), - }); - - for (const profile of fetched) { - profiles.set(profile.userId, profile); - } - - const toCache = new Map(fetched.map(p => [p.userId, p])); - this.userProfileCache.addMany(toCache); - } - - return profiles; - } - public async getUsers(userIds: Iterable): Promise> { const users = new Map; @@ -475,6 +450,61 @@ export class CacheService implements OnApplicationShutdown { return users; } + @bindThis + public async isFollowing(follower: string | { id: string }, followee: string | { id: string }): Promise { + const followerId = typeof(follower) === 'string' ? follower : follower.id; + const followeeId = typeof(followee) === 'string' ? followee : followee.id; + + // This lets us use whichever one is in memory, falling back to DB fetch via userFollowingsCache. + return this.userFollowersCache.get(followeeId)?.has(followerId) + ?? (await this.userFollowingsCache.fetch(followerId)).has(followeeId); + } + + /** + * Returns all hibernated followers. + */ + @bindThis + public async getHibernatedFollowers(followeeId: string): Promise { + const followers = await this.getFollowersWithHibernation(followeeId); + return followers.filter(f => f.isFollowerHibernated); + } + + /** + * Returns all non-hibernated followers. + */ + @bindThis + public async getNonHibernatedFollowers(followeeId: string): Promise { + const followers = await this.getFollowersWithHibernation(followeeId); + return followers.filter(f => !f.isFollowerHibernated); + } + + /** + * Returns follower relations with populated isFollowerHibernated. + * If you don't need this field, then please use userFollowersCache directly for reduced overhead. + */ + @bindThis + public async getFollowersWithHibernation(followeeId: string): Promise { + const followers = await this.userFollowersCache.fetch(followeeId); + const hibernations = await this.hibernatedUserCache.fetchMany(followers.keys()).then(fs => fs.reduce((map, f) => { + map.set(f[0], f[1]); + return map; + }, new Map)); + return Array.from(followers.values()).map(following => ({ + ...following, + isFollowerHibernated: hibernations.get(following.followerId) ?? false, + })); + } + + /** + * Refreshes follower and following relations for the given user. + */ + @bindThis + public async refreshFollowRelationsFor(userId: string): Promise { + const followings = await this.userFollowingsCache.refresh(userId); + const followees = Array.from(followings.values()).map(f => f.followeeId); + await this.userFollowersCache.deleteMany(followees); + } + @bindThis public clear(): void { this.userByIdCache.clear(); diff --git a/packages/backend/src/core/GlobalEventService.ts b/packages/backend/src/core/GlobalEventService.ts index de35e9db19..c146811331 100644 --- a/packages/backend/src/core/GlobalEventService.ts +++ b/packages/backend/src/core/GlobalEventService.ts @@ -265,7 +265,7 @@ export interface InternalEventTypes { unmute: { muterId: MiUser['id']; muteeId: MiUser['id']; }; userListMemberAdded: { userListId: MiUserList['id']; memberId: MiUser['id']; }; userListMemberRemoved: { userListId: MiUserList['id']; memberId: MiUser['id']; }; - quantumCacheUpdated: { name: string, keys: string[], op: 's' | 'd' }; + quantumCacheUpdated: { name: string, keys: string[] }; } type EventTypesToEventPayload = EventUnionFromDictionary>>; diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index 4dceb6e953..a9f4083446 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -606,11 +606,11 @@ export class NoteCreateService implements OnApplicationShutdown { } if (data.reply == null) { - // TODO: キャッシュ - this.followingsRepository.findBy({ - followeeId: user.id, - notify: 'normal', - }).then(async followings => { + this.cacheService.userFollowersCache.fetch(user.id).then(async followingsMap => { + const followings = Array + .from(followingsMap.values()) + .filter(f => f.notify === 'normal'); + if (note.visibility !== 'specified') { const isPureRenote = this.isRenote(data) && !this.isQuote(data) ? true : false; for (const following of followings) { @@ -948,14 +948,7 @@ export class NoteCreateService implements OnApplicationShutdown { // TODO: キャッシュ? // eslint-disable-next-line prefer-const let [followings, userListMemberships] = await Promise.all([ - this.followingsRepository.find({ - where: { - followeeId: user.id, - followerHost: IsNull(), - isFollowerHibernated: false, - }, - select: ['followerId', 'withReplies'], - }), + this.cacheService.getNonHibernatedFollowers(user.id), this.userListMembershipsRepository.find({ where: { userId: user.id, @@ -1072,17 +1065,19 @@ export class NoteCreateService implements OnApplicationShutdown { }); if (hibernatedUsers.length > 0) { - this.usersRepository.update({ - id: In(hibernatedUsers.map(x => x.id)), - }, { - isHibernated: true, - }); - - this.followingsRepository.update({ - followerId: In(hibernatedUsers.map(x => x.id)), - }, { - isFollowerHibernated: true, - }); + await Promise.all([ + this.usersRepository.update({ + id: In(hibernatedUsers.map(x => x.id)), + }, { + isHibernated: true, + }), + this.followingsRepository.update({ + followerId: In(hibernatedUsers.map(x => x.id)), + }, { + isFollowerHibernated: true, + }), + this.cacheService.hibernatedUserCache.setMany(hibernatedUsers.map(x => [x.id, true])), + ]); } } diff --git a/packages/backend/src/core/NoteEditService.ts b/packages/backend/src/core/NoteEditService.ts index 34af1c76dd..a359381573 100644 --- a/packages/backend/src/core/NoteEditService.ts +++ b/packages/backend/src/core/NoteEditService.ts @@ -833,14 +833,7 @@ export class NoteEditService implements OnApplicationShutdown { // TODO: キャッシュ? // eslint-disable-next-line prefer-const let [followings, userListMemberships] = await Promise.all([ - this.followingsRepository.find({ - where: { - followeeId: user.id, - followerHost: IsNull(), - isFollowerHibernated: false, - }, - select: ['followerId', 'withReplies'], - }), + this.cacheService.getNonHibernatedFollowers(user.id), this.userListMembershipsRepository.find({ where: { userId: user.id, @@ -957,17 +950,19 @@ export class NoteEditService implements OnApplicationShutdown { }); if (hibernatedUsers.length > 0) { - this.usersRepository.update({ - id: In(hibernatedUsers.map(x => x.id)), - }, { - isHibernated: true, - }); - - this.followingsRepository.update({ - followerId: In(hibernatedUsers.map(x => x.id)), - }, { - isFollowerHibernated: true, - }); + await Promise.all([ + this.usersRepository.update({ + id: In(hibernatedUsers.map(x => x.id)), + }, { + isHibernated: true, + }), + this.followingsRepository.update({ + followerId: In(hibernatedUsers.map(x => x.id)), + }, { + isFollowerHibernated: true, + }), + this.cacheService.hibernatedUserCache.setMany(hibernatedUsers.map(x => [x.id, true])), + ]); } } diff --git a/packages/backend/src/core/UserFollowingService.ts b/packages/backend/src/core/UserFollowingService.ts index 6a6c9a3000..8470872eac 100644 --- a/packages/backend/src/core/UserFollowingService.ts +++ b/packages/backend/src/core/UserFollowingService.ts @@ -147,12 +147,7 @@ export class UserFollowingService implements OnModuleInit { if (blocked) throw new IdentifiableError('3338392a-f764-498d-8855-db939dcf8c48', 'blocked'); } - if (await this.followingsRepository.exists({ - where: { - followerId: follower.id, - followeeId: followee.id, - }, - })) { + if (await this.cacheService.isFollowing(follower, followee)) { // すでにフォロー関係が存在している場合 if (this.userEntityService.isRemoteUser(follower) && this.userEntityService.isLocalUser(followee)) { // リモート → ローカル: acceptを送り返しておしまい @@ -180,24 +175,14 @@ export class UserFollowingService implements OnModuleInit { let autoAccept = false; // 鍵アカウントであっても、既にフォローされていた場合はスルー - const isFollowing = await this.followingsRepository.exists({ - where: { - followerId: follower.id, - followeeId: followee.id, - }, - }); + const isFollowing = await this.cacheService.isFollowing(follower, followee); if (isFollowing) { autoAccept = true; } // フォローしているユーザーは自動承認オプション if (!autoAccept && (this.userEntityService.isLocalUser(followee) && followeeProfile.autoAcceptFollowed)) { - const isFollowed = await this.followingsRepository.exists({ - where: { - followerId: followee.id, - followeeId: follower.id, - }, - }); + const isFollowed = await this.cacheService.isFollowing(followee, follower); // intentionally reversed parameters if (isFollowed) autoAccept = true; } @@ -206,12 +191,7 @@ export class UserFollowingService implements OnModuleInit { if (followee.isLocked && !autoAccept) { autoAccept = !!(await this.accountMoveService.validateAlsoKnownAs( follower, - (oldSrc, newSrc) => this.followingsRepository.exists({ - where: { - followeeId: followee.id, - followerId: newSrc.id, - }, - }), + (oldSrc, newSrc) => this.cacheService.isFollowing(newSrc, followee), true, )); } @@ -366,32 +346,29 @@ export class UserFollowingService implements OnModuleInit { }, silent = false, ): Promise { - const following = await this.followingsRepository.findOne({ - relations: { - follower: true, - followee: true, - }, - where: { - followerId: follower.id, - followeeId: followee.id, - }, - }); + const [ + followerUser, + followeeUser, + following, + ] = await Promise.all([ + this.cacheService.findUserById(follower.id), + this.cacheService.findUserById(followee.id), + this.cacheService.userFollowingsCache.fetch(follower.id).then(fs => fs.get(followee.id)), + ]); - if (following === null || !following.follower || !following.followee) { + if (following == null) { this.logger.warn('フォロー解除がリクエストされましたがフォローしていませんでした'); return; } await this.followingsRepository.delete(following.id); + await this.internalEventService.emit('unfollow', { followerId: follower.id, followeeId: followee.id }); - // Handled by CacheService - // this.cacheService.userFollowingsCache.refresh(follower.id); - - this.decrementFollowing(following.follower, following.followee); + this.decrementFollowing(followerUser, followeeUser); if (!silent && this.userEntityService.isLocalUser(follower)) { // Publish unfollow event - this.userEntityService.pack(followee.id, follower, { + this.userEntityService.pack(followeeUser, follower, { schema: 'UserDetailedNotMe', }).then(async packed => { this.globalEventService.publishMainStream(follower.id, 'unfollow', packed); @@ -416,8 +393,6 @@ export class UserFollowingService implements OnModuleInit { follower: MiUser, followee: MiUser, ): Promise { - await this.internalEventService.emit('unfollow', { followerId: follower.id, followeeId: followee.id }); - // Neither followee nor follower has moved. if (!follower.movedToUri && !followee.movedToUri) { //#region Decrement following / followers counts @@ -691,22 +666,22 @@ export class UserFollowingService implements OnModuleInit { */ @bindThis private async removeFollow(followee: Both, follower: Both): Promise { - const following = await this.followingsRepository.findOne({ - relations: { - followee: true, - follower: true, - }, - where: { - followeeId: followee.id, - followerId: follower.id, - }, - }); + const [ + followerUser, + followeeUser, + following, + ] = await Promise.all([ + this.cacheService.findUserById(follower.id), + this.cacheService.findUserById(followee.id), + this.cacheService.userFollowingsCache.fetch(follower.id).then(fs => fs.get(followee.id)), + ]); - if (!following || !following.followee || !following.follower) return; + if (!following) return; await this.followingsRepository.delete(following.id); + await this.internalEventService.emit('unfollow', { followerId: follower.id, followeeId: followee.id }); - this.decrementFollowing(following.follower, following.followee); + this.decrementFollowing(followerUser, followeeUser); } /** @@ -737,36 +712,26 @@ export class UserFollowingService implements OnModuleInit { } @bindThis - public getFollowees(userId: MiUser['id']) { - return this.followingsRepository.createQueryBuilder('following') - .select('following.followeeId') - .where('following.followerId = :followerId', { followerId: userId }) - .getMany(); + public async getFollowees(userId: MiUser['id']) { + const followings = await this.cacheService.userFollowingsCache.fetch(userId); + return Array.from(followings.values()); } @bindThis - public isFollowing(followerId: MiUser['id'], followeeId: MiUser['id']) { - return this.followingsRepository.exists({ - where: { - followerId, - followeeId, - }, - }); + public async isFollowing(followerId: MiUser['id'], followeeId: MiUser['id']) { + return this.cacheService.isFollowing(followerId, followeeId); } @bindThis public async isMutual(aUserId: MiUser['id'], bUserId: MiUser['id']) { - const count = await this.followingsRepository.createQueryBuilder('following') - .where(new Brackets(qb => { - qb.where('following.followerId = :aUserId', { aUserId }) - .andWhere('following.followeeId = :bUserId', { bUserId }); - })) - .orWhere(new Brackets(qb => { - qb.where('following.followerId = :bUserId', { bUserId }) - .andWhere('following.followeeId = :aUserId', { aUserId }); - })) - .getCount(); - - return count === 2; + const [ + isFollowing, + isFollowed, + ] = await Promise.all([ + this.isFollowing(aUserId, bUserId), + this.isFollowing(bUserId, aUserId), + ]); + + return isFollowing && isFollowed; } } diff --git a/packages/backend/src/core/UserService.ts b/packages/backend/src/core/UserService.ts index 1f471513f3..4a04910105 100644 --- a/packages/backend/src/core/UserService.ts +++ b/packages/backend/src/core/UserService.ts @@ -10,6 +10,7 @@ import { DI } from '@/di-symbols.js'; import { bindThis } from '@/decorators.js'; import { SystemWebhookService } from '@/core/SystemWebhookService.js'; import { UserEntityService } from '@/core/entities/UserEntityService.js'; +import { CacheService } from '@/core/CacheService.js'; @Injectable() export class UserService { @@ -20,6 +21,7 @@ export class UserService { private followingsRepository: FollowingsRepository, private systemWebhookService: SystemWebhookService, private userEntityService: UserEntityService, + private readonly cacheService: CacheService, ) { } @@ -38,14 +40,17 @@ export class UserService { }); const wokeUp = result.isHibernated; if (wokeUp) { - this.usersRepository.update(user.id, { - isHibernated: false, - }); - this.followingsRepository.update({ - followerId: user.id, - }, { - isFollowerHibernated: false, - }); + await Promise.all([ + this.usersRepository.update(user.id, { + isHibernated: false, + }), + this.followingsRepository.update({ + followerId: user.id, + }, { + isFollowerHibernated: false, + }), + this.cacheService.hibernatedUserCache.set(user.id, false), + ]); } } else { this.usersRepository.update(user.id, { diff --git a/packages/backend/src/core/UserSuspendService.ts b/packages/backend/src/core/UserSuspendService.ts index 30dcaa6f7d..f375dff862 100644 --- a/packages/backend/src/core/UserSuspendService.ts +++ b/packages/backend/src/core/UserSuspendService.ts @@ -16,6 +16,7 @@ import { bindThis } from '@/decorators.js'; import { RelationshipJobData } from '@/queue/types.js'; import { ModerationLogService } from '@/core/ModerationLogService.js'; import { isSystemAccount } from '@/misc/is-system-account.js'; +import { CacheService } from '@/core/CacheService.js'; @Injectable() export class UserSuspendService { @@ -34,6 +35,7 @@ export class UserSuspendService { private globalEventService: GlobalEventService, private apRendererService: ApRendererService, private moderationLogService: ModerationLogService, + private readonly cacheService: CacheService, ) { } @@ -143,12 +145,8 @@ export class UserSuspendService { @bindThis private async unFollowAll(follower: MiUser) { - const followings = await this.followingsRepository.find({ - where: { - followerId: follower.id, - followeeId: Not(IsNull()), - }, - }); + const followings = await this.cacheService.userFollowingsCache.fetch(follower.id) + .then(fs => Array.from(fs.values()).filter(f => f.followeeHost != null)); const jobs: RelationshipJobData[] = []; for (const following of followings) { diff --git a/packages/backend/src/core/activitypub/ApDeliverManagerService.ts b/packages/backend/src/core/activitypub/ApDeliverManagerService.ts index 746af41f55..86747f2508 100644 --- a/packages/backend/src/core/activitypub/ApDeliverManagerService.ts +++ b/packages/backend/src/core/activitypub/ApDeliverManagerService.ts @@ -5,7 +5,6 @@ import { Inject, Injectable } from '@nestjs/common'; import { IsNull, Not } from 'typeorm'; -import { UnrecoverableError } from 'bullmq'; import { DI } from '@/di-symbols.js'; import type { FollowingsRepository } from '@/models/_.js'; import type { MiLocalUser, MiRemoteUser, MiUser } from '@/models/User.js'; @@ -14,6 +13,7 @@ import { UserEntityService } from '@/core/entities/UserEntityService.js'; import { bindThis } from '@/decorators.js'; import type { IActivity } from '@/core/activitypub/type.js'; import { ThinUser } from '@/queue/types.js'; +import { CacheService } from '@/core/CacheService.js'; interface IRecipe { type: string; @@ -41,16 +41,14 @@ class DeliverManager { /** * Constructor - * @param userEntityService - * @param followingsRepository * @param queueService + * @param cacheService * @param actor Actor * @param activity Activity to deliver */ constructor( - private userEntityService: UserEntityService, - private followingsRepository: FollowingsRepository, private queueService: QueueService, + private readonly cacheService: CacheService, actor: { id: MiUser['id']; host: null; }, activity: IActivity | null, @@ -114,24 +112,23 @@ class DeliverManager { // Process follower recipes first to avoid duplication when processing direct recipes later. if (this.recipes.some(r => isFollowers(r))) { // followers deliver - // TODO: SELECT DISTINCT ON ("followerSharedInbox") "followerSharedInbox" みたいな問い合わせにすればよりパフォーマンス向上できそう // ただ、sharedInboxがnullなリモートユーザーも稀におり、その対応ができなさそう? - const followers = await this.followingsRepository.find({ - where: { - followeeId: this.actor.id, - followerHost: Not(IsNull()), - }, - select: { - followerSharedInbox: true, - followerInbox: true, - followerId: true, - }, - }); + const followers = await this.cacheService.userFollowingsCache + .fetch(this.actor.id) + .then(f => Array + .from(f.values()) + .filter(f => f.followerHost != null) + .map(f => ({ + followerInbox: f.followerInbox, + followerSharedInbox: f.followerSharedInbox, + }))); for (const following of followers) { - const inbox = following.followerSharedInbox ?? following.followerInbox; - if (inbox === null) throw new UnrecoverableError(`deliver failed for ${this.actor.id}: follower ${following.followerId} inbox is null`); - inboxes.set(inbox, following.followerSharedInbox != null); + if (following.followerSharedInbox) { + inboxes.set(following.followerSharedInbox, true); + } else if (following.followerInbox) { + inboxes.set(following.followerInbox, false); + } } } @@ -153,11 +150,8 @@ class DeliverManager { @Injectable() export class ApDeliverManagerService { constructor( - @Inject(DI.followingsRepository) - private followingsRepository: FollowingsRepository, - - private userEntityService: UserEntityService, private queueService: QueueService, + private readonly cacheService: CacheService, ) { } @@ -169,9 +163,8 @@ export class ApDeliverManagerService { @bindThis public async deliverToFollowers(actor: { id: MiLocalUser['id']; host: null; }, activity: IActivity): Promise { const manager = new DeliverManager( - this.userEntityService, - this.followingsRepository, this.queueService, + this.cacheService, actor, activity, ); @@ -188,9 +181,8 @@ export class ApDeliverManagerService { @bindThis public async deliverToUser(actor: { id: MiLocalUser['id']; host: null; }, activity: IActivity, to: MiRemoteUser): Promise { const manager = new DeliverManager( - this.userEntityService, - this.followingsRepository, this.queueService, + this.cacheService, actor, activity, ); @@ -207,9 +199,8 @@ export class ApDeliverManagerService { @bindThis public async deliverToUsers(actor: { id: MiLocalUser['id']; host: null; }, activity: IActivity, targets: MiRemoteUser[]): Promise { const manager = new DeliverManager( - this.userEntityService, - this.followingsRepository, this.queueService, + this.cacheService, actor, activity, ); @@ -220,9 +211,8 @@ export class ApDeliverManagerService { @bindThis public createDeliverManager(actor: { id: MiUser['id']; host: null; }, activity: IActivity | null): DeliverManager { return new DeliverManager( - this.userEntityService, - this.followingsRepository, this.queueService, + this.cacheService, actor, activity, diff --git a/packages/backend/src/core/activitypub/ApInboxService.ts b/packages/backend/src/core/activitypub/ApInboxService.ts index 7c26deb00f..009d4cbd39 100644 --- a/packages/backend/src/core/activitypub/ApInboxService.ts +++ b/packages/backend/src/core/activitypub/ApInboxService.ts @@ -37,6 +37,7 @@ import InstanceChart from '@/core/chart/charts/instance.js'; import FederationChart from '@/core/chart/charts/federation.js'; import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js'; import { UpdateInstanceQueue } from '@/core/UpdateInstanceQueue.js'; +import { CacheService } from '@/core/CacheService.js'; import { getApHrefNullable, getApId, getApIds, getApType, getNullableApId, isAccept, isActor, isAdd, isAnnounce, isApObject, isBlock, isCollectionOrOrderedCollection, isCreate, isDelete, isFlag, isFollow, isLike, isDislike, isMove, isPost, isReject, isRemove, isTombstone, isUndo, isUpdate, validActor, validPost, isActivity, IObjectWithId } from './type.js'; import { ApNoteService } from './models/ApNoteService.js'; import { ApLoggerService } from './ApLoggerService.js'; @@ -98,6 +99,7 @@ export class ApInboxService { private readonly instanceChart: InstanceChart, private readonly federationChart: FederationChart, private readonly updateInstanceQueue: UpdateInstanceQueue, + private readonly cacheService: CacheService, ) { this.logger = this.apLoggerService.logger; } @@ -766,12 +768,7 @@ export class ApInboxService { return 'skip: follower not found'; } - const isFollowing = await this.followingsRepository.exists({ - where: { - followerId: follower.id, - followeeId: actor.id, - }, - }); + const isFollowing = await this.cacheService.userFollowingsCache.fetch(follower.id).then(f => f.has(actor.id)); if (isFollowing) { await this.userFollowingService.unfollow(follower, actor); @@ -830,12 +827,7 @@ export class ApInboxService { }, }); - const isFollowing = await this.followingsRepository.exists({ - where: { - followerId: actor.id, - followeeId: followee.id, - }, - }); + const isFollowing = await this.cacheService.userFollowingsCache.fetch(actor.id).then(f => f.has(followee.id)); if (requestExist) { await this.userFollowingService.cancelFollowRequest(followee, actor); diff --git a/packages/backend/src/core/activitypub/models/ApPersonService.ts b/packages/backend/src/core/activitypub/models/ApPersonService.ts index b7aa036068..29f7459219 100644 --- a/packages/backend/src/core/activitypub/models/ApPersonService.ts +++ b/packages/backend/src/core/activitypub/models/ApPersonService.ts @@ -741,10 +741,17 @@ export class ApPersonService implements OnModuleInit, OnApplicationShutdown { this.hashtagService.updateUsertags(exist, tags); // 該当ユーザーが既にフォロワーになっていた場合はFollowingもアップデートする - await this.followingsRepository.update( - { followerId: exist.id }, - { followerSharedInbox: person.sharedInbox ?? person.endpoints?.sharedInbox ?? null }, - ); + if (exist.inbox !== person.inbox || exist.sharedInbox !== (person.sharedInbox ?? person.endpoints?.sharedInbox)) { + await this.followingsRepository.update( + { followerId: exist.id }, + { + followerInbox: person.inbox, + followerSharedInbox: person.sharedInbox ?? person.endpoints?.sharedInbox ?? null, + }, + ); + + await this.cacheService.refreshFollowRelationsFor(exist.id); + } await this.updateFeatured(exist.id, resolver).catch(err => { // Permanent error implies hidden or inaccessible, which is a normal thing. diff --git a/packages/backend/src/core/chart/charts/federation.ts b/packages/backend/src/core/chart/charts/federation.ts index b6db6f5454..4bbb5437cc 100644 --- a/packages/backend/src/core/chart/charts/federation.ts +++ b/packages/backend/src/core/chart/charts/federation.ts @@ -44,6 +44,7 @@ export default class FederationChart extends Chart { // eslint-di } protected async tickMinor(): Promise>> { + // TODO optimization: replace these with exists() const pubsubSubQuery = this.followingsRepository.createQueryBuilder('f') .select('f.followerHost') .where('f.followerHost IS NOT NULL'); diff --git a/packages/backend/src/core/chart/charts/per-user-following.ts b/packages/backend/src/core/chart/charts/per-user-following.ts index 588ac638de..8d75a30e9a 100644 --- a/packages/backend/src/core/chart/charts/per-user-following.ts +++ b/packages/backend/src/core/chart/charts/per-user-following.ts @@ -15,6 +15,7 @@ import Chart from '../core.js'; import { ChartLoggerService } from '../ChartLoggerService.js'; import { name, schema } from './entities/per-user-following.js'; import type { KVs } from '../core.js'; +import { CacheService } from '@/core/CacheService.js'; /** * ユーザーごとのフォローに関するチャート @@ -31,23 +32,25 @@ export default class PerUserFollowingChart extends Chart { // esl private appLockService: AppLockService, private userEntityService: UserEntityService, private chartLoggerService: ChartLoggerService, + private readonly cacheService: CacheService, ) { super(db, (k) => appLockService.getChartInsertLock(k), chartLoggerService.logger, name, schema, true); } protected async tickMajor(group: string): Promise>> { const [ - localFollowingsCount, - localFollowersCount, - remoteFollowingsCount, - remoteFollowersCount, + followees, + followers, ] = await Promise.all([ - this.followingsRepository.countBy({ followerId: group, followeeHost: IsNull() }), - this.followingsRepository.countBy({ followeeId: group, followerHost: IsNull() }), - this.followingsRepository.countBy({ followerId: group, followeeHost: Not(IsNull()) }), - this.followingsRepository.countBy({ followeeId: group, followerHost: Not(IsNull()) }), + this.cacheService.userFollowingsCache.fetch(group).then(fs => Array.from(fs.values())), + this.cacheService.userFollowersCache.fetch(group).then(fs => Array.from(fs.values())), ]); + const localFollowingsCount = followees.reduce((sum, f) => sum + (f.followeeHost == null ? 1 : 0), 0); + const localFollowersCount = followers.reduce((sum, f) => sum + (f.followerHost == null ? 1 : 0), 0); + const remoteFollowingsCount = followees.reduce((sum, f) => sum + (f.followeeHost == null ? 0 : 1), 0); + const remoteFollowersCount = followers.reduce((sum, f) => sum + (f.followerHost == null ? 0 : 1), 0); + return { 'local.followings.total': localFollowingsCount, 'local.followers.total': localFollowersCount, diff --git a/packages/backend/src/core/entities/NoteEntityService.ts b/packages/backend/src/core/entities/NoteEntityService.ts index 3af66b220d..9b447a4064 100644 --- a/packages/backend/src/core/entities/NoteEntityService.ts +++ b/packages/backend/src/core/entities/NoteEntityService.ts @@ -11,7 +11,7 @@ import type { Packed } from '@/misc/json-schema.js'; import { awaitAll } from '@/misc/prelude/await-all.js'; import type { MiUser } from '@/models/User.js'; import type { MiNote } from '@/models/Note.js'; -import type { UsersRepository, NotesRepository, FollowingsRepository, PollsRepository, PollVotesRepository, NoteReactionsRepository, ChannelsRepository, MiMeta, MiPollVote, MiPoll, MiChannel } from '@/models/_.js'; +import type { UsersRepository, NotesRepository, FollowingsRepository, PollsRepository, PollVotesRepository, NoteReactionsRepository, ChannelsRepository, MiMeta, MiPollVote, MiPoll, MiChannel, MiFollowing } from '@/models/_.js'; import { bindThis } from '@/decorators.js'; import { DebounceLoader } from '@/misc/loader.js'; import { IdService } from '@/core/IdService.js'; @@ -133,7 +133,7 @@ export class NoteEntityService implements OnModuleInit { @bindThis public async hideNote(packedNote: Packed<'Note'>, meId: MiUser['id'] | null, hint?: { - myFollowing?: ReadonlyMap, + myFollowing?: ReadonlyMap, myBlockers?: ReadonlySet, }): Promise { if (meId === packedNote.userId) return; @@ -416,7 +416,7 @@ export class NoteEntityService implements OnModuleInit { packedFiles: Map | null>; packedUsers: Map>; mentionHandles: Record; - userFollowings: Map>; + userFollowings: Map>>; userBlockers: Map>; polls: Map; pollVotes: Map>; @@ -659,9 +659,9 @@ export class NoteEntityService implements OnModuleInit { // mentionHandles this.getUserHandles(Array.from(mentionedUsers)), // userFollowings - this.cacheService.getUserFollowings(userIds), + this.cacheService.userFollowingsCache.fetchMany(userIds).then(fs => new Map(fs)), // userBlockers - this.cacheService.getUserBlockers(userIds), + this.cacheService.userBlockedCache.fetchMany(userIds).then(bs => new Map(bs)), // polls this.pollsRepository.findBy({ noteId: In(noteIds) }) .then(polls => new Map(polls.map(p => [p.noteId, p]))), diff --git a/packages/backend/src/core/entities/UserEntityService.ts b/packages/backend/src/core/entities/UserEntityService.ts index 8ed482af6f..aecbaa7fd5 100644 --- a/packages/backend/src/core/entities/UserEntityService.ts +++ b/packages/backend/src/core/entities/UserEntityService.ts @@ -79,7 +79,7 @@ function isRemoteUser(user: MiUser | { host: MiUser['host'] }): boolean { export type UserRelation = { id: MiUser['id'] - following: MiFollowing | null, + following: Omit | null, isFollowing: boolean isFollowed: boolean hasPendingFollowRequestFromYou: boolean @@ -197,16 +197,8 @@ export class UserEntityService implements OnModuleInit { memo, mutedInstances, ] = await Promise.all([ - this.followingsRepository.findOneBy({ - followerId: me, - followeeId: target, - }), - this.followingsRepository.exists({ - where: { - followerId: target, - followeeId: me, - }, - }), + this.cacheService.userFollowingsCache.fetch(me).then(f => f.get(target) ?? null), + this.cacheService.userFollowingsCache.fetch(target).then(f => f.has(me)), this.followRequestsRepository.exists({ where: { followerId: me, @@ -227,8 +219,7 @@ export class UserEntityService implements OnModuleInit { .then(mutings => mutings.has(target)), this.cacheService.renoteMutingsCache.fetch(me) .then(mutings => mutings.has(target)), - this.cacheService.userByIdCache.fetch(target, () => this.usersRepository.findOneByOrFail({ id: target })) - .then(user => user.host), + this.cacheService.findUserById(target).then(u => u.host), this.userMemosRepository.createQueryBuilder('m') .select('m.memo') .where({ userId: me, targetUserId: target }) @@ -271,13 +262,8 @@ export class UserEntityService implements OnModuleInit { memos, mutedInstances, ] = await Promise.all([ - this.followingsRepository.findBy({ followerId: me }) - .then(f => new Map(f.map(it => [it.followeeId, it]))), - this.followingsRepository.createQueryBuilder('f') - .select('f.followerId') - .where('f.followeeId = :me', { me }) - .getRawMany<{ f_followerId: string }>() - .then(it => it.map(it => it.f_followerId)), + this.cacheService.userFollowingsCache.fetch(me), + this.cacheService.userFollowersCache.fetch(me), this.followRequestsRepository.createQueryBuilder('f') .select('f.followeeId') .where('f.followerId = :me', { me }) @@ -322,7 +308,7 @@ export class UserEntityService implements OnModuleInit { id: target, following: following, isFollowing: following != null, - isFollowed: followees.includes(target), + isFollowed: followees.has(target), hasPendingFollowRequestFromYou: followersRequests.includes(target), hasPendingFollowRequestToYou: followeesRequests.includes(target), isBlocking: blockees.has(target), @@ -354,7 +340,7 @@ export class UserEntityService implements OnModuleInit { return false; // TODO } - // TODO make redis calls in MULTI? + // TODO optimization: make redis calls in MULTI @bindThis public async getNotificationsInfo(userId: MiUser['id']): Promise<{ hasUnread: boolean; @@ -789,11 +775,11 @@ export class UserEntityService implements OnModuleInit { .map(user => user.host) .filter((host): host is string => host != null)); - const _profilesFromUsers: MiUserProfile[] = []; + const _profilesFromUsers: [string, MiUserProfile][] = []; const _profilesToFetch: string[] = []; for (const user of _users) { if (user.userProfile) { - _profilesFromUsers.push(user.userProfile); + _profilesFromUsers.push([user.id, user.userProfile]); } else { _profilesToFetch.push(user.id); } @@ -803,13 +789,7 @@ export class UserEntityService implements OnModuleInit { const [profilesMap, userMemos, userRelations, pinNotes, userIdsByUri, instances, securityKeyCounts, pendingReceivedFollows, pendingSentFollows] = await Promise.all([ // profilesMap - this.cacheService.getUserProfiles(_profilesToFetch) - .then(profiles => { - for (const profile of _profilesFromUsers) { - profiles.set(profile.userId, profile); - } - return profiles; - }), + this.cacheService.userProfileCache.fetchMany(_profilesToFetch).then(profiles => new Map(profiles.concat(_profilesFromUsers))), // userMemos isDetailed && meId ? this.userMemosRepository.findBy({ userId: meId }) .then(memos => new Map(memos.map(memo => [memo.targetUserId, memo.memo]))) : new Map(), @@ -857,7 +837,7 @@ export class UserEntityService implements OnModuleInit { .groupBy('key.userId') .getRawMany<{ userId: string, userCount: number }>() .then(counts => new Map(counts.map(c => [c.userId, c.userCount]))) : new Map(), - // TODO check query performance + // TODO optimization: cache follow requests // pendingReceivedFollows isDetailedAndMe ? this.followRequestsRepository.createQueryBuilder('req') .select('req.followeeId', 'followeeId') diff --git a/packages/backend/src/misc/QuantumKVCache.ts b/packages/backend/src/misc/QuantumKVCache.ts index 6b36789f5e..b96937d6f2 100644 --- a/packages/backend/src/misc/QuantumKVCache.ts +++ b/packages/backend/src/misc/QuantumKVCache.ts @@ -21,18 +21,18 @@ export interface QuantumKVOpts { fetcher: (key: string, cache: QuantumKVCache) => T | Promise; /** - * Optional callback when a value is created or changed in the cache, either locally or elsewhere in the cluster. - * This is called *after* the cache state is updated. + * Optional callback to fetch the value for multiple keys that weren't found in the cache. * May be synchronous or async. + * If not provided, then the implementation will fall back on repeated calls to fetcher(). */ - onSet?: (key: string, cache: QuantumKVCache) => void | Promise; + bulkFetcher?: (keys: string[], cache: QuantumKVCache) => Iterable<[key: string, value: T]> | Promise>; /** - * Optional callback when a value is deleted from the cache, either locally or elsewhere in the cluster. + * Optional callback when one or more values are changed (created, updated, or deleted) in the cache, either locally or elsewhere in the cluster. * This is called *after* the cache state is updated. - * May be synchronous or async. + * Implementations may be synchronous or async. */ - onDelete?: (key: string, cache: QuantumKVCache) => void | Promise; + onChanged?: (keys: string[], cache: QuantumKVCache) => void | Promise; } /** @@ -44,8 +44,8 @@ export class QuantumKVCache implements Iterable<[key: string, value: T]> { private readonly memoryCache: MemoryKVCache; public readonly fetcher: QuantumKVOpts['fetcher']; - public readonly onSet: QuantumKVOpts['onSet']; - public readonly onDelete: QuantumKVOpts['onDelete']; + public readonly bulkFetcher: QuantumKVOpts['bulkFetcher']; + public readonly onChanged: QuantumKVOpts['onChanged']; /** * @param internalEventService Service bus to synchronize events. @@ -59,8 +59,8 @@ export class QuantumKVCache implements Iterable<[key: string, value: T]> { ) { this.memoryCache = new MemoryKVCache(opts.lifetime); this.fetcher = opts.fetcher; - this.onSet = opts.onSet; - this.onDelete = opts.onDelete; + this.bulkFetcher = opts.bulkFetcher; + this.onChanged = opts.onChanged; this.internalEventService.on('quantumCacheUpdated', this.onQuantumCacheUpdated, { // Ignore our own events, otherwise we'll immediately erase any set value. @@ -122,10 +122,10 @@ export class QuantumKVCache implements Iterable<[key: string, value: T]> { this.memoryCache.set(key, value); - await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', keys: [key] }); + await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, keys: [key] }); - if (this.onSet) { - await this.onSet(key, this); + if (this.onChanged) { + await this.onChanged([key], this); } } @@ -146,12 +146,10 @@ export class QuantumKVCache implements Iterable<[key: string, value: T]> { } if (changedKeys.length > 0) { - await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', keys: changedKeys }); + await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, keys: changedKeys }); - if (this.onSet) { - for (const key of changedKeys) { - await this.onSet(key, this); - } + if (this.onChanged) { + await this.onChanged(changedKeys, this); } } } @@ -180,12 +178,26 @@ export class QuantumKVCache implements Iterable<[key: string, value: T]> { /** * Gets a value from the local memory cache, or returns undefined if not found. + * Returns cached data only - does not make any fetches. */ @bindThis public get(key: string): T | undefined { return this.memoryCache.get(key); } + /** + * Gets multiple values from the local memory cache; returning undefined for any missing keys. + * Returns cached data only - does not make any fetches. + */ + @bindThis + public getMany(keys: Iterable): [key: string, value: T | undefined][] { + const results: [key: string, value: T | undefined][] = []; + for (const key of keys) { + results.push([key, this.get(key)]); + } + return results; + } + /** * Gets or fetches a value from the cache. * Fires an onSet event, but does not emit an update event to other processes. @@ -197,13 +209,49 @@ export class QuantumKVCache implements Iterable<[key: string, value: T]> { value = await this.fetcher(key, this); this.memoryCache.set(key, value); - if (this.onSet) { - await this.onSet(key, this); + if (this.onChanged) { + await this.onChanged([key], this); } } return value; } + /** + * Gets or fetches multiple values from the cache. + * Fires onSet events, but does not emit any update events to other processes. + */ + @bindThis + public async fetchMany(keys: Iterable): Promise<[key: string, value: T][]> { + const results: [key: string, value: T][] = []; + const toFetch: string[] = []; + + // Spliterate into cached results / uncached keys. + for (const key of keys) { + const fromCache = this.get(key); + if (fromCache) { + results.push([key, fromCache]); + } else { + toFetch.push(key); + } + } + + // Fetch any uncached keys + if (toFetch.length > 0) { + const fetched = await this.bulkFetch(toFetch); + + // Add to cache and return set + this.addMany(fetched); + results.push(...fetched); + + // Emit event + if (this.onChanged) { + await this.onChanged(toFetch, this); + } + } + + return results; + } + /** * Returns true is a key exists in memory. * This applies to the local subset view, not the cross-cluster cache state. @@ -221,10 +269,10 @@ export class QuantumKVCache implements Iterable<[key: string, value: T]> { public async delete(key: string): Promise { this.memoryCache.delete(key); - await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', keys: [key] }); + await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, keys: [key] }); - if (this.onDelete) { - await this.onDelete(key, this); + if (this.onChanged) { + await this.onChanged([key], this); } } /** @@ -233,21 +281,22 @@ export class QuantumKVCache implements Iterable<[key: string, value: T]> { * Skips if the input is empty. */ @bindThis - public async deleteMany(keys: string[]): Promise { - if (keys.length === 0) { - return; - } + public async deleteMany(keys: Iterable): Promise { + const deleted: string[] = []; for (const key of keys) { this.memoryCache.delete(key); + deleted.push(key); } - await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', keys }); + if (deleted.length === 0) { + return; + } - if (this.onDelete) { - for (const key of keys) { - await this.onDelete(key, this); - } + await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, keys: deleted }); + + if (this.onChanged) { + await this.onChanged(deleted, this); } } @@ -262,6 +311,13 @@ export class QuantumKVCache implements Iterable<[key: string, value: T]> { return value; } + @bindThis + public async refreshMany(keys: Iterable): Promise<[key: string, value: T][]> { + const values = await this.bulkFetch(keys); + await this.setMany(values); + return values; + } + /** * Erases all entries from the local memory cache. * Does not send any events or update other processes. @@ -291,19 +347,30 @@ export class QuantumKVCache implements Iterable<[key: string, value: T]> { this.memoryCache.dispose(); } + @bindThis + private async bulkFetch(keys: Iterable): Promise<[key: string, value: T][]> { + if (this.bulkFetcher) { + const results = await this.bulkFetcher(Array.from(keys), this); + return Array.from(results); + } + + const results: [key: string, value: T][] = []; + for (const key of keys) { + const value = await this.fetcher(key, this); + results.push([key, value]); + } + return results; + } + @bindThis private async onQuantumCacheUpdated(data: InternalEventTypes['quantumCacheUpdated']): Promise { if (data.name === this.name) { for (const key of data.keys) { this.memoryCache.delete(key); + } - if (data.op === 's' && this.onSet) { - await this.onSet(key, this); - } - - if (data.op === 'd' && this.onDelete) { - await this.onDelete(key, this); - } + if (this.onChanged) { + await this.onChanged(data.keys, this); } } } diff --git a/packages/backend/src/misc/cache.ts b/packages/backend/src/misc/cache.ts index 932c0b409a..666e684c1c 100644 --- a/packages/backend/src/misc/cache.ts +++ b/packages/backend/src/misc/cache.ts @@ -5,8 +5,6 @@ import * as Redis from 'ioredis'; import { bindThis } from '@/decorators.js'; -import { InternalEventService } from '@/core/InternalEventService.js'; -import { InternalEventTypes } from '@/core/GlobalEventService.js'; export class RedisKVCache { private readonly lifetime: number; @@ -120,9 +118,9 @@ export class RedisKVCache { export class RedisSingleCache { private readonly lifetime: number; private readonly memoryCache: MemorySingleCache; - private readonly fetcher: () => Promise; - private readonly toRedisConverter: (value: T) => string; - private readonly fromRedisConverter: (value: string) => T | undefined; + public readonly fetcher: () => Promise; + public readonly toRedisConverter: (value: T) => string; + public readonly fromRedisConverter: (value: string) => T | undefined; constructor( private redisClient: Redis.Redis, @@ -245,6 +243,16 @@ export class MemoryKVCache { return cached.value; } + public has(key: string): boolean { + const cached = this.cache.get(key); + if (cached == null) return false; + if ((Date.now() - cached.date) > this.lifetime) { + this.cache.delete(key); + return false; + } + return true; + } + @bindThis public delete(key: string): void { this.cache.delete(key); diff --git a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts index 6a1a8bcc66..5bf64e4f04 100644 --- a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts +++ b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts @@ -18,6 +18,7 @@ import { SearchService } from '@/core/SearchService.js'; import { ApLogService } from '@/core/ApLogService.js'; import { ReactionService } from '@/core/ReactionService.js'; import { QueueService } from '@/core/QueueService.js'; +import { CacheService } from '@/core/CacheService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; import type { DbUserDeleteJobData } from '../types.js'; @@ -94,6 +95,7 @@ export class DeleteAccountProcessorService { private searchService: SearchService, private reactionService: ReactionService, private readonly apLogService: ApLogService, + private readonly cacheService: CacheService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('delete-account'); } @@ -140,6 +142,22 @@ export class DeleteAccountProcessorService { } { // Delete user relations + await this.cacheService.refreshFollowRelationsFor(user.id); + await this.cacheService.userFollowingsCache.delete(user.id); + await this.cacheService.userFollowingsCache.delete(user.id); + await this.cacheService.userBlockingCache.delete(user.id); + await this.cacheService.userBlockedCache.delete(user.id); + await this.cacheService.userMutingsCache.delete(user.id); + await this.cacheService.userMutingsCache.delete(user.id); + await this.cacheService.hibernatedUserCache.delete(user.id); + await this.cacheService.renoteMutingsCache.delete(user.id); + await this.cacheService.userProfileCache.delete(user.id); + this.cacheService.userByIdCache.delete(user.id); + this.cacheService.localUserByIdCache.delete(user.id); + if (user.token) { + this.cacheService.localUserByNativeTokenCache.delete(user.token); + } + await this.followingsRepository.delete({ followerId: user.id, }); diff --git a/packages/backend/src/server/api/endpoints/following/delete.ts b/packages/backend/src/server/api/endpoints/following/delete.ts index ba146b6703..442352a4d2 100644 --- a/packages/backend/src/server/api/endpoints/following/delete.ts +++ b/packages/backend/src/server/api/endpoints/following/delete.ts @@ -12,6 +12,7 @@ import { UserFollowingService } from '@/core/UserFollowingService.js'; import { DI } from '@/di-symbols.js'; import { GetterService } from '@/server/api/GetterService.js'; import { ApiError } from '../../error.js'; +import { CacheService } from '@/core/CacheService.js'; export const meta = { tags: ['following', 'users'], @@ -69,6 +70,7 @@ export default class extends Endpoint { // eslint- private userEntityService: UserEntityService, private getterService: GetterService, private userFollowingService: UserFollowingService, + private readonly cacheService: CacheService, ) { super(meta, paramDef, async (ps, me) => { const follower = me; @@ -85,12 +87,7 @@ export default class extends Endpoint { // eslint- }); // Check not following - const exist = await this.followingsRepository.exists({ - where: { - followerId: follower.id, - followeeId: followee.id, - }, - }); + const exist = await this.cacheService.userFollowingsCache.fetch(follower.id).then(f => f.has(followee.id)); if (!exist) { throw new ApiError(meta.errors.notFollowing); diff --git a/packages/backend/src/server/api/endpoints/following/invalidate.ts b/packages/backend/src/server/api/endpoints/following/invalidate.ts index b45d21410b..3809bf29b0 100644 --- a/packages/backend/src/server/api/endpoints/following/invalidate.ts +++ b/packages/backend/src/server/api/endpoints/following/invalidate.ts @@ -11,6 +11,7 @@ import { UserEntityService } from '@/core/entities/UserEntityService.js'; import { UserFollowingService } from '@/core/UserFollowingService.js'; import { DI } from '@/di-symbols.js'; import { GetterService } from '@/server/api/GetterService.js'; +import { CacheService } from '@/core/CacheService.js'; import { ApiError } from '../../error.js'; export const meta = { @@ -69,6 +70,7 @@ export default class extends Endpoint { // eslint- private userEntityService: UserEntityService, private getterService: GetterService, private userFollowingService: UserFollowingService, + private readonly cacheService: CacheService, ) { super(meta, paramDef, async (ps, me) => { const followee = me; @@ -85,12 +87,9 @@ export default class extends Endpoint { // eslint- }); // Check not following - const exist = await this.followingsRepository.findOneBy({ - followerId: follower.id, - followeeId: followee.id, - }); + const isFollowing = await this.cacheService.userFollowingsCache.fetch(follower.id).then(f => f.has(followee.id)); - if (exist == null) { + if (!isFollowing) { throw new ApiError(meta.errors.notFollowing); } diff --git a/packages/backend/src/server/api/endpoints/following/update-all.ts b/packages/backend/src/server/api/endpoints/following/update-all.ts index c953feb393..a02b51cc79 100644 --- a/packages/backend/src/server/api/endpoints/following/update-all.ts +++ b/packages/backend/src/server/api/endpoints/following/update-all.ts @@ -12,6 +12,7 @@ import { UserFollowingService } from '@/core/UserFollowingService.js'; import { DI } from '@/di-symbols.js'; import { GetterService } from '@/server/api/GetterService.js'; import { ApiError } from '../../error.js'; +import { CacheService } from '@/core/CacheService.js'; export const meta = { tags: ['following', 'users'], @@ -39,6 +40,7 @@ export default class extends Endpoint { // eslint- constructor( @Inject(DI.followingsRepository) private followingsRepository: FollowingsRepository, + private readonly cacheService: CacheService, ) { super(meta, paramDef, async (ps, me) => { await this.followingsRepository.update({ @@ -48,6 +50,8 @@ export default class extends Endpoint { // eslint- withReplies: ps.withReplies != null ? ps.withReplies : undefined, }); + await this.cacheService.refreshFollowRelationsFor(me.id); + return; }); } diff --git a/packages/backend/src/server/api/endpoints/following/update.ts b/packages/backend/src/server/api/endpoints/following/update.ts index d62cf210ed..f4ca21856f 100644 --- a/packages/backend/src/server/api/endpoints/following/update.ts +++ b/packages/backend/src/server/api/endpoints/following/update.ts @@ -11,6 +11,7 @@ import { UserEntityService } from '@/core/entities/UserEntityService.js'; import { UserFollowingService } from '@/core/UserFollowingService.js'; import { DI } from '@/di-symbols.js'; import { GetterService } from '@/server/api/GetterService.js'; +import { CacheService } from '@/core/CacheService.js'; import { ApiError } from '../../error.js'; export const meta = { @@ -71,6 +72,7 @@ export default class extends Endpoint { // eslint- private userEntityService: UserEntityService, private getterService: GetterService, private userFollowingService: UserFollowingService, + private readonly cacheService: CacheService, ) { super(meta, paramDef, async (ps, me) => { const follower = me; @@ -87,10 +89,7 @@ export default class extends Endpoint { // eslint- }); // Check not following - const exist = await this.followingsRepository.findOneBy({ - followerId: follower.id, - followeeId: followee.id, - }); + const exist = await this.cacheService.userFollowingsCache.fetch(follower.id).then(f => f.get(followee.id)); if (exist == null) { throw new ApiError(meta.errors.notFollowing); @@ -103,6 +102,8 @@ export default class extends Endpoint { // eslint- withReplies: ps.withReplies != null ? ps.withReplies : undefined, }); + await this.cacheService.refreshFollowRelationsFor(follower.id); + return await this.userEntityService.pack(follower.id, me); }); } diff --git a/packages/backend/src/server/api/endpoints/users/followers.ts b/packages/backend/src/server/api/endpoints/users/followers.ts index c1617e14e5..82ce282bfc 100644 --- a/packages/backend/src/server/api/endpoints/users/followers.ts +++ b/packages/backend/src/server/api/endpoints/users/followers.ts @@ -12,6 +12,7 @@ import { FollowingEntityService } from '@/core/entities/FollowingEntityService.j import { UtilityService } from '@/core/UtilityService.js'; import { DI } from '@/di-symbols.js'; import { RoleService } from '@/core/RoleService.js'; +import { CacheService } from '@/core/CacheService.js'; import { ApiError } from '../../error.js'; export const meta = { @@ -89,6 +90,7 @@ export default class extends Endpoint { // eslint- private followingEntityService: FollowingEntityService, private queryService: QueryService, private roleService: RoleService, + private readonly cacheService: CacheService, ) { super(meta, paramDef, async (ps, me) => { const user = await this.usersRepository.findOneBy(ps.userId != null @@ -110,12 +112,7 @@ export default class extends Endpoint { // eslint- if (me == null) { throw new ApiError(meta.errors.forbidden); } else if (me.id !== user.id) { - const isFollowing = await this.followingsRepository.exists({ - where: { - followeeId: user.id, - followerId: me.id, - }, - }); + const isFollowing = await this.cacheService.userFollowingsCache.fetch(me.id).then(f => f.has(user.id)); if (!isFollowing) { throw new ApiError(meta.errors.forbidden); } diff --git a/packages/backend/src/server/api/endpoints/users/following.ts b/packages/backend/src/server/api/endpoints/users/following.ts index c292c6d6a3..80f0b0c484 100644 --- a/packages/backend/src/server/api/endpoints/users/following.ts +++ b/packages/backend/src/server/api/endpoints/users/following.ts @@ -13,6 +13,7 @@ import { FollowingEntityService } from '@/core/entities/FollowingEntityService.j import { UtilityService } from '@/core/UtilityService.js'; import { DI } from '@/di-symbols.js'; import { RoleService } from '@/core/RoleService.js'; +import { CacheService } from '@/core/CacheService.js'; import { ApiError } from '../../error.js'; export const meta = { @@ -98,6 +99,7 @@ export default class extends Endpoint { // eslint- private followingEntityService: FollowingEntityService, private queryService: QueryService, private roleService: RoleService, + private readonly cacheService: CacheService, ) { super(meta, paramDef, async (ps, me) => { const user = await this.usersRepository.findOneBy(ps.userId != null @@ -119,12 +121,7 @@ export default class extends Endpoint { // eslint- if (me == null) { throw new ApiError(meta.errors.forbidden); } else if (me.id !== user.id) { - const isFollowing = await this.followingsRepository.exists({ - where: { - followeeId: user.id, - followerId: me.id, - }, - }); + const isFollowing = await this.cacheService.userFollowingsCache.fetch(me.id).then(f => f.has(user.id)); if (!isFollowing) { throw new ApiError(meta.errors.forbidden); } diff --git a/packages/backend/src/server/api/endpoints/users/recommendation.ts b/packages/backend/src/server/api/endpoints/users/recommendation.ts index 642d788459..52dd2197b2 100644 --- a/packages/backend/src/server/api/endpoints/users/recommendation.ts +++ b/packages/backend/src/server/api/endpoints/users/recommendation.ts @@ -71,6 +71,7 @@ export default class extends Endpoint { // eslint- this.queryService.generateBlockQueryForUsers(query, me); this.queryService.generateBlockedUserQueryForNotes(query, me); + // TODO optimization: replace with exists() const followingQuery = this.followingsRepository.createQueryBuilder('following') .select('following.followeeId') .where('following.followerId = :followerId', { followerId: me.id }); diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts index 21437850d3..0ee7078eb2 100644 --- a/packages/backend/src/server/api/stream/Connection.ts +++ b/packages/backend/src/server/api/stream/Connection.ts @@ -36,7 +36,7 @@ export default class Connection { private channels = new Map(); private subscribingNotes = new Map(); public userProfile: MiUserProfile | null = null; - public following: Map = new Map(); + public following: Map> = new Map(); public followingChannels: Set = new Set(); public userIdsWhoMeMuting: Set = new Set(); public userIdsWhoBlockingMe: Set = new Set(); diff --git a/packages/backend/test/misc/noOpCaches.ts b/packages/backend/test/misc/noOpCaches.ts index 40c5d2dc65..f3cc1e2ba2 100644 --- a/packages/backend/test/misc/noOpCaches.ts +++ b/packages/backend/test/misc/noOpCaches.ts @@ -6,13 +6,14 @@ import * as Redis from 'ioredis'; import { Inject } from '@nestjs/common'; import { FakeInternalEventService } from './FakeInternalEventService.js'; -import type { BlockingsRepository, FollowingsRepository, MiUser, MiUserProfile, MutingsRepository, RenoteMutingsRepository, UserProfilesRepository, UsersRepository } from '@/models/_.js'; +import type { BlockingsRepository, FollowingsRepository, MiUser, MutingsRepository, RenoteMutingsRepository, UserProfilesRepository, UsersRepository } from '@/models/_.js'; import type { MiLocalUser } from '@/models/User.js'; import { MemoryKVCache, MemorySingleCache, RedisKVCache, RedisSingleCache } from '@/misc/cache.js'; import { QuantumKVCache, QuantumKVOpts } from '@/misc/QuantumKVCache.js'; -import { CacheService, CachedTranslationEntity, FollowStats } from '@/core/CacheService.js'; +import { CacheService, FollowStats } from '@/core/CacheService.js'; import { DI } from '@/di-symbols.js'; import { UserEntityService } from '@/core/entities/UserEntityService.js'; +import { InternalEventService } from '@/core/InternalEventService.js'; export function noOpRedis() { return { @@ -76,55 +77,16 @@ export class NoOpCacheService extends CacheService { this.localUserByNativeTokenCache = new NoOpMemoryKVCache(); this.localUserByIdCache = new NoOpMemoryKVCache(); this.uriPersonCache = new NoOpMemoryKVCache(); - this.userProfileCache = new NoOpQuantumKVCache({ - internalEventService: fakeInternalEventService, - fetcher: this.userProfileCache.fetcher, - onSet: this.userProfileCache.onSet, - onDelete: this.userProfileCache.onDelete, - }); - this.userMutingsCache = new NoOpQuantumKVCache>({ - internalEventService: fakeInternalEventService, - fetcher: this.userMutingsCache.fetcher, - onSet: this.userMutingsCache.onSet, - onDelete: this.userMutingsCache.onDelete, - }); - this.userBlockingCache = new NoOpQuantumKVCache>({ - internalEventService: fakeInternalEventService, - fetcher: this.userBlockingCache.fetcher, - onSet: this.userBlockingCache.onSet, - onDelete: this.userBlockingCache.onDelete, - }); - this.userBlockedCache = new NoOpQuantumKVCache>({ - internalEventService: fakeInternalEventService, - fetcher: this.userBlockedCache.fetcher, - onSet: this.userBlockedCache.onSet, - onDelete: this.userBlockedCache.onDelete, - }); - this.renoteMutingsCache = new NoOpQuantumKVCache>({ - internalEventService: fakeInternalEventService, - fetcher: this.renoteMutingsCache.fetcher, - onSet: this.renoteMutingsCache.onSet, - onDelete: this.renoteMutingsCache.onDelete, - }); - this.userFollowingsCache = new NoOpQuantumKVCache>({ - internalEventService: fakeInternalEventService, - fetcher: this.userFollowingsCache.fetcher, - onSet: this.userFollowingsCache.onSet, - onDelete: this.userFollowingsCache.onDelete, - }); - this.userFollowersCache = new NoOpQuantumKVCache>({ - internalEventService: fakeInternalEventService, - fetcher: this.userFollowersCache.fetcher, - onSet: this.userFollowersCache.onSet, - onDelete: this.userFollowersCache.onDelete, - }); + this.userProfileCache = NoOpQuantumKVCache.copy(this.userProfileCache, fakeInternalEventService); + this.userMutingsCache = NoOpQuantumKVCache.copy(this.userMutingsCache, fakeInternalEventService); + this.userBlockingCache = NoOpQuantumKVCache.copy(this.userBlockingCache, fakeInternalEventService); + this.userBlockedCache = NoOpQuantumKVCache.copy(this.userBlockedCache, fakeInternalEventService); + this.renoteMutingsCache = NoOpQuantumKVCache.copy(this.renoteMutingsCache, fakeInternalEventService); + this.userFollowingsCache = NoOpQuantumKVCache.copy(this.userFollowingsCache, fakeInternalEventService); + this.userFollowersCache = NoOpQuantumKVCache.copy(this.userFollowersCache, fakeInternalEventService); + this.hibernatedUserCache = NoOpQuantumKVCache.copy(this.hibernatedUserCache, fakeInternalEventService); this.userFollowStatsCache = new NoOpMemoryKVCache(); - this.translationsCache = new NoOpRedisKVCache({ - redis: fakeRedis, - fetcher: this.translationsCache.fetcher, - toRedisConverter: this.translationsCache.toRedisConverter, - fromRedisConverter: this.translationsCache.fromRedisConverter, - }); + this.translationsCache = NoOpRedisKVCache.copy(this.translationsCache, fakeRedis); } } @@ -159,17 +121,26 @@ export class NoOpRedisKVCache extends RedisKVCache { }, ); } + + public static copy(cache: RedisKVCache, redis?: Redis.Redis): NoOpRedisKVCache { + return new NoOpRedisKVCache({ + redis, + fetcher: cache.fetcher, + toRedisConverter: cache.toRedisConverter, + fromRedisConverter: cache.fromRedisConverter, + }); + } } export class NoOpRedisSingleCache extends RedisSingleCache { constructor(opts?: { - fakeRedis?: Redis.Redis; + redis?: Redis.Redis; fetcher?: RedisSingleCache['fetcher']; toRedisConverter?: RedisSingleCache['toRedisConverter']; fromRedisConverter?: RedisSingleCache['fromRedisConverter']; }) { super( - opts?.fakeRedis ?? noOpRedis(), + opts?.redis ?? noOpRedis(), 'no-op', { lifetime: -1, @@ -180,24 +151,37 @@ export class NoOpRedisSingleCache extends RedisSingleCache { }, ); } + + public static copy(cache: RedisSingleCache, redis?: Redis.Redis): NoOpRedisSingleCache { + return new NoOpRedisSingleCache({ + redis, + fetcher: cache.fetcher, + toRedisConverter: cache.toRedisConverter, + fromRedisConverter: cache.fromRedisConverter, + }); + } } export class NoOpQuantumKVCache extends QuantumKVCache { - constructor(opts: { - internalEventService?: FakeInternalEventService, - fetcher: QuantumKVOpts['fetcher'], - onSet?: QuantumKVOpts['onSet'], - onDelete?: QuantumKVOpts['onDelete'], + constructor(opts: Omit, 'lifetime'> & { + internalEventService?: InternalEventService, }) { super( opts.internalEventService ?? new FakeInternalEventService(), 'no-op', { + ...opts, lifetime: -1, - fetcher: opts.fetcher, - onSet: opts.onSet, - onDelete: opts.onDelete, }, ); } + + public static copy(cache: QuantumKVCache, internalEventService?: InternalEventService): NoOpQuantumKVCache { + return new NoOpQuantumKVCache({ + internalEventService, + fetcher: cache.fetcher, + bulkFetcher: cache.bulkFetcher, + onChanged: cache.onChanged, + }); + } } diff --git a/packages/backend/test/unit/misc/QuantumKVCache.ts b/packages/backend/test/unit/misc/QuantumKVCache.ts index 72997494ce..92792171be 100644 --- a/packages/backend/test/unit/misc/QuantumKVCache.ts +++ b/packages/backend/test/unit/misc/QuantumKVCache.ts @@ -73,19 +73,19 @@ describe(QuantumKVCache, () => { await cache.set('foo', 'bar'); - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo'] }]]); + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo'] }]]); }); - it('should call onSet when storing', async () => { - const fakeOnSet = jest.fn(() => Promise.resolve()); + it('should call onChanged when storing', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); const cache = makeCache({ name: 'fake', - onSet: fakeOnSet, + onChanged: fakeOnChanged, }); await cache.set('foo', 'bar'); - expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); + expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], cache); }); it('should not emit event when storing unchanged value', async () => { @@ -97,17 +97,17 @@ describe(QuantumKVCache, () => { expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); }); - it('should not call onSet when storing unchanged value', async () => { - const fakeOnSet = jest.fn(() => Promise.resolve()); + it('should not call onChanged when storing unchanged value', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); const cache = makeCache({ name: 'fake', - onSet: fakeOnSet, + onChanged: fakeOnChanged, }); await cache.set('foo', 'bar'); await cache.set('foo', 'bar'); - expect(fakeOnSet).toHaveBeenCalledTimes(1); + expect(fakeOnChanged).toHaveBeenCalledTimes(1); }); it('should fetch an unknown value', async () => { @@ -133,17 +133,17 @@ describe(QuantumKVCache, () => { expect(result).toBe(true); }); - it('should call onSet when fetching', async () => { - const fakeOnSet = jest.fn(() => Promise.resolve()); + it('should call onChanged when fetching', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); const cache = makeCache({ name: 'fake', fetcher: key => `value#${key}`, - onSet: fakeOnSet, + onChanged: fakeOnChanged, }); await cache.fetch('foo'); - expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); + expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], cache); }); it('should not emit event when fetching', async () => { @@ -154,7 +154,7 @@ describe(QuantumKVCache, () => { await cache.fetch('foo'); - expect(fakeInternalEventService._calls).not.toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo'] }]]); + expect(fakeInternalEventService._calls).not.toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo'] }]]); }); it('should delete from memory cache', async () => { @@ -167,17 +167,17 @@ describe(QuantumKVCache, () => { expect(result).toBe(false); }); - it('should call onDelete when deleting', async () => { - const fakeOnDelete = jest.fn(() => Promise.resolve()); + it('should call onChanged when deleting', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); const cache = makeCache({ name: 'fake', - onDelete: fakeOnDelete, + onChanged: fakeOnChanged, }); await cache.set('foo', 'bar'); await cache.delete('foo'); - expect(fakeOnDelete).toHaveBeenCalledWith('foo', cache); + expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], cache); }); it('should emit event when deleting', async () => { @@ -186,52 +186,52 @@ describe(QuantumKVCache, () => { await cache.set('foo', 'bar'); await cache.delete('foo'); - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 'd', keys: ['foo'] }]]); + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo'] }]]); }); it('should delete when receiving set event', async () => { const cache = makeCache({ name: 'fake' }); await cache.set('foo', 'bar'); - await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo'] }); + await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', keys: ['foo'] }); const result = cache.has('foo'); expect(result).toBe(false); }); - it('should call onSet when receiving set event', async () => { - const fakeOnSet = jest.fn(() => Promise.resolve()); + it('should call onChanged when receiving set event', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); const cache = makeCache({ name: 'fake', - onSet: fakeOnSet, + onChanged: fakeOnChanged, }); - await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo'] }); + await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', keys: ['foo'] }); - expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); + expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], cache); }); it('should delete when receiving delete event', async () => { const cache = makeCache({ name: 'fake' }); await cache.set('foo', 'bar'); - await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 'd', keys: ['foo'] }); + await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', keys: ['foo'] }); const result = cache.has('foo'); expect(result).toBe(false); }); - it('should call onDelete when receiving delete event', async () => { - const fakeOnDelete = jest.fn(() => Promise.resolve()); + it('should call onChanged when receiving delete event', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); const cache = makeCache({ name: 'fake', - onDelete: fakeOnDelete, + onChanged: fakeOnChanged, }); await cache.set('foo', 'bar'); - await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 'd', keys: ['foo'] }); + await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', keys: ['foo'] }); - expect(fakeOnDelete).toHaveBeenCalledWith('foo', cache); + expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], cache); }); describe('get', () => { @@ -269,40 +269,243 @@ describe(QuantumKVCache, () => { await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo', 'alpha'] }]]); + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo', 'alpha'] }]]); expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); }); - it('should call onSet for each item', async () => { - const fakeOnSet = jest.fn(() => Promise.resolve()); + it('should call onChanged once with all items', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); const cache = makeCache({ name: 'fake', - onSet: fakeOnSet, + onChanged: fakeOnChanged, }); await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); - expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); - expect(fakeOnSet).toHaveBeenCalledWith('alpha', cache); + expect(fakeOnChanged).toHaveBeenCalledWith(['foo', 'alpha'], cache); + expect(fakeOnChanged).toHaveBeenCalledTimes(1); }); it('should emit events only for changed items', async () => { - const fakeOnSet = jest.fn(() => Promise.resolve()); + const fakeOnChanged = jest.fn(() => Promise.resolve()); const cache = makeCache({ name: 'fake', - onSet: fakeOnSet, + onChanged: fakeOnChanged, }); await cache.set('foo', 'bar'); - fakeOnSet.mockClear(); + fakeOnChanged.mockClear(); fakeInternalEventService._reset(); await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', keys: ['alpha'] }]]); + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['alpha'] }]]); + expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); + expect(fakeOnChanged).toHaveBeenCalledWith(['alpha'], cache); + expect(fakeOnChanged).toHaveBeenCalledTimes(1); + }); + }); + + describe('getMany', () => { + it('should return empty for empty input', () => { + const cache = makeCache(); + const result = cache.getMany([]); + expect(result).toEqual([]); + }); + + it('should return the value for all keys', () => { + const cache = makeCache(); + cache.add('foo', 'bar'); + cache.add('alpha', 'omega'); + + const result = cache.getMany(['foo', 'alpha']); + + expect(result).toEqual([['foo', 'bar'], ['alpha', 'omega']]); + }); + + it('should return undefined for missing keys', () => { + const cache = makeCache(); + cache.add('foo', 'bar'); + + const result = cache.getMany(['foo', 'alpha']); + + expect(result).toEqual([['foo', 'bar'], ['alpha', undefined]]); + }); + }); + + describe('fetchMany', () => { + it('should do nothing for empty input', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + onChanged: fakeOnChanged, + }); + + await cache.fetchMany([]); + + expect(fakeOnChanged).not.toHaveBeenCalled(); + expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); + }); + + it('should return existing items', async () => { + const cache = makeCache(); + cache.add('foo', 'bar'); + cache.add('alpha', 'omega'); + + const result = await cache.fetchMany(['foo', 'alpha']); + + expect(result).toEqual([['foo', 'bar'], ['alpha', 'omega']]); + }); + + it('should return existing items without events', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + onChanged: fakeOnChanged, + }); + cache.add('foo', 'bar'); + cache.add('alpha', 'omega'); + + await cache.fetchMany(['foo', 'alpha']); + + expect(fakeOnChanged).not.toHaveBeenCalled(); + expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); + }); + + it('should call bulkFetcher for missing items', async () => { + const cache = makeCache({ + bulkFetcher: keys => keys.map(k => [k, `${k}#many`]), + fetcher: key => `${key}#single`, + }); + + const results = await cache.fetchMany(['foo', 'alpha']); + + expect(results).toEqual([['foo', 'foo#many'], ['alpha', 'alpha#many']]); + }); + + it('should call bulkFetcher only once', async () => { + const mockBulkFetcher = jest.fn((keys: string[]) => keys.map(k => [k, `${k}#value`] as [string, string])); + const cache = makeCache({ + bulkFetcher: mockBulkFetcher, + }); + + await cache.fetchMany(['foo', 'bar']); + + expect(mockBulkFetcher).toHaveBeenCalledTimes(1); + }); + + it('should call fetcher when fetchMany is undefined', async () => { + const cache = makeCache({ + fetcher: key => `${key}#single`, + }); + + const results = await cache.fetchMany(['foo', 'alpha']); + + expect(results).toEqual([['foo', 'foo#single'], ['alpha', 'alpha#single']]); + }); + + it('should call onChanged', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + onChanged: fakeOnChanged, + fetcher: k => k, + }); + + await cache.fetchMany(['foo', 'alpha']); + + expect(fakeOnChanged).toHaveBeenCalledWith(['foo', 'alpha'], cache); + expect(fakeOnChanged).toHaveBeenCalledTimes(1); + }); + + it('should call onChanged only for changed', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + onChanged: fakeOnChanged, + fetcher: k => k, + }); + cache.add('foo', 'bar'); + + await cache.fetchMany(['foo', 'alpha']); + + expect(fakeOnChanged).toHaveBeenCalledWith(['alpha'], cache); + expect(fakeOnChanged).toHaveBeenCalledTimes(1); + }); + + it('should not emit event', async () => { + const cache = makeCache({ + fetcher: k => k, + }); + + await cache.fetchMany(['foo', 'alpha']); + + expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); + }); + }); + + describe('refreshMany', () => { + it('should do nothing for empty input', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + onChanged: fakeOnChanged, + }); + + const result = await cache.refreshMany([]); + + expect(result).toEqual([]); + expect(fakeOnChanged).not.toHaveBeenCalled(); + expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); + }); + + it('should call bulkFetcher for all keys', async () => { + const mockBulkFetcher = jest.fn((keys: string[]) => keys.map(k => [k, `${k}#value`] as [string, string])); + const cache = makeCache({ + bulkFetcher: mockBulkFetcher, + }); + + const result = await cache.refreshMany(['foo', 'alpha']); + + expect(result).toEqual([['foo', 'foo#value'], ['alpha', 'alpha#value']]); + expect(mockBulkFetcher).toHaveBeenCalledWith(['foo', 'alpha'], cache); + expect(mockBulkFetcher).toHaveBeenCalledTimes(1); + }); + + it('should replace any existing keys', async () => { + const mockBulkFetcher = jest.fn((keys: string[]) => keys.map(k => [k, `${k}#value`] as [string, string])); + const cache = makeCache({ + bulkFetcher: mockBulkFetcher, + }); + cache.add('foo', 'bar'); + + const result = await cache.refreshMany(['foo', 'alpha']); + + expect(result).toEqual([['foo', 'foo#value'], ['alpha', 'alpha#value']]); + expect(mockBulkFetcher).toHaveBeenCalledWith(['foo', 'alpha'], cache); + expect(mockBulkFetcher).toHaveBeenCalledTimes(1); + }); + + it('should call onChanged for all keys', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + bulkFetcher: keys => keys.map(k => [k, `${k}#value`]), + onChanged: fakeOnChanged, + }); + cache.add('foo', 'bar'); + + await cache.refreshMany(['foo', 'alpha']); + + expect(fakeOnChanged).toHaveBeenCalledWith(['foo', 'alpha'], cache); + expect(fakeOnChanged).toHaveBeenCalledTimes(1); + }); + + it('should emit event for all keys', async () => { + const cache = makeCache({ + name: 'fake', + bulkFetcher: keys => keys.map(k => [k, `${k}#value`]), + }); + cache.add('foo', 'bar'); + + await cache.refreshMany(['foo', 'alpha']); + + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo', 'alpha'] }]]); expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); - expect(fakeOnSet).toHaveBeenCalledWith('alpha', cache); - expect(fakeOnSet).toHaveBeenCalledTimes(1); }); }); @@ -325,33 +528,33 @@ describe(QuantumKVCache, () => { await cache.deleteMany(['foo', 'alpha']); - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 'd', keys: ['foo', 'alpha'] }]]); + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo', 'alpha'] }]]); expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); }); - it('should call onDelete for each key', async () => { - const fakeOnDelete = jest.fn(() => Promise.resolve()); + it('should call onChanged once with all items', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); const cache = makeCache({ name: 'fake', - onDelete: fakeOnDelete, + onChanged: fakeOnChanged, }); await cache.deleteMany(['foo', 'alpha']); - expect(fakeOnDelete).toHaveBeenCalledWith('foo', cache); - expect(fakeOnDelete).toHaveBeenCalledWith('alpha', cache); + expect(fakeOnChanged).toHaveBeenCalledWith(['foo', 'alpha'], cache); + expect(fakeOnChanged).toHaveBeenCalledTimes(1); }); it('should do nothing if no keys are provided', async () => { - const fakeOnDelete = jest.fn(() => Promise.resolve()); + const fakeOnChanged = jest.fn(() => Promise.resolve()); const cache = makeCache({ name: 'fake', - onDelete: fakeOnDelete, + onChanged: fakeOnChanged, }); await cache.deleteMany([]); - expect(fakeOnDelete).not.toHaveBeenCalled(); + expect(fakeOnChanged).not.toHaveBeenCalled(); expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); }); }); @@ -392,17 +595,17 @@ describe(QuantumKVCache, () => { expect(result).toBe('value#foo'); }); - it('should call onSet', async () => { - const fakeOnSet = jest.fn(() => Promise.resolve()); + it('should call onChanged', async () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); const cache = makeCache({ name: 'fake', fetcher: key => `value#${key}`, - onSet: fakeOnSet, + onChanged: fakeOnChanged, }); await cache.refresh('foo'); - expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); + expect(fakeOnChanged).toHaveBeenCalledWith(['foo'], cache); }); it('should emit event', async () => { @@ -413,7 +616,7 @@ describe(QuantumKVCache, () => { await cache.refresh('foo'); - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo'] }]]); + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', keys: ['foo'] }]]); }); }); @@ -434,15 +637,15 @@ describe(QuantumKVCache, () => { expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); }); - it('should not call onSet', () => { - const fakeOnSet = jest.fn(() => Promise.resolve()); + it('should not call onChanged', () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); const cache = makeCache({ - onSet: fakeOnSet, + onChanged: fakeOnChanged, }); cache.add('foo', 'bar'); - expect(fakeOnSet).not.toHaveBeenCalled(); + expect(fakeOnChanged).not.toHaveBeenCalled(); }); }); @@ -466,15 +669,15 @@ describe(QuantumKVCache, () => { expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); }); - it('should not call onSet', () => { - const fakeOnSet = jest.fn(() => Promise.resolve()); + it('should not call onChanged', () => { + const fakeOnChanged = jest.fn(() => Promise.resolve()); const cache = makeCache({ - onSet: fakeOnSet, + onChanged: fakeOnChanged, }); cache.addMany([['foo', 'bar'], ['alpha', 'omega']]); - expect(fakeOnSet).not.toHaveBeenCalled(); + expect(fakeOnChanged).not.toHaveBeenCalled(); }); }); -- cgit v1.2.3-freya From 51572b731426080da51b8e14e0ad8d28b2e25c40 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Mon, 9 Jun 2025 11:26:21 -0400 Subject: fix refactoring mistake in CacheService.ts --- packages/backend/src/core/CacheService.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'packages/backend/src/core/CacheService.ts') diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts index 9c68597441..48aa464e82 100644 --- a/packages/backend/src/core/CacheService.ts +++ b/packages/backend/src/core/CacheService.ts @@ -159,7 +159,7 @@ export class CacheService implements OnApplicationShutdown { } group.set(f.followeeId, f); return groups; - }, {} as Map>>)), + }, new Map>>)), }); this.userFollowersCache = new QuantumKVCache>>(this.internalEventService, 'userFollowers', { @@ -176,7 +176,7 @@ export class CacheService implements OnApplicationShutdown { } group.set(f.followerId, f); return groups; - }, {} as Map>>)), + }, new Map>>)), }); this.hibernatedUserCache = new QuantumKVCache(this.internalEventService, 'hibernatedUsers', { -- cgit v1.2.3-freya From 646ffa7b622ff816a3c0e18322c73a04c4c5f48d Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Mon, 9 Jun 2025 11:45:20 -0400 Subject: fix missing @bindThis in CacheService --- packages/backend/src/core/CacheService.ts | 2 ++ 1 file changed, 2 insertions(+) (limited to 'packages/backend/src/core/CacheService.ts') diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts index 48aa464e82..2d37cd6bab 100644 --- a/packages/backend/src/core/CacheService.ts +++ b/packages/backend/src/core/CacheService.ts @@ -291,6 +291,7 @@ export class CacheService implements OnApplicationShutdown { } } + @bindThis private async onTokenEvent(body: InternalEventTypes[E]): Promise { { { @@ -303,6 +304,7 @@ export class CacheService implements OnApplicationShutdown { } } + @bindThis private async onFollowEvent(body: InternalEventTypes[E], type: E): Promise { { switch (type) { -- cgit v1.2.3-freya