summaryrefslogtreecommitdiff
path: root/packages/backend/src/core/ReactionsBufferingService.ts
diff options
context:
space:
mode:
authorsyuilo <4439005+syuilo@users.noreply.github.com>2024-09-20 21:03:53 +0900
committerGitHub <noreply@github.com>2024-09-20 21:03:53 +0900
commit0b062f1407688906483e2427d87b708ce1a2dc47 (patch)
tree015241c81b40c93d8123371e5973b21da9cd9f9b /packages/backend/src/core/ReactionsBufferingService.ts
parentUpdate CHANGELOG.md (埋め込み機能のドキュメントへのリンク) (diff)
downloadsharkey-0b062f1407688906483e2427d87b708ce1a2dc47.tar.gz
sharkey-0b062f1407688906483e2427d87b708ce1a2dc47.tar.bz2
sharkey-0b062f1407688906483e2427d87b708ce1a2dc47.zip
Misskey® Reactions Buffering Technology™ (#14579)
* wip * wip * Update ReactionsBufferingService.ts * Update ReactionsBufferingService.ts * wip * wip * wip * Update ReactionsBufferingService.ts * wip * wip * wip * Update NoteEntityService.ts * wip * wip * wip * wip * Update CHANGELOG.md
Diffstat (limited to 'packages/backend/src/core/ReactionsBufferingService.ts')
-rw-r--r--packages/backend/src/core/ReactionsBufferingService.ts162
1 files changed, 162 insertions, 0 deletions
diff --git a/packages/backend/src/core/ReactionsBufferingService.ts b/packages/backend/src/core/ReactionsBufferingService.ts
new file mode 100644
index 0000000000..b1a197feeb
--- /dev/null
+++ b/packages/backend/src/core/ReactionsBufferingService.ts
@@ -0,0 +1,162 @@
+/*
+ * SPDX-FileCopyrightText: syuilo and misskey-project
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+import { Inject, Injectable } from '@nestjs/common';
+import * as Redis from 'ioredis';
+import { DI } from '@/di-symbols.js';
+import type { MiNote } from '@/models/Note.js';
+import { bindThis } from '@/decorators.js';
+import type { MiUser, NotesRepository } from '@/models/_.js';
+import type { Config } from '@/config.js';
+import { PER_NOTE_REACTION_USER_PAIR_CACHE_MAX } from '@/const.js';
+
+const REDIS_DELTA_PREFIX = 'reactionsBufferDeltas';
+const REDIS_PAIR_PREFIX = 'reactionsBufferPairs';
+
+@Injectable()
+export class ReactionsBufferingService {
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.redisForReactions)
+ private redisForReactions: Redis.Redis, // TODO: 専用のRedisインスタンスにする
+
+ @Inject(DI.notesRepository)
+ private notesRepository: NotesRepository,
+ ) {
+ }
+
+ @bindThis
+ public async create(noteId: MiNote['id'], userId: MiUser['id'], reaction: string, currentPairs: string[]): Promise<void> {
+ const pipeline = this.redisForReactions.pipeline();
+ pipeline.hincrby(`${REDIS_DELTA_PREFIX}:${noteId}`, reaction, 1);
+ for (let i = 0; i < currentPairs.length; i++) {
+ pipeline.zadd(`${REDIS_PAIR_PREFIX}:${noteId}`, i, currentPairs[i]);
+ }
+ pipeline.zadd(`${REDIS_PAIR_PREFIX}:${noteId}`, Date.now(), `${userId}/${reaction}`);
+ pipeline.zremrangebyrank(`${REDIS_PAIR_PREFIX}:${noteId}`, 0, -(PER_NOTE_REACTION_USER_PAIR_CACHE_MAX + 1));
+ await pipeline.exec();
+ }
+
+ @bindThis
+ public async delete(noteId: MiNote['id'], userId: MiUser['id'], reaction: string): Promise<void> {
+ const pipeline = this.redisForReactions.pipeline();
+ pipeline.hincrby(`${REDIS_DELTA_PREFIX}:${noteId}`, reaction, -1);
+ pipeline.zrem(`${REDIS_PAIR_PREFIX}:${noteId}`, `${userId}/${reaction}`);
+ // TODO: 「消した要素一覧」も持っておかないとcreateされた時に上書きされて復活する
+ await pipeline.exec();
+ }
+
+ @bindThis
+ public async get(noteId: MiNote['id']): Promise<{
+ deltas: Record<string, number>;
+ pairs: ([MiUser['id'], string])[];
+ }> {
+ const pipeline = this.redisForReactions.pipeline();
+ pipeline.hgetall(`${REDIS_DELTA_PREFIX}:${noteId}`);
+ pipeline.zrange(`${REDIS_PAIR_PREFIX}:${noteId}`, 0, -1);
+ const results = await pipeline.exec();
+
+ const resultDeltas = results![0][1] as Record<string, string>;
+ const resultPairs = results![1][1] as string[];
+
+ const deltas = {} as Record<string, number>;
+ for (const [name, count] of Object.entries(resultDeltas)) {
+ deltas[name] = parseInt(count);
+ }
+
+ const pairs = resultPairs.map(x => x.split('/') as [MiUser['id'], string]);
+
+ return {
+ deltas,
+ pairs,
+ };
+ }
+
+ @bindThis
+ public async getMany(noteIds: MiNote['id'][]): Promise<Map<MiNote['id'], {
+ deltas: Record<string, number>;
+ pairs: ([MiUser['id'], string])[];
+ }>> {
+ const map = new Map<MiNote['id'], {
+ deltas: Record<string, number>;
+ pairs: ([MiUser['id'], string])[];
+ }>();
+
+ const pipeline = this.redisForReactions.pipeline();
+ for (const noteId of noteIds) {
+ pipeline.hgetall(`${REDIS_DELTA_PREFIX}:${noteId}`);
+ pipeline.zrange(`${REDIS_PAIR_PREFIX}:${noteId}`, 0, -1);
+ }
+ const results = await pipeline.exec();
+
+ const opsForEachNotes = 2;
+ for (let i = 0; i < noteIds.length; i++) {
+ const noteId = noteIds[i];
+ const resultDeltas = results![i * opsForEachNotes][1] as Record<string, string>;
+ const resultPairs = results![i * opsForEachNotes + 1][1] as string[];
+
+ const deltas = {} as Record<string, number>;
+ for (const [name, count] of Object.entries(resultDeltas)) {
+ deltas[name] = parseInt(count);
+ }
+
+ const pairs = resultPairs.map(x => x.split('/') as [MiUser['id'], string]);
+
+ map.set(noteId, {
+ deltas,
+ pairs,
+ });
+ }
+
+ return map;
+ }
+
+ // TODO: scanは重い可能性があるので、別途 bufferedNoteIds を直接Redis上に持っておいてもいいかもしれない
+ @bindThis
+ public async bake(): Promise<void> {
+ const bufferedNoteIds = [];
+ let cursor = '0';
+ do {
+ // https://github.com/redis/ioredis#transparent-key-prefixing
+ const result = await this.redisForReactions.scan(
+ cursor,
+ 'MATCH',
+ `${this.config.redis.prefix}:${REDIS_DELTA_PREFIX}:*`,
+ 'COUNT',
+ '1000');
+
+ cursor = result[0];
+ bufferedNoteIds.push(...result[1].map(x => x.replace(`${this.config.redis.prefix}:${REDIS_DELTA_PREFIX}:`, '')));
+ } while (cursor !== '0');
+
+ const bufferedMap = await this.getMany(bufferedNoteIds);
+
+ // clear
+ const pipeline = this.redisForReactions.pipeline();
+ for (const noteId of bufferedNoteIds) {
+ pipeline.del(`${REDIS_DELTA_PREFIX}:${noteId}`);
+ pipeline.del(`${REDIS_PAIR_PREFIX}:${noteId}`);
+ }
+ await pipeline.exec();
+
+ // TODO: SQL一個にまとめたい
+ for (const [noteId, buffered] of bufferedMap) {
+ const sql = Object.entries(buffered.deltas)
+ .map(([reaction, count]) =>
+ `jsonb_set("reactions", '{${reaction}}', (COALESCE("reactions"->>'${reaction}', '0')::int + ${count})::text::jsonb)`)
+ .join(' || ');
+
+ this.notesRepository.createQueryBuilder().update()
+ .set({
+ reactions: () => sql,
+ reactionAndUserPairCache: buffered.pairs.map(x => x.join('/')),
+ })
+ .where('id = :id', { id: noteId })
+ .execute();
+ }
+ }
+}