From fd7b77c542b51313d8b8ea60124725fe65a295d5 Mon Sep 17 00:00:00 2001 From: syuilo Date: Mon, 29 May 2023 11:54:49 +0900 Subject: enhance(backend): migrate bull to bullmq (#10910) * wip * wip * Update QueueService.ts * wip * refactor * :v: * fix * Update QueueStatsService.ts * refactor * Update ApNoteService.ts * Update mock-resolver.ts * refactor * Update mock-resolver.ts --- .../src/queue/processors/InboxProcessorService.ts | 38 +++++++++------------- 1 file changed, 15 insertions(+), 23 deletions(-) (limited to 'packages/backend/src/queue/processors/InboxProcessorService.ts') diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index ab8b1e9e22..ce1d7aaa1b 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -1,8 +1,8 @@ import { URL } from 'node:url'; import { Inject, Injectable } from '@nestjs/common'; import httpSignature from '@peertube/http-signature'; +import * as Bull from 'bullmq'; import { DI } from '@/di-symbols.js'; -import type { InstancesRepository, DriveFilesRepository } from '@/models/index.js'; import type { Config } from '@/config.js'; import type Logger from '@/logger.js'; import { MetaService } from '@/core/MetaService.js'; @@ -23,10 +23,8 @@ import { LdSignatureService } from '@/core/activitypub/LdSignatureService.js'; import { ApInboxService } from '@/core/activitypub/ApInboxService.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; -import type Bull from 'bull'; import type { InboxJobData } from '../types.js'; -// ユーザーのinboxにアクティビティが届いた時の処理 @Injectable() export class InboxProcessorService { private logger: Logger; @@ -35,12 +33,6 @@ export class InboxProcessorService { @Inject(DI.config) private config: Config, - @Inject(DI.instancesRepository) - private instancesRepository: InstancesRepository, - - @Inject(DI.driveFilesRepository) - private driveFilesRepository: DriveFilesRepository, - private utilityService: UtilityService, private metaService: MetaService, private apInboxService: ApInboxService, @@ -93,24 +85,24 @@ export class InboxProcessorService { try { authUser = await this.apDbResolverService.getAuthUserFromApId(getApId(activity.actor)); } catch (err) { - // 対象が4xxならスキップ + // 対象が4xxならスキップ if (err instanceof StatusError) { if (err.isClientError) { - return `skip: Ignored deleted actors on both ends ${activity.actor} - ${err.statusCode}`; + throw new Bull.UnrecoverableError(`skip: Ignored deleted actors on both ends ${activity.actor} - ${err.statusCode}`); } - throw `Error in actor ${activity.actor} - ${err.statusCode ?? err}`; + throw new Error(`Error in actor ${activity.actor} - ${err.statusCode ?? err}`); } } } // それでもわからなければ終了 if (authUser == null) { - return 'skip: failed to resolve user'; + throw new Bull.UnrecoverableError('skip: failed to resolve user'); } // publicKey がなくても終了 if (authUser.key == null) { - return 'skip: failed to resolve user publicKey'; + throw new Bull.UnrecoverableError('skip: failed to resolve user publicKey'); } // HTTP-Signatureの検証 @@ -118,10 +110,10 @@ export class InboxProcessorService { // また、signatureのsignerは、activity.actorと一致する必要がある if (!httpSignatureValidated || authUser.user.uri !== activity.actor) { - // 一致しなくても、でもLD-Signatureがありそうならそっちも見る + // 一致しなくても、でもLD-Signatureがありそうならそっちも見る if (activity.signature) { if (activity.signature.type !== 'RsaSignature2017') { - return `skip: unsupported LD-signature type ${activity.signature.type}`; + throw new Bull.UnrecoverableError(`skip: unsupported LD-signature type ${activity.signature.type}`); } // activity.signature.creator: https://example.oom/users/user#main-key @@ -134,32 +126,32 @@ export class InboxProcessorService { // keyIdからLD-Signatureのユーザーを取得 authUser = await this.apDbResolverService.getAuthUserFromKeyId(activity.signature.creator); if (authUser == null) { - return 'skip: LD-Signatureのユーザーが取得できませんでした'; + throw new Bull.UnrecoverableError('skip: LD-Signatureのユーザーが取得できませんでした'); } if (authUser.key == null) { - return 'skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした'; + throw new Bull.UnrecoverableError('skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした'); } // LD-Signature検証 const ldSignature = this.ldSignatureService.use(); const verified = await ldSignature.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false); if (!verified) { - return 'skip: LD-Signatureの検証に失敗しました'; + throw new Bull.UnrecoverableError('skip: LD-Signatureの検証に失敗しました'); } // もう一度actorチェック if (authUser.user.uri !== activity.actor) { - return `skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`; + throw new Bull.UnrecoverableError(`skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`); } // ブロックしてたら中断 const ldHost = this.utilityService.extractDbHost(authUser.user.uri); if (this.utilityService.isBlockedHost(meta.blockedHosts, ldHost)) { - return `Blocked request: ${ldHost}`; + throw new Bull.UnrecoverableError(`Blocked request: ${ldHost}`); } } else { - return `skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`; + throw new Bull.UnrecoverableError(`skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`); } } @@ -168,7 +160,7 @@ export class InboxProcessorService { const signerHost = this.utilityService.extractDbHost(authUser.user.uri!); const activityIdHost = this.utilityService.extractDbHost(activity.id); if (signerHost !== activityIdHost) { - return `skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`; + throw new Bull.UnrecoverableError(`skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`); } } -- cgit v1.2.3-freya