summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts')
-rw-r--r--packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts59
1 files changed, 48 insertions, 11 deletions
diff --git a/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts b/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts
index 36c34c753c..bc99dea000 100644
--- a/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts
+++ b/packages/backend/src/queue/processors/CleanRemoteNotesProcessorService.ts
@@ -51,6 +51,17 @@ export class CleanRemoteNotesProcessorService {
skipped: boolean;
transientErrors: number;
}> {
+ const getConfig = () => {
+ return {
+ enabled: this.meta.enableRemoteNotesCleaning,
+ maxDuration: this.meta.remoteNotesCleaningMaxProcessingDurationInMinutes * 60 * 1000, // Convert minutes to milliseconds
+ // The date limit for the newest note to be considered for deletion.
+ // All notes newer than this limit will always be retained.
+ newestLimit: this.idService.gen(Date.now() - (1000 * 60 * 60 * 24 * this.meta.remoteNotesCleaningExpiryDaysForEachNotes)),
+ };
+ };
+
+ const initialConfig = getConfig();
if (!this.meta.enableRemoteNotesCleaning) {
this.logger.info('Remote notes cleaning is disabled, skipping...');
return {
@@ -64,13 +75,9 @@ export class CleanRemoteNotesProcessorService {
this.logger.info('cleaning remote notes...');
- const maxDuration = this.meta.remoteNotesCleaningMaxProcessingDurationInMinutes * 60 * 1000; // Convert minutes to milliseconds
const startAt = Date.now();
//#region queries
- // The date limit for the newest note to be considered for deletion.
- // All notes newer than this limit will always be retained.
- const newestLimit = this.idService.gen(Date.now() - (1000 * 60 * 60 * 24 * this.meta.remoteNotesCleaningExpiryDaysForEachNotes));
// The condition for removing the notes.
// The note must be:
@@ -92,7 +99,7 @@ export class CleanRemoteNotesProcessorService {
const minId = (await this.notesRepository.createQueryBuilder('note')
.select('MIN(note.id)', 'minId')
.where({
- id: LessThan(newestLimit),
+ id: LessThan(initialConfig.newestLimit),
userHost: Not(IsNull()),
replyId: IsNull(),
renoteId: IsNull(),
@@ -155,12 +162,12 @@ export class CleanRemoteNotesProcessorService {
// | fff | fff | TRUE |
// | ggg | ggg | FALSE |
//
- const candidateNotesQuery = this.db.createQueryBuilder()
+ const candidateNotesQuery = ({ limit }: { limit: number }) => this.db.createQueryBuilder()
.select(`"${candidateNotesCteName}"."id"`, 'id')
.addSelect('unremovable."id" IS NULL', 'isRemovable')
.addSelect(`BOOL_OR("${candidateNotesCteName}"."isBase")`, 'isBase')
.addCommonTableExpression(
- `((SELECT "base".* FROM (${candidateNotesQueryBase.orderBy('note.id', 'ASC').limit(currentLimit).getQuery()}) AS "base") UNION ${candidateNotesQueryInductive.getQuery()})`,
+ `((SELECT "base".* FROM (${candidateNotesQueryBase.orderBy('note.id', 'ASC').limit(limit).getQuery()}) AS "base") UNION ${candidateNotesQueryInductive.getQuery()})`,
candidateNotesCteName,
{ recursive: true },
)
@@ -178,6 +185,11 @@ export class CleanRemoteNotesProcessorService {
let lowThroughputWarned = false;
let transientErrors = 0;
for (;;) {
+ const { enabled, maxDuration, newestLimit } = getConfig();
+ if (!enabled) {
+ this.logger.info('Remote notes cleaning is disabled, processing stopped...');
+ break;
+ }
//#region check time
const batchBeginAt = Date.now();
@@ -205,13 +217,38 @@ export class CleanRemoteNotesProcessorService {
let noteIds = null;
try {
- noteIds = await candidateNotesQuery.setParameters(
+ noteIds = await candidateNotesQuery({ limit: currentLimit }).setParameters(
{ newestLimit, cursorLeft },
).getRawMany<{ id: MiNote['id'], isRemovable: boolean, isBase: boolean }>();
} catch (e) {
- if (currentLimit > minimumLimit && e instanceof QueryFailedError && e.driverError?.code === '57014') {
- // Statement timeout (maybe suddenly hit a large note tree), reduce the limit and try again
- // continuous failures will eventually converge to currentLimit == minimumLimit and then throw
+ if (e instanceof QueryFailedError && e.driverError?.code === '57014') {
+ // Statement timeout (maybe suddenly hit a large note tree), if possible, reduce the limit and try again
+ // if not possible, skip the current batch of notes and find the next root note
+ if (currentLimit <= minimumLimit) {
+ job.log('Local note tree complexity is too high, finding next root note...');
+
+ const idWindow = await this.notesRepository.createQueryBuilder('note')
+ .select('id')
+ .where('note.id > :cursorLeft')
+ .andWhere(removalCriteria)
+ .andWhere({ replyId: IsNull(), renoteId: IsNull() })
+ .orderBy('note.id', 'ASC')
+ .limit(minimumLimit + 1)
+ .setParameters({ cursorLeft, newestLimit })
+ .getRawMany<{ id?: MiNote['id'] }>();
+
+ job.log(`Skipped note IDs: ${idWindow.slice(0, minimumLimit).map(id => id.id).join(', ')}`);
+
+ const lastId = idWindow.at(minimumLimit)?.id;
+
+ if (!lastId) {
+ job.log('No more notes to clean.');
+ break;
+ }
+
+ cursorLeft = lastId;
+ continue;
+ }
currentLimit = Math.max(minimumLimit, Math.floor(currentLimit * 0.25));
continue;
}