diff options
Diffstat (limited to 'packages/backend/src')
| -rw-r--r-- | packages/backend/src/GlobalModule.ts | 14 | ||||
| -rw-r--r-- | packages/backend/src/config.ts | 3 | ||||
| -rw-r--r-- | packages/backend/src/const.ts | 2 | ||||
| -rw-r--r-- | packages/backend/src/core/CoreModule.ts | 6 | ||||
| -rw-r--r-- | packages/backend/src/core/QueueService.ts | 6 | ||||
| -rw-r--r-- | packages/backend/src/core/ReactionService.ts | 60 | ||||
| -rw-r--r-- | packages/backend/src/core/ReactionsBufferingService.ts | 162 | ||||
| -rw-r--r-- | packages/backend/src/core/entities/NoteEntityService.ts | 80 | ||||
| -rw-r--r-- | packages/backend/src/di-symbols.ts | 1 | ||||
| -rw-r--r-- | packages/backend/src/models/Meta.ts | 5 | ||||
| -rw-r--r-- | packages/backend/src/queue/QueueProcessorModule.ts | 2 | ||||
| -rw-r--r-- | packages/backend/src/queue/QueueProcessorService.ts | 3 | ||||
| -rw-r--r-- | packages/backend/src/queue/processors/BakeBufferedReactionsProcessorService.ts | 40 | ||||
| -rw-r--r-- | packages/backend/src/server/HealthServerService.ts | 4 | ||||
| -rw-r--r-- | packages/backend/src/server/api/endpoints/admin/meta.ts | 5 | ||||
| -rw-r--r-- | packages/backend/src/server/api/endpoints/admin/update-meta.ts | 5 |
16 files changed, 357 insertions, 41 deletions
diff --git a/packages/backend/src/GlobalModule.ts b/packages/backend/src/GlobalModule.ts index 09971e8ca0..2ecc1f4742 100644 --- a/packages/backend/src/GlobalModule.ts +++ b/packages/backend/src/GlobalModule.ts @@ -78,11 +78,19 @@ const $redisForTimelines: Provider = { inject: [DI.config], }; +const $redisForReactions: Provider = { + provide: DI.redisForReactions, + useFactory: (config: Config) => { + return new Redis.Redis(config.redisForReactions); + }, + inject: [DI.config], +}; + @Global() @Module({ imports: [RepositoryModule], - providers: [$config, $db, $meilisearch, $redis, $redisForPub, $redisForSub, $redisForTimelines], - exports: [$config, $db, $meilisearch, $redis, $redisForPub, $redisForSub, $redisForTimelines, RepositoryModule], + providers: [$config, $db, $meilisearch, $redis, $redisForPub, $redisForSub, $redisForTimelines, $redisForReactions], + exports: [$config, $db, $meilisearch, $redis, $redisForPub, $redisForSub, $redisForTimelines, $redisForReactions, RepositoryModule], }) export class GlobalModule implements OnApplicationShutdown { constructor( @@ -91,6 +99,7 @@ export class GlobalModule implements OnApplicationShutdown { @Inject(DI.redisForPub) private redisForPub: Redis.Redis, @Inject(DI.redisForSub) private redisForSub: Redis.Redis, @Inject(DI.redisForTimelines) private redisForTimelines: Redis.Redis, + @Inject(DI.redisForReactions) private redisForReactions: Redis.Redis, ) { } public async dispose(): Promise<void> { @@ -103,6 +112,7 @@ export class GlobalModule implements OnApplicationShutdown { this.redisForPub.disconnect(), this.redisForSub.disconnect(), this.redisForTimelines.disconnect(), + this.redisForReactions.disconnect(), ]); } diff --git a/packages/backend/src/config.ts b/packages/backend/src/config.ts index cbd6d1c086..97ba79c574 100644 --- a/packages/backend/src/config.ts +++ b/packages/backend/src/config.ts @@ -49,6 +49,7 @@ type Source = { redisForPubsub?: RedisOptionsSource; redisForJobQueue?: RedisOptionsSource; redisForTimelines?: RedisOptionsSource; + redisForReactions?: RedisOptionsSource; meilisearch?: { host: string; port: string; @@ -171,6 +172,7 @@ export type Config = { redisForPubsub: RedisOptions & RedisOptionsSource; redisForJobQueue: RedisOptions & RedisOptionsSource; redisForTimelines: RedisOptions & RedisOptionsSource; + redisForReactions: RedisOptions & RedisOptionsSource; sentryForBackend: { options: Partial<Sentry.NodeOptions>; enableNodeProfiling: boolean; } | undefined; sentryForFrontend: { options: Partial<Sentry.NodeOptions> } | undefined; perChannelMaxNoteCacheCount: number; @@ -251,6 +253,7 @@ export function loadConfig(): Config { redisForPubsub: config.redisForPubsub ? convertRedisOptions(config.redisForPubsub, host) : redis, redisForJobQueue: config.redisForJobQueue ? convertRedisOptions(config.redisForJobQueue, host) : redis, redisForTimelines: config.redisForTimelines ? convertRedisOptions(config.redisForTimelines, host) : redis, + redisForReactions: config.redisForReactions ? convertRedisOptions(config.redisForReactions, host) : redis, sentryForBackend: config.sentryForBackend, sentryForFrontend: config.sentryForFrontend, id: config.id, diff --git a/packages/backend/src/const.ts b/packages/backend/src/const.ts index a238f4973a..e3a61861f4 100644 --- a/packages/backend/src/const.ts +++ b/packages/backend/src/const.ts @@ -8,6 +8,8 @@ export const MAX_NOTE_TEXT_LENGTH = 3000; export const USER_ONLINE_THRESHOLD = 1000 * 60 * 10; // 10min export const USER_ACTIVE_THRESHOLD = 1000 * 60 * 60 * 24 * 3; // 3days +export const PER_NOTE_REACTION_USER_PAIR_CACHE_MAX = 16; + //#region hard limits // If you change DB_* values, you must also change the DB schema. diff --git a/packages/backend/src/core/CoreModule.ts b/packages/backend/src/core/CoreModule.ts index 674241ac12..3b3c35f976 100644 --- a/packages/backend/src/core/CoreModule.ts +++ b/packages/backend/src/core/CoreModule.ts @@ -50,6 +50,7 @@ import { PollService } from './PollService.js'; import { PushNotificationService } from './PushNotificationService.js'; import { QueryService } from './QueryService.js'; import { ReactionService } from './ReactionService.js'; +import { ReactionsBufferingService } from './ReactionsBufferingService.js'; import { RelayService } from './RelayService.js'; import { RoleService } from './RoleService.js'; import { S3Service } from './S3Service.js'; @@ -193,6 +194,7 @@ const $ProxyAccountService: Provider = { provide: 'ProxyAccountService', useExis const $PushNotificationService: Provider = { provide: 'PushNotificationService', useExisting: PushNotificationService }; const $QueryService: Provider = { provide: 'QueryService', useExisting: QueryService }; const $ReactionService: Provider = { provide: 'ReactionService', useExisting: ReactionService }; +const $ReactionsBufferingService: Provider = { provide: 'ReactionsBufferingService', useExisting: ReactionsBufferingService }; const $RelayService: Provider = { provide: 'RelayService', useExisting: RelayService }; const $RoleService: Provider = { provide: 'RoleService', useExisting: RoleService }; const $S3Service: Provider = { provide: 'S3Service', useExisting: S3Service }; @@ -342,6 +344,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting PushNotificationService, QueryService, ReactionService, + ReactionsBufferingService, RelayService, RoleService, S3Service, @@ -487,6 +490,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting $PushNotificationService, $QueryService, $ReactionService, + $ReactionsBufferingService, $RelayService, $RoleService, $S3Service, @@ -633,6 +637,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting PushNotificationService, QueryService, ReactionService, + ReactionsBufferingService, RelayService, RoleService, S3Service, @@ -777,6 +782,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting $PushNotificationService, $QueryService, $ReactionService, + $ReactionsBufferingService, $RelayService, $RoleService, $S3Service, diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts index ddb90a051f..f35e456556 100644 --- a/packages/backend/src/core/QueueService.ts +++ b/packages/backend/src/core/QueueService.ts @@ -87,6 +87,12 @@ export class QueueService { repeat: { pattern: '*/5 * * * *' }, removeOnComplete: true, }); + + this.systemQueue.add('bakeBufferedReactions', { + }, { + repeat: { pattern: '0 0 * * *' }, + removeOnComplete: true, + }); } @bindThis diff --git a/packages/backend/src/core/ReactionService.ts b/packages/backend/src/core/ReactionService.ts index 371207c33a..5993c42a1f 100644 --- a/packages/backend/src/core/ReactionService.ts +++ b/packages/backend/src/core/ReactionService.ts @@ -4,7 +4,6 @@ */ import { Inject, Injectable } from '@nestjs/common'; -import * as Redis from 'ioredis'; import { DI } from '@/di-symbols.js'; import type { EmojisRepository, NoteReactionsRepository, UsersRepository, NotesRepository } from '@/models/_.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; @@ -30,9 +29,10 @@ import { RoleService } from '@/core/RoleService.js'; import { FeaturedService } from '@/core/FeaturedService.js'; import { trackPromise } from '@/misc/promise-tracker.js'; import { isQuote, isRenote } from '@/misc/is-renote.js'; +import { ReactionsBufferingService } from '@/core/ReactionsBufferingService.js'; +import { PER_NOTE_REACTION_USER_PAIR_CACHE_MAX } from '@/const.js'; const FALLBACK = '\u2764'; -const PER_NOTE_REACTION_USER_PAIR_CACHE_MAX = 16; const legacies: Record<string, string> = { 'like': '👍', @@ -71,9 +71,6 @@ const decodeCustomEmojiRegexp = /^:([\w+-]+)(?:@([\w.-]+))?:$/; @Injectable() export class ReactionService { constructor( - @Inject(DI.redis) - private redisClient: Redis.Redis, - @Inject(DI.usersRepository) private usersRepository: UsersRepository, @@ -93,6 +90,7 @@ export class ReactionService { private userEntityService: UserEntityService, private noteEntityService: NoteEntityService, private userBlockingService: UserBlockingService, + private reactionsBufferingService: ReactionsBufferingService, private idService: IdService, private featuredService: FeaturedService, private globalEventService: GlobalEventService, @@ -174,7 +172,6 @@ export class ReactionService { reaction, }; - // Create reaction try { await this.noteReactionsRepository.insert(record); } catch (e) { @@ -198,16 +195,25 @@ export class ReactionService { } // Increment reactions count - const sql = `jsonb_set("reactions", '{${reaction}}', (COALESCE("reactions"->>'${reaction}', '0')::int + 1)::text::jsonb)`; - await this.notesRepository.createQueryBuilder().update() - .set({ - reactions: () => sql, - ...(note.reactionAndUserPairCache.length < PER_NOTE_REACTION_USER_PAIR_CACHE_MAX ? { - reactionAndUserPairCache: () => `array_append("reactionAndUserPairCache", '${user.id}/${reaction}')`, - } : {}), - }) - .where('id = :id', { id: note.id }) - .execute(); + if (meta.enableReactionsBuffering) { + await this.reactionsBufferingService.create(note.id, user.id, reaction, note.reactionAndUserPairCache); + + // for debugging + if (reaction === ':angry_ai:') { + this.reactionsBufferingService.bake(); + } + } else { + const sql = `jsonb_set("reactions", '{${reaction}}', (COALESCE("reactions"->>'${reaction}', '0')::int + 1)::text::jsonb)`; + await this.notesRepository.createQueryBuilder().update() + .set({ + reactions: () => sql, + ...(note.reactionAndUserPairCache.length < PER_NOTE_REACTION_USER_PAIR_CACHE_MAX ? { + reactionAndUserPairCache: () => `array_append("reactionAndUserPairCache", '${user.id}/${reaction}')`, + } : {}), + }) + .where('id = :id', { id: note.id }) + .execute(); + } // 30%の確率、セルフではない、3日以内に投稿されたノートの場合ハイライト用ランキング更新 if ( @@ -304,15 +310,21 @@ export class ReactionService { throw new IdentifiableError('60527ec9-b4cb-4a88-a6bd-32d3ad26817d', 'not reacted'); } + const meta = await this.metaService.fetch(); + // Decrement reactions count - const sql = `jsonb_set("reactions", '{${exist.reaction}}', (COALESCE("reactions"->>'${exist.reaction}', '0')::int - 1)::text::jsonb)`; - await this.notesRepository.createQueryBuilder().update() - .set({ - reactions: () => sql, - reactionAndUserPairCache: () => `array_remove("reactionAndUserPairCache", '${user.id}/${exist.reaction}')`, - }) - .where('id = :id', { id: note.id }) - .execute(); + if (meta.enableReactionsBuffering) { + await this.reactionsBufferingService.delete(note.id, user.id, exist.reaction); + } else { + const sql = `jsonb_set("reactions", '{${exist.reaction}}', (COALESCE("reactions"->>'${exist.reaction}', '0')::int - 1)::text::jsonb)`; + await this.notesRepository.createQueryBuilder().update() + .set({ + reactions: () => sql, + reactionAndUserPairCache: () => `array_remove("reactionAndUserPairCache", '${user.id}/${exist.reaction}')`, + }) + .where('id = :id', { id: note.id }) + .execute(); + } this.globalEventService.publishNoteStream(note.id, 'unreacted', { reaction: this.decodeReaction(exist.reaction).reaction, diff --git a/packages/backend/src/core/ReactionsBufferingService.ts b/packages/backend/src/core/ReactionsBufferingService.ts new file mode 100644 index 0000000000..b1a197feeb --- /dev/null +++ b/packages/backend/src/core/ReactionsBufferingService.ts @@ -0,0 +1,162 @@ +/* + * SPDX-FileCopyrightText: syuilo and misskey-project + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Inject, Injectable } from '@nestjs/common'; +import * as Redis from 'ioredis'; +import { DI } from '@/di-symbols.js'; +import type { MiNote } from '@/models/Note.js'; +import { bindThis } from '@/decorators.js'; +import type { MiUser, NotesRepository } from '@/models/_.js'; +import type { Config } from '@/config.js'; +import { PER_NOTE_REACTION_USER_PAIR_CACHE_MAX } from '@/const.js'; + +const REDIS_DELTA_PREFIX = 'reactionsBufferDeltas'; +const REDIS_PAIR_PREFIX = 'reactionsBufferPairs'; + +@Injectable() +export class ReactionsBufferingService { + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.redisForReactions) + private redisForReactions: Redis.Redis, // TODO: 専用のRedisインスタンスにする + + @Inject(DI.notesRepository) + private notesRepository: NotesRepository, + ) { + } + + @bindThis + public async create(noteId: MiNote['id'], userId: MiUser['id'], reaction: string, currentPairs: string[]): Promise<void> { + const pipeline = this.redisForReactions.pipeline(); + pipeline.hincrby(`${REDIS_DELTA_PREFIX}:${noteId}`, reaction, 1); + for (let i = 0; i < currentPairs.length; i++) { + pipeline.zadd(`${REDIS_PAIR_PREFIX}:${noteId}`, i, currentPairs[i]); + } + pipeline.zadd(`${REDIS_PAIR_PREFIX}:${noteId}`, Date.now(), `${userId}/${reaction}`); + pipeline.zremrangebyrank(`${REDIS_PAIR_PREFIX}:${noteId}`, 0, -(PER_NOTE_REACTION_USER_PAIR_CACHE_MAX + 1)); + await pipeline.exec(); + } + + @bindThis + public async delete(noteId: MiNote['id'], userId: MiUser['id'], reaction: string): Promise<void> { + const pipeline = this.redisForReactions.pipeline(); + pipeline.hincrby(`${REDIS_DELTA_PREFIX}:${noteId}`, reaction, -1); + pipeline.zrem(`${REDIS_PAIR_PREFIX}:${noteId}`, `${userId}/${reaction}`); + // TODO: 「消した要素一覧」も持っておかないとcreateされた時に上書きされて復活する + await pipeline.exec(); + } + + @bindThis + public async get(noteId: MiNote['id']): Promise<{ + deltas: Record<string, number>; + pairs: ([MiUser['id'], string])[]; + }> { + const pipeline = this.redisForReactions.pipeline(); + pipeline.hgetall(`${REDIS_DELTA_PREFIX}:${noteId}`); + pipeline.zrange(`${REDIS_PAIR_PREFIX}:${noteId}`, 0, -1); + const results = await pipeline.exec(); + + const resultDeltas = results![0][1] as Record<string, string>; + const resultPairs = results![1][1] as string[]; + + const deltas = {} as Record<string, number>; + for (const [name, count] of Object.entries(resultDeltas)) { + deltas[name] = parseInt(count); + } + + const pairs = resultPairs.map(x => x.split('/') as [MiUser['id'], string]); + + return { + deltas, + pairs, + }; + } + + @bindThis + public async getMany(noteIds: MiNote['id'][]): Promise<Map<MiNote['id'], { + deltas: Record<string, number>; + pairs: ([MiUser['id'], string])[]; + }>> { + const map = new Map<MiNote['id'], { + deltas: Record<string, number>; + pairs: ([MiUser['id'], string])[]; + }>(); + + const pipeline = this.redisForReactions.pipeline(); + for (const noteId of noteIds) { + pipeline.hgetall(`${REDIS_DELTA_PREFIX}:${noteId}`); + pipeline.zrange(`${REDIS_PAIR_PREFIX}:${noteId}`, 0, -1); + } + const results = await pipeline.exec(); + + const opsForEachNotes = 2; + for (let i = 0; i < noteIds.length; i++) { + const noteId = noteIds[i]; + const resultDeltas = results![i * opsForEachNotes][1] as Record<string, string>; + const resultPairs = results![i * opsForEachNotes + 1][1] as string[]; + + const deltas = {} as Record<string, number>; + for (const [name, count] of Object.entries(resultDeltas)) { + deltas[name] = parseInt(count); + } + + const pairs = resultPairs.map(x => x.split('/') as [MiUser['id'], string]); + + map.set(noteId, { + deltas, + pairs, + }); + } + + return map; + } + + // TODO: scanは重い可能性があるので、別途 bufferedNoteIds を直接Redis上に持っておいてもいいかもしれない + @bindThis + public async bake(): Promise<void> { + const bufferedNoteIds = []; + let cursor = '0'; + do { + // https://github.com/redis/ioredis#transparent-key-prefixing + const result = await this.redisForReactions.scan( + cursor, + 'MATCH', + `${this.config.redis.prefix}:${REDIS_DELTA_PREFIX}:*`, + 'COUNT', + '1000'); + + cursor = result[0]; + bufferedNoteIds.push(...result[1].map(x => x.replace(`${this.config.redis.prefix}:${REDIS_DELTA_PREFIX}:`, ''))); + } while (cursor !== '0'); + + const bufferedMap = await this.getMany(bufferedNoteIds); + + // clear + const pipeline = this.redisForReactions.pipeline(); + for (const noteId of bufferedNoteIds) { + pipeline.del(`${REDIS_DELTA_PREFIX}:${noteId}`); + pipeline.del(`${REDIS_PAIR_PREFIX}:${noteId}`); + } + await pipeline.exec(); + + // TODO: SQL一個にまとめたい + for (const [noteId, buffered] of bufferedMap) { + const sql = Object.entries(buffered.deltas) + .map(([reaction, count]) => + `jsonb_set("reactions", '{${reaction}}', (COALESCE("reactions"->>'${reaction}', '0')::int + ${count})::text::jsonb)`) + .join(' || '); + + this.notesRepository.createQueryBuilder().update() + .set({ + reactions: () => sql, + reactionAndUserPairCache: buffered.pairs.map(x => x.join('/')), + }) + .where('id = :id', { id: noteId }) + .execute(); + } + } +} diff --git a/packages/backend/src/core/entities/NoteEntityService.ts b/packages/backend/src/core/entities/NoteEntityService.ts index 2cd092231c..7506d804c3 100644 --- a/packages/backend/src/core/entities/NoteEntityService.ts +++ b/packages/backend/src/core/entities/NoteEntityService.ts @@ -11,24 +11,39 @@ import type { Packed } from '@/misc/json-schema.js'; import { awaitAll } from '@/misc/prelude/await-all.js'; import type { MiUser } from '@/models/User.js'; import type { MiNote } from '@/models/Note.js'; -import type { MiNoteReaction } from '@/models/NoteReaction.js'; import type { UsersRepository, NotesRepository, FollowingsRepository, PollsRepository, PollVotesRepository, NoteReactionsRepository, ChannelsRepository } from '@/models/_.js'; import { bindThis } from '@/decorators.js'; import { DebounceLoader } from '@/misc/loader.js'; import { IdService } from '@/core/IdService.js'; +import { ReactionsBufferingService } from '@/core/ReactionsBufferingService.js'; +import { MetaService } from '@/core/MetaService.js'; import type { OnModuleInit } from '@nestjs/common'; import type { CustomEmojiService } from '../CustomEmojiService.js'; import type { ReactionService } from '../ReactionService.js'; import type { UserEntityService } from './UserEntityService.js'; import type { DriveFileEntityService } from './DriveFileEntityService.js'; +function mergeReactions(src: Record<string, number>, delta: Record<string, number>) { + const reactions = { ...src }; + for (const [name, count] of Object.entries(delta)) { + if (reactions[name] != null) { + reactions[name] += count; + } else { + reactions[name] = count; + } + } + return reactions; +} + @Injectable() export class NoteEntityService implements OnModuleInit { private userEntityService: UserEntityService; private driveFileEntityService: DriveFileEntityService; private customEmojiService: CustomEmojiService; private reactionService: ReactionService; + private reactionsBufferingService: ReactionsBufferingService; private idService: IdService; + private metaService: MetaService; private noteLoader = new DebounceLoader(this.findNoteOrFail); constructor( @@ -59,6 +74,9 @@ export class NoteEntityService implements OnModuleInit { //private driveFileEntityService: DriveFileEntityService, //private customEmojiService: CustomEmojiService, //private reactionService: ReactionService, + //private reactionsBufferingService: ReactionsBufferingService, + //private idService: IdService, + //private metaService: MetaService, ) { } @@ -67,7 +85,9 @@ export class NoteEntityService implements OnModuleInit { this.driveFileEntityService = this.moduleRef.get('DriveFileEntityService'); this.customEmojiService = this.moduleRef.get('CustomEmojiService'); this.reactionService = this.moduleRef.get('ReactionService'); + this.reactionsBufferingService = this.moduleRef.get('ReactionsBufferingService'); this.idService = this.moduleRef.get('IdService'); + this.metaService = this.moduleRef.get('MetaService'); } @bindThis @@ -287,6 +307,7 @@ export class NoteEntityService implements OnModuleInit { skipHide?: boolean; withReactionAndUserPairCache?: boolean; _hint_?: { + bufferdReactions: Map<MiNote['id'], { deltas: Record<string, number>; pairs: ([MiUser['id'], string])[] }> | null; myReactions: Map<MiNote['id'], string | null>; packedFiles: Map<MiNote['fileIds'][number], Packed<'DriveFile'> | null>; packedUsers: Map<MiUser['id'], Packed<'UserLite'>> @@ -303,6 +324,16 @@ export class NoteEntityService implements OnModuleInit { const note = typeof src === 'object' ? src : await this.noteLoader.load(src); const host = note.userHost; + const bufferdReactions = opts._hint_?.bufferdReactions != null ? (opts._hint_.bufferdReactions.get(note.id) ?? { deltas: {}, pairs: [] }) : await this.reactionsBufferingService.get(note.id); + const reactions = mergeReactions(note.reactions, bufferdReactions.deltas ?? {}); + for (const [name, count] of Object.entries(reactions)) { + if (count <= 0) { + delete reactions[name]; + } + } + + const reactionAndUserPairCache = note.reactionAndUserPairCache.concat(bufferdReactions.pairs.map(x => x.join('/'))); + let text = note.text; if (note.name && (note.url ?? note.uri)) { @@ -315,7 +346,7 @@ export class NoteEntityService implements OnModuleInit { : await this.channelsRepository.findOneBy({ id: note.channelId }) : null; - const reactionEmojiNames = Object.keys(note.reactions) + const reactionEmojiNames = Object.keys(reactions) .filter(x => x.startsWith(':') && x.includes('@') && !x.includes('@.')) // リモートカスタム絵文字のみ .map(x => this.reactionService.decodeReaction(x).reaction.replaceAll(':', '')); const packedFiles = options?._hint_?.packedFiles; @@ -334,10 +365,10 @@ export class NoteEntityService implements OnModuleInit { visibleUserIds: note.visibility === 'specified' ? note.visibleUserIds : undefined, renoteCount: note.renoteCount, repliesCount: note.repliesCount, - reactionCount: Object.values(note.reactions).reduce((a, b) => a + b, 0), - reactions: this.reactionService.convertLegacyReactions(note.reactions), + reactionCount: Object.values(reactions).reduce((a, b) => a + b, 0), + reactions: reactions, reactionEmojis: this.customEmojiService.populateEmojis(reactionEmojiNames, host), - reactionAndUserPairCache: opts.withReactionAndUserPairCache ? note.reactionAndUserPairCache : undefined, + reactionAndUserPairCache: opts.withReactionAndUserPairCache ? reactionAndUserPairCache : undefined, emojis: host != null ? this.customEmojiService.populateEmojis(note.emojis, host) : undefined, tags: note.tags.length > 0 ? note.tags : undefined, fileIds: note.fileIds, @@ -376,8 +407,12 @@ export class NoteEntityService implements OnModuleInit { poll: note.hasPoll ? this.populatePoll(note, meId) : undefined, - ...(meId && Object.keys(note.reactions).length > 0 ? { - myReaction: this.populateMyReaction(note, meId, options?._hint_), + ...(meId && Object.keys(reactions).length > 0 ? { + myReaction: this.populateMyReaction({ + id: note.id, + reactions: reactions, + reactionAndUserPairCache: reactionAndUserPairCache, + }, meId, options?._hint_), } : {}), } : {}), }); @@ -400,6 +435,10 @@ export class NoteEntityService implements OnModuleInit { ) { if (notes.length === 0) return []; + const meta = await this.metaService.fetch(); + + const bufferdReactions = meta.enableReactionsBuffering ? await this.reactionsBufferingService.getMany(notes.map(x => x.id)) : null; + const meId = me ? me.id : null; const myReactionsMap = new Map<MiNote['id'], string | null>(); if (meId) { @@ -410,23 +449,33 @@ export class NoteEntityService implements OnModuleInit { for (const note of notes) { if (note.renote && (note.text == null && note.fileIds.length === 0)) { // pure renote - const reactionsCount = Object.values(note.renote.reactions).reduce((a, b) => a + b, 0); + const reactionsCount = Object.values(mergeReactions(note.renote.reactions, bufferdReactions?.get(note.renote.id)?.deltas ?? {})).reduce((a, b) => a + b, 0); if (reactionsCount === 0) { myReactionsMap.set(note.renote.id, null); - } else if (reactionsCount <= note.renote.reactionAndUserPairCache.length) { - const pair = note.renote.reactionAndUserPairCache.find(p => p.startsWith(meId)); - myReactionsMap.set(note.renote.id, pair ? pair.split('/')[1] : null); + } else if (reactionsCount <= note.renote.reactionAndUserPairCache.length + (bufferdReactions?.get(note.renote.id)?.pairs.length ?? 0)) { + const pairInBuffer = bufferdReactions?.get(note.renote.id)?.pairs.find(p => p[0] === meId); + if (pairInBuffer) { + myReactionsMap.set(note.renote.id, pairInBuffer[1]); + } else { + const pair = note.renote.reactionAndUserPairCache.find(p => p.startsWith(meId)); + myReactionsMap.set(note.renote.id, pair ? pair.split('/')[1] : null); + } } else { idsNeedFetchMyReaction.add(note.renote.id); } } else { if (note.id < oldId) { - const reactionsCount = Object.values(note.reactions).reduce((a, b) => a + b, 0); + const reactionsCount = Object.values(mergeReactions(note.reactions, bufferdReactions?.get(note.id)?.deltas ?? {})).reduce((a, b) => a + b, 0); if (reactionsCount === 0) { myReactionsMap.set(note.id, null); - } else if (reactionsCount <= note.reactionAndUserPairCache.length) { - const pair = note.reactionAndUserPairCache.find(p => p.startsWith(meId)); - myReactionsMap.set(note.id, pair ? pair.split('/')[1] : null); + } else if (reactionsCount <= note.reactionAndUserPairCache.length + (bufferdReactions?.get(note.id)?.pairs.length ?? 0)) { + const pairInBuffer = bufferdReactions?.get(note.id)?.pairs.find(p => p[0] === meId); + if (pairInBuffer) { + myReactionsMap.set(note.id, pairInBuffer[1]); + } else { + const pair = note.reactionAndUserPairCache.find(p => p.startsWith(meId)); + myReactionsMap.set(note.id, pair ? pair.split('/')[1] : null); + } } else { idsNeedFetchMyReaction.add(note.id); } @@ -461,6 +510,7 @@ export class NoteEntityService implements OnModuleInit { return await Promise.all(notes.map(n => this.pack(n, me, { ...options, _hint_: { + bufferdReactions, myReactions: myReactionsMap, packedFiles, packedUsers, diff --git a/packages/backend/src/di-symbols.ts b/packages/backend/src/di-symbols.ts index 271082b4ff..b6f003c2e6 100644 --- a/packages/backend/src/di-symbols.ts +++ b/packages/backend/src/di-symbols.ts @@ -11,6 +11,7 @@ export const DI = { redisForPub: Symbol('redisForPub'), redisForSub: Symbol('redisForSub'), redisForTimelines: Symbol('redisForTimelines'), + redisForReactions: Symbol('redisForReactions'), //#region Repositories usersRepository: Symbol('usersRepository'), diff --git a/packages/backend/src/models/Meta.ts b/packages/backend/src/models/Meta.ts index 70d41801b5..9ab76d373f 100644 --- a/packages/backend/src/models/Meta.ts +++ b/packages/backend/src/models/Meta.ts @@ -589,6 +589,11 @@ export class MiMeta { }) public perUserListTimelineCacheMax: number; + @Column('boolean', { + default: false, + }) + public enableReactionsBuffering: boolean; + @Column('integer', { default: 0, }) diff --git a/packages/backend/src/queue/QueueProcessorModule.ts b/packages/backend/src/queue/QueueProcessorModule.ts index a1fd38fcc5..0027b5ef3d 100644 --- a/packages/backend/src/queue/QueueProcessorModule.ts +++ b/packages/backend/src/queue/QueueProcessorModule.ts @@ -14,6 +14,7 @@ import { InboxProcessorService } from './processors/InboxProcessorService.js'; import { UserWebhookDeliverProcessorService } from './processors/UserWebhookDeliverProcessorService.js'; import { SystemWebhookDeliverProcessorService } from './processors/SystemWebhookDeliverProcessorService.js'; import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMutingsProcessorService.js'; +import { BakeBufferedReactionsProcessorService } from './processors/BakeBufferedReactionsProcessorService.js'; import { CleanChartsProcessorService } from './processors/CleanChartsProcessorService.js'; import { CleanProcessorService } from './processors/CleanProcessorService.js'; import { CleanRemoteFilesProcessorService } from './processors/CleanRemoteFilesProcessorService.js'; @@ -51,6 +52,7 @@ import { RelationshipProcessorService } from './processors/RelationshipProcessor ResyncChartsProcessorService, CleanChartsProcessorService, CheckExpiredMutingsProcessorService, + BakeBufferedReactionsProcessorService, CleanProcessorService, DeleteDriveFilesProcessorService, ExportCustomEmojisProcessorService, diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index 7bd74f3210..e9e1c45224 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -39,6 +39,7 @@ import { TickChartsProcessorService } from './processors/TickChartsProcessorServ import { ResyncChartsProcessorService } from './processors/ResyncChartsProcessorService.js'; import { CleanChartsProcessorService } from './processors/CleanChartsProcessorService.js'; import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMutingsProcessorService.js'; +import { BakeBufferedReactionsProcessorService } from './processors/BakeBufferedReactionsProcessorService.js'; import { CleanProcessorService } from './processors/CleanProcessorService.js'; import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js'; import { QueueLoggerService } from './QueueLoggerService.js'; @@ -118,6 +119,7 @@ export class QueueProcessorService implements OnApplicationShutdown { private cleanChartsProcessorService: CleanChartsProcessorService, private aggregateRetentionProcessorService: AggregateRetentionProcessorService, private checkExpiredMutingsProcessorService: CheckExpiredMutingsProcessorService, + private bakeBufferedReactionsProcessorService: BakeBufferedReactionsProcessorService, private cleanProcessorService: CleanProcessorService, ) { this.logger = this.queueLoggerService.logger; @@ -147,6 +149,7 @@ export class QueueProcessorService implements OnApplicationShutdown { case 'cleanCharts': return this.cleanChartsProcessorService.process(); case 'aggregateRetention': return this.aggregateRetentionProcessorService.process(); case 'checkExpiredMutings': return this.checkExpiredMutingsProcessorService.process(); + case 'bakeBufferedReactions': return this.bakeBufferedReactionsProcessorService.process(); case 'clean': return this.cleanProcessorService.process(); default: throw new Error(`unrecognized job type ${job.name} for system`); } diff --git a/packages/backend/src/queue/processors/BakeBufferedReactionsProcessorService.ts b/packages/backend/src/queue/processors/BakeBufferedReactionsProcessorService.ts new file mode 100644 index 0000000000..cd56ba9837 --- /dev/null +++ b/packages/backend/src/queue/processors/BakeBufferedReactionsProcessorService.ts @@ -0,0 +1,40 @@ +/* + * SPDX-FileCopyrightText: syuilo and misskey-project + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Inject, Injectable } from '@nestjs/common'; +import type Logger from '@/logger.js'; +import { bindThis } from '@/decorators.js'; +import { ReactionsBufferingService } from '@/core/ReactionsBufferingService.js'; +import { MetaService } from '@/core/MetaService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; + +@Injectable() +export class BakeBufferedReactionsProcessorService { + private logger: Logger; + + constructor( + private reactionsBufferingService: ReactionsBufferingService, + private metaService: MetaService, + private queueLoggerService: QueueLoggerService, + ) { + this.logger = this.queueLoggerService.logger.createSubLogger('bake-buffered-reactions'); + } + + @bindThis + public async process(): Promise<void> { + const meta = await this.metaService.fetch(); + if (!meta.enableReactionsBuffering) { + this.logger.info('Reactions buffering is disabled. Skipping...'); + return; + } + + this.logger.info('Baking buffered reactions...'); + + await this.reactionsBufferingService.bake(); + + this.logger.succ('All buffered reactions baked.'); + } +} diff --git a/packages/backend/src/server/HealthServerService.ts b/packages/backend/src/server/HealthServerService.ts index 2c3ed85925..5980609f02 100644 --- a/packages/backend/src/server/HealthServerService.ts +++ b/packages/backend/src/server/HealthServerService.ts @@ -27,6 +27,9 @@ export class HealthServerService { @Inject(DI.redisForTimelines) private redisForTimelines: Redis.Redis, + @Inject(DI.redisForReactions) + private redisForReactions: Redis.Redis, + @Inject(DI.db) private db: DataSource, @@ -43,6 +46,7 @@ export class HealthServerService { this.redisForPub.ping(), this.redisForSub.ping(), this.redisForTimelines.ping(), + this.redisForReactions.ping(), this.db.query('SELECT 1'), ...(this.meilisearch ? [this.meilisearch.health()] : []), ]).then(() => 200, () => 503)); diff --git a/packages/backend/src/server/api/endpoints/admin/meta.ts b/packages/backend/src/server/api/endpoints/admin/meta.ts index 2e7f73da73..29e8bfaf14 100644 --- a/packages/backend/src/server/api/endpoints/admin/meta.ts +++ b/packages/backend/src/server/api/endpoints/admin/meta.ts @@ -377,6 +377,10 @@ export const meta = { type: 'number', optional: false, nullable: false, }, + enableReactionsBuffering: { + type: 'boolean', + optional: false, nullable: false, + }, notesPerOneAd: { type: 'number', optional: false, nullable: false, @@ -617,6 +621,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint- perRemoteUserUserTimelineCacheMax: instance.perRemoteUserUserTimelineCacheMax, perUserHomeTimelineCacheMax: instance.perUserHomeTimelineCacheMax, perUserListTimelineCacheMax: instance.perUserListTimelineCacheMax, + enableReactionsBuffering: instance.enableReactionsBuffering, notesPerOneAd: instance.notesPerOneAd, summalyProxy: instance.urlPreviewSummaryProxyUrl, urlPreviewEnabled: instance.urlPreviewEnabled, diff --git a/packages/backend/src/server/api/endpoints/admin/update-meta.ts b/packages/backend/src/server/api/endpoints/admin/update-meta.ts index 5efdc9d8c4..865e73f274 100644 --- a/packages/backend/src/server/api/endpoints/admin/update-meta.ts +++ b/packages/backend/src/server/api/endpoints/admin/update-meta.ts @@ -142,6 +142,7 @@ export const paramDef = { perRemoteUserUserTimelineCacheMax: { type: 'integer' }, perUserHomeTimelineCacheMax: { type: 'integer' }, perUserListTimelineCacheMax: { type: 'integer' }, + enableReactionsBuffering: { type: 'boolean' }, notesPerOneAd: { type: 'integer' }, silencedHosts: { type: 'array', @@ -598,6 +599,10 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint- set.perUserListTimelineCacheMax = ps.perUserListTimelineCacheMax; } + if (ps.enableReactionsBuffering !== undefined) { + set.enableReactionsBuffering = ps.enableReactionsBuffering; + } + if (ps.notesPerOneAd !== undefined) { set.notesPerOneAd = ps.notesPerOneAd; } |