summaryrefslogtreecommitdiff
path: root/packages/backend/src/core/QueueModule.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/core/QueueModule.ts')
-rw-r--r--packages/backend/src/core/QueueModule.ts53
1 files changed, 11 insertions, 42 deletions
diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts
index 1d73947776..6db9bb14cf 100644
--- a/packages/backend/src/core/QueueModule.ts
+++ b/packages/backend/src/core/QueueModule.ts
@@ -1,42 +1,11 @@
import { setTimeout } from 'node:timers/promises';
import { Inject, Module, OnApplicationShutdown } from '@nestjs/common';
-import Bull from 'bull';
+import * as Bull from 'bullmq';
import { DI } from '@/di-symbols.js';
import type { Config } from '@/config.js';
+import { QUEUE, baseQueueOptions } from '@/queue/const.js';
import type { Provider } from '@nestjs/common';
-import type { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData, DbJobMap } from '../queue/types.js';
-
-function q<T>(config: Config, name: string, limitPerSec = -1) {
- return new Bull<T>(name, {
- redis: {
- port: config.redisForJobQueue.port,
- host: config.redisForJobQueue.host,
- family: config.redisForJobQueue.family == null ? 0 : config.redisForJobQueue.family,
- password: config.redisForJobQueue.pass,
- db: config.redisForJobQueue.db ?? 0,
- },
- prefix: config.redisForJobQueue.prefix ? `${config.redisForJobQueue.prefix}:queue` : 'queue',
- limiter: limitPerSec > 0 ? {
- max: limitPerSec,
- duration: 1000,
- } : undefined,
- settings: {
- backoffStrategies: {
- apBackoff,
- },
- },
- });
-}
-
-// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019
-function apBackoff(attemptsMade: number, err: Error) {
- const baseDelay = 60 * 1000; // 1min
- const maxBackoff = 8 * 60 * 60 * 1000; // 8hours
- let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay;
- backoff = Math.min(backoff, maxBackoff);
- backoff += Math.round(backoff * Math.random() * 0.2);
- return backoff;
-}
+import type { DeliverJobData, InboxJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData } from '../queue/types.js';
export type SystemQueue = Bull.Queue<Record<string, unknown>>;
export type EndedPollNotificationQueue = Bull.Queue<EndedPollNotificationJobData>;
@@ -49,49 +18,49 @@ export type WebhookDeliverQueue = Bull.Queue<WebhookDeliverJobData>;
const $system: Provider = {
provide: 'queue:system',
- useFactory: (config: Config) => q(config, 'system'),
+ useFactory: (config: Config) => new Bull.Queue(QUEUE.SYSTEM, baseQueueOptions(config, QUEUE.SYSTEM)),
inject: [DI.config],
};
const $endedPollNotification: Provider = {
provide: 'queue:endedPollNotification',
- useFactory: (config: Config) => q(config, 'endedPollNotification'),
+ useFactory: (config: Config) => new Bull.Queue(QUEUE.ENDED_POLL_NOTIFICATION, baseQueueOptions(config, QUEUE.ENDED_POLL_NOTIFICATION)),
inject: [DI.config],
};
const $deliver: Provider = {
provide: 'queue:deliver',
- useFactory: (config: Config) => q(config, 'deliver', config.deliverJobPerSec ?? 128),
+ useFactory: (config: Config) => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(config, QUEUE.DELIVER)),
inject: [DI.config],
};
const $inbox: Provider = {
provide: 'queue:inbox',
- useFactory: (config: Config) => q(config, 'inbox', config.inboxJobPerSec ?? 16),
+ useFactory: (config: Config) => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(config, QUEUE.INBOX)),
inject: [DI.config],
};
const $db: Provider = {
provide: 'queue:db',
- useFactory: (config: Config) => q(config, 'db'),
+ useFactory: (config: Config) => new Bull.Queue(QUEUE.DB, baseQueueOptions(config, QUEUE.DB)),
inject: [DI.config],
};
const $relationship: Provider = {
provide: 'queue:relationship',
- useFactory: (config: Config) => q(config, 'relationship', config.relashionshipJobPerSec ?? 64),
+ useFactory: (config: Config) => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(config, QUEUE.RELATIONSHIP)),
inject: [DI.config],
};
const $objectStorage: Provider = {
provide: 'queue:objectStorage',
- useFactory: (config: Config) => q(config, 'objectStorage'),
+ useFactory: (config: Config) => new Bull.Queue(QUEUE.OBJECT_STORAGE, baseQueueOptions(config, QUEUE.OBJECT_STORAGE)),
inject: [DI.config],
};
const $webhookDeliver: Provider = {
provide: 'queue:webhookDeliver',
- useFactory: (config: Config) => q(config, 'webhookDeliver', 64),
+ useFactory: (config: Config) => new Bull.Queue(QUEUE.WEBHOOK_DELIVER, baseQueueOptions(config, QUEUE.WEBHOOK_DELIVER)),
inject: [DI.config],
};