From fa68751a19877474bf78a80ef7204102296f0f17 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Sun, 8 Jun 2025 19:52:59 -0400 Subject: normalize userFollowingsCache / userFollowersCache and add hibernatedUserCache to reduce the number of cache-clears and allow use of caching in many more places --- packages/backend/src/core/CacheService.ts | 316 ++++++++++++++++-------------- 1 file changed, 173 insertions(+), 143 deletions(-) (limited to 'packages/backend/src/core/CacheService.ts') diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts index e8b26f8b9b..9c68597441 100644 --- a/packages/backend/src/core/CacheService.ts +++ b/packages/backend/src/core/CacheService.ts @@ -6,7 +6,7 @@ import { Inject, Injectable } from '@nestjs/common'; import * as Redis from 'ioredis'; import { In, IsNull } from 'typeorm'; -import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiNote } from '@/models/_.js'; +import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiNote, MiFollowing } from '@/models/_.js'; import { MemoryKVCache, RedisKVCache } from '@/misc/cache.js'; import { QuantumKVCache } from '@/misc/QuantumKVCache.js'; import type { MiLocalUser, MiUser } from '@/models/User.js'; @@ -46,8 +46,9 @@ export class CacheService implements OnApplicationShutdown { public userBlockingCache: QuantumKVCache>; public userBlockedCache: QuantumKVCache>; // NOTE: 「被」Blockキャッシュ public renoteMutingsCache: QuantumKVCache>; - public userFollowingsCache: QuantumKVCache>; - public userFollowersCache: QuantumKVCache>; + public userFollowingsCache: QuantumKVCache>>; + public userFollowersCache: QuantumKVCache>>; + public hibernatedUserCache: QuantumKVCache; protected userFollowStatsCache = new MemoryKVCache(1000 * 60 * 10); // 10 minutes protected translationsCache: RedisKVCache; @@ -89,36 +90,145 @@ export class CacheService implements OnApplicationShutdown { this.userProfileCache = new QuantumKVCache(this.internalEventService, 'userProfile', { lifetime: 1000 * 60 * 30, // 30m fetcher: (key) => this.userProfilesRepository.findOneByOrFail({ userId: key }), + bulkFetcher: userIds => this.userProfilesRepository.findBy({ userId: In(userIds) }).then(ps => ps.map(p => [p.userId, p])), }); this.userMutingsCache = new QuantumKVCache>(this.internalEventService, 'userMutings', { lifetime: 1000 * 60 * 30, // 30m fetcher: (key) => this.mutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))), + bulkFetcher: muterIds => this.mutingsRepository + .createQueryBuilder('muting') + .select('"muting"."muterId"', 'muterId') + .addSelect('array_agg("muting"."muteeId")', 'muteeIds') + .where({ muterId: In(muterIds) }) + .groupBy('muting.muterId') + .getRawMany<{ muterId: string, muteeIds: string[] }>() + .then(ms => ms.map(m => [m.muterId, new Set(m.muteeIds)])), }); this.userBlockingCache = new QuantumKVCache>(this.internalEventService, 'userBlocking', { lifetime: 1000 * 60 * 30, // 30m fetcher: (key) => this.blockingsRepository.find({ where: { blockerId: key }, select: ['blockeeId'] }).then(xs => new Set(xs.map(x => x.blockeeId))), + bulkFetcher: blockerIds => this.blockingsRepository + .createQueryBuilder('blocking') + .select('"blocking"."blockerId"', 'blockerId') + .addSelect('array_agg("blocking"."blockeeId")', 'blockeeIds') + .where({ blockerId: In(blockerIds) }) + .groupBy('blocking.blockerId') + .getRawMany<{ blockerId: string, blockeeIds: string[] }>() + .then(ms => ms.map(m => [m.blockerId, new Set(m.blockeeIds)])), }); this.userBlockedCache = new QuantumKVCache>(this.internalEventService, 'userBlocked', { lifetime: 1000 * 60 * 30, // 30m fetcher: (key) => this.blockingsRepository.find({ where: { blockeeId: key }, select: ['blockerId'] }).then(xs => new Set(xs.map(x => x.blockerId))), + bulkFetcher: blockeeIds => this.blockingsRepository + .createQueryBuilder('blocking') + .select('"blocking"."blockeeId"', 'blockeeId') + .addSelect('array_agg("blocking"."blockeeId")', 'blockeeIds') + .where({ blockeeId: In(blockeeIds) }) + .groupBy('blocking.blockeeId') + .getRawMany<{ blockeeId: string, blockerIds: string[] }>() + .then(ms => ms.map(m => [m.blockeeId, new Set(m.blockerIds)])), }); this.renoteMutingsCache = new QuantumKVCache>(this.internalEventService, 'renoteMutings', { lifetime: 1000 * 60 * 30, // 30m fetcher: (key) => this.renoteMutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))), + bulkFetcher: muterIds => this.renoteMutingsRepository + .createQueryBuilder('muting') + .select('"muting"."muterId"', 'muterId') + .addSelect('array_agg("muting"."muteeId")', 'muteeIds') + .where({ muterId: In(muterIds) }) + .groupBy('muting.muterId') + .getRawMany<{ muterId: string, muteeIds: string[] }>() + .then(ms => ms.map(m => [m.muterId, new Set(m.muteeIds)])), }); - this.userFollowingsCache = new QuantumKVCache>(this.internalEventService, 'userFollowings', { + this.userFollowingsCache = new QuantumKVCache>>(this.internalEventService, 'userFollowings', { lifetime: 1000 * 60 * 30, // 30m - fetcher: (key) => this.followingsRepository.find({ where: { followerId: key }, select: ['followeeId', 'withReplies'] }).then(xs => new Map(xs.map(f => [f.followeeId, { withReplies: f.withReplies }]))), + fetcher: (key) => this.followingsRepository.findBy({ followerId: key }).then(xs => new Map(xs.map(f => [f.followeeId, f]))), + bulkFetcher: followerIds => this.followingsRepository + .findBy({ followerId: In(followerIds) }) + .then(fs => fs + .reduce((groups, f) => { + let group = groups.get(f.followerId); + if (!group) { + group = new Map(); + groups.set(f.followerId, group); + } + group.set(f.followeeId, f); + return groups; + }, {} as Map>>)), + }); + + this.userFollowersCache = new QuantumKVCache>>(this.internalEventService, 'userFollowers', { + lifetime: 1000 * 60 * 30, // 30m + fetcher: followeeId => this.followingsRepository.findBy({ followeeId: followeeId }).then(xs => new Map(xs.map(x => [x.followerId, x]))), + bulkFetcher: followeeIds => this.followingsRepository + .findBy({ followeeId: In(followeeIds) }) + .then(fs => fs + .reduce((groups, f) => { + let group = groups.get(f.followeeId); + if (!group) { + group = new Map(); + groups.set(f.followeeId, group); + } + group.set(f.followerId, f); + return groups; + }, {} as Map>>)), }); - this.userFollowersCache = new QuantumKVCache>(this.internalEventService, 'userFollowers', { + this.hibernatedUserCache = new QuantumKVCache(this.internalEventService, 'hibernatedUsers', { lifetime: 1000 * 60 * 30, // 30m - fetcher: (key) => this.followingsRepository.find({ where: { followeeId: key }, select: ['followerId'] }).then(xs => new Set(xs.map(x => x.followerId))), + fetcher: async userId => { + const { isHibernated } = await this.usersRepository.findOneOrFail({ + where: { id: userId }, + select: { isHibernated: true }, + }); + return isHibernated; + }, + bulkFetcher: async userIds => { + const results = await this.usersRepository.find({ + where: { id: In(userIds) }, + select: { id: true, isHibernated: true }, + }); + return results.map(({ id, isHibernated }) => [id, isHibernated]); + }, + onChanged: async userIds => { + // We only update local copies since each process will get this event, but we can have user objects in multiple different caches. + // Before doing anything else we must "find" all the objects to update. + const userObjects = new Map(); + const toUpdate: string[] = []; + for (const uid of userIds) { + const toAdd: MiUser[] = []; + + const localUserById = this.localUserByIdCache.get(uid); + if (localUserById) toAdd.push(localUserById); + + const userById = this.userByIdCache.get(uid); + if (userById) toAdd.push(userById); + + if (toAdd.length > 0) { + toUpdate.push(uid); + userObjects.set(uid, toAdd); + } + } + + // In many cases, we won't have to do anything. + // Skipping the DB fetch ensures that this remains a single-step synchronous process. + if (toUpdate.length > 0) { + const hibernations = await this.usersRepository.find({ where: { id: In(toUpdate) }, select: { id: true, isHibernated: true } }); + for (const { id, isHibernated } of hibernations) { + const users = userObjects.get(id); + if (users) { + for (const u of users) { + u.isHibernated = isHibernated; + } + } + } + } + }, }); this.translationsCache = new RedisKVCache(this.redisClient, 'translations', { @@ -161,6 +271,7 @@ export class CacheService implements OnApplicationShutdown { this.renoteMutingsCache.delete(body.id), this.userFollowingsCache.delete(body.id), this.userFollowersCache.delete(body.id), + this.hibernatedUserCache.delete(body.id), ]); } } else { @@ -312,142 +423,6 @@ export class CacheService implements OnApplicationShutdown { } @bindThis - public async getUserFollowings(userIds: Iterable): Promise>> { - const followings = new Map>(); - - const toFetch: string[] = []; - for (const userId of userIds) { - const fromCache = this.userFollowingsCache.get(userId); - if (fromCache) { - followings.set(userId, fromCache); - } else { - toFetch.push(userId); - } - } - - if (toFetch.length > 0) { - const fetchedFollowings = await this.followingsRepository - .createQueryBuilder('following') - .select([ - 'following.followerId', - 'following.followeeId', - 'following.withReplies', - ]) - .where({ - followerId: In(toFetch), - }) - .getMany(); - - const toCache = new Map>(); - - // Pivot to a map - for (const { followerId, followeeId, withReplies } of fetchedFollowings) { - // Queue for cache - let cacheMap = toCache.get(followerId); - if (!cacheMap) { - cacheMap = new Map(); - toCache.set(followerId, cacheMap); - } - cacheMap.set(followeeId, { withReplies }); - - // Queue for return - let returnSet = followings.get(followerId); - if (!returnSet) { - returnSet = new Map(); - followings.set(followerId, returnSet); - } - returnSet.set(followeeId, { withReplies }); - } - - // Update cache to speed up future calls - this.userFollowingsCache.addMany(toCache); - } - - return followings; - } - - @bindThis - public async getUserBlockers(userIds: Iterable): Promise>> { - const blockers = new Map>(); - - const toFetch: string[] = []; - for (const userId of userIds) { - const fromCache = this.userBlockedCache.get(userId); - if (fromCache) { - blockers.set(userId, fromCache); - } else { - toFetch.push(userId); - } - } - - if (toFetch.length > 0) { - const fetchedBlockers = await this.blockingsRepository.createQueryBuilder('blocking') - .select([ - 'blocking.blockerId', - 'blocking.blockeeId', - ]) - .where({ - blockeeId: In(toFetch), - }) - .getMany(); - - const toCache = new Map>(); - - // Pivot to a map - for (const { blockerId, blockeeId } of fetchedBlockers) { - // Queue for cache - let cacheSet = toCache.get(blockeeId); - if (!cacheSet) { - cacheSet = new Set(); - toCache.set(blockeeId, cacheSet); - } - cacheSet.add(blockerId); - - // Queue for return - let returnSet = blockers.get(blockeeId); - if (!returnSet) { - returnSet = new Set(); - blockers.set(blockeeId, returnSet); - } - returnSet.add(blockerId); - } - - // Update cache to speed up future calls - this.userBlockedCache.addMany(toCache); - } - - return blockers; - } - - public async getUserProfiles(userIds: Iterable): Promise> { - const profiles = new Map; - - const toFetch: string[] = []; - for (const userId of userIds) { - const fromCache = this.userProfileCache.get(userId); - if (fromCache) { - profiles.set(userId, fromCache); - } else { - toFetch.push(userId); - } - } - - if (toFetch.length > 0) { - const fetched = await this.userProfilesRepository.findBy({ - userId: In(toFetch), - }); - - for (const profile of fetched) { - profiles.set(profile.userId, profile); - } - - const toCache = new Map(fetched.map(p => [p.userId, p])); - this.userProfileCache.addMany(toCache); - } - - return profiles; - } - public async getUsers(userIds: Iterable): Promise> { const users = new Map; @@ -475,6 +450,61 @@ export class CacheService implements OnApplicationShutdown { return users; } + @bindThis + public async isFollowing(follower: string | { id: string }, followee: string | { id: string }): Promise { + const followerId = typeof(follower) === 'string' ? follower : follower.id; + const followeeId = typeof(followee) === 'string' ? followee : followee.id; + + // This lets us use whichever one is in memory, falling back to DB fetch via userFollowingsCache. + return this.userFollowersCache.get(followeeId)?.has(followerId) + ?? (await this.userFollowingsCache.fetch(followerId)).has(followeeId); + } + + /** + * Returns all hibernated followers. + */ + @bindThis + public async getHibernatedFollowers(followeeId: string): Promise { + const followers = await this.getFollowersWithHibernation(followeeId); + return followers.filter(f => f.isFollowerHibernated); + } + + /** + * Returns all non-hibernated followers. + */ + @bindThis + public async getNonHibernatedFollowers(followeeId: string): Promise { + const followers = await this.getFollowersWithHibernation(followeeId); + return followers.filter(f => !f.isFollowerHibernated); + } + + /** + * Returns follower relations with populated isFollowerHibernated. + * If you don't need this field, then please use userFollowersCache directly for reduced overhead. + */ + @bindThis + public async getFollowersWithHibernation(followeeId: string): Promise { + const followers = await this.userFollowersCache.fetch(followeeId); + const hibernations = await this.hibernatedUserCache.fetchMany(followers.keys()).then(fs => fs.reduce((map, f) => { + map.set(f[0], f[1]); + return map; + }, new Map)); + return Array.from(followers.values()).map(following => ({ + ...following, + isFollowerHibernated: hibernations.get(following.followerId) ?? false, + })); + } + + /** + * Refreshes follower and following relations for the given user. + */ + @bindThis + public async refreshFollowRelationsFor(userId: string): Promise { + const followings = await this.userFollowingsCache.refresh(userId); + const followees = Array.from(followings.values()).map(f => f.followeeId); + await this.userFollowersCache.deleteMany(followees); + } + @bindThis public clear(): void { this.userByIdCache.clear(); -- cgit v1.2.3-freya