diff options
| author | KOBA789 <kobahide789@gmail.com> | 2024-09-26 10:25:20 +0900 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-09-26 10:25:20 +0900 |
| commit | 7134d24c1f25859e7e092f757ecd327469d75a8f (patch) | |
| tree | 672ebd9c7dbb888a1f7e485fc779f9d61ac3bc8f /packages/backend/src/queue | |
| parent | Update CHANGELOG.md (diff) | |
| download | sharkey-7134d24c1f25859e7e092f757ecd327469d75a8f.tar.gz sharkey-7134d24c1f25859e7e092f757ecd327469d75a8f.tar.bz2 sharkey-7134d24c1f25859e7e092f757ecd327469d75a8f.zip | |
perf(backend): Defer instance metadata update (#14558)
* Defer instance metadata update
* Fix last new line
* Fix typo
* Add license notice
* Fix syntax
* Perform deferred jobs on shutdown
* Fix missing async/await
* Fix typo :)
* Update collapsed-queue.ts
---------
Co-authored-by: syuilo <4439005+syuilo@users.noreply.github.com>
Diffstat (limited to 'packages/backend/src/queue')
| -rw-r--r-- | packages/backend/src/queue/processors/InboxProcessorService.ts | 55 |
1 files changed, 47 insertions, 8 deletions
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 2df37bedf4..68999b5d17 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -4,7 +4,7 @@ */ import { URL } from 'node:url'; -import { Inject, Injectable } from '@nestjs/common'; +import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import httpSignature from '@peertube/http-signature'; import * as Bull from 'bullmq'; import type Logger from '@/logger.js'; @@ -25,14 +25,22 @@ import { JsonLdService } from '@/core/activitypub/JsonLdService.js'; import { ApInboxService } from '@/core/activitypub/ApInboxService.js'; import { bindThis } from '@/decorators.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; -import { QueueLoggerService } from '../QueueLoggerService.js'; -import type { InboxJobData } from '../types.js'; +import { CollapsedQueue } from '@/misc/collapsed-queue.js'; +import { MiNote } from '@/models/Note.js'; import { MiMeta } from '@/models/Meta.js'; import { DI } from '@/di-symbols.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type { InboxJobData } from '../types.js'; + +type UpdateInstanceJob = { + latestRequestReceivedAt: Date, + shouldUnsuspend: boolean, +}; @Injectable() -export class InboxProcessorService { +export class InboxProcessorService implements OnApplicationShutdown { private logger: Logger; + private updateInstanceQueue: CollapsedQueue<MiNote['id'], UpdateInstanceJob>; constructor( @Inject(DI.meta) @@ -51,6 +59,7 @@ export class InboxProcessorService { private queueLoggerService: QueueLoggerService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('inbox'); + this.updateInstanceQueue = new CollapsedQueue(60 * 1000 * 5, this.collapseUpdateInstanceJobs, this.performUpdateInstance); } @bindThis @@ -187,11 +196,9 @@ export class InboxProcessorService { // Update stats this.federatedInstanceService.fetch(authUser.user.host).then(i => { - this.federatedInstanceService.update(i.id, { + this.updateInstanceQueue.enqueue(i.id, { latestRequestReceivedAt: new Date(), - isNotResponding: false, - // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる - suspensionState: i.suspensionState === 'autoSuspendedForNotResponding' ? 'none' : undefined, + shouldUnsuspend: i.suspensionState === 'autoSuspendedForNotResponding', }); this.fetchInstanceMetadataService.fetchInstanceMetadata(i); @@ -227,4 +234,36 @@ export class InboxProcessorService { } return 'ok'; } + + @bindThis + public collapseUpdateInstanceJobs(oldJob: UpdateInstanceJob, newJob: UpdateInstanceJob) { + const latestRequestReceivedAt = oldJob.latestRequestReceivedAt < newJob.latestRequestReceivedAt + ? newJob.latestRequestReceivedAt + : oldJob.latestRequestReceivedAt; + const shouldUnsuspend = oldJob.shouldUnsuspend || newJob.shouldUnsuspend; + return { + latestRequestReceivedAt, + shouldUnsuspend, + }; + } + + @bindThis + public async performUpdateInstance(id: string, job: UpdateInstanceJob) { + await this.federatedInstanceService.update(id, { + latestRequestReceivedAt: new Date(), + isNotResponding: false, + // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる + suspensionState: job.shouldUnsuspend ? 'none' : undefined, + }); + } + + @bindThis + public async dispose(): Promise<void> { + await this.updateInstanceQueue.performAllNow(); + } + + @bindThis + async onApplicationShutdown(signal?: string) { + await this.dispose(); + } } |