summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue/QueueProcessorService.ts
diff options
context:
space:
mode:
authorsyuilo <Syuilotan@yahoo.co.jp>2023-05-29 11:54:49 +0900
committerGitHub <noreply@github.com>2023-05-29 11:54:49 +0900
commitfd7b77c542b51313d8b8ea60124725fe65a295d5 (patch)
tree78893fdfe273496831124414789376b1e2a18997 /packages/backend/src/queue/QueueProcessorService.ts
parentpnpm devでCtrl+Cで終了させてもプロセスが完全に殺せないの... (diff)
downloadsharkey-fd7b77c542b51313d8b8ea60124725fe65a295d5.tar.gz
sharkey-fd7b77c542b51313d8b8ea60124725fe65a295d5.tar.bz2
sharkey-fd7b77c542b51313d8b8ea60124725fe65a295d5.zip
enhance(backend): migrate bull to bullmq (#10910)
* wip * wip * Update QueueService.ts * wip * refactor * :v: * fix * Update QueueStatsService.ts * refactor * Update ApNoteService.ts * Update mock-resolver.ts * refactor * Update mock-resolver.ts
Diffstat (limited to 'packages/backend/src/queue/QueueProcessorService.ts')
-rw-r--r--packages/backend/src/queue/QueueProcessorService.ts304
1 files changed, 187 insertions, 117 deletions
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts
index dc025f9889..011082cd36 100644
--- a/packages/backend/src/queue/QueueProcessorService.ts
+++ b/packages/backend/src/queue/QueueProcessorService.ts
@@ -1,10 +1,9 @@
import { Inject, Injectable } from '@nestjs/common';
+import * as Bull from 'bullmq';
import type { Config } from '@/config.js';
import { DI } from '@/di-symbols.js';
import type Logger from '@/logger.js';
-import { QueueService } from '@/core/QueueService.js';
import { bindThis } from '@/decorators.js';
-import { getJobInfo } from './get-job-info.js';
import { WebhookDeliverProcessorService } from './processors/WebhookDeliverProcessorService.js';
import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js';
import { DeliverProcessorService } from './processors/DeliverProcessorService.js';
@@ -35,6 +34,33 @@ import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMu
import { CleanProcessorService } from './processors/CleanProcessorService.js';
import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js';
import { QueueLoggerService } from './QueueLoggerService.js';
+import { QUEUE, baseQueueOptions } from './const.js';
+
+// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019
+function httpRelatedBackoff(attemptsMade: number) {
+ const baseDelay = 60 * 1000; // 1min
+ const maxBackoff = 8 * 60 * 60 * 1000; // 8hours
+ let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay;
+ backoff = Math.min(backoff, maxBackoff);
+ backoff += Math.round(backoff * Math.random() * 0.2);
+ return backoff;
+}
+
+function getJobInfo(job: Bull.Job | undefined, increment = false): string {
+ if (job == null) return '-';
+
+ const age = Date.now() - job.timestamp;
+
+ const formated = age > 60000 ? `${Math.floor(age / 1000 / 60)}m`
+ : age > 10000 ? `${Math.floor(age / 1000)}s`
+ : `${age}ms`;
+
+ // onActiveとかonCompletedのattemptsMadeがなぜか0始まりなのでインクリメントする
+ const currentAttempts = job.attemptsMade + (increment ? 1 : 0);
+ const maxAttempts = job.opts ? job.opts.attempts : 0;
+
+ return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`;
+}
@Injectable()
export class QueueProcessorService {
@@ -45,7 +71,6 @@ export class QueueProcessorService {
private config: Config,
private queueLoggerService: QueueLoggerService,
- private queueService: QueueService,
private webhookDeliverProcessorService: WebhookDeliverProcessorService,
private endedPollNotificationProcessorService: EndedPollNotificationProcessorService,
private deliverProcessorService: DeliverProcessorService,
@@ -97,146 +122,191 @@ export class QueueProcessorService {
}
}
+ //#region system
+ const systemQueueWorker = new Bull.Worker(QUEUE.SYSTEM, (job) => {
+ switch (job.name) {
+ case 'tickCharts': return this.tickChartsProcessorService.process();
+ case 'resyncCharts': return this.resyncChartsProcessorService.process();
+ case 'cleanCharts': return this.cleanChartsProcessorService.process();
+ case 'aggregateRetention': return this.aggregateRetentionProcessorService.process();
+ case 'checkExpiredMutings': return this.checkExpiredMutingsProcessorService.process();
+ case 'clean': return this.cleanProcessorService.process();
+ default: throw new Error(`unrecognized job type ${job.name} for system`);
+ }
+ }, {
+ ...baseQueueOptions(this.config, QUEUE.SYSTEM),
+ });
+
const systemLogger = this.logger.createSubLogger('system');
- const deliverLogger = this.logger.createSubLogger('deliver');
- const webhookLogger = this.logger.createSubLogger('webhook');
- const inboxLogger = this.logger.createSubLogger('inbox');
- const dbLogger = this.logger.createSubLogger('db');
- const relationshipLogger = this.logger.createSubLogger('relationship');
- const objectStorageLogger = this.logger.createSubLogger('objectStorage');
- this.queueService.systemQueue
- .on('waiting', (jobId) => systemLogger.debug(`waiting id=${jobId}`))
+ systemQueueWorker
.on('active', (job) => systemLogger.debug(`active id=${job.id}`))
.on('completed', (job, result) => systemLogger.debug(`completed(${result}) id=${job.id}`))
- .on('failed', (job, err) => systemLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
- .on('error', (job: any, err: Error) => systemLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => systemLogger.warn(`stalled id=${job.id}`));
+ .on('failed', (job, err) => systemLogger.warn(`failed(${err}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }))
+ .on('error', (err: Error) => systemLogger.error(`error ${err}`, { e: renderError(err) }))
+ .on('stalled', (jobId) => systemLogger.warn(`stalled id=${jobId}`));
+ //#endregion
- this.queueService.deliverQueue
- .on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`))
- .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
- .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
- .on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`))
- .on('error', (job: any, err: Error) => deliverLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
+ //#region db
+ const dbQueueWorker = new Bull.Worker(QUEUE.DB, (job) => {
+ switch (job.name) {
+ case 'deleteDriveFiles': return this.deleteDriveFilesProcessorService.process(job);
+ case 'exportCustomEmojis': return this.exportCustomEmojisProcessorService.process(job);
+ case 'exportNotes': return this.exportNotesProcessorService.process(job);
+ case 'exportFavorites': return this.exportFavoritesProcessorService.process(job);
+ case 'exportFollowing': return this.exportFollowingProcessorService.process(job);
+ case 'exportMuting': return this.exportMutingProcessorService.process(job);
+ case 'exportBlocking': return this.exportBlockingProcessorService.process(job);
+ case 'exportUserLists': return this.exportUserListsProcessorService.process(job);
+ case 'exportAntennas': return this.exportAntennasProcessorService.process(job);
+ case 'importFollowing': return this.importFollowingProcessorService.process(job);
+ case 'importFollowingToDb': return this.importFollowingProcessorService.processDb(job);
+ case 'importMuting': return this.importMutingProcessorService.process(job);
+ case 'importBlocking': return this.importBlockingProcessorService.process(job);
+ case 'importBlockingToDb': return this.importBlockingProcessorService.processDb(job);
+ case 'importUserLists': return this.importUserListsProcessorService.process(job);
+ case 'importCustomEmojis': return this.importCustomEmojisProcessorService.process(job);
+ case 'importAntennas': return this.importAntennasProcessorService.process(job);
+ case 'deleteAccount': return this.deleteAccountProcessorService.process(job);
+ default: throw new Error(`unrecognized job type ${job.name} for db`);
+ }
+ }, {
+ ...baseQueueOptions(this.config, QUEUE.DB),
+ });
- this.queueService.inboxQueue
- .on('waiting', (jobId) => inboxLogger.debug(`waiting id=${jobId}`))
- .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`))
- .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`))
- .on('failed', (job, err) => inboxLogger.warn(`failed(${err}) ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`, { job, e: renderError(err) }))
- .on('error', (job: any, err: Error) => inboxLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => inboxLogger.warn(`stalled ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`));
+ const dbLogger = this.logger.createSubLogger('db');
- this.queueService.dbQueue
- .on('waiting', (jobId) => dbLogger.debug(`waiting id=${jobId}`))
+ dbQueueWorker
.on('active', (job) => dbLogger.debug(`active id=${job.id}`))
.on('completed', (job, result) => dbLogger.debug(`completed(${result}) id=${job.id}`))
- .on('failed', (job, err) => dbLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
- .on('error', (job: any, err: Error) => dbLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => dbLogger.warn(`stalled id=${job.id}`));
+ .on('failed', (job, err) => dbLogger.warn(`failed(${err}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }))
+ .on('error', (err: Error) => dbLogger.error(`error ${err}`, { e: renderError(err) }))
+ .on('stalled', (jobId) => dbLogger.warn(`stalled id=${jobId}`));
+ //#endregion
- this.queueService.relationshipQueue
- .on('waiting', (jobId) => relationshipLogger.debug(`waiting id=${jobId}`))
- .on('active', (job) => relationshipLogger.debug(`active id=${job.id}`))
- .on('completed', (job, result) => relationshipLogger.debug(`completed(${result}) id=${job.id}`))
- .on('failed', (job, err) => relationshipLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
- .on('error', (job: any, err: Error) => relationshipLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => relationshipLogger.warn(`stalled id=${job.id}`));
+ //#region deliver
+ const deliverQueueWorker = new Bull.Worker(QUEUE.DELIVER, (job) => this.deliverProcessorService.process(job), {
+ ...baseQueueOptions(this.config, QUEUE.DELIVER),
+ concurrency: this.config.deliverJobConcurrency ?? 128,
+ limiter: {
+ max: this.config.deliverJobPerSec ?? 128,
+ duration: 1000,
+ },
+ settings: {
+ backoffStrategy: httpRelatedBackoff,
+ },
+ });
- this.queueService.objectStorageQueue
- .on('waiting', (jobId) => objectStorageLogger.debug(`waiting id=${jobId}`))
- .on('active', (job) => objectStorageLogger.debug(`active id=${job.id}`))
- .on('completed', (job, result) => objectStorageLogger.debug(`completed(${result}) id=${job.id}`))
- .on('failed', (job, err) => objectStorageLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
- .on('error', (job: any, err: Error) => objectStorageLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => objectStorageLogger.warn(`stalled id=${job.id}`));
+ const deliverLogger = this.logger.createSubLogger('deliver');
- this.queueService.webhookDeliverQueue
- .on('waiting', (jobId) => webhookLogger.debug(`waiting id=${jobId}`))
- .on('active', (job) => webhookLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
- .on('completed', (job, result) => webhookLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
- .on('failed', (job, err) => webhookLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`))
- .on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
+ deliverQueueWorker
+ .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
+ .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
+ .on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`))
+ .on('error', (err: Error) => deliverLogger.error(`error ${err}`, { e: renderError(err) }))
+ .on('stalled', (jobId) => deliverLogger.warn(`stalled id=${jobId}`));
+ //#endregion
- this.queueService.systemQueue.add('tickCharts', {
- }, {
- repeat: { cron: '55 * * * *' },
- removeOnComplete: true,
+ //#region inbox
+ const inboxQueueWorker = new Bull.Worker(QUEUE.INBOX, (job) => this.inboxProcessorService.process(job), {
+ ...baseQueueOptions(this.config, QUEUE.INBOX),
+ concurrency: this.config.inboxJobConcurrency ?? 16,
+ limiter: {
+ max: this.config.inboxJobPerSec ?? 16,
+ duration: 1000,
+ },
+ settings: {
+ backoffStrategy: httpRelatedBackoff,
+ },
});
- this.queueService.systemQueue.add('resyncCharts', {
- }, {
- repeat: { cron: '0 0 * * *' },
- removeOnComplete: true,
- });
+ const inboxLogger = this.logger.createSubLogger('inbox');
- this.queueService.systemQueue.add('cleanCharts', {
- }, {
- repeat: { cron: '0 0 * * *' },
- removeOnComplete: true,
- });
+ inboxQueueWorker
+ .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`))
+ .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`))
+ .on('failed', (job, err) => inboxLogger.warn(`failed(${err}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job, e: renderError(err) }))
+ .on('error', (err: Error) => inboxLogger.error(`error ${err}`, { e: renderError(err) }))
+ .on('stalled', (jobId) => inboxLogger.warn(`stalled id=${jobId}`));
+ //#endregion
- this.queueService.systemQueue.add('aggregateRetention', {
- }, {
- repeat: { cron: '0 0 * * *' },
- removeOnComplete: true,
+ //#region webhook deliver
+ const webhookDeliverQueueWorker = new Bull.Worker(QUEUE.WEBHOOK_DELIVER, (job) => this.webhookDeliverProcessorService.process(job), {
+ ...baseQueueOptions(this.config, QUEUE.WEBHOOK_DELIVER),
+ concurrency: 64,
+ limiter: {
+ max: 64,
+ duration: 1000,
+ },
+ settings: {
+ backoffStrategy: httpRelatedBackoff,
+ },
});
- this.queueService.systemQueue.add('clean', {
+ const webhookLogger = this.logger.createSubLogger('webhook');
+
+ webhookDeliverQueueWorker
+ .on('active', (job) => webhookLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
+ .on('completed', (job, result) => webhookLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
+ .on('failed', (job, err) => webhookLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`))
+ .on('error', (err: Error) => webhookLogger.error(`error ${err}`, { e: renderError(err) }))
+ .on('stalled', (jobId) => webhookLogger.warn(`stalled id=${jobId}`));
+ //#endregion
+
+ //#region relationship
+ const relationshipQueueWorker = new Bull.Worker(QUEUE.RELATIONSHIP, (job) => {
+ switch (job.name) {
+ case 'follow': return this.relationshipProcessorService.processFollow(job);
+ case 'unfollow': return this.relationshipProcessorService.processUnfollow(job);
+ case 'block': return this.relationshipProcessorService.processBlock(job);
+ case 'unblock': return this.relationshipProcessorService.processUnblock(job);
+ default: throw new Error(`unrecognized job type ${job.name} for relationship`);
+ }
}, {
- repeat: { cron: '0 0 * * *' },
- removeOnComplete: true,
+ ...baseQueueOptions(this.config, QUEUE.RELATIONSHIP),
+ concurrency: this.config.relashionshipJobConcurrency ?? 16,
+ limiter: {
+ max: this.config.relashionshipJobPerSec ?? 64,
+ duration: 1000,
+ },
});
- this.queueService.systemQueue.add('checkExpiredMutings', {
+ const relationshipLogger = this.logger.createSubLogger('relationship');
+
+ relationshipQueueWorker
+ .on('active', (job) => relationshipLogger.debug(`active id=${job.id}`))
+ .on('completed', (job, result) => relationshipLogger.debug(`completed(${result}) id=${job.id}`))
+ .on('failed', (job, err) => relationshipLogger.warn(`failed(${err}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }))
+ .on('error', (err: Error) => relationshipLogger.error(`error ${err}`, { e: renderError(err) }))
+ .on('stalled', (jobId) => relationshipLogger.warn(`stalled id=${jobId}`));
+ //#endregion
+
+ //#region object storage
+ const objectStorageQueueWorker = new Bull.Worker(QUEUE.OBJECT_STORAGE, (job) => {
+ switch (job.name) {
+ case 'deleteFile': return this.deleteFileProcessorService.process(job);
+ case 'cleanRemoteFiles': return this.cleanRemoteFilesProcessorService.process(job);
+ default: throw new Error(`unrecognized job type ${job.name} for objectStorage`);
+ }
}, {
- repeat: { cron: '*/5 * * * *' },
- removeOnComplete: true,
+ ...baseQueueOptions(this.config, QUEUE.OBJECT_STORAGE),
+ concurrency: 16,
});
- this.queueService.deliverQueue.process(this.config.deliverJobConcurrency ?? 128, (job) => this.deliverProcessorService.process(job));
- this.queueService.inboxQueue.process(this.config.inboxJobConcurrency ?? 16, (job) => this.inboxProcessorService.process(job));
- this.queueService.endedPollNotificationQueue.process((job, done) => this.endedPollNotificationProcessorService.process(job, done));
- this.queueService.webhookDeliverQueue.process(64, (job) => this.webhookDeliverProcessorService.process(job));
-
- this.queueService.dbQueue.process('deleteDriveFiles', (job, done) => this.deleteDriveFilesProcessorService.process(job, done));
- this.queueService.dbQueue.process('exportCustomEmojis', (job, done) => this.exportCustomEmojisProcessorService.process(job, done));
- this.queueService.dbQueue.process('exportNotes', (job, done) => this.exportNotesProcessorService.process(job, done));
- this.queueService.dbQueue.process('exportFavorites', (job, done) => this.exportFavoritesProcessorService.process(job, done));
- this.queueService.dbQueue.process('exportFollowing', (job, done) => this.exportFollowingProcessorService.process(job, done));
- this.queueService.dbQueue.process('exportMuting', (job, done) => this.exportMutingProcessorService.process(job, done));
- this.queueService.dbQueue.process('exportBlocking', (job, done) => this.exportBlockingProcessorService.process(job, done));
- this.queueService.dbQueue.process('exportUserLists', (job, done) => this.exportUserListsProcessorService.process(job, done));
- this.queueService.dbQueue.process('exportAntennas', (job, done) => this.exportAntennasProcessorService.process(job, done));
- this.queueService.dbQueue.process('importFollowing', (job, done) => this.importFollowingProcessorService.process(job, done));
- this.queueService.dbQueue.process('importFollowingToDb', (job) => this.importFollowingProcessorService.processDb(job));
- this.queueService.dbQueue.process('importMuting', (job, done) => this.importMutingProcessorService.process(job, done));
- this.queueService.dbQueue.process('importBlocking', (job, done) => this.importBlockingProcessorService.process(job, done));
- this.queueService.dbQueue.process('importBlockingToDb', (job) => this.importBlockingProcessorService.processDb(job));
- this.queueService.dbQueue.process('importUserLists', (job, done) => this.importUserListsProcessorService.process(job, done));
- this.queueService.dbQueue.process('importCustomEmojis', (job, done) => this.importCustomEmojisProcessorService.process(job, done));
- this.queueService.dbQueue.process('importAntennas', (job, done) => this.importAntennasProcessorService.process(job, done));
- this.queueService.dbQueue.process('deleteAccount', (job) => this.deleteAccountProcessorService.process(job));
+ const objectStorageLogger = this.logger.createSubLogger('objectStorage');
- this.queueService.objectStorageQueue.process('deleteFile', 16, (job) => this.deleteFileProcessorService.process(job));
- this.queueService.objectStorageQueue.process('cleanRemoteFiles', 16, (job, done) => this.cleanRemoteFilesProcessorService.process(job, done));
-
- {
- const maxJobs = this.config.relashionshipJobConcurrency ?? 16;
- this.queueService.relationshipQueue.process('follow', maxJobs, (job) => this.relationshipProcessorService.processFollow(job));
- this.queueService.relationshipQueue.process('unfollow', maxJobs, (job) => this.relationshipProcessorService.processUnfollow(job));
- this.queueService.relationshipQueue.process('block', maxJobs, (job) => this.relationshipProcessorService.processBlock(job));
- this.queueService.relationshipQueue.process('unblock', maxJobs, (job) => this.relationshipProcessorService.processUnblock(job));
- }
+ objectStorageQueueWorker
+ .on('active', (job) => objectStorageLogger.debug(`active id=${job.id}`))
+ .on('completed', (job, result) => objectStorageLogger.debug(`completed(${result}) id=${job.id}`))
+ .on('failed', (job, err) => objectStorageLogger.warn(`failed(${err}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }))
+ .on('error', (err: Error) => objectStorageLogger.error(`error ${err}`, { e: renderError(err) }))
+ .on('stalled', (jobId) => objectStorageLogger.warn(`stalled id=${jobId}`));
+ //#endregion
- this.queueService.systemQueue.process('tickCharts', (job, done) => this.tickChartsProcessorService.process(job, done));
- this.queueService.systemQueue.process('resyncCharts', (job, done) => this.resyncChartsProcessorService.process(job, done));
- this.queueService.systemQueue.process('cleanCharts', (job, done) => this.cleanChartsProcessorService.process(job, done));
- this.queueService.systemQueue.process('aggregateRetention', (job, done) => this.aggregateRetentionProcessorService.process(job, done));
- this.queueService.systemQueue.process('checkExpiredMutings', (job, done) => this.checkExpiredMutingsProcessorService.process(job, done));
- this.queueService.systemQueue.process('clean', (job, done) => this.cleanProcessorService.process(job, done));
+ //#region ended poll notification
+ const endedPollNotificationWorker = new Bull.Worker(QUEUE.ENDED_POLL_NOTIFICATION, (job) => this.endedPollNotificationProcessorService.process(job), {
+ ...baseQueueOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION),
+ });
+ //#endregion
}
}