summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue/QueueProcessorService.ts
diff options
context:
space:
mode:
authorJulia <julia@insertdomain.name>2024-12-31 02:30:13 +0000
committerJulia <julia@insertdomain.name>2024-12-31 02:30:13 +0000
commit4c0bbddd0fba7e0d76fb484312e691ee29fe5858 (patch)
tree4bb1a3a2a79c679ac021a2199bd526be469524d4 /packages/backend/src/queue/QueueProcessorService.ts
parentmerge: fixes for 2024.9.4 (if we want to) (!770) (diff)
parentBump version (diff)
downloadsharkey-4c0bbddd0fba7e0d76fb484312e691ee29fe5858.tar.gz
sharkey-4c0bbddd0fba7e0d76fb484312e691ee29fe5858.tar.bz2
sharkey-4c0bbddd0fba7e0d76fb484312e691ee29fe5858.zip
merge: Bump stable version (!842)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/842
Diffstat (limited to 'packages/backend/src/queue/QueueProcessorService.ts')
-rw-r--r--packages/backend/src/queue/QueueProcessorService.ts33
1 files changed, 27 insertions, 6 deletions
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts
index eaeb6d58df..297edfd545 100644
--- a/packages/backend/src/queue/QueueProcessorService.ts
+++ b/packages/backend/src/queue/QueueProcessorService.ts
@@ -10,6 +10,8 @@ import type { Config } from '@/config.js';
import { DI } from '@/di-symbols.js';
import type Logger from '@/logger.js';
import { bindThis } from '@/decorators.js';
+import { CheckModeratorsActivityProcessorService } from '@/queue/processors/CheckModeratorsActivityProcessorService.js';
+import { StatusError } from '@/misc/status-error.js';
import { UserWebhookDeliverProcessorService } from './processors/UserWebhookDeliverProcessorService.js';
import { SystemWebhookDeliverProcessorService } from './processors/SystemWebhookDeliverProcessorService.js';
import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js';
@@ -43,6 +45,7 @@ import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMu
import { BakeBufferedReactionsProcessorService } from './processors/BakeBufferedReactionsProcessorService.js';
import { CleanProcessorService } from './processors/CleanProcessorService.js';
import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js';
+import { ScheduleNotePostProcessorService } from './processors/ScheduleNotePostProcessorService.js';
import { QueueLoggerService } from './QueueLoggerService.js';
import { QUEUE, baseQueueOptions } from './const.js';
import { ImportNotesProcessorService } from './processors/ImportNotesProcessorService.js';
@@ -68,7 +71,7 @@ function getJobInfo(job: Bull.Job | undefined, increment = false): string {
// onActiveとかonCompletedのattemptsMadeがなぜか0始まりなのでインクリメントする
const currentAttempts = job.attemptsMade + (increment ? 1 : 0);
- const maxAttempts = job.opts ? job.opts.attempts : 0;
+ const maxAttempts = job.opts.attempts ?? 0;
return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`;
}
@@ -85,6 +88,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
private relationshipQueueWorker: Bull.Worker;
private objectStorageQueueWorker: Bull.Worker;
private endedPollNotificationQueueWorker: Bull.Worker;
+ private schedulerNotePostQueueWorker: Bull.Worker;
constructor(
@Inject(DI.config)
@@ -124,7 +128,9 @@ export class QueueProcessorService implements OnApplicationShutdown {
private aggregateRetentionProcessorService: AggregateRetentionProcessorService,
private checkExpiredMutingsProcessorService: CheckExpiredMutingsProcessorService,
private bakeBufferedReactionsProcessorService: BakeBufferedReactionsProcessorService,
+ private checkModeratorsActivityProcessorService: CheckModeratorsActivityProcessorService,
private cleanProcessorService: CleanProcessorService,
+ private scheduleNotePostProcessorService: ScheduleNotePostProcessorService,
) {
this.logger = this.queueLoggerService.logger;
@@ -132,7 +138,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
// 何故かeがundefinedで来ることがある
if (!e) return '?';
- if (e instanceof Bull.UnrecoverableError || e.name === 'AbortError') {
+ if (e instanceof Bull.UnrecoverableError || e.name === 'AbortError' || e instanceof StatusError) {
return `${e.name}: ${e.message}`;
}
@@ -146,12 +152,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
function renderJob(job?: Bull.Job) {
if (!job) return '?';
- return {
- name: job.name || undefined,
+ const info: Record<string, string> = {
info: getJobInfo(job),
- failedReason: job.failedReason || undefined,
data: job.data,
};
+
+ if (job.name) info.name = job.name;
+ if (job.failedReason) info.failedReason = job.failedReason;
+
+ return info;
}
//#region system
@@ -164,6 +173,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
case 'aggregateRetention': return this.aggregateRetentionProcessorService.process();
case 'checkExpiredMutings': return this.checkExpiredMutingsProcessorService.process();
case 'bakeBufferedReactions': return this.bakeBufferedReactionsProcessorService.process();
+ case 'checkModeratorsActivity': return this.checkModeratorsActivityProcessorService.process();
case 'clean': return this.cleanProcessorService.process();
default: throw new Error(`unrecognized job type ${job.name} for system`);
}
@@ -339,7 +349,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
+ .on('error', (err: Error) => logger.error('inbox error:', renderError(err)))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -526,6 +536,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
//#endregion
+
+ //#region schedule note post
+ {
+ this.schedulerNotePostQueueWorker = new Bull.Worker(QUEUE.SCHEDULE_NOTE_POST, (job) => this.scheduleNotePostProcessorService.process(job), {
+ ...baseQueueOptions(this.config, QUEUE.SCHEDULE_NOTE_POST),
+ autorun: false,
+ });
+ }
+ //#endregion
}
@bindThis
@@ -540,6 +559,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
this.relationshipQueueWorker.run(),
this.objectStorageQueueWorker.run(),
this.endedPollNotificationQueueWorker.run(),
+ this.schedulerNotePostQueueWorker.run(),
]);
}
@@ -555,6 +575,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
this.relationshipQueueWorker.close(),
this.objectStorageQueueWorker.close(),
this.endedPollNotificationQueueWorker.close(),
+ this.schedulerNotePostQueueWorker.close(),
]);
}