diff options
Diffstat (limited to 'packages/backend/src/core/QueueModule.ts')
| -rw-r--r-- | packages/backend/src/core/QueueModule.ts | 53 |
1 files changed, 11 insertions, 42 deletions
diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts index 1d73947776..6db9bb14cf 100644 --- a/packages/backend/src/core/QueueModule.ts +++ b/packages/backend/src/core/QueueModule.ts @@ -1,42 +1,11 @@ import { setTimeout } from 'node:timers/promises'; import { Inject, Module, OnApplicationShutdown } from '@nestjs/common'; -import Bull from 'bull'; +import * as Bull from 'bullmq'; import { DI } from '@/di-symbols.js'; import type { Config } from '@/config.js'; +import { QUEUE, baseQueueOptions } from '@/queue/const.js'; import type { Provider } from '@nestjs/common'; -import type { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData, DbJobMap } from '../queue/types.js'; - -function q<T>(config: Config, name: string, limitPerSec = -1) { - return new Bull<T>(name, { - redis: { - port: config.redisForJobQueue.port, - host: config.redisForJobQueue.host, - family: config.redisForJobQueue.family == null ? 0 : config.redisForJobQueue.family, - password: config.redisForJobQueue.pass, - db: config.redisForJobQueue.db ?? 0, - }, - prefix: config.redisForJobQueue.prefix ? `${config.redisForJobQueue.prefix}:queue` : 'queue', - limiter: limitPerSec > 0 ? { - max: limitPerSec, - duration: 1000, - } : undefined, - settings: { - backoffStrategies: { - apBackoff, - }, - }, - }); -} - -// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019 -function apBackoff(attemptsMade: number, err: Error) { - const baseDelay = 60 * 1000; // 1min - const maxBackoff = 8 * 60 * 60 * 1000; // 8hours - let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay; - backoff = Math.min(backoff, maxBackoff); - backoff += Math.round(backoff * Math.random() * 0.2); - return backoff; -} +import type { DeliverJobData, InboxJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData } from '../queue/types.js'; export type SystemQueue = Bull.Queue<Record<string, unknown>>; export type EndedPollNotificationQueue = Bull.Queue<EndedPollNotificationJobData>; @@ -49,49 +18,49 @@ export type WebhookDeliverQueue = Bull.Queue<WebhookDeliverJobData>; const $system: Provider = { provide: 'queue:system', - useFactory: (config: Config) => q(config, 'system'), + useFactory: (config: Config) => new Bull.Queue(QUEUE.SYSTEM, baseQueueOptions(config, QUEUE.SYSTEM)), inject: [DI.config], }; const $endedPollNotification: Provider = { provide: 'queue:endedPollNotification', - useFactory: (config: Config) => q(config, 'endedPollNotification'), + useFactory: (config: Config) => new Bull.Queue(QUEUE.ENDED_POLL_NOTIFICATION, baseQueueOptions(config, QUEUE.ENDED_POLL_NOTIFICATION)), inject: [DI.config], }; const $deliver: Provider = { provide: 'queue:deliver', - useFactory: (config: Config) => q(config, 'deliver', config.deliverJobPerSec ?? 128), + useFactory: (config: Config) => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(config, QUEUE.DELIVER)), inject: [DI.config], }; const $inbox: Provider = { provide: 'queue:inbox', - useFactory: (config: Config) => q(config, 'inbox', config.inboxJobPerSec ?? 16), + useFactory: (config: Config) => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(config, QUEUE.INBOX)), inject: [DI.config], }; const $db: Provider = { provide: 'queue:db', - useFactory: (config: Config) => q(config, 'db'), + useFactory: (config: Config) => new Bull.Queue(QUEUE.DB, baseQueueOptions(config, QUEUE.DB)), inject: [DI.config], }; const $relationship: Provider = { provide: 'queue:relationship', - useFactory: (config: Config) => q(config, 'relationship', config.relashionshipJobPerSec ?? 64), + useFactory: (config: Config) => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(config, QUEUE.RELATIONSHIP)), inject: [DI.config], }; const $objectStorage: Provider = { provide: 'queue:objectStorage', - useFactory: (config: Config) => q(config, 'objectStorage'), + useFactory: (config: Config) => new Bull.Queue(QUEUE.OBJECT_STORAGE, baseQueueOptions(config, QUEUE.OBJECT_STORAGE)), inject: [DI.config], }; const $webhookDeliver: Provider = { provide: 'queue:webhookDeliver', - useFactory: (config: Config) => q(config, 'webhookDeliver', 64), + useFactory: (config: Config) => new Bull.Queue(QUEUE.WEBHOOK_DELIVER, baseQueueOptions(config, QUEUE.WEBHOOK_DELIVER)), inject: [DI.config], }; |