diff options
| author | syuilo <Syuilotan@yahoo.co.jp> | 2023-05-10 15:05:08 +0900 |
|---|---|---|
| committer | syuilo <Syuilotan@yahoo.co.jp> | 2023-05-10 15:05:08 +0900 |
| commit | 341c42ebb9c30428fdc7527dd3d22b2d25885ed6 (patch) | |
| tree | be605c167249125ab88e644ba07edec701b73763 /packages/backend/src/queue/QueueProcessorService.ts | |
| parent | Merge branch 'develop' of https://github.com/misskey-dev/misskey into develop (diff) | |
| download | sharkey-341c42ebb9c30428fdc7527dd3d22b2d25885ed6.tar.gz sharkey-341c42ebb9c30428fdc7527dd3d22b2d25885ed6.tar.bz2 sharkey-341c42ebb9c30428fdc7527dd3d22b2d25885ed6.zip | |
enhance(backend): graceful shutdown for job queue and refactor
Diffstat (limited to 'packages/backend/src/queue/QueueProcessorService.ts')
| -rw-r--r-- | packages/backend/src/queue/QueueProcessorService.ts | 108 |
1 files changed, 91 insertions, 17 deletions
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index 706110f6fc..dc025f9889 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -5,15 +5,36 @@ import type Logger from '@/logger.js'; import { QueueService } from '@/core/QueueService.js'; import { bindThis } from '@/decorators.js'; import { getJobInfo } from './get-job-info.js'; -import { SystemQueueProcessorsService } from './SystemQueueProcessorsService.js'; -import { ObjectStorageQueueProcessorsService } from './ObjectStorageQueueProcessorsService.js'; -import { DbQueueProcessorsService } from './DbQueueProcessorsService.js'; import { WebhookDeliverProcessorService } from './processors/WebhookDeliverProcessorService.js'; import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js'; import { DeliverProcessorService } from './processors/DeliverProcessorService.js'; import { InboxProcessorService } from './processors/InboxProcessorService.js'; +import { DeleteDriveFilesProcessorService } from './processors/DeleteDriveFilesProcessorService.js'; +import { ExportCustomEmojisProcessorService } from './processors/ExportCustomEmojisProcessorService.js'; +import { ExportNotesProcessorService } from './processors/ExportNotesProcessorService.js'; +import { ExportFollowingProcessorService } from './processors/ExportFollowingProcessorService.js'; +import { ExportMutingProcessorService } from './processors/ExportMutingProcessorService.js'; +import { ExportBlockingProcessorService } from './processors/ExportBlockingProcessorService.js'; +import { ExportUserListsProcessorService } from './processors/ExportUserListsProcessorService.js'; +import { ExportAntennasProcessorService } from './processors/ExportAntennasProcessorService.js'; +import { ImportFollowingProcessorService } from './processors/ImportFollowingProcessorService.js'; +import { ImportMutingProcessorService } from './processors/ImportMutingProcessorService.js'; +import { ImportBlockingProcessorService } from './processors/ImportBlockingProcessorService.js'; +import { ImportUserListsProcessorService } from './processors/ImportUserListsProcessorService.js'; +import { ImportCustomEmojisProcessorService } from './processors/ImportCustomEmojisProcessorService.js'; +import { ImportAntennasProcessorService } from './processors/ImportAntennasProcessorService.js'; +import { DeleteAccountProcessorService } from './processors/DeleteAccountProcessorService.js'; +import { ExportFavoritesProcessorService } from './processors/ExportFavoritesProcessorService.js'; +import { CleanRemoteFilesProcessorService } from './processors/CleanRemoteFilesProcessorService.js'; +import { DeleteFileProcessorService } from './processors/DeleteFileProcessorService.js'; +import { RelationshipProcessorService } from './processors/RelationshipProcessorService.js'; +import { TickChartsProcessorService } from './processors/TickChartsProcessorService.js'; +import { ResyncChartsProcessorService } from './processors/ResyncChartsProcessorService.js'; +import { CleanChartsProcessorService } from './processors/CleanChartsProcessorService.js'; +import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMutingsProcessorService.js'; +import { CleanProcessorService } from './processors/CleanProcessorService.js'; +import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js'; import { QueueLoggerService } from './QueueLoggerService.js'; -import { RelationshipQueueProcessorsService } from './RelationshipQueueProcessorsService.js'; @Injectable() export class QueueProcessorService { @@ -25,14 +46,35 @@ export class QueueProcessorService { private queueLoggerService: QueueLoggerService, private queueService: QueueService, - private systemQueueProcessorsService: SystemQueueProcessorsService, - private objectStorageQueueProcessorsService: ObjectStorageQueueProcessorsService, - private dbQueueProcessorsService: DbQueueProcessorsService, - private relationshipQueueProcessorsService: RelationshipQueueProcessorsService, private webhookDeliverProcessorService: WebhookDeliverProcessorService, private endedPollNotificationProcessorService: EndedPollNotificationProcessorService, private deliverProcessorService: DeliverProcessorService, private inboxProcessorService: InboxProcessorService, + private deleteDriveFilesProcessorService: DeleteDriveFilesProcessorService, + private exportCustomEmojisProcessorService: ExportCustomEmojisProcessorService, + private exportNotesProcessorService: ExportNotesProcessorService, + private exportFavoritesProcessorService: ExportFavoritesProcessorService, + private exportFollowingProcessorService: ExportFollowingProcessorService, + private exportMutingProcessorService: ExportMutingProcessorService, + private exportBlockingProcessorService: ExportBlockingProcessorService, + private exportUserListsProcessorService: ExportUserListsProcessorService, + private exportAntennasProcessorService: ExportAntennasProcessorService, + private importFollowingProcessorService: ImportFollowingProcessorService, + private importMutingProcessorService: ImportMutingProcessorService, + private importBlockingProcessorService: ImportBlockingProcessorService, + private importUserListsProcessorService: ImportUserListsProcessorService, + private importCustomEmojisProcessorService: ImportCustomEmojisProcessorService, + private importAntennasProcessorService: ImportAntennasProcessorService, + private deleteAccountProcessorService: DeleteAccountProcessorService, + private deleteFileProcessorService: DeleteFileProcessorService, + private cleanRemoteFilesProcessorService: CleanRemoteFilesProcessorService, + private relationshipProcessorService: RelationshipProcessorService, + private tickChartsProcessorService: TickChartsProcessorService, + private resyncChartsProcessorService: ResyncChartsProcessorService, + private cleanChartsProcessorService: CleanChartsProcessorService, + private aggregateRetentionProcessorService: AggregateRetentionProcessorService, + private checkExpiredMutingsProcessorService: CheckExpiredMutingsProcessorService, + private cleanProcessorService: CleanProcessorService, ) { this.logger = this.queueLoggerService.logger; } @@ -119,14 +161,6 @@ export class QueueProcessorService { .on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`, { job, e: renderError(err) })) .on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`)); - this.queueService.deliverQueue.process(this.config.deliverJobConcurrency ?? 128, (job) => this.deliverProcessorService.process(job)); - this.queueService.inboxQueue.process(this.config.inboxJobConcurrency ?? 16, (job) => this.inboxProcessorService.process(job)); - this.queueService.endedPollNotificationQueue.process((job, done) => this.endedPollNotificationProcessorService.process(job, done)); - this.queueService.webhookDeliverQueue.process(64, (job) => this.webhookDeliverProcessorService.process(job)); - this.dbQueueProcessorsService.start(this.queueService.dbQueue); - this.relationshipQueueProcessorsService.start(this.queueService.relationshipQueue); - this.objectStorageQueueProcessorsService.start(this.queueService.objectStorageQueue); - this.queueService.systemQueue.add('tickCharts', { }, { repeat: { cron: '55 * * * *' }, @@ -163,6 +197,46 @@ export class QueueProcessorService { removeOnComplete: true, }); - this.systemQueueProcessorsService.start(this.queueService.systemQueue); + this.queueService.deliverQueue.process(this.config.deliverJobConcurrency ?? 128, (job) => this.deliverProcessorService.process(job)); + this.queueService.inboxQueue.process(this.config.inboxJobConcurrency ?? 16, (job) => this.inboxProcessorService.process(job)); + this.queueService.endedPollNotificationQueue.process((job, done) => this.endedPollNotificationProcessorService.process(job, done)); + this.queueService.webhookDeliverQueue.process(64, (job) => this.webhookDeliverProcessorService.process(job)); + + this.queueService.dbQueue.process('deleteDriveFiles', (job, done) => this.deleteDriveFilesProcessorService.process(job, done)); + this.queueService.dbQueue.process('exportCustomEmojis', (job, done) => this.exportCustomEmojisProcessorService.process(job, done)); + this.queueService.dbQueue.process('exportNotes', (job, done) => this.exportNotesProcessorService.process(job, done)); + this.queueService.dbQueue.process('exportFavorites', (job, done) => this.exportFavoritesProcessorService.process(job, done)); + this.queueService.dbQueue.process('exportFollowing', (job, done) => this.exportFollowingProcessorService.process(job, done)); + this.queueService.dbQueue.process('exportMuting', (job, done) => this.exportMutingProcessorService.process(job, done)); + this.queueService.dbQueue.process('exportBlocking', (job, done) => this.exportBlockingProcessorService.process(job, done)); + this.queueService.dbQueue.process('exportUserLists', (job, done) => this.exportUserListsProcessorService.process(job, done)); + this.queueService.dbQueue.process('exportAntennas', (job, done) => this.exportAntennasProcessorService.process(job, done)); + this.queueService.dbQueue.process('importFollowing', (job, done) => this.importFollowingProcessorService.process(job, done)); + this.queueService.dbQueue.process('importFollowingToDb', (job) => this.importFollowingProcessorService.processDb(job)); + this.queueService.dbQueue.process('importMuting', (job, done) => this.importMutingProcessorService.process(job, done)); + this.queueService.dbQueue.process('importBlocking', (job, done) => this.importBlockingProcessorService.process(job, done)); + this.queueService.dbQueue.process('importBlockingToDb', (job) => this.importBlockingProcessorService.processDb(job)); + this.queueService.dbQueue.process('importUserLists', (job, done) => this.importUserListsProcessorService.process(job, done)); + this.queueService.dbQueue.process('importCustomEmojis', (job, done) => this.importCustomEmojisProcessorService.process(job, done)); + this.queueService.dbQueue.process('importAntennas', (job, done) => this.importAntennasProcessorService.process(job, done)); + this.queueService.dbQueue.process('deleteAccount', (job) => this.deleteAccountProcessorService.process(job)); + + this.queueService.objectStorageQueue.process('deleteFile', 16, (job) => this.deleteFileProcessorService.process(job)); + this.queueService.objectStorageQueue.process('cleanRemoteFiles', 16, (job, done) => this.cleanRemoteFilesProcessorService.process(job, done)); + + { + const maxJobs = this.config.relashionshipJobConcurrency ?? 16; + this.queueService.relationshipQueue.process('follow', maxJobs, (job) => this.relationshipProcessorService.processFollow(job)); + this.queueService.relationshipQueue.process('unfollow', maxJobs, (job) => this.relationshipProcessorService.processUnfollow(job)); + this.queueService.relationshipQueue.process('block', maxJobs, (job) => this.relationshipProcessorService.processBlock(job)); + this.queueService.relationshipQueue.process('unblock', maxJobs, (job) => this.relationshipProcessorService.processUnblock(job)); + } + + this.queueService.systemQueue.process('tickCharts', (job, done) => this.tickChartsProcessorService.process(job, done)); + this.queueService.systemQueue.process('resyncCharts', (job, done) => this.resyncChartsProcessorService.process(job, done)); + this.queueService.systemQueue.process('cleanCharts', (job, done) => this.cleanChartsProcessorService.process(job, done)); + this.queueService.systemQueue.process('aggregateRetention', (job, done) => this.aggregateRetentionProcessorService.process(job, done)); + this.queueService.systemQueue.process('checkExpiredMutings', (job, done) => this.checkExpiredMutingsProcessorService.process(job, done)); + this.queueService.systemQueue.process('clean', (job, done) => this.cleanProcessorService.process(job, done)); } } |