diff options
| author | Julia <julia@insertdomain.name> | 2025-06-19 21:35:18 +0000 |
|---|---|---|
| committer | Julia <julia@insertdomain.name> | 2025-06-19 21:35:18 +0000 |
| commit | a77c32b17da63d3932b219f74152cce023a30f4a (patch) | |
| tree | d2a05796e942c8f250bbd01369eab0cbe5a14531 /packages/backend/src/core/CacheService.ts | |
| parent | merge: release 2025.4.2 (!1051) (diff) | |
| parent | Merge branch 'develop' into release/2025.4.3 (diff) | |
| download | sharkey-a77c32b17da63d3932b219f74152cce023a30f4a.tar.gz sharkey-a77c32b17da63d3932b219f74152cce023a30f4a.tar.bz2 sharkey-a77c32b17da63d3932b219f74152cce023a30f4a.zip | |
merge: prepare release 2025.4.3 (!1125)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/1125
Approved-by: Marie <github@yuugi.dev>
Approved-by: Julia <julia@insertdomain.name>
Diffstat (limited to 'packages/backend/src/core/CacheService.ts')
| -rw-r--r-- | packages/backend/src/core/CacheService.ts | 354 |
1 files changed, 293 insertions, 61 deletions
diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts index 1cf63221f9..2d37cd6bab 100644 --- a/packages/backend/src/core/CacheService.ts +++ b/packages/backend/src/core/CacheService.ts @@ -5,14 +5,16 @@ import { Inject, Injectable } from '@nestjs/common'; import * as Redis from 'ioredis'; -import { IsNull } from 'typeorm'; -import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiFollowing, MiNote } from '@/models/_.js'; +import { In, IsNull } from 'typeorm'; +import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiNote, MiFollowing } from '@/models/_.js'; import { MemoryKVCache, RedisKVCache } from '@/misc/cache.js'; +import { QuantumKVCache } from '@/misc/QuantumKVCache.js'; import type { MiLocalUser, MiUser } from '@/models/User.js'; import { DI } from '@/di-symbols.js'; import { UserEntityService } from '@/core/entities/UserEntityService.js'; import { bindThis } from '@/decorators.js'; -import type { GlobalEvents } from '@/core/GlobalEventService.js'; +import type { InternalEventTypes } from '@/core/GlobalEventService.js'; +import { InternalEventService } from '@/core/InternalEventService.js'; import type { OnApplicationShutdown } from '@nestjs/common'; export interface FollowStats { @@ -27,7 +29,7 @@ export interface CachedTranslation { text: string | undefined; } -interface CachedTranslationEntity { +export interface CachedTranslationEntity { l?: string; t?: string; u?: number; @@ -39,14 +41,16 @@ export class CacheService implements OnApplicationShutdown { public localUserByNativeTokenCache: MemoryKVCache<MiLocalUser | null>; public localUserByIdCache: MemoryKVCache<MiLocalUser>; public uriPersonCache: MemoryKVCache<MiUser | null>; - public userProfileCache: RedisKVCache<MiUserProfile>; - public userMutingsCache: RedisKVCache<Set<string>>; - public userBlockingCache: RedisKVCache<Set<string>>; - public userBlockedCache: RedisKVCache<Set<string>>; // NOTE: 「被」Blockキャッシュ - public renoteMutingsCache: RedisKVCache<Set<string>>; - public userFollowingsCache: RedisKVCache<Record<string, Pick<MiFollowing, 'withReplies'> | undefined>>; - private readonly userFollowStatsCache = new MemoryKVCache<FollowStats>(1000 * 60 * 10); // 10 minutes - private readonly translationsCache: RedisKVCache<CachedTranslationEntity>; + public userProfileCache: QuantumKVCache<MiUserProfile>; + public userMutingsCache: QuantumKVCache<Set<string>>; + public userBlockingCache: QuantumKVCache<Set<string>>; + public userBlockedCache: QuantumKVCache<Set<string>>; // NOTE: 「被」Blockキャッシュ + public renoteMutingsCache: QuantumKVCache<Set<string>>; + public userFollowingsCache: QuantumKVCache<Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>; + public userFollowersCache: QuantumKVCache<Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>; + public hibernatedUserCache: QuantumKVCache<boolean>; + protected userFollowStatsCache = new MemoryKVCache<FollowStats>(1000 * 60 * 10); // 10 minutes + protected translationsCache: RedisKVCache<CachedTranslationEntity>; constructor( @Inject(DI.redis) @@ -74,6 +78,7 @@ export class CacheService implements OnApplicationShutdown { private followingsRepository: FollowingsRepository, private userEntityService: UserEntityService, + private readonly internalEventService: InternalEventService, ) { //this.onMessage = this.onMessage.bind(this); @@ -82,58 +87,148 @@ export class CacheService implements OnApplicationShutdown { this.localUserByIdCache = new MemoryKVCache<MiLocalUser>(1000 * 60 * 5); // 5m this.uriPersonCache = new MemoryKVCache<MiUser | null>(1000 * 60 * 5); // 5m - this.userProfileCache = new RedisKVCache<MiUserProfile>(this.redisClient, 'userProfile', { + this.userProfileCache = new QuantumKVCache(this.internalEventService, 'userProfile', { lifetime: 1000 * 60 * 30, // 30m - memoryCacheLifetime: 1000 * 60, // 1m fetcher: (key) => this.userProfilesRepository.findOneByOrFail({ userId: key }), - toRedisConverter: (value) => JSON.stringify(value), - fromRedisConverter: (value) => JSON.parse(value), // TODO: date型の考慮 + bulkFetcher: userIds => this.userProfilesRepository.findBy({ userId: In(userIds) }).then(ps => ps.map(p => [p.userId, p])), }); - this.userMutingsCache = new RedisKVCache<Set<string>>(this.redisClient, 'userMutings', { + this.userMutingsCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'userMutings', { lifetime: 1000 * 60 * 30, // 30m - memoryCacheLifetime: 1000 * 60, // 1m fetcher: (key) => this.mutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))), - toRedisConverter: (value) => JSON.stringify(Array.from(value)), - fromRedisConverter: (value) => new Set(JSON.parse(value)), + bulkFetcher: muterIds => this.mutingsRepository + .createQueryBuilder('muting') + .select('"muting"."muterId"', 'muterId') + .addSelect('array_agg("muting"."muteeId")', 'muteeIds') + .where({ muterId: In(muterIds) }) + .groupBy('muting.muterId') + .getRawMany<{ muterId: string, muteeIds: string[] }>() + .then(ms => ms.map(m => [m.muterId, new Set(m.muteeIds)])), }); - this.userBlockingCache = new RedisKVCache<Set<string>>(this.redisClient, 'userBlocking', { + this.userBlockingCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'userBlocking', { lifetime: 1000 * 60 * 30, // 30m - memoryCacheLifetime: 1000 * 60, // 1m fetcher: (key) => this.blockingsRepository.find({ where: { blockerId: key }, select: ['blockeeId'] }).then(xs => new Set(xs.map(x => x.blockeeId))), - toRedisConverter: (value) => JSON.stringify(Array.from(value)), - fromRedisConverter: (value) => new Set(JSON.parse(value)), + bulkFetcher: blockerIds => this.blockingsRepository + .createQueryBuilder('blocking') + .select('"blocking"."blockerId"', 'blockerId') + .addSelect('array_agg("blocking"."blockeeId")', 'blockeeIds') + .where({ blockerId: In(blockerIds) }) + .groupBy('blocking.blockerId') + .getRawMany<{ blockerId: string, blockeeIds: string[] }>() + .then(ms => ms.map(m => [m.blockerId, new Set(m.blockeeIds)])), }); - this.userBlockedCache = new RedisKVCache<Set<string>>(this.redisClient, 'userBlocked', { + this.userBlockedCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'userBlocked', { lifetime: 1000 * 60 * 30, // 30m - memoryCacheLifetime: 1000 * 60, // 1m fetcher: (key) => this.blockingsRepository.find({ where: { blockeeId: key }, select: ['blockerId'] }).then(xs => new Set(xs.map(x => x.blockerId))), - toRedisConverter: (value) => JSON.stringify(Array.from(value)), - fromRedisConverter: (value) => new Set(JSON.parse(value)), + bulkFetcher: blockeeIds => this.blockingsRepository + .createQueryBuilder('blocking') + .select('"blocking"."blockeeId"', 'blockeeId') + .addSelect('array_agg("blocking"."blockeeId")', 'blockeeIds') + .where({ blockeeId: In(blockeeIds) }) + .groupBy('blocking.blockeeId') + .getRawMany<{ blockeeId: string, blockerIds: string[] }>() + .then(ms => ms.map(m => [m.blockeeId, new Set(m.blockerIds)])), }); - this.renoteMutingsCache = new RedisKVCache<Set<string>>(this.redisClient, 'renoteMutings', { + this.renoteMutingsCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'renoteMutings', { lifetime: 1000 * 60 * 30, // 30m - memoryCacheLifetime: 1000 * 60, // 1m fetcher: (key) => this.renoteMutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))), - toRedisConverter: (value) => JSON.stringify(Array.from(value)), - fromRedisConverter: (value) => new Set(JSON.parse(value)), + bulkFetcher: muterIds => this.renoteMutingsRepository + .createQueryBuilder('muting') + .select('"muting"."muterId"', 'muterId') + .addSelect('array_agg("muting"."muteeId")', 'muteeIds') + .where({ muterId: In(muterIds) }) + .groupBy('muting.muterId') + .getRawMany<{ muterId: string, muteeIds: string[] }>() + .then(ms => ms.map(m => [m.muterId, new Set(m.muteeIds)])), }); - this.userFollowingsCache = new RedisKVCache<Record<string, Pick<MiFollowing, 'withReplies'> | undefined>>(this.redisClient, 'userFollowings', { + this.userFollowingsCache = new QuantumKVCache<Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>(this.internalEventService, 'userFollowings', { lifetime: 1000 * 60 * 30, // 30m - memoryCacheLifetime: 1000 * 60, // 1m - fetcher: (key) => this.followingsRepository.find({ where: { followerId: key }, select: ['followeeId', 'withReplies'] }).then(xs => { - const obj: Record<string, Pick<MiFollowing, 'withReplies'> | undefined> = {}; - for (const x of xs) { - obj[x.followeeId] = { withReplies: x.withReplies }; + fetcher: (key) => this.followingsRepository.findBy({ followerId: key }).then(xs => new Map(xs.map(f => [f.followeeId, f]))), + bulkFetcher: followerIds => this.followingsRepository + .findBy({ followerId: In(followerIds) }) + .then(fs => fs + .reduce((groups, f) => { + let group = groups.get(f.followerId); + if (!group) { + group = new Map(); + groups.set(f.followerId, group); + } + group.set(f.followeeId, f); + return groups; + }, new Map<string, Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>)), + }); + + this.userFollowersCache = new QuantumKVCache<Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>(this.internalEventService, 'userFollowers', { + lifetime: 1000 * 60 * 30, // 30m + fetcher: followeeId => this.followingsRepository.findBy({ followeeId: followeeId }).then(xs => new Map(xs.map(x => [x.followerId, x]))), + bulkFetcher: followeeIds => this.followingsRepository + .findBy({ followeeId: In(followeeIds) }) + .then(fs => fs + .reduce((groups, f) => { + let group = groups.get(f.followeeId); + if (!group) { + group = new Map(); + groups.set(f.followeeId, group); + } + group.set(f.followerId, f); + return groups; + }, new Map<string, Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>)), + }); + + this.hibernatedUserCache = new QuantumKVCache<boolean>(this.internalEventService, 'hibernatedUsers', { + lifetime: 1000 * 60 * 30, // 30m + fetcher: async userId => { + const { isHibernated } = await this.usersRepository.findOneOrFail({ + where: { id: userId }, + select: { isHibernated: true }, + }); + return isHibernated; + }, + bulkFetcher: async userIds => { + const results = await this.usersRepository.find({ + where: { id: In(userIds) }, + select: { id: true, isHibernated: true }, + }); + return results.map(({ id, isHibernated }) => [id, isHibernated]); + }, + onChanged: async userIds => { + // We only update local copies since each process will get this event, but we can have user objects in multiple different caches. + // Before doing anything else we must "find" all the objects to update. + const userObjects = new Map<string, MiUser[]>(); + const toUpdate: string[] = []; + for (const uid of userIds) { + const toAdd: MiUser[] = []; + + const localUserById = this.localUserByIdCache.get(uid); + if (localUserById) toAdd.push(localUserById); + + const userById = this.userByIdCache.get(uid); + if (userById) toAdd.push(userById); + + if (toAdd.length > 0) { + toUpdate.push(uid); + userObjects.set(uid, toAdd); + } + } + + // In many cases, we won't have to do anything. + // Skipping the DB fetch ensures that this remains a single-step synchronous process. + if (toUpdate.length > 0) { + const hibernations = await this.usersRepository.find({ where: { id: In(toUpdate) }, select: { id: true, isHibernated: true } }); + for (const { id, isHibernated } of hibernations) { + const users = userObjects.get(id); + if (users) { + for (const u of users) { + u.isHibernated = isHibernated; + } + } + } } - return obj; - }), - toRedisConverter: (value) => JSON.stringify(value), - fromRedisConverter: (value) => JSON.parse(value), + }, }); this.translationsCache = new RedisKVCache<CachedTranslationEntity>(this.redisClient, 'translations', { @@ -143,20 +238,21 @@ export class CacheService implements OnApplicationShutdown { // NOTE: チャンネルのフォロー状況キャッシュはChannelFollowingServiceで行っている - this.redisForSub.on('message', this.onMessage); + this.internalEventService.on('userChangeSuspendedState', this.onUserEvent); + this.internalEventService.on('userChangeDeletedState', this.onUserEvent); + this.internalEventService.on('remoteUserUpdated', this.onUserEvent); + this.internalEventService.on('localUserUpdated', this.onUserEvent); + this.internalEventService.on('userChangeSuspendedState', this.onUserEvent); + this.internalEventService.on('userTokenRegenerated', this.onTokenEvent); + this.internalEventService.on('follow', this.onFollowEvent); + this.internalEventService.on('unfollow', this.onFollowEvent); } @bindThis - private async onMessage(_: string, data: string): Promise<void> { - const obj = JSON.parse(data); - - if (obj.channel === 'internal') { - const { type, body } = obj.message as GlobalEvents['internal']['payload']; - switch (type) { - case 'userChangeSuspendedState': - case 'userChangeDeletedState': - case 'remoteUserUpdated': - case 'localUserUpdated': { + private async onUserEvent<E extends 'userChangeSuspendedState' | 'userChangeDeletedState' | 'remoteUserUpdated' | 'localUserUpdated'>(body: InternalEventTypes[E], _: E, isLocal: boolean): Promise<void> { + { + { + { const user = await this.usersRepository.findOneBy({ id: body.id }); if (user == null) { this.userByIdCache.delete(body.id); @@ -166,6 +262,18 @@ export class CacheService implements OnApplicationShutdown { this.uriPersonCache.delete(k); } } + if (isLocal) { + await Promise.all([ + this.userProfileCache.delete(body.id), + this.userMutingsCache.delete(body.id), + this.userBlockingCache.delete(body.id), + this.userBlockedCache.delete(body.id), + this.renoteMutingsCache.delete(body.id), + this.userFollowingsCache.delete(body.id), + this.userFollowersCache.delete(body.id), + this.hibernatedUserCache.delete(body.id), + ]); + } } else { this.userByIdCache.set(user.id, user); for (const [k, v] of this.uriPersonCache.entries) { @@ -178,20 +286,37 @@ export class CacheService implements OnApplicationShutdown { this.localUserByIdCache.set(user.id, user); } } - break; } - case 'userTokenRegenerated': { + } + } + } + + @bindThis + private async onTokenEvent<E extends 'userTokenRegenerated'>(body: InternalEventTypes[E]): Promise<void> { + { + { + { const user = await this.usersRepository.findOneByOrFail({ id: body.id }) as MiLocalUser; this.localUserByNativeTokenCache.delete(body.oldToken); this.localUserByNativeTokenCache.set(body.newToken, user); - break; } + } + } + } + + @bindThis + private async onFollowEvent<E extends 'follow' | 'unfollow'>(body: InternalEventTypes[E], type: E): Promise<void> { + { + switch (type) { case 'follow': { const follower = this.userByIdCache.get(body.followerId); if (follower) follower.followingCount++; const followee = this.userByIdCache.get(body.followeeId); if (followee) followee.followersCount++; - this.userFollowingsCache.delete(body.followerId); + await Promise.all([ + this.userFollowingsCache.delete(body.followerId), + this.userFollowersCache.delete(body.followeeId), + ]); this.userFollowStatsCache.delete(body.followerId); this.userFollowStatsCache.delete(body.followeeId); break; @@ -201,13 +326,14 @@ export class CacheService implements OnApplicationShutdown { if (follower) follower.followingCount--; const followee = this.userByIdCache.get(body.followeeId); if (followee) followee.followersCount--; - this.userFollowingsCache.delete(body.followerId); + await Promise.all([ + this.userFollowingsCache.delete(body.followerId), + this.userFollowersCache.delete(body.followeeId), + ]); this.userFollowStatsCache.delete(body.followerId); this.userFollowStatsCache.delete(body.followeeId); break; } - default: - break; } } } @@ -299,8 +425,114 @@ export class CacheService implements OnApplicationShutdown { } @bindThis + public async getUsers(userIds: Iterable<string>): Promise<Map<string, MiUser>> { + const users = new Map<string, MiUser>; + + const toFetch: string[] = []; + for (const userId of userIds) { + const fromCache = this.userByIdCache.get(userId); + if (fromCache) { + users.set(userId, fromCache); + } else { + toFetch.push(userId); + } + } + + if (toFetch.length > 0) { + const fetched = await this.usersRepository.findBy({ + id: In(toFetch), + }); + + for (const user of fetched) { + users.set(user.id, user); + this.userByIdCache.set(user.id, user); + } + } + + return users; + } + + @bindThis + public async isFollowing(follower: string | { id: string }, followee: string | { id: string }): Promise<boolean> { + const followerId = typeof(follower) === 'string' ? follower : follower.id; + const followeeId = typeof(followee) === 'string' ? followee : followee.id; + + // This lets us use whichever one is in memory, falling back to DB fetch via userFollowingsCache. + return this.userFollowersCache.get(followeeId)?.has(followerId) + ?? (await this.userFollowingsCache.fetch(followerId)).has(followeeId); + } + + /** + * Returns all hibernated followers. + */ + @bindThis + public async getHibernatedFollowers(followeeId: string): Promise<MiFollowing[]> { + const followers = await this.getFollowersWithHibernation(followeeId); + return followers.filter(f => f.isFollowerHibernated); + } + + /** + * Returns all non-hibernated followers. + */ + @bindThis + public async getNonHibernatedFollowers(followeeId: string): Promise<MiFollowing[]> { + const followers = await this.getFollowersWithHibernation(followeeId); + return followers.filter(f => !f.isFollowerHibernated); + } + + /** + * Returns follower relations with populated isFollowerHibernated. + * If you don't need this field, then please use userFollowersCache directly for reduced overhead. + */ + @bindThis + public async getFollowersWithHibernation(followeeId: string): Promise<MiFollowing[]> { + const followers = await this.userFollowersCache.fetch(followeeId); + const hibernations = await this.hibernatedUserCache.fetchMany(followers.keys()).then(fs => fs.reduce((map, f) => { + map.set(f[0], f[1]); + return map; + }, new Map<string, boolean>)); + return Array.from(followers.values()).map(following => ({ + ...following, + isFollowerHibernated: hibernations.get(following.followerId) ?? false, + })); + } + + /** + * Refreshes follower and following relations for the given user. + */ + @bindThis + public async refreshFollowRelationsFor(userId: string): Promise<void> { + const followings = await this.userFollowingsCache.refresh(userId); + const followees = Array.from(followings.values()).map(f => f.followeeId); + await this.userFollowersCache.deleteMany(followees); + } + + @bindThis + public clear(): void { + this.userByIdCache.clear(); + this.localUserByNativeTokenCache.clear(); + this.localUserByIdCache.clear(); + this.uriPersonCache.clear(); + this.userProfileCache.clear(); + this.userMutingsCache.clear(); + this.userBlockingCache.clear(); + this.userBlockedCache.clear(); + this.renoteMutingsCache.clear(); + this.userFollowingsCache.clear(); + this.userFollowStatsCache.clear(); + this.translationsCache.clear(); + } + + @bindThis public dispose(): void { - this.redisForSub.off('message', this.onMessage); + this.internalEventService.off('userChangeSuspendedState', this.onUserEvent); + this.internalEventService.off('userChangeDeletedState', this.onUserEvent); + this.internalEventService.off('remoteUserUpdated', this.onUserEvent); + this.internalEventService.off('localUserUpdated', this.onUserEvent); + this.internalEventService.off('userChangeSuspendedState', this.onUserEvent); + this.internalEventService.off('userTokenRegenerated', this.onTokenEvent); + this.internalEventService.off('follow', this.onFollowEvent); + this.internalEventService.off('unfollow', this.onFollowEvent); this.userByIdCache.dispose(); this.localUserByNativeTokenCache.dispose(); this.localUserByIdCache.dispose(); |