summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue/processors/InboxProcessorService.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/queue/processors/InboxProcessorService.ts')
-rw-r--r--packages/backend/src/queue/processors/InboxProcessorService.ts86
1 files changed, 61 insertions, 25 deletions
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts
index 641b8b8607..11b00bb683 100644
--- a/packages/backend/src/queue/processors/InboxProcessorService.ts
+++ b/packages/backend/src/queue/processors/InboxProcessorService.ts
@@ -4,11 +4,10 @@
*/
import { URL } from 'node:url';
-import { Injectable } from '@nestjs/common';
+import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import httpSignature from '@peertube/http-signature';
import * as Bull from 'bullmq';
import type Logger from '@/logger.js';
-import { MetaService } from '@/core/MetaService.js';
import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js';
import InstanceChart from '@/core/chart/charts/instance.js';
@@ -26,16 +25,28 @@ import { JsonLdService } from '@/core/activitypub/JsonLdService.js';
import { ApInboxService } from '@/core/activitypub/ApInboxService.js';
import { bindThis } from '@/decorators.js';
import { IdentifiableError } from '@/misc/identifiable-error.js';
+import { CollapsedQueue } from '@/misc/collapsed-queue.js';
+import { MiNote } from '@/models/Note.js';
+import { MiMeta } from '@/models/Meta.js';
+import { DI } from '@/di-symbols.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type { InboxJobData } from '../types.js';
+type UpdateInstanceJob = {
+ latestRequestReceivedAt: Date,
+ shouldUnsuspend: boolean,
+};
+
@Injectable()
-export class InboxProcessorService {
+export class InboxProcessorService implements OnApplicationShutdown {
private logger: Logger;
+ private updateInstanceQueue: CollapsedQueue<MiNote['id'], UpdateInstanceJob>;
constructor(
+ @Inject(DI.meta)
+ private meta: MiMeta,
+
private utilityService: UtilityService,
- private metaService: MetaService,
private apInboxService: ApInboxService,
private federatedInstanceService: FederatedInstanceService,
private fetchInstanceMetadataService: FetchInstanceMetadataService,
@@ -48,6 +59,7 @@ export class InboxProcessorService {
private queueLoggerService: QueueLoggerService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('inbox');
+ this.updateInstanceQueue = new CollapsedQueue(60 * 1000 * 5, this.collapseUpdateInstanceJobs, this.performUpdateInstance);
}
@bindThis
@@ -63,9 +75,7 @@ export class InboxProcessorService {
const host = this.utilityService.toPuny(new URL(signature.keyId).hostname);
- // ブロックしてたら中断
- const meta = await this.metaService.fetch();
- if (this.utilityService.isBlockedHost(meta.blockedHosts, host)) {
+ if (!this.utilityService.isFederationAllowedHost(host)) {
return `Blocked request: ${host}`;
}
@@ -108,19 +118,16 @@ export class InboxProcessorService {
// HTTP-Signatureの検証
let httpSignatureValidated = httpSignature.verifySignature(signature, authUser.key.keyPem);
- // また、signatureのsignerは、activity.actorと一致する必要がある
- if (!httpSignatureValidated || authUser.user.uri !== activity.actor) {
- let renewKeyFailed = true;
-
- if (!httpSignatureValidated) {
- authUser.key = await this.apDbResolverService.refetchPublicKeyForApId(authUser.user);
-
- if (authUser.key != null) {
- httpSignatureValidated = httpSignature.verifySignature(signature, authUser.key.keyPem);
- renewKeyFailed = false;
- }
+ // maybe they changed their key? refetch it
+ if (!httpSignatureValidated) {
+ authUser.key = await this.apDbResolverService.refetchPublicKeyForApId(authUser.user);
+ if (authUser.key != null) {
+ httpSignatureValidated = httpSignature.verifySignature(signature, authUser.key.keyPem);
}
+ }
+ // また、signatureのsignerは、activity.actorと一致する必要がある
+ if (!httpSignatureValidated || authUser.user.uri !== getApId(activity.actor)) {
// 一致しなくても、でもLD-Signatureがありそうならそっちも見る
const ldSignature = activity.signature;
if (ldSignature) {
@@ -169,9 +176,8 @@ export class InboxProcessorService {
throw new Bull.UnrecoverableError(`skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`);
}
- // ブロックしてたら中断
const ldHost = this.utilityService.extractDbHost(authUser.user.uri);
- if (this.utilityService.isBlockedHost(meta.blockedHosts, ldHost)) {
+ if (!this.utilityService.isFederationAllowedHost(ldHost)) {
throw new Bull.UnrecoverableError(`Blocked request: ${ldHost}`);
}
} else {
@@ -190,11 +196,9 @@ export class InboxProcessorService {
// Update stats
this.federatedInstanceService.fetch(authUser.user.host).then(i => {
- this.federatedInstanceService.update(i.id, {
+ this.updateInstanceQueue.enqueue(i.id, {
latestRequestReceivedAt: new Date(),
- isNotResponding: false,
- // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる
- suspensionState: i.suspensionState === 'autoSuspendedForNotResponding' ? 'none' : undefined,
+ shouldUnsuspend: i.suspensionState === 'autoSuspendedForNotResponding',
});
this.fetchInstanceMetadataService.fetchInstanceMetadata(i);
@@ -202,7 +206,7 @@ export class InboxProcessorService {
this.apRequestChart.inbox();
this.federationChart.inbox(i.host);
- if (meta.enableChartsForFederatedInstances) {
+ if (this.meta.enableChartsForFederatedInstances) {
this.instanceChart.requestReceived(i.host);
}
});
@@ -230,4 +234,36 @@ export class InboxProcessorService {
}
return 'ok';
}
+
+ @bindThis
+ public collapseUpdateInstanceJobs(oldJob: UpdateInstanceJob, newJob: UpdateInstanceJob) {
+ const latestRequestReceivedAt = oldJob.latestRequestReceivedAt < newJob.latestRequestReceivedAt
+ ? newJob.latestRequestReceivedAt
+ : oldJob.latestRequestReceivedAt;
+ const shouldUnsuspend = oldJob.shouldUnsuspend || newJob.shouldUnsuspend;
+ return {
+ latestRequestReceivedAt,
+ shouldUnsuspend,
+ };
+ }
+
+ @bindThis
+ public async performUpdateInstance(id: string, job: UpdateInstanceJob) {
+ await this.federatedInstanceService.update(id, {
+ latestRequestReceivedAt: new Date(),
+ isNotResponding: false,
+ // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる
+ suspensionState: job.shouldUnsuspend ? 'none' : undefined,
+ });
+ }
+
+ @bindThis
+ public async dispose(): Promise<void> {
+ await this.updateInstanceQueue.performAllNow();
+ }
+
+ @bindThis
+ async onApplicationShutdown(signal?: string) {
+ await this.dispose();
+ }
}