diff options
| author | syuilo <Syuilotan@yahoo.co.jp> | 2023-05-10 15:05:08 +0900 |
|---|---|---|
| committer | syuilo <Syuilotan@yahoo.co.jp> | 2023-05-10 15:05:08 +0900 |
| commit | 341c42ebb9c30428fdc7527dd3d22b2d25885ed6 (patch) | |
| tree | be605c167249125ab88e644ba07edec701b73763 /packages/backend/src/core/QueueModule.ts | |
| parent | Merge branch 'develop' of https://github.com/misskey-dev/misskey into develop (diff) | |
| download | sharkey-341c42ebb9c30428fdc7527dd3d22b2d25885ed6.tar.gz sharkey-341c42ebb9c30428fdc7527dd3d22b2d25885ed6.tar.bz2 sharkey-341c42ebb9c30428fdc7527dd3d22b2d25885ed6.zip | |
enhance(backend): graceful shutdown for job queue and refactor
Diffstat (limited to 'packages/backend/src/core/QueueModule.ts')
| -rw-r--r-- | packages/backend/src/core/QueueModule.ts | 41 |
1 files changed, 37 insertions, 4 deletions
diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts index d4905a5f88..1d73947776 100644 --- a/packages/backend/src/core/QueueModule.ts +++ b/packages/backend/src/core/QueueModule.ts @@ -1,4 +1,5 @@ -import { Module } from '@nestjs/common'; +import { setTimeout } from 'node:timers/promises'; +import { Inject, Module, OnApplicationShutdown } from '@nestjs/common'; import Bull from 'bull'; import { DI } from '@/di-symbols.js'; import type { Config } from '@/config.js'; @@ -41,9 +42,9 @@ export type SystemQueue = Bull.Queue<Record<string, unknown>>; export type EndedPollNotificationQueue = Bull.Queue<EndedPollNotificationJobData>; export type DeliverQueue = Bull.Queue<DeliverJobData>; export type InboxQueue = Bull.Queue<InboxJobData>; -export type DbQueue = Bull.Queue<DbJobData<keyof DbJobMap>>; +export type DbQueue = Bull.Queue; export type RelationshipQueue = Bull.Queue<RelationshipJobData>; -export type ObjectStorageQueue = Bull.Queue<ObjectStorageJobData>; +export type ObjectStorageQueue = Bull.Queue; export type WebhookDeliverQueue = Bull.Queue<WebhookDeliverJobData>; const $system: Provider = { @@ -118,4 +119,36 @@ const $webhookDeliver: Provider = { $webhookDeliver, ], }) -export class QueueModule {} +export class QueueModule implements OnApplicationShutdown { + constructor( + @Inject('queue:system') public systemQueue: SystemQueue, + @Inject('queue:endedPollNotification') public endedPollNotificationQueue: EndedPollNotificationQueue, + @Inject('queue:deliver') public deliverQueue: DeliverQueue, + @Inject('queue:inbox') public inboxQueue: InboxQueue, + @Inject('queue:db') public dbQueue: DbQueue, + @Inject('queue:relationship') public relationshipQueue: RelationshipQueue, + @Inject('queue:objectStorage') public objectStorageQueue: ObjectStorageQueue, + @Inject('queue:webhookDeliver') public webhookDeliverQueue: WebhookDeliverQueue, + ) {} + + async onApplicationShutdown(signal: string): Promise<void> { + if (process.env.NODE_ENV === 'test') { + // XXX: + // Shutting down the existing connections causes errors on Jest as + // Misskey has asynchronous postgres/redis connections that are not + // awaited. + // Let's wait for some random time for them to finish. + await setTimeout(5000); + } + await Promise.all([ + this.systemQueue.close(), + this.endedPollNotificationQueue.close(), + this.deliverQueue.close(), + this.inboxQueue.close(), + this.dbQueue.close(), + this.relationshipQueue.close(), + this.objectStorageQueue.close(), + this.webhookDeliverQueue.close(), + ]); + } +} |