From 341c42ebb9c30428fdc7527dd3d22b2d25885ed6 Mon Sep 17 00:00:00 2001 From: syuilo Date: Wed, 10 May 2023 15:05:08 +0900 Subject: enhance(backend): graceful shutdown for job queue and refactor --- packages/backend/src/core/QueueModule.ts | 41 ++++++++++++++++++++++++++++---- 1 file changed, 37 insertions(+), 4 deletions(-) (limited to 'packages/backend/src/core/QueueModule.ts') diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts index d4905a5f88..1d73947776 100644 --- a/packages/backend/src/core/QueueModule.ts +++ b/packages/backend/src/core/QueueModule.ts @@ -1,4 +1,5 @@ -import { Module } from '@nestjs/common'; +import { setTimeout } from 'node:timers/promises'; +import { Inject, Module, OnApplicationShutdown } from '@nestjs/common'; import Bull from 'bull'; import { DI } from '@/di-symbols.js'; import type { Config } from '@/config.js'; @@ -41,9 +42,9 @@ export type SystemQueue = Bull.Queue>; export type EndedPollNotificationQueue = Bull.Queue; export type DeliverQueue = Bull.Queue; export type InboxQueue = Bull.Queue; -export type DbQueue = Bull.Queue>; +export type DbQueue = Bull.Queue; export type RelationshipQueue = Bull.Queue; -export type ObjectStorageQueue = Bull.Queue; +export type ObjectStorageQueue = Bull.Queue; export type WebhookDeliverQueue = Bull.Queue; const $system: Provider = { @@ -118,4 +119,36 @@ const $webhookDeliver: Provider = { $webhookDeliver, ], }) -export class QueueModule {} +export class QueueModule implements OnApplicationShutdown { + constructor( + @Inject('queue:system') public systemQueue: SystemQueue, + @Inject('queue:endedPollNotification') public endedPollNotificationQueue: EndedPollNotificationQueue, + @Inject('queue:deliver') public deliverQueue: DeliverQueue, + @Inject('queue:inbox') public inboxQueue: InboxQueue, + @Inject('queue:db') public dbQueue: DbQueue, + @Inject('queue:relationship') public relationshipQueue: RelationshipQueue, + @Inject('queue:objectStorage') public objectStorageQueue: ObjectStorageQueue, + @Inject('queue:webhookDeliver') public webhookDeliverQueue: WebhookDeliverQueue, + ) {} + + async onApplicationShutdown(signal: string): Promise { + if (process.env.NODE_ENV === 'test') { + // XXX: + // Shutting down the existing connections causes errors on Jest as + // Misskey has asynchronous postgres/redis connections that are not + // awaited. + // Let's wait for some random time for them to finish. + await setTimeout(5000); + } + await Promise.all([ + this.systemQueue.close(), + this.endedPollNotificationQueue.close(), + this.deliverQueue.close(), + this.inboxQueue.close(), + this.dbQueue.close(), + this.relationshipQueue.close(), + this.objectStorageQueue.close(), + this.webhookDeliverQueue.close(), + ]); + } +} -- cgit v1.2.3-freya