diff options
| author | misskey-release-bot[bot] <157398866+misskey-release-bot[bot]@users.noreply.github.com> | 2024-10-15 04:53:46 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-10-15 04:53:46 +0000 |
| commit | b99e13e66730209ffa7a87785257b9a785efc7ea (patch) | |
| tree | 73f32398d2afa502dcef62ff37fe99bc832eb792 /packages/backend/src/queue/QueueProcessorService.ts | |
| parent | Merge pull request #14675 from misskey-dev/develop (diff) | |
| parent | Release: 2024.10.1 (diff) | |
| download | misskey-b99e13e66730209ffa7a87785257b9a785efc7ea.tar.gz misskey-b99e13e66730209ffa7a87785257b9a785efc7ea.tar.bz2 misskey-b99e13e66730209ffa7a87785257b9a785efc7ea.zip | |
Merge pull request #14741 from misskey-dev/develop
Release: 2024.10.1
Diffstat (limited to 'packages/backend/src/queue/QueueProcessorService.ts')
| -rw-r--r-- | packages/backend/src/queue/QueueProcessorService.ts | 89 |
1 files changed, 51 insertions, 38 deletions
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index e9e1c45224..6940e1c188 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -10,6 +10,7 @@ import type { Config } from '@/config.js'; import { DI } from '@/di-symbols.js'; import type Logger from '@/logger.js'; import { bindThis } from '@/decorators.js'; +import { CheckModeratorsActivityProcessorService } from '@/queue/processors/CheckModeratorsActivityProcessorService.js'; import { UserWebhookDeliverProcessorService } from './processors/UserWebhookDeliverProcessorService.js'; import { SystemWebhookDeliverProcessorService } from './processors/SystemWebhookDeliverProcessorService.js'; import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js'; @@ -66,7 +67,7 @@ function getJobInfo(job: Bull.Job | undefined, increment = false): string { // onActiveとかonCompletedのattemptsMadeがなぜか0始まりなのでインクリメントする const currentAttempts = job.attemptsMade + (increment ? 1 : 0); - const maxAttempts = job.opts ? job.opts.attempts : 0; + const maxAttempts = job.opts.attempts ?? 0; return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`; } @@ -120,24 +121,35 @@ export class QueueProcessorService implements OnApplicationShutdown { private aggregateRetentionProcessorService: AggregateRetentionProcessorService, private checkExpiredMutingsProcessorService: CheckExpiredMutingsProcessorService, private bakeBufferedReactionsProcessorService: BakeBufferedReactionsProcessorService, + private checkModeratorsActivityProcessorService: CheckModeratorsActivityProcessorService, private cleanProcessorService: CleanProcessorService, ) { this.logger = this.queueLoggerService.logger; - function renderError(e: Error): any { - if (e) { // 何故かeがundefinedで来ることがある - return { - stack: e.stack, - message: e.message, - name: e.name, - }; - } else { - return { - stack: '?', - message: '?', - name: '?', - }; + function renderError(e?: Error) { + // 何故かeがundefinedで来ることがある + if (!e) return '?'; + + if (e instanceof Bull.UnrecoverableError || e.name === 'AbortError') { + return `${e.name}: ${e.message}`; } + + return { + stack: e.stack, + message: e.message, + name: e.name, + }; + } + + function renderJob(job?: Bull.Job) { + if (!job) return '?'; + + return { + name: job.name || undefined, + info: getJobInfo(job), + failedReason: job.failedReason || undefined, + data: job.data, + }; } //#region system @@ -150,6 +162,7 @@ export class QueueProcessorService implements OnApplicationShutdown { case 'aggregateRetention': return this.aggregateRetentionProcessorService.process(); case 'checkExpiredMutings': return this.checkExpiredMutingsProcessorService.process(); case 'bakeBufferedReactions': return this.bakeBufferedReactionsProcessorService.process(); + case 'checkModeratorsActivity': return this.checkModeratorsActivityProcessorService.process(); case 'clean': return this.cleanProcessorService.process(); default: throw new Error(`unrecognized job type ${job.name} for system`); } @@ -172,15 +185,15 @@ export class QueueProcessorService implements OnApplicationShutdown { .on('active', (job) => logger.debug(`active id=${job.id}`)) .on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`)) .on('failed', (job, err: Error) => { - logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }); + logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) }); if (config.sentryForBackend) { - Sentry.captureMessage(`Queue: System: ${job?.name ?? '?'}: ${err.message}`, { + Sentry.captureMessage(`Queue: System: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, { level: 'error', extra: { job, err }, }); } }) - .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) })) .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion @@ -229,15 +242,15 @@ export class QueueProcessorService implements OnApplicationShutdown { .on('active', (job) => logger.debug(`active id=${job.id}`)) .on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`)) .on('failed', (job, err) => { - logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }); + logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) }); if (config.sentryForBackend) { - Sentry.captureMessage(`Queue: DB: ${job?.name ?? '?'}: ${err.message}`, { + Sentry.captureMessage(`Queue: DB: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, { level: 'error', extra: { job, err }, }); } }) - .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) })) .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion @@ -269,15 +282,15 @@ export class QueueProcessorService implements OnApplicationShutdown { .on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) .on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) .on('failed', (job, err) => { - logger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`); + logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`); if (config.sentryForBackend) { - Sentry.captureMessage(`Queue: Deliver: ${err.message}`, { + Sentry.captureMessage(`Queue: Deliver: ${err.name}: ${err.message}`, { level: 'error', extra: { job, err }, }); } }) - .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) })) .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion @@ -309,15 +322,15 @@ export class QueueProcessorService implements OnApplicationShutdown { .on('active', (job) => logger.debug(`active ${getJobInfo(job, true)}`)) .on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)}`)) .on('failed', (job, err) => { - logger.error(`failed(${err.stack}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job, e: renderError(err) }); + logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job: renderJob(job), e: renderError(err) }); if (config.sentryForBackend) { - Sentry.captureMessage(`Queue: Inbox: ${err.message}`, { + Sentry.captureMessage(`Queue: Inbox: ${err.name}: ${err.message}`, { level: 'error', extra: { job, err }, }); } }) - .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) })) .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion @@ -349,15 +362,15 @@ export class QueueProcessorService implements OnApplicationShutdown { .on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) .on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) .on('failed', (job, err) => { - logger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`); + logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`); if (config.sentryForBackend) { - Sentry.captureMessage(`Queue: UserWebhookDeliver: ${err.message}`, { + Sentry.captureMessage(`Queue: UserWebhookDeliver: ${err.name}: ${err.message}`, { level: 'error', extra: { job, err }, }); } }) - .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) })) .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion @@ -389,15 +402,15 @@ export class QueueProcessorService implements OnApplicationShutdown { .on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) .on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) .on('failed', (job, err) => { - logger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`); + logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`); if (config.sentryForBackend) { - Sentry.captureMessage(`Queue: SystemWebhookDeliver: ${err.message}`, { + Sentry.captureMessage(`Queue: SystemWebhookDeliver: ${err.name}: ${err.message}`, { level: 'error', extra: { job, err }, }); } }) - .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) })) .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion @@ -436,15 +449,15 @@ export class QueueProcessorService implements OnApplicationShutdown { .on('active', (job) => logger.debug(`active id=${job.id}`)) .on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`)) .on('failed', (job, err) => { - logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }); + logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) }); if (config.sentryForBackend) { - Sentry.captureMessage(`Queue: Relationship: ${job?.name ?? '?'}: ${err.message}`, { + Sentry.captureMessage(`Queue: Relationship: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, { level: 'error', extra: { job, err }, }); } }) - .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) })) .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion @@ -477,15 +490,15 @@ export class QueueProcessorService implements OnApplicationShutdown { .on('active', (job) => logger.debug(`active id=${job.id}`)) .on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`)) .on('failed', (job, err) => { - logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }); + logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) }); if (config.sentryForBackend) { - Sentry.captureMessage(`Queue: ObjectStorage: ${job?.name ?? '?'}: ${err.message}`, { + Sentry.captureMessage(`Queue: ObjectStorage: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, { level: 'error', extra: { job, err }, }); } }) - .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) })) .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion |