diff options
| author | dakkar <dakkar@thenautilus.net> | 2024-10-18 21:14:49 +0000 |
|---|---|---|
| committer | dakkar <dakkar@thenautilus.net> | 2024-10-18 21:14:49 +0000 |
| commit | ba17776b1961376b5369aa775e8910a227cc02ec (patch) | |
| tree | 0d0579dd911a6c9b74ebc84195cacf10eafe0918 /packages/backend/src/queue/processors/InboxProcessorService.ts | |
| parent | merge: Allow logged in users to refresh polls (!686) (diff) | |
| parent | Merge branch 'develop' into feature/2024.9.0 (diff) | |
| download | sharkey-ba17776b1961376b5369aa775e8910a227cc02ec.tar.gz sharkey-ba17776b1961376b5369aa775e8910a227cc02ec.tar.bz2 sharkey-ba17776b1961376b5369aa775e8910a227cc02ec.zip | |
merge: version 2024.9.0 (!675)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/675
Approved-by: Julia <julia@insertdomain.name>
Approved-by: Marie <github@yuugi.dev>
Diffstat (limited to 'packages/backend/src/queue/processors/InboxProcessorService.ts')
| -rw-r--r-- | packages/backend/src/queue/processors/InboxProcessorService.ts | 67 |
1 files changed, 53 insertions, 14 deletions
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 641b8b8607..8b3d2ebb50 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -4,11 +4,10 @@ */ import { URL } from 'node:url'; -import { Injectable } from '@nestjs/common'; +import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import httpSignature from '@peertube/http-signature'; import * as Bull from 'bullmq'; import type Logger from '@/logger.js'; -import { MetaService } from '@/core/MetaService.js'; import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js'; import InstanceChart from '@/core/chart/charts/instance.js'; @@ -26,16 +25,28 @@ import { JsonLdService } from '@/core/activitypub/JsonLdService.js'; import { ApInboxService } from '@/core/activitypub/ApInboxService.js'; import { bindThis } from '@/decorators.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; +import { CollapsedQueue } from '@/misc/collapsed-queue.js'; +import { MiNote } from '@/models/Note.js'; +import { MiMeta } from '@/models/Meta.js'; +import { DI } from '@/di-symbols.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { InboxJobData } from '../types.js'; +type UpdateInstanceJob = { + latestRequestReceivedAt: Date, + shouldUnsuspend: boolean, +}; + @Injectable() -export class InboxProcessorService { +export class InboxProcessorService implements OnApplicationShutdown { private logger: Logger; + private updateInstanceQueue: CollapsedQueue<MiNote['id'], UpdateInstanceJob>; constructor( + @Inject(DI.meta) + private meta: MiMeta, + private utilityService: UtilityService, - private metaService: MetaService, private apInboxService: ApInboxService, private federatedInstanceService: FederatedInstanceService, private fetchInstanceMetadataService: FetchInstanceMetadataService, @@ -48,6 +59,7 @@ export class InboxProcessorService { private queueLoggerService: QueueLoggerService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('inbox'); + this.updateInstanceQueue = new CollapsedQueue(60 * 1000 * 5, this.collapseUpdateInstanceJobs, this.performUpdateInstance); } @bindThis @@ -63,9 +75,7 @@ export class InboxProcessorService { const host = this.utilityService.toPuny(new URL(signature.keyId).hostname); - // ブロックしてたら中断 - const meta = await this.metaService.fetch(); - if (this.utilityService.isBlockedHost(meta.blockedHosts, host)) { + if (!this.utilityService.isFederationAllowedHost(host)) { return `Blocked request: ${host}`; } @@ -169,9 +179,8 @@ export class InboxProcessorService { throw new Bull.UnrecoverableError(`skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`); } - // ブロックしてたら中断 const ldHost = this.utilityService.extractDbHost(authUser.user.uri); - if (this.utilityService.isBlockedHost(meta.blockedHosts, ldHost)) { + if (!this.utilityService.isFederationAllowedHost(ldHost)) { throw new Bull.UnrecoverableError(`Blocked request: ${ldHost}`); } } else { @@ -190,11 +199,9 @@ export class InboxProcessorService { // Update stats this.federatedInstanceService.fetch(authUser.user.host).then(i => { - this.federatedInstanceService.update(i.id, { + this.updateInstanceQueue.enqueue(i.id, { latestRequestReceivedAt: new Date(), - isNotResponding: false, - // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる - suspensionState: i.suspensionState === 'autoSuspendedForNotResponding' ? 'none' : undefined, + shouldUnsuspend: i.suspensionState === 'autoSuspendedForNotResponding', }); this.fetchInstanceMetadataService.fetchInstanceMetadata(i); @@ -202,7 +209,7 @@ export class InboxProcessorService { this.apRequestChart.inbox(); this.federationChart.inbox(i.host); - if (meta.enableChartsForFederatedInstances) { + if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.requestReceived(i.host); } }); @@ -230,4 +237,36 @@ export class InboxProcessorService { } return 'ok'; } + + @bindThis + public collapseUpdateInstanceJobs(oldJob: UpdateInstanceJob, newJob: UpdateInstanceJob) { + const latestRequestReceivedAt = oldJob.latestRequestReceivedAt < newJob.latestRequestReceivedAt + ? newJob.latestRequestReceivedAt + : oldJob.latestRequestReceivedAt; + const shouldUnsuspend = oldJob.shouldUnsuspend || newJob.shouldUnsuspend; + return { + latestRequestReceivedAt, + shouldUnsuspend, + }; + } + + @bindThis + public async performUpdateInstance(id: string, job: UpdateInstanceJob) { + await this.federatedInstanceService.update(id, { + latestRequestReceivedAt: new Date(), + isNotResponding: false, + // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる + suspensionState: job.shouldUnsuspend ? 'none' : undefined, + }); + } + + @bindThis + public async dispose(): Promise<void> { + await this.updateInstanceQueue.performAllNow(); + } + + @bindThis + async onApplicationShutdown(signal?: string) { + await this.dispose(); + } } |