summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue/QueueProcessorService.ts
diff options
context:
space:
mode:
authorJulia <julia@insertdomain.name>2024-11-05 03:59:23 +0000
committerJulia <julia@insertdomain.name>2024-11-05 03:59:23 +0000
commit680e3ac7a3084313ed4857ffca2c582c5b3c7348 (patch)
tree5621986847b8390b7c4f8e2f63cc53b180982b67 /packages/backend/src/queue/QueueProcessorService.ts
parentmerge: Bump version (!635) (diff)
parentcomment out sharkey-specific crowdin link (diff)
downloadsharkey-680e3ac7a3084313ed4857ffca2c582c5b3c7348.tar.gz
sharkey-680e3ac7a3084313ed4857ffca2c582c5b3c7348.tar.bz2
sharkey-680e3ac7a3084313ed4857ffca2c582c5b3c7348.zip
merge: release 2024.9.1 (!733)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/733 Approved-by: Marie <github@yuugi.dev> Approved-by: Julia <julia@insertdomain.name>
Diffstat (limited to 'packages/backend/src/queue/QueueProcessorService.ts')
-rw-r--r--packages/backend/src/queue/QueueProcessorService.ts87
1 files changed, 50 insertions, 37 deletions
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts
index 7a6169bf9c..eaeb6d58df 100644
--- a/packages/backend/src/queue/QueueProcessorService.ts
+++ b/packages/backend/src/queue/QueueProcessorService.ts
@@ -40,6 +40,7 @@ import { TickChartsProcessorService } from './processors/TickChartsProcessorServ
import { ResyncChartsProcessorService } from './processors/ResyncChartsProcessorService.js';
import { CleanChartsProcessorService } from './processors/CleanChartsProcessorService.js';
import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMutingsProcessorService.js';
+import { BakeBufferedReactionsProcessorService } from './processors/BakeBufferedReactionsProcessorService.js';
import { CleanProcessorService } from './processors/CleanProcessorService.js';
import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js';
import { QueueLoggerService } from './QueueLoggerService.js';
@@ -122,24 +123,35 @@ export class QueueProcessorService implements OnApplicationShutdown {
private cleanChartsProcessorService: CleanChartsProcessorService,
private aggregateRetentionProcessorService: AggregateRetentionProcessorService,
private checkExpiredMutingsProcessorService: CheckExpiredMutingsProcessorService,
+ private bakeBufferedReactionsProcessorService: BakeBufferedReactionsProcessorService,
private cleanProcessorService: CleanProcessorService,
) {
this.logger = this.queueLoggerService.logger;
- function renderError(e: Error): any {
- if (e) { // 何故かeがundefinedで来ることがある
- return {
- stack: e.stack,
- message: e.message,
- name: e.name,
- };
- } else {
- return {
- stack: '?',
- message: '?',
- name: '?',
- };
+ function renderError(e?: Error) {
+ // 何故かeがundefinedで来ることがある
+ if (!e) return '?';
+
+ if (e instanceof Bull.UnrecoverableError || e.name === 'AbortError') {
+ return `${e.name}: ${e.message}`;
}
+
+ return {
+ stack: e.stack,
+ message: e.message,
+ name: e.name,
+ };
+ }
+
+ function renderJob(job?: Bull.Job) {
+ if (!job) return '?';
+
+ return {
+ name: job.name || undefined,
+ info: getJobInfo(job),
+ failedReason: job.failedReason || undefined,
+ data: job.data,
+ };
}
//#region system
@@ -151,6 +163,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
case 'cleanCharts': return this.cleanChartsProcessorService.process();
case 'aggregateRetention': return this.aggregateRetentionProcessorService.process();
case 'checkExpiredMutings': return this.checkExpiredMutingsProcessorService.process();
+ case 'bakeBufferedReactions': return this.bakeBufferedReactionsProcessorService.process();
case 'clean': return this.cleanProcessorService.process();
default: throw new Error(`unrecognized job type ${job.name} for system`);
}
@@ -173,15 +186,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err: Error) => {
- logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) });
+ logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
if (config.sentryForBackend) {
- Sentry.captureMessage(`Queue: System: ${job?.name ?? '?'}: ${err.message}`, {
+ Sentry.captureMessage(`Queue: System: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
+ .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -238,15 +251,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => {
- logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) });
+ logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
if (config.sentryForBackend) {
- Sentry.captureMessage(`Queue: DB: ${job?.name ?? '?'}: ${err.message}`, {
+ Sentry.captureMessage(`Queue: DB: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
+ .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -278,15 +291,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => {
- logger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
+ logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
if (config.sentryForBackend) {
- Sentry.captureMessage(`Queue: Deliver: ${err.message}`, {
+ Sentry.captureMessage(`Queue: Deliver: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
+ .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -318,15 +331,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)}`))
.on('failed', (job, err) => {
- logger.error(`failed(${err.stack}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job, e: renderError(err) });
+ logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job: renderJob(job), e: renderError(err) });
if (config.sentryForBackend) {
- Sentry.captureMessage(`Queue: Inbox: ${err.message}`, {
+ Sentry.captureMessage(`Queue: Inbox: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
+ .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -358,15 +371,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => {
- logger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
+ logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
if (config.sentryForBackend) {
- Sentry.captureMessage(`Queue: UserWebhookDeliver: ${err.message}`, {
+ Sentry.captureMessage(`Queue: UserWebhookDeliver: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
+ .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -398,15 +411,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => {
- logger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
+ logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
if (config.sentryForBackend) {
- Sentry.captureMessage(`Queue: SystemWebhookDeliver: ${err.message}`, {
+ Sentry.captureMessage(`Queue: SystemWebhookDeliver: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
+ .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -445,15 +458,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => {
- logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) });
+ logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
if (config.sentryForBackend) {
- Sentry.captureMessage(`Queue: Relationship: ${job?.name ?? '?'}: ${err.message}`, {
+ Sentry.captureMessage(`Queue: Relationship: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
+ .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -486,15 +499,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => {
- logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) });
+ logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
if (config.sentryForBackend) {
- Sentry.captureMessage(`Queue: ObjectStorage: ${job?.name ?? '?'}: ${err.message}`, {
+ Sentry.captureMessage(`Queue: ObjectStorage: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
+ .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion