summaryrefslogtreecommitdiff
path: root/packages/backend
diff options
context:
space:
mode:
authorKOBA789 <kobahide789@gmail.com>2024-09-26 10:25:20 +0900
committerGitHub <noreply@github.com>2024-09-26 10:25:20 +0900
commit7134d24c1f25859e7e092f757ecd327469d75a8f (patch)
tree672ebd9c7dbb888a1f7e485fc779f9d61ac3bc8f /packages/backend
parentUpdate CHANGELOG.md (diff)
downloadsharkey-7134d24c1f25859e7e092f757ecd327469d75a8f.tar.gz
sharkey-7134d24c1f25859e7e092f757ecd327469d75a8f.tar.bz2
sharkey-7134d24c1f25859e7e092f757ecd327469d75a8f.zip
perf(backend): Defer instance metadata update (#14558)
* Defer instance metadata update * Fix last new line * Fix typo * Add license notice * Fix syntax * Perform deferred jobs on shutdown * Fix missing async/await * Fix typo :) * Update collapsed-queue.ts --------- Co-authored-by: syuilo <4439005+syuilo@users.noreply.github.com>
Diffstat (limited to 'packages/backend')
-rw-r--r--packages/backend/src/core/NoteCreateService.ts25
-rw-r--r--packages/backend/src/misc/collapsed-queue.ts44
-rw-r--r--packages/backend/src/queue/processors/InboxProcessorService.ts55
3 files changed, 111 insertions, 13 deletions
diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts
index 18efc9d562..89e3eafa0e 100644
--- a/packages/backend/src/core/NoteCreateService.ts
+++ b/packages/backend/src/core/NoteCreateService.ts
@@ -55,6 +55,7 @@ import { UserBlockingService } from '@/core/UserBlockingService.js';
import { isReply } from '@/misc/is-reply.js';
import { trackPromise } from '@/misc/promise-tracker.js';
import { IdentifiableError } from '@/misc/identifiable-error.js';
+import { CollapsedQueue } from '@/misc/collapsed-queue.js';
type NotificationType = 'reply' | 'renote' | 'quote' | 'mention';
@@ -146,6 +147,7 @@ type Option = {
@Injectable()
export class NoteCreateService implements OnApplicationShutdown {
#shutdownController = new AbortController();
+ private updateNotesCountQueue: CollapsedQueue<MiNote['id'], number>;
constructor(
@Inject(DI.config)
@@ -215,7 +217,9 @@ export class NoteCreateService implements OnApplicationShutdown {
private instanceChart: InstanceChart,
private utilityService: UtilityService,
private userBlockingService: UserBlockingService,
- ) { }
+ ) {
+ this.updateNotesCountQueue = new CollapsedQueue(60 * 1000 * 5, this.collapseNotesCount, this.performUpdateNotesCount);
+ }
@bindThis
public async create(user: {
@@ -509,7 +513,7 @@ export class NoteCreateService implements OnApplicationShutdown {
// Register host
if (this.userEntityService.isRemoteUser(user)) {
this.federatedInstanceService.fetch(user.host).then(async i => {
- this.instancesRepository.increment({ id: i.id }, 'notesCount', 1);
+ this.updateNotesCountQueue.enqueue(i.id, 1);
if (this.meta.enableChartsForFederatedInstances) {
this.instanceChart.updateNote(i.host, note, true);
}
@@ -1028,12 +1032,23 @@ export class NoteCreateService implements OnApplicationShutdown {
}
@bindThis
- public dispose(): void {
+ private collapseNotesCount(oldValue: number, newValue: number) {
+ return oldValue + newValue;
+ }
+
+ @bindThis
+ private async performUpdateNotesCount(id: MiNote['id'], incrBy: number) {
+ await this.instancesRepository.increment({ id: id }, 'notesCount', incrBy);
+ }
+
+ @bindThis
+ public async dispose(): Promise<void> {
this.#shutdownController.abort();
+ await this.updateNotesCountQueue.performAllNow();
}
@bindThis
- public onApplicationShutdown(signal?: string | undefined): void {
- this.dispose();
+ public async onApplicationShutdown(signal?: string | undefined): Promise<void> {
+ await this.dispose();
}
}
diff --git a/packages/backend/src/misc/collapsed-queue.ts b/packages/backend/src/misc/collapsed-queue.ts
new file mode 100644
index 0000000000..5bc20a78ae
--- /dev/null
+++ b/packages/backend/src/misc/collapsed-queue.ts
@@ -0,0 +1,44 @@
+/*
+ * SPDX-FileCopyrightText: syuilo and misskey-project
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+type Job<V> = {
+ value: V;
+ timer: NodeJS.Timeout;
+};
+
+// TODO: redis使えるようにする
+export class CollapsedQueue<K, V> {
+ private jobs: Map<K, Job<V>> = new Map();
+
+ constructor(
+ private timeout: number,
+ private collapse: (oldValue: V, newValue: V) => V,
+ private perform: (key: K, value: V) => Promise<void>,
+ ) {}
+
+ enqueue(key: K, value: V) {
+ if (this.jobs.has(key)) {
+ const old = this.jobs.get(key)!;
+ const merged = this.collapse(old.value, value);
+ this.jobs.set(key, { ...old, value: merged });
+ } else {
+ const timer = setTimeout(() => {
+ const job = this.jobs.get(key)!;
+ this.jobs.delete(key);
+ this.perform(key, job.value);
+ }, this.timeout);
+ this.jobs.set(key, { value, timer });
+ }
+ }
+
+ async performAllNow() {
+ const entries = [...this.jobs.entries()];
+ this.jobs.clear();
+ for (const [_key, job] of entries) {
+ clearTimeout(job.timer);
+ }
+ await Promise.allSettled(entries.map(([key, job]) => this.perform(key, job.value)));
+ }
+}
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts
index 2df37bedf4..68999b5d17 100644
--- a/packages/backend/src/queue/processors/InboxProcessorService.ts
+++ b/packages/backend/src/queue/processors/InboxProcessorService.ts
@@ -4,7 +4,7 @@
*/
import { URL } from 'node:url';
-import { Inject, Injectable } from '@nestjs/common';
+import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import httpSignature from '@peertube/http-signature';
import * as Bull from 'bullmq';
import type Logger from '@/logger.js';
@@ -25,14 +25,22 @@ import { JsonLdService } from '@/core/activitypub/JsonLdService.js';
import { ApInboxService } from '@/core/activitypub/ApInboxService.js';
import { bindThis } from '@/decorators.js';
import { IdentifiableError } from '@/misc/identifiable-error.js';
-import { QueueLoggerService } from '../QueueLoggerService.js';
-import type { InboxJobData } from '../types.js';
+import { CollapsedQueue } from '@/misc/collapsed-queue.js';
+import { MiNote } from '@/models/Note.js';
import { MiMeta } from '@/models/Meta.js';
import { DI } from '@/di-symbols.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type { InboxJobData } from '../types.js';
+
+type UpdateInstanceJob = {
+ latestRequestReceivedAt: Date,
+ shouldUnsuspend: boolean,
+};
@Injectable()
-export class InboxProcessorService {
+export class InboxProcessorService implements OnApplicationShutdown {
private logger: Logger;
+ private updateInstanceQueue: CollapsedQueue<MiNote['id'], UpdateInstanceJob>;
constructor(
@Inject(DI.meta)
@@ -51,6 +59,7 @@ export class InboxProcessorService {
private queueLoggerService: QueueLoggerService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('inbox');
+ this.updateInstanceQueue = new CollapsedQueue(60 * 1000 * 5, this.collapseUpdateInstanceJobs, this.performUpdateInstance);
}
@bindThis
@@ -187,11 +196,9 @@ export class InboxProcessorService {
// Update stats
this.federatedInstanceService.fetch(authUser.user.host).then(i => {
- this.federatedInstanceService.update(i.id, {
+ this.updateInstanceQueue.enqueue(i.id, {
latestRequestReceivedAt: new Date(),
- isNotResponding: false,
- // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる
- suspensionState: i.suspensionState === 'autoSuspendedForNotResponding' ? 'none' : undefined,
+ shouldUnsuspend: i.suspensionState === 'autoSuspendedForNotResponding',
});
this.fetchInstanceMetadataService.fetchInstanceMetadata(i);
@@ -227,4 +234,36 @@ export class InboxProcessorService {
}
return 'ok';
}
+
+ @bindThis
+ public collapseUpdateInstanceJobs(oldJob: UpdateInstanceJob, newJob: UpdateInstanceJob) {
+ const latestRequestReceivedAt = oldJob.latestRequestReceivedAt < newJob.latestRequestReceivedAt
+ ? newJob.latestRequestReceivedAt
+ : oldJob.latestRequestReceivedAt;
+ const shouldUnsuspend = oldJob.shouldUnsuspend || newJob.shouldUnsuspend;
+ return {
+ latestRequestReceivedAt,
+ shouldUnsuspend,
+ };
+ }
+
+ @bindThis
+ public async performUpdateInstance(id: string, job: UpdateInstanceJob) {
+ await this.federatedInstanceService.update(id, {
+ latestRequestReceivedAt: new Date(),
+ isNotResponding: false,
+ // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる
+ suspensionState: job.shouldUnsuspend ? 'none' : undefined,
+ });
+ }
+
+ @bindThis
+ public async dispose(): Promise<void> {
+ await this.updateInstanceQueue.performAllNow();
+ }
+
+ @bindThis
+ async onApplicationShutdown(signal?: string) {
+ await this.dispose();
+ }
}