diff options
| author | Hazelnoot <acomputerdog@gmail.com> | 2025-06-12 15:42:39 +0000 |
|---|---|---|
| committer | Hazelnoot <acomputerdog@gmail.com> | 2025-06-12 15:42:39 +0000 |
| commit | 55551a5a8a3a218b9f56491f2dbe3f70e3ef3556 (patch) | |
| tree | 927f789e18162a91da0144356d040995864613ee /packages/backend/src/core | |
| parent | merge: Enforce DM visibility in generateVisibilityQuery (!1108) (diff) | |
| parent | fix relations in MastodonDataService.ts (diff) | |
| download | sharkey-55551a5a8a3a218b9f56491f2dbe3f70e3ef3556.tar.gz sharkey-55551a5a8a3a218b9f56491f2dbe3f70e3ef3556.tar.bz2 sharkey-55551a5a8a3a218b9f56491f2dbe3f70e3ef3556.zip | |
merge: Avoid more N+1 queries in NoteEntityService and UserEntityService (!1099)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/1099
Approved-by: dakkar <dakkar@thenautilus.net>
Approved-by: Marie <github@yuugi.dev>
Diffstat (limited to 'packages/backend/src/core')
28 files changed, 1037 insertions, 589 deletions
diff --git a/packages/backend/src/core/AccountMoveService.ts b/packages/backend/src/core/AccountMoveService.ts index 738026f753..e107f02796 100644 --- a/packages/backend/src/core/AccountMoveService.ts +++ b/packages/backend/src/core/AccountMoveService.ts @@ -26,6 +26,7 @@ import PerUserFollowingChart from '@/core/chart/charts/per-user-following.js'; import { SystemAccountService } from '@/core/SystemAccountService.js'; import { RoleService } from '@/core/RoleService.js'; import { AntennaService } from '@/core/AntennaService.js'; +import { CacheService } from '@/core/CacheService.js'; @Injectable() export class AccountMoveService { @@ -68,6 +69,7 @@ export class AccountMoveService { private systemAccountService: SystemAccountService, private roleService: RoleService, private antennaService: AntennaService, + private readonly cacheService: CacheService, ) { } @@ -107,12 +109,10 @@ export class AccountMoveService { this.globalEventService.publishMainStream(src.id, 'meUpdated', iObj); // Unfollow after 24 hours - const followings = await this.followingsRepository.findBy({ - followerId: src.id, - }); - this.queueService.createDelayedUnfollowJob(followings.map(following => ({ + const followings = await this.cacheService.userFollowingsCache.fetch(src.id); + this.queueService.createDelayedUnfollowJob(Array.from(followings.keys()).map(followeeId => ({ from: { id: src.id }, - to: { id: following.followeeId }, + to: { id: followeeId }, })), process.env.NODE_ENV === 'test' ? 10000 : 1000 * 60 * 60 * 24); await this.postMoveProcess(src, dst); @@ -138,11 +138,9 @@ export class AccountMoveService { // follow the new account const proxy = await this.systemAccountService.fetch('proxy'); - const followings = await this.followingsRepository.findBy({ - followeeId: src.id, - followerHost: IsNull(), // follower is local - followerId: Not(proxy.id), - }); + const followings = await this.cacheService.userFollowersCache.fetch(src.id) + .then(fs => Array.from(fs.values()) + .filter(f => f.followerHost == null && f.followerId !== proxy.id)); const followJobs = followings.map(following => ({ from: { id: following.followerId }, to: { id: dst.id }, @@ -318,9 +316,9 @@ export class AccountMoveService { await this.usersRepository.decrement({ id: In(localFollowerIds) }, 'followingCount', 1); // Decrease follower counts of local followees by 1. - const oldFollowings = await this.followingsRepository.findBy({ followerId: oldAccount.id }); - if (oldFollowings.length > 0) { - await this.usersRepository.decrement({ id: In(oldFollowings.map(following => following.followeeId)) }, 'followersCount', 1); + const oldFollowings = await this.cacheService.userFollowingsCache.fetch(oldAccount.id); + if (oldFollowings.size > 0) { + await this.usersRepository.decrement({ id: In(Array.from(oldFollowings.keys())) }, 'followersCount', 1); } // Update instance stats by decreasing remote followers count by the number of local followers who were following the old account. diff --git a/packages/backend/src/core/AntennaService.ts b/packages/backend/src/core/AntennaService.ts index cf696e3599..667df57943 100644 --- a/packages/backend/src/core/AntennaService.ts +++ b/packages/backend/src/core/AntennaService.ts @@ -130,7 +130,8 @@ export class AntennaService implements OnApplicationShutdown { } if (note.visibility === 'followers') { - const isFollowing = Object.hasOwn(await this.cacheService.userFollowingsCache.fetch(antenna.userId), note.userId); + const followings = await this.cacheService.userFollowingsCache.fetch(antenna.userId); + const isFollowing = followings.has(note.userId); if (!isFollowing && antenna.userId !== note.userId) return false; } diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts index 1cf63221f9..2d37cd6bab 100644 --- a/packages/backend/src/core/CacheService.ts +++ b/packages/backend/src/core/CacheService.ts @@ -5,14 +5,16 @@ import { Inject, Injectable } from '@nestjs/common'; import * as Redis from 'ioredis'; -import { IsNull } from 'typeorm'; -import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiFollowing, MiNote } from '@/models/_.js'; +import { In, IsNull } from 'typeorm'; +import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiNote, MiFollowing } from '@/models/_.js'; import { MemoryKVCache, RedisKVCache } from '@/misc/cache.js'; +import { QuantumKVCache } from '@/misc/QuantumKVCache.js'; import type { MiLocalUser, MiUser } from '@/models/User.js'; import { DI } from '@/di-symbols.js'; import { UserEntityService } from '@/core/entities/UserEntityService.js'; import { bindThis } from '@/decorators.js'; -import type { GlobalEvents } from '@/core/GlobalEventService.js'; +import type { InternalEventTypes } from '@/core/GlobalEventService.js'; +import { InternalEventService } from '@/core/InternalEventService.js'; import type { OnApplicationShutdown } from '@nestjs/common'; export interface FollowStats { @@ -27,7 +29,7 @@ export interface CachedTranslation { text: string | undefined; } -interface CachedTranslationEntity { +export interface CachedTranslationEntity { l?: string; t?: string; u?: number; @@ -39,14 +41,16 @@ export class CacheService implements OnApplicationShutdown { public localUserByNativeTokenCache: MemoryKVCache<MiLocalUser | null>; public localUserByIdCache: MemoryKVCache<MiLocalUser>; public uriPersonCache: MemoryKVCache<MiUser | null>; - public userProfileCache: RedisKVCache<MiUserProfile>; - public userMutingsCache: RedisKVCache<Set<string>>; - public userBlockingCache: RedisKVCache<Set<string>>; - public userBlockedCache: RedisKVCache<Set<string>>; // NOTE: 「被」Blockキャッシュ - public renoteMutingsCache: RedisKVCache<Set<string>>; - public userFollowingsCache: RedisKVCache<Record<string, Pick<MiFollowing, 'withReplies'> | undefined>>; - private readonly userFollowStatsCache = new MemoryKVCache<FollowStats>(1000 * 60 * 10); // 10 minutes - private readonly translationsCache: RedisKVCache<CachedTranslationEntity>; + public userProfileCache: QuantumKVCache<MiUserProfile>; + public userMutingsCache: QuantumKVCache<Set<string>>; + public userBlockingCache: QuantumKVCache<Set<string>>; + public userBlockedCache: QuantumKVCache<Set<string>>; // NOTE: 「被」Blockキャッシュ + public renoteMutingsCache: QuantumKVCache<Set<string>>; + public userFollowingsCache: QuantumKVCache<Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>; + public userFollowersCache: QuantumKVCache<Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>; + public hibernatedUserCache: QuantumKVCache<boolean>; + protected userFollowStatsCache = new MemoryKVCache<FollowStats>(1000 * 60 * 10); // 10 minutes + protected translationsCache: RedisKVCache<CachedTranslationEntity>; constructor( @Inject(DI.redis) @@ -74,6 +78,7 @@ export class CacheService implements OnApplicationShutdown { private followingsRepository: FollowingsRepository, private userEntityService: UserEntityService, + private readonly internalEventService: InternalEventService, ) { //this.onMessage = this.onMessage.bind(this); @@ -82,58 +87,148 @@ export class CacheService implements OnApplicationShutdown { this.localUserByIdCache = new MemoryKVCache<MiLocalUser>(1000 * 60 * 5); // 5m this.uriPersonCache = new MemoryKVCache<MiUser | null>(1000 * 60 * 5); // 5m - this.userProfileCache = new RedisKVCache<MiUserProfile>(this.redisClient, 'userProfile', { + this.userProfileCache = new QuantumKVCache(this.internalEventService, 'userProfile', { lifetime: 1000 * 60 * 30, // 30m - memoryCacheLifetime: 1000 * 60, // 1m fetcher: (key) => this.userProfilesRepository.findOneByOrFail({ userId: key }), - toRedisConverter: (value) => JSON.stringify(value), - fromRedisConverter: (value) => JSON.parse(value), // TODO: date型の考慮 + bulkFetcher: userIds => this.userProfilesRepository.findBy({ userId: In(userIds) }).then(ps => ps.map(p => [p.userId, p])), }); - this.userMutingsCache = new RedisKVCache<Set<string>>(this.redisClient, 'userMutings', { + this.userMutingsCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'userMutings', { lifetime: 1000 * 60 * 30, // 30m - memoryCacheLifetime: 1000 * 60, // 1m fetcher: (key) => this.mutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))), - toRedisConverter: (value) => JSON.stringify(Array.from(value)), - fromRedisConverter: (value) => new Set(JSON.parse(value)), + bulkFetcher: muterIds => this.mutingsRepository + .createQueryBuilder('muting') + .select('"muting"."muterId"', 'muterId') + .addSelect('array_agg("muting"."muteeId")', 'muteeIds') + .where({ muterId: In(muterIds) }) + .groupBy('muting.muterId') + .getRawMany<{ muterId: string, muteeIds: string[] }>() + .then(ms => ms.map(m => [m.muterId, new Set(m.muteeIds)])), }); - this.userBlockingCache = new RedisKVCache<Set<string>>(this.redisClient, 'userBlocking', { + this.userBlockingCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'userBlocking', { lifetime: 1000 * 60 * 30, // 30m - memoryCacheLifetime: 1000 * 60, // 1m fetcher: (key) => this.blockingsRepository.find({ where: { blockerId: key }, select: ['blockeeId'] }).then(xs => new Set(xs.map(x => x.blockeeId))), - toRedisConverter: (value) => JSON.stringify(Array.from(value)), - fromRedisConverter: (value) => new Set(JSON.parse(value)), + bulkFetcher: blockerIds => this.blockingsRepository + .createQueryBuilder('blocking') + .select('"blocking"."blockerId"', 'blockerId') + .addSelect('array_agg("blocking"."blockeeId")', 'blockeeIds') + .where({ blockerId: In(blockerIds) }) + .groupBy('blocking.blockerId') + .getRawMany<{ blockerId: string, blockeeIds: string[] }>() + .then(ms => ms.map(m => [m.blockerId, new Set(m.blockeeIds)])), }); - this.userBlockedCache = new RedisKVCache<Set<string>>(this.redisClient, 'userBlocked', { + this.userBlockedCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'userBlocked', { lifetime: 1000 * 60 * 30, // 30m - memoryCacheLifetime: 1000 * 60, // 1m fetcher: (key) => this.blockingsRepository.find({ where: { blockeeId: key }, select: ['blockerId'] }).then(xs => new Set(xs.map(x => x.blockerId))), - toRedisConverter: (value) => JSON.stringify(Array.from(value)), - fromRedisConverter: (value) => new Set(JSON.parse(value)), + bulkFetcher: blockeeIds => this.blockingsRepository + .createQueryBuilder('blocking') + .select('"blocking"."blockeeId"', 'blockeeId') + .addSelect('array_agg("blocking"."blockeeId")', 'blockeeIds') + .where({ blockeeId: In(blockeeIds) }) + .groupBy('blocking.blockeeId') + .getRawMany<{ blockeeId: string, blockerIds: string[] }>() + .then(ms => ms.map(m => [m.blockeeId, new Set(m.blockerIds)])), }); - this.renoteMutingsCache = new RedisKVCache<Set<string>>(this.redisClient, 'renoteMutings', { + this.renoteMutingsCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'renoteMutings', { lifetime: 1000 * 60 * 30, // 30m - memoryCacheLifetime: 1000 * 60, // 1m fetcher: (key) => this.renoteMutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))), - toRedisConverter: (value) => JSON.stringify(Array.from(value)), - fromRedisConverter: (value) => new Set(JSON.parse(value)), + bulkFetcher: muterIds => this.renoteMutingsRepository + .createQueryBuilder('muting') + .select('"muting"."muterId"', 'muterId') + .addSelect('array_agg("muting"."muteeId")', 'muteeIds') + .where({ muterId: In(muterIds) }) + .groupBy('muting.muterId') + .getRawMany<{ muterId: string, muteeIds: string[] }>() + .then(ms => ms.map(m => [m.muterId, new Set(m.muteeIds)])), }); - this.userFollowingsCache = new RedisKVCache<Record<string, Pick<MiFollowing, 'withReplies'> | undefined>>(this.redisClient, 'userFollowings', { + this.userFollowingsCache = new QuantumKVCache<Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>(this.internalEventService, 'userFollowings', { lifetime: 1000 * 60 * 30, // 30m - memoryCacheLifetime: 1000 * 60, // 1m - fetcher: (key) => this.followingsRepository.find({ where: { followerId: key }, select: ['followeeId', 'withReplies'] }).then(xs => { - const obj: Record<string, Pick<MiFollowing, 'withReplies'> | undefined> = {}; - for (const x of xs) { - obj[x.followeeId] = { withReplies: x.withReplies }; + fetcher: (key) => this.followingsRepository.findBy({ followerId: key }).then(xs => new Map(xs.map(f => [f.followeeId, f]))), + bulkFetcher: followerIds => this.followingsRepository + .findBy({ followerId: In(followerIds) }) + .then(fs => fs + .reduce((groups, f) => { + let group = groups.get(f.followerId); + if (!group) { + group = new Map(); + groups.set(f.followerId, group); + } + group.set(f.followeeId, f); + return groups; + }, new Map<string, Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>)), + }); + + this.userFollowersCache = new QuantumKVCache<Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>(this.internalEventService, 'userFollowers', { + lifetime: 1000 * 60 * 30, // 30m + fetcher: followeeId => this.followingsRepository.findBy({ followeeId: followeeId }).then(xs => new Map(xs.map(x => [x.followerId, x]))), + bulkFetcher: followeeIds => this.followingsRepository + .findBy({ followeeId: In(followeeIds) }) + .then(fs => fs + .reduce((groups, f) => { + let group = groups.get(f.followeeId); + if (!group) { + group = new Map(); + groups.set(f.followeeId, group); + } + group.set(f.followerId, f); + return groups; + }, new Map<string, Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>)), + }); + + this.hibernatedUserCache = new QuantumKVCache<boolean>(this.internalEventService, 'hibernatedUsers', { + lifetime: 1000 * 60 * 30, // 30m + fetcher: async userId => { + const { isHibernated } = await this.usersRepository.findOneOrFail({ + where: { id: userId }, + select: { isHibernated: true }, + }); + return isHibernated; + }, + bulkFetcher: async userIds => { + const results = await this.usersRepository.find({ + where: { id: In(userIds) }, + select: { id: true, isHibernated: true }, + }); + return results.map(({ id, isHibernated }) => [id, isHibernated]); + }, + onChanged: async userIds => { + // We only update local copies since each process will get this event, but we can have user objects in multiple different caches. + // Before doing anything else we must "find" all the objects to update. + const userObjects = new Map<string, MiUser[]>(); + const toUpdate: string[] = []; + for (const uid of userIds) { + const toAdd: MiUser[] = []; + + const localUserById = this.localUserByIdCache.get(uid); + if (localUserById) toAdd.push(localUserById); + + const userById = this.userByIdCache.get(uid); + if (userById) toAdd.push(userById); + + if (toAdd.length > 0) { + toUpdate.push(uid); + userObjects.set(uid, toAdd); + } + } + + // In many cases, we won't have to do anything. + // Skipping the DB fetch ensures that this remains a single-step synchronous process. + if (toUpdate.length > 0) { + const hibernations = await this.usersRepository.find({ where: { id: In(toUpdate) }, select: { id: true, isHibernated: true } }); + for (const { id, isHibernated } of hibernations) { + const users = userObjects.get(id); + if (users) { + for (const u of users) { + u.isHibernated = isHibernated; + } + } + } } - return obj; - }), - toRedisConverter: (value) => JSON.stringify(value), - fromRedisConverter: (value) => JSON.parse(value), + }, }); this.translationsCache = new RedisKVCache<CachedTranslationEntity>(this.redisClient, 'translations', { @@ -143,20 +238,21 @@ export class CacheService implements OnApplicationShutdown { // NOTE: チャンネルのフォロー状況キャッシュはChannelFollowingServiceで行っている - this.redisForSub.on('message', this.onMessage); + this.internalEventService.on('userChangeSuspendedState', this.onUserEvent); + this.internalEventService.on('userChangeDeletedState', this.onUserEvent); + this.internalEventService.on('remoteUserUpdated', this.onUserEvent); + this.internalEventService.on('localUserUpdated', this.onUserEvent); + this.internalEventService.on('userChangeSuspendedState', this.onUserEvent); + this.internalEventService.on('userTokenRegenerated', this.onTokenEvent); + this.internalEventService.on('follow', this.onFollowEvent); + this.internalEventService.on('unfollow', this.onFollowEvent); } @bindThis - private async onMessage(_: string, data: string): Promise<void> { - const obj = JSON.parse(data); - - if (obj.channel === 'internal') { - const { type, body } = obj.message as GlobalEvents['internal']['payload']; - switch (type) { - case 'userChangeSuspendedState': - case 'userChangeDeletedState': - case 'remoteUserUpdated': - case 'localUserUpdated': { + private async onUserEvent<E extends 'userChangeSuspendedState' | 'userChangeDeletedState' | 'remoteUserUpdated' | 'localUserUpdated'>(body: InternalEventTypes[E], _: E, isLocal: boolean): Promise<void> { + { + { + { const user = await this.usersRepository.findOneBy({ id: body.id }); if (user == null) { this.userByIdCache.delete(body.id); @@ -166,6 +262,18 @@ export class CacheService implements OnApplicationShutdown { this.uriPersonCache.delete(k); } } + if (isLocal) { + await Promise.all([ + this.userProfileCache.delete(body.id), + this.userMutingsCache.delete(body.id), + this.userBlockingCache.delete(body.id), + this.userBlockedCache.delete(body.id), + this.renoteMutingsCache.delete(body.id), + this.userFollowingsCache.delete(body.id), + this.userFollowersCache.delete(body.id), + this.hibernatedUserCache.delete(body.id), + ]); + } } else { this.userByIdCache.set(user.id, user); for (const [k, v] of this.uriPersonCache.entries) { @@ -178,20 +286,37 @@ export class CacheService implements OnApplicationShutdown { this.localUserByIdCache.set(user.id, user); } } - break; } - case 'userTokenRegenerated': { + } + } + } + + @bindThis + private async onTokenEvent<E extends 'userTokenRegenerated'>(body: InternalEventTypes[E]): Promise<void> { + { + { + { const user = await this.usersRepository.findOneByOrFail({ id: body.id }) as MiLocalUser; this.localUserByNativeTokenCache.delete(body.oldToken); this.localUserByNativeTokenCache.set(body.newToken, user); - break; } + } + } + } + + @bindThis + private async onFollowEvent<E extends 'follow' | 'unfollow'>(body: InternalEventTypes[E], type: E): Promise<void> { + { + switch (type) { case 'follow': { const follower = this.userByIdCache.get(body.followerId); if (follower) follower.followingCount++; const followee = this.userByIdCache.get(body.followeeId); if (followee) followee.followersCount++; - this.userFollowingsCache.delete(body.followerId); + await Promise.all([ + this.userFollowingsCache.delete(body.followerId), + this.userFollowersCache.delete(body.followeeId), + ]); this.userFollowStatsCache.delete(body.followerId); this.userFollowStatsCache.delete(body.followeeId); break; @@ -201,13 +326,14 @@ export class CacheService implements OnApplicationShutdown { if (follower) follower.followingCount--; const followee = this.userByIdCache.get(body.followeeId); if (followee) followee.followersCount--; - this.userFollowingsCache.delete(body.followerId); + await Promise.all([ + this.userFollowingsCache.delete(body.followerId), + this.userFollowersCache.delete(body.followeeId), + ]); this.userFollowStatsCache.delete(body.followerId); this.userFollowStatsCache.delete(body.followeeId); break; } - default: - break; } } } @@ -299,8 +425,114 @@ export class CacheService implements OnApplicationShutdown { } @bindThis + public async getUsers(userIds: Iterable<string>): Promise<Map<string, MiUser>> { + const users = new Map<string, MiUser>; + + const toFetch: string[] = []; + for (const userId of userIds) { + const fromCache = this.userByIdCache.get(userId); + if (fromCache) { + users.set(userId, fromCache); + } else { + toFetch.push(userId); + } + } + + if (toFetch.length > 0) { + const fetched = await this.usersRepository.findBy({ + id: In(toFetch), + }); + + for (const user of fetched) { + users.set(user.id, user); + this.userByIdCache.set(user.id, user); + } + } + + return users; + } + + @bindThis + public async isFollowing(follower: string | { id: string }, followee: string | { id: string }): Promise<boolean> { + const followerId = typeof(follower) === 'string' ? follower : follower.id; + const followeeId = typeof(followee) === 'string' ? followee : followee.id; + + // This lets us use whichever one is in memory, falling back to DB fetch via userFollowingsCache. + return this.userFollowersCache.get(followeeId)?.has(followerId) + ?? (await this.userFollowingsCache.fetch(followerId)).has(followeeId); + } + + /** + * Returns all hibernated followers. + */ + @bindThis + public async getHibernatedFollowers(followeeId: string): Promise<MiFollowing[]> { + const followers = await this.getFollowersWithHibernation(followeeId); + return followers.filter(f => f.isFollowerHibernated); + } + + /** + * Returns all non-hibernated followers. + */ + @bindThis + public async getNonHibernatedFollowers(followeeId: string): Promise<MiFollowing[]> { + const followers = await this.getFollowersWithHibernation(followeeId); + return followers.filter(f => !f.isFollowerHibernated); + } + + /** + * Returns follower relations with populated isFollowerHibernated. + * If you don't need this field, then please use userFollowersCache directly for reduced overhead. + */ + @bindThis + public async getFollowersWithHibernation(followeeId: string): Promise<MiFollowing[]> { + const followers = await this.userFollowersCache.fetch(followeeId); + const hibernations = await this.hibernatedUserCache.fetchMany(followers.keys()).then(fs => fs.reduce((map, f) => { + map.set(f[0], f[1]); + return map; + }, new Map<string, boolean>)); + return Array.from(followers.values()).map(following => ({ + ...following, + isFollowerHibernated: hibernations.get(following.followerId) ?? false, + })); + } + + /** + * Refreshes follower and following relations for the given user. + */ + @bindThis + public async refreshFollowRelationsFor(userId: string): Promise<void> { + const followings = await this.userFollowingsCache.refresh(userId); + const followees = Array.from(followings.values()).map(f => f.followeeId); + await this.userFollowersCache.deleteMany(followees); + } + + @bindThis + public clear(): void { + this.userByIdCache.clear(); + this.localUserByNativeTokenCache.clear(); + this.localUserByIdCache.clear(); + this.uriPersonCache.clear(); + this.userProfileCache.clear(); + this.userMutingsCache.clear(); + this.userBlockingCache.clear(); + this.userBlockedCache.clear(); + this.renoteMutingsCache.clear(); + this.userFollowingsCache.clear(); + this.userFollowStatsCache.clear(); + this.translationsCache.clear(); + } + + @bindThis public dispose(): void { - this.redisForSub.off('message', this.onMessage); + this.internalEventService.off('userChangeSuspendedState', this.onUserEvent); + this.internalEventService.off('userChangeDeletedState', this.onUserEvent); + this.internalEventService.off('remoteUserUpdated', this.onUserEvent); + this.internalEventService.off('localUserUpdated', this.onUserEvent); + this.internalEventService.off('userChangeSuspendedState', this.onUserEvent); + this.internalEventService.off('userTokenRegenerated', this.onTokenEvent); + this.internalEventService.off('follow', this.onFollowEvent); + this.internalEventService.off('unfollow', this.onFollowEvent); this.userByIdCache.dispose(); this.localUserByNativeTokenCache.dispose(); this.localUserByIdCache.dispose(); diff --git a/packages/backend/src/core/ChannelFollowingService.ts b/packages/backend/src/core/ChannelFollowingService.ts index 12251595e2..430711fef1 100644 --- a/packages/backend/src/core/ChannelFollowingService.ts +++ b/packages/backend/src/core/ChannelFollowingService.ts @@ -9,14 +9,15 @@ import { DI } from '@/di-symbols.js'; import type { ChannelFollowingsRepository } from '@/models/_.js'; import { MiChannel } from '@/models/_.js'; import { IdService } from '@/core/IdService.js'; -import { GlobalEvents, GlobalEventService } from '@/core/GlobalEventService.js'; +import { GlobalEvents, GlobalEventService, InternalEventTypes } from '@/core/GlobalEventService.js'; import { bindThis } from '@/decorators.js'; import type { MiLocalUser } from '@/models/User.js'; -import { RedisKVCache } from '@/misc/cache.js'; +import { QuantumKVCache } from '@/misc/QuantumKVCache.js'; +import { InternalEventService } from './InternalEventService.js'; @Injectable() export class ChannelFollowingService implements OnModuleInit { - public userFollowingChannelsCache: RedisKVCache<Set<string>>; + public userFollowingChannelsCache: QuantumKVCache<Set<string>>; constructor( @Inject(DI.redis) @@ -27,19 +28,18 @@ export class ChannelFollowingService implements OnModuleInit { private channelFollowingsRepository: ChannelFollowingsRepository, private idService: IdService, private globalEventService: GlobalEventService, + private readonly internalEventService: InternalEventService, ) { - this.userFollowingChannelsCache = new RedisKVCache<Set<string>>(this.redisClient, 'userFollowingChannels', { + this.userFollowingChannelsCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'userFollowingChannels', { lifetime: 1000 * 60 * 30, // 30m - memoryCacheLifetime: 1000 * 60, // 1m fetcher: (key) => this.channelFollowingsRepository.find({ where: { followerId: key }, select: ['followeeId'], }).then(xs => new Set(xs.map(x => x.followeeId))), - toRedisConverter: (value) => JSON.stringify(Array.from(value)), - fromRedisConverter: (value) => new Set(JSON.parse(value)), }); - this.redisForSub.on('message', this.onMessage); + this.internalEventService.on('followChannel', this.onMessage); + this.internalEventService.on('unfollowChannel', this.onMessage); } onModuleInit() { @@ -79,18 +79,15 @@ export class ChannelFollowingService implements OnModuleInit { } @bindThis - private async onMessage(_: string, data: string): Promise<void> { - const obj = JSON.parse(data); - - if (obj.channel === 'internal') { - const { type, body } = obj.message as GlobalEvents['internal']['payload']; + private async onMessage<E extends 'followChannel' | 'unfollowChannel'>(body: InternalEventTypes[E], type: E): Promise<void> { + { switch (type) { case 'followChannel': { - this.userFollowingChannelsCache.refresh(body.userId); + await this.userFollowingChannelsCache.delete(body.userId); break; } case 'unfollowChannel': { - this.userFollowingChannelsCache.delete(body.userId); + await this.userFollowingChannelsCache.delete(body.userId); break; } } @@ -99,6 +96,8 @@ export class ChannelFollowingService implements OnModuleInit { @bindThis public dispose(): void { + this.internalEventService.off('followChannel', this.onMessage); + this.internalEventService.off('unfollowChannel', this.onMessage); this.userFollowingChannelsCache.dispose(); } diff --git a/packages/backend/src/core/CoreModule.ts b/packages/backend/src/core/CoreModule.ts index dd8e61d322..6839ba0159 100644 --- a/packages/backend/src/core/CoreModule.ts +++ b/packages/backend/src/core/CoreModule.ts @@ -41,6 +41,7 @@ import { HttpRequestService } from './HttpRequestService.js'; import { IdService } from './IdService.js'; import { ImageProcessingService } from './ImageProcessingService.js'; import { SystemAccountService } from './SystemAccountService.js'; +import { InternalEventService } from './InternalEventService.js'; import { InternalStorageService } from './InternalStorageService.js'; import { MetaService } from './MetaService.js'; import { MfmService } from './MfmService.js'; @@ -186,6 +187,7 @@ const $HashtagService: Provider = { provide: 'HashtagService', useExisting: Hash const $HttpRequestService: Provider = { provide: 'HttpRequestService', useExisting: HttpRequestService }; const $IdService: Provider = { provide: 'IdService', useExisting: IdService }; const $ImageProcessingService: Provider = { provide: 'ImageProcessingService', useExisting: ImageProcessingService }; +const $InternalEventService: Provider = { provide: 'InternalEventService', useExisting: InternalEventService }; const $InternalStorageService: Provider = { provide: 'InternalStorageService', useExisting: InternalStorageService }; const $MetaService: Provider = { provide: 'MetaService', useExisting: MetaService }; const $MfmService: Provider = { provide: 'MfmService', useExisting: MfmService }; @@ -345,6 +347,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp HttpRequestService, IdService, ImageProcessingService, + InternalEventService, InternalStorageService, MetaService, MfmService, @@ -500,6 +503,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp $HttpRequestService, $IdService, $ImageProcessingService, + $InternalEventService, $InternalStorageService, $MetaService, $MfmService, @@ -656,6 +660,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp HttpRequestService, IdService, ImageProcessingService, + InternalEventService, InternalStorageService, MetaService, MfmService, @@ -810,6 +815,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp $HttpRequestService, $IdService, $ImageProcessingService, + $InternalEventService, $InternalStorageService, $MetaService, $MfmService, diff --git a/packages/backend/src/core/GlobalEventService.ts b/packages/backend/src/core/GlobalEventService.ts index c0027ae129..c146811331 100644 --- a/packages/backend/src/core/GlobalEventService.ts +++ b/packages/backend/src/core/GlobalEventService.ts @@ -265,6 +265,7 @@ export interface InternalEventTypes { unmute: { muterId: MiUser['id']; muteeId: MiUser['id']; }; userListMemberAdded: { userListId: MiUserList['id']; memberId: MiUser['id']; }; userListMemberRemoved: { userListId: MiUserList['id']; memberId: MiUser['id']; }; + quantumCacheUpdated: { name: string, keys: string[] }; } type EventTypesToEventPayload<T> = EventUnionFromDictionary<UndefinedAsNullAll<SerializedAll<T>>>; @@ -353,12 +354,12 @@ export class GlobalEventService { } @bindThis - private publish(channel: StreamChannels, type: string | null, value?: any): void { + private async publish(channel: StreamChannels, type: string | null, value?: any): Promise<void> { const message = type == null ? value : value == null ? { type: type, body: null } : { type: type, body: value }; - this.redisForPub.publish(this.config.host, JSON.stringify({ + await this.redisForPub.publish(this.config.host, JSON.stringify({ channel: channel, message: message, })); @@ -370,6 +371,11 @@ export class GlobalEventService { } @bindThis + public async publishInternalEventAsync<K extends keyof InternalEventTypes>(type: K, value?: InternalEventTypes[K]): Promise<void> { + await this.publish('internal', type, typeof value === 'undefined' ? null : value); + } + + @bindThis public publishBroadcastStream<K extends keyof BroadcastTypes>(type: K, value?: BroadcastTypes[K]): void { this.publish('broadcast', type, typeof value === 'undefined' ? null : value); } diff --git a/packages/backend/src/core/InternalEventService.ts b/packages/backend/src/core/InternalEventService.ts new file mode 100644 index 0000000000..5b164b605e --- /dev/null +++ b/packages/backend/src/core/InternalEventService.ts @@ -0,0 +1,103 @@ +/* + * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; +import Redis from 'ioredis'; +import { DI } from '@/di-symbols.js'; +import { GlobalEventService } from '@/core/GlobalEventService.js'; +import type { GlobalEvents, InternalEventTypes } from '@/core/GlobalEventService.js'; +import { bindThis } from '@/decorators.js'; + +export type Listener<K extends keyof InternalEventTypes> = (value: InternalEventTypes[K], key: K, isLocal: boolean) => void | Promise<void>; + +export interface ListenerProps { + ignoreLocal?: boolean, + ignoreRemote?: boolean, +} + +@Injectable() +export class InternalEventService implements OnApplicationShutdown { + private readonly listeners = new Map<keyof InternalEventTypes, Map<Listener<keyof InternalEventTypes>, ListenerProps>>(); + + constructor( + @Inject(DI.redisForSub) + private readonly redisForSub: Redis.Redis, + + private readonly globalEventService: GlobalEventService, + ) { + this.redisForSub.on('message', this.onMessage); + } + + @bindThis + public on<K extends keyof InternalEventTypes>(type: K, listener: Listener<K>, props?: ListenerProps): void { + let set = this.listeners.get(type); + if (!set) { + set = new Map(); + this.listeners.set(type, set); + } + + // Functionally, this is just a set with metadata on the values. + set.set(listener as Listener<keyof InternalEventTypes>, props ?? {}); + } + + @bindThis + public off<K extends keyof InternalEventTypes>(type: K, listener: Listener<K>): void { + this.listeners.get(type)?.delete(listener as Listener<keyof InternalEventTypes>); + } + + @bindThis + public async emit<K extends keyof InternalEventTypes>(type: K, value: InternalEventTypes[K]): Promise<void> { + await this.emitInternal(type, value, true); + await this.globalEventService.publishInternalEventAsync(type, { ...value, _pid: process.pid }); + } + + @bindThis + private async emitInternal<K extends keyof InternalEventTypes>(type: K, value: InternalEventTypes[K], isLocal: boolean): Promise<void> { + const listeners = this.listeners.get(type); + if (!listeners) { + return; + } + + const promises: Promise<void>[] = []; + for (const [listener, props] of listeners) { + if ((isLocal && !props.ignoreLocal) || (!isLocal && !props.ignoreRemote)) { + const promise = Promise.resolve(listener(value, type, isLocal)); + promises.push(promise); + } + } + await Promise.all(promises); + } + + @bindThis + private async onMessage(_: string, data: string): Promise<void> { + const obj = JSON.parse(data); + + if (obj.channel === 'internal') { + const { type, body } = obj.message as GlobalEvents['internal']['payload']; + if (!isLocalInternalEvent(body) || body._pid !== process.pid) { + await this.emitInternal(type, body as InternalEventTypes[keyof InternalEventTypes], false); + } + } + } + + @bindThis + public dispose(): void { + this.redisForSub.off('message', this.onMessage); + this.listeners.clear(); + } + + @bindThis + public onApplicationShutdown(): void { + this.dispose(); + } +} + +interface LocalInternalEvent { + _pid: number; +} + +function isLocalInternalEvent(body: object): body is LocalInternalEvent { + return '_pid' in body && typeof(body._pid) === 'number'; +} diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index 4dceb6e953..a9f4083446 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -606,11 +606,11 @@ export class NoteCreateService implements OnApplicationShutdown { } if (data.reply == null) { - // TODO: キャッシュ - this.followingsRepository.findBy({ - followeeId: user.id, - notify: 'normal', - }).then(async followings => { + this.cacheService.userFollowersCache.fetch(user.id).then(async followingsMap => { + const followings = Array + .from(followingsMap.values()) + .filter(f => f.notify === 'normal'); + if (note.visibility !== 'specified') { const isPureRenote = this.isRenote(data) && !this.isQuote(data) ? true : false; for (const following of followings) { @@ -948,14 +948,7 @@ export class NoteCreateService implements OnApplicationShutdown { // TODO: キャッシュ? // eslint-disable-next-line prefer-const let [followings, userListMemberships] = await Promise.all([ - this.followingsRepository.find({ - where: { - followeeId: user.id, - followerHost: IsNull(), - isFollowerHibernated: false, - }, - select: ['followerId', 'withReplies'], - }), + this.cacheService.getNonHibernatedFollowers(user.id), this.userListMembershipsRepository.find({ where: { userId: user.id, @@ -1072,17 +1065,19 @@ export class NoteCreateService implements OnApplicationShutdown { }); if (hibernatedUsers.length > 0) { - this.usersRepository.update({ - id: In(hibernatedUsers.map(x => x.id)), - }, { - isHibernated: true, - }); - - this.followingsRepository.update({ - followerId: In(hibernatedUsers.map(x => x.id)), - }, { - isFollowerHibernated: true, - }); + await Promise.all([ + this.usersRepository.update({ + id: In(hibernatedUsers.map(x => x.id)), + }, { + isHibernated: true, + }), + this.followingsRepository.update({ + followerId: In(hibernatedUsers.map(x => x.id)), + }, { + isFollowerHibernated: true, + }), + this.cacheService.hibernatedUserCache.setMany(hibernatedUsers.map(x => [x.id, true])), + ]); } } diff --git a/packages/backend/src/core/NoteEditService.ts b/packages/backend/src/core/NoteEditService.ts index 34af1c76dd..a359381573 100644 --- a/packages/backend/src/core/NoteEditService.ts +++ b/packages/backend/src/core/NoteEditService.ts @@ -833,14 +833,7 @@ export class NoteEditService implements OnApplicationShutdown { // TODO: キャッシュ? // eslint-disable-next-line prefer-const let [followings, userListMemberships] = await Promise.all([ - this.followingsRepository.find({ - where: { - followeeId: user.id, - followerHost: IsNull(), - isFollowerHibernated: false, - }, - select: ['followerId', 'withReplies'], - }), + this.cacheService.getNonHibernatedFollowers(user.id), this.userListMembershipsRepository.find({ where: { userId: user.id, @@ -957,17 +950,19 @@ export class NoteEditService implements OnApplicationShutdown { }); if (hibernatedUsers.length > 0) { - this.usersRepository.update({ - id: In(hibernatedUsers.map(x => x.id)), - }, { - isHibernated: true, - }); - - this.followingsRepository.update({ - followerId: In(hibernatedUsers.map(x => x.id)), - }, { - isFollowerHibernated: true, - }); + await Promise.all([ + this.usersRepository.update({ + id: In(hibernatedUsers.map(x => x.id)), + }, { + isHibernated: true, + }), + this.followingsRepository.update({ + followerId: In(hibernatedUsers.map(x => x.id)), + }, { + isFollowerHibernated: true, + }), + this.cacheService.hibernatedUserCache.setMany(hibernatedUsers.map(x => [x.id, true])), + ]); } } diff --git a/packages/backend/src/core/NotificationService.ts b/packages/backend/src/core/NotificationService.ts index 0f05f5425d..2ce7bdb5a9 100644 --- a/packages/backend/src/core/NotificationService.ts +++ b/packages/backend/src/core/NotificationService.ts @@ -113,27 +113,27 @@ export class NotificationService implements OnApplicationShutdown { } if (recieveConfig?.type === 'following') { - const isFollowing = await this.cacheService.userFollowingsCache.fetch(notifieeId).then(followings => Object.hasOwn(followings, notifierId)); + const isFollowing = await this.cacheService.userFollowingsCache.fetch(notifieeId).then(followings => followings.has(notifierId)); if (!isFollowing) { return null; } } else if (recieveConfig?.type === 'follower') { - const isFollower = await this.cacheService.userFollowingsCache.fetch(notifierId).then(followings => Object.hasOwn(followings, notifieeId)); + const isFollower = await this.cacheService.userFollowingsCache.fetch(notifierId).then(followings => followings.has(notifieeId)); if (!isFollower) { return null; } } else if (recieveConfig?.type === 'mutualFollow') { const [isFollowing, isFollower] = await Promise.all([ - this.cacheService.userFollowingsCache.fetch(notifieeId).then(followings => Object.hasOwn(followings, notifierId)), - this.cacheService.userFollowingsCache.fetch(notifierId).then(followings => Object.hasOwn(followings, notifieeId)), + this.cacheService.userFollowingsCache.fetch(notifieeId).then(followings => followings.has(notifierId)), + this.cacheService.userFollowingsCache.fetch(notifierId).then(followings => followings.has(notifieeId)), ]); if (!(isFollowing && isFollower)) { return null; } } else if (recieveConfig?.type === 'followingOrFollower') { const [isFollowing, isFollower] = await Promise.all([ - this.cacheService.userFollowingsCache.fetch(notifieeId).then(followings => Object.hasOwn(followings, notifierId)), - this.cacheService.userFollowingsCache.fetch(notifierId).then(followings => Object.hasOwn(followings, notifieeId)), + this.cacheService.userFollowingsCache.fetch(notifieeId).then(followings => followings.has(notifierId)), + this.cacheService.userFollowingsCache.fetch(notifierId).then(followings => followings.has(notifieeId)), ]); if (!isFollowing && !isFollower) { return null; diff --git a/packages/backend/src/core/PushNotificationService.ts b/packages/backend/src/core/PushNotificationService.ts index 9333c1ebc5..e3f10d4504 100644 --- a/packages/backend/src/core/PushNotificationService.ts +++ b/packages/backend/src/core/PushNotificationService.ts @@ -12,7 +12,8 @@ import type { Packed } from '@/misc/json-schema.js'; import { getNoteSummary } from '@/misc/get-note-summary.js'; import type { MiMeta, MiSwSubscription, SwSubscriptionsRepository } from '@/models/_.js'; import { bindThis } from '@/decorators.js'; -import { RedisKVCache } from '@/misc/cache.js'; +import { QuantumKVCache } from '@/misc/QuantumKVCache.js'; +import { InternalEventService } from '@/core/InternalEventService.js'; // Defined also packages/sw/types.ts#L13 type PushNotificationsTypes = { @@ -48,7 +49,7 @@ function truncateBody<T extends keyof PushNotificationsTypes>(type: T, body: Pus @Injectable() export class PushNotificationService implements OnApplicationShutdown { - private subscriptionsCache: RedisKVCache<MiSwSubscription[]>; + private subscriptionsCache: QuantumKVCache<MiSwSubscription[]>; constructor( @Inject(DI.config) @@ -62,13 +63,11 @@ export class PushNotificationService implements OnApplicationShutdown { @Inject(DI.swSubscriptionsRepository) private swSubscriptionsRepository: SwSubscriptionsRepository, + private readonly internalEventService: InternalEventService, ) { - this.subscriptionsCache = new RedisKVCache<MiSwSubscription[]>(this.redisClient, 'userSwSubscriptions', { + this.subscriptionsCache = new QuantumKVCache<MiSwSubscription[]>(this.internalEventService, 'userSwSubscriptions', { lifetime: 1000 * 60 * 60 * 1, // 1h - memoryCacheLifetime: 1000 * 60 * 3, // 3m fetcher: (key) => this.swSubscriptionsRepository.findBy({ userId: key }), - toRedisConverter: (value) => JSON.stringify(value), - fromRedisConverter: (value) => JSON.parse(value), }); } @@ -114,8 +113,8 @@ export class PushNotificationService implements OnApplicationShutdown { endpoint: subscription.endpoint, auth: subscription.auth, publickey: subscription.publickey, - }).then(() => { - this.refreshCache(userId); + }).then(async () => { + await this.refreshCache(userId); }); } }); @@ -123,8 +122,8 @@ export class PushNotificationService implements OnApplicationShutdown { } @bindThis - public refreshCache(userId: string): void { - this.subscriptionsCache.refresh(userId); + public async refreshCache(userId: string): Promise<void> { + await this.subscriptionsCache.refresh(userId); } @bindThis diff --git a/packages/backend/src/core/QueryService.ts b/packages/backend/src/core/QueryService.ts index 2d8ea51e65..d0e281e20c 100644 --- a/packages/backend/src/core/QueryService.ts +++ b/packages/backend/src/core/QueryService.ts @@ -94,7 +94,7 @@ export class QueryService { @bindThis public generateBlockQueryForUsers<E extends ObjectLiteral>(q: SelectQueryBuilder<E>, me: { id: MiUser['id'] }): SelectQueryBuilder<E> { this.andNotBlockingUser(q, ':meId', 'user.id'); - this.andNotBlockingUser(q, 'user.id', ':me.id'); + this.andNotBlockingUser(q, 'user.id', ':meId'); return q.setParameters({ meId: me.id }); } diff --git a/packages/backend/src/core/ReactionService.ts b/packages/backend/src/core/ReactionService.ts index c23bb51178..8d2dc7d4e8 100644 --- a/packages/backend/src/core/ReactionService.ts +++ b/packages/backend/src/core/ReactionService.ts @@ -122,7 +122,7 @@ export class ReactionService { } // check visibility - if (!await this.noteEntityService.isVisibleForMe(note, user.id)) { + if (!await this.noteEntityService.isVisibleForMe(note, user.id, { me: user })) { throw new IdentifiableError('68e9d2d1-48bf-42c2-b90a-b20e09fd3d48', 'Note not accessible for you.'); } diff --git a/packages/backend/src/core/UserBlockingService.ts b/packages/backend/src/core/UserBlockingService.ts index 8da1bb2092..1a1e7c4778 100644 --- a/packages/backend/src/core/UserBlockingService.ts +++ b/packages/backend/src/core/UserBlockingService.ts @@ -77,8 +77,10 @@ export class UserBlockingService implements OnModuleInit { await this.blockingsRepository.insert(blocking); - this.cacheService.userBlockingCache.refresh(blocker.id); - this.cacheService.userBlockedCache.refresh(blockee.id); + await Promise.all([ + this.cacheService.userBlockingCache.delete(blocker.id), + this.cacheService.userBlockedCache.delete(blockee.id), + ]); this.globalEventService.publishInternalEvent('blockingCreated', { blockerId: blocker.id, @@ -168,8 +170,10 @@ export class UserBlockingService implements OnModuleInit { await this.blockingsRepository.delete(blocking.id); - this.cacheService.userBlockingCache.refresh(blocker.id); - this.cacheService.userBlockedCache.refresh(blockee.id); + await Promise.all([ + this.cacheService.userBlockingCache.delete(blocker.id), + this.cacheService.userBlockedCache.delete(blockee.id), + ]); this.globalEventService.publishInternalEvent('blockingDeleted', { blockerId: blocker.id, diff --git a/packages/backend/src/core/UserFollowingService.ts b/packages/backend/src/core/UserFollowingService.ts index 897b950022..8470872eac 100644 --- a/packages/backend/src/core/UserFollowingService.ts +++ b/packages/backend/src/core/UserFollowingService.ts @@ -29,6 +29,7 @@ import { AccountMoveService } from '@/core/AccountMoveService.js'; import { UtilityService } from '@/core/UtilityService.js'; import type { ThinUser } from '@/queue/types.js'; import { LoggerService } from '@/core/LoggerService.js'; +import { InternalEventService } from '@/core/InternalEventService.js'; import type Logger from '../logger.js'; type Local = MiLocalUser | { @@ -86,6 +87,7 @@ export class UserFollowingService implements OnModuleInit { private accountMoveService: AccountMoveService, private perUserFollowingChart: PerUserFollowingChart, private instanceChart: InstanceChart, + private readonly internalEventService: InternalEventService, loggerService: LoggerService, ) { @@ -145,12 +147,7 @@ export class UserFollowingService implements OnModuleInit { if (blocked) throw new IdentifiableError('3338392a-f764-498d-8855-db939dcf8c48', 'blocked'); } - if (await this.followingsRepository.exists({ - where: { - followerId: follower.id, - followeeId: followee.id, - }, - })) { + if (await this.cacheService.isFollowing(follower, followee)) { // すでにフォロー関係が存在している場合 if (this.userEntityService.isRemoteUser(follower) && this.userEntityService.isLocalUser(followee)) { // リモート → ローカル: acceptを送り返しておしまい @@ -178,24 +175,14 @@ export class UserFollowingService implements OnModuleInit { let autoAccept = false; // 鍵アカウントであっても、既にフォローされていた場合はスルー - const isFollowing = await this.followingsRepository.exists({ - where: { - followerId: follower.id, - followeeId: followee.id, - }, - }); + const isFollowing = await this.cacheService.isFollowing(follower, followee); if (isFollowing) { autoAccept = true; } // フォローしているユーザーは自動承認オプション if (!autoAccept && (this.userEntityService.isLocalUser(followee) && followeeProfile.autoAcceptFollowed)) { - const isFollowed = await this.followingsRepository.exists({ - where: { - followerId: followee.id, - followeeId: follower.id, - }, - }); + const isFollowed = await this.cacheService.isFollowing(followee, follower); // intentionally reversed parameters if (isFollowed) autoAccept = true; } @@ -204,12 +191,7 @@ export class UserFollowingService implements OnModuleInit { if (followee.isLocked && !autoAccept) { autoAccept = !!(await this.accountMoveService.validateAlsoKnownAs( follower, - (oldSrc, newSrc) => this.followingsRepository.exists({ - where: { - followeeId: followee.id, - followerId: newSrc.id, - }, - }), + (oldSrc, newSrc) => this.cacheService.isFollowing(newSrc, followee), true, )); } @@ -264,7 +246,8 @@ export class UserFollowingService implements OnModuleInit { } }); - this.cacheService.userFollowingsCache.refresh(follower.id); + // Handled by CacheService + //this.cacheService.userFollowingsCache.refresh(follower.id); const requestExist = await this.followRequestsRepository.exists({ where: { @@ -291,7 +274,7 @@ export class UserFollowingService implements OnModuleInit { }, followee.id); } - this.globalEventService.publishInternalEvent('follow', { followerId: follower.id, followeeId: followee.id }); + await this.internalEventService.emit('follow', { followerId: follower.id, followeeId: followee.id }); const [followeeUser, followerUser] = await Promise.all([ this.usersRepository.findOneByOrFail({ id: followee.id }), @@ -363,31 +346,29 @@ export class UserFollowingService implements OnModuleInit { }, silent = false, ): Promise<void> { - const following = await this.followingsRepository.findOne({ - relations: { - follower: true, - followee: true, - }, - where: { - followerId: follower.id, - followeeId: followee.id, - }, - }); + const [ + followerUser, + followeeUser, + following, + ] = await Promise.all([ + this.cacheService.findUserById(follower.id), + this.cacheService.findUserById(followee.id), + this.cacheService.userFollowingsCache.fetch(follower.id).then(fs => fs.get(followee.id)), + ]); - if (following === null || !following.follower || !following.followee) { + if (following == null) { this.logger.warn('フォロー解除がリクエストされましたがフォローしていませんでした'); return; } await this.followingsRepository.delete(following.id); + await this.internalEventService.emit('unfollow', { followerId: follower.id, followeeId: followee.id }); - this.cacheService.userFollowingsCache.refresh(follower.id); - - this.decrementFollowing(following.follower, following.followee); + this.decrementFollowing(followerUser, followeeUser); if (!silent && this.userEntityService.isLocalUser(follower)) { // Publish unfollow event - this.userEntityService.pack(followee.id, follower, { + this.userEntityService.pack(followeeUser, follower, { schema: 'UserDetailedNotMe', }).then(async packed => { this.globalEventService.publishMainStream(follower.id, 'unfollow', packed); @@ -412,8 +393,6 @@ export class UserFollowingService implements OnModuleInit { follower: MiUser, followee: MiUser, ): Promise<void> { - this.globalEventService.publishInternalEvent('unfollow', { followerId: follower.id, followeeId: followee.id }); - // Neither followee nor follower has moved. if (!follower.movedToUri && !followee.movedToUri) { //#region Decrement following / followers counts @@ -687,22 +666,22 @@ export class UserFollowingService implements OnModuleInit { */ @bindThis private async removeFollow(followee: Both, follower: Both): Promise<void> { - const following = await this.followingsRepository.findOne({ - relations: { - followee: true, - follower: true, - }, - where: { - followeeId: followee.id, - followerId: follower.id, - }, - }); + const [ + followerUser, + followeeUser, + following, + ] = await Promise.all([ + this.cacheService.findUserById(follower.id), + this.cacheService.findUserById(followee.id), + this.cacheService.userFollowingsCache.fetch(follower.id).then(fs => fs.get(followee.id)), + ]); - if (!following || !following.followee || !following.follower) return; + if (!following) return; await this.followingsRepository.delete(following.id); + await this.internalEventService.emit('unfollow', { followerId: follower.id, followeeId: followee.id }); - this.decrementFollowing(following.follower, following.followee); + this.decrementFollowing(followerUser, followeeUser); } /** @@ -733,36 +712,26 @@ export class UserFollowingService implements OnModuleInit { } @bindThis - public getFollowees(userId: MiUser['id']) { - return this.followingsRepository.createQueryBuilder('following') - .select('following.followeeId') - .where('following.followerId = :followerId', { followerId: userId }) - .getMany(); + public async getFollowees(userId: MiUser['id']) { + const followings = await this.cacheService.userFollowingsCache.fetch(userId); + return Array.from(followings.values()); } @bindThis - public isFollowing(followerId: MiUser['id'], followeeId: MiUser['id']) { - return this.followingsRepository.exists({ - where: { - followerId, - followeeId, - }, - }); + public async isFollowing(followerId: MiUser['id'], followeeId: MiUser['id']) { + return this.cacheService.isFollowing(followerId, followeeId); } @bindThis public async isMutual(aUserId: MiUser['id'], bUserId: MiUser['id']) { - const count = await this.followingsRepository.createQueryBuilder('following') - .where(new Brackets(qb => { - qb.where('following.followerId = :aUserId', { aUserId }) - .andWhere('following.followeeId = :bUserId', { bUserId }); - })) - .orWhere(new Brackets(qb => { - qb.where('following.followerId = :bUserId', { bUserId }) - .andWhere('following.followeeId = :aUserId', { aUserId }); - })) - .getCount(); + const [ + isFollowing, + isFollowed, + ] = await Promise.all([ + this.isFollowing(aUserId, bUserId), + this.isFollowing(bUserId, aUserId), + ]); - return count === 2; + return isFollowing && isFollowed; } } diff --git a/packages/backend/src/core/UserKeypairService.ts b/packages/backend/src/core/UserKeypairService.ts index 92d61cd103..d8a67d273b 100644 --- a/packages/backend/src/core/UserKeypairService.ts +++ b/packages/backend/src/core/UserKeypairService.ts @@ -7,14 +7,14 @@ import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import * as Redis from 'ioredis'; import type { MiUser } from '@/models/User.js'; import type { UserKeypairsRepository } from '@/models/_.js'; -import { RedisKVCache } from '@/misc/cache.js'; +import { MemoryKVCache, RedisKVCache } from '@/misc/cache.js'; import type { MiUserKeypair } from '@/models/UserKeypair.js'; import { DI } from '@/di-symbols.js'; import { bindThis } from '@/decorators.js'; @Injectable() export class UserKeypairService implements OnApplicationShutdown { - private cache: RedisKVCache<MiUserKeypair>; + private cache: MemoryKVCache<MiUserKeypair>; constructor( @Inject(DI.redis) @@ -23,18 +23,12 @@ export class UserKeypairService implements OnApplicationShutdown { @Inject(DI.userKeypairsRepository) private userKeypairsRepository: UserKeypairsRepository, ) { - this.cache = new RedisKVCache<MiUserKeypair>(this.redisClient, 'userKeypair', { - lifetime: 1000 * 60 * 60 * 24, // 24h - memoryCacheLifetime: 1000 * 60 * 60, // 1h - fetcher: (key) => this.userKeypairsRepository.findOneByOrFail({ userId: key }), - toRedisConverter: (value) => JSON.stringify(value), - fromRedisConverter: (value) => JSON.parse(value), - }); + this.cache = new MemoryKVCache<MiUserKeypair>(1000 * 60 * 60 * 24); // 24h } @bindThis public async getUserKeypair(userId: MiUser['id']): Promise<MiUserKeypair> { - return await this.cache.fetch(userId); + return await this.cache.fetch(userId, () => this.userKeypairsRepository.findOneByOrFail({ userId })); } @bindThis diff --git a/packages/backend/src/core/UserListService.ts b/packages/backend/src/core/UserListService.ts index e7200ab1bf..b4486b9808 100644 --- a/packages/backend/src/core/UserListService.ts +++ b/packages/backend/src/core/UserListService.ts @@ -11,21 +11,22 @@ import type { MiUser } from '@/models/User.js'; import type { MiUserList } from '@/models/UserList.js'; import type { MiUserListMembership } from '@/models/UserListMembership.js'; import { IdService } from '@/core/IdService.js'; -import type { GlobalEvents } from '@/core/GlobalEventService.js'; +import type { GlobalEvents, InternalEventTypes } from '@/core/GlobalEventService.js'; import { GlobalEventService } from '@/core/GlobalEventService.js'; import { DI } from '@/di-symbols.js'; import { UserEntityService } from '@/core/entities/UserEntityService.js'; import { bindThis } from '@/decorators.js'; import { QueueService } from '@/core/QueueService.js'; -import { RedisKVCache } from '@/misc/cache.js'; +import { QuantumKVCache } from '@/misc/QuantumKVCache.js'; import { RoleService } from '@/core/RoleService.js'; import { SystemAccountService } from '@/core/SystemAccountService.js'; +import { InternalEventService } from '@/core/InternalEventService.js'; @Injectable() export class UserListService implements OnApplicationShutdown, OnModuleInit { public static TooManyUsersError = class extends Error {}; - public membersCache: RedisKVCache<Set<string>>; + public membersCache: QuantumKVCache<Set<string>>; private roleService: RoleService; constructor( @@ -48,16 +49,15 @@ export class UserListService implements OnApplicationShutdown, OnModuleInit { private globalEventService: GlobalEventService, private queueService: QueueService, private systemAccountService: SystemAccountService, + private readonly internalEventService: InternalEventService, ) { - this.membersCache = new RedisKVCache<Set<string>>(this.redisClient, 'userListMembers', { + this.membersCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'userListMembers', { lifetime: 1000 * 60 * 30, // 30m - memoryCacheLifetime: 1000 * 60, // 1m fetcher: (key) => this.userListMembershipsRepository.find({ where: { userListId: key }, select: ['userId'] }).then(xs => new Set(xs.map(x => x.userId))), - toRedisConverter: (value) => JSON.stringify(Array.from(value)), - fromRedisConverter: (value) => new Set(JSON.parse(value)), }); - this.redisForSub.on('message', this.onMessage); + this.internalEventService.on('userListMemberAdded', this.onMessage); + this.internalEventService.on('userListMemberRemoved', this.onMessage); } async onModuleInit() { @@ -65,15 +65,12 @@ export class UserListService implements OnApplicationShutdown, OnModuleInit { } @bindThis - private async onMessage(_: string, data: string): Promise<void> { - const obj = JSON.parse(data); - - if (obj.channel === 'internal') { - const { type, body } = obj.message as GlobalEvents['internal']['payload']; + private async onMessage<E extends 'userListMemberAdded' | 'userListMemberRemoved'>(body: InternalEventTypes[E], type: E): Promise<void> { + { switch (type) { case 'userListMemberAdded': { const { userListId, memberId } = body; - const members = await this.membersCache.get(userListId); + const members = this.membersCache.get(userListId); if (members) { members.add(memberId); } @@ -81,7 +78,7 @@ export class UserListService implements OnApplicationShutdown, OnModuleInit { } case 'userListMemberRemoved': { const { userListId, memberId } = body; - const members = await this.membersCache.get(userListId); + const members = this.membersCache.get(userListId); if (members) { members.delete(memberId); } @@ -150,7 +147,8 @@ export class UserListService implements OnApplicationShutdown, OnModuleInit { @bindThis public dispose(): void { - this.redisForSub.off('message', this.onMessage); + this.internalEventService.off('userListMemberAdded', this.onMessage); + this.internalEventService.off('userListMemberRemoved', this.onMessage); this.membersCache.dispose(); } diff --git a/packages/backend/src/core/UserMutingService.ts b/packages/backend/src/core/UserMutingService.ts index 06643be5fb..c15a979d0f 100644 --- a/packages/backend/src/core/UserMutingService.ts +++ b/packages/backend/src/core/UserMutingService.ts @@ -32,7 +32,7 @@ export class UserMutingService { muteeId: target.id, }); - this.cacheService.userMutingsCache.refresh(user.id); + await this.cacheService.userMutingsCache.delete(user.id); } @bindThis @@ -43,9 +43,6 @@ export class UserMutingService { id: In(mutings.map(m => m.id)), }); - const muterIds = [...new Set(mutings.map(m => m.muterId))]; - for (const muterId of muterIds) { - this.cacheService.userMutingsCache.refresh(muterId); - } + await this.cacheService.userMutingsCache.deleteMany(mutings.map(m => m.muterId)); } } diff --git a/packages/backend/src/core/UserRenoteMutingService.ts b/packages/backend/src/core/UserRenoteMutingService.ts index bdc5e23f4b..7c0693f216 100644 --- a/packages/backend/src/core/UserRenoteMutingService.ts +++ b/packages/backend/src/core/UserRenoteMutingService.ts @@ -33,7 +33,7 @@ export class UserRenoteMutingService { muteeId: target.id, }); - await this.cacheService.renoteMutingsCache.refresh(user.id); + await this.cacheService.renoteMutingsCache.delete(user.id); } @bindThis @@ -44,9 +44,6 @@ export class UserRenoteMutingService { id: In(mutings.map(m => m.id)), }); - const muterIds = [...new Set(mutings.map(m => m.muterId))]; - for (const muterId of muterIds) { - await this.cacheService.renoteMutingsCache.refresh(muterId); - } + await this.cacheService.renoteMutingsCache.deleteMany(mutings.map(m => m.muterId)); } } diff --git a/packages/backend/src/core/UserService.ts b/packages/backend/src/core/UserService.ts index 1f471513f3..4a04910105 100644 --- a/packages/backend/src/core/UserService.ts +++ b/packages/backend/src/core/UserService.ts @@ -10,6 +10,7 @@ import { DI } from '@/di-symbols.js'; import { bindThis } from '@/decorators.js'; import { SystemWebhookService } from '@/core/SystemWebhookService.js'; import { UserEntityService } from '@/core/entities/UserEntityService.js'; +import { CacheService } from '@/core/CacheService.js'; @Injectable() export class UserService { @@ -20,6 +21,7 @@ export class UserService { private followingsRepository: FollowingsRepository, private systemWebhookService: SystemWebhookService, private userEntityService: UserEntityService, + private readonly cacheService: CacheService, ) { } @@ -38,14 +40,17 @@ export class UserService { }); const wokeUp = result.isHibernated; if (wokeUp) { - this.usersRepository.update(user.id, { - isHibernated: false, - }); - this.followingsRepository.update({ - followerId: user.id, - }, { - isFollowerHibernated: false, - }); + await Promise.all([ + this.usersRepository.update(user.id, { + isHibernated: false, + }), + this.followingsRepository.update({ + followerId: user.id, + }, { + isFollowerHibernated: false, + }), + this.cacheService.hibernatedUserCache.set(user.id, false), + ]); } } else { this.usersRepository.update(user.id, { diff --git a/packages/backend/src/core/UserSuspendService.ts b/packages/backend/src/core/UserSuspendService.ts index 30dcaa6f7d..f375dff862 100644 --- a/packages/backend/src/core/UserSuspendService.ts +++ b/packages/backend/src/core/UserSuspendService.ts @@ -16,6 +16,7 @@ import { bindThis } from '@/decorators.js'; import { RelationshipJobData } from '@/queue/types.js'; import { ModerationLogService } from '@/core/ModerationLogService.js'; import { isSystemAccount } from '@/misc/is-system-account.js'; +import { CacheService } from '@/core/CacheService.js'; @Injectable() export class UserSuspendService { @@ -34,6 +35,7 @@ export class UserSuspendService { private globalEventService: GlobalEventService, private apRendererService: ApRendererService, private moderationLogService: ModerationLogService, + private readonly cacheService: CacheService, ) { } @@ -143,12 +145,8 @@ export class UserSuspendService { @bindThis private async unFollowAll(follower: MiUser) { - const followings = await this.followingsRepository.find({ - where: { - followerId: follower.id, - followeeId: Not(IsNull()), - }, - }); + const followings = await this.cacheService.userFollowingsCache.fetch(follower.id) + .then(fs => Array.from(fs.values()).filter(f => f.followeeHost != null)); const jobs: RelationshipJobData[] = []; for (const following of followings) { diff --git a/packages/backend/src/core/activitypub/ApDeliverManagerService.ts b/packages/backend/src/core/activitypub/ApDeliverManagerService.ts index 746af41f55..91f6f2d9fc 100644 --- a/packages/backend/src/core/activitypub/ApDeliverManagerService.ts +++ b/packages/backend/src/core/activitypub/ApDeliverManagerService.ts @@ -5,7 +5,6 @@ import { Inject, Injectable } from '@nestjs/common'; import { IsNull, Not } from 'typeorm'; -import { UnrecoverableError } from 'bullmq'; import { DI } from '@/di-symbols.js'; import type { FollowingsRepository } from '@/models/_.js'; import type { MiLocalUser, MiRemoteUser, MiUser } from '@/models/User.js'; @@ -14,6 +13,7 @@ import { UserEntityService } from '@/core/entities/UserEntityService.js'; import { bindThis } from '@/decorators.js'; import type { IActivity } from '@/core/activitypub/type.js'; import { ThinUser } from '@/queue/types.js'; +import { CacheService } from '@/core/CacheService.js'; interface IRecipe { type: string; @@ -41,16 +41,14 @@ class DeliverManager { /** * Constructor - * @param userEntityService - * @param followingsRepository * @param queueService + * @param cacheService * @param actor Actor * @param activity Activity to deliver */ constructor( - private userEntityService: UserEntityService, - private followingsRepository: FollowingsRepository, private queueService: QueueService, + private readonly cacheService: CacheService, actor: { id: MiUser['id']; host: null; }, activity: IActivity | null, @@ -114,24 +112,23 @@ class DeliverManager { // Process follower recipes first to avoid duplication when processing direct recipes later. if (this.recipes.some(r => isFollowers(r))) { // followers deliver - // TODO: SELECT DISTINCT ON ("followerSharedInbox") "followerSharedInbox" みたいな問い合わせにすればよりパフォーマンス向上できそう // ただ、sharedInboxがnullなリモートユーザーも稀におり、その対応ができなさそう? - const followers = await this.followingsRepository.find({ - where: { - followeeId: this.actor.id, - followerHost: Not(IsNull()), - }, - select: { - followerSharedInbox: true, - followerInbox: true, - followerId: true, - }, - }); + const followers = await this.cacheService.userFollowersCache + .fetch(this.actor.id) + .then(f => Array + .from(f.values()) + .filter(f => f.followerHost != null) + .map(f => ({ + followerInbox: f.followerInbox, + followerSharedInbox: f.followerSharedInbox, + }))); for (const following of followers) { - const inbox = following.followerSharedInbox ?? following.followerInbox; - if (inbox === null) throw new UnrecoverableError(`deliver failed for ${this.actor.id}: follower ${following.followerId} inbox is null`); - inboxes.set(inbox, following.followerSharedInbox != null); + if (following.followerSharedInbox) { + inboxes.set(following.followerSharedInbox, true); + } else if (following.followerInbox) { + inboxes.set(following.followerInbox, false); + } } } @@ -153,11 +150,8 @@ class DeliverManager { @Injectable() export class ApDeliverManagerService { constructor( - @Inject(DI.followingsRepository) - private followingsRepository: FollowingsRepository, - - private userEntityService: UserEntityService, private queueService: QueueService, + private readonly cacheService: CacheService, ) { } @@ -169,9 +163,8 @@ export class ApDeliverManagerService { @bindThis public async deliverToFollowers(actor: { id: MiLocalUser['id']; host: null; }, activity: IActivity): Promise<void> { const manager = new DeliverManager( - this.userEntityService, - this.followingsRepository, this.queueService, + this.cacheService, actor, activity, ); @@ -188,9 +181,8 @@ export class ApDeliverManagerService { @bindThis public async deliverToUser(actor: { id: MiLocalUser['id']; host: null; }, activity: IActivity, to: MiRemoteUser): Promise<void> { const manager = new DeliverManager( - this.userEntityService, - this.followingsRepository, this.queueService, + this.cacheService, actor, activity, ); @@ -207,9 +199,8 @@ export class ApDeliverManagerService { @bindThis public async deliverToUsers(actor: { id: MiLocalUser['id']; host: null; }, activity: IActivity, targets: MiRemoteUser[]): Promise<void> { const manager = new DeliverManager( - this.userEntityService, - this.followingsRepository, this.queueService, + this.cacheService, actor, activity, ); @@ -220,9 +211,8 @@ export class ApDeliverManagerService { @bindThis public createDeliverManager(actor: { id: MiUser['id']; host: null; }, activity: IActivity | null): DeliverManager { return new DeliverManager( - this.userEntityService, - this.followingsRepository, this.queueService, + this.cacheService, actor, activity, diff --git a/packages/backend/src/core/activitypub/ApInboxService.ts b/packages/backend/src/core/activitypub/ApInboxService.ts index b384ec58c5..009d4cbd39 100644 --- a/packages/backend/src/core/activitypub/ApInboxService.ts +++ b/packages/backend/src/core/activitypub/ApInboxService.ts @@ -37,6 +37,7 @@ import InstanceChart from '@/core/chart/charts/instance.js'; import FederationChart from '@/core/chart/charts/federation.js'; import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js'; import { UpdateInstanceQueue } from '@/core/UpdateInstanceQueue.js'; +import { CacheService } from '@/core/CacheService.js'; import { getApHrefNullable, getApId, getApIds, getApType, getNullableApId, isAccept, isActor, isAdd, isAnnounce, isApObject, isBlock, isCollectionOrOrderedCollection, isCreate, isDelete, isFlag, isFollow, isLike, isDislike, isMove, isPost, isReject, isRemove, isTombstone, isUndo, isUpdate, validActor, validPost, isActivity, IObjectWithId } from './type.js'; import { ApNoteService } from './models/ApNoteService.js'; import { ApLoggerService } from './ApLoggerService.js'; @@ -98,6 +99,7 @@ export class ApInboxService { private readonly instanceChart: InstanceChart, private readonly federationChart: FederationChart, private readonly updateInstanceQueue: UpdateInstanceQueue, + private readonly cacheService: CacheService, ) { this.logger = this.apLoggerService.logger; } @@ -365,7 +367,7 @@ export class ApInboxService { const renote = await this.apNoteService.resolveNote(target, { resolver, sentFrom: getApId(target) }); if (renote == null) return 'announce target is null'; - if (!await this.noteEntityService.isVisibleForMe(renote, actor.id)) { + if (!await this.noteEntityService.isVisibleForMe(renote, actor.id, { me: actor })) { return 'skip: invalid actor for this activity'; } @@ -766,12 +768,7 @@ export class ApInboxService { return 'skip: follower not found'; } - const isFollowing = await this.followingsRepository.exists({ - where: { - followerId: follower.id, - followeeId: actor.id, - }, - }); + const isFollowing = await this.cacheService.userFollowingsCache.fetch(follower.id).then(f => f.has(actor.id)); if (isFollowing) { await this.userFollowingService.unfollow(follower, actor); @@ -830,12 +827,7 @@ export class ApInboxService { }, }); - const isFollowing = await this.followingsRepository.exists({ - where: { - followerId: actor.id, - followeeId: followee.id, - }, - }); + const isFollowing = await this.cacheService.userFollowingsCache.fetch(actor.id).then(f => f.has(followee.id)); if (requestExist) { await this.userFollowingService.cancelFollowRequest(followee, actor); diff --git a/packages/backend/src/core/activitypub/models/ApPersonService.ts b/packages/backend/src/core/activitypub/models/ApPersonService.ts index b7aa036068..29f7459219 100644 --- a/packages/backend/src/core/activitypub/models/ApPersonService.ts +++ b/packages/backend/src/core/activitypub/models/ApPersonService.ts @@ -741,10 +741,17 @@ export class ApPersonService implements OnModuleInit, OnApplicationShutdown { this.hashtagService.updateUsertags(exist, tags); // 該当ユーザーが既にフォロワーになっていた場合はFollowingもアップデートする - await this.followingsRepository.update( - { followerId: exist.id }, - { followerSharedInbox: person.sharedInbox ?? person.endpoints?.sharedInbox ?? null }, - ); + if (exist.inbox !== person.inbox || exist.sharedInbox !== (person.sharedInbox ?? person.endpoints?.sharedInbox)) { + await this.followingsRepository.update( + { followerId: exist.id }, + { + followerInbox: person.inbox, + followerSharedInbox: person.sharedInbox ?? person.endpoints?.sharedInbox ?? null, + }, + ); + + await this.cacheService.refreshFollowRelationsFor(exist.id); + } await this.updateFeatured(exist.id, resolver).catch(err => { // Permanent error implies hidden or inaccessible, which is a normal thing. diff --git a/packages/backend/src/core/chart/charts/federation.ts b/packages/backend/src/core/chart/charts/federation.ts index b6db6f5454..4bbb5437cc 100644 --- a/packages/backend/src/core/chart/charts/federation.ts +++ b/packages/backend/src/core/chart/charts/federation.ts @@ -44,6 +44,7 @@ export default class FederationChart extends Chart<typeof schema> { // eslint-di } protected async tickMinor(): Promise<Partial<KVs<typeof schema>>> { + // TODO optimization: replace these with exists() const pubsubSubQuery = this.followingsRepository.createQueryBuilder('f') .select('f.followerHost') .where('f.followerHost IS NOT NULL'); diff --git a/packages/backend/src/core/chart/charts/per-user-following.ts b/packages/backend/src/core/chart/charts/per-user-following.ts index 588ac638de..8d75a30e9a 100644 --- a/packages/backend/src/core/chart/charts/per-user-following.ts +++ b/packages/backend/src/core/chart/charts/per-user-following.ts @@ -15,6 +15,7 @@ import Chart from '../core.js'; import { ChartLoggerService } from '../ChartLoggerService.js'; import { name, schema } from './entities/per-user-following.js'; import type { KVs } from '../core.js'; +import { CacheService } from '@/core/CacheService.js'; /** * ユーザーごとのフォローに関するチャート @@ -31,23 +32,25 @@ export default class PerUserFollowingChart extends Chart<typeof schema> { // esl private appLockService: AppLockService, private userEntityService: UserEntityService, private chartLoggerService: ChartLoggerService, + private readonly cacheService: CacheService, ) { super(db, (k) => appLockService.getChartInsertLock(k), chartLoggerService.logger, name, schema, true); } protected async tickMajor(group: string): Promise<Partial<KVs<typeof schema>>> { const [ - localFollowingsCount, - localFollowersCount, - remoteFollowingsCount, - remoteFollowersCount, + followees, + followers, ] = await Promise.all([ - this.followingsRepository.countBy({ followerId: group, followeeHost: IsNull() }), - this.followingsRepository.countBy({ followeeId: group, followerHost: IsNull() }), - this.followingsRepository.countBy({ followerId: group, followeeHost: Not(IsNull()) }), - this.followingsRepository.countBy({ followeeId: group, followerHost: Not(IsNull()) }), + this.cacheService.userFollowingsCache.fetch(group).then(fs => Array.from(fs.values())), + this.cacheService.userFollowersCache.fetch(group).then(fs => Array.from(fs.values())), ]); + const localFollowingsCount = followees.reduce((sum, f) => sum + (f.followeeHost == null ? 1 : 0), 0); + const localFollowersCount = followers.reduce((sum, f) => sum + (f.followerHost == null ? 1 : 0), 0); + const remoteFollowingsCount = followees.reduce((sum, f) => sum + (f.followeeHost == null ? 0 : 1), 0); + const remoteFollowersCount = followers.reduce((sum, f) => sum + (f.followerHost == null ? 0 : 1), 0); + return { 'local.followings.total': localFollowingsCount, 'local.followers.total': localFollowersCount, diff --git a/packages/backend/src/core/entities/NoteEntityService.ts b/packages/backend/src/core/entities/NoteEntityService.ts index d2373a70a2..4248fde77f 100644 --- a/packages/backend/src/core/entities/NoteEntityService.ts +++ b/packages/backend/src/core/entities/NoteEntityService.ts @@ -11,7 +11,7 @@ import type { Packed } from '@/misc/json-schema.js'; import { awaitAll } from '@/misc/prelude/await-all.js'; import type { MiUser } from '@/models/User.js'; import type { MiNote } from '@/models/Note.js'; -import type { UsersRepository, NotesRepository, FollowingsRepository, PollsRepository, PollVotesRepository, NoteReactionsRepository, ChannelsRepository, MiMeta } from '@/models/_.js'; +import type { UsersRepository, NotesRepository, FollowingsRepository, PollsRepository, PollVotesRepository, NoteReactionsRepository, ChannelsRepository, MiMeta, MiPollVote, MiPoll, MiChannel, MiFollowing } from '@/models/_.js'; import { bindThis } from '@/decorators.js'; import { DebounceLoader } from '@/misc/loader.js'; import { IdService } from '@/core/IdService.js'; @@ -26,13 +26,13 @@ import type { UserEntityService } from './UserEntityService.js'; import type { DriveFileEntityService } from './DriveFileEntityService.js'; // is-renote.tsとよしなにリンク -function isPureRenote(note: MiNote): note is MiNote & { renoteId: MiNote['id']; renote: MiNote } { +function isPureRenote(note: MiNote): note is MiNote & { renoteId: MiNote['id'] } { return ( - note.renote != null && - note.reply == null && + note.renoteId != null && + note.replyId == null && note.text == null && note.cw == null && - (note.fileIds == null || note.fileIds.length === 0) && + note.fileIds.length === 0 && !note.hasPoll ); } @@ -132,7 +132,10 @@ export class NoteEntityService implements OnModuleInit { } @bindThis - public async hideNote(packedNote: Packed<'Note'>, meId: MiUser['id'] | null): Promise<void> { + public async hideNote(packedNote: Packed<'Note'>, meId: MiUser['id'] | null, hint?: { + myFollowing?: ReadonlyMap<string, unknown>, + myBlockers?: ReadonlySet<string>, + }): Promise<void> { if (meId === packedNote.userId) return; // TODO: isVisibleForMe を使うようにしても良さそう(型違うけど) @@ -188,14 +191,9 @@ export class NoteEntityService implements OnModuleInit { } else if (packedNote.renote && (meId === packedNote.renote.userId)) { hide = false; } else { - // フォロワーかどうか - // TODO: 当関数呼び出しごとにクエリが走るのは重そうだからなんとかする - const isFollowing = await this.followingsRepository.exists({ - where: { - followeeId: packedNote.userId, - followerId: meId, - }, - }); + const isFollowing = hint?.myFollowing + ? hint.myFollowing.has(packedNote.userId) + : (await this.cacheService.userFollowingsCache.fetch(meId)).has(packedNote.userId); hide = !isFollowing; } @@ -211,7 +209,8 @@ export class NoteEntityService implements OnModuleInit { } if (!hide && meId && packedNote.userId !== meId) { - const isBlocked = (await this.cacheService.userBlockedCache.fetch(meId)).has(packedNote.userId); + const blockers = hint?.myBlockers ?? await this.cacheService.userBlockedCache.fetch(meId); + const isBlocked = blockers.has(packedNote.userId); if (isBlocked) hide = true; } @@ -235,8 +234,11 @@ export class NoteEntityService implements OnModuleInit { } @bindThis - private async populatePoll(note: MiNote, meId: MiUser['id'] | null) { - const poll = await this.pollsRepository.findOneByOrFail({ noteId: note.id }); + private async populatePoll(note: MiNote, meId: MiUser['id'] | null, hint?: { + poll?: MiPoll, + myVotes?: MiPollVote[], + }) { + const poll = hint?.poll ?? await this.pollsRepository.findOneByOrFail({ noteId: note.id }); const choices = poll.choices.map(c => ({ text: c, votes: poll.votes[poll.choices.indexOf(c)], @@ -245,7 +247,7 @@ export class NoteEntityService implements OnModuleInit { if (meId) { if (poll.multiple) { - const votes = await this.pollVotesRepository.findBy({ + const votes = hint?.myVotes ?? await this.pollVotesRepository.findBy({ userId: meId, noteId: note.id, }); @@ -255,7 +257,7 @@ export class NoteEntityService implements OnModuleInit { choices[myChoice].isVoted = true; } } else { - const vote = await this.pollVotesRepository.findOneBy({ + const vote = hint?.myVotes ? hint.myVotes[0] : await this.pollVotesRepository.findOneBy({ userId: meId, noteId: note.id, }); @@ -317,7 +319,12 @@ export class NoteEntityService implements OnModuleInit { } @bindThis - public async isVisibleForMe(note: MiNote, meId: MiUser['id'] | null): Promise<boolean> { + public async isVisibleForMe(note: MiNote, meId: MiUser['id'] | null, hint?: { + myFollowing?: ReadonlySet<string>, + myBlocking?: ReadonlySet<string>, + myBlockers?: ReadonlySet<string>, + me?: Pick<MiUser, 'host'> | null, + }): Promise<boolean> { // This code must always be synchronized with the checks in generateVisibilityQuery. // visibility が specified かつ自分が指定されていなかったら非表示 if (note.visibility === 'specified') { @@ -345,16 +352,16 @@ export class NoteEntityService implements OnModuleInit { return true; } else { // フォロワーかどうか - const [blocked, following, user] = await Promise.all([ - this.cacheService.userBlockingCache.fetch(meId).then((ids) => ids.has(note.userId)), - this.followingsRepository.count({ - where: { - followeeId: note.userId, - followerId: meId, - }, - take: 1, - }), - this.usersRepository.findOneByOrFail({ id: meId }), + const [blocked, following, userHost] = await Promise.all([ + hint?.myBlocking + ? hint.myBlocking.has(note.userId) + : this.cacheService.userBlockingCache.fetch(meId).then((ids) => ids.has(note.userId)), + hint?.myFollowing + ? hint.myFollowing.has(note.userId) + : this.cacheService.userFollowingsCache.fetch(meId).then(ids => ids.has(note.userId)), + hint?.me !== undefined + ? (hint.me?.host ?? null) + : this.cacheService.findUserById(meId).then(me => me.host), ]); if (blocked) return false; @@ -366,12 +373,13 @@ export class NoteEntityService implements OnModuleInit { in which case we can never know the following. Instead we have to assume that the users are following each other. */ - return following > 0 || (note.userHost != null && user.host != null); + return following || (note.userHost != null && userHost != null); } } if (meId != null) { - const isBlocked = (await this.cacheService.userBlockedCache.fetch(meId)).has(note.userId); + const blockers = hint?.myBlockers ?? await this.cacheService.userBlockedCache.fetch(meId); + const isBlocked = blockers.has(note.userId); if (isBlocked) return false; } @@ -408,6 +416,12 @@ export class NoteEntityService implements OnModuleInit { packedFiles: Map<MiNote['fileIds'][number], Packed<'DriveFile'> | null>; packedUsers: Map<MiUser['id'], Packed<'UserLite'>>; mentionHandles: Record<string, string | undefined>; + userFollowings: Map<string, Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>; + userBlockers: Map<string, Set<string>>; + polls: Map<string, MiPoll>; + pollVotes: Map<string, Map<string, MiPollVote[]>>; + channels: Map<string, MiChannel>; + notes: Map<string, MiNote>; }; }, ): Promise<Packed<'Note'>> { @@ -437,9 +451,7 @@ export class NoteEntityService implements OnModuleInit { } const channel = note.channelId - ? note.channel - ? note.channel - : await this.channelsRepository.findOneBy({ id: note.channelId }) + ? (opts._hint_?.channels.get(note.channelId) ?? note.channel ?? await this.channelsRepository.findOneBy({ id: note.channelId })) : null; const reactionEmojiNames = Object.keys(reactions) @@ -485,7 +497,10 @@ export class NoteEntityService implements OnModuleInit { mentionHandles: note.mentions.length > 0 ? this.getUserHandles(note.mentions, options?._hint_?.mentionHandles) : undefined, uri: note.uri ?? undefined, url: note.url ?? undefined, - poll: note.hasPoll ? this.populatePoll(note, meId) : undefined, + poll: note.hasPoll ? this.populatePoll(note, meId, { + poll: opts._hint_?.polls.get(note.id), + myVotes: opts._hint_?.pollVotes.get(note.id)?.get(note.userId), + }) : undefined, ...(meId && Object.keys(reactions).length > 0 ? { myReaction: this.populateMyReaction({ @@ -499,14 +514,14 @@ export class NoteEntityService implements OnModuleInit { clippedCount: note.clippedCount, processErrors: note.processErrors, - reply: note.replyId ? this.pack(note.reply ?? note.replyId, me, { + reply: note.replyId ? this.pack(note.reply ?? opts._hint_?.notes.get(note.replyId) ?? note.replyId, me, { detail: false, skipHide: opts.skipHide, withReactionAndUserPairCache: opts.withReactionAndUserPairCache, _hint_: options?._hint_, }) : undefined, - renote: note.renoteId ? this.pack(note.renote ?? note.renoteId, me, { + renote: note.renoteId ? this.pack(note.renote ?? opts._hint_?.notes.get(note.renoteId) ?? note.renoteId, me, { detail: true, skipHide: opts.skipHide, withReactionAndUserPairCache: opts.withReactionAndUserPairCache, @@ -518,7 +533,10 @@ export class NoteEntityService implements OnModuleInit { this.treatVisibility(packed); if (!opts.skipHide) { - await this.hideNote(packed, meId); + await this.hideNote(packed, meId, meId == null ? undefined : { + myFollowing: opts._hint_?.userFollowings.get(meId), + myBlockers: opts._hint_?.userBlockers.get(meId), + }); } return packed; @@ -535,79 +553,139 @@ export class NoteEntityService implements OnModuleInit { ) { if (notes.length === 0) return []; - const targetNotes: MiNote[] = []; + const targetNotesMap = new Map<string, MiNote>(); + const targetNotesToFetch : string[] = []; for (const note of notes) { if (isPureRenote(note)) { // we may need to fetch 'my reaction' for renote target. - targetNotes.push(note.renote); - if (note.renote.reply) { - // idem if the renote is also a reply. - targetNotes.push(note.renote.reply); + if (note.renote) { + targetNotesMap.set(note.renote.id, note.renote); + if (note.renote.reply) { + // idem if the renote is also a reply. + targetNotesMap.set(note.renote.reply.id, note.renote.reply); + } + } else if (options?.detail) { + targetNotesToFetch.push(note.renoteId); } } else { if (note.reply) { // idem for OP of a regular reply. - targetNotes.push(note.reply); + targetNotesMap.set(note.reply.id, note.reply); + } else if (note.replyId && options?.detail) { + targetNotesToFetch.push(note.replyId); } - targetNotes.push(note); + targetNotesMap.set(note.id, note); } } - const bufferedReactions = this.meta.enableReactionsBuffering ? await this.reactionsBufferingService.getMany([...getAppearNoteIds(notes)]) : null; + // Don't fetch notes that were added by ID and then found inline in another note. + for (let i = targetNotesToFetch.length - 1; i >= 0; i--) { + if (targetNotesMap.has(targetNotesToFetch[i])) { + targetNotesToFetch.splice(i, 1); + } + } - const meId = me ? me.id : null; - const myReactionsMap = new Map<MiNote['id'], string | null>(); - if (meId) { - const idsNeedFetchMyReaction = new Set<MiNote['id']>(); + // Populate any relations that weren't included in the source + if (targetNotesToFetch.length > 0) { + const newNotes = await this.notesRepository.find({ + where: { + id: In(targetNotesToFetch), + }, + relations: { + user: { + userProfile: true, + }, + reply: { + user: { + userProfile: true, + }, + }, + renote: { + user: { + userProfile: true, + }, + reply: { + user: { + userProfile: true, + }, + }, + }, + channel: true, + }, + }); - for (const note of targetNotes) { - const reactionsCount = Object.values(this.reactionsBufferingService.mergeReactions(note.reactions, bufferedReactions?.get(note.id)?.deltas ?? {})).reduce((a, b) => a + b, 0); - if (reactionsCount === 0) { - myReactionsMap.set(note.id, null); - } else if (reactionsCount <= note.reactionAndUserPairCache.length + (bufferedReactions?.get(note.id)?.pairs.length ?? 0)) { - const pairInBuffer = bufferedReactions?.get(note.id)?.pairs.find(p => p[0] === meId); - if (pairInBuffer) { - myReactionsMap.set(note.id, pairInBuffer[1]); - } else { - const pair = note.reactionAndUserPairCache.find(p => p.startsWith(meId)); - myReactionsMap.set(note.id, pair ? pair.split('/')[1] : null); - } - } else { - idsNeedFetchMyReaction.add(note.id); - } + for (const note of newNotes) { + targetNotesMap.set(note.id, note); } + } - const myReactions = idsNeedFetchMyReaction.size > 0 ? await this.noteReactionsRepository.findBy({ - userId: meId, - noteId: In(Array.from(idsNeedFetchMyReaction)), - }) : []; + const targetNotes = Array.from(targetNotesMap.values()); + const noteIds = Array.from(targetNotesMap.keys()); - for (const id of idsNeedFetchMyReaction) { - myReactionsMap.set(id, myReactions.find(reaction => reaction.noteId === id)?.reaction ?? null); + const usersMap = new Map<string, MiUser | string>(); + const allUsers = notes.flatMap(note => [ + note.user ?? note.userId, + note.reply?.user ?? note.replyUserId, + note.renote?.user ?? note.renoteUserId, + ]); + + for (const user of allUsers) { + if (!user) continue; + + if (typeof(user) === 'object') { + // ID -> Entity + usersMap.set(user.id, user); + } else if (!usersMap.has(user)) { + // ID -> ID + usersMap.set(user, user); } } - await this.customEmojiService.prefetchEmojis(this.aggregateNoteEmojis(notes)); - // TODO: 本当は renote とか reply がないのに renoteId とか replyId があったらここで解決しておく - const fileIds = notes.map(n => [n.fileIds, n.renote?.fileIds, n.reply?.fileIds]).flat(2).filter(x => x != null); - const packedFiles = fileIds.length > 0 ? await this.driveFileEntityService.packManyByIdsMap(fileIds) : new Map(); - const users = [ - ...notes.map(({ user, userId }) => user ?? userId), - ...notes.map(({ replyUserId }) => replyUserId).filter(x => x != null), - ...notes.map(({ renoteUserId }) => renoteUserId).filter(x => x != null), - ]; - const packedUsers = await this.userEntityService.packMany(users, me) - .then(users => new Map(users.map(u => [u.id, u]))); + const users = Array.from(usersMap.values()); + const userIds = Array.from(usersMap.keys()); - // Recursively add all mentioned users from all notes + replies + renotes - const allMentionedUsers = targetNotes.reduce((users, note) => { - for (const user of note.mentions) { - users.add(user); - } - return users; - }, new Set<string>()); - const mentionHandles = await this.getUserHandles(Array.from(allMentionedUsers)); + const fileIds = new Set(targetNotes.flatMap(n => n.fileIds)); + const mentionedUsers = new Set(targetNotes.flatMap(note => note.mentions)); + + const [{ bufferedReactions, myReactionsMap }, packedFiles, packedUsers, mentionHandles, userFollowings, userBlockers, polls, pollVotes, channels] = await Promise.all([ + // bufferedReactions & myReactionsMap + this.getReactions(targetNotes, me), + // packedFiles + this.driveFileEntityService.packManyByIdsMap(Array.from(fileIds)), + // packedUsers + this.userEntityService.packMany(users, me) + .then(users => new Map(users.map(u => [u.id, u]))), + // mentionHandles + this.getUserHandles(Array.from(mentionedUsers)), + // userFollowings + this.cacheService.userFollowingsCache.fetchMany(userIds).then(fs => new Map(fs)), + // userBlockers + this.cacheService.userBlockedCache.fetchMany(userIds).then(bs => new Map(bs)), + // polls + this.pollsRepository.findBy({ noteId: In(noteIds) }) + .then(polls => new Map(polls.map(p => [p.noteId, p]))), + // pollVotes + this.pollVotesRepository.findBy({ noteId: In(noteIds), userId: In(userIds) }) + .then(votes => votes.reduce((noteMap, vote) => { + let userMap = noteMap.get(vote.noteId); + if (!userMap) { + userMap = new Map<string, MiPollVote[]>(); + noteMap.set(vote.noteId, userMap); + } + let voteList = userMap.get(vote.userId); + if (!voteList) { + voteList = []; + userMap.set(vote.userId, voteList); + } + voteList.push(vote); + return noteMap; + }, new Map<string, Map<string, MiPollVote[]>>)), + // channels + this.getChannels(targetNotes), + // (not returned) + this.customEmojiService.prefetchEmojis(this.aggregateNoteEmojis(notes)), + ]); return await Promise.all(notes.map(n => this.pack(n, me, { ...options, @@ -617,6 +695,12 @@ export class NoteEntityService implements OnModuleInit { packedFiles, packedUsers, mentionHandles, + userFollowings, + userBlockers, + polls, + pollVotes, + channels, + notes: new Map(targetNotes.map(n => [n.id, n])), }, }))); } @@ -685,6 +769,68 @@ export class NoteEntityService implements OnModuleInit { }, {} as Record<string, string | undefined>); } + private async getChannels(notes: MiNote[]): Promise<Map<string, MiChannel>> { + const channels = new Map<string, MiChannel>(); + const channelsToFetch = new Set<string>(); + + for (const note of notes) { + if (note.channel) { + channels.set(note.channel.id, note.channel); + } else if (note.channelId) { + channelsToFetch.add(note.channelId); + } + } + + if (channelsToFetch.size > 0) { + const newChannels = await this.channelsRepository.findBy({ + id: In(Array.from(channelsToFetch)), + }); + for (const channel of newChannels) { + channels.set(channel.id, channel); + } + } + + return channels; + } + + private async getReactions(notes: MiNote[], me: { id: string } | null | undefined) { + const bufferedReactions = this.meta.enableReactionsBuffering ? await this.reactionsBufferingService.getMany([...getAppearNoteIds(notes)]) : null; + + const meId = me ? me.id : null; + const myReactionsMap = new Map<MiNote['id'], string | null>(); + if (meId) { + const idsNeedFetchMyReaction = new Set<MiNote['id']>(); + + for (const note of notes) { + const reactionsCount = Object.values(this.reactionsBufferingService.mergeReactions(note.reactions, bufferedReactions?.get(note.id)?.deltas ?? {})).reduce((a, b) => a + b, 0); + if (reactionsCount === 0) { + myReactionsMap.set(note.id, null); + } else if (reactionsCount <= note.reactionAndUserPairCache.length + (bufferedReactions?.get(note.id)?.pairs.length ?? 0)) { + const pairInBuffer = bufferedReactions?.get(note.id)?.pairs.find(p => p[0] === meId); + if (pairInBuffer) { + myReactionsMap.set(note.id, pairInBuffer[1]); + } else { + const pair = note.reactionAndUserPairCache.find(p => p.startsWith(meId)); + myReactionsMap.set(note.id, pair ? pair.split('/')[1] : null); + } + } else { + idsNeedFetchMyReaction.add(note.id); + } + } + + const myReactions = idsNeedFetchMyReaction.size > 0 ? await this.noteReactionsRepository.findBy({ + userId: meId, + noteId: In(Array.from(idsNeedFetchMyReaction)), + }) : []; + + for (const id of idsNeedFetchMyReaction) { + myReactionsMap.set(id, myReactions.find(reaction => reaction.noteId === id)?.reaction ?? null); + } + } + + return { bufferedReactions, myReactionsMap }; + } + @bindThis public genLocalNoteUri(noteId: string): string { return `${this.config.url}/notes/${noteId}`; diff --git a/packages/backend/src/core/entities/UserEntityService.ts b/packages/backend/src/core/entities/UserEntityService.ts index 326baaefd4..91bf258ff4 100644 --- a/packages/backend/src/core/entities/UserEntityService.ts +++ b/packages/backend/src/core/entities/UserEntityService.ts @@ -30,6 +30,7 @@ import type { FollowingsRepository, FollowRequestsRepository, MiFollowing, + MiInstance, MiMeta, MiUserNotePining, MiUserProfile, @@ -42,7 +43,7 @@ import type { UsersRepository, } from '@/models/_.js'; import { bindThis } from '@/decorators.js'; -import { RoleService } from '@/core/RoleService.js'; +import { RolePolicies, RoleService } from '@/core/RoleService.js'; import { ApPersonService } from '@/core/activitypub/models/ApPersonService.js'; import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; import { IdService } from '@/core/IdService.js'; @@ -52,6 +53,7 @@ import { AvatarDecorationService } from '@/core/AvatarDecorationService.js'; import { ChatService } from '@/core/ChatService.js'; import { isSystemAccount } from '@/misc/is-system-account.js'; import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js'; +import type { CacheService } from '@/core/CacheService.js'; import type { OnModuleInit } from '@nestjs/common'; import type { NoteEntityService } from './NoteEntityService.js'; import type { PageEntityService } from './PageEntityService.js'; @@ -77,7 +79,7 @@ function isRemoteUser(user: MiUser | { host: MiUser['host'] }): boolean { export type UserRelation = { id: MiUser['id'] - following: MiFollowing | null, + following: Omit<MiFollowing, 'isFollowerHibernated'> | null, isFollowing: boolean isFollowed: boolean hasPendingFollowRequestFromYou: boolean @@ -103,6 +105,7 @@ export class UserEntityService implements OnModuleInit { private idService: IdService; private avatarDecorationService: AvatarDecorationService; private chatService: ChatService; + private cacheService: CacheService; constructor( private moduleRef: ModuleRef, @@ -163,6 +166,7 @@ export class UserEntityService implements OnModuleInit { this.idService = this.moduleRef.get('IdService'); this.avatarDecorationService = this.moduleRef.get('AvatarDecorationService'); this.chatService = this.moduleRef.get('ChatService'); + this.cacheService = this.moduleRef.get('CacheService'); } //#region Validators @@ -193,16 +197,8 @@ export class UserEntityService implements OnModuleInit { memo, mutedInstances, ] = await Promise.all([ - this.followingsRepository.findOneBy({ - followerId: me, - followeeId: target, - }), - this.followingsRepository.exists({ - where: { - followerId: target, - followeeId: me, - }, - }), + this.cacheService.userFollowingsCache.fetch(me).then(f => f.get(target) ?? null), + this.cacheService.userFollowingsCache.fetch(target).then(f => f.has(me)), this.followRequestsRepository.exists({ where: { followerId: me, @@ -215,45 +211,22 @@ export class UserEntityService implements OnModuleInit { followeeId: me, }, }), - this.blockingsRepository.exists({ - where: { - blockerId: me, - blockeeId: target, - }, - }), - this.blockingsRepository.exists({ - where: { - blockerId: target, - blockeeId: me, - }, - }), - this.mutingsRepository.exists({ - where: { - muterId: me, - muteeId: target, - }, - }), - this.renoteMutingsRepository.exists({ - where: { - muterId: me, - muteeId: target, - }, - }), - this.usersRepository.createQueryBuilder('u') - .select('u.host') - .where({ id: target }) - .getRawOne<{ u_host: string }>() - .then(it => it?.u_host ?? null), + this.cacheService.userBlockingCache.fetch(me) + .then(blockees => blockees.has(target)), + this.cacheService.userBlockedCache.fetch(me) + .then(blockers => blockers.has(target)), + this.cacheService.userMutingsCache.fetch(me) + .then(mutings => mutings.has(target)), + this.cacheService.renoteMutingsCache.fetch(me) + .then(mutings => mutings.has(target)), + this.cacheService.findUserById(target).then(u => u.host), this.userMemosRepository.createQueryBuilder('m') .select('m.memo') .where({ userId: me, targetUserId: target }) .getRawOne<{ m_memo: string | null }>() .then(it => it?.m_memo ?? null), - this.userProfilesRepository.createQueryBuilder('p') - .select('p.mutedInstances') - .where({ userId: me }) - .getRawOne<{ p_mutedInstances: string[] }>() - .then(it => it?.p_mutedInstances ?? []), + this.cacheService.userProfileCache.fetch(me) + .then(profile => profile.mutedInstances), ]); const isInstanceMuted = !!host && mutedInstances.includes(host); @@ -277,8 +250,8 @@ export class UserEntityService implements OnModuleInit { @bindThis public async getRelations(me: MiUser['id'], targets: MiUser['id'][]): Promise<Map<MiUser['id'], UserRelation>> { const [ - followers, - followees, + myFollowing, + myFollowers, followersRequests, followeesRequests, blockers, @@ -289,13 +262,8 @@ export class UserEntityService implements OnModuleInit { memos, mutedInstances, ] = await Promise.all([ - this.followingsRepository.findBy({ followerId: me }) - .then(f => new Map(f.map(it => [it.followeeId, it]))), - this.followingsRepository.createQueryBuilder('f') - .select('f.followerId') - .where('f.followeeId = :me', { me }) - .getRawMany<{ f_followerId: string }>() - .then(it => it.map(it => it.f_followerId)), + this.cacheService.userFollowingsCache.fetch(me), + this.cacheService.userFollowersCache.fetch(me), this.followRequestsRepository.createQueryBuilder('f') .select('f.followeeId') .where('f.followerId = :me', { me }) @@ -306,34 +274,18 @@ export class UserEntityService implements OnModuleInit { .where('f.followeeId = :me', { me }) .getRawMany<{ f_followerId: string }>() .then(it => it.map(it => it.f_followerId)), - this.blockingsRepository.createQueryBuilder('b') - .select('b.blockeeId') - .where('b.blockerId = :me', { me }) - .getRawMany<{ b_blockeeId: string }>() - .then(it => it.map(it => it.b_blockeeId)), - this.blockingsRepository.createQueryBuilder('b') - .select('b.blockerId') - .where('b.blockeeId = :me', { me }) - .getRawMany<{ b_blockerId: string }>() - .then(it => it.map(it => it.b_blockerId)), - this.mutingsRepository.createQueryBuilder('m') - .select('m.muteeId') - .where('m.muterId = :me', { me }) - .getRawMany<{ m_muteeId: string }>() - .then(it => it.map(it => it.m_muteeId)), - this.renoteMutingsRepository.createQueryBuilder('m') - .select('m.muteeId') - .where('m.muterId = :me', { me }) - .getRawMany<{ m_muteeId: string }>() - .then(it => it.map(it => it.m_muteeId)), - this.usersRepository.createQueryBuilder('u') - .select(['u.id', 'u.host']) - .where({ id: In(targets) } ) - .getRawMany<{ m_id: string, m_host: string }>() - .then(it => it.reduce((map, it) => { - map[it.m_id] = it.m_host; - return map; - }, {} as Record<string, string>)), + this.cacheService.userBlockedCache.fetch(me), + this.cacheService.userBlockingCache.fetch(me), + this.cacheService.userMutingsCache.fetch(me), + this.cacheService.renoteMutingsCache.fetch(me), + this.cacheService.getUsers(targets) + .then(users => { + const record: Record<string, string | null> = {}; + for (const [id, user] of users) { + record[id] = user.host; + } + return record; + }), this.userMemosRepository.createQueryBuilder('m') .select(['m.targetUserId', 'm.memo']) .where({ userId: me, targetUserId: In(targets) }) @@ -342,16 +294,13 @@ export class UserEntityService implements OnModuleInit { map[it.m_targetUserId] = it.m_memo; return map; }, {} as Record<string, string | null>)), - this.userProfilesRepository.createQueryBuilder('p') - .select('p.mutedInstances') - .where({ userId: me }) - .getRawOne<{ p_mutedInstances: string[] }>() - .then(it => it?.p_mutedInstances ?? []), + this.cacheService.userProfileCache.fetch(me) + .then(p => p.mutedInstances), ]); return new Map( targets.map(target => { - const following = followers.get(target) ?? null; + const following = myFollowing.get(target) ?? null; return [ target, @@ -359,14 +308,14 @@ export class UserEntityService implements OnModuleInit { id: target, following: following, isFollowing: following != null, - isFollowed: followees.includes(target), + isFollowed: myFollowers.has(target), hasPendingFollowRequestFromYou: followersRequests.includes(target), hasPendingFollowRequestToYou: followeesRequests.includes(target), - isBlocking: blockers.includes(target), - isBlocked: blockees.includes(target), - isMuted: muters.includes(target), - isRenoteMuted: renoteMuters.includes(target), - isInstanceMuted: mutedInstances.includes(hosts[target]), + isBlocking: blockees.has(target), + isBlocked: blockers.has(target), + isMuted: muters.has(target), + isRenoteMuted: renoteMuters.has(target), + isInstanceMuted: hosts[target] != null && mutedInstances.includes(hosts[target]), memo: memos[target] ?? null, }, ]; @@ -391,6 +340,7 @@ export class UserEntityService implements OnModuleInit { return false; // TODO } + // TODO optimization: make redis calls in MULTI @bindThis public async getNotificationsInfo(userId: MiUser['id']): Promise<{ hasUnread: boolean; @@ -424,16 +374,14 @@ export class UserEntityService implements OnModuleInit { @bindThis public async getHasPendingReceivedFollowRequest(userId: MiUser['id']): Promise<boolean> { - const count = await this.followRequestsRepository.countBy({ + return await this.followRequestsRepository.existsBy({ followeeId: userId, }); - - return count > 0; } @bindThis public async getHasPendingSentFollowRequest(userId: MiUser['id']): Promise<boolean> { - return this.followRequestsRepository.existsBy({ + return await this.followRequestsRepository.existsBy({ followerId: userId, }); } @@ -480,6 +428,12 @@ export class UserEntityService implements OnModuleInit { userRelations?: Map<MiUser['id'], UserRelation>, userMemos?: Map<MiUser['id'], string | null>, pinNotes?: Map<MiUser['id'], MiUserNotePining[]>, + iAmModerator?: boolean, + userIdsByUri?: Map<string, string>, + instances?: Map<string, MiInstance | null>, + securityKeyCounts?: Map<string, number>, + pendingReceivedFollows?: Set<string>, + pendingSentFollows?: Set<string>, }, ): Promise<Packed<S>> { const opts = Object.assign({ @@ -521,7 +475,7 @@ export class UserEntityService implements OnModuleInit { const isDetailed = opts.schema !== 'UserLite'; const meId = me ? me.id : null; const isMe = meId === user.id; - const iAmModerator = me ? await this.roleService.isModerator(me as MiUser) : false; + const iAmModerator = opts.iAmModerator ?? (me ? await this.roleService.isModerator(me as MiUser) : false); const profile = isDetailed ? (opts.userProfile ?? user.userProfile ?? await this.userProfilesRepository.findOneByOrFail({ userId: user.id })) @@ -582,6 +536,9 @@ export class UserEntityService implements OnModuleInit { const checkHost = user.host == null ? this.config.host : user.host; const notificationsInfo = isMe && isDetailed ? await this.getNotificationsInfo(user.id) : null; + let fetchPoliciesPromise: Promise<RolePolicies> | null = null; + const fetchPolicies = () => fetchPoliciesPromise ??= this.roleService.getUserPolicies(user); + const packed = { id: user.id, name: user.name, @@ -607,13 +564,13 @@ export class UserEntityService implements OnModuleInit { mandatoryCW: user.mandatoryCW, rejectQuotes: user.rejectQuotes, attributionDomains: user.attributionDomains, - isSilenced: user.isSilenced || this.roleService.getUserPolicies(user.id).then(r => !r.canPublicNote), + isSilenced: user.isSilenced || fetchPolicies().then(r => !r.canPublicNote), speakAsCat: user.speakAsCat ?? false, approved: user.approved, requireSigninToViewContents: user.requireSigninToViewContents === false ? undefined : true, makeNotesFollowersOnlyBefore: user.makeNotesFollowersOnlyBefore ?? undefined, makeNotesHiddenBefore: user.makeNotesHiddenBefore ?? undefined, - instance: user.host ? this.federatedInstanceService.fetch(user.host).then(instance => instance ? { + instance: user.host ? Promise.resolve(opts.instances?.has(user.host) ? opts.instances.get(user.host) : this.federatedInstanceService.fetch(user.host)).then(instance => instance ? { name: instance.name, softwareName: instance.softwareName, softwareVersion: instance.softwareVersion, @@ -628,7 +585,7 @@ export class UserEntityService implements OnModuleInit { emojis: this.customEmojiService.populateEmojis(user.emojis, checkHost), onlineStatus: this.getOnlineStatus(user), // パフォーマンス上の理由でローカルユーザーのみ - badgeRoles: user.host == null ? this.roleService.getUserBadgeRoles(user.id).then((rs) => rs + badgeRoles: user.host == null ? this.roleService.getUserBadgeRoles(user).then((rs) => rs .filter((r) => r.isPublic || iAmModerator) .sort((a, b) => b.displayOrder - a.displayOrder) .map((r) => ({ @@ -641,9 +598,9 @@ export class UserEntityService implements OnModuleInit { ...(isDetailed ? { url: profile!.url, uri: user.uri, - movedTo: user.movedToUri ? this.apPersonService.resolvePerson(user.movedToUri).then(user => user.id).catch(() => null) : null, + movedTo: user.movedToUri ? Promise.resolve(opts.userIdsByUri?.get(user.movedToUri) ?? this.apPersonService.resolvePerson(user.movedToUri).then(user => user.id).catch(() => null)) : null, alsoKnownAs: user.alsoKnownAs - ? Promise.all(user.alsoKnownAs.map(uri => this.apPersonService.fetchPerson(uri).then(user => user?.id).catch(() => null))) + ? Promise.all(user.alsoKnownAs.map(uri => Promise.resolve(opts.userIdsByUri?.get(uri) ?? this.apPersonService.fetchPerson(uri).then(user => user?.id).catch(() => null)))) .then(xs => xs.length === 0 ? null : xs.filter(x => x != null)) : null, updatedAt: user.updatedAt ? user.updatedAt.toISOString() : null, @@ -670,8 +627,8 @@ export class UserEntityService implements OnModuleInit { followersVisibility: profile!.followersVisibility, followingVisibility: profile!.followingVisibility, chatScope: user.chatScope, - canChat: this.roleService.getUserPolicies(user.id).then(r => r.chatAvailability === 'available'), - roles: this.roleService.getUserRoles(user.id).then(roles => roles.filter(role => role.isPublic).sort((a, b) => b.displayOrder - a.displayOrder).map(role => ({ + canChat: fetchPolicies().then(r => r.chatAvailability === 'available'), + roles: this.roleService.getUserRoles(user).then(roles => roles.filter(role => role.isPublic).sort((a, b) => b.displayOrder - a.displayOrder).map(role => ({ id: role.id, name: role.name, color: role.color, @@ -689,7 +646,7 @@ export class UserEntityService implements OnModuleInit { twoFactorEnabled: profile!.twoFactorEnabled, usePasswordLessLogin: profile!.usePasswordLessLogin, securityKeys: profile!.twoFactorEnabled - ? this.userSecurityKeysRepository.countBy({ userId: user.id }).then(result => result >= 1) + ? Promise.resolve(opts.securityKeyCounts?.get(user.id) ?? this.userSecurityKeysRepository.countBy({ userId: user.id })).then(result => result >= 1) : false, } : {}), @@ -722,8 +679,8 @@ export class UserEntityService implements OnModuleInit { hasUnreadAntenna: this.getHasUnreadAntenna(user.id), hasUnreadChannel: false, // 後方互換性のため hasUnreadNotification: notificationsInfo?.hasUnread, // 後方互換性のため - hasPendingReceivedFollowRequest: this.getHasPendingReceivedFollowRequest(user.id), - hasPendingSentFollowRequest: this.getHasPendingSentFollowRequest(user.id), + hasPendingReceivedFollowRequest: opts.pendingReceivedFollows?.has(user.id) ?? this.getHasPendingReceivedFollowRequest(user.id), + hasPendingSentFollowRequest: opts.pendingSentFollows?.has(user.id) ?? this.getHasPendingSentFollowRequest(user.id), unreadNotificationsCount: notificationsInfo?.unreadCount, mutedWords: profile!.mutedWords, hardMutedWords: profile!.hardMutedWords, @@ -733,7 +690,7 @@ export class UserEntityService implements OnModuleInit { emailNotificationTypes: profile!.emailNotificationTypes, achievements: profile!.achievements, loggedInDays: profile!.loggedInDates.length, - policies: this.roleService.getUserPolicies(user.id), + policies: fetchPolicies(), defaultCW: profile!.defaultCW, defaultCWPriority: profile!.defaultCWPriority, allowUnsignedFetch: user.allowUnsignedFetch, @@ -783,6 +740,8 @@ export class UserEntityService implements OnModuleInit { includeSecrets?: boolean, }, ): Promise<Packed<S>[]> { + if (users.length === 0) return []; + // -- IDのみの要素を補完して完全なエンティティ一覧を作る const _users = users.filter((user): user is MiUser => typeof user !== 'string'); @@ -800,57 +759,105 @@ export class UserEntityService implements OnModuleInit { } const _userIds = _users.map(u => u.id); - // -- 実行者の有無や指定スキーマの種別によって要否が異なる値群を取得 - - let profilesMap: Map<MiUser['id'], MiUserProfile> = new Map(); - let userRelations: Map<MiUser['id'], UserRelation> = new Map(); - let userMemos: Map<MiUser['id'], string | null> = new Map(); - let pinNotes: Map<MiUser['id'], MiUserNotePining[]> = new Map(); + const iAmModerator = await this.roleService.isModerator(me as MiUser); + const meId = me ? me.id : null; + const isMe = meId && _userIds.includes(meId); + const isDetailed = options && options.schema !== 'UserLite'; + const isDetailedAndMe = isDetailed && isMe; + const isDetailedAndMeOrMod = isDetailed && (isMe || iAmModerator); + const isDetailedAndNotMe = isDetailed && !isMe; - if (options?.schema !== 'UserLite') { - const _profiles: MiUserProfile[] = []; - const _profilesToFetch: string[] = []; - for (const user of _users) { - if (user.userProfile) { - _profiles.push(user.userProfile); - } else { - _profilesToFetch.push(user.id); - } - } - if (_profilesToFetch.length > 0) { - const fetched = await this.userProfilesRepository.findBy({ userId: In(_profilesToFetch) }); - _profiles.push(...fetched); - } - profilesMap = new Map(_profiles.map(p => [p.userId, p])); + const userUris = new Set(_users + .flatMap(user => [user.uri, user.movedToUri]) + .filter((uri): uri is string => uri != null)); - const meId = me ? me.id : null; - if (meId) { - userMemos = await this.userMemosRepository.findBy({ userId: meId }) - .then(memos => new Map(memos.map(memo => [memo.targetUserId, memo.memo]))); + const userHosts = new Set(_users + .map(user => user.host) + .filter((host): host is string => host != null)); - if (_userIds.length > 0) { - userRelations = await this.getRelations(meId, _userIds); - pinNotes = await this.userNotePiningsRepository.createQueryBuilder('pin') - .where('pin.userId IN (:...userIds)', { userIds: _userIds }) - .innerJoinAndSelect('pin.note', 'note') - .getMany() - .then(pinsNotes => { - const map = new Map<MiUser['id'], MiUserNotePining[]>(); - for (const note of pinsNotes) { - const notes = map.get(note.userId) ?? []; - notes.push(note); - map.set(note.userId, notes); - } - for (const [, notes] of map.entries()) { - // pack側ではDESCで取得しているので、それに合わせて降順に並び替えておく - notes.sort((a, b) => b.id.localeCompare(a.id)); - } - return map; - }); - } + const _profilesFromUsers: [string, MiUserProfile][] = []; + const _profilesToFetch: string[] = []; + for (const user of _users) { + if (user.userProfile) { + _profilesFromUsers.push([user.id, user.userProfile]); + } else { + _profilesToFetch.push(user.id); } } + // -- 実行者の有無や指定スキーマの種別によって要否が異なる値群を取得 + + const [profilesMap, userMemos, userRelations, pinNotes, userIdsByUri, instances, securityKeyCounts, pendingReceivedFollows, pendingSentFollows] = await Promise.all([ + // profilesMap + this.cacheService.userProfileCache.fetchMany(_profilesToFetch).then(profiles => new Map(profiles.concat(_profilesFromUsers))), + // userMemos + isDetailed && meId ? this.userMemosRepository.findBy({ userId: meId }) + .then(memos => new Map(memos.map(memo => [memo.targetUserId, memo.memo]))) : new Map(), + // userRelations + isDetailedAndNotMe && meId ? this.getRelations(meId, _userIds) : new Map(), + // pinNotes + isDetailed ? this.userNotePiningsRepository.createQueryBuilder('pin') + .where('pin.userId IN (:...userIds)', { userIds: _userIds }) + .innerJoinAndSelect('pin.note', 'note') + .getMany() + .then(pinsNotes => { + const map = new Map<MiUser['id'], MiUserNotePining[]>(); + for (const note of pinsNotes) { + const notes = map.get(note.userId) ?? []; + notes.push(note); + map.set(note.userId, notes); + } + for (const [, notes] of map.entries()) { + // pack側ではDESCで取得しているので、それに合わせて降順に並び替えておく + notes.sort((a, b) => b.id.localeCompare(a.id)); + } + return map; + }) : new Map(), + // userIdsByUrl + isDetailed ? this.usersRepository.createQueryBuilder('user') + .select([ + 'user.id', + 'user.uri', + ]) + .where({ + uri: In(Array.from(userUris)), + }) + .getRawMany<{ user_uri: string, user_id: string }>() + .then(users => new Map(users.map(u => [u.user_uri, u.user_id]))) : new Map(), + // instances + Promise.all(Array.from(userHosts).map(async host => [host, await this.federatedInstanceService.fetch(host)] as const)) + .then(hosts => new Map(hosts)), + // securityKeyCounts + isDetailedAndMeOrMod ? this.userSecurityKeysRepository.createQueryBuilder('key') + .select('key.userId', 'userId') + .addSelect('count(key.id)', 'userCount') + .where({ + userId: In(_userIds), + }) + .groupBy('key.userId') + .getRawMany<{ userId: string, userCount: number }>() + .then(counts => new Map(counts.map(c => [c.userId, c.userCount]))) : new Map(), + // TODO optimization: cache follow requests + // pendingReceivedFollows + isDetailedAndMe ? this.followRequestsRepository.createQueryBuilder('req') + .select('req.followeeId', 'followeeId') + .where({ + followeeId: In(_userIds), + }) + .groupBy('req.followeeId') + .getRawMany<{ followeeId: string }>() + .then(reqs => new Set(reqs.map(r => r.followeeId))) : new Set<string>(), + // pendingSentFollows + isDetailedAndMe ? this.followRequestsRepository.createQueryBuilder('req') + .select('req.followerId', 'followerId') + .where({ + followerId: In(_userIds), + }) + .groupBy('req.followerId') + .getRawMany<{ followerId: string }>() + .then(reqs => new Set(reqs.map(r => r.followerId))) : new Set<string>(), + ]); + return Promise.all( _users.map(u => this.pack( u, @@ -861,6 +868,12 @@ export class UserEntityService implements OnModuleInit { userRelations: userRelations, userMemos: userMemos, pinNotes: pinNotes, + iAmModerator, + userIdsByUri, + instances, + securityKeyCounts, + pendingReceivedFollows, + pendingSentFollows, }, )), ); |