summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue
diff options
context:
space:
mode:
authorsyuilo <Syuilotan@yahoo.co.jp>2023-05-29 11:54:49 +0900
committerGitHub <noreply@github.com>2023-05-29 11:54:49 +0900
commitfd7b77c542b51313d8b8ea60124725fe65a295d5 (patch)
tree78893fdfe273496831124414789376b1e2a18997 /packages/backend/src/queue
parentpnpm devでCtrl+Cで終了させてもプロセスが完全に殺せないの... (diff)
downloadsharkey-fd7b77c542b51313d8b8ea60124725fe65a295d5.tar.gz
sharkey-fd7b77c542b51313d8b8ea60124725fe65a295d5.tar.bz2
sharkey-fd7b77c542b51313d8b8ea60124725fe65a295d5.zip
enhance(backend): migrate bull to bullmq (#10910)
* wip * wip * Update QueueService.ts * wip * refactor * :v: * fix * Update QueueStatsService.ts * refactor * Update ApNoteService.ts * Update mock-resolver.ts * refactor * Update mock-resolver.ts
Diffstat (limited to 'packages/backend/src/queue')
-rw-r--r--packages/backend/src/queue/QueueProcessorService.ts304
-rw-r--r--packages/backend/src/queue/const.ts26
-rw-r--r--packages/backend/src/queue/get-job-info.ts15
-rw-r--r--packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts6
-rw-r--r--packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts5
-rw-r--r--packages/backend/src/queue/processors/CleanChartsProcessorService.ts5
-rw-r--r--packages/backend/src/queue/processors/CleanProcessorService.ts5
-rw-r--r--packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts11
-rw-r--r--packages/backend/src/queue/processors/DeleteAccountProcessorService.ts4
-rw-r--r--packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts12
-rw-r--r--packages/backend/src/queue/processors/DeleteFileProcessorService.ts4
-rw-r--r--packages/backend/src/queue/processors/DeliverProcessorService.ts10
-rw-r--r--packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts7
-rw-r--r--packages/backend/src/queue/processors/ExportAntennasProcessorService.ts6
-rw-r--r--packages/backend/src/queue/processors/ExportBlockingProcessorService.ts13
-rw-r--r--packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts39
-rw-r--r--packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts11
-rw-r--r--packages/backend/src/queue/processors/ExportFollowingProcessorService.ts9
-rw-r--r--packages/backend/src/queue/processors/ExportMutingProcessorService.ts13
-rw-r--r--packages/backend/src/queue/processors/ExportNotesProcessorService.ts11
-rw-r--r--packages/backend/src/queue/processors/ExportUserListsProcessorService.ts9
-rw-r--r--packages/backend/src/queue/processors/ImportAntennasProcessorService.ts6
-rw-r--r--packages/backend/src/queue/processors/ImportBlockingProcessorService.ts13
-rw-r--r--packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts6
-rw-r--r--packages/backend/src/queue/processors/ImportFollowingProcessorService.ts13
-rw-r--r--packages/backend/src/queue/processors/ImportMutingProcessorService.ts11
-rw-r--r--packages/backend/src/queue/processors/ImportUserListsProcessorService.ts7
-rw-r--r--packages/backend/src/queue/processors/InboxProcessorService.ts38
-rw-r--r--packages/backend/src/queue/processors/RelationshipProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ResyncChartsProcessorService.ts5
-rw-r--r--packages/backend/src/queue/processors/TickChartsProcessorService.ts5
-rw-r--r--packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts6
32 files changed, 330 insertions, 307 deletions
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts
index dc025f9889..011082cd36 100644
--- a/packages/backend/src/queue/QueueProcessorService.ts
+++ b/packages/backend/src/queue/QueueProcessorService.ts
@@ -1,10 +1,9 @@
import { Inject, Injectable } from '@nestjs/common';
+import * as Bull from 'bullmq';
import type { Config } from '@/config.js';
import { DI } from '@/di-symbols.js';
import type Logger from '@/logger.js';
-import { QueueService } from '@/core/QueueService.js';
import { bindThis } from '@/decorators.js';
-import { getJobInfo } from './get-job-info.js';
import { WebhookDeliverProcessorService } from './processors/WebhookDeliverProcessorService.js';
import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js';
import { DeliverProcessorService } from './processors/DeliverProcessorService.js';
@@ -35,6 +34,33 @@ import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMu
import { CleanProcessorService } from './processors/CleanProcessorService.js';
import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js';
import { QueueLoggerService } from './QueueLoggerService.js';
+import { QUEUE, baseQueueOptions } from './const.js';
+
+// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019
+function httpRelatedBackoff(attemptsMade: number) {
+ const baseDelay = 60 * 1000; // 1min
+ const maxBackoff = 8 * 60 * 60 * 1000; // 8hours
+ let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay;
+ backoff = Math.min(backoff, maxBackoff);
+ backoff += Math.round(backoff * Math.random() * 0.2);
+ return backoff;
+}
+
+function getJobInfo(job: Bull.Job | undefined, increment = false): string {
+ if (job == null) return '-';
+
+ const age = Date.now() - job.timestamp;
+
+ const formated = age > 60000 ? `${Math.floor(age / 1000 / 60)}m`
+ : age > 10000 ? `${Math.floor(age / 1000)}s`
+ : `${age}ms`;
+
+ // onActiveとかonCompletedのattemptsMadeがなぜか0始まりなのでインクリメントする
+ const currentAttempts = job.attemptsMade + (increment ? 1 : 0);
+ const maxAttempts = job.opts ? job.opts.attempts : 0;
+
+ return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`;
+}
@Injectable()
export class QueueProcessorService {
@@ -45,7 +71,6 @@ export class QueueProcessorService {
private config: Config,
private queueLoggerService: QueueLoggerService,
- private queueService: QueueService,
private webhookDeliverProcessorService: WebhookDeliverProcessorService,
private endedPollNotificationProcessorService: EndedPollNotificationProcessorService,
private deliverProcessorService: DeliverProcessorService,
@@ -97,146 +122,191 @@ export class QueueProcessorService {
}
}
+ //#region system
+ const systemQueueWorker = new Bull.Worker(QUEUE.SYSTEM, (job) => {
+ switch (job.name) {
+ case 'tickCharts': return this.tickChartsProcessorService.process();
+ case 'resyncCharts': return this.resyncChartsProcessorService.process();
+ case 'cleanCharts': return this.cleanChartsProcessorService.process();
+ case 'aggregateRetention': return this.aggregateRetentionProcessorService.process();
+ case 'checkExpiredMutings': return this.checkExpiredMutingsProcessorService.process();
+ case 'clean': return this.cleanProcessorService.process();
+ default: throw new Error(`unrecognized job type ${job.name} for system`);
+ }
+ }, {
+ ...baseQueueOptions(this.config, QUEUE.SYSTEM),
+ });
+
const systemLogger = this.logger.createSubLogger('system');
- const deliverLogger = this.logger.createSubLogger('deliver');
- const webhookLogger = this.logger.createSubLogger('webhook');
- const inboxLogger = this.logger.createSubLogger('inbox');
- const dbLogger = this.logger.createSubLogger('db');
- const relationshipLogger = this.logger.createSubLogger('relationship');
- const objectStorageLogger = this.logger.createSubLogger('objectStorage');
- this.queueService.systemQueue
- .on('waiting', (jobId) => systemLogger.debug(`waiting id=${jobId}`))
+ systemQueueWorker
.on('active', (job) => systemLogger.debug(`active id=${job.id}`))
.on('completed', (job, result) => systemLogger.debug(`completed(${result}) id=${job.id}`))
- .on('failed', (job, err) => systemLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
- .on('error', (job: any, err: Error) => systemLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => systemLogger.warn(`stalled id=${job.id}`));
+ .on('failed', (job, err) => systemLogger.warn(`failed(${err}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }))
+ .on('error', (err: Error) => systemLogger.error(`error ${err}`, { e: renderError(err) }))
+ .on('stalled', (jobId) => systemLogger.warn(`stalled id=${jobId}`));
+ //#endregion
- this.queueService.deliverQueue
- .on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`))
- .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
- .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
- .on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`))
- .on('error', (job: any, err: Error) => deliverLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
+ //#region db
+ const dbQueueWorker = new Bull.Worker(QUEUE.DB, (job) => {
+ switch (job.name) {
+ case 'deleteDriveFiles': return this.deleteDriveFilesProcessorService.process(job);
+ case 'exportCustomEmojis': return this.exportCustomEmojisProcessorService.process(job);
+ case 'exportNotes': return this.exportNotesProcessorService.process(job);
+ case 'exportFavorites': return this.exportFavoritesProcessorService.process(job);
+ case 'exportFollowing': return this.exportFollowingProcessorService.process(job);
+ case 'exportMuting': return this.exportMutingProcessorService.process(job);
+ case 'exportBlocking': return this.exportBlockingProcessorService.process(job);
+ case 'exportUserLists': return this.exportUserListsProcessorService.process(job);
+ case 'exportAntennas': return this.exportAntennasProcessorService.process(job);
+ case 'importFollowing': return this.importFollowingProcessorService.process(job);
+ case 'importFollowingToDb': return this.importFollowingProcessorService.processDb(job);
+ case 'importMuting': return this.importMutingProcessorService.process(job);
+ case 'importBlocking': return this.importBlockingProcessorService.process(job);
+ case 'importBlockingToDb': return this.importBlockingProcessorService.processDb(job);
+ case 'importUserLists': return this.importUserListsProcessorService.process(job);
+ case 'importCustomEmojis': return this.importCustomEmojisProcessorService.process(job);
+ case 'importAntennas': return this.importAntennasProcessorService.process(job);
+ case 'deleteAccount': return this.deleteAccountProcessorService.process(job);
+ default: throw new Error(`unrecognized job type ${job.name} for db`);
+ }
+ }, {
+ ...baseQueueOptions(this.config, QUEUE.DB),
+ });
- this.queueService.inboxQueue
- .on('waiting', (jobId) => inboxLogger.debug(`waiting id=${jobId}`))
- .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`))
- .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`))
- .on('failed', (job, err) => inboxLogger.warn(`failed(${err}) ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`, { job, e: renderError(err) }))
- .on('error', (job: any, err: Error) => inboxLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => inboxLogger.warn(`stalled ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`));
+ const dbLogger = this.logger.createSubLogger('db');
- this.queueService.dbQueue
- .on('waiting', (jobId) => dbLogger.debug(`waiting id=${jobId}`))
+ dbQueueWorker
.on('active', (job) => dbLogger.debug(`active id=${job.id}`))
.on('completed', (job, result) => dbLogger.debug(`completed(${result}) id=${job.id}`))
- .on('failed', (job, err) => dbLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
- .on('error', (job: any, err: Error) => dbLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => dbLogger.warn(`stalled id=${job.id}`));
+ .on('failed', (job, err) => dbLogger.warn(`failed(${err}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }))
+ .on('error', (err: Error) => dbLogger.error(`error ${err}`, { e: renderError(err) }))
+ .on('stalled', (jobId) => dbLogger.warn(`stalled id=${jobId}`));
+ //#endregion
- this.queueService.relationshipQueue
- .on('waiting', (jobId) => relationshipLogger.debug(`waiting id=${jobId}`))
- .on('active', (job) => relationshipLogger.debug(`active id=${job.id}`))
- .on('completed', (job, result) => relationshipLogger.debug(`completed(${result}) id=${job.id}`))
- .on('failed', (job, err) => relationshipLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
- .on('error', (job: any, err: Error) => relationshipLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => relationshipLogger.warn(`stalled id=${job.id}`));
+ //#region deliver
+ const deliverQueueWorker = new Bull.Worker(QUEUE.DELIVER, (job) => this.deliverProcessorService.process(job), {
+ ...baseQueueOptions(this.config, QUEUE.DELIVER),
+ concurrency: this.config.deliverJobConcurrency ?? 128,
+ limiter: {
+ max: this.config.deliverJobPerSec ?? 128,
+ duration: 1000,
+ },
+ settings: {
+ backoffStrategy: httpRelatedBackoff,
+ },
+ });
- this.queueService.objectStorageQueue
- .on('waiting', (jobId) => objectStorageLogger.debug(`waiting id=${jobId}`))
- .on('active', (job) => objectStorageLogger.debug(`active id=${job.id}`))
- .on('completed', (job, result) => objectStorageLogger.debug(`completed(${result}) id=${job.id}`))
- .on('failed', (job, err) => objectStorageLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
- .on('error', (job: any, err: Error) => objectStorageLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => objectStorageLogger.warn(`stalled id=${job.id}`));
+ const deliverLogger = this.logger.createSubLogger('deliver');
- this.queueService.webhookDeliverQueue
- .on('waiting', (jobId) => webhookLogger.debug(`waiting id=${jobId}`))
- .on('active', (job) => webhookLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
- .on('completed', (job, result) => webhookLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
- .on('failed', (job, err) => webhookLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`))
- .on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
+ deliverQueueWorker
+ .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
+ .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
+ .on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`))
+ .on('error', (err: Error) => deliverLogger.error(`error ${err}`, { e: renderError(err) }))
+ .on('stalled', (jobId) => deliverLogger.warn(`stalled id=${jobId}`));
+ //#endregion
- this.queueService.systemQueue.add('tickCharts', {
- }, {
- repeat: { cron: '55 * * * *' },
- removeOnComplete: true,
+ //#region inbox
+ const inboxQueueWorker = new Bull.Worker(QUEUE.INBOX, (job) => this.inboxProcessorService.process(job), {
+ ...baseQueueOptions(this.config, QUEUE.INBOX),
+ concurrency: this.config.inboxJobConcurrency ?? 16,
+ limiter: {
+ max: this.config.inboxJobPerSec ?? 16,
+ duration: 1000,
+ },
+ settings: {
+ backoffStrategy: httpRelatedBackoff,
+ },
});
- this.queueService.systemQueue.add('resyncCharts', {
- }, {
- repeat: { cron: '0 0 * * *' },
- removeOnComplete: true,
- });
+ const inboxLogger = this.logger.createSubLogger('inbox');
- this.queueService.systemQueue.add('cleanCharts', {
- }, {
- repeat: { cron: '0 0 * * *' },
- removeOnComplete: true,
- });
+ inboxQueueWorker
+ .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`))
+ .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`))
+ .on('failed', (job, err) => inboxLogger.warn(`failed(${err}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job, e: renderError(err) }))
+ .on('error', (err: Error) => inboxLogger.error(`error ${err}`, { e: renderError(err) }))
+ .on('stalled', (jobId) => inboxLogger.warn(`stalled id=${jobId}`));
+ //#endregion
- this.queueService.systemQueue.add('aggregateRetention', {
- }, {
- repeat: { cron: '0 0 * * *' },
- removeOnComplete: true,
+ //#region webhook deliver
+ const webhookDeliverQueueWorker = new Bull.Worker(QUEUE.WEBHOOK_DELIVER, (job) => this.webhookDeliverProcessorService.process(job), {
+ ...baseQueueOptions(this.config, QUEUE.WEBHOOK_DELIVER),
+ concurrency: 64,
+ limiter: {
+ max: 64,
+ duration: 1000,
+ },
+ settings: {
+ backoffStrategy: httpRelatedBackoff,
+ },
});
- this.queueService.systemQueue.add('clean', {
+ const webhookLogger = this.logger.createSubLogger('webhook');
+
+ webhookDeliverQueueWorker
+ .on('active', (job) => webhookLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
+ .on('completed', (job, result) => webhookLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
+ .on('failed', (job, err) => webhookLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`))
+ .on('error', (err: Error) => webhookLogger.error(`error ${err}`, { e: renderError(err) }))
+ .on('stalled', (jobId) => webhookLogger.warn(`stalled id=${jobId}`));
+ //#endregion
+
+ //#region relationship
+ const relationshipQueueWorker = new Bull.Worker(QUEUE.RELATIONSHIP, (job) => {
+ switch (job.name) {
+ case 'follow': return this.relationshipProcessorService.processFollow(job);
+ case 'unfollow': return this.relationshipProcessorService.processUnfollow(job);
+ case 'block': return this.relationshipProcessorService.processBlock(job);
+ case 'unblock': return this.relationshipProcessorService.processUnblock(job);
+ default: throw new Error(`unrecognized job type ${job.name} for relationship`);
+ }
}, {
- repeat: { cron: '0 0 * * *' },
- removeOnComplete: true,
+ ...baseQueueOptions(this.config, QUEUE.RELATIONSHIP),
+ concurrency: this.config.relashionshipJobConcurrency ?? 16,
+ limiter: {
+ max: this.config.relashionshipJobPerSec ?? 64,
+ duration: 1000,
+ },
});
- this.queueService.systemQueue.add('checkExpiredMutings', {
+ const relationshipLogger = this.logger.createSubLogger('relationship');
+
+ relationshipQueueWorker
+ .on('active', (job) => relationshipLogger.debug(`active id=${job.id}`))
+ .on('completed', (job, result) => relationshipLogger.debug(`completed(${result}) id=${job.id}`))
+ .on('failed', (job, err) => relationshipLogger.warn(`failed(${err}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }))
+ .on('error', (err: Error) => relationshipLogger.error(`error ${err}`, { e: renderError(err) }))
+ .on('stalled', (jobId) => relationshipLogger.warn(`stalled id=${jobId}`));
+ //#endregion
+
+ //#region object storage
+ const objectStorageQueueWorker = new Bull.Worker(QUEUE.OBJECT_STORAGE, (job) => {
+ switch (job.name) {
+ case 'deleteFile': return this.deleteFileProcessorService.process(job);
+ case 'cleanRemoteFiles': return this.cleanRemoteFilesProcessorService.process(job);
+ default: throw new Error(`unrecognized job type ${job.name} for objectStorage`);
+ }
}, {
- repeat: { cron: '*/5 * * * *' },
- removeOnComplete: true,
+ ...baseQueueOptions(this.config, QUEUE.OBJECT_STORAGE),
+ concurrency: 16,
});
- this.queueService.deliverQueue.process(this.config.deliverJobConcurrency ?? 128, (job) => this.deliverProcessorService.process(job));
- this.queueService.inboxQueue.process(this.config.inboxJobConcurrency ?? 16, (job) => this.inboxProcessorService.process(job));
- this.queueService.endedPollNotificationQueue.process((job, done) => this.endedPollNotificationProcessorService.process(job, done));
- this.queueService.webhookDeliverQueue.process(64, (job) => this.webhookDeliverProcessorService.process(job));
-
- this.queueService.dbQueue.process('deleteDriveFiles', (job, done) => this.deleteDriveFilesProcessorService.process(job, done));
- this.queueService.dbQueue.process('exportCustomEmojis', (job, done) => this.exportCustomEmojisProcessorService.process(job, done));
- this.queueService.dbQueue.process('exportNotes', (job, done) => this.exportNotesProcessorService.process(job, done));
- this.queueService.dbQueue.process('exportFavorites', (job, done) => this.exportFavoritesProcessorService.process(job, done));
- this.queueService.dbQueue.process('exportFollowing', (job, done) => this.exportFollowingProcessorService.process(job, done));
- this.queueService.dbQueue.process('exportMuting', (job, done) => this.exportMutingProcessorService.process(job, done));
- this.queueService.dbQueue.process('exportBlocking', (job, done) => this.exportBlockingProcessorService.process(job, done));
- this.queueService.dbQueue.process('exportUserLists', (job, done) => this.exportUserListsProcessorService.process(job, done));
- this.queueService.dbQueue.process('exportAntennas', (job, done) => this.exportAntennasProcessorService.process(job, done));
- this.queueService.dbQueue.process('importFollowing', (job, done) => this.importFollowingProcessorService.process(job, done));
- this.queueService.dbQueue.process('importFollowingToDb', (job) => this.importFollowingProcessorService.processDb(job));
- this.queueService.dbQueue.process('importMuting', (job, done) => this.importMutingProcessorService.process(job, done));
- this.queueService.dbQueue.process('importBlocking', (job, done) => this.importBlockingProcessorService.process(job, done));
- this.queueService.dbQueue.process('importBlockingToDb', (job) => this.importBlockingProcessorService.processDb(job));
- this.queueService.dbQueue.process('importUserLists', (job, done) => this.importUserListsProcessorService.process(job, done));
- this.queueService.dbQueue.process('importCustomEmojis', (job, done) => this.importCustomEmojisProcessorService.process(job, done));
- this.queueService.dbQueue.process('importAntennas', (job, done) => this.importAntennasProcessorService.process(job, done));
- this.queueService.dbQueue.process('deleteAccount', (job) => this.deleteAccountProcessorService.process(job));
+ const objectStorageLogger = this.logger.createSubLogger('objectStorage');
- this.queueService.objectStorageQueue.process('deleteFile', 16, (job) => this.deleteFileProcessorService.process(job));
- this.queueService.objectStorageQueue.process('cleanRemoteFiles', 16, (job, done) => this.cleanRemoteFilesProcessorService.process(job, done));
-
- {
- const maxJobs = this.config.relashionshipJobConcurrency ?? 16;
- this.queueService.relationshipQueue.process('follow', maxJobs, (job) => this.relationshipProcessorService.processFollow(job));
- this.queueService.relationshipQueue.process('unfollow', maxJobs, (job) => this.relationshipProcessorService.processUnfollow(job));
- this.queueService.relationshipQueue.process('block', maxJobs, (job) => this.relationshipProcessorService.processBlock(job));
- this.queueService.relationshipQueue.process('unblock', maxJobs, (job) => this.relationshipProcessorService.processUnblock(job));
- }
+ objectStorageQueueWorker
+ .on('active', (job) => objectStorageLogger.debug(`active id=${job.id}`))
+ .on('completed', (job, result) => objectStorageLogger.debug(`completed(${result}) id=${job.id}`))
+ .on('failed', (job, err) => objectStorageLogger.warn(`failed(${err}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }))
+ .on('error', (err: Error) => objectStorageLogger.error(`error ${err}`, { e: renderError(err) }))
+ .on('stalled', (jobId) => objectStorageLogger.warn(`stalled id=${jobId}`));
+ //#endregion
- this.queueService.systemQueue.process('tickCharts', (job, done) => this.tickChartsProcessorService.process(job, done));
- this.queueService.systemQueue.process('resyncCharts', (job, done) => this.resyncChartsProcessorService.process(job, done));
- this.queueService.systemQueue.process('cleanCharts', (job, done) => this.cleanChartsProcessorService.process(job, done));
- this.queueService.systemQueue.process('aggregateRetention', (job, done) => this.aggregateRetentionProcessorService.process(job, done));
- this.queueService.systemQueue.process('checkExpiredMutings', (job, done) => this.checkExpiredMutingsProcessorService.process(job, done));
- this.queueService.systemQueue.process('clean', (job, done) => this.cleanProcessorService.process(job, done));
+ //#region ended poll notification
+ const endedPollNotificationWorker = new Bull.Worker(QUEUE.ENDED_POLL_NOTIFICATION, (job) => this.endedPollNotificationProcessorService.process(job), {
+ ...baseQueueOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION),
+ });
+ //#endregion
}
}
diff --git a/packages/backend/src/queue/const.ts b/packages/backend/src/queue/const.ts
new file mode 100644
index 0000000000..d240fe70e0
--- /dev/null
+++ b/packages/backend/src/queue/const.ts
@@ -0,0 +1,26 @@
+import { Config } from '@/config.js';
+import type * as Bull from 'bullmq';
+
+export const QUEUE = {
+ DELIVER: 'deliver',
+ INBOX: 'inbox',
+ SYSTEM: 'system',
+ ENDED_POLL_NOTIFICATION: 'endedPollNotification',
+ DB: 'db',
+ RELATIONSHIP: 'relationship',
+ OBJECT_STORAGE: 'objectStorage',
+ WEBHOOK_DELIVER: 'webhookDeliver',
+};
+
+export function baseQueueOptions(config: Config, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions {
+ return {
+ connection: {
+ port: config.redisForJobQueue.port,
+ host: config.redisForJobQueue.host,
+ family: config.redisForJobQueue.family == null ? 0 : config.redisForJobQueue.family,
+ password: config.redisForJobQueue.pass,
+ db: config.redisForJobQueue.db ?? 0,
+ },
+ prefix: config.redisForJobQueue.prefix ? `${config.redisForJobQueue.prefix}:queue:${queueName}` : `queue:${queueName}`,
+ };
+}
diff --git a/packages/backend/src/queue/get-job-info.ts b/packages/backend/src/queue/get-job-info.ts
deleted file mode 100644
index d33e349c36..0000000000
--- a/packages/backend/src/queue/get-job-info.ts
+++ /dev/null
@@ -1,15 +0,0 @@
-import Bull from 'bull';
-
-export function getJobInfo(job: Bull.Job, increment = false) {
- const age = Date.now() - job.timestamp;
-
- const formated = age > 60000 ? `${Math.floor(age / 1000 / 60)}m`
- : age > 10000 ? `${Math.floor(age / 1000)}s`
- : `${age}ms`;
-
- // onActiveとかonCompletedのattemptsMadeがなぜか0始まりなのでインクリメントする
- const currentAttempts = job.attemptsMade + (increment ? 1 : 0);
- const maxAttempts = job.opts ? job.opts.attempts : 0;
-
- return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`;
-}
diff --git a/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts b/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts
index e2720b4fe0..600ce0828f 100644
--- a/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts
+++ b/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts
@@ -9,7 +9,7 @@ import { deepClone } from '@/misc/clone.js';
import { IdService } from '@/core/IdService.js';
import { isDuplicateKeyValueError } from '@/misc/is-duplicate-key-value-error.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
@Injectable()
export class AggregateRetentionProcessorService {
@@ -32,7 +32,7 @@ export class AggregateRetentionProcessorService {
}
@bindThis
- public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> {
+ public async process(): Promise<void> {
this.logger.info('Aggregating retention...');
const now = new Date();
@@ -62,7 +62,6 @@ export class AggregateRetentionProcessorService {
} catch (err) {
if (isDuplicateKeyValueError(err)) {
this.logger.succ('Skip because it has already been processed by another worker.');
- done();
return;
}
throw err;
@@ -88,6 +87,5 @@ export class AggregateRetentionProcessorService {
}
this.logger.succ('Retention aggregated.');
- done();
}
}
diff --git a/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts b/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts
index 2476d71a5e..c4ee212bab 100644
--- a/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts
+++ b/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts
@@ -7,7 +7,7 @@ import type Logger from '@/logger.js';
import { bindThis } from '@/decorators.js';
import { UserMutingService } from '@/core/UserMutingService.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
@Injectable()
export class CheckExpiredMutingsProcessorService {
@@ -27,7 +27,7 @@ export class CheckExpiredMutingsProcessorService {
}
@bindThis
- public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> {
+ public async process(): Promise<void> {
this.logger.info('Checking expired mutings...');
const expired = await this.mutingsRepository.createQueryBuilder('muting')
@@ -41,6 +41,5 @@ export class CheckExpiredMutingsProcessorService {
}
this.logger.succ('All expired mutings checked.');
- done();
}
}
diff --git a/packages/backend/src/queue/processors/CleanChartsProcessorService.ts b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts
index b458167042..22d7c1b4fb 100644
--- a/packages/backend/src/queue/processors/CleanChartsProcessorService.ts
+++ b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts
@@ -16,7 +16,7 @@ import PerUserDriveChart from '@/core/chart/charts/per-user-drive.js';
import ApRequestChart from '@/core/chart/charts/ap-request.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
@Injectable()
export class CleanChartsProcessorService {
@@ -45,7 +45,7 @@ export class CleanChartsProcessorService {
}
@bindThis
- public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> {
+ public async process(): Promise<void> {
this.logger.info('Clean charts...');
await Promise.all([
@@ -64,6 +64,5 @@ export class CleanChartsProcessorService {
]);
this.logger.succ('All charts successfully cleaned.');
- done();
}
}
diff --git a/packages/backend/src/queue/processors/CleanProcessorService.ts b/packages/backend/src/queue/processors/CleanProcessorService.ts
index 1936e8df23..cefa6da5e9 100644
--- a/packages/backend/src/queue/processors/CleanProcessorService.ts
+++ b/packages/backend/src/queue/processors/CleanProcessorService.ts
@@ -7,7 +7,7 @@ import type Logger from '@/logger.js';
import { bindThis } from '@/decorators.js';
import { IdService } from '@/core/IdService.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
@Injectable()
export class CleanProcessorService {
@@ -36,7 +36,7 @@ export class CleanProcessorService {
}
@bindThis
- public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> {
+ public async process(): Promise<void> {
this.logger.info('Cleaning...');
this.userIpsRepository.delete({
@@ -72,6 +72,5 @@ export class CleanProcessorService {
}
this.logger.succ('Cleaned.');
- done();
}
}
diff --git a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts
index 5a33c27188..c54bf59ae4 100644
--- a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts
+++ b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts
@@ -5,9 +5,9 @@ import type { DriveFilesRepository } from '@/models/index.js';
import type { Config } from '@/config.js';
import type Logger from '@/logger.js';
import { DriveService } from '@/core/DriveService.js';
-import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
import { bindThis } from '@/decorators.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type * as Bull from 'bullmq';
@Injectable()
export class CleanRemoteFilesProcessorService {
@@ -27,7 +27,7 @@ export class CleanRemoteFilesProcessorService {
}
@bindThis
- public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> {
+ public async process(job: Bull.Job<Record<string, unknown>>): Promise<void> {
this.logger.info('Deleting cached remote files...');
let deletedCount = 0;
@@ -47,7 +47,7 @@ export class CleanRemoteFilesProcessorService {
});
if (files.length === 0) {
- job.progress(100);
+ job.updateProgress(100);
break;
}
@@ -62,10 +62,9 @@ export class CleanRemoteFilesProcessorService {
isLink: false,
});
- job.progress(deletedCount / total);
+ job.updateProgress(deletedCount / total);
}
this.logger.succ('All cached remote files has been deleted.');
- done();
}
}
diff --git a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts
index e36a78de6a..39dd801af0 100644
--- a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts
+++ b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts
@@ -8,10 +8,10 @@ import { DriveService } from '@/core/DriveService.js';
import type { DriveFile } from '@/models/entities/DriveFile.js';
import type { Note } from '@/models/entities/Note.js';
import { EmailService } from '@/core/EmailService.js';
+import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
import type { DbUserDeleteJobData } from '../types.js';
-import { bindThis } from '@/decorators.js';
@Injectable()
export class DeleteAccountProcessorService {
diff --git a/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts b/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts
index 604497cf54..6772c5dc76 100644
--- a/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts
+++ b/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts
@@ -5,10 +5,10 @@ import type { UsersRepository, DriveFilesRepository } from '@/models/index.js';
import type { Config } from '@/config.js';
import type Logger from '@/logger.js';
import { DriveService } from '@/core/DriveService.js';
+import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
import type { DbJobDataWithUser } from '../types.js';
-import { bindThis } from '@/decorators.js';
@Injectable()
export class DeleteDriveFilesProcessorService {
@@ -31,12 +31,11 @@ export class DeleteDriveFilesProcessorService {
}
@bindThis
- public async process(job: Bull.Job<DbJobDataWithUser>, done: () => void): Promise<void> {
+ public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> {
this.logger.info(`Deleting drive files of ${job.data.user.id} ...`);
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
if (user == null) {
- done();
return;
}
@@ -56,7 +55,7 @@ export class DeleteDriveFilesProcessorService {
});
if (files.length === 0) {
- job.progress(100);
+ job.updateProgress(100);
break;
}
@@ -71,10 +70,9 @@ export class DeleteDriveFilesProcessorService {
userId: user.id,
});
- job.progress(deletedCount / total);
+ job.updateProgress(deletedCount / total);
}
this.logger.succ(`All drive files (${deletedCount}) of ${user.id} has been deleted.`);
- done();
}
}
diff --git a/packages/backend/src/queue/processors/DeleteFileProcessorService.ts b/packages/backend/src/queue/processors/DeleteFileProcessorService.ts
index 2fb2f56f8d..edf87bd921 100644
--- a/packages/backend/src/queue/processors/DeleteFileProcessorService.ts
+++ b/packages/backend/src/queue/processors/DeleteFileProcessorService.ts
@@ -3,10 +3,10 @@ import { DI } from '@/di-symbols.js';
import type { Config } from '@/config.js';
import type Logger from '@/logger.js';
import { DriveService } from '@/core/DriveService.js';
+import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
import type { ObjectStorageFileJobData } from '../types.js';
-import { bindThis } from '@/decorators.js';
@Injectable()
export class DeleteFileProcessorService {
diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts
index f293bd4d7e..406e9df850 100644
--- a/packages/backend/src/queue/processors/DeliverProcessorService.ts
+++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts
@@ -1,4 +1,5 @@
import { Inject, Injectable } from '@nestjs/common';
+import * as Bull from 'bullmq';
import { DI } from '@/di-symbols.js';
import type { DriveFilesRepository, InstancesRepository } from '@/models/index.js';
import type { Config } from '@/config.js';
@@ -16,7 +17,6 @@ import { StatusError } from '@/misc/status-error.js';
import { UtilityService } from '@/core/UtilityService.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
import type { DeliverJobData } from '../types.js';
@Injectable()
@@ -121,15 +121,13 @@ export class DeliverProcessorService {
isSuspended: true,
});
});
- return `${host} is gone`;
+ throw new Bull.UnrecoverableError(`${host} is gone`);
}
- // HTTPステータスコード4xxはクライアントエラーであり、それはつまり
- // 何回再送しても成功することはないということなのでエラーにはしないでおく
- return `${res.statusCode} ${res.statusMessage}`;
+ throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`);
}
// 5xx etc.
- throw `${res.statusCode} ${res.statusMessage}`;
+ throw new Error(`${res.statusCode} ${res.statusMessage}`);
} else {
// DNS error, socket error, timeout ...
throw res;
diff --git a/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts b/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts
index 501ed4090a..21501592f2 100644
--- a/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts
+++ b/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts
@@ -6,7 +6,7 @@ import type Logger from '@/logger.js';
import { NotificationService } from '@/core/NotificationService.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
import type { EndedPollNotificationJobData } from '../types.js';
@Injectable()
@@ -30,10 +30,9 @@ export class EndedPollNotificationProcessorService {
}
@bindThis
- public async process(job: Bull.Job<EndedPollNotificationJobData>, done: () => void): Promise<void> {
+ public async process(job: Bull.Job<EndedPollNotificationJobData>): Promise<void> {
const note = await this.notesRepository.findOneBy({ id: job.data.noteId });
if (note == null || !note.hasPoll) {
- done();
return;
}
@@ -51,7 +50,5 @@ export class EndedPollNotificationProcessorService {
noteId: note.id,
});
}
-
- done();
}
}
diff --git a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
index 894903e79b..ac52325c8d 100644
--- a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
@@ -12,7 +12,7 @@ import { createTemp } from '@/misc/create-temp.js';
import { UtilityService } from '@/core/UtilityService.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type { DBExportAntennasData } from '../types.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
@Injectable()
export class ExportAntennasProcessorService {
@@ -39,10 +39,9 @@ export class ExportAntennasProcessorService {
}
@bindThis
- public async process(job: Bull.Job<DBExportAntennasData>, done: () => void): Promise<void> {
+ public async process(job: Bull.Job<DBExportAntennasData>): Promise<void> {
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
if (user == null) {
- done();
return;
}
const [path, cleanup] = await createTemp();
@@ -96,7 +95,6 @@ export class ExportAntennasProcessorService {
this.logger.succ('Exported to: ' + driveFile.id);
} finally {
cleanup();
- done();
}
}
}
diff --git a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts
index c7b54070d6..eb758e162d 100644
--- a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts
@@ -9,10 +9,10 @@ import type Logger from '@/logger.js';
import { DriveService } from '@/core/DriveService.js';
import { createTemp } from '@/misc/create-temp.js';
import { UtilityService } from '@/core/UtilityService.js';
+import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
import type { DbJobDataWithUser } from '../types.js';
-import { bindThis } from '@/decorators.js';
@Injectable()
export class ExportBlockingProcessorService {
@@ -36,12 +36,11 @@ export class ExportBlockingProcessorService {
}
@bindThis
- public async process(job: Bull.Job<DbJobDataWithUser>, done: () => void): Promise<void> {
+ public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> {
this.logger.info(`Exporting blocking of ${job.data.user.id} ...`);
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
if (user == null) {
- done();
return;
}
@@ -69,7 +68,7 @@ export class ExportBlockingProcessorService {
});
if (blockings.length === 0) {
- job.progress(100);
+ job.updateProgress(100);
break;
}
@@ -99,7 +98,7 @@ export class ExportBlockingProcessorService {
blockerId: user.id,
});
- job.progress(exportedCount / total);
+ job.updateProgress(exportedCount / total);
}
stream.end();
@@ -112,7 +111,5 @@ export class ExportBlockingProcessorService {
} finally {
cleanup();
}
-
- done();
}
}
diff --git a/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts
index b50f373ef8..3203d9f3e5 100644
--- a/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts
@@ -13,7 +13,7 @@ import { createTemp, createTempDir } from '@/misc/create-temp.js';
import { DownloadService } from '@/core/DownloadService.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
@Injectable()
export class ExportCustomEmojisProcessorService {
@@ -37,12 +37,11 @@ export class ExportCustomEmojisProcessorService {
}
@bindThis
- public async process(job: Bull.Job, done: () => void): Promise<void> {
+ public async process(job: Bull.Job): Promise<void> {
this.logger.info('Exporting custom emojis ...');
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
if (user == null) {
- done();
return;
}
@@ -117,24 +116,26 @@ export class ExportCustomEmojisProcessorService {
metaStream.end();
// Create archive
- const [archivePath, archiveCleanup] = await createTemp();
- const archiveStream = fs.createWriteStream(archivePath);
- const archive = archiver('zip', {
- zlib: { level: 0 },
- });
- archiveStream.on('close', async () => {
- this.logger.succ(`Exported to: ${archivePath}`);
+ await new Promise<void>(async (resolve) => {
+ const [archivePath, archiveCleanup] = await createTemp();
+ const archiveStream = fs.createWriteStream(archivePath);
+ const archive = archiver('zip', {
+ zlib: { level: 0 },
+ });
+ archiveStream.on('close', async () => {
+ this.logger.succ(`Exported to: ${archivePath}`);
- const fileName = 'custom-emojis-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.zip';
- const driveFile = await this.driveService.addFile({ user, path: archivePath, name: fileName, force: true });
+ const fileName = 'custom-emojis-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.zip';
+ const driveFile = await this.driveService.addFile({ user, path: archivePath, name: fileName, force: true });
- this.logger.succ(`Exported to: ${driveFile.id}`);
- cleanup();
- archiveCleanup();
- done();
+ this.logger.succ(`Exported to: ${driveFile.id}`);
+ cleanup();
+ archiveCleanup();
+ resolve();
+ });
+ archive.pipe(archiveStream);
+ archive.directory(path, false);
+ archive.finalize();
});
- archive.pipe(archiveStream);
- archive.directory(path, false);
- archive.finalize();
}
}
diff --git a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts
index f2f2383a88..76c38a6b86 100644
--- a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts
@@ -12,7 +12,7 @@ import type { Poll } from '@/models/entities/Poll.js';
import type { Note } from '@/models/entities/Note.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
import type { DbJobDataWithUser } from '../types.js';
@Injectable()
@@ -42,12 +42,11 @@ export class ExportFavoritesProcessorService {
}
@bindThis
- public async process(job: Bull.Job<DbJobDataWithUser>, done: () => void): Promise<void> {
+ public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> {
this.logger.info(`Exporting favorites of ${job.data.user.id} ...`);
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
if (user == null) {
- done();
return;
}
@@ -91,7 +90,7 @@ export class ExportFavoritesProcessorService {
}) as (NoteFavorite & { note: Note & { user: User } })[];
if (favorites.length === 0) {
- job.progress(100);
+ job.updateProgress(100);
break;
}
@@ -112,7 +111,7 @@ export class ExportFavoritesProcessorService {
userId: user.id,
});
- job.progress(exportedFavoritesCount / total);
+ job.updateProgress(exportedFavoritesCount / total);
}
await write(']');
@@ -127,8 +126,6 @@ export class ExportFavoritesProcessorService {
} finally {
cleanup();
}
-
- done();
}
}
diff --git a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts
index fa9c1ac1ea..8726cb1402 100644
--- a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts
@@ -10,10 +10,10 @@ import { DriveService } from '@/core/DriveService.js';
import { createTemp } from '@/misc/create-temp.js';
import type { Following } from '@/models/entities/Following.js';
import { UtilityService } from '@/core/UtilityService.js';
+import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
import type { DbExportFollowingData } from '../types.js';
-import { bindThis } from '@/decorators.js';
@Injectable()
export class ExportFollowingProcessorService {
@@ -40,12 +40,11 @@ export class ExportFollowingProcessorService {
}
@bindThis
- public async process(job: Bull.Job<DbExportFollowingData>, done: () => void): Promise<void> {
+ public async process(job: Bull.Job<DbExportFollowingData>): Promise<void> {
this.logger.info(`Exporting following of ${job.data.user.id} ...`);
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
if (user == null) {
- done();
return;
}
@@ -116,7 +115,5 @@ export class ExportFollowingProcessorService {
} finally {
cleanup();
}
-
- done();
}
}
diff --git a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts
index b14bf5f5b1..0f11a9e843 100644
--- a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts
@@ -9,10 +9,10 @@ import type Logger from '@/logger.js';
import { DriveService } from '@/core/DriveService.js';
import { createTemp } from '@/misc/create-temp.js';
import { UtilityService } from '@/core/UtilityService.js';
+import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
import type { DbJobDataWithUser } from '../types.js';
-import { bindThis } from '@/decorators.js';
@Injectable()
export class ExportMutingProcessorService {
@@ -39,12 +39,11 @@ export class ExportMutingProcessorService {
}
@bindThis
- public async process(job: Bull.Job<DbJobDataWithUser>, done: () => void): Promise<void> {
+ public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> {
this.logger.info(`Exporting muting of ${job.data.user.id} ...`);
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
if (user == null) {
- done();
return;
}
@@ -73,7 +72,7 @@ export class ExportMutingProcessorService {
});
if (mutes.length === 0) {
- job.progress(100);
+ job.updateProgress(100);
break;
}
@@ -103,7 +102,7 @@ export class ExportMutingProcessorService {
muterId: user.id,
});
- job.progress(exportedCount / total);
+ job.updateProgress(exportedCount / total);
}
stream.end();
@@ -116,7 +115,5 @@ export class ExportMutingProcessorService {
} finally {
cleanup();
}
-
- done();
}
}
diff --git a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
index e4f12ad101..24fb331883 100644
--- a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
@@ -12,7 +12,7 @@ import type { Poll } from '@/models/entities/Poll.js';
import type { Note } from '@/models/entities/Note.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
import type { DbJobDataWithUser } from '../types.js';
@Injectable()
@@ -39,12 +39,11 @@ export class ExportNotesProcessorService {
}
@bindThis
- public async process(job: Bull.Job<DbJobDataWithUser>, done: () => void): Promise<void> {
+ public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> {
this.logger.info(`Exporting notes of ${job.data.user.id} ...`);
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
if (user == null) {
- done();
return;
}
@@ -87,7 +86,7 @@ export class ExportNotesProcessorService {
}) as Note[];
if (notes.length === 0) {
- job.progress(100);
+ job.updateProgress(100);
break;
}
@@ -108,7 +107,7 @@ export class ExportNotesProcessorService {
userId: user.id,
});
- job.progress(exportedNotesCount / total);
+ job.updateProgress(exportedNotesCount / total);
}
await write(']');
@@ -123,8 +122,6 @@ export class ExportNotesProcessorService {
} finally {
cleanup();
}
-
- done();
}
}
diff --git a/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts
index 54bde44044..ec63358053 100644
--- a/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts
@@ -9,10 +9,10 @@ import type Logger from '@/logger.js';
import { DriveService } from '@/core/DriveService.js';
import { createTemp } from '@/misc/create-temp.js';
import { UtilityService } from '@/core/UtilityService.js';
+import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
import type { DbJobDataWithUser } from '../types.js';
-import { bindThis } from '@/decorators.js';
@Injectable()
export class ExportUserListsProcessorService {
@@ -39,12 +39,11 @@ export class ExportUserListsProcessorService {
}
@bindThis
- public async process(job: Bull.Job<DbJobDataWithUser>, done: () => void): Promise<void> {
+ public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> {
this.logger.info(`Exporting user lists of ${job.data.user.id} ...`);
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
if (user == null) {
- done();
return;
}
@@ -92,7 +91,5 @@ export class ExportUserListsProcessorService {
} finally {
cleanup();
}
-
- done();
}
}
diff --git a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts
index d06131b8c8..575cad69d5 100644
--- a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts
@@ -8,7 +8,7 @@ import { DI } from '@/di-symbols.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import { DBAntennaImportJobData } from '../types.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
const validate = new Ajv().compile({
type: 'object',
@@ -59,7 +59,7 @@ export class ImportAntennasProcessorService {
}
@bindThis
- public async process(job: Bull.Job<DBAntennaImportJobData>, done: () => void): Promise<void> {
+ public async process(job: Bull.Job<DBAntennaImportJobData>): Promise<void> {
const now = new Date();
try {
for (const antenna of job.data.antenna) {
@@ -89,8 +89,6 @@ export class ImportAntennasProcessorService {
}
} catch (err: any) {
this.logger.error(err);
- } finally {
- done();
}
}
}
diff --git a/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts b/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts
index 3f075b02d2..2f1a9e5b03 100644
--- a/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts
@@ -7,11 +7,11 @@ import * as Acct from '@/misc/acct.js';
import { RemoteUserResolveService } from '@/core/RemoteUserResolveService.js';
import { DownloadService } from '@/core/DownloadService.js';
import { UtilityService } from '@/core/UtilityService.js';
-import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
-import type { DbUserImportJobData, DbUserImportToDbJobData } from '../types.js';
import { bindThis } from '@/decorators.js';
import { QueueService } from '@/core/QueueService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type * as Bull from 'bullmq';
+import type { DbUserImportJobData, DbUserImportToDbJobData } from '../types.js';
@Injectable()
export class ImportBlockingProcessorService {
@@ -34,12 +34,11 @@ export class ImportBlockingProcessorService {
}
@bindThis
- public async process(job: Bull.Job<DbUserImportJobData>, done: () => void): Promise<void> {
+ public async process(job: Bull.Job<DbUserImportJobData>): Promise<void> {
this.logger.info(`Importing blocking of ${job.data.user.id} ...`);
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
if (user == null) {
- done();
return;
}
@@ -47,7 +46,6 @@ export class ImportBlockingProcessorService {
id: job.data.fileId,
});
if (file == null) {
- done();
return;
}
@@ -56,7 +54,6 @@ export class ImportBlockingProcessorService {
this.queueService.createImportBlockingToDbJob({ id: user.id }, targets);
this.logger.succ('Import jobs created');
- done();
}
@bindThis
@@ -85,7 +82,7 @@ export class ImportBlockingProcessorService {
}
if (target == null) {
- throw `Unable to resolve user: @${username}@${host}`;
+ throw new Error(`Unable to resolve user: @${username}@${host}`);
}
// skip myself
diff --git a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts
index 600468a286..d862567871 100644
--- a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts
@@ -12,7 +12,7 @@ import { DriveService } from '@/core/DriveService.js';
import { DownloadService } from '@/core/DownloadService.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
import type { DbUserImportJobData } from '../types.js';
// TODO: 名前衝突時の動作を選べるようにする
@@ -45,14 +45,13 @@ export class ImportCustomEmojisProcessorService {
}
@bindThis
- public async process(job: Bull.Job<DbUserImportJobData>, done: () => void): Promise<void> {
+ public async process(job: Bull.Job<DbUserImportJobData>): Promise<void> {
this.logger.info('Importing custom emojis ...');
const file = await this.driveFilesRepository.findOneBy({
id: job.data.fileId,
});
if (file == null) {
- done();
return;
}
@@ -116,7 +115,6 @@ export class ImportCustomEmojisProcessorService {
cleanup();
this.logger.succ('Imported');
- done();
});
unzipStream.pipe(extractor);
this.logger.succ(`Unzipping to ${outputPath}`);
diff --git a/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts b/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts
index aa5cf12c50..15bee9672e 100644
--- a/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts
@@ -7,11 +7,11 @@ import * as Acct from '@/misc/acct.js';
import { RemoteUserResolveService } from '@/core/RemoteUserResolveService.js';
import { DownloadService } from '@/core/DownloadService.js';
import { UtilityService } from '@/core/UtilityService.js';
-import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
-import type { DbUserImportJobData, DbUserImportToDbJobData } from '../types.js';
import { bindThis } from '@/decorators.js';
import { QueueService } from '@/core/QueueService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type * as Bull from 'bullmq';
+import type { DbUserImportJobData, DbUserImportToDbJobData } from '../types.js';
@Injectable()
export class ImportFollowingProcessorService {
@@ -34,12 +34,11 @@ export class ImportFollowingProcessorService {
}
@bindThis
- public async process(job: Bull.Job<DbUserImportJobData>, done: () => void): Promise<void> {
+ public async process(job: Bull.Job<DbUserImportJobData>): Promise<void> {
this.logger.info(`Importing following of ${job.data.user.id} ...`);
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
if (user == null) {
- done();
return;
}
@@ -47,7 +46,6 @@ export class ImportFollowingProcessorService {
id: job.data.fileId,
});
if (file == null) {
- done();
return;
}
@@ -56,7 +54,6 @@ export class ImportFollowingProcessorService {
this.queueService.createImportFollowingToDbJob({ id: user.id }, targets);
this.logger.succ('Import jobs created');
- done();
}
@bindThis
@@ -85,7 +82,7 @@ export class ImportFollowingProcessorService {
}
if (target == null) {
- throw `Unable to resolve user: @${username}@${host}`;
+ throw new Error(`Unable to resolve user: @${username}@${host}`);
}
// skip myself
diff --git a/packages/backend/src/queue/processors/ImportMutingProcessorService.ts b/packages/backend/src/queue/processors/ImportMutingProcessorService.ts
index 379994ee79..723935cd31 100644
--- a/packages/backend/src/queue/processors/ImportMutingProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportMutingProcessorService.ts
@@ -9,10 +9,10 @@ import { RemoteUserResolveService } from '@/core/RemoteUserResolveService.js';
import { DownloadService } from '@/core/DownloadService.js';
import { UserMutingService } from '@/core/UserMutingService.js';
import { UtilityService } from '@/core/UtilityService.js';
+import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
import type { DbUserImportJobData } from '../types.js';
-import { bindThis } from '@/decorators.js';
@Injectable()
export class ImportMutingProcessorService {
@@ -38,12 +38,11 @@ export class ImportMutingProcessorService {
}
@bindThis
- public async process(job: Bull.Job<DbUserImportJobData>, done: () => void): Promise<void> {
+ public async process(job: Bull.Job<DbUserImportJobData>): Promise<void> {
this.logger.info(`Importing muting of ${job.data.user.id} ...`);
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
if (user == null) {
- done();
return;
}
@@ -51,7 +50,6 @@ export class ImportMutingProcessorService {
id: job.data.fileId,
});
if (file == null) {
- done();
return;
}
@@ -83,7 +81,7 @@ export class ImportMutingProcessorService {
}
if (target == null) {
- throw `cannot resolve user: @${username}@${host}`;
+ throw new Error(`cannot resolve user: @${username}@${host}`);
}
// skip myself
@@ -98,6 +96,5 @@ export class ImportMutingProcessorService {
}
this.logger.succ('Imported');
- done();
}
}
diff --git a/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts b/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts
index c423863410..824ee8157a 100644
--- a/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts
@@ -12,7 +12,7 @@ import { IdService } from '@/core/IdService.js';
import { UtilityService } from '@/core/UtilityService.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
import type { DbUserImportJobData } from '../types.js';
@Injectable()
@@ -46,12 +46,11 @@ export class ImportUserListsProcessorService {
}
@bindThis
- public async process(job: Bull.Job<DbUserImportJobData>, done: () => void): Promise<void> {
+ public async process(job: Bull.Job<DbUserImportJobData>): Promise<void> {
this.logger.info(`Importing user lists of ${job.data.user.id} ...`);
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
if (user == null) {
- done();
return;
}
@@ -59,7 +58,6 @@ export class ImportUserListsProcessorService {
id: job.data.fileId,
});
if (file == null) {
- done();
return;
}
@@ -109,6 +107,5 @@ export class ImportUserListsProcessorService {
}
this.logger.succ('Imported');
- done();
}
}
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts
index ab8b1e9e22..ce1d7aaa1b 100644
--- a/packages/backend/src/queue/processors/InboxProcessorService.ts
+++ b/packages/backend/src/queue/processors/InboxProcessorService.ts
@@ -1,8 +1,8 @@
import { URL } from 'node:url';
import { Inject, Injectable } from '@nestjs/common';
import httpSignature from '@peertube/http-signature';
+import * as Bull from 'bullmq';
import { DI } from '@/di-symbols.js';
-import type { InstancesRepository, DriveFilesRepository } from '@/models/index.js';
import type { Config } from '@/config.js';
import type Logger from '@/logger.js';
import { MetaService } from '@/core/MetaService.js';
@@ -23,10 +23,8 @@ import { LdSignatureService } from '@/core/activitypub/LdSignatureService.js';
import { ApInboxService } from '@/core/activitypub/ApInboxService.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
import type { InboxJobData } from '../types.js';
-// ユーザーのinboxにアクティビティが届いた時の処理
@Injectable()
export class InboxProcessorService {
private logger: Logger;
@@ -35,12 +33,6 @@ export class InboxProcessorService {
@Inject(DI.config)
private config: Config,
- @Inject(DI.instancesRepository)
- private instancesRepository: InstancesRepository,
-
- @Inject(DI.driveFilesRepository)
- private driveFilesRepository: DriveFilesRepository,
-
private utilityService: UtilityService,
private metaService: MetaService,
private apInboxService: ApInboxService,
@@ -93,24 +85,24 @@ export class InboxProcessorService {
try {
authUser = await this.apDbResolverService.getAuthUserFromApId(getApId(activity.actor));
} catch (err) {
- // 対象が4xxならスキップ
+ // 対象が4xxならスキップ
if (err instanceof StatusError) {
if (err.isClientError) {
- return `skip: Ignored deleted actors on both ends ${activity.actor} - ${err.statusCode}`;
+ throw new Bull.UnrecoverableError(`skip: Ignored deleted actors on both ends ${activity.actor} - ${err.statusCode}`);
}
- throw `Error in actor ${activity.actor} - ${err.statusCode ?? err}`;
+ throw new Error(`Error in actor ${activity.actor} - ${err.statusCode ?? err}`);
}
}
}
// それでもわからなければ終了
if (authUser == null) {
- return 'skip: failed to resolve user';
+ throw new Bull.UnrecoverableError('skip: failed to resolve user');
}
// publicKey がなくても終了
if (authUser.key == null) {
- return 'skip: failed to resolve user publicKey';
+ throw new Bull.UnrecoverableError('skip: failed to resolve user publicKey');
}
// HTTP-Signatureの検証
@@ -118,10 +110,10 @@ export class InboxProcessorService {
// また、signatureのsignerは、activity.actorと一致する必要がある
if (!httpSignatureValidated || authUser.user.uri !== activity.actor) {
- // 一致しなくても、でもLD-Signatureがありそうならそっちも見る
+ // 一致しなくても、でもLD-Signatureがありそうならそっちも見る
if (activity.signature) {
if (activity.signature.type !== 'RsaSignature2017') {
- return `skip: unsupported LD-signature type ${activity.signature.type}`;
+ throw new Bull.UnrecoverableError(`skip: unsupported LD-signature type ${activity.signature.type}`);
}
// activity.signature.creator: https://example.oom/users/user#main-key
@@ -134,32 +126,32 @@ export class InboxProcessorService {
// keyIdからLD-Signatureのユーザーを取得
authUser = await this.apDbResolverService.getAuthUserFromKeyId(activity.signature.creator);
if (authUser == null) {
- return 'skip: LD-Signatureのユーザーが取得できませんでした';
+ throw new Bull.UnrecoverableError('skip: LD-Signatureのユーザーが取得できませんでした');
}
if (authUser.key == null) {
- return 'skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした';
+ throw new Bull.UnrecoverableError('skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした');
}
// LD-Signature検証
const ldSignature = this.ldSignatureService.use();
const verified = await ldSignature.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false);
if (!verified) {
- return 'skip: LD-Signatureの検証に失敗しました';
+ throw new Bull.UnrecoverableError('skip: LD-Signatureの検証に失敗しました');
}
// もう一度actorチェック
if (authUser.user.uri !== activity.actor) {
- return `skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`;
+ throw new Bull.UnrecoverableError(`skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`);
}
// ブロックしてたら中断
const ldHost = this.utilityService.extractDbHost(authUser.user.uri);
if (this.utilityService.isBlockedHost(meta.blockedHosts, ldHost)) {
- return `Blocked request: ${ldHost}`;
+ throw new Bull.UnrecoverableError(`Blocked request: ${ldHost}`);
}
} else {
- return `skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`;
+ throw new Bull.UnrecoverableError(`skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`);
}
}
@@ -168,7 +160,7 @@ export class InboxProcessorService {
const signerHost = this.utilityService.extractDbHost(authUser.user.uri!);
const activityIdHost = this.utilityService.extractDbHost(activity.id);
if (signerHost !== activityIdHost) {
- return `skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`;
+ throw new Bull.UnrecoverableError(`skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`);
}
}
diff --git a/packages/backend/src/queue/processors/RelationshipProcessorService.ts b/packages/backend/src/queue/processors/RelationshipProcessorService.ts
index ff454df455..722260d948 100644
--- a/packages/backend/src/queue/processors/RelationshipProcessorService.ts
+++ b/packages/backend/src/queue/processors/RelationshipProcessorService.ts
@@ -1,5 +1,5 @@
import { Inject, Injectable } from '@nestjs/common';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
import { UserFollowingService } from '@/core/UserFollowingService.js';
import { UserBlockingService } from '@/core/UserBlockingService.js';
diff --git a/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts
index e5840f3da8..eab8e1e68d 100644
--- a/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts
+++ b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts
@@ -15,7 +15,7 @@ import PerUserDriveChart from '@/core/chart/charts/per-user-drive.js';
import ApRequestChart from '@/core/chart/charts/ap-request.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
@Injectable()
export class ResyncChartsProcessorService {
@@ -43,7 +43,7 @@ export class ResyncChartsProcessorService {
}
@bindThis
- public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> {
+ public async process(): Promise<void> {
this.logger.info('Resync charts...');
// TODO: ユーザーごとのチャートも更新する
@@ -55,6 +55,5 @@ export class ResyncChartsProcessorService {
]);
this.logger.succ('All charts successfully resynced.');
- done();
}
}
diff --git a/packages/backend/src/queue/processors/TickChartsProcessorService.ts b/packages/backend/src/queue/processors/TickChartsProcessorService.ts
index 7ff84c15a5..f1696bf567 100644
--- a/packages/backend/src/queue/processors/TickChartsProcessorService.ts
+++ b/packages/backend/src/queue/processors/TickChartsProcessorService.ts
@@ -16,7 +16,7 @@ import PerUserDriveChart from '@/core/chart/charts/per-user-drive.js';
import ApRequestChart from '@/core/chart/charts/ap-request.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
+import type * as Bull from 'bullmq';
@Injectable()
export class TickChartsProcessorService {
@@ -45,7 +45,7 @@ export class TickChartsProcessorService {
}
@bindThis
- public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> {
+ public async process(): Promise<void> {
this.logger.info('Tick charts...');
await Promise.all([
@@ -64,6 +64,5 @@ export class TickChartsProcessorService {
]);
this.logger.succ('All charts successfully ticked.');
- done();
}
}
diff --git a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts b/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts
index 84a5c21c49..8b40c16749 100644
--- a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts
+++ b/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts
@@ -1,4 +1,5 @@
import { Inject, Injectable } from '@nestjs/common';
+import * as Bull from 'bullmq';
import { DI } from '@/di-symbols.js';
import type { WebhooksRepository } from '@/models/index.js';
import type { Config } from '@/config.js';
@@ -7,7 +8,6 @@ import { HttpRequestService } from '@/core/HttpRequestService.js';
import { StatusError } from '@/misc/status-error.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
import type { WebhookDeliverJobData } from '../types.js';
@Injectable()
@@ -66,11 +66,11 @@ export class WebhookDeliverProcessorService {
if (res instanceof StatusError) {
// 4xx
if (res.isClientError) {
- return `${res.statusCode} ${res.statusMessage}`;
+ throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`);
}
// 5xx etc.
- throw `${res.statusCode} ${res.statusMessage}`;
+ throw new Error(`${res.statusCode} ${res.statusMessage}`);
} else {
// DNS error, socket error, timeout ...
throw res;