From 85e3e496880316c4b55199ea651c210a25420a6e Mon Sep 17 00:00:00 2001 From: tamaina Date: Fri, 8 Aug 2025 21:31:31 +0900 Subject: fix(backend): Fix and create unit test of CleanRemoteNotesProcessorService (#16368) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * wip * test(backend): CleanRemoteNotesProcessorService (basic) * test(backend): CleanRemoteNotesProcessorService (advanced) * :v: * a * split initiator query * no order by * ??? * old → older --- .../processors/CleanRemoteNotesProcessorService.ts | 169 ++++++++++++--------- 1 file changed, 94 insertions(+), 75 deletions(-) (limited to 'packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts') diff --git a/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts b/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts index 6c64d6aa39..da3bb804c2 100644 --- a/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts +++ b/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts @@ -5,9 +5,8 @@ import { setTimeout } from 'node:timers/promises'; import { Inject, Injectable } from '@nestjs/common'; -import { And, Brackets, In, IsNull, LessThan, MoreThan, Not } from 'typeorm'; import { DI } from '@/di-symbols.js'; -import type { MiMeta, MiNote, NoteFavoritesRepository, NotesRepository, UserNotePiningsRepository } from '@/models/_.js'; +import type { MiMeta, MiNote, NotesRepository } from '@/models/_.js'; import type Logger from '@/logger.js'; import { bindThis } from '@/decorators.js'; import { IdService } from '@/core/IdService.js'; @@ -25,12 +24,6 @@ export class CleanRemoteNotesProcessorService { @Inject(DI.notesRepository) private notesRepository: NotesRepository, - @Inject(DI.noteFavoritesRepository) - private noteFavoritesRepository: NoteFavoritesRepository, - - @Inject(DI.userNotePiningsRepository) - private userNotePiningsRepository: UserNotePiningsRepository, - private idService: IdService, private queueLoggerService: QueueLoggerService, ) { @@ -61,6 +54,69 @@ export class CleanRemoteNotesProcessorService { const MAX_NOTE_COUNT_PER_QUERY = 50; + //#retion queries + // We use string literals instead of query builder for several reasons: + // - for removeCondition, we need to use it in having clause, which is not supported by Brackets. + // - for recursive part, we need to preserve the order of columns, but typeorm query builder does not guarantee the order of columns in the result query + + // The condition for removing the notes. + // The note must be: + // - old enough (older than the newestLimit) + // - a remote note (userHost is not null). + // - not have clipped + // - not have pinned on the user profile + // - not has been favorite by any user + const removeCondition = 'note.id < :newestLimit' + + ' AND note."clippedCount" = 0' + + ' AND note."userHost" IS NOT NULL' + // using both userId and noteId instead of just noteId to use index on user_note_pining table. + // This is safe because notes are only pinned by the user who created them. + + ' AND NOT EXISTS(SELECT 1 FROM "user_note_pining" WHERE "noteId" = note."id" AND "userId" = note."userId")' + // We cannot use userId trick because users can favorite notes from other users. + + ' AND NOT EXISTS(SELECT 1 FROM "note_favorite" WHERE "noteId" = note."id")' + ; + + // The initiator query contains the oldest ${MAX_NOTE_COUNT_PER_QUERY} remote non-clipped notes + const initiatorQuery = this.notesRepository.createQueryBuilder('note') + .select('note.id', 'id') + .where(removeCondition) + .andWhere('note.id > :cursor') + .orderBy('note.id', 'ASC') + .limit(MAX_NOTE_COUNT_PER_QUERY); + + // The union query queries the related notes and replies related to the initiator query + const unionQuery = ` + SELECT "note"."id", "note"."replyId", "note"."renoteId", rn."initiatorId" + FROM "note" "note" + INNER JOIN "related_notes" "rn" + ON "note"."replyId" = rn.id + OR "note"."renoteId" = rn.id + OR "note"."id" = rn."replyId" + OR "note"."id" = rn."renoteId" + `; + + const selectRelatedNotesFromInitiatorIdsQuery = ` + SELECT "note"."id" AS "id", "note"."replyId" AS "replyId", "note"."renoteId" AS "renoteId", "note"."id" AS "initiatorId" + FROM "note" "note" WHERE "note"."id" IN (:...initiatorIds) + `; + + const recursiveQuery = `(${selectRelatedNotesFromInitiatorIdsQuery}) UNION (${unionQuery})`; + + const removableInitiatorNotesQuery = this.notesRepository.createQueryBuilder('note') + .select('rn."initiatorId"') + .innerJoin('related_notes', 'rn', 'note.id = rn.id') + .groupBy('rn."initiatorId"') + .having(`bool_and(${removeCondition})`); + + const notesQuery = this.notesRepository.createQueryBuilder('note') + .addCommonTableExpression(recursiveQuery, 'related_notes', { recursive: true }) + .select('note.id', 'id') + .addSelect('rn."initiatorId"') + .innerJoin('related_notes', 'rn', 'note.id = rn.id') + .where(`rn."initiatorId" IN (${removableInitiatorNotesQuery.getQuery()})`) + .distinctOn(['note.id']); + //#endregion + const stats = { deletedCount: 0, oldest: null as number | null, @@ -74,77 +130,45 @@ export class CleanRemoteNotesProcessorService { let cursor = '0'; // oldest note ID to start from while (true) { + //#region check time const batchBeginAt = Date.now(); - // We use string literals instead of query builder for several reasons: - // - for removeCondition, we need to use it in having clause, which is not supported by Brackets. - // - for recursive part, we need to preserve the order of columns, but typeorm query builder does not guarantee the order of columns in the result query - - // The condition for removing the notes. - // The note must be: - // - old enough (older than the newestLimit) - // - a remote note (userHost is not null). - // - not have clipped - // - not have pinned on the user profile - // - not has been favorite by any user - const removeCondition = 'note.id < :newestLimit' - + ' AND note."clippedCount" = 0' - + ' AND note."userHost" IS NOT NULL' - // using both userId and noteId instead of just noteId to use index on user_note_pining table. - // This is safe because notes are only pinned by the user who created them. - + ' AND NOT EXISTS(SELECT 1 FROM "user_note_pining" WHERE "noteId" = note."id" AND "userId" = note."userId")' - // We cannot use userId trick because users can favorite notes from other users. - + ' AND NOT EXISTS(SELECT 1 FROM "note_favorite" WHERE "noteId" = note."id")' - ; + const elapsed = batchBeginAt - startAt; - // The initiator query contains the oldest ${MAX_NOTE_COUNT_PER_QUERY} remote non-clipped notes - const initiatorQuery = ` - SELECT "note"."id" AS "id", "note"."replyId" AS "replyId", "note"."renoteId" AS "renoteId", "note"."id" AS "initiatorId" - FROM "note" "note" WHERE ${removeCondition} AND "note"."id" > :cursor ORDER BY "note"."id" ASC LIMIT ${MAX_NOTE_COUNT_PER_QUERY}`; + if (elapsed >= maxDuration) { + this.logger.info(`Reached maximum duration of ${maxDuration}ms, stopping...`); + job.log('Reached maximum duration, stopping cleaning.'); + job.updateProgress(100); + break; + } - // The union query queries the related notes and replies related to the initiator query - const unionQuery = ` - SELECT "note"."id", "note"."replyId", "note"."renoteId", rn."initiatorId" - FROM "note" "note" - INNER JOIN "related_notes" "rn" - ON "note"."replyId" = rn.id - OR "note"."renoteId" = rn.id - OR "note"."id" = rn."replyId" - OR "note"."id" = rn."renoteId" - `; - const recursiveQuery = `(${initiatorQuery}) UNION (${unionQuery})`; + job.updateProgress((elapsed / maxDuration) * 100); + //#endregion - const removableInitiatorNotesQuery = this.notesRepository.createQueryBuilder('note') - .select('rn."initiatorId"') - .innerJoin('related_notes', 'rn', 'note.id = rn.id') - .groupBy('rn."initiatorId"') - .having(`bool_and(${removeCondition})`); + // First, we fetch the initiator notes that are older than the newestLimit. + const initiatorNotes: { id: MiNote['id'] }[] = await initiatorQuery.setParameters({ cursor, newestLimit }).getRawMany(); - const notesQuery = this.notesRepository.createQueryBuilder('note') - .addCommonTableExpression(recursiveQuery, 'related_notes', { recursive: true }) - .select('note.id', 'id') - .addSelect('rn."initiatorId"') - .innerJoin('related_notes', 'rn', 'note.id = rn.id') - .where(`rn."initiatorId" IN (${ removableInitiatorNotesQuery.getQuery() })`) - .setParameters({ cursor, newestLimit }); + // update the cursor to the newest initiatorId found in the fetched notes. + const newCursor = initiatorNotes.reduce((max, note) => note.id > max ? note.id : max, cursor); - const notes: { id: MiNote['id'], initiatorId: MiNote['id'] }[] = await notesQuery.getRawMany(); + if (initiatorNotes.length === 0 || cursor === newCursor || newCursor >= newestLimit) { + // If no notes were found or the cursor did not change, we can stop. + job.log('No more notes to clean. (no initiator notes found or cursor did not change.)'); + break; + } - const fetchedCount = notes.length; + const notes: { id: MiNote['id'], initiatorId: MiNote['id'] }[] = await notesQuery.setParameters({ + initiatorIds: initiatorNotes.map(note => note.id), + newestLimit, + }).getRawMany(); - // update the cursor to the newest initiatorId found in the fetched notes. - // We don't use 'id' since the note can be newer than the initiator note. - for (const note of notes) { - if (cursor < note.initiatorId) { - cursor = note.initiatorId; - } - } + cursor = newCursor; if (notes.length > 0) { await this.notesRepository.delete(notes.map(note => note.id)); - for (const note of notes) { - const t = this.idService.parse(note.id).date.getTime(); + for (const { id } of notes) { + const t = this.idService.parse(id).date.getTime(); if (stats.oldest === null || t < stats.oldest) { stats.oldest = t; } @@ -156,19 +180,14 @@ export class CleanRemoteNotesProcessorService { stats.deletedCount += notes.length; } - job.log(`Deleted ${notes.length} of ${fetchedCount}; ${Date.now() - batchBeginAt}ms`); - - const elapsed = Date.now() - startAt; + job.log(`Deleted ${notes.length} from ${initiatorNotes.length} initiators; ${Date.now() - batchBeginAt}ms`); - if (elapsed >= maxDuration) { - this.logger.info(`Reached maximum duration of ${maxDuration}ms, stopping...`); - job.log('Reached maximum duration, stopping cleaning.'); - job.updateProgress(100); + if (initiatorNotes.length < MAX_NOTE_COUNT_PER_QUERY) { + // If we fetched less than the maximum, it means there are no more notes to process. + job.log(`No more notes to clean. (fewer than MAX_NOTE_COUNT_PER_QUERY =${MAX_NOTE_COUNT_PER_QUERY}.)`); break; } - job.updateProgress((elapsed / maxDuration) * 100); - await setTimeout(1000 * 5); // Wait a moment to avoid overwhelming the db } -- cgit v1.2.3-freya