summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue/QueueProcessorService.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/queue/QueueProcessorService.ts')
-rw-r--r--packages/backend/src/queue/QueueProcessorService.ts125
1 files changed, 78 insertions, 47 deletions
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts
index 7f7ce2452c..4c1a6a1d9e 100644
--- a/packages/backend/src/queue/QueueProcessorService.ts
+++ b/packages/backend/src/queue/QueueProcessorService.ts
@@ -11,7 +11,7 @@ import { DI } from '@/di-symbols.js';
import type Logger from '@/logger.js';
import { bindThis } from '@/decorators.js';
import { CheckModeratorsActivityProcessorService } from '@/queue/processors/CheckModeratorsActivityProcessorService.js';
-import { StatusError } from '@/misc/status-error.js';
+import { renderFullError } from '@/misc/render-full-error.js';
import { UserWebhookDeliverProcessorService } from './processors/UserWebhookDeliverProcessorService.js';
import { SystemWebhookDeliverProcessorService } from './processors/SystemWebhookDeliverProcessorService.js';
import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js';
@@ -73,7 +73,9 @@ function getJobInfo(job: Bull.Job | undefined, increment = false): string {
const currentAttempts = job.attemptsMade + (increment ? 1 : 0);
const maxAttempts = job.opts.attempts ?? 0;
- return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`;
+ return job.name
+ ? `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated} name=${job.name}`
+ : `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`;
}
@Injectable()
@@ -134,35 +136,6 @@ export class QueueProcessorService implements OnApplicationShutdown {
) {
this.logger = this.queueLoggerService.logger;
- function renderError(e?: Error) {
- // 何故かeがundefinedで来ることがある
- if (!e) return '?';
-
- if (e instanceof Bull.UnrecoverableError || e.name === 'AbortError' || e instanceof StatusError) {
- return `${e.name}: ${e.message}`;
- }
-
- return {
- stack: e.stack,
- message: e.message,
- name: e.name,
- };
- }
-
- function renderJob(job?: Bull.Job) {
- if (!job) return '?';
-
- const info: Record<string, string> = {
- info: getJobInfo(job),
- data: job.data,
- };
-
- if (job.name) info.name = job.name;
- if (job.failedReason) info.failedReason = job.failedReason;
-
- return info;
- }
-
//#region system
{
const processer = (job: Bull.Job) => {
@@ -196,7 +169,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err: Error) => {
- logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
+ this.logError(logger, err, job);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: System: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
@@ -204,7 +177,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
+ .on('error', (err: Error) => this.logError(logger, err))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -261,7 +234,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => {
- logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
+ this.logError(logger, err, job);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: DB: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
@@ -269,7 +242,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
+ .on('error', (err: Error) => this.logError(logger, err))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -301,7 +274,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => {
- logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
+ this.logError(logger, err, job);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: Deliver: ${err.name}: ${err.message}`, {
level: 'error',
@@ -309,7 +282,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
+ .on('error', (err: Error) => this.logError(logger, err))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -341,7 +314,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)}`))
.on('failed', (job, err) => {
- logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job: renderJob(job), e: renderError(err) });
+ this.logError(logger, err, job);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: Inbox: ${err.name}: ${err.message}`, {
level: 'error',
@@ -349,7 +322,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
- .on('error', (err: Error) => logger.error('inbox error:', renderError(err)))
+ .on('error', (err: Error) => this.logError(logger, err))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -381,7 +354,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => {
- logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
+ this.logError(logger, err, job);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: UserWebhookDeliver: ${err.name}: ${err.message}`, {
level: 'error',
@@ -389,7 +362,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
+ .on('error', (err: Error) => this.logError(logger, err))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -421,7 +394,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => {
- logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
+ this.logError(logger, err, job);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: SystemWebhookDeliver: ${err.name}: ${err.message}`, {
level: 'error',
@@ -429,7 +402,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
+ .on('error', (err: Error) => this.logError(logger, err))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -468,7 +441,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => {
- logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
+ this.logError(logger, err, job);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: Relationship: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
@@ -476,7 +449,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
+ .on('error', (err: Error) => this.logError(logger, err))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -509,7 +482,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => {
- logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
+ this.logError(logger, err, job);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: ObjectStorage: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
@@ -517,13 +490,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
+ .on('error', (err: Error) => this.logError(logger, err))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
//#region ended poll notification
{
+ const logger = this.logger.createSubLogger('endedPollNotification');
+
this.endedPollNotificationQueueWorker = new Bull.Worker(QUEUE.ENDED_POLL_NOTIFICATION, (job) => {
if (this.config.sentryForBackend) {
return Sentry.startSpan({ name: 'Queue: EndedPollNotification' }, () => this.endedPollNotificationProcessorService.process(job));
@@ -534,19 +509,75 @@ export class QueueProcessorService implements OnApplicationShutdown {
...baseWorkerOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION),
autorun: false,
});
+ this.endedPollNotificationQueueWorker
+ .on('active', (job) => logger.debug(`active id=${job.id}`))
+ .on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
+ .on('failed', (job, err) => {
+ this.logError(logger, err, job);
+ if (config.sentryForBackend) {
+ Sentry.captureMessage(`Queue: EndedPollNotification: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
+ level: 'error',
+ extra: { job, err },
+ });
+ }
+ })
+ .on('error', (err: Error) => this.logError(logger, err))
+ .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
//#region schedule note post
{
+ const logger = this.logger.createSubLogger('scheduleNotePost');
+
this.schedulerNotePostQueueWorker = new Bull.Worker(QUEUE.SCHEDULE_NOTE_POST, (job) => this.scheduleNotePostProcessorService.process(job), {
...baseWorkerOptions(this.config, QUEUE.SCHEDULE_NOTE_POST),
autorun: false,
});
+ this.schedulerNotePostQueueWorker
+ .on('active', (job) => logger.debug(`active id=${job.id}`))
+ .on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
+ .on('failed', (job, err) => {
+ this.logError(logger, err, job);
+ if (config.sentryForBackend) {
+ Sentry.captureMessage(`Queue: ${QUEUE.SCHEDULE_NOTE_POST}: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
+ level: 'error',
+ extra: { job, err },
+ });
+ }
+ })
+ .on('error', (err: Error) => this.logError(logger, err))
+ .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
}
+ private logError(logger: Logger, err: unknown, job?: Bull.Job | null): void {
+ const parts: string[] = [];
+
+ // Render job
+ if (job) {
+ parts.push('job [');
+ parts.push(getJobInfo(job));
+ parts.push('] failed: ');
+ } else {
+ parts.push('job failed: ');
+ }
+
+ // Render error
+ const fullError = renderFullError(err);
+ const errorText = typeof(fullError) === 'string' ? fullError : undefined;
+ if (errorText) {
+ parts.push(errorText);
+ } else if (job?.failedReason) {
+ parts.push(job.failedReason);
+ }
+
+ const message = parts.join('');
+ const data = typeof(fullError) !== 'string' ? { err: fullError } : undefined;
+ logger.error(message, data);
+ }
+
@bindThis
public async start(): Promise<void> {
await Promise.all([