summaryrefslogtreecommitdiff
path: root/packages/backend/src/core/FanoutTimelineEndpointService.ts
diff options
context:
space:
mode:
authoranatawa12 <anatawa12@icloud.com>2023-12-02 18:25:07 +0900
committerGitHub <noreply@github.com>2023-12-02 18:25:07 +0900
commita631b976c99a4b3977079a4bafc8a7b7de0bf269 (patch)
tree2de46905d201657061e6deaf3db1fdd2d44b0a3c /packages/backend/src/core/FanoutTimelineEndpointService.ts
parentfix(frontend): MFM ruby nyaize (#12362) (diff)
downloadsharkey-a631b976c99a4b3977079a4bafc8a7b7de0bf269.tar.gz
sharkey-a631b976c99a4b3977079a4bafc8a7b7de0bf269.tar.bz2
sharkey-a631b976c99a4b3977079a4bafc8a7b7de0bf269.zip
Refine fanout timeline (#12507)
* chore(endpoints/hybrid-timeline): don't pack inside getFromDb * chore(endpoints/hybrid-timeline): Redisから取得する部分のうちSTLに依存しなそうなところを別のServiceに切り出し * chore(endpoints/local-timeline): FanoutTimelineEndpointServiceで再実装 * chore(endpoints/channels/timeline): FanoutTimelineEndpointServiceで再実装 * chore(endpoints/timeline): FanoutTimelineEndpointServiceで再実装 * chore(endpoints/user-list-timeline): FanoutTimelineEndpointServiceで再実装 * chore(endpoints/users/notes): FanoutTimelineEndpointServiceで再実装 * chore: add useDbFallback to FanoutTimelineEndpointService.timeline and always true for channel / user note list * style: fix lint error * chore: split logic to multiple functions * chore: implement redis fallback * chore: 成功率を上げる * fix: db fallback not working * feat: allowPartial * chore(frontend): set allowPartial * chore(backend): remove fallbackIfEmpty HTL will never be purged so it's no longer required * fix: missing allowPartial in channel timeline * fix: type of timelineConfig in hybrid-timeline --------- Co-authored-by: syuilo <Syuilotan@yahoo.co.jp>
Diffstat (limited to 'packages/backend/src/core/FanoutTimelineEndpointService.ts')
-rw-r--r--packages/backend/src/core/FanoutTimelineEndpointService.ts123
1 files changed, 123 insertions, 0 deletions
diff --git a/packages/backend/src/core/FanoutTimelineEndpointService.ts b/packages/backend/src/core/FanoutTimelineEndpointService.ts
new file mode 100644
index 0000000000..157fcbe877
--- /dev/null
+++ b/packages/backend/src/core/FanoutTimelineEndpointService.ts
@@ -0,0 +1,123 @@
+/*
+ * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+import { Inject, Injectable } from '@nestjs/common';
+import { DI } from '@/di-symbols.js';
+import { bindThis } from '@/decorators.js';
+import type { MiUser } from '@/models/User.js';
+import type { MiNote } from '@/models/Note.js';
+import { Packed } from '@/misc/json-schema.js';
+import type { NotesRepository } from '@/models/_.js';
+import { NoteEntityService } from '@/core/entities/NoteEntityService.js';
+import { FanoutTimelineService } from '@/core/FanoutTimelineService.js';
+
+@Injectable()
+export class FanoutTimelineEndpointService {
+ constructor(
+ @Inject(DI.notesRepository)
+ private notesRepository: NotesRepository,
+
+ private noteEntityService: NoteEntityService,
+ private fanoutTimelineService: FanoutTimelineService,
+ ) {
+ }
+
+ @bindThis
+ async timeline(ps: {
+ untilId: string | null,
+ sinceId: string | null,
+ limit: number,
+ allowPartial: boolean,
+ me?: { id: MiUser['id'] } | undefined | null,
+ useDbFallback: boolean,
+ redisTimelines: string[],
+ noteFilter: (note: MiNote) => boolean,
+ dbFallback: (untilId: string | null, sinceId: string | null, limit: number) => Promise<MiNote[]>,
+ }): Promise<Packed<'Note'>[]> {
+ return await this.noteEntityService.packMany(await this.getMiNotes(ps), ps.me);
+ }
+
+ @bindThis
+ private async getMiNotes(ps: {
+ untilId: string | null,
+ sinceId: string | null,
+ limit: number,
+ allowPartial: boolean,
+ me?: { id: MiUser['id'] } | undefined | null,
+ useDbFallback: boolean,
+ redisTimelines: string[],
+ noteFilter: (note: MiNote) => boolean,
+ dbFallback: (untilId: string | null, sinceId: string | null, limit: number) => Promise<MiNote[]>,
+ }): Promise<MiNote[]> {
+ let noteIds: string[];
+ let shouldFallbackToDb = false;
+
+ // 呼び出し元と以下の処理をシンプルにするためにdbFallbackを置き換える
+ if (!ps.useDbFallback) ps.dbFallback = () => Promise.resolve([]);
+
+ const redisResult = await this.fanoutTimelineService.getMulti(ps.redisTimelines, ps.untilId, ps.sinceId);
+
+ const redisResultIds = Array.from(new Set(redisResult.flat(1)));
+
+ redisResultIds.sort((a, b) => a > b ? -1 : 1);
+ noteIds = redisResultIds.slice(0, ps.limit);
+
+ shouldFallbackToDb = shouldFallbackToDb || (noteIds.length === 0);
+
+ if (!shouldFallbackToDb) {
+ const redisTimeline: MiNote[] = [];
+ let readFromRedis = 0;
+ let lastSuccessfulRate = 1; // rateをキャッシュする?
+ let trialCount = 1;
+
+ while ((redisResultIds.length - readFromRedis) !== 0) {
+ const remainingToRead = ps.limit - redisTimeline.length;
+
+ // DBからの取り直しを減らす初回と同じ割合以上で成功すると仮定するが、クエリの長さを考えて三倍まで
+ const countToGet = remainingToRead * Math.ceil(Math.min(1.1 / lastSuccessfulRate, 3));
+ noteIds = redisResultIds.slice(readFromRedis, readFromRedis + countToGet);
+
+ readFromRedis += noteIds.length;
+
+ const gotFromDb = await this.getAndFilterFromDb(noteIds, ps.noteFilter);
+ redisTimeline.push(...gotFromDb);
+ lastSuccessfulRate = gotFromDb.length / noteIds.length;
+
+ console.log(`fanoutTimelineTrial#${trialCount++}: req: ${ps.limit}, tried: ${noteIds.length}, got: ${gotFromDb.length}, rate: ${lastSuccessfulRate}, total: ${redisTimeline.length}, fromRedis: ${redisResultIds.length}`);
+
+ if (ps.allowPartial ? redisTimeline.length !== 0 : redisTimeline.length >= ps.limit) {
+ // 十分Redisからとれた
+ return redisTimeline.slice(0, ps.limit);
+ }
+ }
+
+ // まだ足りない分はDBにフォールバック
+ const remainingToRead = ps.limit - redisTimeline.length;
+ const gotFromDb = await ps.dbFallback(noteIds[noteIds.length - 1], ps.sinceId, remainingToRead);
+ redisTimeline.push(...gotFromDb);
+ console.log(`fanoutTimelineTrial#db: req: ${ps.limit}, tried: ${remainingToRead}, got: ${gotFromDb.length}, since: ${noteIds[noteIds.length - 1]}, until: ${ps.untilId}, total: ${redisTimeline.length}`);
+ return redisTimeline;
+ }
+
+ return await ps.dbFallback(ps.untilId, ps.sinceId, ps.limit);
+ }
+
+ private async getAndFilterFromDb(noteIds: string[], noteFilter: (note: MiNote) => boolean): Promise<MiNote[]> {
+ const query = this.notesRepository.createQueryBuilder('note')
+ .where('note.id IN (:...noteIds)', { noteIds: noteIds })
+ .innerJoinAndSelect('note.user', 'user')
+ .leftJoinAndSelect('note.reply', 'reply')
+ .leftJoinAndSelect('note.renote', 'renote')
+ .leftJoinAndSelect('reply.user', 'replyUser')
+ .leftJoinAndSelect('renote.user', 'renoteUser')
+ .leftJoinAndSelect('note.channel', 'channel');
+
+ const notes = (await query.getMany()).filter(noteFilter);
+
+ notes.sort((a, b) => a.id > b.id ? -1 : 1);
+
+ return notes;
+ }
+}