From fd7b77c542b51313d8b8ea60124725fe65a295d5 Mon Sep 17 00:00:00 2001 From: syuilo Date: Mon, 29 May 2023 11:54:49 +0900 Subject: enhance(backend): migrate bull to bullmq (#10910) * wip * wip * Update QueueService.ts * wip * refactor * :v: * fix * Update QueueStatsService.ts * refactor * Update ApNoteService.ts * Update mock-resolver.ts * refactor * Update mock-resolver.ts --- packages/backend/src/core/QueueModule.ts | 53 +++++++------------------------- 1 file changed, 11 insertions(+), 42 deletions(-) (limited to 'packages/backend/src/core/QueueModule.ts') diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts index 1d73947776..6db9bb14cf 100644 --- a/packages/backend/src/core/QueueModule.ts +++ b/packages/backend/src/core/QueueModule.ts @@ -1,42 +1,11 @@ import { setTimeout } from 'node:timers/promises'; import { Inject, Module, OnApplicationShutdown } from '@nestjs/common'; -import Bull from 'bull'; +import * as Bull from 'bullmq'; import { DI } from '@/di-symbols.js'; import type { Config } from '@/config.js'; +import { QUEUE, baseQueueOptions } from '@/queue/const.js'; import type { Provider } from '@nestjs/common'; -import type { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData, DbJobMap } from '../queue/types.js'; - -function q(config: Config, name: string, limitPerSec = -1) { - return new Bull(name, { - redis: { - port: config.redisForJobQueue.port, - host: config.redisForJobQueue.host, - family: config.redisForJobQueue.family == null ? 0 : config.redisForJobQueue.family, - password: config.redisForJobQueue.pass, - db: config.redisForJobQueue.db ?? 0, - }, - prefix: config.redisForJobQueue.prefix ? `${config.redisForJobQueue.prefix}:queue` : 'queue', - limiter: limitPerSec > 0 ? { - max: limitPerSec, - duration: 1000, - } : undefined, - settings: { - backoffStrategies: { - apBackoff, - }, - }, - }); -} - -// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019 -function apBackoff(attemptsMade: number, err: Error) { - const baseDelay = 60 * 1000; // 1min - const maxBackoff = 8 * 60 * 60 * 1000; // 8hours - let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay; - backoff = Math.min(backoff, maxBackoff); - backoff += Math.round(backoff * Math.random() * 0.2); - return backoff; -} +import type { DeliverJobData, InboxJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData } from '../queue/types.js'; export type SystemQueue = Bull.Queue>; export type EndedPollNotificationQueue = Bull.Queue; @@ -49,49 +18,49 @@ export type WebhookDeliverQueue = Bull.Queue; const $system: Provider = { provide: 'queue:system', - useFactory: (config: Config) => q(config, 'system'), + useFactory: (config: Config) => new Bull.Queue(QUEUE.SYSTEM, baseQueueOptions(config, QUEUE.SYSTEM)), inject: [DI.config], }; const $endedPollNotification: Provider = { provide: 'queue:endedPollNotification', - useFactory: (config: Config) => q(config, 'endedPollNotification'), + useFactory: (config: Config) => new Bull.Queue(QUEUE.ENDED_POLL_NOTIFICATION, baseQueueOptions(config, QUEUE.ENDED_POLL_NOTIFICATION)), inject: [DI.config], }; const $deliver: Provider = { provide: 'queue:deliver', - useFactory: (config: Config) => q(config, 'deliver', config.deliverJobPerSec ?? 128), + useFactory: (config: Config) => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(config, QUEUE.DELIVER)), inject: [DI.config], }; const $inbox: Provider = { provide: 'queue:inbox', - useFactory: (config: Config) => q(config, 'inbox', config.inboxJobPerSec ?? 16), + useFactory: (config: Config) => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(config, QUEUE.INBOX)), inject: [DI.config], }; const $db: Provider = { provide: 'queue:db', - useFactory: (config: Config) => q(config, 'db'), + useFactory: (config: Config) => new Bull.Queue(QUEUE.DB, baseQueueOptions(config, QUEUE.DB)), inject: [DI.config], }; const $relationship: Provider = { provide: 'queue:relationship', - useFactory: (config: Config) => q(config, 'relationship', config.relashionshipJobPerSec ?? 64), + useFactory: (config: Config) => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(config, QUEUE.RELATIONSHIP)), inject: [DI.config], }; const $objectStorage: Provider = { provide: 'queue:objectStorage', - useFactory: (config: Config) => q(config, 'objectStorage'), + useFactory: (config: Config) => new Bull.Queue(QUEUE.OBJECT_STORAGE, baseQueueOptions(config, QUEUE.OBJECT_STORAGE)), inject: [DI.config], }; const $webhookDeliver: Provider = { provide: 'queue:webhookDeliver', - useFactory: (config: Config) => q(config, 'webhookDeliver', 64), + useFactory: (config: Config) => new Bull.Queue(QUEUE.WEBHOOK_DELIVER, baseQueueOptions(config, QUEUE.WEBHOOK_DELIVER)), inject: [DI.config], }; -- cgit v1.2.3-freya From b6f21b6edb95882d1616c36ed7eeb1f78809e2f6 Mon Sep 17 00:00:00 2001 From: syuilo Date: Mon, 29 May 2023 13:21:26 +0900 Subject: refactor --- packages/backend/src/GlobalModule.ts | 6 +++++- packages/backend/src/core/AntennaService.ts | 15 ++++++++++----- packages/backend/src/core/CacheService.ts | 7 ++++++- packages/backend/src/core/MetaService.ts | 7 ++++++- packages/backend/src/core/NoteCreateService.ts | 8 +++++++- packages/backend/src/core/NoteReadService.ts | 8 +++++++- packages/backend/src/core/NotificationService.ts | 8 +++++++- packages/backend/src/core/QueueModule.ts | 6 +++++- packages/backend/src/core/RoleService.ts | 7 ++++++- packages/backend/src/core/WebhookService.ts | 7 ++++++- packages/backend/src/core/chart/ChartManagementService.ts | 8 +++++++- packages/backend/src/daemons/JanitorService.ts | 7 ++++++- packages/backend/src/daemons/QueueStatsService.ts | 9 +++++++-- packages/backend/src/daemons/ServerStatsService.ts | 7 ++++++- packages/backend/src/server/ServerService.ts | 8 +++++++- packages/backend/src/server/api/ApiCallService.ts | 7 ++++++- 16 files changed, 104 insertions(+), 21 deletions(-) (limited to 'packages/backend/src/core/QueueModule.ts') diff --git a/packages/backend/src/GlobalModule.ts b/packages/backend/src/GlobalModule.ts index 564787392f..406e3192bb 100644 --- a/packages/backend/src/GlobalModule.ts +++ b/packages/backend/src/GlobalModule.ts @@ -100,7 +100,7 @@ export class GlobalModule implements OnApplicationShutdown { @Inject(DI.redisForSub) private redisForSub: Redis.Redis, ) {} - async onApplicationShutdown(signal: string): Promise { + public async dispose(): Promise { if (process.env.NODE_ENV === 'test') { // XXX: // Shutting down the existing connections causes errors on Jest as @@ -116,4 +116,8 @@ export class GlobalModule implements OnApplicationShutdown { this.redisForSub.disconnect(), ]); } + + async onApplicationShutdown(signal: string): Promise { + await this.dispose(); + } } diff --git a/packages/backend/src/core/AntennaService.ts b/packages/backend/src/core/AntennaService.ts index 2d4226a32d..d8df371916 100644 --- a/packages/backend/src/core/AntennaService.ts +++ b/packages/backend/src/core/AntennaService.ts @@ -55,11 +55,6 @@ export class AntennaService implements OnApplicationShutdown { this.redisForSub.on('message', this.onRedisMessage); } - @bindThis - public onApplicationShutdown(signal?: string | undefined) { - this.redisForSub.off('message', this.onRedisMessage); - } - @bindThis private async onRedisMessage(_: string, data: string): Promise { const obj = JSON.parse(data); @@ -196,4 +191,14 @@ export class AntennaService implements OnApplicationShutdown { return this.antennas; } + + @bindThis + public dispose(): void { + this.redisForSub.off('message', this.onRedisMessage); + } + + @bindThis + public onApplicationShutdown(signal?: string | undefined): void { + this.dispose(); + } } diff --git a/packages/backend/src/core/CacheService.ts b/packages/backend/src/core/CacheService.ts index cf1e81ffc8..de33e4c243 100644 --- a/packages/backend/src/core/CacheService.ts +++ b/packages/backend/src/core/CacheService.ts @@ -166,7 +166,12 @@ export class CacheService implements OnApplicationShutdown { } @bindThis - public onApplicationShutdown(signal?: string | undefined) { + public dispose(): void { this.redisForSub.off('message', this.onMessage); } + + @bindThis + public onApplicationShutdown(signal?: string | undefined): void { + this.dispose(); + } } diff --git a/packages/backend/src/core/MetaService.ts b/packages/backend/src/core/MetaService.ts index 0b861be8d0..5acc9ad9ad 100644 --- a/packages/backend/src/core/MetaService.ts +++ b/packages/backend/src/core/MetaService.ts @@ -120,8 +120,13 @@ export class MetaService implements OnApplicationShutdown { } @bindThis - public onApplicationShutdown(signal?: string | undefined) { + public dispose(): void { clearInterval(this.intervalId); this.redisForSub.off('message', this.onMessage); } + + @bindThis + public onApplicationShutdown(signal?: string | undefined): void { + this.dispose(); + } } diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index 2fd7a8ac86..1c8491bf57 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -790,7 +790,13 @@ export class NoteCreateService implements OnApplicationShutdown { return mentionedUsers; } - onApplicationShutdown(signal?: string | undefined) { + @bindThis + public dispose(): void { this.#shutdownController.abort(); } + + @bindThis + public onApplicationShutdown(signal?: string | undefined): void { + this.dispose(); + } } diff --git a/packages/backend/src/core/NoteReadService.ts b/packages/backend/src/core/NoteReadService.ts index 1129bd159c..e57e57d310 100644 --- a/packages/backend/src/core/NoteReadService.ts +++ b/packages/backend/src/core/NoteReadService.ts @@ -122,7 +122,13 @@ export class NoteReadService implements OnApplicationShutdown { } } - onApplicationShutdown(signal?: string | undefined): void { + @bindThis + public dispose(): void { this.#shutdownController.abort(); } + + @bindThis + public onApplicationShutdown(signal?: string | undefined): void { + this.dispose(); + } } diff --git a/packages/backend/src/core/NotificationService.ts b/packages/backend/src/core/NotificationService.ts index a245908c98..ed47165f7b 100644 --- a/packages/backend/src/core/NotificationService.ts +++ b/packages/backend/src/core/NotificationService.ts @@ -152,7 +152,13 @@ export class NotificationService implements OnApplicationShutdown { */ } - onApplicationShutdown(signal?: string | undefined): void { + @bindThis + public dispose(): void { this.#shutdownController.abort(); } + + @bindThis + public onApplicationShutdown(signal?: string | undefined): void { + this.dispose(); + } } diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts index 6db9bb14cf..3384ca4577 100644 --- a/packages/backend/src/core/QueueModule.ts +++ b/packages/backend/src/core/QueueModule.ts @@ -100,7 +100,7 @@ export class QueueModule implements OnApplicationShutdown { @Inject('queue:webhookDeliver') public webhookDeliverQueue: WebhookDeliverQueue, ) {} - async onApplicationShutdown(signal: string): Promise { + public async dispose(): Promise { if (process.env.NODE_ENV === 'test') { // XXX: // Shutting down the existing connections causes errors on Jest as @@ -120,4 +120,8 @@ export class QueueModule implements OnApplicationShutdown { this.webhookDeliverQueue.close(), ]); } + + async onApplicationShutdown(signal: string): Promise { + await this.dispose(); + } } diff --git a/packages/backend/src/core/RoleService.ts b/packages/backend/src/core/RoleService.ts index 130ec5ec8c..40ae106662 100644 --- a/packages/backend/src/core/RoleService.ts +++ b/packages/backend/src/core/RoleService.ts @@ -433,7 +433,12 @@ export class RoleService implements OnApplicationShutdown { } @bindThis - public onApplicationShutdown(signal?: string | undefined) { + public dispose(): void { this.redisForSub.off('message', this.onMessage); } + + @bindThis + public onApplicationShutdown(signal?: string | undefined): void { + this.dispose(); + } } diff --git a/packages/backend/src/core/WebhookService.ts b/packages/backend/src/core/WebhookService.ts index 57baade777..467755a072 100644 --- a/packages/backend/src/core/WebhookService.ts +++ b/packages/backend/src/core/WebhookService.ts @@ -81,7 +81,12 @@ export class WebhookService implements OnApplicationShutdown { } @bindThis - public onApplicationShutdown(signal?: string | undefined) { + public dispose(): void { this.redisForSub.off('message', this.onMessage); } + + @bindThis + public onApplicationShutdown(signal?: string | undefined): void { + this.dispose(); + } } diff --git a/packages/backend/src/core/chart/ChartManagementService.ts b/packages/backend/src/core/chart/ChartManagementService.ts index 03e3612658..b0e9e534df 100644 --- a/packages/backend/src/core/chart/ChartManagementService.ts +++ b/packages/backend/src/core/chart/ChartManagementService.ts @@ -60,7 +60,8 @@ export class ChartManagementService implements OnApplicationShutdown { }, 1000 * 60 * 20); } - async onApplicationShutdown(signal: string): Promise { + @bindThis + public async dispose(): Promise { clearInterval(this.saveIntervalId); if (process.env.NODE_ENV !== 'test') { await Promise.all( @@ -68,4 +69,9 @@ export class ChartManagementService implements OnApplicationShutdown { ); } } + + @bindThis + async onApplicationShutdown(signal: string): Promise { + await this.dispose(); + } } diff --git a/packages/backend/src/daemons/JanitorService.ts b/packages/backend/src/daemons/JanitorService.ts index 8cdfb703f1..f826d50625 100644 --- a/packages/backend/src/daemons/JanitorService.ts +++ b/packages/backend/src/daemons/JanitorService.ts @@ -34,7 +34,12 @@ export class JanitorService implements OnApplicationShutdown { } @bindThis - public onApplicationShutdown(signal?: string | undefined) { + public dispose(): void { clearInterval(this.intervalId); } + + @bindThis + public onApplicationShutdown(signal?: string | undefined): void { + this.dispose(); + } } diff --git a/packages/backend/src/daemons/QueueStatsService.ts b/packages/backend/src/daemons/QueueStatsService.ts index 0a5b3184d2..53a0d14cd7 100644 --- a/packages/backend/src/daemons/QueueStatsService.ts +++ b/packages/backend/src/daemons/QueueStatsService.ts @@ -81,9 +81,14 @@ export class QueueStatsService implements OnApplicationShutdown { this.intervalId = setInterval(tick, interval); } - + @bindThis - public onApplicationShutdown(signal?: string | undefined) { + public dispose(): void { clearInterval(this.intervalId); } + + @bindThis + public onApplicationShutdown(signal?: string | undefined): void { + this.dispose(); + } } diff --git a/packages/backend/src/daemons/ServerStatsService.ts b/packages/backend/src/daemons/ServerStatsService.ts index bb190cf60f..6cd71c0e2a 100644 --- a/packages/backend/src/daemons/ServerStatsService.ts +++ b/packages/backend/src/daemons/ServerStatsService.ts @@ -63,9 +63,14 @@ export class ServerStatsService implements OnApplicationShutdown { } @bindThis - public onApplicationShutdown(signal?: string | undefined) { + public dispose(): void { clearInterval(this.intervalId); } + + @bindThis + public onApplicationShutdown(signal?: string | undefined): void { + this.dispose(); + } } // CPU STAT diff --git a/packages/backend/src/server/ServerService.ts b/packages/backend/src/server/ServerService.ts index 9257fee13e..ce6a1f7043 100644 --- a/packages/backend/src/server/ServerService.ts +++ b/packages/backend/src/server/ServerService.ts @@ -222,7 +222,13 @@ export class ServerService implements OnApplicationShutdown { await fastify.ready(); } - async onApplicationShutdown(signal: string): Promise { + @bindThis + public async dispose(): Promise { await this.#fastify.close(); } + + @bindThis + async onApplicationShutdown(signal: string): Promise { + await this.dispose(); + } } diff --git a/packages/backend/src/server/api/ApiCallService.ts b/packages/backend/src/server/api/ApiCallService.ts index e3483c82c6..dad1a4132a 100644 --- a/packages/backend/src/server/api/ApiCallService.ts +++ b/packages/backend/src/server/api/ApiCallService.ts @@ -359,7 +359,12 @@ export class ApiCallService implements OnApplicationShutdown { } @bindThis - public onApplicationShutdown(signal?: string | undefined) { + public dispose(): void { clearInterval(this.userIpHistoriesClearIntervalId); } + + @bindThis + public onApplicationShutdown(signal?: string | undefined): void { + this.dispose(); + } } -- cgit v1.2.3-freya