diff options
| author | syuilo <Syuilotan@yahoo.co.jp> | 2023-05-29 11:54:49 +0900 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-05-29 11:54:49 +0900 |
| commit | fd7b77c542b51313d8b8ea60124725fe65a295d5 (patch) | |
| tree | 78893fdfe273496831124414789376b1e2a18997 /packages/backend/src/queue/QueueProcessorService.ts | |
| parent | pnpm devでCtrl+Cで終了させてもプロセスが完全に殺せないの... (diff) | |
| download | sharkey-fd7b77c542b51313d8b8ea60124725fe65a295d5.tar.gz sharkey-fd7b77c542b51313d8b8ea60124725fe65a295d5.tar.bz2 sharkey-fd7b77c542b51313d8b8ea60124725fe65a295d5.zip | |
enhance(backend): migrate bull to bullmq (#10910)
* wip
* wip
* Update QueueService.ts
* wip
* refactor
* :v:
* fix
* Update QueueStatsService.ts
* refactor
* Update ApNoteService.ts
* Update mock-resolver.ts
* refactor
* Update mock-resolver.ts
Diffstat (limited to 'packages/backend/src/queue/QueueProcessorService.ts')
| -rw-r--r-- | packages/backend/src/queue/QueueProcessorService.ts | 304 |
1 files changed, 187 insertions, 117 deletions
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index dc025f9889..011082cd36 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -1,10 +1,9 @@ import { Inject, Injectable } from '@nestjs/common'; +import * as Bull from 'bullmq'; import type { Config } from '@/config.js'; import { DI } from '@/di-symbols.js'; import type Logger from '@/logger.js'; -import { QueueService } from '@/core/QueueService.js'; import { bindThis } from '@/decorators.js'; -import { getJobInfo } from './get-job-info.js'; import { WebhookDeliverProcessorService } from './processors/WebhookDeliverProcessorService.js'; import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js'; import { DeliverProcessorService } from './processors/DeliverProcessorService.js'; @@ -35,6 +34,33 @@ import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMu import { CleanProcessorService } from './processors/CleanProcessorService.js'; import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js'; import { QueueLoggerService } from './QueueLoggerService.js'; +import { QUEUE, baseQueueOptions } from './const.js'; + +// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019 +function httpRelatedBackoff(attemptsMade: number) { + const baseDelay = 60 * 1000; // 1min + const maxBackoff = 8 * 60 * 60 * 1000; // 8hours + let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay; + backoff = Math.min(backoff, maxBackoff); + backoff += Math.round(backoff * Math.random() * 0.2); + return backoff; +} + +function getJobInfo(job: Bull.Job | undefined, increment = false): string { + if (job == null) return '-'; + + const age = Date.now() - job.timestamp; + + const formated = age > 60000 ? `${Math.floor(age / 1000 / 60)}m` + : age > 10000 ? `${Math.floor(age / 1000)}s` + : `${age}ms`; + + // onActiveとかonCompletedのattemptsMadeがなぜか0始まりなのでインクリメントする + const currentAttempts = job.attemptsMade + (increment ? 1 : 0); + const maxAttempts = job.opts ? job.opts.attempts : 0; + + return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`; +} @Injectable() export class QueueProcessorService { @@ -45,7 +71,6 @@ export class QueueProcessorService { private config: Config, private queueLoggerService: QueueLoggerService, - private queueService: QueueService, private webhookDeliverProcessorService: WebhookDeliverProcessorService, private endedPollNotificationProcessorService: EndedPollNotificationProcessorService, private deliverProcessorService: DeliverProcessorService, @@ -97,146 +122,191 @@ export class QueueProcessorService { } } + //#region system + const systemQueueWorker = new Bull.Worker(QUEUE.SYSTEM, (job) => { + switch (job.name) { + case 'tickCharts': return this.tickChartsProcessorService.process(); + case 'resyncCharts': return this.resyncChartsProcessorService.process(); + case 'cleanCharts': return this.cleanChartsProcessorService.process(); + case 'aggregateRetention': return this.aggregateRetentionProcessorService.process(); + case 'checkExpiredMutings': return this.checkExpiredMutingsProcessorService.process(); + case 'clean': return this.cleanProcessorService.process(); + default: throw new Error(`unrecognized job type ${job.name} for system`); + } + }, { + ...baseQueueOptions(this.config, QUEUE.SYSTEM), + }); + const systemLogger = this.logger.createSubLogger('system'); - const deliverLogger = this.logger.createSubLogger('deliver'); - const webhookLogger = this.logger.createSubLogger('webhook'); - const inboxLogger = this.logger.createSubLogger('inbox'); - const dbLogger = this.logger.createSubLogger('db'); - const relationshipLogger = this.logger.createSubLogger('relationship'); - const objectStorageLogger = this.logger.createSubLogger('objectStorage'); - this.queueService.systemQueue - .on('waiting', (jobId) => systemLogger.debug(`waiting id=${jobId}`)) + systemQueueWorker .on('active', (job) => systemLogger.debug(`active id=${job.id}`)) .on('completed', (job, result) => systemLogger.debug(`completed(${result}) id=${job.id}`)) - .on('failed', (job, err) => systemLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) })) - .on('error', (job: any, err: Error) => systemLogger.error(`error ${err}`, { job, e: renderError(err) })) - .on('stalled', (job) => systemLogger.warn(`stalled id=${job.id}`)); + .on('failed', (job, err) => systemLogger.warn(`failed(${err}) id=${job ? job.id : '-'}`, { job, e: renderError(err) })) + .on('error', (err: Error) => systemLogger.error(`error ${err}`, { e: renderError(err) })) + .on('stalled', (jobId) => systemLogger.warn(`stalled id=${jobId}`)); + //#endregion - this.queueService.deliverQueue - .on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`)) - .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) - .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) - .on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`)) - .on('error', (job: any, err: Error) => deliverLogger.error(`error ${err}`, { job, e: renderError(err) })) - .on('stalled', (job) => deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`)); + //#region db + const dbQueueWorker = new Bull.Worker(QUEUE.DB, (job) => { + switch (job.name) { + case 'deleteDriveFiles': return this.deleteDriveFilesProcessorService.process(job); + case 'exportCustomEmojis': return this.exportCustomEmojisProcessorService.process(job); + case 'exportNotes': return this.exportNotesProcessorService.process(job); + case 'exportFavorites': return this.exportFavoritesProcessorService.process(job); + case 'exportFollowing': return this.exportFollowingProcessorService.process(job); + case 'exportMuting': return this.exportMutingProcessorService.process(job); + case 'exportBlocking': return this.exportBlockingProcessorService.process(job); + case 'exportUserLists': return this.exportUserListsProcessorService.process(job); + case 'exportAntennas': return this.exportAntennasProcessorService.process(job); + case 'importFollowing': return this.importFollowingProcessorService.process(job); + case 'importFollowingToDb': return this.importFollowingProcessorService.processDb(job); + case 'importMuting': return this.importMutingProcessorService.process(job); + case 'importBlocking': return this.importBlockingProcessorService.process(job); + case 'importBlockingToDb': return this.importBlockingProcessorService.processDb(job); + case 'importUserLists': return this.importUserListsProcessorService.process(job); + case 'importCustomEmojis': return this.importCustomEmojisProcessorService.process(job); + case 'importAntennas': return this.importAntennasProcessorService.process(job); + case 'deleteAccount': return this.deleteAccountProcessorService.process(job); + default: throw new Error(`unrecognized job type ${job.name} for db`); + } + }, { + ...baseQueueOptions(this.config, QUEUE.DB), + }); - this.queueService.inboxQueue - .on('waiting', (jobId) => inboxLogger.debug(`waiting id=${jobId}`)) - .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`)) - .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`)) - .on('failed', (job, err) => inboxLogger.warn(`failed(${err}) ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`, { job, e: renderError(err) })) - .on('error', (job: any, err: Error) => inboxLogger.error(`error ${err}`, { job, e: renderError(err) })) - .on('stalled', (job) => inboxLogger.warn(`stalled ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`)); + const dbLogger = this.logger.createSubLogger('db'); - this.queueService.dbQueue - .on('waiting', (jobId) => dbLogger.debug(`waiting id=${jobId}`)) + dbQueueWorker .on('active', (job) => dbLogger.debug(`active id=${job.id}`)) .on('completed', (job, result) => dbLogger.debug(`completed(${result}) id=${job.id}`)) - .on('failed', (job, err) => dbLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) })) - .on('error', (job: any, err: Error) => dbLogger.error(`error ${err}`, { job, e: renderError(err) })) - .on('stalled', (job) => dbLogger.warn(`stalled id=${job.id}`)); + .on('failed', (job, err) => dbLogger.warn(`failed(${err}) id=${job ? job.id : '-'}`, { job, e: renderError(err) })) + .on('error', (err: Error) => dbLogger.error(`error ${err}`, { e: renderError(err) })) + .on('stalled', (jobId) => dbLogger.warn(`stalled id=${jobId}`)); + //#endregion - this.queueService.relationshipQueue - .on('waiting', (jobId) => relationshipLogger.debug(`waiting id=${jobId}`)) - .on('active', (job) => relationshipLogger.debug(`active id=${job.id}`)) - .on('completed', (job, result) => relationshipLogger.debug(`completed(${result}) id=${job.id}`)) - .on('failed', (job, err) => relationshipLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) })) - .on('error', (job: any, err: Error) => relationshipLogger.error(`error ${err}`, { job, e: renderError(err) })) - .on('stalled', (job) => relationshipLogger.warn(`stalled id=${job.id}`)); + //#region deliver + const deliverQueueWorker = new Bull.Worker(QUEUE.DELIVER, (job) => this.deliverProcessorService.process(job), { + ...baseQueueOptions(this.config, QUEUE.DELIVER), + concurrency: this.config.deliverJobConcurrency ?? 128, + limiter: { + max: this.config.deliverJobPerSec ?? 128, + duration: 1000, + }, + settings: { + backoffStrategy: httpRelatedBackoff, + }, + }); - this.queueService.objectStorageQueue - .on('waiting', (jobId) => objectStorageLogger.debug(`waiting id=${jobId}`)) - .on('active', (job) => objectStorageLogger.debug(`active id=${job.id}`)) - .on('completed', (job, result) => objectStorageLogger.debug(`completed(${result}) id=${job.id}`)) - .on('failed', (job, err) => objectStorageLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) })) - .on('error', (job: any, err: Error) => objectStorageLogger.error(`error ${err}`, { job, e: renderError(err) })) - .on('stalled', (job) => objectStorageLogger.warn(`stalled id=${job.id}`)); + const deliverLogger = this.logger.createSubLogger('deliver'); - this.queueService.webhookDeliverQueue - .on('waiting', (jobId) => webhookLogger.debug(`waiting id=${jobId}`)) - .on('active', (job) => webhookLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) - .on('completed', (job, result) => webhookLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) - .on('failed', (job, err) => webhookLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`)) - .on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`, { job, e: renderError(err) })) - .on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`)); + deliverQueueWorker + .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`)) + .on('error', (err: Error) => deliverLogger.error(`error ${err}`, { e: renderError(err) })) + .on('stalled', (jobId) => deliverLogger.warn(`stalled id=${jobId}`)); + //#endregion - this.queueService.systemQueue.add('tickCharts', { - }, { - repeat: { cron: '55 * * * *' }, - removeOnComplete: true, + //#region inbox + const inboxQueueWorker = new Bull.Worker(QUEUE.INBOX, (job) => this.inboxProcessorService.process(job), { + ...baseQueueOptions(this.config, QUEUE.INBOX), + concurrency: this.config.inboxJobConcurrency ?? 16, + limiter: { + max: this.config.inboxJobPerSec ?? 16, + duration: 1000, + }, + settings: { + backoffStrategy: httpRelatedBackoff, + }, }); - this.queueService.systemQueue.add('resyncCharts', { - }, { - repeat: { cron: '0 0 * * *' }, - removeOnComplete: true, - }); + const inboxLogger = this.logger.createSubLogger('inbox'); - this.queueService.systemQueue.add('cleanCharts', { - }, { - repeat: { cron: '0 0 * * *' }, - removeOnComplete: true, - }); + inboxQueueWorker + .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`)) + .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`)) + .on('failed', (job, err) => inboxLogger.warn(`failed(${err}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job, e: renderError(err) })) + .on('error', (err: Error) => inboxLogger.error(`error ${err}`, { e: renderError(err) })) + .on('stalled', (jobId) => inboxLogger.warn(`stalled id=${jobId}`)); + //#endregion - this.queueService.systemQueue.add('aggregateRetention', { - }, { - repeat: { cron: '0 0 * * *' }, - removeOnComplete: true, + //#region webhook deliver + const webhookDeliverQueueWorker = new Bull.Worker(QUEUE.WEBHOOK_DELIVER, (job) => this.webhookDeliverProcessorService.process(job), { + ...baseQueueOptions(this.config, QUEUE.WEBHOOK_DELIVER), + concurrency: 64, + limiter: { + max: 64, + duration: 1000, + }, + settings: { + backoffStrategy: httpRelatedBackoff, + }, }); - this.queueService.systemQueue.add('clean', { + const webhookLogger = this.logger.createSubLogger('webhook'); + + webhookDeliverQueueWorker + .on('active', (job) => webhookLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('completed', (job, result) => webhookLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('failed', (job, err) => webhookLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`)) + .on('error', (err: Error) => webhookLogger.error(`error ${err}`, { e: renderError(err) })) + .on('stalled', (jobId) => webhookLogger.warn(`stalled id=${jobId}`)); + //#endregion + + //#region relationship + const relationshipQueueWorker = new Bull.Worker(QUEUE.RELATIONSHIP, (job) => { + switch (job.name) { + case 'follow': return this.relationshipProcessorService.processFollow(job); + case 'unfollow': return this.relationshipProcessorService.processUnfollow(job); + case 'block': return this.relationshipProcessorService.processBlock(job); + case 'unblock': return this.relationshipProcessorService.processUnblock(job); + default: throw new Error(`unrecognized job type ${job.name} for relationship`); + } }, { - repeat: { cron: '0 0 * * *' }, - removeOnComplete: true, + ...baseQueueOptions(this.config, QUEUE.RELATIONSHIP), + concurrency: this.config.relashionshipJobConcurrency ?? 16, + limiter: { + max: this.config.relashionshipJobPerSec ?? 64, + duration: 1000, + }, }); - this.queueService.systemQueue.add('checkExpiredMutings', { + const relationshipLogger = this.logger.createSubLogger('relationship'); + + relationshipQueueWorker + .on('active', (job) => relationshipLogger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => relationshipLogger.debug(`completed(${result}) id=${job.id}`)) + .on('failed', (job, err) => relationshipLogger.warn(`failed(${err}) id=${job ? job.id : '-'}`, { job, e: renderError(err) })) + .on('error', (err: Error) => relationshipLogger.error(`error ${err}`, { e: renderError(err) })) + .on('stalled', (jobId) => relationshipLogger.warn(`stalled id=${jobId}`)); + //#endregion + + //#region object storage + const objectStorageQueueWorker = new Bull.Worker(QUEUE.OBJECT_STORAGE, (job) => { + switch (job.name) { + case 'deleteFile': return this.deleteFileProcessorService.process(job); + case 'cleanRemoteFiles': return this.cleanRemoteFilesProcessorService.process(job); + default: throw new Error(`unrecognized job type ${job.name} for objectStorage`); + } }, { - repeat: { cron: '*/5 * * * *' }, - removeOnComplete: true, + ...baseQueueOptions(this.config, QUEUE.OBJECT_STORAGE), + concurrency: 16, }); - this.queueService.deliverQueue.process(this.config.deliverJobConcurrency ?? 128, (job) => this.deliverProcessorService.process(job)); - this.queueService.inboxQueue.process(this.config.inboxJobConcurrency ?? 16, (job) => this.inboxProcessorService.process(job)); - this.queueService.endedPollNotificationQueue.process((job, done) => this.endedPollNotificationProcessorService.process(job, done)); - this.queueService.webhookDeliverQueue.process(64, (job) => this.webhookDeliverProcessorService.process(job)); - - this.queueService.dbQueue.process('deleteDriveFiles', (job, done) => this.deleteDriveFilesProcessorService.process(job, done)); - this.queueService.dbQueue.process('exportCustomEmojis', (job, done) => this.exportCustomEmojisProcessorService.process(job, done)); - this.queueService.dbQueue.process('exportNotes', (job, done) => this.exportNotesProcessorService.process(job, done)); - this.queueService.dbQueue.process('exportFavorites', (job, done) => this.exportFavoritesProcessorService.process(job, done)); - this.queueService.dbQueue.process('exportFollowing', (job, done) => this.exportFollowingProcessorService.process(job, done)); - this.queueService.dbQueue.process('exportMuting', (job, done) => this.exportMutingProcessorService.process(job, done)); - this.queueService.dbQueue.process('exportBlocking', (job, done) => this.exportBlockingProcessorService.process(job, done)); - this.queueService.dbQueue.process('exportUserLists', (job, done) => this.exportUserListsProcessorService.process(job, done)); - this.queueService.dbQueue.process('exportAntennas', (job, done) => this.exportAntennasProcessorService.process(job, done)); - this.queueService.dbQueue.process('importFollowing', (job, done) => this.importFollowingProcessorService.process(job, done)); - this.queueService.dbQueue.process('importFollowingToDb', (job) => this.importFollowingProcessorService.processDb(job)); - this.queueService.dbQueue.process('importMuting', (job, done) => this.importMutingProcessorService.process(job, done)); - this.queueService.dbQueue.process('importBlocking', (job, done) => this.importBlockingProcessorService.process(job, done)); - this.queueService.dbQueue.process('importBlockingToDb', (job) => this.importBlockingProcessorService.processDb(job)); - this.queueService.dbQueue.process('importUserLists', (job, done) => this.importUserListsProcessorService.process(job, done)); - this.queueService.dbQueue.process('importCustomEmojis', (job, done) => this.importCustomEmojisProcessorService.process(job, done)); - this.queueService.dbQueue.process('importAntennas', (job, done) => this.importAntennasProcessorService.process(job, done)); - this.queueService.dbQueue.process('deleteAccount', (job) => this.deleteAccountProcessorService.process(job)); + const objectStorageLogger = this.logger.createSubLogger('objectStorage'); - this.queueService.objectStorageQueue.process('deleteFile', 16, (job) => this.deleteFileProcessorService.process(job)); - this.queueService.objectStorageQueue.process('cleanRemoteFiles', 16, (job, done) => this.cleanRemoteFilesProcessorService.process(job, done)); - - { - const maxJobs = this.config.relashionshipJobConcurrency ?? 16; - this.queueService.relationshipQueue.process('follow', maxJobs, (job) => this.relationshipProcessorService.processFollow(job)); - this.queueService.relationshipQueue.process('unfollow', maxJobs, (job) => this.relationshipProcessorService.processUnfollow(job)); - this.queueService.relationshipQueue.process('block', maxJobs, (job) => this.relationshipProcessorService.processBlock(job)); - this.queueService.relationshipQueue.process('unblock', maxJobs, (job) => this.relationshipProcessorService.processUnblock(job)); - } + objectStorageQueueWorker + .on('active', (job) => objectStorageLogger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => objectStorageLogger.debug(`completed(${result}) id=${job.id}`)) + .on('failed', (job, err) => objectStorageLogger.warn(`failed(${err}) id=${job ? job.id : '-'}`, { job, e: renderError(err) })) + .on('error', (err: Error) => objectStorageLogger.error(`error ${err}`, { e: renderError(err) })) + .on('stalled', (jobId) => objectStorageLogger.warn(`stalled id=${jobId}`)); + //#endregion - this.queueService.systemQueue.process('tickCharts', (job, done) => this.tickChartsProcessorService.process(job, done)); - this.queueService.systemQueue.process('resyncCharts', (job, done) => this.resyncChartsProcessorService.process(job, done)); - this.queueService.systemQueue.process('cleanCharts', (job, done) => this.cleanChartsProcessorService.process(job, done)); - this.queueService.systemQueue.process('aggregateRetention', (job, done) => this.aggregateRetentionProcessorService.process(job, done)); - this.queueService.systemQueue.process('checkExpiredMutings', (job, done) => this.checkExpiredMutingsProcessorService.process(job, done)); - this.queueService.systemQueue.process('clean', (job, done) => this.cleanProcessorService.process(job, done)); + //#region ended poll notification + const endedPollNotificationWorker = new Bull.Worker(QUEUE.ENDED_POLL_NOTIFICATION, (job) => this.endedPollNotificationProcessorService.process(job), { + ...baseQueueOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION), + }); + //#endregion } } |