diff options
| author | おさむのひと <46447427+samunohito@users.noreply.github.com> | 2024-06-08 15:34:19 +0900 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-06-08 15:34:19 +0900 |
| commit | 61fae45390283aee7ac582aa5303aae863de0f7a (patch) | |
| tree | 17182172ef9f932182fc55f2aabd7243d2be66b2 /packages/backend/src/queue/QueueProcessorService.ts | |
| parent | 配信停止したインスタンス一覧が見れなくなる問題を修... (diff) | |
| download | sharkey-61fae45390283aee7ac582aa5303aae863de0f7a.tar.gz sharkey-61fae45390283aee7ac582aa5303aae863de0f7a.tar.bz2 sharkey-61fae45390283aee7ac582aa5303aae863de0f7a.zip | |
feat: 通報を受けた際にメールまたはWebhookで通知を送出出来るようにする (#13758)
* feat: 通報を受けた際にメールまたはWebhookで通知を送出出来るようにする
* モデログに対応&エンドポイントを単一オブジェクトでのサポートに変更(API経由で大量に作るシチュエーションもないと思うので)
* fix spdx
* fix migration
* fix migration
* fix models
* add e2e webhook
* tweak
* fix modlog
* fix bugs
* add tests and fix bugs
* add tests and fix bugs
* add tests
* fix path
* regenerate locale
* 混入除去
* 混入除去
* add abuseReportResolved
* fix pnpm-lock.yaml
* add abuseReportResolved test
* fix bugs
* fix ui
* add tests
* fix CHANGELOG.md
* add tests
* add RoleService.getModeratorIds tests
* WebhookServiceをUserとSystemに分割
* fix CHANGELOG.md
* fix test
* insertOneを使う用に
* fix
* regenerate locales
* revert version
* separate webhook job queue
* fix
* :art:
* Update QueueProcessorService.ts
---------
Co-authored-by: osamu <46447427+sam-osamu@users.noreply.github.com>
Co-authored-by: syuilo <4439005+syuilo@users.noreply.github.com>
Diffstat (limited to 'packages/backend/src/queue/QueueProcessorService.ts')
| -rw-r--r-- | packages/backend/src/queue/QueueProcessorService.ts | 153 |
1 files changed, 99 insertions, 54 deletions
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index 7bfe1f4caa..7bd74f3210 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -10,7 +10,8 @@ import type { Config } from '@/config.js'; import { DI } from '@/di-symbols.js'; import type Logger from '@/logger.js'; import { bindThis } from '@/decorators.js'; -import { WebhookDeliverProcessorService } from './processors/WebhookDeliverProcessorService.js'; +import { UserWebhookDeliverProcessorService } from './processors/UserWebhookDeliverProcessorService.js'; +import { SystemWebhookDeliverProcessorService } from './processors/SystemWebhookDeliverProcessorService.js'; import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js'; import { DeliverProcessorService } from './processors/DeliverProcessorService.js'; import { InboxProcessorService } from './processors/InboxProcessorService.js'; @@ -76,7 +77,8 @@ export class QueueProcessorService implements OnApplicationShutdown { private dbQueueWorker: Bull.Worker; private deliverQueueWorker: Bull.Worker; private inboxQueueWorker: Bull.Worker; - private webhookDeliverQueueWorker: Bull.Worker; + private userWebhookDeliverQueueWorker: Bull.Worker; + private systemWebhookDeliverQueueWorker: Bull.Worker; private relationshipQueueWorker: Bull.Worker; private objectStorageQueueWorker: Bull.Worker; private endedPollNotificationQueueWorker: Bull.Worker; @@ -86,7 +88,8 @@ export class QueueProcessorService implements OnApplicationShutdown { private config: Config, private queueLoggerService: QueueLoggerService, - private webhookDeliverProcessorService: WebhookDeliverProcessorService, + private userWebhookDeliverProcessorService: UserWebhookDeliverProcessorService, + private systemWebhookDeliverProcessorService: SystemWebhookDeliverProcessorService, private endedPollNotificationProcessorService: EndedPollNotificationProcessorService, private deliverProcessorService: DeliverProcessorService, private inboxProcessorService: InboxProcessorService, @@ -160,13 +163,13 @@ export class QueueProcessorService implements OnApplicationShutdown { autorun: false, }); - const systemLogger = this.logger.createSubLogger('system'); + const logger = this.logger.createSubLogger('system'); this.systemQueueWorker - .on('active', (job) => systemLogger.debug(`active id=${job.id}`)) - .on('completed', (job, result) => systemLogger.debug(`completed(${result}) id=${job.id}`)) + .on('active', (job) => logger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`)) .on('failed', (job, err: Error) => { - systemLogger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }); + logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }); if (config.sentryForBackend) { Sentry.captureMessage(`Queue: System: ${job?.name ?? '?'}: ${err.message}`, { level: 'error', @@ -174,8 +177,8 @@ export class QueueProcessorService implements OnApplicationShutdown { }); } }) - .on('error', (err: Error) => systemLogger.error(`error ${err.stack}`, { e: renderError(err) })) - .on('stalled', (jobId) => systemLogger.warn(`stalled id=${jobId}`)); + .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion @@ -217,13 +220,13 @@ export class QueueProcessorService implements OnApplicationShutdown { autorun: false, }); - const dbLogger = this.logger.createSubLogger('db'); + const logger = this.logger.createSubLogger('db'); this.dbQueueWorker - .on('active', (job) => dbLogger.debug(`active id=${job.id}`)) - .on('completed', (job, result) => dbLogger.debug(`completed(${result}) id=${job.id}`)) + .on('active', (job) => logger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`)) .on('failed', (job, err) => { - dbLogger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }); + logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }); if (config.sentryForBackend) { Sentry.captureMessage(`Queue: DB: ${job?.name ?? '?'}: ${err.message}`, { level: 'error', @@ -231,8 +234,8 @@ export class QueueProcessorService implements OnApplicationShutdown { }); } }) - .on('error', (err: Error) => dbLogger.error(`error ${err.stack}`, { e: renderError(err) })) - .on('stalled', (jobId) => dbLogger.warn(`stalled id=${jobId}`)); + .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion @@ -257,13 +260,13 @@ export class QueueProcessorService implements OnApplicationShutdown { }, }); - const deliverLogger = this.logger.createSubLogger('deliver'); + const logger = this.logger.createSubLogger('deliver'); this.deliverQueueWorker - .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) - .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) .on('failed', (job, err) => { - deliverLogger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`); + logger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`); if (config.sentryForBackend) { Sentry.captureMessage(`Queue: Deliver: ${err.message}`, { level: 'error', @@ -271,8 +274,8 @@ export class QueueProcessorService implements OnApplicationShutdown { }); } }) - .on('error', (err: Error) => deliverLogger.error(`error ${err.stack}`, { e: renderError(err) })) - .on('stalled', (jobId) => deliverLogger.warn(`stalled id=${jobId}`)); + .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion @@ -297,13 +300,13 @@ export class QueueProcessorService implements OnApplicationShutdown { }, }); - const inboxLogger = this.logger.createSubLogger('inbox'); + const logger = this.logger.createSubLogger('inbox'); this.inboxQueueWorker - .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`)) - .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`)) + .on('active', (job) => logger.debug(`active ${getJobInfo(job, true)}`)) + .on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)}`)) .on('failed', (job, err) => { - inboxLogger.error(`failed(${err.stack}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job, e: renderError(err) }); + logger.error(`failed(${err.stack}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job, e: renderError(err) }); if (config.sentryForBackend) { Sentry.captureMessage(`Queue: Inbox: ${err.message}`, { level: 'error', @@ -311,21 +314,21 @@ export class QueueProcessorService implements OnApplicationShutdown { }); } }) - .on('error', (err: Error) => inboxLogger.error(`error ${err.stack}`, { e: renderError(err) })) - .on('stalled', (jobId) => inboxLogger.warn(`stalled id=${jobId}`)); + .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion - //#region webhook deliver + //#region user-webhook deliver { - this.webhookDeliverQueueWorker = new Bull.Worker(QUEUE.WEBHOOK_DELIVER, (job) => { + this.userWebhookDeliverQueueWorker = new Bull.Worker(QUEUE.USER_WEBHOOK_DELIVER, (job) => { if (this.config.sentryForBackend) { - return Sentry.startSpan({ name: 'Queue: WebhookDeliver' }, () => this.webhookDeliverProcessorService.process(job)); + return Sentry.startSpan({ name: 'Queue: UserWebhookDeliver' }, () => this.userWebhookDeliverProcessorService.process(job)); } else { - return this.webhookDeliverProcessorService.process(job); + return this.userWebhookDeliverProcessorService.process(job); } }, { - ...baseQueueOptions(this.config, QUEUE.WEBHOOK_DELIVER), + ...baseQueueOptions(this.config, QUEUE.USER_WEBHOOK_DELIVER), autorun: false, concurrency: 64, limiter: { @@ -337,22 +340,62 @@ export class QueueProcessorService implements OnApplicationShutdown { }, }); - const webhookLogger = this.logger.createSubLogger('webhook'); + const logger = this.logger.createSubLogger('user-webhook'); - this.webhookDeliverQueueWorker - .on('active', (job) => webhookLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) - .on('completed', (job, result) => webhookLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) + this.userWebhookDeliverQueueWorker + .on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) .on('failed', (job, err) => { - webhookLogger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`); + logger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`); if (config.sentryForBackend) { - Sentry.captureMessage(`Queue: WebhookDeliver: ${err.message}`, { + Sentry.captureMessage(`Queue: UserWebhookDeliver: ${err.message}`, { level: 'error', extra: { job, err }, }); } }) - .on('error', (err: Error) => webhookLogger.error(`error ${err.stack}`, { e: renderError(err) })) - .on('stalled', (jobId) => webhookLogger.warn(`stalled id=${jobId}`)); + .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); + } + //#endregion + + //#region system-webhook deliver + { + this.systemWebhookDeliverQueueWorker = new Bull.Worker(QUEUE.SYSTEM_WEBHOOK_DELIVER, (job) => { + if (this.config.sentryForBackend) { + return Sentry.startSpan({ name: 'Queue: SystemWebhookDeliver' }, () => this.systemWebhookDeliverProcessorService.process(job)); + } else { + return this.systemWebhookDeliverProcessorService.process(job); + } + }, { + ...baseQueueOptions(this.config, QUEUE.SYSTEM_WEBHOOK_DELIVER), + autorun: false, + concurrency: 16, + limiter: { + max: 16, + duration: 1000, + }, + settings: { + backoffStrategy: httpRelatedBackoff, + }, + }); + + const logger = this.logger.createSubLogger('system-webhook'); + + this.systemWebhookDeliverQueueWorker + .on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('failed', (job, err) => { + logger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`); + if (config.sentryForBackend) { + Sentry.captureMessage(`Queue: SystemWebhookDeliver: ${err.message}`, { + level: 'error', + extra: { job, err }, + }); + } + }) + .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion @@ -384,13 +427,13 @@ export class QueueProcessorService implements OnApplicationShutdown { }, }); - const relationshipLogger = this.logger.createSubLogger('relationship'); + const logger = this.logger.createSubLogger('relationship'); this.relationshipQueueWorker - .on('active', (job) => relationshipLogger.debug(`active id=${job.id}`)) - .on('completed', (job, result) => relationshipLogger.debug(`completed(${result}) id=${job.id}`)) + .on('active', (job) => logger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`)) .on('failed', (job, err) => { - relationshipLogger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }); + logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }); if (config.sentryForBackend) { Sentry.captureMessage(`Queue: Relationship: ${job?.name ?? '?'}: ${err.message}`, { level: 'error', @@ -398,8 +441,8 @@ export class QueueProcessorService implements OnApplicationShutdown { }); } }) - .on('error', (err: Error) => relationshipLogger.error(`error ${err.stack}`, { e: renderError(err) })) - .on('stalled', (jobId) => relationshipLogger.warn(`stalled id=${jobId}`)); + .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion @@ -425,13 +468,13 @@ export class QueueProcessorService implements OnApplicationShutdown { concurrency: 16, }); - const objectStorageLogger = this.logger.createSubLogger('objectStorage'); + const logger = this.logger.createSubLogger('objectStorage'); this.objectStorageQueueWorker - .on('active', (job) => objectStorageLogger.debug(`active id=${job.id}`)) - .on('completed', (job, result) => objectStorageLogger.debug(`completed(${result}) id=${job.id}`)) + .on('active', (job) => logger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`)) .on('failed', (job, err) => { - objectStorageLogger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }); + logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }); if (config.sentryForBackend) { Sentry.captureMessage(`Queue: ObjectStorage: ${job?.name ?? '?'}: ${err.message}`, { level: 'error', @@ -439,8 +482,8 @@ export class QueueProcessorService implements OnApplicationShutdown { }); } }) - .on('error', (err: Error) => objectStorageLogger.error(`error ${err.stack}`, { e: renderError(err) })) - .on('stalled', (jobId) => objectStorageLogger.warn(`stalled id=${jobId}`)); + .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion @@ -467,7 +510,8 @@ export class QueueProcessorService implements OnApplicationShutdown { this.dbQueueWorker.run(), this.deliverQueueWorker.run(), this.inboxQueueWorker.run(), - this.webhookDeliverQueueWorker.run(), + this.userWebhookDeliverQueueWorker.run(), + this.systemWebhookDeliverQueueWorker.run(), this.relationshipQueueWorker.run(), this.objectStorageQueueWorker.run(), this.endedPollNotificationQueueWorker.run(), @@ -481,7 +525,8 @@ export class QueueProcessorService implements OnApplicationShutdown { this.dbQueueWorker.close(), this.deliverQueueWorker.close(), this.inboxQueueWorker.close(), - this.webhookDeliverQueueWorker.close(), + this.userWebhookDeliverQueueWorker.close(), + this.systemWebhookDeliverQueueWorker.close(), this.relationshipQueueWorker.close(), this.objectStorageQueueWorker.close(), this.endedPollNotificationQueueWorker.close(), |