From 023fa30280e561e9921a2c83138af4cac01068ab Mon Sep 17 00:00:00 2001 From: syuilo <4439005+syuilo@users.noreply.github.com> Date: Sun, 22 Sep 2024 12:53:13 +0900 Subject: refactor/perf(backend): provide metadata statically (#14601) * wip * Update ReactionService.ts * Update ApiCallService.ts * Update timeline.ts * Update GlobalModule.ts * Update GlobalModule.ts * Update NoteEntityService.ts * wip * wip * wip * Update ApPersonService.ts * wip * Update GlobalModule.ts * Update mock-resolver.ts * Update RoleService.ts * Update activitypub.ts * Update activitypub.ts * Update activitypub.ts * Update activitypub.ts * Update activitypub.ts * clean up * Update utils.ts * Update UtilityService.ts * Revert "Update utils.ts" This reverts commit a27d4be764b78c1b5a9eac685e261fee49331d89. * Revert "Update UtilityService.ts" This reverts commit e5fd9e004c482cf099252201c0c1aa888e001430. * vuwa- * Revert "vuwa-" This reverts commit 0c3bd12472b4b9938cdff2d6f131e6800bc3724c. * Update entry.ts * Update entry.ts * Update entry.ts * Update entry.ts * Update jest.setup.ts --- packages/backend/src/core/NoteCreateService.ts | 60 +++++++++++--------------- 1 file changed, 26 insertions(+), 34 deletions(-) (limited to 'packages/backend/src/core/NoteCreateService.ts') diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index 1d8d248322..18efc9d562 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -8,13 +8,12 @@ import * as mfm from 'mfm-js'; import { In, DataSource, IsNull, LessThan } from 'typeorm'; import * as Redis from 'ioredis'; import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; -import RE2 from 're2'; import { extractMentions } from '@/misc/extract-mentions.js'; import { extractCustomEmojisFromMfm } from '@/misc/extract-custom-emojis-from-mfm.js'; import { extractHashtags } from '@/misc/extract-hashtags.js'; import type { IMentionedRemoteUsers } from '@/models/Note.js'; import { MiNote } from '@/models/Note.js'; -import type { ChannelFollowingsRepository, ChannelsRepository, FollowingsRepository, InstancesRepository, MiFollowing, MutingsRepository, NotesRepository, NoteThreadMutingsRepository, UserListMembershipsRepository, UserProfilesRepository, UsersRepository } from '@/models/_.js'; +import type { ChannelFollowingsRepository, ChannelsRepository, FollowingsRepository, InstancesRepository, MiFollowing, MiMeta, MutingsRepository, NotesRepository, NoteThreadMutingsRepository, UserListMembershipsRepository, UserProfilesRepository, UsersRepository } from '@/models/_.js'; import type { MiDriveFile } from '@/models/DriveFile.js'; import type { MiApp } from '@/models/App.js'; import { concat } from '@/misc/prelude/array.js'; @@ -23,11 +22,8 @@ import type { MiUser, MiLocalUser, MiRemoteUser } from '@/models/User.js'; import type { IPoll } from '@/models/Poll.js'; import { MiPoll } from '@/models/Poll.js'; import { isDuplicateKeyValueError } from '@/misc/is-duplicate-key-value-error.js'; -import { checkWordMute } from '@/misc/check-word-mute.js'; import type { MiChannel } from '@/models/Channel.js'; import { normalizeForSearch } from '@/misc/normalize-for-search.js'; -import { MemorySingleCache } from '@/misc/cache.js'; -import type { MiUserProfile } from '@/models/UserProfile.js'; import { RelayService } from '@/core/RelayService.js'; import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; import { DI } from '@/di-symbols.js'; @@ -51,7 +47,6 @@ import { RemoteUserResolveService } from '@/core/RemoteUserResolveService.js'; import { bindThis } from '@/decorators.js'; import { DB_MAX_NOTE_TEXT_LENGTH } from '@/const.js'; import { RoleService } from '@/core/RoleService.js'; -import { MetaService } from '@/core/MetaService.js'; import { SearchService } from '@/core/SearchService.js'; import { FeaturedService } from '@/core/FeaturedService.js'; import { FanoutTimelineService } from '@/core/FanoutTimelineService.js'; @@ -156,6 +151,9 @@ export class NoteCreateService implements OnApplicationShutdown { @Inject(DI.config) private config: Config, + @Inject(DI.meta) + private meta: MiMeta, + @Inject(DI.db) private db: DataSource, @@ -210,7 +208,6 @@ export class NoteCreateService implements OnApplicationShutdown { private apDeliverManagerService: ApDeliverManagerService, private apRendererService: ApRendererService, private roleService: RoleService, - private metaService: MetaService, private searchService: SearchService, private notesChart: NotesChart, private perUserNotesChart: PerUserNotesChart, @@ -251,10 +248,8 @@ export class NoteCreateService implements OnApplicationShutdown { if (data.channel != null) data.visibleUsers = []; if (data.channel != null) data.localOnly = true; - const meta = await this.metaService.fetch(); - if (data.visibility === 'public' && data.channel == null) { - const sensitiveWords = meta.sensitiveWords; + const sensitiveWords = this.meta.sensitiveWords; if (this.utilityService.isKeyWordIncluded(data.cw ?? data.text ?? '', sensitiveWords)) { data.visibility = 'home'; } else if ((await this.roleService.getUserPolicies(user.id)).canPublicNote === false) { @@ -262,17 +257,17 @@ export class NoteCreateService implements OnApplicationShutdown { } } - const hasProhibitedWords = await this.checkProhibitedWordsContain({ + const hasProhibitedWords = this.checkProhibitedWordsContain({ cw: data.cw, text: data.text, pollChoices: data.poll?.choices, - }, meta.prohibitedWords); + }, this.meta.prohibitedWords); if (hasProhibitedWords) { throw new IdentifiableError('689ee33f-f97c-479a-ac49-1b9f8140af99', 'Note contains prohibited words'); } - const inSilencedInstance = this.utilityService.isSilencedHost(meta.silencedHosts, user.host); + const inSilencedInstance = this.utilityService.isSilencedHost(this.meta.silencedHosts, user.host); if (data.visibility === 'public' && inSilencedInstance && user.host !== null) { data.visibility = 'home'; @@ -365,7 +360,7 @@ export class NoteCreateService implements OnApplicationShutdown { } // if the host is media-silenced, custom emojis are not allowed - if (this.utilityService.isMediaSilencedHost(meta.mediaSilencedHosts, user.host)) emojis = []; + if (this.utilityService.isMediaSilencedHost(this.meta.mediaSilencedHosts, user.host)) emojis = []; tags = tags.filter(tag => Array.from(tag).length <= 128).splice(0, 32); @@ -506,10 +501,8 @@ export class NoteCreateService implements OnApplicationShutdown { host: MiUser['host']; isBot: MiUser['isBot']; }, data: Option, silent: boolean, tags: string[], mentionedUsers: MinimumUser[]) { - const meta = await this.metaService.fetch(); - this.notesChart.update(note, true); - if (note.visibility !== 'specified' && (meta.enableChartsForRemoteUser || (user.host == null))) { + if (note.visibility !== 'specified' && (this.meta.enableChartsForRemoteUser || (user.host == null))) { this.perUserNotesChart.update(user, note, true); } @@ -517,7 +510,7 @@ export class NoteCreateService implements OnApplicationShutdown { if (this.userEntityService.isRemoteUser(user)) { this.federatedInstanceService.fetch(user.host).then(async i => { this.instancesRepository.increment({ id: i.id }, 'notesCount', 1); - if ((await this.metaService.fetch()).enableChartsForFederatedInstances) { + if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.updateNote(i.host, note, true); } }); @@ -853,15 +846,14 @@ export class NoteCreateService implements OnApplicationShutdown { @bindThis private async pushToTl(note: MiNote, user: { id: MiUser['id']; host: MiUser['host']; }) { - const meta = await this.metaService.fetch(); - if (!meta.enableFanoutTimeline) return; + if (!this.meta.enableFanoutTimeline) return; const r = this.redisForTimelines.pipeline(); if (note.channelId) { this.fanoutTimelineService.push(`channelTimeline:${note.channelId}`, note.id, this.config.perChannelMaxNoteCacheCount, r); - this.fanoutTimelineService.push(`userTimelineWithChannel:${user.id}`, note.id, note.userHost == null ? meta.perLocalUserUserTimelineCacheMax : meta.perRemoteUserUserTimelineCacheMax, r); + this.fanoutTimelineService.push(`userTimelineWithChannel:${user.id}`, note.id, note.userHost == null ? this.meta.perLocalUserUserTimelineCacheMax : this.meta.perRemoteUserUserTimelineCacheMax, r); const channelFollowings = await this.channelFollowingsRepository.find({ where: { @@ -871,9 +863,9 @@ export class NoteCreateService implements OnApplicationShutdown { }); for (const channelFollowing of channelFollowings) { - this.fanoutTimelineService.push(`homeTimeline:${channelFollowing.followerId}`, note.id, meta.perUserHomeTimelineCacheMax, r); + this.fanoutTimelineService.push(`homeTimeline:${channelFollowing.followerId}`, note.id, this.meta.perUserHomeTimelineCacheMax, r); if (note.fileIds.length > 0) { - this.fanoutTimelineService.push(`homeTimelineWithFiles:${channelFollowing.followerId}`, note.id, meta.perUserHomeTimelineCacheMax / 2, r); + this.fanoutTimelineService.push(`homeTimelineWithFiles:${channelFollowing.followerId}`, note.id, this.meta.perUserHomeTimelineCacheMax / 2, r); } } } else { @@ -911,9 +903,9 @@ export class NoteCreateService implements OnApplicationShutdown { if (!following.withReplies) continue; } - this.fanoutTimelineService.push(`homeTimeline:${following.followerId}`, note.id, meta.perUserHomeTimelineCacheMax, r); + this.fanoutTimelineService.push(`homeTimeline:${following.followerId}`, note.id, this.meta.perUserHomeTimelineCacheMax, r); if (note.fileIds.length > 0) { - this.fanoutTimelineService.push(`homeTimelineWithFiles:${following.followerId}`, note.id, meta.perUserHomeTimelineCacheMax / 2, r); + this.fanoutTimelineService.push(`homeTimelineWithFiles:${following.followerId}`, note.id, this.meta.perUserHomeTimelineCacheMax / 2, r); } } @@ -930,25 +922,25 @@ export class NoteCreateService implements OnApplicationShutdown { if (!userListMembership.withReplies) continue; } - this.fanoutTimelineService.push(`userListTimeline:${userListMembership.userListId}`, note.id, meta.perUserListTimelineCacheMax, r); + this.fanoutTimelineService.push(`userListTimeline:${userListMembership.userListId}`, note.id, this.meta.perUserListTimelineCacheMax, r); if (note.fileIds.length > 0) { - this.fanoutTimelineService.push(`userListTimelineWithFiles:${userListMembership.userListId}`, note.id, meta.perUserListTimelineCacheMax / 2, r); + this.fanoutTimelineService.push(`userListTimelineWithFiles:${userListMembership.userListId}`, note.id, this.meta.perUserListTimelineCacheMax / 2, r); } } // 自分自身のHTL if (note.userHost == null) { if (note.visibility !== 'specified' || !note.visibleUserIds.some(v => v === user.id)) { - this.fanoutTimelineService.push(`homeTimeline:${user.id}`, note.id, meta.perUserHomeTimelineCacheMax, r); + this.fanoutTimelineService.push(`homeTimeline:${user.id}`, note.id, this.meta.perUserHomeTimelineCacheMax, r); if (note.fileIds.length > 0) { - this.fanoutTimelineService.push(`homeTimelineWithFiles:${user.id}`, note.id, meta.perUserHomeTimelineCacheMax / 2, r); + this.fanoutTimelineService.push(`homeTimelineWithFiles:${user.id}`, note.id, this.meta.perUserHomeTimelineCacheMax / 2, r); } } } // 自分自身以外への返信 if (isReply(note)) { - this.fanoutTimelineService.push(`userTimelineWithReplies:${user.id}`, note.id, note.userHost == null ? meta.perLocalUserUserTimelineCacheMax : meta.perRemoteUserUserTimelineCacheMax, r); + this.fanoutTimelineService.push(`userTimelineWithReplies:${user.id}`, note.id, note.userHost == null ? this.meta.perLocalUserUserTimelineCacheMax : this.meta.perRemoteUserUserTimelineCacheMax, r); if (note.visibility === 'public' && note.userHost == null) { this.fanoutTimelineService.push('localTimelineWithReplies', note.id, 300, r); @@ -957,9 +949,9 @@ export class NoteCreateService implements OnApplicationShutdown { } } } else { - this.fanoutTimelineService.push(`userTimeline:${user.id}`, note.id, note.userHost == null ? meta.perLocalUserUserTimelineCacheMax : meta.perRemoteUserUserTimelineCacheMax, r); + this.fanoutTimelineService.push(`userTimeline:${user.id}`, note.id, note.userHost == null ? this.meta.perLocalUserUserTimelineCacheMax : this.meta.perRemoteUserUserTimelineCacheMax, r); if (note.fileIds.length > 0) { - this.fanoutTimelineService.push(`userTimelineWithFiles:${user.id}`, note.id, note.userHost == null ? meta.perLocalUserUserTimelineCacheMax / 2 : meta.perRemoteUserUserTimelineCacheMax / 2, r); + this.fanoutTimelineService.push(`userTimelineWithFiles:${user.id}`, note.id, note.userHost == null ? this.meta.perLocalUserUserTimelineCacheMax / 2 : this.meta.perRemoteUserUserTimelineCacheMax / 2, r); } if (note.visibility === 'public' && note.userHost == null) { @@ -1018,9 +1010,9 @@ export class NoteCreateService implements OnApplicationShutdown { } } - public async checkProhibitedWordsContain(content: Parameters[0], prohibitedWords?: string[]) { + public checkProhibitedWordsContain(content: Parameters[0], prohibitedWords?: string[]) { if (prohibitedWords == null) { - prohibitedWords = (await this.metaService.fetch()).prohibitedWords; + prohibitedWords = this.meta.prohibitedWords; } if ( -- cgit v1.2.3-freya From 7134d24c1f25859e7e092f757ecd327469d75a8f Mon Sep 17 00:00:00 2001 From: KOBA789 Date: Thu, 26 Sep 2024 10:25:20 +0900 Subject: perf(backend): Defer instance metadata update (#14558) * Defer instance metadata update * Fix last new line * Fix typo * Add license notice * Fix syntax * Perform deferred jobs on shutdown * Fix missing async/await * Fix typo :) * Update collapsed-queue.ts --------- Co-authored-by: syuilo <4439005+syuilo@users.noreply.github.com> --- packages/backend/src/core/NoteCreateService.ts | 25 ++++++++-- packages/backend/src/misc/collapsed-queue.ts | 44 +++++++++++++++++ .../src/queue/processors/InboxProcessorService.ts | 55 ++++++++++++++++++---- 3 files changed, 111 insertions(+), 13 deletions(-) create mode 100644 packages/backend/src/misc/collapsed-queue.ts (limited to 'packages/backend/src/core/NoteCreateService.ts') diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index 18efc9d562..89e3eafa0e 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -55,6 +55,7 @@ import { UserBlockingService } from '@/core/UserBlockingService.js'; import { isReply } from '@/misc/is-reply.js'; import { trackPromise } from '@/misc/promise-tracker.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; +import { CollapsedQueue } from '@/misc/collapsed-queue.js'; type NotificationType = 'reply' | 'renote' | 'quote' | 'mention'; @@ -146,6 +147,7 @@ type Option = { @Injectable() export class NoteCreateService implements OnApplicationShutdown { #shutdownController = new AbortController(); + private updateNotesCountQueue: CollapsedQueue; constructor( @Inject(DI.config) @@ -215,7 +217,9 @@ export class NoteCreateService implements OnApplicationShutdown { private instanceChart: InstanceChart, private utilityService: UtilityService, private userBlockingService: UserBlockingService, - ) { } + ) { + this.updateNotesCountQueue = new CollapsedQueue(60 * 1000 * 5, this.collapseNotesCount, this.performUpdateNotesCount); + } @bindThis public async create(user: { @@ -509,7 +513,7 @@ export class NoteCreateService implements OnApplicationShutdown { // Register host if (this.userEntityService.isRemoteUser(user)) { this.federatedInstanceService.fetch(user.host).then(async i => { - this.instancesRepository.increment({ id: i.id }, 'notesCount', 1); + this.updateNotesCountQueue.enqueue(i.id, 1); if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.updateNote(i.host, note, true); } @@ -1028,12 +1032,23 @@ export class NoteCreateService implements OnApplicationShutdown { } @bindThis - public dispose(): void { + private collapseNotesCount(oldValue: number, newValue: number) { + return oldValue + newValue; + } + + @bindThis + private async performUpdateNotesCount(id: MiNote['id'], incrBy: number) { + await this.instancesRepository.increment({ id: id }, 'notesCount', incrBy); + } + + @bindThis + public async dispose(): Promise { this.#shutdownController.abort(); + await this.updateNotesCountQueue.performAllNow(); } @bindThis - public onApplicationShutdown(signal?: string | undefined): void { - this.dispose(); + public async onApplicationShutdown(signal?: string | undefined): Promise { + await this.dispose(); } } diff --git a/packages/backend/src/misc/collapsed-queue.ts b/packages/backend/src/misc/collapsed-queue.ts new file mode 100644 index 0000000000..5bc20a78ae --- /dev/null +++ b/packages/backend/src/misc/collapsed-queue.ts @@ -0,0 +1,44 @@ +/* + * SPDX-FileCopyrightText: syuilo and misskey-project + * SPDX-License-Identifier: AGPL-3.0-only + */ + +type Job = { + value: V; + timer: NodeJS.Timeout; +}; + +// TODO: redis使えるようにする +export class CollapsedQueue { + private jobs: Map> = new Map(); + + constructor( + private timeout: number, + private collapse: (oldValue: V, newValue: V) => V, + private perform: (key: K, value: V) => Promise, + ) {} + + enqueue(key: K, value: V) { + if (this.jobs.has(key)) { + const old = this.jobs.get(key)!; + const merged = this.collapse(old.value, value); + this.jobs.set(key, { ...old, value: merged }); + } else { + const timer = setTimeout(() => { + const job = this.jobs.get(key)!; + this.jobs.delete(key); + this.perform(key, job.value); + }, this.timeout); + this.jobs.set(key, { value, timer }); + } + } + + async performAllNow() { + const entries = [...this.jobs.entries()]; + this.jobs.clear(); + for (const [_key, job] of entries) { + clearTimeout(job.timer); + } + await Promise.allSettled(entries.map(([key, job]) => this.perform(key, job.value))); + } +} diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 2df37bedf4..68999b5d17 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -4,7 +4,7 @@ */ import { URL } from 'node:url'; -import { Inject, Injectable } from '@nestjs/common'; +import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import httpSignature from '@peertube/http-signature'; import * as Bull from 'bullmq'; import type Logger from '@/logger.js'; @@ -25,14 +25,22 @@ import { JsonLdService } from '@/core/activitypub/JsonLdService.js'; import { ApInboxService } from '@/core/activitypub/ApInboxService.js'; import { bindThis } from '@/decorators.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; -import { QueueLoggerService } from '../QueueLoggerService.js'; -import type { InboxJobData } from '../types.js'; +import { CollapsedQueue } from '@/misc/collapsed-queue.js'; +import { MiNote } from '@/models/Note.js'; import { MiMeta } from '@/models/Meta.js'; import { DI } from '@/di-symbols.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type { InboxJobData } from '../types.js'; + +type UpdateInstanceJob = { + latestRequestReceivedAt: Date, + shouldUnsuspend: boolean, +}; @Injectable() -export class InboxProcessorService { +export class InboxProcessorService implements OnApplicationShutdown { private logger: Logger; + private updateInstanceQueue: CollapsedQueue; constructor( @Inject(DI.meta) @@ -51,6 +59,7 @@ export class InboxProcessorService { private queueLoggerService: QueueLoggerService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('inbox'); + this.updateInstanceQueue = new CollapsedQueue(60 * 1000 * 5, this.collapseUpdateInstanceJobs, this.performUpdateInstance); } @bindThis @@ -187,11 +196,9 @@ export class InboxProcessorService { // Update stats this.federatedInstanceService.fetch(authUser.user.host).then(i => { - this.federatedInstanceService.update(i.id, { + this.updateInstanceQueue.enqueue(i.id, { latestRequestReceivedAt: new Date(), - isNotResponding: false, - // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる - suspensionState: i.suspensionState === 'autoSuspendedForNotResponding' ? 'none' : undefined, + shouldUnsuspend: i.suspensionState === 'autoSuspendedForNotResponding', }); this.fetchInstanceMetadataService.fetchInstanceMetadata(i); @@ -227,4 +234,36 @@ export class InboxProcessorService { } return 'ok'; } + + @bindThis + public collapseUpdateInstanceJobs(oldJob: UpdateInstanceJob, newJob: UpdateInstanceJob) { + const latestRequestReceivedAt = oldJob.latestRequestReceivedAt < newJob.latestRequestReceivedAt + ? newJob.latestRequestReceivedAt + : oldJob.latestRequestReceivedAt; + const shouldUnsuspend = oldJob.shouldUnsuspend || newJob.shouldUnsuspend; + return { + latestRequestReceivedAt, + shouldUnsuspend, + }; + } + + @bindThis + public async performUpdateInstance(id: string, job: UpdateInstanceJob) { + await this.federatedInstanceService.update(id, { + latestRequestReceivedAt: new Date(), + isNotResponding: false, + // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる + suspensionState: job.shouldUnsuspend ? 'none' : undefined, + }); + } + + @bindThis + public async dispose(): Promise { + await this.updateInstanceQueue.performAllNow(); + } + + @bindThis + async onApplicationShutdown(signal?: string) { + await this.dispose(); + } } -- cgit v1.2.3-freya