summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue/processors/InboxProcessorService.ts
diff options
context:
space:
mode:
authorsyuilo <Syuilotan@yahoo.co.jp>2023-05-29 11:54:49 +0900
committerGitHub <noreply@github.com>2023-05-29 11:54:49 +0900
commitfd7b77c542b51313d8b8ea60124725fe65a295d5 (patch)
tree78893fdfe273496831124414789376b1e2a18997 /packages/backend/src/queue/processors/InboxProcessorService.ts
parentpnpm devでCtrl+Cで終了させてもプロセスが完全に殺せないの... (diff)
downloadsharkey-fd7b77c542b51313d8b8ea60124725fe65a295d5.tar.gz
sharkey-fd7b77c542b51313d8b8ea60124725fe65a295d5.tar.bz2
sharkey-fd7b77c542b51313d8b8ea60124725fe65a295d5.zip
enhance(backend): migrate bull to bullmq (#10910)
* wip * wip * Update QueueService.ts * wip * refactor * :v: * fix * Update QueueStatsService.ts * refactor * Update ApNoteService.ts * Update mock-resolver.ts * refactor * Update mock-resolver.ts
Diffstat (limited to 'packages/backend/src/queue/processors/InboxProcessorService.ts')
-rw-r--r--packages/backend/src/queue/processors/InboxProcessorService.ts38
1 files changed, 15 insertions, 23 deletions
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts
index ab8b1e9e22..ce1d7aaa1b 100644
--- a/packages/backend/src/queue/processors/InboxProcessorService.ts
+++ b/packages/backend/src/queue/processors/InboxProcessorService.ts
@@ -1,8 +1,8 @@
import { URL } from 'node:url';
import { Inject, Injectable } from '@nestjs/common';
import httpSignature from '@peertube/http-signature';
+import * as Bull from 'bullmq';
import { DI } from '@/di-symbols.js';
-import type { InstancesRepository, DriveFilesRepository } from '@/models/index.js';
import type { Config } from '@/config.js';
import type Logger from '@/logger.js';
import { MetaService } from '@/core/MetaService.js';
@@ -23,10 +23,8 @@ import { LdSignatureService } from '@/core/activitypub/LdSignatureService.js';
import { ApInboxService } from '@/core/activitypub/ApInboxService.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
-import type Bull from 'bull';
import type { InboxJobData } from '../types.js';
-// ユーザーのinboxにアクティビティが届いた時の処理
@Injectable()
export class InboxProcessorService {
private logger: Logger;
@@ -35,12 +33,6 @@ export class InboxProcessorService {
@Inject(DI.config)
private config: Config,
- @Inject(DI.instancesRepository)
- private instancesRepository: InstancesRepository,
-
- @Inject(DI.driveFilesRepository)
- private driveFilesRepository: DriveFilesRepository,
-
private utilityService: UtilityService,
private metaService: MetaService,
private apInboxService: ApInboxService,
@@ -93,24 +85,24 @@ export class InboxProcessorService {
try {
authUser = await this.apDbResolverService.getAuthUserFromApId(getApId(activity.actor));
} catch (err) {
- // 対象が4xxならスキップ
+ // 対象が4xxならスキップ
if (err instanceof StatusError) {
if (err.isClientError) {
- return `skip: Ignored deleted actors on both ends ${activity.actor} - ${err.statusCode}`;
+ throw new Bull.UnrecoverableError(`skip: Ignored deleted actors on both ends ${activity.actor} - ${err.statusCode}`);
}
- throw `Error in actor ${activity.actor} - ${err.statusCode ?? err}`;
+ throw new Error(`Error in actor ${activity.actor} - ${err.statusCode ?? err}`);
}
}
}
// それでもわからなければ終了
if (authUser == null) {
- return 'skip: failed to resolve user';
+ throw new Bull.UnrecoverableError('skip: failed to resolve user');
}
// publicKey がなくても終了
if (authUser.key == null) {
- return 'skip: failed to resolve user publicKey';
+ throw new Bull.UnrecoverableError('skip: failed to resolve user publicKey');
}
// HTTP-Signatureの検証
@@ -118,10 +110,10 @@ export class InboxProcessorService {
// また、signatureのsignerは、activity.actorと一致する必要がある
if (!httpSignatureValidated || authUser.user.uri !== activity.actor) {
- // 一致しなくても、でもLD-Signatureがありそうならそっちも見る
+ // 一致しなくても、でもLD-Signatureがありそうならそっちも見る
if (activity.signature) {
if (activity.signature.type !== 'RsaSignature2017') {
- return `skip: unsupported LD-signature type ${activity.signature.type}`;
+ throw new Bull.UnrecoverableError(`skip: unsupported LD-signature type ${activity.signature.type}`);
}
// activity.signature.creator: https://example.oom/users/user#main-key
@@ -134,32 +126,32 @@ export class InboxProcessorService {
// keyIdからLD-Signatureのユーザーを取得
authUser = await this.apDbResolverService.getAuthUserFromKeyId(activity.signature.creator);
if (authUser == null) {
- return 'skip: LD-Signatureのユーザーが取得できませんでした';
+ throw new Bull.UnrecoverableError('skip: LD-Signatureのユーザーが取得できませんでした');
}
if (authUser.key == null) {
- return 'skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした';
+ throw new Bull.UnrecoverableError('skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした');
}
// LD-Signature検証
const ldSignature = this.ldSignatureService.use();
const verified = await ldSignature.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false);
if (!verified) {
- return 'skip: LD-Signatureの検証に失敗しました';
+ throw new Bull.UnrecoverableError('skip: LD-Signatureの検証に失敗しました');
}
// もう一度actorチェック
if (authUser.user.uri !== activity.actor) {
- return `skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`;
+ throw new Bull.UnrecoverableError(`skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`);
}
// ブロックしてたら中断
const ldHost = this.utilityService.extractDbHost(authUser.user.uri);
if (this.utilityService.isBlockedHost(meta.blockedHosts, ldHost)) {
- return `Blocked request: ${ldHost}`;
+ throw new Bull.UnrecoverableError(`Blocked request: ${ldHost}`);
}
} else {
- return `skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`;
+ throw new Bull.UnrecoverableError(`skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`);
}
}
@@ -168,7 +160,7 @@ export class InboxProcessorService {
const signerHost = this.utilityService.extractDbHost(authUser.user.uri!);
const activityIdHost = this.utilityService.extractDbHost(activity.id);
if (signerHost !== activityIdHost) {
- return `skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`;
+ throw new Bull.UnrecoverableError(`skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`);
}
}