summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts')
-rw-r--r--packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts169
1 files changed, 94 insertions, 75 deletions
diff --git a/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts b/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts
index 6c64d6aa39..da3bb804c2 100644
--- a/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts
+++ b/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts
@@ -5,9 +5,8 @@
import { setTimeout } from 'node:timers/promises';
import { Inject, Injectable } from '@nestjs/common';
-import { And, Brackets, In, IsNull, LessThan, MoreThan, Not } from 'typeorm';
import { DI } from '@/di-symbols.js';
-import type { MiMeta, MiNote, NoteFavoritesRepository, NotesRepository, UserNotePiningsRepository } from '@/models/_.js';
+import type { MiMeta, MiNote, NotesRepository } from '@/models/_.js';
import type Logger from '@/logger.js';
import { bindThis } from '@/decorators.js';
import { IdService } from '@/core/IdService.js';
@@ -25,12 +24,6 @@ export class CleanRemoteNotesProcessorService {
@Inject(DI.notesRepository)
private notesRepository: NotesRepository,
- @Inject(DI.noteFavoritesRepository)
- private noteFavoritesRepository: NoteFavoritesRepository,
-
- @Inject(DI.userNotePiningsRepository)
- private userNotePiningsRepository: UserNotePiningsRepository,
-
private idService: IdService,
private queueLoggerService: QueueLoggerService,
) {
@@ -61,6 +54,69 @@ export class CleanRemoteNotesProcessorService {
const MAX_NOTE_COUNT_PER_QUERY = 50;
+ //#retion queries
+ // We use string literals instead of query builder for several reasons:
+ // - for removeCondition, we need to use it in having clause, which is not supported by Brackets.
+ // - for recursive part, we need to preserve the order of columns, but typeorm query builder does not guarantee the order of columns in the result query
+
+ // The condition for removing the notes.
+ // The note must be:
+ // - old enough (older than the newestLimit)
+ // - a remote note (userHost is not null).
+ // - not have clipped
+ // - not have pinned on the user profile
+ // - not has been favorite by any user
+ const removeCondition = 'note.id < :newestLimit'
+ + ' AND note."clippedCount" = 0'
+ + ' AND note."userHost" IS NOT NULL'
+ // using both userId and noteId instead of just noteId to use index on user_note_pining table.
+ // This is safe because notes are only pinned by the user who created them.
+ + ' AND NOT EXISTS(SELECT 1 FROM "user_note_pining" WHERE "noteId" = note."id" AND "userId" = note."userId")'
+ // We cannot use userId trick because users can favorite notes from other users.
+ + ' AND NOT EXISTS(SELECT 1 FROM "note_favorite" WHERE "noteId" = note."id")'
+ ;
+
+ // The initiator query contains the oldest ${MAX_NOTE_COUNT_PER_QUERY} remote non-clipped notes
+ const initiatorQuery = this.notesRepository.createQueryBuilder('note')
+ .select('note.id', 'id')
+ .where(removeCondition)
+ .andWhere('note.id > :cursor')
+ .orderBy('note.id', 'ASC')
+ .limit(MAX_NOTE_COUNT_PER_QUERY);
+
+ // The union query queries the related notes and replies related to the initiator query
+ const unionQuery = `
+ SELECT "note"."id", "note"."replyId", "note"."renoteId", rn."initiatorId"
+ FROM "note" "note"
+ INNER JOIN "related_notes" "rn"
+ ON "note"."replyId" = rn.id
+ OR "note"."renoteId" = rn.id
+ OR "note"."id" = rn."replyId"
+ OR "note"."id" = rn."renoteId"
+ `;
+
+ const selectRelatedNotesFromInitiatorIdsQuery = `
+ SELECT "note"."id" AS "id", "note"."replyId" AS "replyId", "note"."renoteId" AS "renoteId", "note"."id" AS "initiatorId"
+ FROM "note" "note" WHERE "note"."id" IN (:...initiatorIds)
+ `;
+
+ const recursiveQuery = `(${selectRelatedNotesFromInitiatorIdsQuery}) UNION (${unionQuery})`;
+
+ const removableInitiatorNotesQuery = this.notesRepository.createQueryBuilder('note')
+ .select('rn."initiatorId"')
+ .innerJoin('related_notes', 'rn', 'note.id = rn.id')
+ .groupBy('rn."initiatorId"')
+ .having(`bool_and(${removeCondition})`);
+
+ const notesQuery = this.notesRepository.createQueryBuilder('note')
+ .addCommonTableExpression(recursiveQuery, 'related_notes', { recursive: true })
+ .select('note.id', 'id')
+ .addSelect('rn."initiatorId"')
+ .innerJoin('related_notes', 'rn', 'note.id = rn.id')
+ .where(`rn."initiatorId" IN (${removableInitiatorNotesQuery.getQuery()})`)
+ .distinctOn(['note.id']);
+ //#endregion
+
const stats = {
deletedCount: 0,
oldest: null as number | null,
@@ -74,77 +130,45 @@ export class CleanRemoteNotesProcessorService {
let cursor = '0'; // oldest note ID to start from
while (true) {
+ //#region check time
const batchBeginAt = Date.now();
- // We use string literals instead of query builder for several reasons:
- // - for removeCondition, we need to use it in having clause, which is not supported by Brackets.
- // - for recursive part, we need to preserve the order of columns, but typeorm query builder does not guarantee the order of columns in the result query
-
- // The condition for removing the notes.
- // The note must be:
- // - old enough (older than the newestLimit)
- // - a remote note (userHost is not null).
- // - not have clipped
- // - not have pinned on the user profile
- // - not has been favorite by any user
- const removeCondition = 'note.id < :newestLimit'
- + ' AND note."clippedCount" = 0'
- + ' AND note."userHost" IS NOT NULL'
- // using both userId and noteId instead of just noteId to use index on user_note_pining table.
- // This is safe because notes are only pinned by the user who created them.
- + ' AND NOT EXISTS(SELECT 1 FROM "user_note_pining" WHERE "noteId" = note."id" AND "userId" = note."userId")'
- // We cannot use userId trick because users can favorite notes from other users.
- + ' AND NOT EXISTS(SELECT 1 FROM "note_favorite" WHERE "noteId" = note."id")'
- ;
+ const elapsed = batchBeginAt - startAt;
- // The initiator query contains the oldest ${MAX_NOTE_COUNT_PER_QUERY} remote non-clipped notes
- const initiatorQuery = `
- SELECT "note"."id" AS "id", "note"."replyId" AS "replyId", "note"."renoteId" AS "renoteId", "note"."id" AS "initiatorId"
- FROM "note" "note" WHERE ${removeCondition} AND "note"."id" > :cursor ORDER BY "note"."id" ASC LIMIT ${MAX_NOTE_COUNT_PER_QUERY}`;
+ if (elapsed >= maxDuration) {
+ this.logger.info(`Reached maximum duration of ${maxDuration}ms, stopping...`);
+ job.log('Reached maximum duration, stopping cleaning.');
+ job.updateProgress(100);
+ break;
+ }
- // The union query queries the related notes and replies related to the initiator query
- const unionQuery = `
- SELECT "note"."id", "note"."replyId", "note"."renoteId", rn."initiatorId"
- FROM "note" "note"
- INNER JOIN "related_notes" "rn"
- ON "note"."replyId" = rn.id
- OR "note"."renoteId" = rn.id
- OR "note"."id" = rn."replyId"
- OR "note"."id" = rn."renoteId"
- `;
- const recursiveQuery = `(${initiatorQuery}) UNION (${unionQuery})`;
+ job.updateProgress((elapsed / maxDuration) * 100);
+ //#endregion
- const removableInitiatorNotesQuery = this.notesRepository.createQueryBuilder('note')
- .select('rn."initiatorId"')
- .innerJoin('related_notes', 'rn', 'note.id = rn.id')
- .groupBy('rn."initiatorId"')
- .having(`bool_and(${removeCondition})`);
+ // First, we fetch the initiator notes that are older than the newestLimit.
+ const initiatorNotes: { id: MiNote['id'] }[] = await initiatorQuery.setParameters({ cursor, newestLimit }).getRawMany();
- const notesQuery = this.notesRepository.createQueryBuilder('note')
- .addCommonTableExpression(recursiveQuery, 'related_notes', { recursive: true })
- .select('note.id', 'id')
- .addSelect('rn."initiatorId"')
- .innerJoin('related_notes', 'rn', 'note.id = rn.id')
- .where(`rn."initiatorId" IN (${ removableInitiatorNotesQuery.getQuery() })`)
- .setParameters({ cursor, newestLimit });
+ // update the cursor to the newest initiatorId found in the fetched notes.
+ const newCursor = initiatorNotes.reduce((max, note) => note.id > max ? note.id : max, cursor);
- const notes: { id: MiNote['id'], initiatorId: MiNote['id'] }[] = await notesQuery.getRawMany();
+ if (initiatorNotes.length === 0 || cursor === newCursor || newCursor >= newestLimit) {
+ // If no notes were found or the cursor did not change, we can stop.
+ job.log('No more notes to clean. (no initiator notes found or cursor did not change.)');
+ break;
+ }
- const fetchedCount = notes.length;
+ const notes: { id: MiNote['id'], initiatorId: MiNote['id'] }[] = await notesQuery.setParameters({
+ initiatorIds: initiatorNotes.map(note => note.id),
+ newestLimit,
+ }).getRawMany();
- // update the cursor to the newest initiatorId found in the fetched notes.
- // We don't use 'id' since the note can be newer than the initiator note.
- for (const note of notes) {
- if (cursor < note.initiatorId) {
- cursor = note.initiatorId;
- }
- }
+ cursor = newCursor;
if (notes.length > 0) {
await this.notesRepository.delete(notes.map(note => note.id));
- for (const note of notes) {
- const t = this.idService.parse(note.id).date.getTime();
+ for (const { id } of notes) {
+ const t = this.idService.parse(id).date.getTime();
if (stats.oldest === null || t < stats.oldest) {
stats.oldest = t;
}
@@ -156,19 +180,14 @@ export class CleanRemoteNotesProcessorService {
stats.deletedCount += notes.length;
}
- job.log(`Deleted ${notes.length} of ${fetchedCount}; ${Date.now() - batchBeginAt}ms`);
-
- const elapsed = Date.now() - startAt;
+ job.log(`Deleted ${notes.length} from ${initiatorNotes.length} initiators; ${Date.now() - batchBeginAt}ms`);
- if (elapsed >= maxDuration) {
- this.logger.info(`Reached maximum duration of ${maxDuration}ms, stopping...`);
- job.log('Reached maximum duration, stopping cleaning.');
- job.updateProgress(100);
+ if (initiatorNotes.length < MAX_NOTE_COUNT_PER_QUERY) {
+ // If we fetched less than the maximum, it means there are no more notes to process.
+ job.log(`No more notes to clean. (fewer than MAX_NOTE_COUNT_PER_QUERY =${MAX_NOTE_COUNT_PER_QUERY}.)`);
break;
}
- job.updateProgress((elapsed / maxDuration) * 100);
-
await setTimeout(1000 * 5); // Wait a moment to avoid overwhelming the db
}