diff options
| author | misskey-release-bot[bot] <157398866+misskey-release-bot[bot]@users.noreply.github.com> | 2024-09-29 11:42:24 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-09-29 11:42:24 +0000 |
| commit | 5fc8b3bc5018a2cb553f114a570cc33ef6831163 (patch) | |
| tree | 40edc874ae384548fd13e55fff6e317d1ef84fbb /packages/backend/src/queue | |
| parent | Merge pull request #14391 from misskey-dev/develop (diff) | |
| parent | Release: 2024.9.0 (diff) | |
| download | sharkey-5fc8b3bc5018a2cb553f114a570cc33ef6831163.tar.gz sharkey-5fc8b3bc5018a2cb553f114a570cc33ef6831163.tar.bz2 sharkey-5fc8b3bc5018a2cb553f114a570cc33ef6831163.zip | |
Merge pull request #14580 from misskey-dev/develop
Release: 2024.9.0
Diffstat (limited to 'packages/backend/src/queue')
15 files changed, 195 insertions, 39 deletions
diff --git a/packages/backend/src/queue/QueueProcessorModule.ts b/packages/backend/src/queue/QueueProcessorModule.ts index a1fd38fcc5..0027b5ef3d 100644 --- a/packages/backend/src/queue/QueueProcessorModule.ts +++ b/packages/backend/src/queue/QueueProcessorModule.ts @@ -14,6 +14,7 @@ import { InboxProcessorService } from './processors/InboxProcessorService.js'; import { UserWebhookDeliverProcessorService } from './processors/UserWebhookDeliverProcessorService.js'; import { SystemWebhookDeliverProcessorService } from './processors/SystemWebhookDeliverProcessorService.js'; import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMutingsProcessorService.js'; +import { BakeBufferedReactionsProcessorService } from './processors/BakeBufferedReactionsProcessorService.js'; import { CleanChartsProcessorService } from './processors/CleanChartsProcessorService.js'; import { CleanProcessorService } from './processors/CleanProcessorService.js'; import { CleanRemoteFilesProcessorService } from './processors/CleanRemoteFilesProcessorService.js'; @@ -51,6 +52,7 @@ import { RelationshipProcessorService } from './processors/RelationshipProcessor ResyncChartsProcessorService, CleanChartsProcessorService, CheckExpiredMutingsProcessorService, + BakeBufferedReactionsProcessorService, CleanProcessorService, DeleteDriveFilesProcessorService, ExportCustomEmojisProcessorService, diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index 7bd74f3210..e9e1c45224 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -39,6 +39,7 @@ import { TickChartsProcessorService } from './processors/TickChartsProcessorServ import { ResyncChartsProcessorService } from './processors/ResyncChartsProcessorService.js'; import { CleanChartsProcessorService } from './processors/CleanChartsProcessorService.js'; import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMutingsProcessorService.js'; +import { BakeBufferedReactionsProcessorService } from './processors/BakeBufferedReactionsProcessorService.js'; import { CleanProcessorService } from './processors/CleanProcessorService.js'; import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js'; import { QueueLoggerService } from './QueueLoggerService.js'; @@ -118,6 +119,7 @@ export class QueueProcessorService implements OnApplicationShutdown { private cleanChartsProcessorService: CleanChartsProcessorService, private aggregateRetentionProcessorService: AggregateRetentionProcessorService, private checkExpiredMutingsProcessorService: CheckExpiredMutingsProcessorService, + private bakeBufferedReactionsProcessorService: BakeBufferedReactionsProcessorService, private cleanProcessorService: CleanProcessorService, ) { this.logger = this.queueLoggerService.logger; @@ -147,6 +149,7 @@ export class QueueProcessorService implements OnApplicationShutdown { case 'cleanCharts': return this.cleanChartsProcessorService.process(); case 'aggregateRetention': return this.aggregateRetentionProcessorService.process(); case 'checkExpiredMutings': return this.checkExpiredMutingsProcessorService.process(); + case 'bakeBufferedReactions': return this.bakeBufferedReactionsProcessorService.process(); case 'clean': return this.cleanProcessorService.process(); default: throw new Error(`unrecognized job type ${job.name} for system`); } diff --git a/packages/backend/src/queue/processors/BakeBufferedReactionsProcessorService.ts b/packages/backend/src/queue/processors/BakeBufferedReactionsProcessorService.ts new file mode 100644 index 0000000000..d49c99f694 --- /dev/null +++ b/packages/backend/src/queue/processors/BakeBufferedReactionsProcessorService.ts @@ -0,0 +1,42 @@ +/* + * SPDX-FileCopyrightText: syuilo and misskey-project + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Inject, Injectable } from '@nestjs/common'; +import type Logger from '@/logger.js'; +import { bindThis } from '@/decorators.js'; +import { ReactionsBufferingService } from '@/core/ReactionsBufferingService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; +import { MiMeta } from '@/models/_.js'; +import { DI } from '@/di-symbols.js'; + +@Injectable() +export class BakeBufferedReactionsProcessorService { + private logger: Logger; + + constructor( + @Inject(DI.meta) + private meta: MiMeta, + + private reactionsBufferingService: ReactionsBufferingService, + private queueLoggerService: QueueLoggerService, + ) { + this.logger = this.queueLoggerService.logger.createSubLogger('bake-buffered-reactions'); + } + + @bindThis + public async process(): Promise<void> { + if (!this.meta.enableReactionsBuffering) { + this.logger.info('Reactions buffering is disabled. Skipping...'); + return; + } + + this.logger.info('Baking buffered reactions...'); + + await this.reactionsBufferingService.bake(); + + this.logger.succ('All buffered reactions baked.'); + } +} diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts index 4076e9da90..9590a4fe71 100644 --- a/packages/backend/src/queue/processors/DeliverProcessorService.ts +++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts @@ -7,9 +7,8 @@ import { Inject, Injectable } from '@nestjs/common'; import * as Bull from 'bullmq'; import { Not } from 'typeorm'; import { DI } from '@/di-symbols.js'; -import type { InstancesRepository } from '@/models/_.js'; +import type { InstancesRepository, MiMeta } from '@/models/_.js'; import type Logger from '@/logger.js'; -import { MetaService } from '@/core/MetaService.js'; import { ApRequestService } from '@/core/activitypub/ApRequestService.js'; import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js'; @@ -31,10 +30,12 @@ export class DeliverProcessorService { private latest: string | null; constructor( + @Inject(DI.meta) + private meta: MiMeta, + @Inject(DI.instancesRepository) private instancesRepository: InstancesRepository, - private metaService: MetaService, private utilityService: UtilityService, private federatedInstanceService: FederatedInstanceService, private fetchInstanceMetadataService: FetchInstanceMetadataService, @@ -52,9 +53,7 @@ export class DeliverProcessorService { public async process(job: Bull.Job<DeliverJobData>): Promise<string> { const { host } = new URL(job.data.to); - // ブロックしてたら中断 - const meta = await this.metaService.fetch(); - if (this.utilityService.isBlockedHost(meta.blockedHosts, this.utilityService.toPuny(host))) { + if (!this.utilityService.isFederationAllowedUri(job.data.to)) { return 'skip (blocked)'; } @@ -88,7 +87,7 @@ export class DeliverProcessorService { this.apRequestChart.deliverSucc(); this.federationChart.deliverd(i.host, true); - if (meta.enableChartsForFederatedInstances) { + if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.requestSent(i.host, true); } }); @@ -120,7 +119,7 @@ export class DeliverProcessorService { this.apRequestChart.deliverFail(); this.federationChart.deliverd(i.host, false); - if (meta.enableChartsForFederatedInstances) { + if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.requestSent(i.host, false); } }); diff --git a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts index 88c4ea29c0..b3111865ad 100644 --- a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts @@ -14,6 +14,7 @@ import { DriveService } from '@/core/DriveService.js'; import { bindThis } from '@/decorators.js'; import { createTemp } from '@/misc/create-temp.js'; import { UtilityService } from '@/core/UtilityService.js'; +import { NotificationService } from '@/core/NotificationService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { DBExportAntennasData } from '../types.js'; import type * as Bull from 'bullmq'; @@ -35,6 +36,7 @@ export class ExportAntennasProcessorService { private driveService: DriveService, private utilityService: UtilityService, private queueLoggerService: QueueLoggerService, + private notificationService: NotificationService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('export-antennas'); } @@ -95,6 +97,11 @@ export class ExportAntennasProcessorService { const fileName = 'antennas-' + DateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json'; const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' }); this.logger.succ('Exported to: ' + driveFile.id); + + this.notificationService.createNotification(user.id, 'exportCompleted', { + exportedEntity: 'antenna', + fileId: driveFile.id, + }); } finally { cleanup(); } diff --git a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts index 6ec3c18786..ecc439db69 100644 --- a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts @@ -13,6 +13,7 @@ import type Logger from '@/logger.js'; import { DriveService } from '@/core/DriveService.js'; import { createTemp } from '@/misc/create-temp.js'; import { UtilityService } from '@/core/UtilityService.js'; +import { NotificationService } from '@/core/NotificationService.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; @@ -30,6 +31,7 @@ export class ExportBlockingProcessorService { private blockingsRepository: BlockingsRepository, private utilityService: UtilityService, + private notificationService: NotificationService, private driveService: DriveService, private queueLoggerService: QueueLoggerService, ) { @@ -109,6 +111,11 @@ export class ExportBlockingProcessorService { const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' }); this.logger.succ(`Exported to: ${driveFile.id}`); + + this.notificationService.createNotification(user.id, 'exportCompleted', { + exportedEntity: 'blocking', + fileId: driveFile.id, + }); } finally { cleanup(); } diff --git a/packages/backend/src/queue/processors/ExportClipsProcessorService.ts b/packages/backend/src/queue/processors/ExportClipsProcessorService.ts index 01eab26e96..583ddbb745 100644 --- a/packages/backend/src/queue/processors/ExportClipsProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportClipsProcessorService.ts @@ -19,6 +19,7 @@ import { bindThis } from '@/decorators.js'; import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js'; import { Packed } from '@/misc/json-schema.js'; import { IdService } from '@/core/IdService.js'; +import { NotificationService } from '@/core/NotificationService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; import type { DbJobDataWithUser } from '../types.js'; @@ -43,6 +44,7 @@ export class ExportClipsProcessorService { private driveService: DriveService, private queueLoggerService: QueueLoggerService, private idService: IdService, + private notificationService: NotificationService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('export-clips'); } @@ -79,6 +81,11 @@ export class ExportClipsProcessorService { const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' }); this.logger.succ(`Exported to: ${driveFile.id}`); + + this.notificationService.createNotification(user.id, 'exportCompleted', { + exportedEntity: 'clip', + fileId: driveFile.id, + }); } finally { cleanup(); } diff --git a/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts index e4eb4791bd..e237cd4975 100644 --- a/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts @@ -16,6 +16,7 @@ import type Logger from '@/logger.js'; import { DriveService } from '@/core/DriveService.js'; import { createTemp, createTempDir } from '@/misc/create-temp.js'; import { DownloadService } from '@/core/DownloadService.js'; +import { NotificationService } from '@/core/NotificationService.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; @@ -37,6 +38,7 @@ export class ExportCustomEmojisProcessorService { private driveService: DriveService, private downloadService: DownloadService, private queueLoggerService: QueueLoggerService, + private notificationService: NotificationService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('export-custom-emojis'); } @@ -134,6 +136,12 @@ export class ExportCustomEmojisProcessorService { const driveFile = await this.driveService.addFile({ user, path: archivePath, name: fileName, force: true }); this.logger.succ(`Exported to: ${driveFile.id}`); + + this.notificationService.createNotification(user.id, 'exportCompleted', { + exportedEntity: 'customEmoji', + fileId: driveFile.id, + }); + cleanup(); archiveCleanup(); resolve(); diff --git a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts index 7bb626dd31..b81feece01 100644 --- a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts @@ -16,6 +16,7 @@ import type { MiPoll } from '@/models/Poll.js'; import type { MiNote } from '@/models/Note.js'; import { bindThis } from '@/decorators.js'; import { IdService } from '@/core/IdService.js'; +import { NotificationService } from '@/core/NotificationService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; import type { DbJobDataWithUser } from '../types.js'; @@ -37,6 +38,7 @@ export class ExportFavoritesProcessorService { private driveService: DriveService, private queueLoggerService: QueueLoggerService, private idService: IdService, + private notificationService: NotificationService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('export-favorites'); } @@ -123,6 +125,11 @@ export class ExportFavoritesProcessorService { const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' }); this.logger.succ(`Exported to: ${driveFile.id}`); + + this.notificationService.createNotification(user.id, 'exportCompleted', { + exportedEntity: 'favorite', + fileId: driveFile.id, + }); } finally { cleanup(); } diff --git a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts index 1cc80e66d7..903f962515 100644 --- a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts @@ -14,6 +14,7 @@ import { DriveService } from '@/core/DriveService.js'; import { createTemp } from '@/misc/create-temp.js'; import type { MiFollowing } from '@/models/Following.js'; import { UtilityService } from '@/core/UtilityService.js'; +import { NotificationService } from '@/core/NotificationService.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; @@ -36,6 +37,7 @@ export class ExportFollowingProcessorService { private utilityService: UtilityService, private driveService: DriveService, private queueLoggerService: QueueLoggerService, + private notificationService: NotificationService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('export-following'); } @@ -113,6 +115,11 @@ export class ExportFollowingProcessorService { const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' }); this.logger.succ(`Exported to: ${driveFile.id}`); + + this.notificationService.createNotification(user.id, 'exportCompleted', { + exportedEntity: 'following', + fileId: driveFile.id, + }); } finally { cleanup(); } diff --git a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts index 243b74f2c2..f9867ade29 100644 --- a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts @@ -13,6 +13,7 @@ import type Logger from '@/logger.js'; import { DriveService } from '@/core/DriveService.js'; import { createTemp } from '@/misc/create-temp.js'; import { UtilityService } from '@/core/UtilityService.js'; +import { NotificationService } from '@/core/NotificationService.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; @@ -32,6 +33,7 @@ export class ExportMutingProcessorService { private utilityService: UtilityService, private driveService: DriveService, private queueLoggerService: QueueLoggerService, + private notificationService: NotificationService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('export-muting'); } @@ -110,6 +112,11 @@ export class ExportMutingProcessorService { const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' }); this.logger.succ(`Exported to: ${driveFile.id}`); + + this.notificationService.createNotification(user.id, 'exportCompleted', { + exportedEntity: 'muting', + fileId: driveFile.id, + }); } finally { cleanup(); } diff --git a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts index c7611012d7..9e2b678219 100644 --- a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts @@ -18,6 +18,7 @@ import { bindThis } from '@/decorators.js'; import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js'; import { Packed } from '@/misc/json-schema.js'; import { IdService } from '@/core/IdService.js'; +import { NotificationService } from '@/core/NotificationService.js'; import { JsonArrayStream } from '@/misc/JsonArrayStream.js'; import { FileWriterStream } from '@/misc/FileWriterStream.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; @@ -112,6 +113,7 @@ export class ExportNotesProcessorService { private queueLoggerService: QueueLoggerService, private driveFileEntityService: DriveFileEntityService, private idService: IdService, + private notificationService: NotificationService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('export-notes'); } @@ -150,6 +152,11 @@ export class ExportNotesProcessorService { const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' }); this.logger.succ(`Exported to: ${driveFile.id}`); + + this.notificationService.createNotification(user.id, 'exportCompleted', { + exportedEntity: 'note', + fileId: driveFile.id, + }); } finally { cleanup(); } diff --git a/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts index ee87cff5d3..c483d79854 100644 --- a/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts @@ -13,6 +13,7 @@ import type Logger from '@/logger.js'; import { DriveService } from '@/core/DriveService.js'; import { createTemp } from '@/misc/create-temp.js'; import { UtilityService } from '@/core/UtilityService.js'; +import { NotificationService } from '@/core/NotificationService.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; @@ -35,6 +36,7 @@ export class ExportUserListsProcessorService { private utilityService: UtilityService, private driveService: DriveService, private queueLoggerService: QueueLoggerService, + private notificationService: NotificationService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('export-user-lists'); } @@ -89,6 +91,11 @@ export class ExportUserListsProcessorService { const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' }); this.logger.succ(`Exported to: ${driveFile.id}`); + + this.notificationService.createNotification(user.id, 'exportCompleted', { + exportedEntity: 'userList', + fileId: driveFile.id, + }); } finally { cleanup(); } diff --git a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts index 171809d25c..9e1b8fee70 100644 --- a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts @@ -87,23 +87,30 @@ export class ImportCustomEmojisProcessorService { await this.emojisRepository.delete({ name: emojiInfo.name, }); - const driveFile = await this.driveService.addFile({ - user: null, - path: emojiPath, - name: record.fileName, - force: true, - }); - await this.customEmojiService.add({ - name: emojiInfo.name, - category: emojiInfo.category, - host: null, - aliases: emojiInfo.aliases, - driveFile, - license: emojiInfo.license, - isSensitive: emojiInfo.isSensitive, - localOnly: emojiInfo.localOnly, - roleIdsThatCanBeUsedThisEmojiAsReaction: [], - }); + try { + const driveFile = await this.driveService.addFile({ + user: null, + path: emojiPath, + name: record.fileName, + force: true, + }); + await this.customEmojiService.add({ + name: emojiInfo.name, + category: emojiInfo.category, + host: null, + aliases: emojiInfo.aliases, + driveFile, + license: emojiInfo.license, + isSensitive: emojiInfo.isSensitive, + localOnly: emojiInfo.localOnly, + roleIdsThatCanBeUsedThisEmojiAsReaction: [], + }); + } catch (e) { + if (e instanceof Error || typeof e === 'string') { + this.logger.error(`couldn't import ${emojiPath} for ${emojiInfo.name}: ${e}`); + } + continue; + } } cleanup(); diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index fa7009f8f5..09d51bec72 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -4,11 +4,10 @@ */ import { URL } from 'node:url'; -import { Injectable } from '@nestjs/common'; +import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import httpSignature from '@peertube/http-signature'; import * as Bull from 'bullmq'; import type Logger from '@/logger.js'; -import { MetaService } from '@/core/MetaService.js'; import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js'; import InstanceChart from '@/core/chart/charts/instance.js'; @@ -26,16 +25,28 @@ import { JsonLdService } from '@/core/activitypub/JsonLdService.js'; import { ApInboxService } from '@/core/activitypub/ApInboxService.js'; import { bindThis } from '@/decorators.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; +import { CollapsedQueue } from '@/misc/collapsed-queue.js'; +import { MiNote } from '@/models/Note.js'; +import { MiMeta } from '@/models/Meta.js'; +import { DI } from '@/di-symbols.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { InboxJobData } from '../types.js'; +type UpdateInstanceJob = { + latestRequestReceivedAt: Date, + shouldUnsuspend: boolean, +}; + @Injectable() -export class InboxProcessorService { +export class InboxProcessorService implements OnApplicationShutdown { private logger: Logger; + private updateInstanceQueue: CollapsedQueue<MiNote['id'], UpdateInstanceJob>; constructor( + @Inject(DI.meta) + private meta: MiMeta, + private utilityService: UtilityService, - private metaService: MetaService, private apInboxService: ApInboxService, private federatedInstanceService: FederatedInstanceService, private fetchInstanceMetadataService: FetchInstanceMetadataService, @@ -48,6 +59,7 @@ export class InboxProcessorService { private queueLoggerService: QueueLoggerService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('inbox'); + this.updateInstanceQueue = new CollapsedQueue(60 * 1000 * 5, this.collapseUpdateInstanceJobs, this.performUpdateInstance); } @bindThis @@ -63,9 +75,7 @@ export class InboxProcessorService { const host = this.utilityService.toPuny(new URL(signature.keyId).hostname); - // ブロックしてたら中断 - const meta = await this.metaService.fetch(); - if (this.utilityService.isBlockedHost(meta.blockedHosts, host)) { + if (!this.utilityService.isFederationAllowedHost(host)) { return `Blocked request: ${host}`; } @@ -164,9 +174,8 @@ export class InboxProcessorService { throw new Bull.UnrecoverableError(`skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`); } - // ブロックしてたら中断 const ldHost = this.utilityService.extractDbHost(authUser.user.uri); - if (this.utilityService.isBlockedHost(meta.blockedHosts, ldHost)) { + if (!this.utilityService.isFederationAllowedHost(ldHost)) { throw new Bull.UnrecoverableError(`Blocked request: ${ldHost}`); } } else { @@ -185,11 +194,9 @@ export class InboxProcessorService { // Update stats this.federatedInstanceService.fetch(authUser.user.host).then(i => { - this.federatedInstanceService.update(i.id, { + this.updateInstanceQueue.enqueue(i.id, { latestRequestReceivedAt: new Date(), - isNotResponding: false, - // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる - suspensionState: i.suspensionState === 'autoSuspendedForNotResponding' ? 'none' : undefined, + shouldUnsuspend: i.suspensionState === 'autoSuspendedForNotResponding', }); this.fetchInstanceMetadataService.fetchInstanceMetadata(i); @@ -197,7 +204,7 @@ export class InboxProcessorService { this.apRequestChart.inbox(); this.federationChart.inbox(i.host); - if (meta.enableChartsForFederatedInstances) { + if (this.meta.enableChartsForFederatedInstances) { this.instanceChart.requestReceived(i.host); } }); @@ -225,4 +232,36 @@ export class InboxProcessorService { } return 'ok'; } + + @bindThis + public collapseUpdateInstanceJobs(oldJob: UpdateInstanceJob, newJob: UpdateInstanceJob) { + const latestRequestReceivedAt = oldJob.latestRequestReceivedAt < newJob.latestRequestReceivedAt + ? newJob.latestRequestReceivedAt + : oldJob.latestRequestReceivedAt; + const shouldUnsuspend = oldJob.shouldUnsuspend || newJob.shouldUnsuspend; + return { + latestRequestReceivedAt, + shouldUnsuspend, + }; + } + + @bindThis + public async performUpdateInstance(id: string, job: UpdateInstanceJob) { + await this.federatedInstanceService.update(id, { + latestRequestReceivedAt: new Date(), + isNotResponding: false, + // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる + suspensionState: job.shouldUnsuspend ? 'none' : undefined, + }); + } + + @bindThis + public async dispose(): Promise<void> { + await this.updateInstanceQueue.performAllNow(); + } + + @bindThis + async onApplicationShutdown(signal?: string) { + await this.dispose(); + } } |