From 30d699268450af375dabc2226ec4f3196a53f7f7 Mon Sep 17 00:00:00 2001 From: syuilo Date: Tue, 4 Apr 2023 14:06:57 +0900 Subject: perf(backend): 通知をRedisに保存するように MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resolve #10168 --- packages/backend/src/server/api/stream/index.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'packages/backend/src/server/api/stream') diff --git a/packages/backend/src/server/api/stream/index.ts b/packages/backend/src/server/api/stream/index.ts index 7c6eb9a20a..f1f8bfd3a2 100644 --- a/packages/backend/src/server/api/stream/index.ts +++ b/packages/backend/src/server/api/stream/index.ts @@ -195,8 +195,7 @@ export default class Connection { @bindThis private onReadNotification(payload: any) { - if (!payload.id) return; - this.notificationService.readNotification(this.user!.id, [payload.id]); + this.notificationService.readAllNotification(this.user!.id); } /** -- cgit v1.2.3-freya From ecaf152b4a6eb702375debaef0dddc2cca798116 Mon Sep 17 00:00:00 2001 From: syuilo Date: Tue, 4 Apr 2023 17:32:09 +0900 Subject: enhance(backend): improve cache --- packages/backend/src/core/CacheService.ts | 95 +++++++++++++++++++++ packages/backend/src/core/CoreModule.ts | 12 +-- packages/backend/src/core/InstanceActorService.ts | 12 +-- packages/backend/src/core/NoteCreateService.ts | 6 +- packages/backend/src/core/NotificationService.ts | 18 ++-- packages/backend/src/core/RelayService.ts | 8 +- packages/backend/src/core/RoleService.ts | 30 +++---- packages/backend/src/core/UserCacheService.ts | 88 -------------------- .../src/core/activitypub/ApDbResolverService.ts | 10 +-- .../src/core/activitypub/models/ApPersonService.ts | 14 ++-- .../src/core/entities/NotificationEntityService.ts | 1 + .../backend/src/core/entities/UserEntityService.ts | 1 - packages/backend/src/misc/cache.ts | 97 ++++++++++++++++++++-- .../queue/processors/DeliverProcessorService.ts | 10 +-- .../backend/src/server/NodeinfoServerService.ts | 8 +- .../backend/src/server/api/AuthenticateService.ts | 8 +- .../src/server/api/endpoints/i/regenerate-token.ts | 2 +- .../backend/src/server/api/endpoints/i/update.ts | 8 +- .../src/server/api/endpoints/mute/create.ts | 3 + .../src/server/api/endpoints/renote-mute/create.ts | 2 - packages/backend/src/server/api/stream/types.ts | 2 +- packages/backend/test/unit/RoleService.ts | 4 +- 22 files changed, 267 insertions(+), 172 deletions(-) create mode 100644 packages/backend/src/core/CacheService.ts delete mode 100644 packages/backend/src/core/UserCacheService.ts (limited to 'packages/backend/src/server/api/stream') diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts new file mode 100644 index 0000000000..887baeb2c2 --- /dev/null +++ b/packages/backend/src/core/CacheService.ts @@ -0,0 +1,95 @@ +import { Inject, Injectable } from '@nestjs/common'; +import Redis from 'ioredis'; +import type { UserProfile, UsersRepository } from '@/models/index.js'; +import { MemoryKVCache, RedisKVCache } from '@/misc/cache.js'; +import type { LocalUser, User } from '@/models/entities/User.js'; +import { DI } from '@/di-symbols.js'; +import { UserEntityService } from '@/core/entities/UserEntityService.js'; +import { bindThis } from '@/decorators.js'; +import { StreamMessages } from '@/server/api/stream/types.js'; +import type { OnApplicationShutdown } from '@nestjs/common'; + +@Injectable() +export class CacheService implements OnApplicationShutdown { + public userByIdCache: MemoryKVCache; + public localUserByNativeTokenCache: MemoryKVCache; + public localUserByIdCache: MemoryKVCache; + public uriPersonCache: MemoryKVCache; + public userProfileCache: RedisKVCache; + public userMutingsCache: RedisKVCache; + + constructor( + @Inject(DI.redis) + private redisClient: Redis.Redis, + + @Inject(DI.redisSubscriber) + private redisSubscriber: Redis.Redis, + + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + + private userEntityService: UserEntityService, + ) { + //this.onMessage = this.onMessage.bind(this); + + this.userByIdCache = new MemoryKVCache(Infinity); + this.localUserByNativeTokenCache = new MemoryKVCache(Infinity); + this.localUserByIdCache = new MemoryKVCache(Infinity); + this.uriPersonCache = new MemoryKVCache(Infinity); + this.userProfileCache = new RedisKVCache(this.redisClient, 'userProfile', 1000 * 60 * 60 * 24, 1000 * 60); + this.userMutingsCache = new RedisKVCache(this.redisClient, 'userMutings', 1000 * 60 * 60 * 24, 1000 * 60); + + this.redisSubscriber.on('message', this.onMessage); + } + + @bindThis + private async onMessage(_: string, data: string): Promise { + const obj = JSON.parse(data); + + if (obj.channel === 'internal') { + const { type, body } = obj.message as StreamMessages['internal']['payload']; + switch (type) { + case 'userChangeSuspendedState': + case 'remoteUserUpdated': { + const user = await this.usersRepository.findOneByOrFail({ id: body.id }); + this.userByIdCache.set(user.id, user); + for (const [k, v] of this.uriPersonCache.cache.entries()) { + if (v.value?.id === user.id) { + this.uriPersonCache.set(k, user); + } + } + if (this.userEntityService.isLocalUser(user)) { + this.localUserByNativeTokenCache.set(user.token!, user); + this.localUserByIdCache.set(user.id, user); + } + break; + } + case 'userTokenRegenerated': { + const user = await this.usersRepository.findOneByOrFail({ id: body.id }) as LocalUser; + this.localUserByNativeTokenCache.delete(body.oldToken); + this.localUserByNativeTokenCache.set(body.newToken, user); + break; + } + case 'follow': { + const follower = this.userByIdCache.get(body.followerId); + if (follower) follower.followingCount++; + const followee = this.userByIdCache.get(body.followeeId); + if (followee) followee.followersCount++; + break; + } + default: + break; + } + } + } + + @bindThis + public findUserById(userId: User['id']) { + return this.userByIdCache.fetch(userId, () => this.usersRepository.findOneByOrFail({ id: userId })); + } + + @bindThis + public onApplicationShutdown(signal?: string | undefined) { + this.redisSubscriber.off('message', this.onMessage); + } +} diff --git a/packages/backend/src/core/CoreModule.ts b/packages/backend/src/core/CoreModule.ts index d67e80fc1d..5c867e6cfc 100644 --- a/packages/backend/src/core/CoreModule.ts +++ b/packages/backend/src/core/CoreModule.ts @@ -38,7 +38,7 @@ import { S3Service } from './S3Service.js'; import { SignupService } from './SignupService.js'; import { TwoFactorAuthenticationService } from './TwoFactorAuthenticationService.js'; import { UserBlockingService } from './UserBlockingService.js'; -import { UserCacheService } from './UserCacheService.js'; +import { CacheService } from './CacheService.js'; import { UserFollowingService } from './UserFollowingService.js'; import { UserKeypairStoreService } from './UserKeypairStoreService.js'; import { UserListService } from './UserListService.js'; @@ -159,7 +159,7 @@ const $S3Service: Provider = { provide: 'S3Service', useExisting: S3Service }; const $SignupService: Provider = { provide: 'SignupService', useExisting: SignupService }; const $TwoFactorAuthenticationService: Provider = { provide: 'TwoFactorAuthenticationService', useExisting: TwoFactorAuthenticationService }; const $UserBlockingService: Provider = { provide: 'UserBlockingService', useExisting: UserBlockingService }; -const $UserCacheService: Provider = { provide: 'UserCacheService', useExisting: UserCacheService }; +const $CacheService: Provider = { provide: 'CacheService', useExisting: CacheService }; const $UserFollowingService: Provider = { provide: 'UserFollowingService', useExisting: UserFollowingService }; const $UserKeypairStoreService: Provider = { provide: 'UserKeypairStoreService', useExisting: UserKeypairStoreService }; const $UserListService: Provider = { provide: 'UserListService', useExisting: UserListService }; @@ -282,7 +282,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting SignupService, TwoFactorAuthenticationService, UserBlockingService, - UserCacheService, + CacheService, UserFollowingService, UserKeypairStoreService, UserListService, @@ -399,7 +399,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting $SignupService, $TwoFactorAuthenticationService, $UserBlockingService, - $UserCacheService, + $CacheService, $UserFollowingService, $UserKeypairStoreService, $UserListService, @@ -517,7 +517,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting SignupService, TwoFactorAuthenticationService, UserBlockingService, - UserCacheService, + CacheService, UserFollowingService, UserKeypairStoreService, UserListService, @@ -633,7 +633,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting $SignupService, $TwoFactorAuthenticationService, $UserBlockingService, - $UserCacheService, + $CacheService, $UserFollowingService, $UserKeypairStoreService, $UserListService, diff --git a/packages/backend/src/core/InstanceActorService.ts b/packages/backend/src/core/InstanceActorService.ts index 049e27dec8..898fb4ce85 100644 --- a/packages/backend/src/core/InstanceActorService.ts +++ b/packages/backend/src/core/InstanceActorService.ts @@ -2,7 +2,7 @@ import { Inject, Injectable } from '@nestjs/common'; import { IsNull } from 'typeorm'; import type { LocalUser } from '@/models/entities/User.js'; import type { UsersRepository } from '@/models/index.js'; -import { MemoryKVCache } from '@/misc/cache.js'; +import { MemoryCache } from '@/misc/cache.js'; import { DI } from '@/di-symbols.js'; import { CreateSystemUserService } from '@/core/CreateSystemUserService.js'; import { bindThis } from '@/decorators.js'; @@ -11,7 +11,7 @@ const ACTOR_USERNAME = 'instance.actor' as const; @Injectable() export class InstanceActorService { - private cache: MemoryKVCache; + private cache: MemoryCache; constructor( @Inject(DI.usersRepository) @@ -19,12 +19,12 @@ export class InstanceActorService { private createSystemUserService: CreateSystemUserService, ) { - this.cache = new MemoryKVCache(Infinity); + this.cache = new MemoryCache(Infinity); } @bindThis public async getInstanceActor(): Promise { - const cached = this.cache.get(null); + const cached = this.cache.get(); if (cached) return cached; const user = await this.usersRepository.findOneBy({ @@ -33,11 +33,11 @@ export class InstanceActorService { }) as LocalUser | undefined; if (user) { - this.cache.set(null, user); + this.cache.set(user); return user; } else { const created = await this.createSystemUserService.createSystemUser(ACTOR_USERNAME) as LocalUser; - this.cache.set(null, created); + this.cache.set(created); return created; } } diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index 552f241044..83290b310e 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -20,7 +20,7 @@ import { isDuplicateKeyValueError } from '@/misc/is-duplicate-key-value-error.js import { checkWordMute } from '@/misc/check-word-mute.js'; import type { Channel } from '@/models/entities/Channel.js'; import { normalizeForSearch } from '@/misc/normalize-for-search.js'; -import { MemoryKVCache } from '@/misc/cache.js'; +import { MemoryCache } from '@/misc/cache.js'; import type { UserProfile } from '@/models/entities/UserProfile.js'; import { RelayService } from '@/core/RelayService.js'; import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; @@ -47,7 +47,7 @@ import { DB_MAX_NOTE_TEXT_LENGTH } from '@/const.js'; import { RoleService } from '@/core/RoleService.js'; import { MetaService } from '@/core/MetaService.js'; -const mutedWordsCache = new MemoryKVCache<{ userId: UserProfile['userId']; mutedWords: UserProfile['mutedWords']; }[]>(1000 * 60 * 5); +const mutedWordsCache = new MemoryCache<{ userId: UserProfile['userId']; mutedWords: UserProfile['mutedWords']; }[]>(1000 * 60 * 5); type NotificationType = 'reply' | 'renote' | 'quote' | 'mention'; @@ -473,7 +473,7 @@ export class NoteCreateService implements OnApplicationShutdown { this.incNotesCountOfUser(user); // Word mute - mutedWordsCache.fetch(null, () => this.userProfilesRepository.find({ + mutedWordsCache.fetch(() => this.userProfilesRepository.find({ where: { enableWordMute: true, }, diff --git a/packages/backend/src/core/NotificationService.ts b/packages/backend/src/core/NotificationService.ts index 2a4dbba6a4..9c179f9318 100644 --- a/packages/backend/src/core/NotificationService.ts +++ b/packages/backend/src/core/NotificationService.ts @@ -3,7 +3,7 @@ import Redis from 'ioredis'; import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import { In } from 'typeorm'; import { DI } from '@/di-symbols.js'; -import type { MutingsRepository, UserProfilesRepository, UsersRepository } from '@/models/index.js'; +import type { MutingsRepository, UserProfile, UserProfilesRepository, UsersRepository } from '@/models/index.js'; import type { User } from '@/models/entities/User.js'; import type { Notification } from '@/models/entities/Notification.js'; import { UserEntityService } from '@/core/entities/UserEntityService.js'; @@ -12,6 +12,7 @@ import { GlobalEventService } from '@/core/GlobalEventService.js'; import { PushNotificationService } from '@/core/PushNotificationService.js'; import { NotificationEntityService } from '@/core/entities/NotificationEntityService.js'; import { IdService } from '@/core/IdService.js'; +import { CacheService } from '@/core/CacheService.js'; @Injectable() export class NotificationService implements OnApplicationShutdown { @@ -35,6 +36,7 @@ export class NotificationService implements OnApplicationShutdown { private idService: IdService, private globalEventService: GlobalEventService, private pushNotificationService: PushNotificationService, + private cacheService: CacheService, ) { } @@ -49,7 +51,6 @@ export class NotificationService implements OnApplicationShutdown { '+', '-', 'COUNT', 1); - console.log('latestNotificationIdsRes', latestNotificationIdsRes); const latestNotificationId = latestNotificationIdsRes[0]?.[0]; if (latestNotificationId == null) return; @@ -72,9 +73,8 @@ export class NotificationService implements OnApplicationShutdown { type: Notification['type'], data: Partial, ): Promise { - // TODO: Cache - const profile = await this.userProfilesRepository.findOneBy({ userId: notifieeId }); - const isMuted = profile?.mutingNotificationTypes.includes(type); + const profile = await this.cacheService.userProfileCache.fetch(notifieeId, () => this.userProfilesRepository.findOneByOrFail({ userId: notifieeId })); + const isMuted = profile.mutingNotificationTypes.includes(type); if (isMuted) return null; if (data.notifierId) { @@ -82,12 +82,8 @@ export class NotificationService implements OnApplicationShutdown { return null; } - // TODO: cache - const mutings = await this.mutingsRepository.findOneBy({ - muterId: notifieeId, - muteeId: data.notifierId, - }); - if (mutings) { + const mutings = await this.cacheService.userMutingsCache.fetch(notifieeId, () => this.mutingsRepository.findBy({ muterId: notifieeId }).then(xs => xs.map(x => x.muteeId))); + if (mutings.includes(data.notifierId)) { return null; } } diff --git a/packages/backend/src/core/RelayService.ts b/packages/backend/src/core/RelayService.ts index be5a4d4b02..4df7fb3bff 100644 --- a/packages/backend/src/core/RelayService.ts +++ b/packages/backend/src/core/RelayService.ts @@ -3,7 +3,7 @@ import { IsNull } from 'typeorm'; import type { LocalUser, User } from '@/models/entities/User.js'; import type { RelaysRepository, UsersRepository } from '@/models/index.js'; import { IdService } from '@/core/IdService.js'; -import { MemoryKVCache } from '@/misc/cache.js'; +import { MemoryCache } from '@/misc/cache.js'; import type { Relay } from '@/models/entities/Relay.js'; import { QueueService } from '@/core/QueueService.js'; import { CreateSystemUserService } from '@/core/CreateSystemUserService.js'; @@ -16,7 +16,7 @@ const ACTOR_USERNAME = 'relay.actor' as const; @Injectable() export class RelayService { - private relaysCache: MemoryKVCache; + private relaysCache: MemoryCache; constructor( @Inject(DI.usersRepository) @@ -30,7 +30,7 @@ export class RelayService { private createSystemUserService: CreateSystemUserService, private apRendererService: ApRendererService, ) { - this.relaysCache = new MemoryKVCache(1000 * 60 * 10); + this.relaysCache = new MemoryCache(1000 * 60 * 10); } @bindThis @@ -109,7 +109,7 @@ export class RelayService { public async deliverToRelays(user: { id: User['id']; host: null; }, activity: any): Promise { if (activity == null) return; - const relays = await this.relaysCache.fetch(null, () => this.relaysRepository.findBy({ + const relays = await this.relaysCache.fetch(() => this.relaysRepository.findBy({ status: 'accepted', })); if (relays.length === 0) return; diff --git a/packages/backend/src/core/RoleService.ts b/packages/backend/src/core/RoleService.ts index 678bcfc337..52e6292a1e 100644 --- a/packages/backend/src/core/RoleService.ts +++ b/packages/backend/src/core/RoleService.ts @@ -2,12 +2,12 @@ import { Inject, Injectable } from '@nestjs/common'; import Redis from 'ioredis'; import { In } from 'typeorm'; import type { Role, RoleAssignment, RoleAssignmentsRepository, RolesRepository, UsersRepository } from '@/models/index.js'; -import { MemoryKVCache } from '@/misc/cache.js'; +import { MemoryKVCache, MemoryCache } from '@/misc/cache.js'; import type { User } from '@/models/entities/User.js'; import { DI } from '@/di-symbols.js'; import { bindThis } from '@/decorators.js'; import { MetaService } from '@/core/MetaService.js'; -import { UserCacheService } from '@/core/UserCacheService.js'; +import { CacheService } from '@/core/CacheService.js'; import type { RoleCondFormulaValue } from '@/models/entities/Role.js'; import { UserEntityService } from '@/core/entities/UserEntityService.js'; import { StreamMessages } from '@/server/api/stream/types.js'; @@ -57,7 +57,7 @@ export const DEFAULT_POLICIES: RolePolicies = { @Injectable() export class RoleService implements OnApplicationShutdown { - private rolesCache: MemoryKVCache; + private rolesCache: MemoryCache; private roleAssignmentByUserIdCache: MemoryKVCache; public static AlreadyAssignedError = class extends Error {}; @@ -77,14 +77,14 @@ export class RoleService implements OnApplicationShutdown { private roleAssignmentsRepository: RoleAssignmentsRepository, private metaService: MetaService, - private userCacheService: UserCacheService, + private cacheService: CacheService, private userEntityService: UserEntityService, private globalEventService: GlobalEventService, private idService: IdService, ) { //this.onMessage = this.onMessage.bind(this); - this.rolesCache = new MemoryKVCache(Infinity); + this.rolesCache = new MemoryCache(Infinity); this.roleAssignmentByUserIdCache = new MemoryKVCache(Infinity); this.redisSubscriber.on('message', this.onMessage); @@ -98,7 +98,7 @@ export class RoleService implements OnApplicationShutdown { const { type, body } = obj.message as StreamMessages['internal']['payload']; switch (type) { case 'roleCreated': { - const cached = this.rolesCache.get(null); + const cached = this.rolesCache.get(); if (cached) { cached.push({ ...body, @@ -110,7 +110,7 @@ export class RoleService implements OnApplicationShutdown { break; } case 'roleUpdated': { - const cached = this.rolesCache.get(null); + const cached = this.rolesCache.get(); if (cached) { const i = cached.findIndex(x => x.id === body.id); if (i > -1) { @@ -125,9 +125,9 @@ export class RoleService implements OnApplicationShutdown { break; } case 'roleDeleted': { - const cached = this.rolesCache.get(null); + const cached = this.rolesCache.get(); if (cached) { - this.rolesCache.set(null, cached.filter(x => x.id !== body.id)); + this.rolesCache.set(cached.filter(x => x.id !== body.id)); } break; } @@ -214,9 +214,9 @@ export class RoleService implements OnApplicationShutdown { // 期限切れのロールを除外 assigns = assigns.filter(a => a.expiresAt == null || (a.expiresAt.getTime() > now)); const assignedRoleIds = assigns.map(x => x.roleId); - const roles = await this.rolesCache.fetch(null, () => this.rolesRepository.findBy({})); + const roles = await this.rolesCache.fetch(() => this.rolesRepository.findBy({})); const assignedRoles = roles.filter(r => assignedRoleIds.includes(r.id)); - const user = roles.some(r => r.target === 'conditional') ? await this.userCacheService.findById(userId) : null; + const user = roles.some(r => r.target === 'conditional') ? await this.cacheService.findUserById(userId) : null; const matchedCondRoles = roles.filter(r => r.target === 'conditional' && this.evalCond(user!, r.condFormula)); return [...assignedRoles, ...matchedCondRoles]; } @@ -231,11 +231,11 @@ export class RoleService implements OnApplicationShutdown { // 期限切れのロールを除外 assigns = assigns.filter(a => a.expiresAt == null || (a.expiresAt.getTime() > now)); const assignedRoleIds = assigns.map(x => x.roleId); - const roles = await this.rolesCache.fetch(null, () => this.rolesRepository.findBy({})); + const roles = await this.rolesCache.fetch(() => this.rolesRepository.findBy({})); const assignedBadgeRoles = roles.filter(r => r.asBadge && assignedRoleIds.includes(r.id)); const badgeCondRoles = roles.filter(r => r.asBadge && (r.target === 'conditional')); if (badgeCondRoles.length > 0) { - const user = roles.some(r => r.target === 'conditional') ? await this.userCacheService.findById(userId) : null; + const user = roles.some(r => r.target === 'conditional') ? await this.cacheService.findUserById(userId) : null; const matchedBadgeCondRoles = badgeCondRoles.filter(r => this.evalCond(user!, r.condFormula)); return [...assignedBadgeRoles, ...matchedBadgeCondRoles]; } else { @@ -301,7 +301,7 @@ export class RoleService implements OnApplicationShutdown { @bindThis public async getModeratorIds(includeAdmins = true): Promise { - const roles = await this.rolesCache.fetch(null, () => this.rolesRepository.findBy({})); + const roles = await this.rolesCache.fetch(() => this.rolesRepository.findBy({})); const moderatorRoles = includeAdmins ? roles.filter(r => r.isModerator || r.isAdministrator) : roles.filter(r => r.isModerator); const assigns = moderatorRoles.length > 0 ? await this.roleAssignmentsRepository.findBy({ roleId: In(moderatorRoles.map(r => r.id)), @@ -321,7 +321,7 @@ export class RoleService implements OnApplicationShutdown { @bindThis public async getAdministratorIds(): Promise { - const roles = await this.rolesCache.fetch(null, () => this.rolesRepository.findBy({})); + const roles = await this.rolesCache.fetch(() => this.rolesRepository.findBy({})); const administratorRoles = roles.filter(r => r.isAdministrator); const assigns = administratorRoles.length > 0 ? await this.roleAssignmentsRepository.findBy({ roleId: In(administratorRoles.map(r => r.id)), diff --git a/packages/backend/src/core/UserCacheService.ts b/packages/backend/src/core/UserCacheService.ts deleted file mode 100644 index e452caf5d1..0000000000 --- a/packages/backend/src/core/UserCacheService.ts +++ /dev/null @@ -1,88 +0,0 @@ -import { Inject, Injectable } from '@nestjs/common'; -import Redis from 'ioredis'; -import type { UsersRepository } from '@/models/index.js'; -import { MemoryKVCache } from '@/misc/cache.js'; -import type { LocalUser, User } from '@/models/entities/User.js'; -import { DI } from '@/di-symbols.js'; -import { UserEntityService } from '@/core/entities/UserEntityService.js'; -import { bindThis } from '@/decorators.js'; -import { StreamMessages } from '@/server/api/stream/types.js'; -import type { OnApplicationShutdown } from '@nestjs/common'; - -@Injectable() -export class UserCacheService implements OnApplicationShutdown { - public userByIdCache: MemoryKVCache; - public localUserByNativeTokenCache: MemoryKVCache; - public localUserByIdCache: MemoryKVCache; - public uriPersonCache: MemoryKVCache; - - constructor( - @Inject(DI.redisSubscriber) - private redisSubscriber: Redis.Redis, - - @Inject(DI.usersRepository) - private usersRepository: UsersRepository, - - private userEntityService: UserEntityService, - ) { - //this.onMessage = this.onMessage.bind(this); - - this.userByIdCache = new MemoryKVCache(Infinity); - this.localUserByNativeTokenCache = new MemoryKVCache(Infinity); - this.localUserByIdCache = new MemoryKVCache(Infinity); - this.uriPersonCache = new MemoryKVCache(Infinity); - - this.redisSubscriber.on('message', this.onMessage); - } - - @bindThis - private async onMessage(_: string, data: string): Promise { - const obj = JSON.parse(data); - - if (obj.channel === 'internal') { - const { type, body } = obj.message as StreamMessages['internal']['payload']; - switch (type) { - case 'userChangeSuspendedState': - case 'remoteUserUpdated': { - const user = await this.usersRepository.findOneByOrFail({ id: body.id }); - this.userByIdCache.set(user.id, user); - for (const [k, v] of this.uriPersonCache.cache.entries()) { - if (v.value?.id === user.id) { - this.uriPersonCache.set(k, user); - } - } - if (this.userEntityService.isLocalUser(user)) { - this.localUserByNativeTokenCache.set(user.token, user); - this.localUserByIdCache.set(user.id, user); - } - break; - } - case 'userTokenRegenerated': { - const user = await this.usersRepository.findOneByOrFail({ id: body.id }) as LocalUser; - this.localUserByNativeTokenCache.delete(body.oldToken); - this.localUserByNativeTokenCache.set(body.newToken, user); - break; - } - case 'follow': { - const follower = this.userByIdCache.get(body.followerId); - if (follower) follower.followingCount++; - const followee = this.userByIdCache.get(body.followeeId); - if (followee) followee.followersCount++; - break; - } - default: - break; - } - } - } - - @bindThis - public findById(userId: User['id']) { - return this.userByIdCache.fetch(userId, () => this.usersRepository.findOneByOrFail({ id: userId })); - } - - @bindThis - public onApplicationShutdown(signal?: string | undefined) { - this.redisSubscriber.off('message', this.onMessage); - } -} diff --git a/packages/backend/src/core/activitypub/ApDbResolverService.ts b/packages/backend/src/core/activitypub/ApDbResolverService.ts index dc0a865bbe..4b032be89a 100644 --- a/packages/backend/src/core/activitypub/ApDbResolverService.ts +++ b/packages/backend/src/core/activitypub/ApDbResolverService.ts @@ -5,7 +5,7 @@ import type { NotesRepository, UserPublickeysRepository, UsersRepository } from import type { Config } from '@/config.js'; import { MemoryKVCache } from '@/misc/cache.js'; import type { UserPublickey } from '@/models/entities/UserPublickey.js'; -import { UserCacheService } from '@/core/UserCacheService.js'; +import { CacheService } from '@/core/CacheService.js'; import type { Note } from '@/models/entities/Note.js'; import { bindThis } from '@/decorators.js'; import { RemoteUser, User } from '@/models/entities/User.js'; @@ -47,7 +47,7 @@ export class ApDbResolverService { @Inject(DI.userPublickeysRepository) private userPublickeysRepository: UserPublickeysRepository, - private userCacheService: UserCacheService, + private cacheService: CacheService, private apPersonService: ApPersonService, ) { this.publicKeyCache = new MemoryKVCache(Infinity); @@ -107,11 +107,11 @@ export class ApDbResolverService { if (parsed.local) { if (parsed.type !== 'users') return null; - return await this.userCacheService.userByIdCache.fetchMaybe(parsed.id, () => this.usersRepository.findOneBy({ + return await this.cacheService.userByIdCache.fetchMaybe(parsed.id, () => this.usersRepository.findOneBy({ id: parsed.id, }).then(x => x ?? undefined)) ?? null; } else { - return await this.userCacheService.uriPersonCache.fetch(parsed.uri, () => this.usersRepository.findOneBy({ + return await this.cacheService.uriPersonCache.fetch(parsed.uri, () => this.usersRepository.findOneBy({ uri: parsed.uri, })); } @@ -138,7 +138,7 @@ export class ApDbResolverService { if (key == null) return null; return { - user: await this.userCacheService.findById(key.userId) as RemoteUser, + user: await this.cacheService.findUserById(key.userId) as RemoteUser, key, }; } diff --git a/packages/backend/src/core/activitypub/models/ApPersonService.ts b/packages/backend/src/core/activitypub/models/ApPersonService.ts index 41f7eafa41..67e907c271 100644 --- a/packages/backend/src/core/activitypub/models/ApPersonService.ts +++ b/packages/backend/src/core/activitypub/models/ApPersonService.ts @@ -8,7 +8,7 @@ import type { Config } from '@/config.js'; import type { RemoteUser } from '@/models/entities/User.js'; import { User } from '@/models/entities/User.js'; import { truncate } from '@/misc/truncate.js'; -import type { UserCacheService } from '@/core/UserCacheService.js'; +import type { CacheService } from '@/core/CacheService.js'; import { normalizeForSearch } from '@/misc/normalize-for-search.js'; import { isDuplicateKeyValueError } from '@/misc/is-duplicate-key-value-error.js'; import type Logger from '@/logger.js'; @@ -54,7 +54,7 @@ export class ApPersonService implements OnModuleInit { private metaService: MetaService; private federatedInstanceService: FederatedInstanceService; private fetchInstanceMetadataService: FetchInstanceMetadataService; - private userCacheService: UserCacheService; + private cacheService: CacheService; private apResolverService: ApResolverService; private apNoteService: ApNoteService; private apImageService: ApImageService; @@ -97,7 +97,7 @@ export class ApPersonService implements OnModuleInit { //private metaService: MetaService, //private federatedInstanceService: FederatedInstanceService, //private fetchInstanceMetadataService: FetchInstanceMetadataService, - //private userCacheService: UserCacheService, + //private cacheService: CacheService, //private apResolverService: ApResolverService, //private apNoteService: ApNoteService, //private apImageService: ApImageService, @@ -118,7 +118,7 @@ export class ApPersonService implements OnModuleInit { this.metaService = this.moduleRef.get('MetaService'); this.federatedInstanceService = this.moduleRef.get('FederatedInstanceService'); this.fetchInstanceMetadataService = this.moduleRef.get('FetchInstanceMetadataService'); - this.userCacheService = this.moduleRef.get('UserCacheService'); + this.cacheService = this.moduleRef.get('CacheService'); this.apResolverService = this.moduleRef.get('ApResolverService'); this.apNoteService = this.moduleRef.get('ApNoteService'); this.apImageService = this.moduleRef.get('ApImageService'); @@ -207,14 +207,14 @@ export class ApPersonService implements OnModuleInit { public async fetchPerson(uri: string, resolver?: Resolver): Promise { if (typeof uri !== 'string') throw new Error('uri is not string'); - const cached = this.userCacheService.uriPersonCache.get(uri); + const cached = this.cacheService.uriPersonCache.get(uri); if (cached) return cached; // URIがこのサーバーを指しているならデータベースからフェッチ if (uri.startsWith(this.config.url + '/')) { const id = uri.split('/').pop(); const u = await this.usersRepository.findOneBy({ id }); - if (u) this.userCacheService.uriPersonCache.set(uri, u); + if (u) this.cacheService.uriPersonCache.set(uri, u); return u; } @@ -222,7 +222,7 @@ export class ApPersonService implements OnModuleInit { const exist = await this.usersRepository.findOneBy({ uri }); if (exist) { - this.userCacheService.uriPersonCache.set(uri, exist); + this.cacheService.uriPersonCache.set(uri, exist); return exist; } //#endregion diff --git a/packages/backend/src/core/entities/NotificationEntityService.ts b/packages/backend/src/core/entities/NotificationEntityService.ts index 7cffb8d568..6b9a9d3320 100644 --- a/packages/backend/src/core/entities/NotificationEntityService.ts +++ b/packages/backend/src/core/entities/NotificationEntityService.ts @@ -54,6 +54,7 @@ export class NotificationEntityService implements OnModuleInit { public async pack( src: Notification, meId: User['id'], + // eslint-disable-next-line @typescript-eslint/ban-types options: { }, diff --git a/packages/backend/src/core/entities/UserEntityService.ts b/packages/backend/src/core/entities/UserEntityService.ts index 71aa2ee6de..e8474c7e0e 100644 --- a/packages/backend/src/core/entities/UserEntityService.ts +++ b/packages/backend/src/core/entities/UserEntityService.ts @@ -255,7 +255,6 @@ export class UserEntityService implements OnModuleInit { '+', '-', 'COUNT', 1); - console.log('latestNotificationIdsRes', latestNotificationIdsRes); const latestNotificationId = latestNotificationIdsRes[0]?.[0]; return latestNotificationId != null && (latestReadNotificationId == null || latestReadNotificationId < latestNotificationId); diff --git a/packages/backend/src/misc/cache.ts b/packages/backend/src/misc/cache.ts index a805d18421..870dfd237c 100644 --- a/packages/backend/src/misc/cache.ts +++ b/packages/backend/src/misc/cache.ts @@ -1,9 +1,94 @@ +import Redis from 'ioredis'; import { bindThis } from '@/decorators.js'; +// redis通すとDateのインスタンスはstringに変換されるので +type Serialized = { + [K in keyof T]: + T[K] extends Date + ? string + : T[K] extends (Date | null) + ? (string | null) + : T[K] extends Record + ? Serialized + : T[K]; +}; + +export class RedisKVCache { + private redisClient: Redis.Redis; + private name: string; + private lifetime: number; + private memoryCache: MemoryKVCache; + + constructor(redisClient: RedisKVCache['redisClient'], name: RedisKVCache['name'], lifetime: RedisKVCache['lifetime'], memoryCacheLifetime: number) { + this.redisClient = redisClient; + this.name = name; + this.lifetime = lifetime; + this.memoryCache = new MemoryKVCache(memoryCacheLifetime); + } + + @bindThis + public async set(key: string, value: T): Promise { + this.memoryCache.set(key, value); + if (this.lifetime === Infinity) { + await this.redisClient.set( + `kvcache:${this.name}:${key}`, + JSON.stringify(value), + ); + } else { + await this.redisClient.set( + `kvcache:${this.name}:${key}`, + JSON.stringify(value), + 'ex', Math.round(this.lifetime / 1000), + ); + } + } + + @bindThis + public async get(key: string): Promise | T | undefined> { + const memoryCached = this.memoryCache.get(key); + if (memoryCached !== undefined) return memoryCached; + + const cached = await this.redisClient.get(`kvcache:${this.name}:${key}`); + if (cached == null) return undefined; + return JSON.parse(cached); + } + + @bindThis + public async delete(key: string): Promise { + this.memoryCache.delete(key); + await this.redisClient.del(`kvcache:${this.name}:${key}`); + } + + /** + * キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します + * optional: キャッシュが存在してもvalidatorでfalseを返すとキャッシュ無効扱いにします + */ + @bindThis + public async fetch(key: string, fetcher: () => Promise, validator?: (cachedValue: Serialized | T) => boolean): Promise | T> { + const cachedValue = await this.get(key); + if (cachedValue !== undefined) { + if (validator) { + if (validator(cachedValue)) { + // Cache HIT + return cachedValue; + } + } else { + // Cache HIT + return cachedValue; + } + } + + // Cache MISS + const value = await fetcher(); + this.set(key, value); + return value; + } +} + // TODO: メモリ節約のためあまり参照されないキーを定期的に削除できるようにする? export class MemoryKVCache { - public cache: Map; + public cache: Map; private lifetime: number; constructor(lifetime: MemoryKVCache['lifetime']) { @@ -12,7 +97,7 @@ export class MemoryKVCache { } @bindThis - public set(key: string | null, value: T): void { + public set(key: string, value: T): void { this.cache.set(key, { date: Date.now(), value, @@ -20,7 +105,7 @@ export class MemoryKVCache { } @bindThis - public get(key: string | null): T | undefined { + public get(key: string): T | undefined { const cached = this.cache.get(key); if (cached == null) return undefined; if ((Date.now() - cached.date) > this.lifetime) { @@ -31,7 +116,7 @@ export class MemoryKVCache { } @bindThis - public delete(key: string | null) { + public delete(key: string) { this.cache.delete(key); } @@ -40,7 +125,7 @@ export class MemoryKVCache { * optional: キャッシュが存在してもvalidatorでfalseを返すとキャッシュ無効扱いにします */ @bindThis - public async fetch(key: string | null, fetcher: () => Promise, validator?: (cachedValue: T) => boolean): Promise { + public async fetch(key: string, fetcher: () => Promise, validator?: (cachedValue: T) => boolean): Promise { const cachedValue = this.get(key); if (cachedValue !== undefined) { if (validator) { @@ -65,7 +150,7 @@ export class MemoryKVCache { * optional: キャッシュが存在してもvalidatorでfalseを返すとキャッシュ無効扱いにします */ @bindThis - public async fetchMaybe(key: string | null, fetcher: () => Promise, validator?: (cachedValue: T) => boolean): Promise { + public async fetchMaybe(key: string, fetcher: () => Promise, validator?: (cachedValue: T) => boolean): Promise { const cachedValue = this.get(key); if (cachedValue !== undefined) { if (validator) { diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts index 71865c778a..a9af22ad09 100644 --- a/packages/backend/src/queue/processors/DeliverProcessorService.ts +++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts @@ -7,7 +7,7 @@ import { MetaService } from '@/core/MetaService.js'; import { ApRequestService } from '@/core/activitypub/ApRequestService.js'; import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js'; -import { MemoryKVCache } from '@/misc/cache.js'; +import { MemoryCache } from '@/misc/cache.js'; import type { Instance } from '@/models/entities/Instance.js'; import InstanceChart from '@/core/chart/charts/instance.js'; import ApRequestChart from '@/core/chart/charts/ap-request.js'; @@ -22,7 +22,7 @@ import type { DeliverJobData } from '../types.js'; @Injectable() export class DeliverProcessorService { private logger: Logger; - private suspendedHostsCache: MemoryKVCache; + private suspendedHostsCache: MemoryCache; private latest: string | null; constructor( @@ -46,7 +46,7 @@ export class DeliverProcessorService { private queueLoggerService: QueueLoggerService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('deliver'); - this.suspendedHostsCache = new MemoryKVCache(1000 * 60 * 60); + this.suspendedHostsCache = new MemoryCache(1000 * 60 * 60); } @bindThis @@ -60,14 +60,14 @@ export class DeliverProcessorService { } // isSuspendedなら中断 - let suspendedHosts = this.suspendedHostsCache.get(null); + let suspendedHosts = this.suspendedHostsCache.get(); if (suspendedHosts == null) { suspendedHosts = await this.instancesRepository.find({ where: { isSuspended: true, }, }); - this.suspendedHostsCache.set(null, suspendedHosts); + this.suspendedHostsCache.set(suspendedHosts); } if (suspendedHosts.map(x => x.host).includes(this.utilityService.toPuny(host))) { return 'skip (suspended)'; diff --git a/packages/backend/src/server/NodeinfoServerService.ts b/packages/backend/src/server/NodeinfoServerService.ts index 3387bd53aa..66c1faaac2 100644 --- a/packages/backend/src/server/NodeinfoServerService.ts +++ b/packages/backend/src/server/NodeinfoServerService.ts @@ -4,7 +4,7 @@ import type { NotesRepository, UsersRepository } from '@/models/index.js'; import type { Config } from '@/config.js'; import { MetaService } from '@/core/MetaService.js'; import { MAX_NOTE_TEXT_LENGTH } from '@/const.js'; -import { MemoryKVCache } from '@/misc/cache.js'; +import { MemoryCache } from '@/misc/cache.js'; import { UserEntityService } from '@/core/entities/UserEntityService.js'; import { bindThis } from '@/decorators.js'; import NotesChart from '@/core/chart/charts/notes.js'; @@ -118,17 +118,17 @@ export class NodeinfoServerService { }; }; - const cache = new MemoryKVCache>>(1000 * 60 * 10); + const cache = new MemoryCache>>(1000 * 60 * 10); fastify.get(nodeinfo2_1path, async (request, reply) => { - const base = await cache.fetch(null, () => nodeinfo2()); + const base = await cache.fetch(() => nodeinfo2()); reply.header('Cache-Control', 'public, max-age=600'); return { version: '2.1', ...base }; }); fastify.get(nodeinfo2_0path, async (request, reply) => { - const base = await cache.fetch(null, () => nodeinfo2()); + const base = await cache.fetch(() => nodeinfo2()); delete (base as any).software.repository; diff --git a/packages/backend/src/server/api/AuthenticateService.ts b/packages/backend/src/server/api/AuthenticateService.ts index cd6bce9ef9..6548c475b2 100644 --- a/packages/backend/src/server/api/AuthenticateService.ts +++ b/packages/backend/src/server/api/AuthenticateService.ts @@ -5,7 +5,7 @@ import type { LocalUser } from '@/models/entities/User.js'; import type { AccessToken } from '@/models/entities/AccessToken.js'; import { MemoryKVCache } from '@/misc/cache.js'; import type { App } from '@/models/entities/App.js'; -import { UserCacheService } from '@/core/UserCacheService.js'; +import { CacheService } from '@/core/CacheService.js'; import isNativeToken from '@/misc/is-native-token.js'; import { bindThis } from '@/decorators.js'; @@ -30,7 +30,7 @@ export class AuthenticateService { @Inject(DI.appsRepository) private appsRepository: AppsRepository, - private userCacheService: UserCacheService, + private cacheService: CacheService, ) { this.appCache = new MemoryKVCache(Infinity); } @@ -42,7 +42,7 @@ export class AuthenticateService { } if (isNativeToken(token)) { - const user = await this.userCacheService.localUserByNativeTokenCache.fetch(token, + const user = await this.cacheService.localUserByNativeTokenCache.fetch(token, () => this.usersRepository.findOneBy({ token }) as Promise); if (user == null) { @@ -67,7 +67,7 @@ export class AuthenticateService { lastUsedAt: new Date(), }); - const user = await this.userCacheService.localUserByIdCache.fetch(accessToken.userId, + const user = await this.cacheService.localUserByIdCache.fetch(accessToken.userId, () => this.usersRepository.findOneBy({ id: accessToken.userId, }) as Promise); diff --git a/packages/backend/src/server/api/endpoints/i/regenerate-token.ts b/packages/backend/src/server/api/endpoints/i/regenerate-token.ts index f942f43cc8..786e64374c 100644 --- a/packages/backend/src/server/api/endpoints/i/regenerate-token.ts +++ b/packages/backend/src/server/api/endpoints/i/regenerate-token.ts @@ -34,7 +34,7 @@ export default class extends Endpoint { ) { super(meta, paramDef, async (ps, me) => { const freshUser = await this.usersRepository.findOneByOrFail({ id: me.id }); - const oldToken = freshUser.token; + const oldToken = freshUser.token!; const profile = await this.userProfilesRepository.findOneByOrFail({ userId: me.id }); diff --git a/packages/backend/src/server/api/endpoints/i/update.ts b/packages/backend/src/server/api/endpoints/i/update.ts index b1eaab3908..46b16e9dce 100644 --- a/packages/backend/src/server/api/endpoints/i/update.ts +++ b/packages/backend/src/server/api/endpoints/i/update.ts @@ -18,6 +18,7 @@ import { AccountUpdateService } from '@/core/AccountUpdateService.js'; import { HashtagService } from '@/core/HashtagService.js'; import { DI } from '@/di-symbols.js'; import { RoleService } from '@/core/RoleService.js'; +import { CacheService } from '@/core/CacheService.js'; import { ApiError } from '../../error.js'; export const meta = { @@ -152,6 +153,7 @@ export default class extends Endpoint { private accountUpdateService: AccountUpdateService, private hashtagService: HashtagService, private roleService: RoleService, + private cacheService: CacheService, ) { super(meta, paramDef, async (ps, _user, token) => { const user = await this.usersRepository.findOneByOrFail({ id: _user.id }); @@ -276,9 +278,13 @@ export default class extends Endpoint { includeSecrets: isSecure, }); + const updatedProfile = await this.userProfilesRepository.findOneByOrFail({ userId: user.id }); + + this.cacheService.userProfileCache.set(user.id, updatedProfile); + // Publish meUpdated event this.globalEventService.publishMainStream(user.id, 'meUpdated', iObj); - this.globalEventService.publishUserEvent(user.id, 'updateUserProfile', await this.userProfilesRepository.findOneByOrFail({ userId: user.id })); + this.globalEventService.publishUserEvent(user.id, 'updateUserProfile', updatedProfile); // 鍵垢を解除したとき、溜まっていたフォローリクエストがあるならすべて承認 if (user.isLocked && ps.isLocked === false) { diff --git a/packages/backend/src/server/api/endpoints/mute/create.ts b/packages/backend/src/server/api/endpoints/mute/create.ts index 9099eea52e..fd062e1cab 100644 --- a/packages/backend/src/server/api/endpoints/mute/create.ts +++ b/packages/backend/src/server/api/endpoints/mute/create.ts @@ -7,6 +7,7 @@ import type { Muting } from '@/models/entities/Muting.js'; import { GlobalEventService } from '@/core/GlobalEventService.js'; import { DI } from '@/di-symbols.js'; import { GetterService } from '@/server/api/GetterService.js'; +import { CacheService } from '@/core/CacheService.js'; import { ApiError } from '../../error.js'; export const meta = { @@ -65,6 +66,7 @@ export default class extends Endpoint { private globalEventService: GlobalEventService, private getterService: GetterService, private idService: IdService, + private cacheService: CacheService, ) { super(meta, paramDef, async (ps, me) => { const muter = me; @@ -103,6 +105,7 @@ export default class extends Endpoint { muteeId: mutee.id, } as Muting); + this.cacheService.userMutingsCache.delete(muter.id); this.globalEventService.publishUserEvent(me.id, 'mute', mutee); }); } diff --git a/packages/backend/src/server/api/endpoints/renote-mute/create.ts b/packages/backend/src/server/api/endpoints/renote-mute/create.ts index 051a005b67..b285269617 100644 --- a/packages/backend/src/server/api/endpoints/renote-mute/create.ts +++ b/packages/backend/src/server/api/endpoints/renote-mute/create.ts @@ -92,8 +92,6 @@ export default class extends Endpoint { muterId: muter.id, muteeId: mutee.id, } as RenoteMuting); - - // publishUserEvent(user.id, 'mute', mutee); }); } } diff --git a/packages/backend/src/server/api/stream/types.ts b/packages/backend/src/server/api/stream/types.ts index b8f50e0546..1e6e51e76d 100644 --- a/packages/backend/src/server/api/stream/types.ts +++ b/packages/backend/src/server/api/stream/types.ts @@ -19,7 +19,7 @@ import type { EventEmitter } from 'events'; //#region Stream type-body definitions export interface InternalStreamTypes { userChangeSuspendedState: { id: User['id']; isSuspended: User['isSuspended']; }; - userTokenRegenerated: { id: User['id']; oldToken: User['token']; newToken: User['token']; }; + userTokenRegenerated: { id: User['id']; oldToken: string; newToken: string; }; remoteUserUpdated: { id: User['id']; }; follow: { followerId: User['id']; followeeId: User['id']; }; unfollow: { followerId: User['id']; followeeId: User['id']; }; diff --git a/packages/backend/test/unit/RoleService.ts b/packages/backend/test/unit/RoleService.ts index 6fe04274e6..907f1f2edc 100644 --- a/packages/backend/test/unit/RoleService.ts +++ b/packages/backend/test/unit/RoleService.ts @@ -11,7 +11,7 @@ import type { Role, RolesRepository, RoleAssignmentsRepository, UsersRepository, import { DI } from '@/di-symbols.js'; import { MetaService } from '@/core/MetaService.js'; import { genAid } from '@/misc/id/aid.js'; -import { UserCacheService } from '@/core/UserCacheService.js'; +import { CacheService } from '@/core/CacheService.js'; import { IdService } from '@/core/IdService.js'; import { GlobalEventService } from '@/core/GlobalEventService.js'; import { sleep } from '../utils.js'; @@ -65,7 +65,7 @@ describe('RoleService', () => { ], providers: [ RoleService, - UserCacheService, + CacheService, IdService, GlobalEventService, ], -- cgit v1.2.3-freya From 625fed88383207e20b3a40d5ef8dbba9f70a8a9e Mon Sep 17 00:00:00 2001 From: syuilo Date: Wed, 5 Apr 2023 07:52:49 +0900 Subject: enhance(backend): チャンネルの既読管理を削除 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 現状上手く機能していない - パフォーマンス上の理由 - 実装するにしてももっと効率的な方法がある --- packages/backend/src/core/NoteCreateService.ts | 12 ----- packages/backend/src/core/NoteReadService.ts | 58 ++-------------------- .../backend/src/core/entities/UserEntityService.ts | 14 +----- packages/backend/src/models/json-schema/user.ts | 4 -- packages/backend/src/server/api/stream/index.ts | 5 +- packages/backend/src/server/api/stream/types.ts | 2 - packages/frontend/src/init.ts | 9 ---- packages/misskey-js/src/entities.ts | 1 - packages/misskey-js/src/streaming.types.ts | 2 - 9 files changed, 6 insertions(+), 101 deletions(-) (limited to 'packages/backend/src/server/api/stream') diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index 83290b310e..fcc17ace1e 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -502,18 +502,6 @@ export class NoteCreateService implements OnApplicationShutdown { }); } - // Channel - if (note.channelId) { - this.channelFollowingsRepository.findBy({ followeeId: note.channelId }).then(followings => { - for (const following of followings) { - this.noteReadService.insertNoteUnread(following.followerId, note, { - isSpecified: false, - isMentioned: false, - }); - } - }); - } - if (data.reply) { this.saveReply(data.reply, note); } diff --git a/packages/backend/src/core/NoteReadService.ts b/packages/backend/src/core/NoteReadService.ts index 7c6808fbd0..1129bd159c 100644 --- a/packages/backend/src/core/NoteReadService.ts +++ b/packages/backend/src/core/NoteReadService.ts @@ -1,28 +1,20 @@ import { setTimeout } from 'node:timers/promises'; import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; -import { In, IsNull, Not } from 'typeorm'; +import { In } from 'typeorm'; import { DI } from '@/di-symbols.js'; import type { User } from '@/models/entities/User.js'; -import type { Channel } from '@/models/entities/Channel.js'; import type { Packed } from '@/misc/json-schema.js'; import type { Note } from '@/models/entities/Note.js'; import { IdService } from '@/core/IdService.js'; import { GlobalEventService } from '@/core/GlobalEventService.js'; -import type { UsersRepository, NoteUnreadsRepository, MutingsRepository, NoteThreadMutingsRepository, FollowingsRepository, ChannelFollowingsRepository } from '@/models/index.js'; -import { UserEntityService } from '@/core/entities/UserEntityService.js'; +import type { NoteUnreadsRepository, MutingsRepository, NoteThreadMutingsRepository } from '@/models/index.js'; import { bindThis } from '@/decorators.js'; -import { NotificationService } from './NotificationService.js'; -import { AntennaService } from './AntennaService.js'; -import { PushNotificationService } from './PushNotificationService.js'; @Injectable() export class NoteReadService implements OnApplicationShutdown { #shutdownController = new AbortController(); constructor( - @Inject(DI.usersRepository) - private usersRepository: UsersRepository, - @Inject(DI.noteUnreadsRepository) private noteUnreadsRepository: NoteUnreadsRepository, @@ -32,18 +24,8 @@ export class NoteReadService implements OnApplicationShutdown { @Inject(DI.noteThreadMutingsRepository) private noteThreadMutingsRepository: NoteThreadMutingsRepository, - @Inject(DI.followingsRepository) - private followingsRepository: FollowingsRepository, - - @Inject(DI.channelFollowingsRepository) - private channelFollowingsRepository: ChannelFollowingsRepository, - - private userEntityService: UserEntityService, private idService: IdService, private globalEventService: GlobalEventService, - private notificationService: NotificationService, - private antennaService: AntennaService, - private pushNotificationService: PushNotificationService, ) { } @@ -54,7 +36,6 @@ export class NoteReadService implements OnApplicationShutdown { isMentioned: boolean; }): Promise { //#region ミュートしているなら無視 - // TODO: 現在の仕様ではChannelにミュートは適用されないのでよしなにケアする const mute = await this.mutingsRepository.findBy({ muterId: userId, }); @@ -74,7 +55,6 @@ export class NoteReadService implements OnApplicationShutdown { userId: userId, isSpecified: params.isSpecified, isMentioned: params.isMentioned, - noteChannelId: note.channelId, noteUserId: note.userId, }; @@ -92,9 +72,6 @@ export class NoteReadService implements OnApplicationShutdown { if (params.isSpecified) { this.globalEventService.publishMainStream(userId, 'unreadSpecifiedNote', note.id); } - if (note.channelId) { - this.globalEventService.publishMainStream(userId, 'unreadChannel', note.id); - } }, () => { /* aborted, ignore it */ }); } @@ -102,22 +79,9 @@ export class NoteReadService implements OnApplicationShutdown { public async read( userId: User['id'], notes: (Note | Packed<'Note'>)[], - info?: { - following: Set; - followingChannels: Set; - }, ): Promise { - const followingChannels = info?.followingChannels ? info.followingChannels : new Set((await this.channelFollowingsRepository.find({ - where: { - followerId: userId, - }, - select: ['followeeId'], - })).map(x => x.followeeId)); - - const myAntennas = (await this.antennaService.getAntennas()).filter(a => a.userId === userId); const readMentions: (Note | Packed<'Note'>)[] = []; const readSpecifiedNotes: (Note | Packed<'Note'>)[] = []; - const readChannelNotes: (Note | Packed<'Note'>)[] = []; for (const note of notes) { if (note.mentions && note.mentions.includes(userId)) { @@ -125,17 +89,13 @@ export class NoteReadService implements OnApplicationShutdown { } else if (note.visibleUserIds && note.visibleUserIds.includes(userId)) { readSpecifiedNotes.push(note); } - - if (note.channelId && followingChannels.has(note.channelId)) { - readChannelNotes.push(note); - } } - if ((readMentions.length > 0) || (readSpecifiedNotes.length > 0) || (readChannelNotes.length > 0)) { + if ((readMentions.length > 0) || (readSpecifiedNotes.length > 0)) { // Remove the record await this.noteUnreadsRepository.delete({ userId: userId, - noteId: In([...readMentions.map(n => n.id), ...readSpecifiedNotes.map(n => n.id), ...readChannelNotes.map(n => n.id)]), + noteId: In([...readMentions.map(n => n.id), ...readSpecifiedNotes.map(n => n.id)]), }); // TODO: ↓まとめてクエリしたい @@ -159,16 +119,6 @@ export class NoteReadService implements OnApplicationShutdown { this.globalEventService.publishMainStream(userId, 'readAllUnreadSpecifiedNotes'); } }); - - this.noteUnreadsRepository.countBy({ - userId: userId, - noteChannelId: Not(IsNull()), - }).then(channelNoteCount => { - if (channelNoteCount === 0) { - // 全て既読になったイベントを発行 - this.globalEventService.publishMainStream(userId, 'readAllChannels'); - } - }); } } diff --git a/packages/backend/src/core/entities/UserEntityService.ts b/packages/backend/src/core/entities/UserEntityService.ts index e8474c7e0e..f2f5e4b582 100644 --- a/packages/backend/src/core/entities/UserEntityService.ts +++ b/packages/backend/src/core/entities/UserEntityService.ts @@ -234,18 +234,6 @@ export class UserEntityService implements OnModuleInit { return false; // TODO } - @bindThis - public async getHasUnreadChannel(userId: User['id']): Promise { - const channels = await this.channelFollowingsRepository.findBy({ followerId: userId }); - - const unread = channels.length > 0 ? await this.noteUnreadsRepository.findOneBy({ - userId: userId, - noteChannelId: In(channels.map(x => x.followeeId)), - }) : null; - - return unread != null; - } - @bindThis public async getHasUnreadNotification(userId: User['id']): Promise { const latestReadNotificationId = await this.redisClient.get(`latestReadNotification:${userId}`); @@ -463,7 +451,7 @@ export class UserEntityService implements OnModuleInit { }).then(count => count > 0), hasUnreadAnnouncement: this.getHasUnreadAnnouncement(user.id), hasUnreadAntenna: this.getHasUnreadAntenna(user.id), - hasUnreadChannel: this.getHasUnreadChannel(user.id), + hasUnreadChannel: false, // 後方互換性のため hasUnreadNotification: this.getHasUnreadNotification(user.id), hasPendingReceivedFollowRequest: this.getHasPendingReceivedFollowRequest(user.id), mutedWords: profile!.mutedWords, diff --git a/packages/backend/src/models/json-schema/user.ts b/packages/backend/src/models/json-schema/user.ts index e8a7212c52..e388a77a5e 100644 --- a/packages/backend/src/models/json-schema/user.ts +++ b/packages/backend/src/models/json-schema/user.ts @@ -311,10 +311,6 @@ export const packedMeDetailedOnlySchema = { type: 'boolean', nullable: false, optional: false, }, - hasUnreadChannel: { - type: 'boolean', - nullable: false, optional: false, - }, hasUnreadNotification: { type: 'boolean', nullable: false, optional: false, diff --git a/packages/backend/src/server/api/stream/index.ts b/packages/backend/src/server/api/stream/index.ts index f1f8bfd3a2..2f473cd012 100644 --- a/packages/backend/src/server/api/stream/index.ts +++ b/packages/backend/src/server/api/stream/index.ts @@ -186,10 +186,7 @@ export default class Connection { if (note == null) return; if (this.user && (note.userId !== this.user.id)) { - this.noteReadService.read(this.user.id, [note], { - following: this.following, - followingChannels: this.followingChannels, - }); + this.noteReadService.read(this.user.id, [note]); } } diff --git a/packages/backend/src/server/api/stream/types.ts b/packages/backend/src/server/api/stream/types.ts index 1e6e51e76d..f4eedc3964 100644 --- a/packages/backend/src/server/api/stream/types.ts +++ b/packages/backend/src/server/api/stream/types.ts @@ -97,8 +97,6 @@ export interface MainStreamTypes { readAllAntennas: undefined; unreadAntenna: Antenna; readAllAnnouncements: undefined; - readAllChannels: undefined; - unreadChannel: Note['id']; myTokenRegenerated: undefined; signin: Signin; registryUpdated: { diff --git a/packages/frontend/src/init.ts b/packages/frontend/src/init.ts index 26c5adfc70..7809017951 100644 --- a/packages/frontend/src/init.ts +++ b/packages/frontend/src/init.ts @@ -513,15 +513,6 @@ if ($i) { updateAccount({ hasUnreadAnnouncement: false }); }); - main.on('readAllChannels', () => { - updateAccount({ hasUnreadChannel: false }); - }); - - main.on('unreadChannel', () => { - updateAccount({ hasUnreadChannel: true }); - sound.play('channel'); - }); - // トークンが再生成されたとき // このままではMisskeyが利用できないので強制的にサインアウトさせる main.on('myTokenRegenerated', () => { diff --git a/packages/misskey-js/src/entities.ts b/packages/misskey-js/src/entities.ts index 37a8bc6184..7343fd74ad 100644 --- a/packages/misskey-js/src/entities.ts +++ b/packages/misskey-js/src/entities.ts @@ -88,7 +88,6 @@ export type MeDetailed = UserDetailed & { hasPendingReceivedFollowRequest: boolean; hasUnreadAnnouncement: boolean; hasUnreadAntenna: boolean; - hasUnreadChannel: boolean; hasUnreadMentions: boolean; hasUnreadMessagingMessage: boolean; hasUnreadNotification: boolean; diff --git a/packages/misskey-js/src/streaming.types.ts b/packages/misskey-js/src/streaming.types.ts index a64545f8e2..96ac7787e1 100644 --- a/packages/misskey-js/src/streaming.types.ts +++ b/packages/misskey-js/src/streaming.types.ts @@ -28,8 +28,6 @@ export type Channels = { readAllAntennas: () => void; unreadAntenna: (payload: Antenna) => void; readAllAnnouncements: () => void; - readAllChannels: () => void; - unreadChannel: (payload: Note['id']) => void; myTokenRegenerated: () => void; reversiNoInvites: () => void; reversiInvited: (payload: FIXME) => void; -- cgit v1.2.3-freya From f44504097c360fc84179161abee47b79a936b455 Mon Sep 17 00:00:00 2001 From: syuilo Date: Wed, 5 Apr 2023 10:21:10 +0900 Subject: enhance(backend): improve cache --- packages/backend/src/core/CacheService.ts | 85 +++++++++- packages/backend/src/core/DeleteAccountService.ts | 3 - packages/backend/src/core/GlobalEventService.ts | 6 - packages/backend/src/core/NotificationService.ts | 6 +- packages/backend/src/core/UserBlockingService.ts | 130 ++------------ packages/backend/src/core/UserFollowingService.ts | 9 +- packages/backend/src/core/UserMutingService.ts | 31 +++- packages/backend/src/misc/cache.ts | 66 ++++---- .../CheckExpiredMutingsProcessorService.ts | 14 +- .../src/server/api/StreamingApiServerService.ts | 21 ++- .../server/api/endpoints/admin/accounts/delete.ts | 5 - .../src/server/api/endpoints/admin/suspend-user.ts | 5 - .../src/server/api/endpoints/channels/follow.ts | 3 - .../src/server/api/endpoints/channels/unfollow.ts | 4 - .../src/server/api/endpoints/i/regenerate-token.ts | 5 - .../src/server/api/endpoints/i/revoke-token.ts | 3 - .../backend/src/server/api/endpoints/i/update.ts | 1 - .../src/server/api/endpoints/mute/create.ts | 21 +-- .../src/server/api/endpoints/mute/delete.ts | 13 +- .../src/server/api/endpoints/renote-mute/delete.ts | 2 - packages/backend/src/server/api/stream/channel.ts | 12 +- .../src/server/api/stream/channels/antenna.ts | 6 +- .../src/server/api/stream/channels/channel.ts | 6 +- .../server/api/stream/channels/global-timeline.ts | 6 +- .../src/server/api/stream/channels/hashtag.ts | 6 +- .../server/api/stream/channels/home-timeline.ts | 11 +- .../server/api/stream/channels/hybrid-timeline.ts | 8 +- .../server/api/stream/channels/local-timeline.ts | 6 +- .../backend/src/server/api/stream/channels/main.ts | 4 +- .../src/server/api/stream/channels/user-list.ts | 6 +- packages/backend/src/server/api/stream/index.ts | 187 +++++---------------- packages/backend/src/server/api/stream/types.ts | 21 +-- 32 files changed, 264 insertions(+), 448 deletions(-) (limited to 'packages/backend/src/server/api/stream') diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts index 887baeb2c2..f0c311b9b0 100644 --- a/packages/backend/src/core/CacheService.ts +++ b/packages/backend/src/core/CacheService.ts @@ -1,6 +1,6 @@ import { Inject, Injectable } from '@nestjs/common'; import Redis from 'ioredis'; -import type { UserProfile, UsersRepository } from '@/models/index.js'; +import type { BlockingsRepository, ChannelFollowingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, UserProfile, UserProfilesRepository, UsersRepository } from '@/models/index.js'; import { MemoryKVCache, RedisKVCache } from '@/misc/cache.js'; import type { LocalUser, User } from '@/models/entities/User.js'; import { DI } from '@/di-symbols.js'; @@ -16,7 +16,12 @@ export class CacheService implements OnApplicationShutdown { public localUserByIdCache: MemoryKVCache; public uriPersonCache: MemoryKVCache; public userProfileCache: RedisKVCache; - public userMutingsCache: RedisKVCache; + public userMutingsCache: RedisKVCache>; + public userBlockingCache: RedisKVCache>; + public userBlockedCache: RedisKVCache>; // NOTE: 「被」Blockキャッシュ + public renoteMutingsCache: RedisKVCache>; + public userFollowingsCache: RedisKVCache>; + public userFollowingChannelsCache: RedisKVCache>; constructor( @Inject(DI.redis) @@ -28,6 +33,24 @@ export class CacheService implements OnApplicationShutdown { @Inject(DI.usersRepository) private usersRepository: UsersRepository, + @Inject(DI.userProfilesRepository) + private userProfilesRepository: UserProfilesRepository, + + @Inject(DI.mutingsRepository) + private mutingsRepository: MutingsRepository, + + @Inject(DI.blockingsRepository) + private blockingsRepository: BlockingsRepository, + + @Inject(DI.renoteMutingsRepository) + private renoteMutingsRepository: RenoteMutingsRepository, + + @Inject(DI.followingsRepository) + private followingsRepository: FollowingsRepository, + + @Inject(DI.channelFollowingsRepository) + private channelFollowingsRepository: ChannelFollowingsRepository, + private userEntityService: UserEntityService, ) { //this.onMessage = this.onMessage.bind(this); @@ -36,8 +59,62 @@ export class CacheService implements OnApplicationShutdown { this.localUserByNativeTokenCache = new MemoryKVCache(Infinity); this.localUserByIdCache = new MemoryKVCache(Infinity); this.uriPersonCache = new MemoryKVCache(Infinity); - this.userProfileCache = new RedisKVCache(this.redisClient, 'userProfile', 1000 * 60 * 60 * 24, 1000 * 60); - this.userMutingsCache = new RedisKVCache(this.redisClient, 'userMutings', 1000 * 60 * 60 * 24, 1000 * 60); + + this.userProfileCache = new RedisKVCache(this.redisClient, 'userProfile', { + lifetime: 1000 * 60 * 30, // 30m + memoryCacheLifetime: 1000 * 60, // 1m + fetcher: (key) => this.userProfilesRepository.findOneByOrFail({ userId: key }), + toRedisConverter: (value) => JSON.stringify(value), + fromRedisConverter: (value) => JSON.parse(value), // TODO: date型の考慮 + }); + + this.userMutingsCache = new RedisKVCache>(this.redisClient, 'userMutings', { + lifetime: 1000 * 60 * 30, // 30m + memoryCacheLifetime: 1000 * 60, // 1m + fetcher: (key) => this.mutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))), + toRedisConverter: (value) => JSON.stringify(Array.from(value)), + fromRedisConverter: (value) => new Set(JSON.parse(value)), + }); + + this.userBlockingCache = new RedisKVCache>(this.redisClient, 'userBlocking', { + lifetime: 1000 * 60 * 30, // 30m + memoryCacheLifetime: 1000 * 60, // 1m + fetcher: (key) => this.blockingsRepository.find({ where: { blockerId: key }, select: ['blockeeId'] }).then(xs => new Set(xs.map(x => x.blockeeId))), + toRedisConverter: (value) => JSON.stringify(Array.from(value)), + fromRedisConverter: (value) => new Set(JSON.parse(value)), + }); + + this.userBlockedCache = new RedisKVCache>(this.redisClient, 'userBlocked', { + lifetime: 1000 * 60 * 30, // 30m + memoryCacheLifetime: 1000 * 60, // 1m + fetcher: (key) => this.blockingsRepository.find({ where: { blockeeId: key }, select: ['blockerId'] }).then(xs => new Set(xs.map(x => x.blockerId))), + toRedisConverter: (value) => JSON.stringify(Array.from(value)), + fromRedisConverter: (value) => new Set(JSON.parse(value)), + }); + + this.renoteMutingsCache = new RedisKVCache>(this.redisClient, 'renoteMutings', { + lifetime: 1000 * 60 * 30, // 30m + memoryCacheLifetime: 1000 * 60, // 1m + fetcher: (key) => this.renoteMutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))), + toRedisConverter: (value) => JSON.stringify(Array.from(value)), + fromRedisConverter: (value) => new Set(JSON.parse(value)), + }); + + this.userFollowingsCache = new RedisKVCache>(this.redisClient, 'userFollowings', { + lifetime: 1000 * 60 * 30, // 30m + memoryCacheLifetime: 1000 * 60, // 1m + fetcher: (key) => this.followingsRepository.find({ where: { followerId: key }, select: ['followeeId'] }).then(xs => new Set(xs.map(x => x.followeeId))), + toRedisConverter: (value) => JSON.stringify(Array.from(value)), + fromRedisConverter: (value) => new Set(JSON.parse(value)), + }); + + this.userFollowingChannelsCache = new RedisKVCache>(this.redisClient, 'userFollowingChannels', { + lifetime: 1000 * 60 * 30, // 30m + memoryCacheLifetime: 1000 * 60, // 1m + fetcher: (key) => this.channelFollowingsRepository.find({ where: { followerId: key }, select: ['followeeId'] }).then(xs => new Set(xs.map(x => x.followeeId))), + toRedisConverter: (value) => JSON.stringify(Array.from(value)), + fromRedisConverter: (value) => new Set(JSON.parse(value)), + }); this.redisSubscriber.on('message', this.onMessage); } diff --git a/packages/backend/src/core/DeleteAccountService.ts b/packages/backend/src/core/DeleteAccountService.ts index 2acb5f2303..327283106f 100644 --- a/packages/backend/src/core/DeleteAccountService.ts +++ b/packages/backend/src/core/DeleteAccountService.ts @@ -36,8 +36,5 @@ export class DeleteAccountService { await this.usersRepository.update(user.id, { isDeleted: true, }); - - // Terminate streaming - this.globalEventService.publishUserEvent(user.id, 'terminate', {}); } } diff --git a/packages/backend/src/core/GlobalEventService.ts b/packages/backend/src/core/GlobalEventService.ts index d261a6c657..25c064a2b4 100644 --- a/packages/backend/src/core/GlobalEventService.ts +++ b/packages/backend/src/core/GlobalEventService.ts @@ -14,7 +14,6 @@ import type { MainStreamTypes, NoteStreamTypes, UserListStreamTypes, - UserStreamTypes, } from '@/server/api/stream/types.js'; import type { Packed } from '@/misc/json-schema.js'; import { DI } from '@/di-symbols.js'; @@ -49,11 +48,6 @@ export class GlobalEventService { this.publish('internal', type, typeof value === 'undefined' ? null : value); } - @bindThis - public publishUserEvent(userId: User['id'], type: K, value?: UserStreamTypes[K]): void { - this.publish(`user:${userId}`, type, typeof value === 'undefined' ? null : value); - } - @bindThis public publishBroadcastStream(type: K, value?: BroadcastTypes[K]): void { this.publish('broadcast', type, typeof value === 'undefined' ? null : value); diff --git a/packages/backend/src/core/NotificationService.ts b/packages/backend/src/core/NotificationService.ts index 9c179f9318..366dc08c02 100644 --- a/packages/backend/src/core/NotificationService.ts +++ b/packages/backend/src/core/NotificationService.ts @@ -73,7 +73,7 @@ export class NotificationService implements OnApplicationShutdown { type: Notification['type'], data: Partial, ): Promise { - const profile = await this.cacheService.userProfileCache.fetch(notifieeId, () => this.userProfilesRepository.findOneByOrFail({ userId: notifieeId })); + const profile = await this.cacheService.userProfileCache.fetch(notifieeId); const isMuted = profile.mutingNotificationTypes.includes(type); if (isMuted) return null; @@ -82,8 +82,8 @@ export class NotificationService implements OnApplicationShutdown { return null; } - const mutings = await this.cacheService.userMutingsCache.fetch(notifieeId, () => this.mutingsRepository.findBy({ muterId: notifieeId }).then(xs => xs.map(x => x.muteeId))); - if (mutings.includes(data.notifierId)) { + const mutings = await this.cacheService.userMutingsCache.fetch(notifieeId); + if (mutings.has(data.notifierId)) { return null; } } diff --git a/packages/backend/src/core/UserBlockingService.ts b/packages/backend/src/core/UserBlockingService.ts index 040b6de2ef..6eaef8f68a 100644 --- a/packages/backend/src/core/UserBlockingService.ts +++ b/packages/backend/src/core/UserBlockingService.ts @@ -1,40 +1,26 @@ -import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; -import Redis from 'ioredis'; +import { Inject, Injectable } from '@nestjs/common'; import { IdService } from '@/core/IdService.js'; import type { User } from '@/models/entities/User.js'; import type { Blocking } from '@/models/entities/Blocking.js'; import { QueueService } from '@/core/QueueService.js'; import { GlobalEventService } from '@/core/GlobalEventService.js'; -import PerUserFollowingChart from '@/core/chart/charts/per-user-following.js'; import { DI } from '@/di-symbols.js'; -import type { UsersRepository, FollowingsRepository, FollowRequestsRepository, BlockingsRepository, UserListsRepository, UserListJoiningsRepository } from '@/models/index.js'; +import type { FollowRequestsRepository, BlockingsRepository, UserListsRepository, UserListJoiningsRepository } from '@/models/index.js'; import Logger from '@/logger.js'; import { UserEntityService } from '@/core/entities/UserEntityService.js'; import { ApRendererService } from '@/core/activitypub/ApRendererService.js'; import { LoggerService } from '@/core/LoggerService.js'; import { WebhookService } from '@/core/WebhookService.js'; import { bindThis } from '@/decorators.js'; -import { MemoryKVCache } from '@/misc/cache.js'; -import { StreamMessages } from '@/server/api/stream/types.js'; +import { CacheService } from '@/core/CacheService.js'; +import { UserFollowingService } from '@/core/UserFollowingService.js'; @Injectable() -export class UserBlockingService implements OnApplicationShutdown { +export class UserBlockingService { private logger: Logger; - // キーがユーザーIDで、値がそのユーザーがブロックしているユーザーのIDのリストなキャッシュ - private blockingsByUserIdCache: MemoryKVCache; - constructor( - @Inject(DI.redisSubscriber) - private redisSubscriber: Redis.Redis, - - @Inject(DI.usersRepository) - private usersRepository: UsersRepository, - - @Inject(DI.followingsRepository) - private followingsRepository: FollowingsRepository, - @Inject(DI.followRequestsRepository) private followRequestsRepository: FollowRequestsRepository, @@ -47,47 +33,17 @@ export class UserBlockingService implements OnApplicationShutdown { @Inject(DI.userListJoiningsRepository) private userListJoiningsRepository: UserListJoiningsRepository, + private cacheService: CacheService, + private userFollowingService: UserFollowingService, private userEntityService: UserEntityService, private idService: IdService, private queueService: QueueService, private globalEventService: GlobalEventService, private webhookService: WebhookService, private apRendererService: ApRendererService, - private perUserFollowingChart: PerUserFollowingChart, private loggerService: LoggerService, ) { this.logger = this.loggerService.getLogger('user-block'); - - this.blockingsByUserIdCache = new MemoryKVCache(Infinity); - - this.redisSubscriber.on('message', this.onMessage); - } - - @bindThis - private async onMessage(_: string, data: string): Promise { - const obj = JSON.parse(data); - - if (obj.channel === 'internal') { - const { type, body } = obj.message as StreamMessages['internal']['payload']; - switch (type) { - case 'blockingCreated': { - const cached = this.blockingsByUserIdCache.get(body.blockerId); - if (cached) { - this.blockingsByUserIdCache.set(body.blockerId, [...cached, ...[body.blockeeId]]); - } - break; - } - case 'blockingDeleted': { - const cached = this.blockingsByUserIdCache.get(body.blockerId); - if (cached) { - this.blockingsByUserIdCache.set(body.blockerId, cached.filter(x => x !== body.blockeeId)); - } - break; - } - default: - break; - } - } } @bindThis @@ -95,8 +51,8 @@ export class UserBlockingService implements OnApplicationShutdown { await Promise.all([ this.cancelRequest(blocker, blockee), this.cancelRequest(blockee, blocker), - this.unFollow(blocker, blockee), - this.unFollow(blockee, blocker), + this.userFollowingService.unfollow(blocker, blockee), + this.userFollowingService.unfollow(blockee, blocker), this.removeFromList(blockee, blocker), ]); @@ -111,6 +67,9 @@ export class UserBlockingService implements OnApplicationShutdown { await this.blockingsRepository.insert(blocking); + this.cacheService.userBlockingCache.refresh(blocker.id); + this.cacheService.userBlockedCache.refresh(blockee.id); + this.globalEventService.publishInternalEvent('blockingCreated', { blockerId: blocker.id, blockeeId: blockee.id, @@ -148,7 +107,6 @@ export class UserBlockingService implements OnApplicationShutdown { this.userEntityService.pack(followee, follower, { detail: true, }).then(async packed => { - this.globalEventService.publishUserEvent(follower.id, 'unfollow', packed); this.globalEventService.publishMainStream(follower.id, 'unfollow', packed); const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === follower.id && x.on.includes('unfollow')); @@ -173,54 +131,6 @@ export class UserBlockingService implements OnApplicationShutdown { } } - @bindThis - private async unFollow(follower: User, followee: User) { - const following = await this.followingsRepository.findOneBy({ - followerId: follower.id, - followeeId: followee.id, - }); - - if (following == null) { - return; - } - - await Promise.all([ - this.followingsRepository.delete(following.id), - this.usersRepository.decrement({ id: follower.id }, 'followingCount', 1), - this.usersRepository.decrement({ id: followee.id }, 'followersCount', 1), - this.perUserFollowingChart.update(follower, followee, false), - ]); - - // Publish unfollow event - if (this.userEntityService.isLocalUser(follower)) { - this.userEntityService.pack(followee, follower, { - detail: true, - }).then(async packed => { - this.globalEventService.publishUserEvent(follower.id, 'unfollow', packed); - this.globalEventService.publishMainStream(follower.id, 'unfollow', packed); - - const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === follower.id && x.on.includes('unfollow')); - for (const webhook of webhooks) { - this.queueService.webhookDeliver(webhook, 'unfollow', { - user: packed, - }); - } - }); - } - - // リモートにフォローをしていたらUndoFollow送信 - if (this.userEntityService.isLocalUser(follower) && this.userEntityService.isRemoteUser(followee)) { - const content = this.apRendererService.addContext(this.apRendererService.renderUndo(this.apRendererService.renderFollow(follower, followee), follower)); - this.queueService.deliver(follower, content, followee.inbox, false); - } - - // リモートからフォローをされていたらRejectFollow送信 - if (this.userEntityService.isLocalUser(followee) && this.userEntityService.isRemoteUser(follower)) { - const content = this.apRendererService.addContext(this.apRendererService.renderReject(this.apRendererService.renderFollow(follower, followee), followee)); - this.queueService.deliver(followee, content, follower.inbox, false); - } - } - @bindThis private async removeFromList(listOwner: User, user: User) { const userLists = await this.userListsRepository.findBy({ @@ -254,6 +164,9 @@ export class UserBlockingService implements OnApplicationShutdown { await this.blockingsRepository.delete(blocking.id); + this.cacheService.userBlockingCache.refresh(blocker.id); + this.cacheService.userBlockedCache.refresh(blockee.id); + this.globalEventService.publishInternalEvent('blockingDeleted', { blockerId: blocker.id, blockeeId: blockee.id, @@ -268,17 +181,6 @@ export class UserBlockingService implements OnApplicationShutdown { @bindThis public async checkBlocked(blockerId: User['id'], blockeeId: User['id']): Promise { - const blockedUserIds = await this.blockingsByUserIdCache.fetch(blockerId, () => this.blockingsRepository.find({ - where: { - blockerId, - }, - select: ['blockeeId'], - }).then(records => records.map(record => record.blockeeId))); - return blockedUserIds.includes(blockeeId); - } - - @bindThis - public onApplicationShutdown(signal?: string | undefined) { - this.redisSubscriber.off('message', this.onMessage); + return (await this.cacheService.userBlockingCache.fetch(blockerId)).has(blockeeId); } } diff --git a/packages/backend/src/core/UserFollowingService.ts b/packages/backend/src/core/UserFollowingService.ts index b51b553c70..4f22c5cd46 100644 --- a/packages/backend/src/core/UserFollowingService.ts +++ b/packages/backend/src/core/UserFollowingService.ts @@ -18,6 +18,7 @@ import { ApRendererService } from '@/core/activitypub/ApRendererService.js'; import { bindThis } from '@/decorators.js'; import { UserBlockingService } from '@/core/UserBlockingService.js'; import { MetaService } from '@/core/MetaService.js'; +import { CacheService } from '@/core/CacheService.js'; import Logger from '../logger.js'; const logger = new Logger('following/create'); @@ -53,6 +54,7 @@ export class UserFollowingService { @Inject(DI.instancesRepository) private instancesRepository: InstancesRepository, + private cacheService: CacheService, private userEntityService: UserEntityService, private userBlockingService: UserBlockingService, private idService: IdService, @@ -172,6 +174,8 @@ export class UserFollowingService { } }); + this.cacheService.userFollowingsCache.refresh(follower.id); + const req = await this.followRequestsRepository.findOneBy({ followeeId: followee.id, followerId: follower.id, @@ -225,7 +229,6 @@ export class UserFollowingService { this.userEntityService.pack(followee.id, follower, { detail: true, }).then(async packed => { - this.globalEventService.publishUserEvent(follower.id, 'follow', packed as Packed<'UserDetailedNotMe'>); this.globalEventService.publishMainStream(follower.id, 'follow', packed as Packed<'UserDetailedNotMe'>); const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === follower.id && x.on.includes('follow')); @@ -279,6 +282,8 @@ export class UserFollowingService { await this.followingsRepository.delete(following.id); + this.cacheService.userFollowingsCache.refresh(follower.id); + this.decrementFollowing(follower, followee); // Publish unfollow event @@ -286,7 +291,6 @@ export class UserFollowingService { this.userEntityService.pack(followee.id, follower, { detail: true, }).then(async packed => { - this.globalEventService.publishUserEvent(follower.id, 'unfollow', packed); this.globalEventService.publishMainStream(follower.id, 'unfollow', packed); const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === follower.id && x.on.includes('unfollow')); @@ -579,7 +583,6 @@ export class UserFollowingService { detail: true, }); - this.globalEventService.publishUserEvent(follower.id, 'unfollow', packedFollowee); this.globalEventService.publishMainStream(follower.id, 'unfollow', packedFollowee); const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === follower.id && x.on.includes('unfollow')); diff --git a/packages/backend/src/core/UserMutingService.ts b/packages/backend/src/core/UserMutingService.ts index e98f11709f..657b5764b9 100644 --- a/packages/backend/src/core/UserMutingService.ts +++ b/packages/backend/src/core/UserMutingService.ts @@ -1,34 +1,47 @@ import { Inject, Injectable } from '@nestjs/common'; -import type { UsersRepository, MutingsRepository } from '@/models/index.js'; +import { In } from 'typeorm'; +import type { MutingsRepository, Muting } from '@/models/index.js'; import { IdService } from '@/core/IdService.js'; -import { QueueService } from '@/core/QueueService.js'; -import { GlobalEventService } from '@/core/GlobalEventService.js'; import type { User } from '@/models/entities/User.js'; import { DI } from '@/di-symbols.js'; import { bindThis } from '@/decorators.js'; +import { CacheService } from '@/core/CacheService'; @Injectable() export class UserMutingService { constructor( - @Inject(DI.usersRepository) - private usersRepository: UsersRepository, - @Inject(DI.mutingsRepository) private mutingsRepository: MutingsRepository, private idService: IdService, - private queueService: QueueService, - private globalEventService: GlobalEventService, + private cacheService: CacheService, ) { } @bindThis - public async mute(user: User, target: User): Promise { + public async mute(user: User, target: User, expiresAt: Date | null = null): Promise { await this.mutingsRepository.insert({ id: this.idService.genId(), createdAt: new Date(), + expiresAt: expiresAt ?? null, muterId: user.id, muteeId: target.id, }); + + this.cacheService.userMutingsCache.refresh(user.id); + } + + @bindThis + public async unmute(mutings: Muting[]): Promise { + if (mutings.length === 0) return; + + await this.mutingsRepository.delete({ + id: In(mutings.map(m => m.id)), + }); + + const muterIds = [...new Set(mutings.map(m => m.muterId))]; + for (const muterId of muterIds) { + this.cacheService.userMutingsCache.refresh(muterId); + } } } diff --git a/packages/backend/src/misc/cache.ts b/packages/backend/src/misc/cache.ts index 870dfd237c..ef6f610125 100644 --- a/packages/backend/src/misc/cache.ts +++ b/packages/backend/src/misc/cache.ts @@ -1,29 +1,29 @@ import Redis from 'ioredis'; import { bindThis } from '@/decorators.js'; -// redis通すとDateのインスタンスはstringに変換されるので -type Serialized = { - [K in keyof T]: - T[K] extends Date - ? string - : T[K] extends (Date | null) - ? (string | null) - : T[K] extends Record - ? Serialized - : T[K]; -}; - export class RedisKVCache { private redisClient: Redis.Redis; private name: string; private lifetime: number; private memoryCache: MemoryKVCache; + private fetcher: (key: string) => Promise; + private toRedisConverter: (value: T) => string; + private fromRedisConverter: (value: string) => T; - constructor(redisClient: RedisKVCache['redisClient'], name: RedisKVCache['name'], lifetime: RedisKVCache['lifetime'], memoryCacheLifetime: number) { + constructor(redisClient: RedisKVCache['redisClient'], name: RedisKVCache['name'], opts: { + lifetime: RedisKVCache['lifetime']; + memoryCacheLifetime: number; + fetcher: RedisKVCache['fetcher']; + toRedisConverter: RedisKVCache['toRedisConverter']; + fromRedisConverter: RedisKVCache['fromRedisConverter']; + }) { this.redisClient = redisClient; this.name = name; - this.lifetime = lifetime; - this.memoryCache = new MemoryKVCache(memoryCacheLifetime); + this.lifetime = opts.lifetime; + this.memoryCache = new MemoryKVCache(opts.memoryCacheLifetime); + this.fetcher = opts.fetcher; + this.toRedisConverter = opts.toRedisConverter; + this.fromRedisConverter = opts.fromRedisConverter; } @bindThis @@ -32,25 +32,25 @@ export class RedisKVCache { if (this.lifetime === Infinity) { await this.redisClient.set( `kvcache:${this.name}:${key}`, - JSON.stringify(value), + this.toRedisConverter(value), ); } else { await this.redisClient.set( `kvcache:${this.name}:${key}`, - JSON.stringify(value), + this.toRedisConverter(value), 'ex', Math.round(this.lifetime / 1000), ); } } @bindThis - public async get(key: string): Promise | T | undefined> { + public async get(key: string): Promise { const memoryCached = this.memoryCache.get(key); if (memoryCached !== undefined) return memoryCached; const cached = await this.redisClient.get(`kvcache:${this.name}:${key}`); if (cached == null) return undefined; - return JSON.parse(cached); + return this.fromRedisConverter(cached); } @bindThis @@ -60,29 +60,29 @@ export class RedisKVCache { } /** - * キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します - * optional: キャッシュが存在してもvalidatorでfalseを返すとキャッシュ無効扱いにします - */ + * キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します + */ @bindThis - public async fetch(key: string, fetcher: () => Promise, validator?: (cachedValue: Serialized | T) => boolean): Promise | T> { + public async fetch(key: string): Promise { const cachedValue = await this.get(key); if (cachedValue !== undefined) { - if (validator) { - if (validator(cachedValue)) { - // Cache HIT - return cachedValue; - } - } else { - // Cache HIT - return cachedValue; - } + // Cache HIT + return cachedValue; } // Cache MISS - const value = await fetcher(); + const value = await this.fetcher(key); this.set(key, value); return value; } + + @bindThis + public async refresh(key: string) { + const value = await this.fetcher(key); + this.set(key, value); + + // TODO: イベント発行して他プロセスのメモリキャッシュも更新できるようにする + } } // TODO: メモリ節約のためあまり参照されないキーを定期的に削除できるようにする? diff --git a/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts b/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts index f4cd560fc9..2476d71a5e 100644 --- a/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts +++ b/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts @@ -4,10 +4,10 @@ import { DI } from '@/di-symbols.js'; import type { MutingsRepository } from '@/models/index.js'; import type { Config } from '@/config.js'; import type Logger from '@/logger.js'; -import { GlobalEventService } from '@/core/GlobalEventService.js'; +import { bindThis } from '@/decorators.js'; +import { UserMutingService } from '@/core/UserMutingService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type Bull from 'bull'; -import { bindThis } from '@/decorators.js'; @Injectable() export class CheckExpiredMutingsProcessorService { @@ -20,7 +20,7 @@ export class CheckExpiredMutingsProcessorService { @Inject(DI.mutingsRepository) private mutingsRepository: MutingsRepository, - private globalEventService: GlobalEventService, + private userMutingService: UserMutingService, private queueLoggerService: QueueLoggerService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('check-expired-mutings'); @@ -37,13 +37,7 @@ export class CheckExpiredMutingsProcessorService { .getMany(); if (expired.length > 0) { - await this.mutingsRepository.delete({ - id: In(expired.map(m => m.id)), - }); - - for (const m of expired) { - this.globalEventService.publishUserEvent(m.muterId, 'unmute', m.mutee!); - } + await this.userMutingService.unmute(expired); } this.logger.succ('All expired mutings checked.'); diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index 13526f277d..bd2d436a23 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -9,6 +9,7 @@ import { NoteReadService } from '@/core/NoteReadService.js'; import { GlobalEventService } from '@/core/GlobalEventService.js'; import { NotificationService } from '@/core/NotificationService.js'; import { bindThis } from '@/decorators.js'; +import { CacheService } from '@/core/CacheService.js'; import { AuthenticateService } from './AuthenticateService.js'; import MainStreamConnection from './stream/index.js'; import { ChannelsService } from './stream/ChannelsService.js'; @@ -45,7 +46,7 @@ export class StreamingApiServerService { @Inject(DI.userProfilesRepository) private userProfilesRepository: UserProfilesRepository, - private globalEventService: GlobalEventService, + private cacheService: CacheService, private noteReadService: NoteReadService, private authenticateService: AuthenticateService, private channelsService: ChannelsService, @@ -73,8 +74,6 @@ export class StreamingApiServerService { return; } - const connection = request.accept(); - const ev = new EventEmitter(); async function onRedisMessage(_: string, data: string): Promise { @@ -85,19 +84,19 @@ export class StreamingApiServerService { this.redisSubscriber.on('message', onRedisMessage); const main = new MainStreamConnection( - this.followingsRepository, - this.mutingsRepository, - this.renoteMutingsRepository, - this.blockingsRepository, - this.channelFollowingsRepository, - this.userProfilesRepository, this.channelsService, - this.globalEventService, this.noteReadService, this.notificationService, - connection, ev, user, miapp, + this.cacheService, + ev, user, miapp, ); + await main.init(); + + const connection = request.accept(); + + main.init2(connection); + const intervalId = user ? setInterval(() => { this.usersRepository.update(user.id, { lastActiveDate: new Date(), diff --git a/packages/backend/src/server/api/endpoints/admin/accounts/delete.ts b/packages/backend/src/server/api/endpoints/admin/accounts/delete.ts index e9f72676f0..16232813a8 100644 --- a/packages/backend/src/server/api/endpoints/admin/accounts/delete.ts +++ b/packages/backend/src/server/api/endpoints/admin/accounts/delete.ts @@ -61,11 +61,6 @@ export default class extends Endpoint { await this.usersRepository.update(user.id, { isDeleted: true, }); - - if (this.userEntityService.isLocalUser(user)) { - // Terminate streaming - this.globalEventService.publishUserEvent(user.id, 'terminate', {}); - } }); } } diff --git a/packages/backend/src/server/api/endpoints/admin/suspend-user.ts b/packages/backend/src/server/api/endpoints/admin/suspend-user.ts index 770b61850a..3c99225272 100644 --- a/packages/backend/src/server/api/endpoints/admin/suspend-user.ts +++ b/packages/backend/src/server/api/endpoints/admin/suspend-user.ts @@ -62,11 +62,6 @@ export default class extends Endpoint { targetId: user.id, }); - // Terminate streaming - if (this.userEntityService.isLocalUser(user)) { - this.globalEventService.publishUserEvent(user.id, 'terminate', {}); - } - (async () => { await this.userSuspendService.doPostSuspend(user).catch(e => {}); await this.unFollowAll(user).catch(e => {}); diff --git a/packages/backend/src/server/api/endpoints/channels/follow.ts b/packages/backend/src/server/api/endpoints/channels/follow.ts index 91693918f2..8ab59991c7 100644 --- a/packages/backend/src/server/api/endpoints/channels/follow.ts +++ b/packages/backend/src/server/api/endpoints/channels/follow.ts @@ -41,7 +41,6 @@ export default class extends Endpoint { private channelFollowingsRepository: ChannelFollowingsRepository, private idService: IdService, - private globalEventService: GlobalEventService, ) { super(meta, paramDef, async (ps, me) => { const channel = await this.channelsRepository.findOneBy({ @@ -58,8 +57,6 @@ export default class extends Endpoint { followerId: me.id, followeeId: channel.id, }); - - this.globalEventService.publishUserEvent(me.id, 'followChannel', channel); }); } } diff --git a/packages/backend/src/server/api/endpoints/channels/unfollow.ts b/packages/backend/src/server/api/endpoints/channels/unfollow.ts index ac2ef825be..855ba47f8c 100644 --- a/packages/backend/src/server/api/endpoints/channels/unfollow.ts +++ b/packages/backend/src/server/api/endpoints/channels/unfollow.ts @@ -38,8 +38,6 @@ export default class extends Endpoint { @Inject(DI.channelFollowingsRepository) private channelFollowingsRepository: ChannelFollowingsRepository, - - private globalEventService: GlobalEventService, ) { super(meta, paramDef, async (ps, me) => { const channel = await this.channelsRepository.findOneBy({ @@ -54,8 +52,6 @@ export default class extends Endpoint { followerId: me.id, followeeId: channel.id, }); - - this.globalEventService.publishUserEvent(me.id, 'unfollowChannel', channel); }); } } diff --git a/packages/backend/src/server/api/endpoints/i/regenerate-token.ts b/packages/backend/src/server/api/endpoints/i/regenerate-token.ts index 786e64374c..23ff63f5e9 100644 --- a/packages/backend/src/server/api/endpoints/i/regenerate-token.ts +++ b/packages/backend/src/server/api/endpoints/i/regenerate-token.ts @@ -54,11 +54,6 @@ export default class extends Endpoint { // Publish event this.globalEventService.publishInternalEvent('userTokenRegenerated', { id: me.id, oldToken, newToken }); this.globalEventService.publishMainStream(me.id, 'myTokenRegenerated'); - - // Terminate streaming - setTimeout(() => { - this.globalEventService.publishUserEvent(me.id, 'terminate', {}); - }, 5000); }); } } diff --git a/packages/backend/src/server/api/endpoints/i/revoke-token.ts b/packages/backend/src/server/api/endpoints/i/revoke-token.ts index 5e1dddb6b7..93daeb0cd7 100644 --- a/packages/backend/src/server/api/endpoints/i/revoke-token.ts +++ b/packages/backend/src/server/api/endpoints/i/revoke-token.ts @@ -35,9 +35,6 @@ export default class extends Endpoint { id: ps.tokenId, userId: me.id, }); - - // Terminate streaming - this.globalEventService.publishUserEvent(me.id, 'terminate'); } }); } diff --git a/packages/backend/src/server/api/endpoints/i/update.ts b/packages/backend/src/server/api/endpoints/i/update.ts index 46b16e9dce..c20f2b7913 100644 --- a/packages/backend/src/server/api/endpoints/i/update.ts +++ b/packages/backend/src/server/api/endpoints/i/update.ts @@ -284,7 +284,6 @@ export default class extends Endpoint { // Publish meUpdated event this.globalEventService.publishMainStream(user.id, 'meUpdated', iObj); - this.globalEventService.publishUserEvent(user.id, 'updateUserProfile', updatedProfile); // 鍵垢を解除したとき、溜まっていたフォローリクエストがあるならすべて承認 if (user.isLocked && ps.isLocked === false) { diff --git a/packages/backend/src/server/api/endpoints/mute/create.ts b/packages/backend/src/server/api/endpoints/mute/create.ts index fd062e1cab..6e24e1024d 100644 --- a/packages/backend/src/server/api/endpoints/mute/create.ts +++ b/packages/backend/src/server/api/endpoints/mute/create.ts @@ -1,13 +1,10 @@ import { Inject, Injectable } from '@nestjs/common'; import ms from 'ms'; import { Endpoint } from '@/server/api/endpoint-base.js'; -import { IdService } from '@/core/IdService.js'; import type { MutingsRepository } from '@/models/index.js'; -import type { Muting } from '@/models/entities/Muting.js'; -import { GlobalEventService } from '@/core/GlobalEventService.js'; import { DI } from '@/di-symbols.js'; import { GetterService } from '@/server/api/GetterService.js'; -import { CacheService } from '@/core/CacheService.js'; +import { UserMutingService } from '@/core/UserMutingService.js'; import { ApiError } from '../../error.js'; export const meta = { @@ -63,10 +60,8 @@ export default class extends Endpoint { @Inject(DI.mutingsRepository) private mutingsRepository: MutingsRepository, - private globalEventService: GlobalEventService, private getterService: GetterService, - private idService: IdService, - private cacheService: CacheService, + private userMutingService: UserMutingService, ) { super(meta, paramDef, async (ps, me) => { const muter = me; @@ -96,17 +91,7 @@ export default class extends Endpoint { return; } - // Create mute - await this.mutingsRepository.insert({ - id: this.idService.genId(), - createdAt: new Date(), - expiresAt: ps.expiresAt ? new Date(ps.expiresAt) : null, - muterId: muter.id, - muteeId: mutee.id, - } as Muting); - - this.cacheService.userMutingsCache.delete(muter.id); - this.globalEventService.publishUserEvent(me.id, 'mute', mutee); + await this.userMutingService.mute(muter, mutee, ps.expiresAt ? new Date(ps.expiresAt) : null); }); } } diff --git a/packages/backend/src/server/api/endpoints/mute/delete.ts b/packages/backend/src/server/api/endpoints/mute/delete.ts index 612c4a4c04..90b74590be 100644 --- a/packages/backend/src/server/api/endpoints/mute/delete.ts +++ b/packages/backend/src/server/api/endpoints/mute/delete.ts @@ -1,10 +1,10 @@ import { Inject, Injectable } from '@nestjs/common'; import { Endpoint } from '@/server/api/endpoint-base.js'; import type { MutingsRepository } from '@/models/index.js'; -import { GlobalEventService } from '@/core/GlobalEventService.js'; import { DI } from '@/di-symbols.js'; -import { ApiError } from '../../error.js'; import { GetterService } from '@/server/api/GetterService.js'; +import { UserMutingService } from '@/core/UserMutingService.js'; +import { ApiError } from '../../error.js'; export const meta = { tags: ['account'], @@ -49,7 +49,7 @@ export default class extends Endpoint { @Inject(DI.mutingsRepository) private mutingsRepository: MutingsRepository, - private globalEventService: GlobalEventService, + private userMutingService: UserMutingService, private getterService: GetterService, ) { super(meta, paramDef, async (ps, me) => { @@ -76,12 +76,7 @@ export default class extends Endpoint { throw new ApiError(meta.errors.notMuting); } - // Delete mute - await this.mutingsRepository.delete({ - id: exist.id, - }); - - this.globalEventService.publishUserEvent(me.id, 'unmute', mutee); + await this.userMutingService.unmute([exist]); }); } } diff --git a/packages/backend/src/server/api/endpoints/renote-mute/delete.ts b/packages/backend/src/server/api/endpoints/renote-mute/delete.ts index 51a895fb7e..70901a1406 100644 --- a/packages/backend/src/server/api/endpoints/renote-mute/delete.ts +++ b/packages/backend/src/server/api/endpoints/renote-mute/delete.ts @@ -80,8 +80,6 @@ export default class extends Endpoint { await this.renoteMutingsRepository.delete({ id: exist.id, }); - - // publishUserEvent(user.id, 'unmute', mutee); }); } } diff --git a/packages/backend/src/server/api/stream/channel.ts b/packages/backend/src/server/api/stream/channel.ts index 32935325aa..e67aec9ecd 100644 --- a/packages/backend/src/server/api/stream/channel.ts +++ b/packages/backend/src/server/api/stream/channel.ts @@ -23,16 +23,16 @@ export default abstract class Channel { return this.connection.following; } - protected get muting() { - return this.connection.muting; + protected get userIdsWhoMeMuting() { + return this.connection.userIdsWhoMeMuting; } - protected get renoteMuting() { - return this.connection.renoteMuting; + protected get userIdsWhoMeMutingRenotes() { + return this.connection.userIdsWhoMeMutingRenotes; } - protected get blocking() { - return this.connection.blocking; + protected get userIdsWhoBlockingMe() { + return this.connection.userIdsWhoBlockingMe; } protected get followingChannels() { diff --git a/packages/backend/src/server/api/stream/channels/antenna.ts b/packages/backend/src/server/api/stream/channels/antenna.ts index e2a42fbfe9..d48dea7258 100644 --- a/packages/backend/src/server/api/stream/channels/antenna.ts +++ b/packages/backend/src/server/api/stream/channels/antenna.ts @@ -35,11 +35,11 @@ class AntennaChannel extends Channel { const note = await this.noteEntityService.pack(data.body.id, this.user, { detail: true }); // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する - if (isUserRelated(note, this.muting)) return; + if (isUserRelated(note, this.userIdsWhoMeMuting)) return; // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する - if (isUserRelated(note, this.blocking)) return; + if (isUserRelated(note, this.userIdsWhoBlockingMe)) return; - if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return; + if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return; this.connection.cacheNote(note); diff --git a/packages/backend/src/server/api/stream/channels/channel.ts b/packages/backend/src/server/api/stream/channels/channel.ts index 12caa7f233..9e5b40997b 100644 --- a/packages/backend/src/server/api/stream/channels/channel.ts +++ b/packages/backend/src/server/api/stream/channels/channel.ts @@ -47,11 +47,11 @@ class ChannelChannel extends Channel { } // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する - if (isUserRelated(note, this.muting)) return; + if (isUserRelated(note, this.userIdsWhoMeMuting)) return; // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する - if (isUserRelated(note, this.blocking)) return; + if (isUserRelated(note, this.userIdsWhoBlockingMe)) return; - if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return; + if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return; this.connection.cacheNote(note); diff --git a/packages/backend/src/server/api/stream/channels/global-timeline.ts b/packages/backend/src/server/api/stream/channels/global-timeline.ts index d79247cd6e..5454836fe1 100644 --- a/packages/backend/src/server/api/stream/channels/global-timeline.ts +++ b/packages/backend/src/server/api/stream/channels/global-timeline.ts @@ -64,11 +64,11 @@ class GlobalTimelineChannel extends Channel { if (isInstanceMuted(note, new Set(this.userProfile?.mutedInstances ?? []))) return; // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する - if (isUserRelated(note, this.muting)) return; + if (isUserRelated(note, this.userIdsWhoMeMuting)) return; // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する - if (isUserRelated(note, this.blocking)) return; + if (isUserRelated(note, this.userIdsWhoBlockingMe)) return; - if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return; + if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return; // 流れてきたNoteがミュートすべきNoteだったら無視する // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある) diff --git a/packages/backend/src/server/api/stream/channels/hashtag.ts b/packages/backend/src/server/api/stream/channels/hashtag.ts index 98dc858ded..0268fdedde 100644 --- a/packages/backend/src/server/api/stream/channels/hashtag.ts +++ b/packages/backend/src/server/api/stream/channels/hashtag.ts @@ -46,11 +46,11 @@ class HashtagChannel extends Channel { } // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する - if (isUserRelated(note, this.muting)) return; + if (isUserRelated(note, this.userIdsWhoMeMuting)) return; // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する - if (isUserRelated(note, this.blocking)) return; + if (isUserRelated(note, this.userIdsWhoBlockingMe)) return; - if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return; + if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return; this.connection.cacheNote(note); diff --git a/packages/backend/src/server/api/stream/channels/home-timeline.ts b/packages/backend/src/server/api/stream/channels/home-timeline.ts index c623fef64a..ee874ad81e 100644 --- a/packages/backend/src/server/api/stream/channels/home-timeline.ts +++ b/packages/backend/src/server/api/stream/channels/home-timeline.ts @@ -24,7 +24,6 @@ class HomeTimelineChannel extends Channel { @bindThis public async init(params: any) { - // Subscribe events this.subscriber.on('notesStream', this.onNote); } @@ -38,7 +37,7 @@ class HomeTimelineChannel extends Channel { } // Ignore notes from instances the user has muted - if (isInstanceMuted(note, new Set(this.userProfile?.mutedInstances ?? []))) return; + if (isInstanceMuted(note, new Set(this.userProfile!.mutedInstances ?? []))) return; if (['followers', 'specified'].includes(note.visibility)) { note = await this.noteEntityService.pack(note.id, this.user!, { @@ -71,18 +70,18 @@ class HomeTimelineChannel extends Channel { } // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する - if (isUserRelated(note, this.muting)) return; + if (isUserRelated(note, this.userIdsWhoMeMuting)) return; // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する - if (isUserRelated(note, this.blocking)) return; + if (isUserRelated(note, this.userIdsWhoBlockingMe)) return; - if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return; + if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return; // 流れてきたNoteがミュートすべきNoteだったら無視する // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある) // 現状では、ワードミュートにおけるMutedNoteレコードの追加処理はストリーミングに流す処理と並列で行われるため、 // レコードが追加されるNoteでも追加されるより先にここのストリーミングの処理に到達することが起こる。 // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる - if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; + if (await checkWordMute(note, this.user, this.userProfile!.mutedWords)) return; this.connection.cacheNote(note); diff --git a/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts b/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts index f54767bc9d..4f7b4e78b6 100644 --- a/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts +++ b/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts @@ -72,7 +72,7 @@ class HybridTimelineChannel extends Channel { } // Ignore notes from instances the user has muted - if (isInstanceMuted(note, new Set(this.userProfile?.mutedInstances ?? []))) return; + if (isInstanceMuted(note, new Set(this.userProfile!.mutedInstances ?? []))) return; // 関係ない返信は除外 if (note.reply && !this.user!.showTimelineReplies) { @@ -82,11 +82,11 @@ class HybridTimelineChannel extends Channel { } // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する - if (isUserRelated(note, this.muting)) return; + if (isUserRelated(note, this.userIdsWhoMeMuting)) return; // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する - if (isUserRelated(note, this.blocking)) return; + if (isUserRelated(note, this.userIdsWhoBlockingMe)) return; - if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return; + if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return; // 流れてきたNoteがミュートすべきNoteだったら無視する // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある) diff --git a/packages/backend/src/server/api/stream/channels/local-timeline.ts b/packages/backend/src/server/api/stream/channels/local-timeline.ts index eb0642900d..836c5aae6c 100644 --- a/packages/backend/src/server/api/stream/channels/local-timeline.ts +++ b/packages/backend/src/server/api/stream/channels/local-timeline.ts @@ -61,11 +61,11 @@ class LocalTimelineChannel extends Channel { } // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する - if (isUserRelated(note, this.muting)) return; + if (isUserRelated(note, this.userIdsWhoMeMuting)) return; // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する - if (isUserRelated(note, this.blocking)) return; + if (isUserRelated(note, this.userIdsWhoBlockingMe)) return; - if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return; + if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return; // 流れてきたNoteがミュートすべきNoteだったら無視する // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある) diff --git a/packages/backend/src/server/api/stream/channels/main.ts b/packages/backend/src/server/api/stream/channels/main.ts index 4dd16b530a..139320ce35 100644 --- a/packages/backend/src/server/api/stream/channels/main.ts +++ b/packages/backend/src/server/api/stream/channels/main.ts @@ -26,7 +26,7 @@ class MainChannel extends Channel { case 'notification': { // Ignore notifications from instances the user has muted if (isUserFromMutedInstance(data.body, new Set(this.userProfile?.mutedInstances ?? []))) return; - if (data.body.userId && this.muting.has(data.body.userId)) return; + if (data.body.userId && this.userIdsWhoMeMuting.has(data.body.userId)) return; if (data.body.note && data.body.note.isHidden) { const note = await this.noteEntityService.pack(data.body.note.id, this.user, { @@ -40,7 +40,7 @@ class MainChannel extends Channel { case 'mention': { if (isInstanceMuted(data.body, new Set(this.userProfile?.mutedInstances ?? []))) return; - if (this.muting.has(data.body.userId)) return; + if (this.userIdsWhoMeMuting.has(data.body.userId)) return; if (data.body.isHidden) { const note = await this.noteEntityService.pack(data.body.id, this.user, { detail: true, diff --git a/packages/backend/src/server/api/stream/channels/user-list.ts b/packages/backend/src/server/api/stream/channels/user-list.ts index 8a42e99a54..8802fc5ab8 100644 --- a/packages/backend/src/server/api/stream/channels/user-list.ts +++ b/packages/backend/src/server/api/stream/channels/user-list.ts @@ -89,11 +89,11 @@ class UserListChannel extends Channel { } // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する - if (isUserRelated(note, this.muting)) return; + if (isUserRelated(note, this.userIdsWhoMeMuting)) return; // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する - if (isUserRelated(note, this.blocking)) return; + if (isUserRelated(note, this.userIdsWhoBlockingMe)) return; - if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return; + if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return; this.send('note', note); } diff --git a/packages/backend/src/server/api/stream/index.ts b/packages/backend/src/server/api/stream/index.ts index 2f473cd012..a6f9145952 100644 --- a/packages/backend/src/server/api/stream/index.ts +++ b/packages/backend/src/server/api/stream/index.ts @@ -1,13 +1,11 @@ import type { User } from '@/models/entities/User.js'; -import type { Channel as ChannelModel } from '@/models/entities/Channel.js'; -import type { FollowingsRepository, MutingsRepository, RenoteMutingsRepository, UserProfilesRepository, ChannelFollowingsRepository, BlockingsRepository } from '@/models/index.js'; import type { AccessToken } from '@/models/entities/AccessToken.js'; -import type { UserProfile } from '@/models/entities/UserProfile.js'; import type { Packed } from '@/misc/json-schema.js'; -import type { GlobalEventService } from '@/core/GlobalEventService.js'; import type { NoteReadService } from '@/core/NoteReadService.js'; import type { NotificationService } from '@/core/NotificationService.js'; import { bindThis } from '@/decorators.js'; +import { CacheService } from '@/core/CacheService.js'; +import { UserProfile } from '@/models/index.js'; import type { ChannelsService } from './ChannelsService.js'; import type * as websocket from 'websocket'; import type { EventEmitter } from 'events'; @@ -19,106 +17,71 @@ import type { StreamEventEmitter, StreamMessages } from './types.js'; */ export default class Connection { public user?: User; - public userProfile?: UserProfile | null; - public following: Set = new Set(); - public muting: Set = new Set(); - public renoteMuting: Set = new Set(); - public blocking: Set = new Set(); // "被"blocking - public followingChannels: Set = new Set(); public token?: AccessToken; private wsConnection: websocket.connection; public subscriber: StreamEventEmitter; private channels: Channel[] = []; private subscribingNotes: any = {}; private cachedNotes: Packed<'Note'>[] = []; + public userProfile: UserProfile | null = null; + public following: Set = new Set(); + public followingChannels: Set = new Set(); + public userIdsWhoMeMuting: Set = new Set(); + public userIdsWhoBlockingMe: Set = new Set(); + public userIdsWhoMeMutingRenotes: Set = new Set(); + private fetchIntervalId: NodeJS.Timer | null = null; constructor( - private followingsRepository: FollowingsRepository, - private mutingsRepository: MutingsRepository, - private renoteMutingsRepository: RenoteMutingsRepository, - private blockingsRepository: BlockingsRepository, - private channelFollowingsRepository: ChannelFollowingsRepository, - private userProfilesRepository: UserProfilesRepository, private channelsService: ChannelsService, - private globalEventService: GlobalEventService, private noteReadService: NoteReadService, private notificationService: NotificationService, + private cacheService: CacheService, - wsConnection: websocket.connection, subscriber: EventEmitter, user: User | null | undefined, token: AccessToken | null | undefined, ) { - this.wsConnection = wsConnection; this.subscriber = subscriber; if (user) this.user = user; if (token) this.token = token; + } - //this.onWsConnectionMessage = this.onWsConnectionMessage.bind(this); - //this.onUserEvent = this.onUserEvent.bind(this); - //this.onNoteStreamMessage = this.onNoteStreamMessage.bind(this); - //this.onBroadcastMessage = this.onBroadcastMessage.bind(this); - - this.wsConnection.on('message', this.onWsConnectionMessage); - - this.subscriber.on('broadcast', data => { - this.onBroadcastMessage(data); - }); + @bindThis + public async fetch() { + if (this.user == null) return; + const [userProfile, following, followingChannels, userIdsWhoMeMuting, userIdsWhoBlockingMe, userIdsWhoMeMutingRenotes] = await Promise.all([ + this.cacheService.userProfileCache.fetch(this.user.id), + this.cacheService.userFollowingsCache.fetch(this.user.id), + this.cacheService.userFollowingChannelsCache.fetch(this.user.id), + this.cacheService.userMutingsCache.fetch(this.user.id), + this.cacheService.userBlockedCache.fetch(this.user.id), + this.cacheService.renoteMutingsCache.fetch(this.user.id), + ]); + this.userProfile = userProfile; + this.following = following; + this.followingChannels = followingChannels; + this.userIdsWhoMeMuting = userIdsWhoMeMuting; + this.userIdsWhoBlockingMe = userIdsWhoBlockingMe; + this.userIdsWhoMeMutingRenotes = userIdsWhoMeMutingRenotes; + } - if (this.user) { - this.updateFollowing(); - this.updateMuting(); - this.updateRenoteMuting(); - this.updateBlocking(); - this.updateFollowingChannels(); - this.updateUserProfile(); + @bindThis + public async init() { + if (this.user != null) { + await this.fetch(); - this.subscriber.on(`user:${this.user.id}`, this.onUserEvent); + this.fetchIntervalId = setInterval(this.fetch, 1000 * 10); } } @bindThis - private onUserEvent(data: StreamMessages['user']['payload']) { // { type, body }と展開するとそれぞれ型が分離してしまう - switch (data.type) { - case 'follow': - this.following.add(data.body.id); - break; - - case 'unfollow': - this.following.delete(data.body.id); - break; - - case 'mute': - this.muting.add(data.body.id); - break; - - case 'unmute': - this.muting.delete(data.body.id); - break; - - // TODO: renote mute events - // TODO: block events - - case 'followChannel': - this.followingChannels.add(data.body.id); - break; - - case 'unfollowChannel': - this.followingChannels.delete(data.body.id); - break; - - case 'updateUserProfile': - this.userProfile = data.body; - break; - - case 'terminate': - this.wsConnection.close(); - this.dispose(); - break; - - default: - break; - } + public async init2(wsConnection: websocket.connection) { + this.wsConnection = wsConnection; + this.wsConnection.on('message', this.onWsConnectionMessage); + + this.subscriber.on('broadcast', data => { + this.onBroadcastMessage(data); + }); } /** @@ -318,78 +281,12 @@ export default class Connection { } } - @bindThis - private async updateFollowing() { - const followings = await this.followingsRepository.find({ - where: { - followerId: this.user!.id, - }, - select: ['followeeId'], - }); - - this.following = new Set(followings.map(x => x.followeeId)); - } - - @bindThis - private async updateMuting() { - const mutings = await this.mutingsRepository.find({ - where: { - muterId: this.user!.id, - }, - select: ['muteeId'], - }); - - this.muting = new Set(mutings.map(x => x.muteeId)); - } - - @bindThis - private async updateRenoteMuting() { - const renoteMutings = await this.renoteMutingsRepository.find({ - where: { - muterId: this.user!.id, - }, - select: ['muteeId'], - }); - - this.renoteMuting = new Set(renoteMutings.map(x => x.muteeId)); - } - - @bindThis - private async updateBlocking() { // ここでいうBlockingは被Blockingの意 - const blockings = await this.blockingsRepository.find({ - where: { - blockeeId: this.user!.id, - }, - select: ['blockerId'], - }); - - this.blocking = new Set(blockings.map(x => x.blockerId)); - } - - @bindThis - private async updateFollowingChannels() { - const followings = await this.channelFollowingsRepository.find({ - where: { - followerId: this.user!.id, - }, - select: ['followeeId'], - }); - - this.followingChannels = new Set(followings.map(x => x.followeeId)); - } - - @bindThis - private async updateUserProfile() { - this.userProfile = await this.userProfilesRepository.findOneBy({ - userId: this.user!.id, - }); - } - /** * ストリームが切れたとき */ @bindThis public dispose() { + if (this.fetchIntervalId) clearInterval(this.fetchIntervalId); for (const c of this.channels.filter(c => c.dispose)) { if (c.dispose) c.dispose(); } diff --git a/packages/backend/src/server/api/stream/types.ts b/packages/backend/src/server/api/stream/types.ts index f4eedc3964..ed73897e73 100644 --- a/packages/backend/src/server/api/stream/types.ts +++ b/packages/backend/src/server/api/stream/types.ts @@ -38,6 +38,11 @@ export interface InternalStreamTypes { antennaDeleted: Antenna; antennaUpdated: Antenna; metaUpdated: Meta; + followChannel: { userId: User['id']; channelId: Channel['id']; }; + unfollowChannel: { userId: User['id']; channelId: Channel['id']; }; + updateUserProfile: UserProfile; + mute: { muterId: User['id']; muteeId: User['id']; }; + unmute: { muterId: User['id']; muteeId: User['id']; }; } export interface BroadcastTypes { @@ -56,18 +61,6 @@ export interface BroadcastTypes { }; } -export interface UserStreamTypes { - terminate: Record; - followChannel: Channel; - unfollowChannel: Channel; - updateUserProfile: UserProfile; - mute: User; - unmute: User; - follow: Packed<'UserDetailedNotMe'>; - unfollow: Packed<'User'>; - userAdded: Packed<'User'>; -} - export interface MainStreamTypes { notification: Packed<'Notification'>; mention: Packed<'Note'>; @@ -200,10 +193,6 @@ export type StreamMessages = { name: 'broadcast'; payload: EventUnionFromDictionary>; }; - user: { - name: `user:${User['id']}`; - payload: EventUnionFromDictionary>; - }; main: { name: `mainStream:${User['id']}`; payload: EventUnionFromDictionary>; -- cgit v1.2.3-freya