diff options
Diffstat (limited to 'packages/backend/src/queue')
8 files changed, 35 insertions, 28 deletions
diff --git a/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts b/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts index b81987cc15..ef21b6142e 100644 --- a/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts +++ b/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts @@ -215,15 +215,10 @@ export class CheckModeratorsActivityProcessorService { // -- SystemWebhook - const systemWebhooks = await this.systemWebhookService.fetchActiveSystemWebhooks() - .then(it => it.filter(it => it.on.includes('inactiveModeratorsWarning'))); - for (const systemWebhook of systemWebhooks) { - this.systemWebhookService.enqueueSystemWebhook( - systemWebhook, - 'inactiveModeratorsWarning', - { remainingTime: remainingTime }, - ); - } + return this.systemWebhookService.enqueueSystemWebhook( + 'inactiveModeratorsWarning', + { remainingTime: remainingTime }, + ); } @bindThis @@ -253,15 +248,10 @@ export class CheckModeratorsActivityProcessorService { // -- SystemWebhook - const systemWebhooks = await this.systemWebhookService.fetchActiveSystemWebhooks() - .then(it => it.filter(it => it.on.includes('inactiveModeratorsInvitationOnlyChanged'))); - for (const systemWebhook of systemWebhooks) { - this.systemWebhookService.enqueueSystemWebhook( - systemWebhook, - 'inactiveModeratorsInvitationOnlyChanged', - {}, - ); - } + return this.systemWebhookService.enqueueSystemWebhook( + 'inactiveModeratorsInvitationOnlyChanged', + {}, + ); } @bindThis diff --git a/packages/backend/src/queue/processors/CleanChartsProcessorService.ts b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts index 19f98c0d51..8c5faa8d07 100644 --- a/packages/backend/src/queue/processors/CleanChartsProcessorService.ts +++ b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts @@ -48,6 +48,7 @@ export class CleanChartsProcessorService { public async process(): Promise<void> { this.logger.info('Clean charts...'); + // DBへの同時接続を避けるためにPromise.allを使わずひとつずつ実行する await this.federationChart.clean(); await this.notesChart.clean(); await this.usersChart.clean(); diff --git a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts index 0e604a0501..e350b97f53 100644 --- a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts +++ b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts @@ -15,10 +15,10 @@ import type { MiNoteReaction } from '@/models/NoteReaction.js'; import { EmailService } from '@/core/EmailService.js'; import { bindThis } from '@/decorators.js'; import { SearchService } from '@/core/SearchService.js'; +import { ReactionService } from '@/core/ReactionService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; import type { DbUserDeleteJobData } from '../types.js'; -import { ReactionService } from '@/core/ReactionService.js'; @Injectable() export class DeleteAccountProcessorService { diff --git a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts index 666a709ab9..383fa0c26a 100644 --- a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts @@ -14,10 +14,10 @@ import { createTempDir } from '@/misc/create-temp.js'; import { DriveService } from '@/core/DriveService.js'; import { DownloadService } from '@/core/DownloadService.js'; import { bindThis } from '@/decorators.js'; +import type { Config } from '@/config.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; import type { DbUserImportJobData } from '../types.js'; -import type { Config } from '@/config.js'; // TODO: 名前衝突時の動作を選べるようにする @Injectable() @@ -92,6 +92,7 @@ export class ImportCustomEmojisProcessorService { await this.emojisRepository.delete({ name: nameNfc, }); + try { const driveFile = await this.driveService.addFile({ user: null, @@ -100,11 +101,13 @@ export class ImportCustomEmojisProcessorService { force: true, }); await this.customEmojiService.add({ + originalUrl: driveFile.url, + publicUrl: driveFile.webpublicUrl ?? driveFile.url, + fileType: driveFile.webpublicType ?? driveFile.type, name: nameNfc, category: emojiInfo.category?.normalize('NFC'), host: null, aliases: emojiInfo.aliases?.map((a: string) => a.normalize('NFC')), - driveFile, license: emojiInfo.license, isSensitive: emojiInfo.isSensitive, localOnly: emojiInfo.localOnly, diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 7727a3e985..87d4bf52fa 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -248,6 +248,14 @@ export class InboxProcessorService implements OnApplicationShutdown { return `skip: permanent error ${e.statusCode}`; } + if (e instanceof IdentifiableError && !e.isRetryable) { + if (e.message) { + return `skip: permanent error ${e.id}: ${e.message}`; + } else { + return `skip: permanent error ${e.id}`; + } + } + throw e; } return 'ok'; diff --git a/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts index 46e1adf173..0c47fdedb3 100644 --- a/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts +++ b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts @@ -29,6 +29,7 @@ export class ResyncChartsProcessorService { public async process(): Promise<void> { this.logger.info('Resync charts...'); + // DBへの同時接続を避けるためにPromise.allを使わずひとつずつ実行する // TODO: ユーザーごとのチャートも更新する // TODO: インスタンスごとのチャートも更新する await this.driveChart.resync(); diff --git a/packages/backend/src/queue/processors/TickChartsProcessorService.ts b/packages/backend/src/queue/processors/TickChartsProcessorService.ts index c09cbccc57..fc8856a271 100644 --- a/packages/backend/src/queue/processors/TickChartsProcessorService.ts +++ b/packages/backend/src/queue/processors/TickChartsProcessorService.ts @@ -48,6 +48,7 @@ export class TickChartsProcessorService { public async process(): Promise<void> { this.logger.info('Tick charts...'); + // DBへの同時接続を避けるためにPromise.allを使わずひとつずつ実行する await this.federationChart.tick(false); await this.notesChart.tick(false); await this.usersChart.tick(false); diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index 9433392df5..a900675a86 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -6,9 +6,12 @@ import type { Antenna } from '@/server/api/endpoints/i/import-antennas.js'; import type { MiDriveFile } from '@/models/DriveFile.js'; import type { MiNote } from '@/models/Note.js'; +import type { SystemWebhookEventType } from '@/models/SystemWebhook.js'; import type { MiUser } from '@/models/User.js'; -import type { MiWebhook } from '@/models/Webhook.js'; +import type { MiWebhook, WebhookEventTypes } from '@/models/Webhook.js'; import type { IActivity } from '@/core/activitypub/type.js'; +import type { SystemWebhookPayload } from '@/core/SystemWebhookService.js'; +import type { UserWebhookPayload } from '@/core/UserWebhookService.js'; import type httpSignature from '@peertube/http-signature'; export type DeliverJobData = { @@ -131,9 +134,9 @@ export type EndedPollNotificationJobData = { noteId: MiNote['id']; }; -export type SystemWebhookDeliverJobData = { - type: string; - content: unknown; +export type SystemWebhookDeliverJobData<T extends SystemWebhookEventType = SystemWebhookEventType> = { + type: T; + content: SystemWebhookPayload<T>; webhookId: MiWebhook['id']; to: string; secret: string; @@ -141,9 +144,9 @@ export type SystemWebhookDeliverJobData = { eventId: string; }; -export type UserWebhookDeliverJobData = { - type: string; - content: unknown; +export type UserWebhookDeliverJobData<T extends WebhookEventTypes = WebhookEventTypes> = { + type: T; + content: UserWebhookPayload<T>; webhookId: MiWebhook['id']; userId: MiUser['id']; to: string; |