diff options
| author | おさむのひと <46447427+samunohito@users.noreply.github.com> | 2024-06-08 15:34:19 +0900 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-06-08 15:34:19 +0900 |
| commit | 61fae45390283aee7ac582aa5303aae863de0f7a (patch) | |
| tree | 17182172ef9f932182fc55f2aabd7243d2be66b2 /packages/backend/src/queue | |
| parent | 配信停止したインスタンス一覧が見れなくなる問題を修... (diff) | |
| download | sharkey-61fae45390283aee7ac582aa5303aae863de0f7a.tar.gz sharkey-61fae45390283aee7ac582aa5303aae863de0f7a.tar.bz2 sharkey-61fae45390283aee7ac582aa5303aae863de0f7a.zip | |
feat: 通報を受けた際にメールまたはWebhookで通知を送出出来るようにする (#13758)
* feat: 通報を受けた際にメールまたはWebhookで通知を送出出来るようにする
* モデログに対応&エンドポイントを単一オブジェクトでのサポートに変更(API経由で大量に作るシチュエーションもないと思うので)
* fix spdx
* fix migration
* fix migration
* fix models
* add e2e webhook
* tweak
* fix modlog
* fix bugs
* add tests and fix bugs
* add tests and fix bugs
* add tests
* fix path
* regenerate locale
* 混入除去
* 混入除去
* add abuseReportResolved
* fix pnpm-lock.yaml
* add abuseReportResolved test
* fix bugs
* fix ui
* add tests
* fix CHANGELOG.md
* add tests
* add RoleService.getModeratorIds tests
* WebhookServiceをUserとSystemに分割
* fix CHANGELOG.md
* fix test
* insertOneを使う用に
* fix
* regenerate locales
* revert version
* separate webhook job queue
* fix
* :art:
* Update QueueProcessorService.ts
---------
Co-authored-by: osamu <46447427+sam-osamu@users.noreply.github.com>
Co-authored-by: syuilo <4439005+syuilo@users.noreply.github.com>
Diffstat (limited to 'packages/backend/src/queue')
| -rw-r--r-- | packages/backend/src/queue/QueueProcessorModule.ts | 6 | ||||
| -rw-r--r-- | packages/backend/src/queue/QueueProcessorService.ts | 153 | ||||
| -rw-r--r-- | packages/backend/src/queue/const.ts | 3 | ||||
| -rw-r--r-- | packages/backend/src/queue/processors/SystemWebhookDeliverProcessorService.ts | 87 | ||||
| -rw-r--r-- | packages/backend/src/queue/processors/UserWebhookDeliverProcessorService.ts (renamed from packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts) | 6 | ||||
| -rw-r--r-- | packages/backend/src/queue/types.ts | 12 |
6 files changed, 206 insertions, 61 deletions
diff --git a/packages/backend/src/queue/QueueProcessorModule.ts b/packages/backend/src/queue/QueueProcessorModule.ts index 8086158997..a1fd38fcc5 100644 --- a/packages/backend/src/queue/QueueProcessorModule.ts +++ b/packages/backend/src/queue/QueueProcessorModule.ts @@ -11,7 +11,8 @@ import { QueueProcessorService } from './QueueProcessorService.js'; import { DeliverProcessorService } from './processors/DeliverProcessorService.js'; import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js'; import { InboxProcessorService } from './processors/InboxProcessorService.js'; -import { WebhookDeliverProcessorService } from './processors/WebhookDeliverProcessorService.js'; +import { UserWebhookDeliverProcessorService } from './processors/UserWebhookDeliverProcessorService.js'; +import { SystemWebhookDeliverProcessorService } from './processors/SystemWebhookDeliverProcessorService.js'; import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMutingsProcessorService.js'; import { CleanChartsProcessorService } from './processors/CleanChartsProcessorService.js'; import { CleanProcessorService } from './processors/CleanProcessorService.js'; @@ -71,7 +72,8 @@ import { RelationshipProcessorService } from './processors/RelationshipProcessor DeleteFileProcessorService, CleanRemoteFilesProcessorService, RelationshipProcessorService, - WebhookDeliverProcessorService, + UserWebhookDeliverProcessorService, + SystemWebhookDeliverProcessorService, EndedPollNotificationProcessorService, DeliverProcessorService, InboxProcessorService, diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index 7bfe1f4caa..7bd74f3210 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -10,7 +10,8 @@ import type { Config } from '@/config.js'; import { DI } from '@/di-symbols.js'; import type Logger from '@/logger.js'; import { bindThis } from '@/decorators.js'; -import { WebhookDeliverProcessorService } from './processors/WebhookDeliverProcessorService.js'; +import { UserWebhookDeliverProcessorService } from './processors/UserWebhookDeliverProcessorService.js'; +import { SystemWebhookDeliverProcessorService } from './processors/SystemWebhookDeliverProcessorService.js'; import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js'; import { DeliverProcessorService } from './processors/DeliverProcessorService.js'; import { InboxProcessorService } from './processors/InboxProcessorService.js'; @@ -76,7 +77,8 @@ export class QueueProcessorService implements OnApplicationShutdown { private dbQueueWorker: Bull.Worker; private deliverQueueWorker: Bull.Worker; private inboxQueueWorker: Bull.Worker; - private webhookDeliverQueueWorker: Bull.Worker; + private userWebhookDeliverQueueWorker: Bull.Worker; + private systemWebhookDeliverQueueWorker: Bull.Worker; private relationshipQueueWorker: Bull.Worker; private objectStorageQueueWorker: Bull.Worker; private endedPollNotificationQueueWorker: Bull.Worker; @@ -86,7 +88,8 @@ export class QueueProcessorService implements OnApplicationShutdown { private config: Config, private queueLoggerService: QueueLoggerService, - private webhookDeliverProcessorService: WebhookDeliverProcessorService, + private userWebhookDeliverProcessorService: UserWebhookDeliverProcessorService, + private systemWebhookDeliverProcessorService: SystemWebhookDeliverProcessorService, private endedPollNotificationProcessorService: EndedPollNotificationProcessorService, private deliverProcessorService: DeliverProcessorService, private inboxProcessorService: InboxProcessorService, @@ -160,13 +163,13 @@ export class QueueProcessorService implements OnApplicationShutdown { autorun: false, }); - const systemLogger = this.logger.createSubLogger('system'); + const logger = this.logger.createSubLogger('system'); this.systemQueueWorker - .on('active', (job) => systemLogger.debug(`active id=${job.id}`)) - .on('completed', (job, result) => systemLogger.debug(`completed(${result}) id=${job.id}`)) + .on('active', (job) => logger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`)) .on('failed', (job, err: Error) => { - systemLogger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }); + logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }); if (config.sentryForBackend) { Sentry.captureMessage(`Queue: System: ${job?.name ?? '?'}: ${err.message}`, { level: 'error', @@ -174,8 +177,8 @@ export class QueueProcessorService implements OnApplicationShutdown { }); } }) - .on('error', (err: Error) => systemLogger.error(`error ${err.stack}`, { e: renderError(err) })) - .on('stalled', (jobId) => systemLogger.warn(`stalled id=${jobId}`)); + .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion @@ -217,13 +220,13 @@ export class QueueProcessorService implements OnApplicationShutdown { autorun: false, }); - const dbLogger = this.logger.createSubLogger('db'); + const logger = this.logger.createSubLogger('db'); this.dbQueueWorker - .on('active', (job) => dbLogger.debug(`active id=${job.id}`)) - .on('completed', (job, result) => dbLogger.debug(`completed(${result}) id=${job.id}`)) + .on('active', (job) => logger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`)) .on('failed', (job, err) => { - dbLogger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }); + logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }); if (config.sentryForBackend) { Sentry.captureMessage(`Queue: DB: ${job?.name ?? '?'}: ${err.message}`, { level: 'error', @@ -231,8 +234,8 @@ export class QueueProcessorService implements OnApplicationShutdown { }); } }) - .on('error', (err: Error) => dbLogger.error(`error ${err.stack}`, { e: renderError(err) })) - .on('stalled', (jobId) => dbLogger.warn(`stalled id=${jobId}`)); + .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion @@ -257,13 +260,13 @@ export class QueueProcessorService implements OnApplicationShutdown { }, }); - const deliverLogger = this.logger.createSubLogger('deliver'); + const logger = this.logger.createSubLogger('deliver'); this.deliverQueueWorker - .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) - .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) .on('failed', (job, err) => { - deliverLogger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`); + logger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`); if (config.sentryForBackend) { Sentry.captureMessage(`Queue: Deliver: ${err.message}`, { level: 'error', @@ -271,8 +274,8 @@ export class QueueProcessorService implements OnApplicationShutdown { }); } }) - .on('error', (err: Error) => deliverLogger.error(`error ${err.stack}`, { e: renderError(err) })) - .on('stalled', (jobId) => deliverLogger.warn(`stalled id=${jobId}`)); + .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion @@ -297,13 +300,13 @@ export class QueueProcessorService implements OnApplicationShutdown { }, }); - const inboxLogger = this.logger.createSubLogger('inbox'); + const logger = this.logger.createSubLogger('inbox'); this.inboxQueueWorker - .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`)) - .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`)) + .on('active', (job) => logger.debug(`active ${getJobInfo(job, true)}`)) + .on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)}`)) .on('failed', (job, err) => { - inboxLogger.error(`failed(${err.stack}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job, e: renderError(err) }); + logger.error(`failed(${err.stack}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job, e: renderError(err) }); if (config.sentryForBackend) { Sentry.captureMessage(`Queue: Inbox: ${err.message}`, { level: 'error', @@ -311,21 +314,21 @@ export class QueueProcessorService implements OnApplicationShutdown { }); } }) - .on('error', (err: Error) => inboxLogger.error(`error ${err.stack}`, { e: renderError(err) })) - .on('stalled', (jobId) => inboxLogger.warn(`stalled id=${jobId}`)); + .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion - //#region webhook deliver + //#region user-webhook deliver { - this.webhookDeliverQueueWorker = new Bull.Worker(QUEUE.WEBHOOK_DELIVER, (job) => { + this.userWebhookDeliverQueueWorker = new Bull.Worker(QUEUE.USER_WEBHOOK_DELIVER, (job) => { if (this.config.sentryForBackend) { - return Sentry.startSpan({ name: 'Queue: WebhookDeliver' }, () => this.webhookDeliverProcessorService.process(job)); + return Sentry.startSpan({ name: 'Queue: UserWebhookDeliver' }, () => this.userWebhookDeliverProcessorService.process(job)); } else { - return this.webhookDeliverProcessorService.process(job); + return this.userWebhookDeliverProcessorService.process(job); } }, { - ...baseQueueOptions(this.config, QUEUE.WEBHOOK_DELIVER), + ...baseQueueOptions(this.config, QUEUE.USER_WEBHOOK_DELIVER), autorun: false, concurrency: 64, limiter: { @@ -337,22 +340,62 @@ export class QueueProcessorService implements OnApplicationShutdown { }, }); - const webhookLogger = this.logger.createSubLogger('webhook'); + const logger = this.logger.createSubLogger('user-webhook'); - this.webhookDeliverQueueWorker - .on('active', (job) => webhookLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) - .on('completed', (job, result) => webhookLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) + this.userWebhookDeliverQueueWorker + .on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) .on('failed', (job, err) => { - webhookLogger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`); + logger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`); if (config.sentryForBackend) { - Sentry.captureMessage(`Queue: WebhookDeliver: ${err.message}`, { + Sentry.captureMessage(`Queue: UserWebhookDeliver: ${err.message}`, { level: 'error', extra: { job, err }, }); } }) - .on('error', (err: Error) => webhookLogger.error(`error ${err.stack}`, { e: renderError(err) })) - .on('stalled', (jobId) => webhookLogger.warn(`stalled id=${jobId}`)); + .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); + } + //#endregion + + //#region system-webhook deliver + { + this.systemWebhookDeliverQueueWorker = new Bull.Worker(QUEUE.SYSTEM_WEBHOOK_DELIVER, (job) => { + if (this.config.sentryForBackend) { + return Sentry.startSpan({ name: 'Queue: SystemWebhookDeliver' }, () => this.systemWebhookDeliverProcessorService.process(job)); + } else { + return this.systemWebhookDeliverProcessorService.process(job); + } + }, { + ...baseQueueOptions(this.config, QUEUE.SYSTEM_WEBHOOK_DELIVER), + autorun: false, + concurrency: 16, + limiter: { + max: 16, + duration: 1000, + }, + settings: { + backoffStrategy: httpRelatedBackoff, + }, + }); + + const logger = this.logger.createSubLogger('system-webhook'); + + this.systemWebhookDeliverQueueWorker + .on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('failed', (job, err) => { + logger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`); + if (config.sentryForBackend) { + Sentry.captureMessage(`Queue: SystemWebhookDeliver: ${err.message}`, { + level: 'error', + extra: { job, err }, + }); + } + }) + .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion @@ -384,13 +427,13 @@ export class QueueProcessorService implements OnApplicationShutdown { }, }); - const relationshipLogger = this.logger.createSubLogger('relationship'); + const logger = this.logger.createSubLogger('relationship'); this.relationshipQueueWorker - .on('active', (job) => relationshipLogger.debug(`active id=${job.id}`)) - .on('completed', (job, result) => relationshipLogger.debug(`completed(${result}) id=${job.id}`)) + .on('active', (job) => logger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`)) .on('failed', (job, err) => { - relationshipLogger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }); + logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }); if (config.sentryForBackend) { Sentry.captureMessage(`Queue: Relationship: ${job?.name ?? '?'}: ${err.message}`, { level: 'error', @@ -398,8 +441,8 @@ export class QueueProcessorService implements OnApplicationShutdown { }); } }) - .on('error', (err: Error) => relationshipLogger.error(`error ${err.stack}`, { e: renderError(err) })) - .on('stalled', (jobId) => relationshipLogger.warn(`stalled id=${jobId}`)); + .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion @@ -425,13 +468,13 @@ export class QueueProcessorService implements OnApplicationShutdown { concurrency: 16, }); - const objectStorageLogger = this.logger.createSubLogger('objectStorage'); + const logger = this.logger.createSubLogger('objectStorage'); this.objectStorageQueueWorker - .on('active', (job) => objectStorageLogger.debug(`active id=${job.id}`)) - .on('completed', (job, result) => objectStorageLogger.debug(`completed(${result}) id=${job.id}`)) + .on('active', (job) => logger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`)) .on('failed', (job, err) => { - objectStorageLogger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }); + logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) }); if (config.sentryForBackend) { Sentry.captureMessage(`Queue: ObjectStorage: ${job?.name ?? '?'}: ${err.message}`, { level: 'error', @@ -439,8 +482,8 @@ export class QueueProcessorService implements OnApplicationShutdown { }); } }) - .on('error', (err: Error) => objectStorageLogger.error(`error ${err.stack}`, { e: renderError(err) })) - .on('stalled', (jobId) => objectStorageLogger.warn(`stalled id=${jobId}`)); + .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion @@ -467,7 +510,8 @@ export class QueueProcessorService implements OnApplicationShutdown { this.dbQueueWorker.run(), this.deliverQueueWorker.run(), this.inboxQueueWorker.run(), - this.webhookDeliverQueueWorker.run(), + this.userWebhookDeliverQueueWorker.run(), + this.systemWebhookDeliverQueueWorker.run(), this.relationshipQueueWorker.run(), this.objectStorageQueueWorker.run(), this.endedPollNotificationQueueWorker.run(), @@ -481,7 +525,8 @@ export class QueueProcessorService implements OnApplicationShutdown { this.dbQueueWorker.close(), this.deliverQueueWorker.close(), this.inboxQueueWorker.close(), - this.webhookDeliverQueueWorker.close(), + this.userWebhookDeliverQueueWorker.close(), + this.systemWebhookDeliverQueueWorker.close(), this.relationshipQueueWorker.close(), this.objectStorageQueueWorker.close(), this.endedPollNotificationQueueWorker.close(), diff --git a/packages/backend/src/queue/const.ts b/packages/backend/src/queue/const.ts index 132e916612..67f689b618 100644 --- a/packages/backend/src/queue/const.ts +++ b/packages/backend/src/queue/const.ts @@ -14,7 +14,8 @@ export const QUEUE = { DB: 'db', RELATIONSHIP: 'relationship', OBJECT_STORAGE: 'objectStorage', - WEBHOOK_DELIVER: 'webhookDeliver', + USER_WEBHOOK_DELIVER: 'userWebhookDeliver', + SYSTEM_WEBHOOK_DELIVER: 'systemWebhookDeliver', }; export function baseQueueOptions(config: Config, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions { diff --git a/packages/backend/src/queue/processors/SystemWebhookDeliverProcessorService.ts b/packages/backend/src/queue/processors/SystemWebhookDeliverProcessorService.ts new file mode 100644 index 0000000000..f6bef52684 --- /dev/null +++ b/packages/backend/src/queue/processors/SystemWebhookDeliverProcessorService.ts @@ -0,0 +1,87 @@ +/* + * SPDX-FileCopyrightText: syuilo and misskey-project + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Inject, Injectable } from '@nestjs/common'; +import * as Bull from 'bullmq'; +import { DI } from '@/di-symbols.js'; +import type { SystemWebhooksRepository } from '@/models/_.js'; +import type { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import { HttpRequestService } from '@/core/HttpRequestService.js'; +import { StatusError } from '@/misc/status-error.js'; +import { bindThis } from '@/decorators.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import { SystemWebhookDeliverJobData } from '../types.js'; + +@Injectable() +export class SystemWebhookDeliverProcessorService { + private logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.systemWebhooksRepository) + private systemWebhooksRepository: SystemWebhooksRepository, + + private httpRequestService: HttpRequestService, + private queueLoggerService: QueueLoggerService, + ) { + this.logger = this.queueLoggerService.logger.createSubLogger('webhook'); + } + + @bindThis + public async process(job: Bull.Job<SystemWebhookDeliverJobData>): Promise<string> { + try { + this.logger.debug(`delivering ${job.data.webhookId}`); + + const res = await this.httpRequestService.send(job.data.to, { + method: 'POST', + headers: { + 'User-Agent': 'Misskey-Hooks', + 'X-Misskey-Host': this.config.host, + 'X-Misskey-Hook-Id': job.data.webhookId, + 'X-Misskey-Hook-Secret': job.data.secret, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + server: this.config.url, + hookId: job.data.webhookId, + eventId: job.data.eventId, + createdAt: job.data.createdAt, + type: job.data.type, + body: job.data.content, + }), + }); + + this.systemWebhooksRepository.update({ id: job.data.webhookId }, { + latestSentAt: new Date(), + latestStatus: res.status, + }); + + return 'Success'; + } catch (res) { + this.logger.error(res as Error); + + this.systemWebhooksRepository.update({ id: job.data.webhookId }, { + latestSentAt: new Date(), + latestStatus: res instanceof StatusError ? res.statusCode : 1, + }); + + if (res instanceof StatusError) { + // 4xx + if (!res.isRetryable) { + throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`); + } + + // 5xx etc. + throw new Error(`${res.statusCode} ${res.statusMessage}`); + } else { + // DNS error, socket error, timeout ... + throw res; + } + } + } +} diff --git a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts b/packages/backend/src/queue/processors/UserWebhookDeliverProcessorService.ts index 8c260c0137..9ec630ef70 100644 --- a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts +++ b/packages/backend/src/queue/processors/UserWebhookDeliverProcessorService.ts @@ -13,10 +13,10 @@ import { HttpRequestService } from '@/core/HttpRequestService.js'; import { StatusError } from '@/misc/status-error.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; -import type { WebhookDeliverJobData } from '../types.js'; +import { UserWebhookDeliverJobData } from '../types.js'; @Injectable() -export class WebhookDeliverProcessorService { +export class UserWebhookDeliverProcessorService { private logger: Logger; constructor( @@ -33,7 +33,7 @@ export class WebhookDeliverProcessorService { } @bindThis - public async process(job: Bull.Job<WebhookDeliverJobData>): Promise<string> { + public async process(job: Bull.Job<UserWebhookDeliverJobData>): Promise<string> { try { this.logger.debug(`delivering ${job.data.webhookId}`); diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index ce57ba745e..a4077a0547 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -106,7 +106,17 @@ export type EndedPollNotificationJobData = { noteId: MiNote['id']; }; -export type WebhookDeliverJobData = { +export type SystemWebhookDeliverJobData = { + type: string; + content: unknown; + webhookId: MiWebhook['id']; + to: string; + secret: string; + createdAt: number; + eventId: string; +}; + +export type UserWebhookDeliverJobData = { type: string; content: unknown; webhookId: MiWebhook['id']; |