summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/queue')
-rw-r--r--packages/backend/src/queue/QueueProcessorService.ts20
-rw-r--r--packages/backend/src/queue/const.ts10
2 files changed, 20 insertions, 10 deletions
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts
index 6940e1c188..c98ebcdcd9 100644
--- a/packages/backend/src/queue/QueueProcessorService.ts
+++ b/packages/backend/src/queue/QueueProcessorService.ts
@@ -44,7 +44,7 @@ import { BakeBufferedReactionsProcessorService } from './processors/BakeBuffered
import { CleanProcessorService } from './processors/CleanProcessorService.js';
import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js';
import { QueueLoggerService } from './QueueLoggerService.js';
-import { QUEUE, baseQueueOptions } from './const.js';
+import { QUEUE, baseWorkerOptions } from './const.js';
// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019
function httpRelatedBackoff(attemptsMade: number) {
@@ -175,7 +175,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return processer(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.SYSTEM),
+ ...baseWorkerOptions(this.config, QUEUE.SYSTEM),
autorun: false,
});
@@ -232,7 +232,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return processer(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.DB),
+ ...baseWorkerOptions(this.config, QUEUE.DB),
autorun: false,
});
@@ -264,7 +264,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return this.deliverProcessorService.process(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.DELIVER),
+ ...baseWorkerOptions(this.config, QUEUE.DELIVER),
autorun: false,
concurrency: this.config.deliverJobConcurrency ?? 128,
limiter: {
@@ -304,7 +304,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return this.inboxProcessorService.process(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.INBOX),
+ ...baseWorkerOptions(this.config, QUEUE.INBOX),
autorun: false,
concurrency: this.config.inboxJobConcurrency ?? 16,
limiter: {
@@ -344,7 +344,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return this.userWebhookDeliverProcessorService.process(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.USER_WEBHOOK_DELIVER),
+ ...baseWorkerOptions(this.config, QUEUE.USER_WEBHOOK_DELIVER),
autorun: false,
concurrency: 64,
limiter: {
@@ -384,7 +384,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return this.systemWebhookDeliverProcessorService.process(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.SYSTEM_WEBHOOK_DELIVER),
+ ...baseWorkerOptions(this.config, QUEUE.SYSTEM_WEBHOOK_DELIVER),
autorun: false,
concurrency: 16,
limiter: {
@@ -434,7 +434,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return processer(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.RELATIONSHIP),
+ ...baseWorkerOptions(this.config, QUEUE.RELATIONSHIP),
autorun: false,
concurrency: this.config.relationshipJobConcurrency ?? 16,
limiter: {
@@ -479,7 +479,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return processer(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.OBJECT_STORAGE),
+ ...baseWorkerOptions(this.config, QUEUE.OBJECT_STORAGE),
autorun: false,
concurrency: 16,
});
@@ -512,7 +512,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return this.endedPollNotificationProcessorService.process(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION),
+ ...baseWorkerOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION),
autorun: false,
});
}
diff --git a/packages/backend/src/queue/const.ts b/packages/backend/src/queue/const.ts
index 67f689b618..7e146a7e03 100644
--- a/packages/backend/src/queue/const.ts
+++ b/packages/backend/src/queue/const.ts
@@ -3,6 +3,7 @@
* SPDX-License-Identifier: AGPL-3.0-only
*/
+import { MetricsTime } from 'bullmq';
import { Config } from '@/config.js';
import type * as Bull from 'bullmq';
@@ -27,3 +28,12 @@ export function baseQueueOptions(config: Config, queueName: typeof QUEUE[keyof t
prefix: config.redisForJobQueue.prefix ? `${config.redisForJobQueue.prefix}:queue:${queueName}` : `queue:${queueName}`,
};
}
+
+export function baseWorkerOptions(config: Config, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.WorkerOptions {
+ return {
+ ...baseQueueOptions(config, queueName),
+ metrics: {
+ maxDataPoints: MetricsTime.ONE_WEEK,
+ },
+ };
+}