diff options
| author | syuilo <4439005+syuilo@users.noreply.github.com> | 2024-09-20 21:03:53 +0900 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-09-20 21:03:53 +0900 |
| commit | 0b062f1407688906483e2427d87b708ce1a2dc47 (patch) | |
| tree | 015241c81b40c93d8123371e5973b21da9cd9f9b /packages/backend/src/core/ReactionsBufferingService.ts | |
| parent | Update CHANGELOG.md (埋め込み機能のドキュメントへのリンク) (diff) | |
| download | sharkey-0b062f1407688906483e2427d87b708ce1a2dc47.tar.gz sharkey-0b062f1407688906483e2427d87b708ce1a2dc47.tar.bz2 sharkey-0b062f1407688906483e2427d87b708ce1a2dc47.zip | |
Misskey® Reactions Buffering Technology™ (#14579)
* wip
* wip
* Update ReactionsBufferingService.ts
* Update ReactionsBufferingService.ts
* wip
* wip
* wip
* Update ReactionsBufferingService.ts
* wip
* wip
* wip
* Update NoteEntityService.ts
* wip
* wip
* wip
* wip
* Update CHANGELOG.md
Diffstat (limited to 'packages/backend/src/core/ReactionsBufferingService.ts')
| -rw-r--r-- | packages/backend/src/core/ReactionsBufferingService.ts | 162 |
1 files changed, 162 insertions, 0 deletions
diff --git a/packages/backend/src/core/ReactionsBufferingService.ts b/packages/backend/src/core/ReactionsBufferingService.ts new file mode 100644 index 0000000000..b1a197feeb --- /dev/null +++ b/packages/backend/src/core/ReactionsBufferingService.ts @@ -0,0 +1,162 @@ +/* + * SPDX-FileCopyrightText: syuilo and misskey-project + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Inject, Injectable } from '@nestjs/common'; +import * as Redis from 'ioredis'; +import { DI } from '@/di-symbols.js'; +import type { MiNote } from '@/models/Note.js'; +import { bindThis } from '@/decorators.js'; +import type { MiUser, NotesRepository } from '@/models/_.js'; +import type { Config } from '@/config.js'; +import { PER_NOTE_REACTION_USER_PAIR_CACHE_MAX } from '@/const.js'; + +const REDIS_DELTA_PREFIX = 'reactionsBufferDeltas'; +const REDIS_PAIR_PREFIX = 'reactionsBufferPairs'; + +@Injectable() +export class ReactionsBufferingService { + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.redisForReactions) + private redisForReactions: Redis.Redis, // TODO: 専用のRedisインスタンスにする + + @Inject(DI.notesRepository) + private notesRepository: NotesRepository, + ) { + } + + @bindThis + public async create(noteId: MiNote['id'], userId: MiUser['id'], reaction: string, currentPairs: string[]): Promise<void> { + const pipeline = this.redisForReactions.pipeline(); + pipeline.hincrby(`${REDIS_DELTA_PREFIX}:${noteId}`, reaction, 1); + for (let i = 0; i < currentPairs.length; i++) { + pipeline.zadd(`${REDIS_PAIR_PREFIX}:${noteId}`, i, currentPairs[i]); + } + pipeline.zadd(`${REDIS_PAIR_PREFIX}:${noteId}`, Date.now(), `${userId}/${reaction}`); + pipeline.zremrangebyrank(`${REDIS_PAIR_PREFIX}:${noteId}`, 0, -(PER_NOTE_REACTION_USER_PAIR_CACHE_MAX + 1)); + await pipeline.exec(); + } + + @bindThis + public async delete(noteId: MiNote['id'], userId: MiUser['id'], reaction: string): Promise<void> { + const pipeline = this.redisForReactions.pipeline(); + pipeline.hincrby(`${REDIS_DELTA_PREFIX}:${noteId}`, reaction, -1); + pipeline.zrem(`${REDIS_PAIR_PREFIX}:${noteId}`, `${userId}/${reaction}`); + // TODO: 「消した要素一覧」も持っておかないとcreateされた時に上書きされて復活する + await pipeline.exec(); + } + + @bindThis + public async get(noteId: MiNote['id']): Promise<{ + deltas: Record<string, number>; + pairs: ([MiUser['id'], string])[]; + }> { + const pipeline = this.redisForReactions.pipeline(); + pipeline.hgetall(`${REDIS_DELTA_PREFIX}:${noteId}`); + pipeline.zrange(`${REDIS_PAIR_PREFIX}:${noteId}`, 0, -1); + const results = await pipeline.exec(); + + const resultDeltas = results![0][1] as Record<string, string>; + const resultPairs = results![1][1] as string[]; + + const deltas = {} as Record<string, number>; + for (const [name, count] of Object.entries(resultDeltas)) { + deltas[name] = parseInt(count); + } + + const pairs = resultPairs.map(x => x.split('/') as [MiUser['id'], string]); + + return { + deltas, + pairs, + }; + } + + @bindThis + public async getMany(noteIds: MiNote['id'][]): Promise<Map<MiNote['id'], { + deltas: Record<string, number>; + pairs: ([MiUser['id'], string])[]; + }>> { + const map = new Map<MiNote['id'], { + deltas: Record<string, number>; + pairs: ([MiUser['id'], string])[]; + }>(); + + const pipeline = this.redisForReactions.pipeline(); + for (const noteId of noteIds) { + pipeline.hgetall(`${REDIS_DELTA_PREFIX}:${noteId}`); + pipeline.zrange(`${REDIS_PAIR_PREFIX}:${noteId}`, 0, -1); + } + const results = await pipeline.exec(); + + const opsForEachNotes = 2; + for (let i = 0; i < noteIds.length; i++) { + const noteId = noteIds[i]; + const resultDeltas = results![i * opsForEachNotes][1] as Record<string, string>; + const resultPairs = results![i * opsForEachNotes + 1][1] as string[]; + + const deltas = {} as Record<string, number>; + for (const [name, count] of Object.entries(resultDeltas)) { + deltas[name] = parseInt(count); + } + + const pairs = resultPairs.map(x => x.split('/') as [MiUser['id'], string]); + + map.set(noteId, { + deltas, + pairs, + }); + } + + return map; + } + + // TODO: scanは重い可能性があるので、別途 bufferedNoteIds を直接Redis上に持っておいてもいいかもしれない + @bindThis + public async bake(): Promise<void> { + const bufferedNoteIds = []; + let cursor = '0'; + do { + // https://github.com/redis/ioredis#transparent-key-prefixing + const result = await this.redisForReactions.scan( + cursor, + 'MATCH', + `${this.config.redis.prefix}:${REDIS_DELTA_PREFIX}:*`, + 'COUNT', + '1000'); + + cursor = result[0]; + bufferedNoteIds.push(...result[1].map(x => x.replace(`${this.config.redis.prefix}:${REDIS_DELTA_PREFIX}:`, ''))); + } while (cursor !== '0'); + + const bufferedMap = await this.getMany(bufferedNoteIds); + + // clear + const pipeline = this.redisForReactions.pipeline(); + for (const noteId of bufferedNoteIds) { + pipeline.del(`${REDIS_DELTA_PREFIX}:${noteId}`); + pipeline.del(`${REDIS_PAIR_PREFIX}:${noteId}`); + } + await pipeline.exec(); + + // TODO: SQL一個にまとめたい + for (const [noteId, buffered] of bufferedMap) { + const sql = Object.entries(buffered.deltas) + .map(([reaction, count]) => + `jsonb_set("reactions", '{${reaction}}', (COALESCE("reactions"->>'${reaction}', '0')::int + ${count})::text::jsonb)`) + .join(' || '); + + this.notesRepository.createQueryBuilder().update() + .set({ + reactions: () => sql, + reactionAndUserPairCache: buffered.pairs.map(x => x.join('/')), + }) + .where('id = :id', { id: noteId }) + .execute(); + } + } +} |