summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue/QueueProcessorService.ts
diff options
context:
space:
mode:
authorsyuilo <Syuilotan@yahoo.co.jp>2022-09-18 03:27:08 +0900
committerGitHub <noreply@github.com>2022-09-18 03:27:08 +0900
commitb75184ec8e3436200bacdcd832e3324702553d20 (patch)
tree8b7e316f29e95df921db57289c8b8da476d18f07 /packages/backend/src/queue/QueueProcessorService.ts
parentUpdate ROADMAP.md (diff)
downloadsharkey-b75184ec8e3436200bacdcd832e3324702553d20.tar.gz
sharkey-b75184ec8e3436200bacdcd832e3324702553d20.tar.bz2
sharkey-b75184ec8e3436200bacdcd832e3324702553d20.zip
なんかもうめっちゃ変えた
Diffstat (limited to 'packages/backend/src/queue/QueueProcessorService.ts')
-rw-r--r--packages/backend/src/queue/QueueProcessorService.ts141
1 files changed, 141 insertions, 0 deletions
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts
new file mode 100644
index 0000000000..27afce0824
--- /dev/null
+++ b/packages/backend/src/queue/QueueProcessorService.ts
@@ -0,0 +1,141 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { ModuleRef } from '@nestjs/core';
+import { Config } from '@/config.js';
+import { DI } from '@/di-symbols.js';
+import type Logger from '@/logger.js';
+import { QueueService } from '@/core/QueueService.js';
+import { getJobInfo } from './get-job-info.js';
+import { SystemQueueProcessorsService } from './SystemQueueProcessorsService.js';
+import { ObjectStorageQueueProcessorsService } from './ObjectStorageQueueProcessorsService.js';
+import { DbQueueProcessorsService } from './DbQueueProcessorsService.js';
+import { WebhookDeliverProcessorService } from './processors/WebhookDeliverProcessorService.js';
+import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js';
+import { DeliverProcessorService } from './processors/DeliverProcessorService.js';
+import { InboxProcessorService } from './processors/InboxProcessorService.js';
+import { QueueLoggerService } from './QueueLoggerService.js';
+
+@Injectable()
+export class QueueProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ private queueLoggerService: QueueLoggerService,
+ private queueService: QueueService,
+ private systemQueueProcessorsService: SystemQueueProcessorsService,
+ private objectStorageQueueProcessorsService: ObjectStorageQueueProcessorsService,
+ private dbQueueProcessorsService: DbQueueProcessorsService,
+ private webhookDeliverProcessorService: WebhookDeliverProcessorService,
+ private endedPollNotificationProcessorService: EndedPollNotificationProcessorService,
+ private deliverProcessorService: DeliverProcessorService,
+ private inboxProcessorService: InboxProcessorService,
+ ) {
+ this.#logger = this.queueLoggerService.logger;
+ }
+
+ public start() {
+ function renderError(e: Error): any {
+ return {
+ stack: e.stack,
+ message: e.message,
+ name: e.name,
+ };
+ }
+
+ const systemLogger = this.#logger.createSubLogger('system');
+ const deliverLogger = this.#logger.createSubLogger('deliver');
+ const webhookLogger = this.#logger.createSubLogger('webhook');
+ const inboxLogger = this.#logger.createSubLogger('inbox');
+ const dbLogger = this.#logger.createSubLogger('db');
+ const objectStorageLogger = this.#logger.createSubLogger('objectStorage');
+
+ this.queueService.systemQueue
+ .on('waiting', (jobId) => systemLogger.debug(`waiting id=${jobId}`))
+ .on('active', (job) => systemLogger.debug(`active id=${job.id}`))
+ .on('completed', (job, result) => systemLogger.debug(`completed(${result}) id=${job.id}`))
+ .on('failed', (job, err) => systemLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
+ .on('error', (job: any, err: Error) => systemLogger.error(`error ${err}`, { job, e: renderError(err) }))
+ .on('stalled', (job) => systemLogger.warn(`stalled id=${job.id}`));
+
+ this.queueService.deliverQueue
+ .on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`))
+ .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
+ .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
+ .on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`))
+ .on('error', (job: any, err: Error) => deliverLogger.error(`error ${err}`, { job, e: renderError(err) }))
+ .on('stalled', (job) => deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
+
+ this.queueService.inboxQueue
+ .on('waiting', (jobId) => inboxLogger.debug(`waiting id=${jobId}`))
+ .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`))
+ .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`))
+ .on('failed', (job, err) => inboxLogger.warn(`failed(${err}) ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`, { job, e: renderError(err) }))
+ .on('error', (job: any, err: Error) => inboxLogger.error(`error ${err}`, { job, e: renderError(err) }))
+ .on('stalled', (job) => inboxLogger.warn(`stalled ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`));
+
+ this.queueService.dbQueue
+ .on('waiting', (jobId) => dbLogger.debug(`waiting id=${jobId}`))
+ .on('active', (job) => dbLogger.debug(`active id=${job.id}`))
+ .on('completed', (job, result) => dbLogger.debug(`completed(${result}) id=${job.id}`))
+ .on('failed', (job, err) => dbLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
+ .on('error', (job: any, err: Error) => dbLogger.error(`error ${err}`, { job, e: renderError(err) }))
+ .on('stalled', (job) => dbLogger.warn(`stalled id=${job.id}`));
+
+ this.queueService.objectStorageQueue
+ .on('waiting', (jobId) => objectStorageLogger.debug(`waiting id=${jobId}`))
+ .on('active', (job) => objectStorageLogger.debug(`active id=${job.id}`))
+ .on('completed', (job, result) => objectStorageLogger.debug(`completed(${result}) id=${job.id}`))
+ .on('failed', (job, err) => objectStorageLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
+ .on('error', (job: any, err: Error) => objectStorageLogger.error(`error ${err}`, { job, e: renderError(err) }))
+ .on('stalled', (job) => objectStorageLogger.warn(`stalled id=${job.id}`));
+
+ this.queueService.webhookDeliverQueue
+ .on('waiting', (jobId) => webhookLogger.debug(`waiting id=${jobId}`))
+ .on('active', (job) => webhookLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
+ .on('completed', (job, result) => webhookLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
+ .on('failed', (job, err) => webhookLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`))
+ .on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`, { job, e: renderError(err) }))
+ .on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
+
+ this.queueService.deliverQueue.process(this.config.deliverJobConcurrency ?? 128, (job, done) => this.deliverProcessorService.process(job));
+ this.queueService.inboxQueue.process(this.config.inboxJobConcurrency ?? 16, (job, done) => this.inboxProcessorService.process(job));
+ this.queueService.endedPollNotificationQueue.process((job, done) => this.endedPollNotificationProcessorService.process(job, done));
+ this.queueService.webhookDeliverQueue.process(64, (job, done) => this.webhookDeliverProcessorService.process(job));
+ this.dbQueueProcessorsService.start(this.queueService.dbQueue);
+ this.objectStorageQueueProcessorsService.start(this.queueService.objectStorageQueue);
+
+ this.queueService.systemQueue.add('tickCharts', {
+ }, {
+ repeat: { cron: '55 * * * *' },
+ removeOnComplete: true,
+ });
+
+ this.queueService.systemQueue.add('resyncCharts', {
+ }, {
+ repeat: { cron: '0 0 * * *' },
+ removeOnComplete: true,
+ });
+
+ this.queueService.systemQueue.add('cleanCharts', {
+ }, {
+ repeat: { cron: '0 0 * * *' },
+ removeOnComplete: true,
+ });
+
+ this.queueService.systemQueue.add('clean', {
+ }, {
+ repeat: { cron: '0 0 * * *' },
+ removeOnComplete: true,
+ });
+
+ this.queueService.systemQueue.add('checkExpiredMutings', {
+ }, {
+ repeat: { cron: '*/5 * * * *' },
+ removeOnComplete: true,
+ });
+
+ this.systemQueueProcessorsService.start(this.queueService.systemQueue);
+ }
+}