summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue/processors
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/queue/processors')
-rw-r--r--packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts264
1 files changed, 174 insertions, 90 deletions
diff --git a/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts b/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts
index da3bb804c2..77a9dc5557 100644
--- a/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts
+++ b/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts
@@ -5,6 +5,7 @@
import { setTimeout } from 'node:timers/promises';
import { Inject, Injectable } from '@nestjs/common';
+import { DataSource, IsNull, LessThan, QueryFailedError, Not } from 'typeorm';
import { DI } from '@/di-symbols.js';
import type { MiMeta, MiNote, NotesRepository } from '@/models/_.js';
import type Logger from '@/logger.js';
@@ -24,6 +25,9 @@ export class CleanRemoteNotesProcessorService {
@Inject(DI.notesRepository)
private notesRepository: NotesRepository,
+ @Inject(DI.db)
+ private db: DataSource,
+
private idService: IdService,
private queueLoggerService: QueueLoggerService,
) {
@@ -31,11 +35,21 @@ export class CleanRemoteNotesProcessorService {
}
@bindThis
+ private computeProgress(minId: string, maxId: string, cursorLeft: string) {
+ const minTs = this.idService.parse(minId).date.getTime();
+ const maxTs = this.idService.parse(maxId).date.getTime();
+ const cursorTs = this.idService.parse(cursorLeft).date.getTime();
+
+ return ((cursorTs - minTs) / (maxTs - minTs)) * 100;
+ }
+
+ @bindThis
public async process(job: Bull.Job<Record<string, unknown>>): Promise<{
deletedCount: number;
oldest: number | null;
newest: number | null;
- skipped?: boolean;
+ skipped: boolean;
+ transientErrors: number;
}> {
if (!this.meta.enableRemoteNotesCleaning) {
this.logger.info('Remote notes cleaning is disabled, skipping...');
@@ -44,6 +58,7 @@ export class CleanRemoteNotesProcessorService {
oldest: null,
newest: null,
skipped: true,
+ transientErrors: 0,
};
}
@@ -52,12 +67,10 @@ export class CleanRemoteNotesProcessorService {
const maxDuration = this.meta.remoteNotesCleaningMaxProcessingDurationInMinutes * 60 * 1000; // Convert minutes to milliseconds
const startAt = Date.now();
- const MAX_NOTE_COUNT_PER_QUERY = 50;
-
- //#retion queries
- // We use string literals instead of query builder for several reasons:
- // - for removeCondition, we need to use it in having clause, which is not supported by Brackets.
- // - for recursive part, we need to preserve the order of columns, but typeorm query builder does not guarantee the order of columns in the result query
+ //#region queries
+ // The date limit for the newest note to be considered for deletion.
+ // All notes newer than this limit will always be retained.
+ const newestLimit = this.idService.gen(Date.now() - (1000 * 60 * 60 * 24 * this.meta.remoteNotesCleaningExpiryDaysForEachNotes));
// The condition for removing the notes.
// The note must be:
@@ -66,56 +79,93 @@ export class CleanRemoteNotesProcessorService {
// - not have clipped
// - not have pinned on the user profile
// - not has been favorite by any user
- const removeCondition = 'note.id < :newestLimit'
- + ' AND note."clippedCount" = 0'
- + ' AND note."userHost" IS NOT NULL'
- // using both userId and noteId instead of just noteId to use index on user_note_pining table.
- // This is safe because notes are only pinned by the user who created them.
- + ' AND NOT EXISTS(SELECT 1 FROM "user_note_pining" WHERE "noteId" = note."id" AND "userId" = note."userId")'
- // We cannot use userId trick because users can favorite notes from other users.
- + ' AND NOT EXISTS(SELECT 1 FROM "note_favorite" WHERE "noteId" = note."id")'
- ;
+ const removalCriteria = [
+ 'note."id" < :newestLimit',
+ 'note."clippedCount" = 0',
+ 'note."userHost" IS NOT NULL',
+ 'NOT EXISTS (SELECT 1 FROM user_note_pining WHERE "noteId" = note."id")',
+ 'NOT EXISTS (SELECT 1 FROM note_favorite WHERE "noteId" = note."id")',
+ ].join(' AND ');
- // The initiator query contains the oldest ${MAX_NOTE_COUNT_PER_QUERY} remote non-clipped notes
- const initiatorQuery = this.notesRepository.createQueryBuilder('note')
- .select('note.id', 'id')
- .where(removeCondition)
- .andWhere('note.id > :cursor')
- .orderBy('note.id', 'ASC')
- .limit(MAX_NOTE_COUNT_PER_QUERY);
+ const minId = (await this.notesRepository.createQueryBuilder('note')
+ .select('MIN(note.id)', 'minId')
+ .where({
+ id: LessThan(newestLimit),
+ userHost: Not(IsNull()),
+ replyId: IsNull(),
+ renoteId: IsNull(),
+ })
+ .getRawOne<{ minId?: MiNote['id'] }>())?.minId;
- // The union query queries the related notes and replies related to the initiator query
- const unionQuery = `
- SELECT "note"."id", "note"."replyId", "note"."renoteId", rn."initiatorId"
- FROM "note" "note"
- INNER JOIN "related_notes" "rn"
- ON "note"."replyId" = rn.id
- OR "note"."renoteId" = rn.id
- OR "note"."id" = rn."replyId"
- OR "note"."id" = rn."renoteId"
- `;
+ if (!minId) {
+ this.logger.info('No notes can possibly be deleted, skipping...');
+ return {
+ deletedCount: 0,
+ oldest: null,
+ newest: null,
+ skipped: false,
+ transientErrors: 0,
+ };
+ }
- const selectRelatedNotesFromInitiatorIdsQuery = `
- SELECT "note"."id" AS "id", "note"."replyId" AS "replyId", "note"."renoteId" AS "renoteId", "note"."id" AS "initiatorId"
- FROM "note" "note" WHERE "note"."id" IN (:...initiatorIds)
- `;
+ // start with a conservative limit and adjust it based on the query duration
+ const minimumLimit = 10;
+ let currentLimit = 100;
+ let cursorLeft = '0';
- const recursiveQuery = `(${selectRelatedNotesFromInitiatorIdsQuery}) UNION (${unionQuery})`;
+ const candidateNotesCteName = 'candidate_notes';
- const removableInitiatorNotesQuery = this.notesRepository.createQueryBuilder('note')
- .select('rn."initiatorId"')
- .innerJoin('related_notes', 'rn', 'note.id = rn.id')
- .groupBy('rn."initiatorId"')
- .having(`bool_and(${removeCondition})`);
+ // tree walk down all root notes, short-circuit when the first unremovable note is found
+ const candidateNotesQueryBase = this.notesRepository.createQueryBuilder('note')
+ .select('note."id"', 'id')
+ .addSelect('note."replyId"', 'replyId')
+ .addSelect('note."renoteId"', 'renoteId')
+ .addSelect('note."id"', 'rootId')
+ .addSelect('TRUE', 'isRemovable')
+ .addSelect('TRUE', 'isBase')
+ .where('note."id" > :cursorLeft')
+ .andWhere(removalCriteria)
+ .andWhere({ replyId: IsNull(), renoteId: IsNull() });
- const notesQuery = this.notesRepository.createQueryBuilder('note')
- .addCommonTableExpression(recursiveQuery, 'related_notes', { recursive: true })
+ const candidateNotesQueryInductive = this.notesRepository.createQueryBuilder('note')
.select('note.id', 'id')
- .addSelect('rn."initiatorId"')
- .innerJoin('related_notes', 'rn', 'note.id = rn.id')
- .where(`rn."initiatorId" IN (${removableInitiatorNotesQuery.getQuery()})`)
- .distinctOn(['note.id']);
- //#endregion
+ .addSelect('note."replyId"', 'replyId')
+ .addSelect('note."renoteId"', 'renoteId')
+ .addSelect('parent."rootId"', 'rootId')
+ .addSelect(removalCriteria, 'isRemovable')
+ .addSelect('FALSE', 'isBase')
+ .innerJoin(candidateNotesCteName, 'parent', 'parent."id" = note."replyId" OR parent."id" = note."renoteId"')
+ .where('parent."isRemovable" = TRUE');
+
+ // A note tree can be deleted if there are no unremovable rows with the same rootId.
+ //
+ // `candidate_notes` will have the following structure after recursive query (some columns omitted):
+ // After performing a LEFT JOIN with `candidate_notes` as `unremovable`,
+ // the note tree containing unremovable notes will be anti-joined.
+ // For removable rows, the `unremovable` columns will have `NULL` values.
+ // | id | rootId | isRemovable |
+ // |-----|--------|-------------|
+ // | aaa | aaa | TRUE |
+ // | bbb | aaa | FALSE |
+ // | ccc | aaa | FALSE |
+ // | ddd | ddd | TRUE |
+ // | eee | ddd | TRUE |
+ // | fff | fff | TRUE |
+ // | ggg | ggg | FALSE |
+ //
+ const candidateNotesQuery = this.db.createQueryBuilder()
+ .select(`"${candidateNotesCteName}"."id"`, 'id')
+ .addSelect('unremovable."id" IS NULL', 'isRemovable')
+ .addSelect(`BOOL_OR("${candidateNotesCteName}"."isBase")`, 'isBase')
+ .addCommonTableExpression(
+ `((SELECT "base".* FROM (${candidateNotesQueryBase.orderBy('note.id', 'ASC').limit(currentLimit).getQuery()}) AS "base") UNION ${candidateNotesQueryInductive.getQuery()})`,
+ candidateNotesCteName,
+ { recursive: true },
+ )
+ .from(candidateNotesCteName, candidateNotesCteName)
+ .leftJoin(candidateNotesCteName, 'unremovable', `unremovable."rootId" = "${candidateNotesCteName}"."rootId" AND unremovable."isRemovable" = FALSE`)
+ .groupBy(`"${candidateNotesCteName}"."id"`)
+ .addGroupBy('unremovable."id" IS NULL');
const stats = {
deletedCount: 0,
@@ -123,74 +173,107 @@ export class CleanRemoteNotesProcessorService {
newest: null as number | null,
};
- // The date limit for the newest note to be considered for deletion.
- // All notes newer than this limit will always be retained.
- const newestLimit = this.idService.gen(Date.now() - (1000 * 60 * 60 * 24 * this.meta.remoteNotesCleaningExpiryDaysForEachNotes));
-
- let cursor = '0'; // oldest note ID to start from
-
- while (true) {
+ let lowThroughputWarned = false;
+ let transientErrors = 0;
+ for (;;) {
//#region check time
const batchBeginAt = Date.now();
const elapsed = batchBeginAt - startAt;
+ const progress = this.computeProgress(minId, newestLimit, cursorLeft > minId ? cursorLeft : minId);
+
if (elapsed >= maxDuration) {
- this.logger.info(`Reached maximum duration of ${maxDuration}ms, stopping...`);
- job.log('Reached maximum duration, stopping cleaning.');
+ job.log(`Reached maximum duration of ${maxDuration}ms, stopping... (last cursor: ${cursorLeft}, final progress ${progress}%)`);
job.updateProgress(100);
break;
}
- job.updateProgress((elapsed / maxDuration) * 100);
+ const wallClockUsage = elapsed / maxDuration;
+ if (wallClockUsage > 0.5 && progress < 50 && !lowThroughputWarned) {
+ const msg = `Not projected to finish in time! (wall clock usage ${wallClockUsage * 100}% at ${progress}%, current limit ${currentLimit})`;
+ this.logger.warn(msg);
+ job.log(msg);
+ lowThroughputWarned = true;
+ }
+ job.updateProgress(progress);
//#endregion
- // First, we fetch the initiator notes that are older than the newestLimit.
- const initiatorNotes: { id: MiNote['id'] }[] = await initiatorQuery.setParameters({ cursor, newestLimit }).getRawMany();
+ const queryBegin = performance.now();
+ let noteIds = null;
- // update the cursor to the newest initiatorId found in the fetched notes.
- const newCursor = initiatorNotes.reduce((max, note) => note.id > max ? note.id : max, cursor);
+ try {
+ noteIds = await candidateNotesQuery.setParameters(
+ { newestLimit, cursorLeft },
+ ).getRawMany<{ id: MiNote['id'], isRemovable: boolean, isBase: boolean }>();
+ } catch (e) {
+ if (currentLimit > minimumLimit && e instanceof QueryFailedError && e.driverError?.code === '57014') {
+ // Statement timeout (maybe suddenly hit a large note tree), reduce the limit and try again
+ // continuous failures will eventually converge to currentLimit == minimumLimit and then throw
+ currentLimit = Math.max(minimumLimit, Math.floor(currentLimit * 0.25));
+ continue;
+ }
+ throw e;
+ }
- if (initiatorNotes.length === 0 || cursor === newCursor || newCursor >= newestLimit) {
- // If no notes were found or the cursor did not change, we can stop.
- job.log('No more notes to clean. (no initiator notes found or cursor did not change.)');
+ if (noteIds.length === 0) {
+ job.log('No more notes to clean.');
break;
}
- const notes: { id: MiNote['id'], initiatorId: MiNote['id'] }[] = await notesQuery.setParameters({
- initiatorIds: initiatorNotes.map(note => note.id),
- newestLimit,
- }).getRawMany();
-
- cursor = newCursor;
+ const queryDuration = performance.now() - queryBegin;
+ // try to adjust such that each query takes about 1~5 seconds and reasonable NodeJS heap so the task stays responsive
+ // this should not oscillate..
+ if (queryDuration > 5000 || noteIds.length > 5000) {
+ currentLimit = Math.floor(currentLimit * 0.5);
+ } else if (queryDuration < 1000 && noteIds.length < 1000) {
+ currentLimit = Math.floor(currentLimit * 1.5);
+ }
+ // clamp to a sane range
+ currentLimit = Math.min(Math.max(currentLimit, minimumLimit), 5000);
- if (notes.length > 0) {
- await this.notesRepository.delete(notes.map(note => note.id));
+ const deletableNoteIds = noteIds.filter(result => result.isRemovable).map(result => result.id);
+ if (deletableNoteIds.length > 0) {
+ try {
+ await this.notesRepository.delete(deletableNoteIds);
- for (const { id } of notes) {
- const t = this.idService.parse(id).date.getTime();
- if (stats.oldest === null || t < stats.oldest) {
- stats.oldest = t;
+ for (const id of deletableNoteIds) {
+ const t = this.idService.parse(id).date.getTime();
+ if (stats.oldest === null || t < stats.oldest) {
+ stats.oldest = t;
+ }
+ if (stats.newest === null || t > stats.newest) {
+ stats.newest = t;
+ }
}
- if (stats.newest === null || t > stats.newest) {
- stats.newest = t;
+
+ stats.deletedCount += deletableNoteIds.length;
+ } catch (e) {
+ // check for integrity violation errors (class 23) that might have occurred between the check and the delete
+ // we can safely continue to the next batch
+ if (e instanceof QueryFailedError && e.driverError?.code?.startsWith('23')) {
+ transientErrors++;
+ job.log(`Error deleting notes: ${e} (transient race condition?)`);
+ } else {
+ throw e;
}
}
-
- stats.deletedCount += notes.length;
}
- job.log(`Deleted ${notes.length} from ${initiatorNotes.length} initiators; ${Date.now() - batchBeginAt}ms`);
+ cursorLeft = noteIds.filter(result => result.isBase).reduce((max, { id }) => id > max ? id : max, cursorLeft);
- if (initiatorNotes.length < MAX_NOTE_COUNT_PER_QUERY) {
- // If we fetched less than the maximum, it means there are no more notes to process.
- job.log(`No more notes to clean. (fewer than MAX_NOTE_COUNT_PER_QUERY =${MAX_NOTE_COUNT_PER_QUERY}.)`);
- break;
+ job.log(`Deleted ${noteIds.length} notes; ${Date.now() - batchBeginAt}ms`);
+
+ if (process.env.NODE_ENV !== 'test') {
+ await setTimeout(Math.min(1000 * 5, queryDuration)); // Wait a moment to avoid overwhelming the db
}
+ };
- await setTimeout(1000 * 5); // Wait a moment to avoid overwhelming the db
+ if (transientErrors > 0) {
+ const msg = `${transientErrors} transient errors occurred while cleaning remote notes. You may need a second pass to complete the cleaning.`;
+ this.logger.warn(msg);
+ job.log(msg);
}
-
this.logger.succ('cleaning of remote notes completed.');
return {
@@ -198,6 +281,7 @@ export class CleanRemoteNotesProcessorService {
oldest: stats.oldest,
newest: stats.newest,
skipped: false,
+ transientErrors,
};
}
}