summaryrefslogtreecommitdiff
path: root/packages/backend/src/core/QueueModule.ts
diff options
context:
space:
mode:
authorsyuilo <Syuilotan@yahoo.co.jp>2023-06-05 19:47:08 +0900
committerGitHub <noreply@github.com>2023-06-05 19:47:08 +0900
commit407a965c1d78db9b13ec89a7be910b3c120aafcf (patch)
tree33e00f7a00c4e33b2c95a6e2aba85cea7b9f05f6 /packages/backend/src/core/QueueModule.ts
parentMerge pull request #10833 from misskey-dev/develop (diff)
parentMerge branch 'develop' of https://github.com/misskey-dev/misskey into develop (diff)
downloadmisskey-407a965c1d78db9b13ec89a7be910b3c120aafcf.tar.gz
misskey-407a965c1d78db9b13ec89a7be910b3c120aafcf.tar.bz2
misskey-407a965c1d78db9b13ec89a7be910b3c120aafcf.zip
Merge pull request #10932 from misskey-dev/develop
Release: 13.13.0
Diffstat (limited to 'packages/backend/src/core/QueueModule.ts')
-rw-r--r--packages/backend/src/core/QueueModule.ts59
1 files changed, 16 insertions, 43 deletions
diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts
index 1d73947776..3384ca4577 100644
--- a/packages/backend/src/core/QueueModule.ts
+++ b/packages/backend/src/core/QueueModule.ts
@@ -1,42 +1,11 @@
import { setTimeout } from 'node:timers/promises';
import { Inject, Module, OnApplicationShutdown } from '@nestjs/common';
-import Bull from 'bull';
+import * as Bull from 'bullmq';
import { DI } from '@/di-symbols.js';
import type { Config } from '@/config.js';
+import { QUEUE, baseQueueOptions } from '@/queue/const.js';
import type { Provider } from '@nestjs/common';
-import type { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData, DbJobMap } from '../queue/types.js';
-
-function q<T>(config: Config, name: string, limitPerSec = -1) {
- return new Bull<T>(name, {
- redis: {
- port: config.redisForJobQueue.port,
- host: config.redisForJobQueue.host,
- family: config.redisForJobQueue.family == null ? 0 : config.redisForJobQueue.family,
- password: config.redisForJobQueue.pass,
- db: config.redisForJobQueue.db ?? 0,
- },
- prefix: config.redisForJobQueue.prefix ? `${config.redisForJobQueue.prefix}:queue` : 'queue',
- limiter: limitPerSec > 0 ? {
- max: limitPerSec,
- duration: 1000,
- } : undefined,
- settings: {
- backoffStrategies: {
- apBackoff,
- },
- },
- });
-}
-
-// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019
-function apBackoff(attemptsMade: number, err: Error) {
- const baseDelay = 60 * 1000; // 1min
- const maxBackoff = 8 * 60 * 60 * 1000; // 8hours
- let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay;
- backoff = Math.min(backoff, maxBackoff);
- backoff += Math.round(backoff * Math.random() * 0.2);
- return backoff;
-}
+import type { DeliverJobData, InboxJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData } from '../queue/types.js';
export type SystemQueue = Bull.Queue<Record<string, unknown>>;
export type EndedPollNotificationQueue = Bull.Queue<EndedPollNotificationJobData>;
@@ -49,49 +18,49 @@ export type WebhookDeliverQueue = Bull.Queue<WebhookDeliverJobData>;
const $system: Provider = {
provide: 'queue:system',
- useFactory: (config: Config) => q(config, 'system'),
+ useFactory: (config: Config) => new Bull.Queue(QUEUE.SYSTEM, baseQueueOptions(config, QUEUE.SYSTEM)),
inject: [DI.config],
};
const $endedPollNotification: Provider = {
provide: 'queue:endedPollNotification',
- useFactory: (config: Config) => q(config, 'endedPollNotification'),
+ useFactory: (config: Config) => new Bull.Queue(QUEUE.ENDED_POLL_NOTIFICATION, baseQueueOptions(config, QUEUE.ENDED_POLL_NOTIFICATION)),
inject: [DI.config],
};
const $deliver: Provider = {
provide: 'queue:deliver',
- useFactory: (config: Config) => q(config, 'deliver', config.deliverJobPerSec ?? 128),
+ useFactory: (config: Config) => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(config, QUEUE.DELIVER)),
inject: [DI.config],
};
const $inbox: Provider = {
provide: 'queue:inbox',
- useFactory: (config: Config) => q(config, 'inbox', config.inboxJobPerSec ?? 16),
+ useFactory: (config: Config) => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(config, QUEUE.INBOX)),
inject: [DI.config],
};
const $db: Provider = {
provide: 'queue:db',
- useFactory: (config: Config) => q(config, 'db'),
+ useFactory: (config: Config) => new Bull.Queue(QUEUE.DB, baseQueueOptions(config, QUEUE.DB)),
inject: [DI.config],
};
const $relationship: Provider = {
provide: 'queue:relationship',
- useFactory: (config: Config) => q(config, 'relationship', config.relashionshipJobPerSec ?? 64),
+ useFactory: (config: Config) => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(config, QUEUE.RELATIONSHIP)),
inject: [DI.config],
};
const $objectStorage: Provider = {
provide: 'queue:objectStorage',
- useFactory: (config: Config) => q(config, 'objectStorage'),
+ useFactory: (config: Config) => new Bull.Queue(QUEUE.OBJECT_STORAGE, baseQueueOptions(config, QUEUE.OBJECT_STORAGE)),
inject: [DI.config],
};
const $webhookDeliver: Provider = {
provide: 'queue:webhookDeliver',
- useFactory: (config: Config) => q(config, 'webhookDeliver', 64),
+ useFactory: (config: Config) => new Bull.Queue(QUEUE.WEBHOOK_DELIVER, baseQueueOptions(config, QUEUE.WEBHOOK_DELIVER)),
inject: [DI.config],
};
@@ -131,7 +100,7 @@ export class QueueModule implements OnApplicationShutdown {
@Inject('queue:webhookDeliver') public webhookDeliverQueue: WebhookDeliverQueue,
) {}
- async onApplicationShutdown(signal: string): Promise<void> {
+ public async dispose(): Promise<void> {
if (process.env.NODE_ENV === 'test') {
// XXX:
// Shutting down the existing connections causes errors on Jest as
@@ -151,4 +120,8 @@ export class QueueModule implements OnApplicationShutdown {
this.webhookDeliverQueue.close(),
]);
}
+
+ async onApplicationShutdown(signal: string): Promise<void> {
+ await this.dispose();
+ }
}