summaryrefslogtreecommitdiff
path: root/packages/backend/src/core/CacheService.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/core/CacheService.ts')
-rw-r--r--packages/backend/src/core/CacheService.ts354
1 files changed, 293 insertions, 61 deletions
diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts
index 1cf63221f9..2d37cd6bab 100644
--- a/packages/backend/src/core/CacheService.ts
+++ b/packages/backend/src/core/CacheService.ts
@@ -5,14 +5,16 @@
import { Inject, Injectable } from '@nestjs/common';
import * as Redis from 'ioredis';
-import { IsNull } from 'typeorm';
-import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiFollowing, MiNote } from '@/models/_.js';
+import { In, IsNull } from 'typeorm';
+import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiNote, MiFollowing } from '@/models/_.js';
import { MemoryKVCache, RedisKVCache } from '@/misc/cache.js';
+import { QuantumKVCache } from '@/misc/QuantumKVCache.js';
import type { MiLocalUser, MiUser } from '@/models/User.js';
import { DI } from '@/di-symbols.js';
import { UserEntityService } from '@/core/entities/UserEntityService.js';
import { bindThis } from '@/decorators.js';
-import type { GlobalEvents } from '@/core/GlobalEventService.js';
+import type { InternalEventTypes } from '@/core/GlobalEventService.js';
+import { InternalEventService } from '@/core/InternalEventService.js';
import type { OnApplicationShutdown } from '@nestjs/common';
export interface FollowStats {
@@ -27,7 +29,7 @@ export interface CachedTranslation {
text: string | undefined;
}
-interface CachedTranslationEntity {
+export interface CachedTranslationEntity {
l?: string;
t?: string;
u?: number;
@@ -39,14 +41,16 @@ export class CacheService implements OnApplicationShutdown {
public localUserByNativeTokenCache: MemoryKVCache<MiLocalUser | null>;
public localUserByIdCache: MemoryKVCache<MiLocalUser>;
public uriPersonCache: MemoryKVCache<MiUser | null>;
- public userProfileCache: RedisKVCache<MiUserProfile>;
- public userMutingsCache: RedisKVCache<Set<string>>;
- public userBlockingCache: RedisKVCache<Set<string>>;
- public userBlockedCache: RedisKVCache<Set<string>>; // NOTE: 「被」Blockキャッシュ
- public renoteMutingsCache: RedisKVCache<Set<string>>;
- public userFollowingsCache: RedisKVCache<Record<string, Pick<MiFollowing, 'withReplies'> | undefined>>;
- private readonly userFollowStatsCache = new MemoryKVCache<FollowStats>(1000 * 60 * 10); // 10 minutes
- private readonly translationsCache: RedisKVCache<CachedTranslationEntity>;
+ public userProfileCache: QuantumKVCache<MiUserProfile>;
+ public userMutingsCache: QuantumKVCache<Set<string>>;
+ public userBlockingCache: QuantumKVCache<Set<string>>;
+ public userBlockedCache: QuantumKVCache<Set<string>>; // NOTE: 「被」Blockキャッシュ
+ public renoteMutingsCache: QuantumKVCache<Set<string>>;
+ public userFollowingsCache: QuantumKVCache<Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>;
+ public userFollowersCache: QuantumKVCache<Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>;
+ public hibernatedUserCache: QuantumKVCache<boolean>;
+ protected userFollowStatsCache = new MemoryKVCache<FollowStats>(1000 * 60 * 10); // 10 minutes
+ protected translationsCache: RedisKVCache<CachedTranslationEntity>;
constructor(
@Inject(DI.redis)
@@ -74,6 +78,7 @@ export class CacheService implements OnApplicationShutdown {
private followingsRepository: FollowingsRepository,
private userEntityService: UserEntityService,
+ private readonly internalEventService: InternalEventService,
) {
//this.onMessage = this.onMessage.bind(this);
@@ -82,58 +87,148 @@ export class CacheService implements OnApplicationShutdown {
this.localUserByIdCache = new MemoryKVCache<MiLocalUser>(1000 * 60 * 5); // 5m
this.uriPersonCache = new MemoryKVCache<MiUser | null>(1000 * 60 * 5); // 5m
- this.userProfileCache = new RedisKVCache<MiUserProfile>(this.redisClient, 'userProfile', {
+ this.userProfileCache = new QuantumKVCache(this.internalEventService, 'userProfile', {
lifetime: 1000 * 60 * 30, // 30m
- memoryCacheLifetime: 1000 * 60, // 1m
fetcher: (key) => this.userProfilesRepository.findOneByOrFail({ userId: key }),
- toRedisConverter: (value) => JSON.stringify(value),
- fromRedisConverter: (value) => JSON.parse(value), // TODO: date型の考慮
+ bulkFetcher: userIds => this.userProfilesRepository.findBy({ userId: In(userIds) }).then(ps => ps.map(p => [p.userId, p])),
});
- this.userMutingsCache = new RedisKVCache<Set<string>>(this.redisClient, 'userMutings', {
+ this.userMutingsCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'userMutings', {
lifetime: 1000 * 60 * 30, // 30m
- memoryCacheLifetime: 1000 * 60, // 1m
fetcher: (key) => this.mutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))),
- toRedisConverter: (value) => JSON.stringify(Array.from(value)),
- fromRedisConverter: (value) => new Set(JSON.parse(value)),
+ bulkFetcher: muterIds => this.mutingsRepository
+ .createQueryBuilder('muting')
+ .select('"muting"."muterId"', 'muterId')
+ .addSelect('array_agg("muting"."muteeId")', 'muteeIds')
+ .where({ muterId: In(muterIds) })
+ .groupBy('muting.muterId')
+ .getRawMany<{ muterId: string, muteeIds: string[] }>()
+ .then(ms => ms.map(m => [m.muterId, new Set(m.muteeIds)])),
});
- this.userBlockingCache = new RedisKVCache<Set<string>>(this.redisClient, 'userBlocking', {
+ this.userBlockingCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'userBlocking', {
lifetime: 1000 * 60 * 30, // 30m
- memoryCacheLifetime: 1000 * 60, // 1m
fetcher: (key) => this.blockingsRepository.find({ where: { blockerId: key }, select: ['blockeeId'] }).then(xs => new Set(xs.map(x => x.blockeeId))),
- toRedisConverter: (value) => JSON.stringify(Array.from(value)),
- fromRedisConverter: (value) => new Set(JSON.parse(value)),
+ bulkFetcher: blockerIds => this.blockingsRepository
+ .createQueryBuilder('blocking')
+ .select('"blocking"."blockerId"', 'blockerId')
+ .addSelect('array_agg("blocking"."blockeeId")', 'blockeeIds')
+ .where({ blockerId: In(blockerIds) })
+ .groupBy('blocking.blockerId')
+ .getRawMany<{ blockerId: string, blockeeIds: string[] }>()
+ .then(ms => ms.map(m => [m.blockerId, new Set(m.blockeeIds)])),
});
- this.userBlockedCache = new RedisKVCache<Set<string>>(this.redisClient, 'userBlocked', {
+ this.userBlockedCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'userBlocked', {
lifetime: 1000 * 60 * 30, // 30m
- memoryCacheLifetime: 1000 * 60, // 1m
fetcher: (key) => this.blockingsRepository.find({ where: { blockeeId: key }, select: ['blockerId'] }).then(xs => new Set(xs.map(x => x.blockerId))),
- toRedisConverter: (value) => JSON.stringify(Array.from(value)),
- fromRedisConverter: (value) => new Set(JSON.parse(value)),
+ bulkFetcher: blockeeIds => this.blockingsRepository
+ .createQueryBuilder('blocking')
+ .select('"blocking"."blockeeId"', 'blockeeId')
+ .addSelect('array_agg("blocking"."blockeeId")', 'blockeeIds')
+ .where({ blockeeId: In(blockeeIds) })
+ .groupBy('blocking.blockeeId')
+ .getRawMany<{ blockeeId: string, blockerIds: string[] }>()
+ .then(ms => ms.map(m => [m.blockeeId, new Set(m.blockerIds)])),
});
- this.renoteMutingsCache = new RedisKVCache<Set<string>>(this.redisClient, 'renoteMutings', {
+ this.renoteMutingsCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'renoteMutings', {
lifetime: 1000 * 60 * 30, // 30m
- memoryCacheLifetime: 1000 * 60, // 1m
fetcher: (key) => this.renoteMutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))),
- toRedisConverter: (value) => JSON.stringify(Array.from(value)),
- fromRedisConverter: (value) => new Set(JSON.parse(value)),
+ bulkFetcher: muterIds => this.renoteMutingsRepository
+ .createQueryBuilder('muting')
+ .select('"muting"."muterId"', 'muterId')
+ .addSelect('array_agg("muting"."muteeId")', 'muteeIds')
+ .where({ muterId: In(muterIds) })
+ .groupBy('muting.muterId')
+ .getRawMany<{ muterId: string, muteeIds: string[] }>()
+ .then(ms => ms.map(m => [m.muterId, new Set(m.muteeIds)])),
});
- this.userFollowingsCache = new RedisKVCache<Record<string, Pick<MiFollowing, 'withReplies'> | undefined>>(this.redisClient, 'userFollowings', {
+ this.userFollowingsCache = new QuantumKVCache<Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>(this.internalEventService, 'userFollowings', {
lifetime: 1000 * 60 * 30, // 30m
- memoryCacheLifetime: 1000 * 60, // 1m
- fetcher: (key) => this.followingsRepository.find({ where: { followerId: key }, select: ['followeeId', 'withReplies'] }).then(xs => {
- const obj: Record<string, Pick<MiFollowing, 'withReplies'> | undefined> = {};
- for (const x of xs) {
- obj[x.followeeId] = { withReplies: x.withReplies };
+ fetcher: (key) => this.followingsRepository.findBy({ followerId: key }).then(xs => new Map(xs.map(f => [f.followeeId, f]))),
+ bulkFetcher: followerIds => this.followingsRepository
+ .findBy({ followerId: In(followerIds) })
+ .then(fs => fs
+ .reduce((groups, f) => {
+ let group = groups.get(f.followerId);
+ if (!group) {
+ group = new Map();
+ groups.set(f.followerId, group);
+ }
+ group.set(f.followeeId, f);
+ return groups;
+ }, new Map<string, Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>)),
+ });
+
+ this.userFollowersCache = new QuantumKVCache<Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>(this.internalEventService, 'userFollowers', {
+ lifetime: 1000 * 60 * 30, // 30m
+ fetcher: followeeId => this.followingsRepository.findBy({ followeeId: followeeId }).then(xs => new Map(xs.map(x => [x.followerId, x]))),
+ bulkFetcher: followeeIds => this.followingsRepository
+ .findBy({ followeeId: In(followeeIds) })
+ .then(fs => fs
+ .reduce((groups, f) => {
+ let group = groups.get(f.followeeId);
+ if (!group) {
+ group = new Map();
+ groups.set(f.followeeId, group);
+ }
+ group.set(f.followerId, f);
+ return groups;
+ }, new Map<string, Map<string, Omit<MiFollowing, 'isFollowerHibernated'>>>)),
+ });
+
+ this.hibernatedUserCache = new QuantumKVCache<boolean>(this.internalEventService, 'hibernatedUsers', {
+ lifetime: 1000 * 60 * 30, // 30m
+ fetcher: async userId => {
+ const { isHibernated } = await this.usersRepository.findOneOrFail({
+ where: { id: userId },
+ select: { isHibernated: true },
+ });
+ return isHibernated;
+ },
+ bulkFetcher: async userIds => {
+ const results = await this.usersRepository.find({
+ where: { id: In(userIds) },
+ select: { id: true, isHibernated: true },
+ });
+ return results.map(({ id, isHibernated }) => [id, isHibernated]);
+ },
+ onChanged: async userIds => {
+ // We only update local copies since each process will get this event, but we can have user objects in multiple different caches.
+ // Before doing anything else we must "find" all the objects to update.
+ const userObjects = new Map<string, MiUser[]>();
+ const toUpdate: string[] = [];
+ for (const uid of userIds) {
+ const toAdd: MiUser[] = [];
+
+ const localUserById = this.localUserByIdCache.get(uid);
+ if (localUserById) toAdd.push(localUserById);
+
+ const userById = this.userByIdCache.get(uid);
+ if (userById) toAdd.push(userById);
+
+ if (toAdd.length > 0) {
+ toUpdate.push(uid);
+ userObjects.set(uid, toAdd);
+ }
+ }
+
+ // In many cases, we won't have to do anything.
+ // Skipping the DB fetch ensures that this remains a single-step synchronous process.
+ if (toUpdate.length > 0) {
+ const hibernations = await this.usersRepository.find({ where: { id: In(toUpdate) }, select: { id: true, isHibernated: true } });
+ for (const { id, isHibernated } of hibernations) {
+ const users = userObjects.get(id);
+ if (users) {
+ for (const u of users) {
+ u.isHibernated = isHibernated;
+ }
+ }
+ }
}
- return obj;
- }),
- toRedisConverter: (value) => JSON.stringify(value),
- fromRedisConverter: (value) => JSON.parse(value),
+ },
});
this.translationsCache = new RedisKVCache<CachedTranslationEntity>(this.redisClient, 'translations', {
@@ -143,20 +238,21 @@ export class CacheService implements OnApplicationShutdown {
// NOTE: チャンネルのフォロー状況キャッシュはChannelFollowingServiceで行っている
- this.redisForSub.on('message', this.onMessage);
+ this.internalEventService.on('userChangeSuspendedState', this.onUserEvent);
+ this.internalEventService.on('userChangeDeletedState', this.onUserEvent);
+ this.internalEventService.on('remoteUserUpdated', this.onUserEvent);
+ this.internalEventService.on('localUserUpdated', this.onUserEvent);
+ this.internalEventService.on('userChangeSuspendedState', this.onUserEvent);
+ this.internalEventService.on('userTokenRegenerated', this.onTokenEvent);
+ this.internalEventService.on('follow', this.onFollowEvent);
+ this.internalEventService.on('unfollow', this.onFollowEvent);
}
@bindThis
- private async onMessage(_: string, data: string): Promise<void> {
- const obj = JSON.parse(data);
-
- if (obj.channel === 'internal') {
- const { type, body } = obj.message as GlobalEvents['internal']['payload'];
- switch (type) {
- case 'userChangeSuspendedState':
- case 'userChangeDeletedState':
- case 'remoteUserUpdated':
- case 'localUserUpdated': {
+ private async onUserEvent<E extends 'userChangeSuspendedState' | 'userChangeDeletedState' | 'remoteUserUpdated' | 'localUserUpdated'>(body: InternalEventTypes[E], _: E, isLocal: boolean): Promise<void> {
+ {
+ {
+ {
const user = await this.usersRepository.findOneBy({ id: body.id });
if (user == null) {
this.userByIdCache.delete(body.id);
@@ -166,6 +262,18 @@ export class CacheService implements OnApplicationShutdown {
this.uriPersonCache.delete(k);
}
}
+ if (isLocal) {
+ await Promise.all([
+ this.userProfileCache.delete(body.id),
+ this.userMutingsCache.delete(body.id),
+ this.userBlockingCache.delete(body.id),
+ this.userBlockedCache.delete(body.id),
+ this.renoteMutingsCache.delete(body.id),
+ this.userFollowingsCache.delete(body.id),
+ this.userFollowersCache.delete(body.id),
+ this.hibernatedUserCache.delete(body.id),
+ ]);
+ }
} else {
this.userByIdCache.set(user.id, user);
for (const [k, v] of this.uriPersonCache.entries) {
@@ -178,20 +286,37 @@ export class CacheService implements OnApplicationShutdown {
this.localUserByIdCache.set(user.id, user);
}
}
- break;
}
- case 'userTokenRegenerated': {
+ }
+ }
+ }
+
+ @bindThis
+ private async onTokenEvent<E extends 'userTokenRegenerated'>(body: InternalEventTypes[E]): Promise<void> {
+ {
+ {
+ {
const user = await this.usersRepository.findOneByOrFail({ id: body.id }) as MiLocalUser;
this.localUserByNativeTokenCache.delete(body.oldToken);
this.localUserByNativeTokenCache.set(body.newToken, user);
- break;
}
+ }
+ }
+ }
+
+ @bindThis
+ private async onFollowEvent<E extends 'follow' | 'unfollow'>(body: InternalEventTypes[E], type: E): Promise<void> {
+ {
+ switch (type) {
case 'follow': {
const follower = this.userByIdCache.get(body.followerId);
if (follower) follower.followingCount++;
const followee = this.userByIdCache.get(body.followeeId);
if (followee) followee.followersCount++;
- this.userFollowingsCache.delete(body.followerId);
+ await Promise.all([
+ this.userFollowingsCache.delete(body.followerId),
+ this.userFollowersCache.delete(body.followeeId),
+ ]);
this.userFollowStatsCache.delete(body.followerId);
this.userFollowStatsCache.delete(body.followeeId);
break;
@@ -201,13 +326,14 @@ export class CacheService implements OnApplicationShutdown {
if (follower) follower.followingCount--;
const followee = this.userByIdCache.get(body.followeeId);
if (followee) followee.followersCount--;
- this.userFollowingsCache.delete(body.followerId);
+ await Promise.all([
+ this.userFollowingsCache.delete(body.followerId),
+ this.userFollowersCache.delete(body.followeeId),
+ ]);
this.userFollowStatsCache.delete(body.followerId);
this.userFollowStatsCache.delete(body.followeeId);
break;
}
- default:
- break;
}
}
}
@@ -299,8 +425,114 @@ export class CacheService implements OnApplicationShutdown {
}
@bindThis
+ public async getUsers(userIds: Iterable<string>): Promise<Map<string, MiUser>> {
+ const users = new Map<string, MiUser>;
+
+ const toFetch: string[] = [];
+ for (const userId of userIds) {
+ const fromCache = this.userByIdCache.get(userId);
+ if (fromCache) {
+ users.set(userId, fromCache);
+ } else {
+ toFetch.push(userId);
+ }
+ }
+
+ if (toFetch.length > 0) {
+ const fetched = await this.usersRepository.findBy({
+ id: In(toFetch),
+ });
+
+ for (const user of fetched) {
+ users.set(user.id, user);
+ this.userByIdCache.set(user.id, user);
+ }
+ }
+
+ return users;
+ }
+
+ @bindThis
+ public async isFollowing(follower: string | { id: string }, followee: string | { id: string }): Promise<boolean> {
+ const followerId = typeof(follower) === 'string' ? follower : follower.id;
+ const followeeId = typeof(followee) === 'string' ? followee : followee.id;
+
+ // This lets us use whichever one is in memory, falling back to DB fetch via userFollowingsCache.
+ return this.userFollowersCache.get(followeeId)?.has(followerId)
+ ?? (await this.userFollowingsCache.fetch(followerId)).has(followeeId);
+ }
+
+ /**
+ * Returns all hibernated followers.
+ */
+ @bindThis
+ public async getHibernatedFollowers(followeeId: string): Promise<MiFollowing[]> {
+ const followers = await this.getFollowersWithHibernation(followeeId);
+ return followers.filter(f => f.isFollowerHibernated);
+ }
+
+ /**
+ * Returns all non-hibernated followers.
+ */
+ @bindThis
+ public async getNonHibernatedFollowers(followeeId: string): Promise<MiFollowing[]> {
+ const followers = await this.getFollowersWithHibernation(followeeId);
+ return followers.filter(f => !f.isFollowerHibernated);
+ }
+
+ /**
+ * Returns follower relations with populated isFollowerHibernated.
+ * If you don't need this field, then please use userFollowersCache directly for reduced overhead.
+ */
+ @bindThis
+ public async getFollowersWithHibernation(followeeId: string): Promise<MiFollowing[]> {
+ const followers = await this.userFollowersCache.fetch(followeeId);
+ const hibernations = await this.hibernatedUserCache.fetchMany(followers.keys()).then(fs => fs.reduce((map, f) => {
+ map.set(f[0], f[1]);
+ return map;
+ }, new Map<string, boolean>));
+ return Array.from(followers.values()).map(following => ({
+ ...following,
+ isFollowerHibernated: hibernations.get(following.followerId) ?? false,
+ }));
+ }
+
+ /**
+ * Refreshes follower and following relations for the given user.
+ */
+ @bindThis
+ public async refreshFollowRelationsFor(userId: string): Promise<void> {
+ const followings = await this.userFollowingsCache.refresh(userId);
+ const followees = Array.from(followings.values()).map(f => f.followeeId);
+ await this.userFollowersCache.deleteMany(followees);
+ }
+
+ @bindThis
+ public clear(): void {
+ this.userByIdCache.clear();
+ this.localUserByNativeTokenCache.clear();
+ this.localUserByIdCache.clear();
+ this.uriPersonCache.clear();
+ this.userProfileCache.clear();
+ this.userMutingsCache.clear();
+ this.userBlockingCache.clear();
+ this.userBlockedCache.clear();
+ this.renoteMutingsCache.clear();
+ this.userFollowingsCache.clear();
+ this.userFollowStatsCache.clear();
+ this.translationsCache.clear();
+ }
+
+ @bindThis
public dispose(): void {
- this.redisForSub.off('message', this.onMessage);
+ this.internalEventService.off('userChangeSuspendedState', this.onUserEvent);
+ this.internalEventService.off('userChangeDeletedState', this.onUserEvent);
+ this.internalEventService.off('remoteUserUpdated', this.onUserEvent);
+ this.internalEventService.off('localUserUpdated', this.onUserEvent);
+ this.internalEventService.off('userChangeSuspendedState', this.onUserEvent);
+ this.internalEventService.off('userTokenRegenerated', this.onTokenEvent);
+ this.internalEventService.off('follow', this.onFollowEvent);
+ this.internalEventService.off('unfollow', this.onFollowEvent);
this.userByIdCache.dispose();
this.localUserByNativeTokenCache.dispose();
this.localUserByIdCache.dispose();