From f18d402ce60df24ff3eb920bf30360c1ccf8122b Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Sun, 16 Feb 2025 14:12:13 -0500 Subject: fix typo in activity signature mismatch error --- packages/backend/src/queue/processors/InboxProcessorService.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'packages/backend/src/queue') diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 35a0bf095d..e128108779 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -232,7 +232,7 @@ export class InboxProcessorService implements OnApplicationShutdown { const signerHost = this.utilityService.extractDbHost(authUser.user.uri!); const activityIdHost = this.utilityService.extractDbHost(activity.id); if (signerHost !== activityIdHost) { - throw new Bull.UnrecoverableError(`skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`); + throw new Bull.UnrecoverableError(`skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost})`); } } else { // Activity ID should only be string or undefined. -- cgit v1.2.3-freya From f2bb01f7da62159d1e9be828d9b5840c1677fce9 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Sun, 16 Feb 2025 14:23:55 -0500 Subject: support Announce(Activity) activities --- .../backend/src/core/activitypub/ApInboxService.ts | 85 ++++++++++++++++++++-- .../src/queue/processors/InboxProcessorService.ts | 6 +- 2 files changed, 84 insertions(+), 7 deletions(-) (limited to 'packages/backend/src/queue') diff --git a/packages/backend/src/core/activitypub/ApInboxService.ts b/packages/backend/src/core/activitypub/ApInboxService.ts index 1eef85aeef..be1de3eab0 100644 --- a/packages/backend/src/core/activitypub/ApInboxService.ts +++ b/packages/backend/src/core/activitypub/ApInboxService.ts @@ -3,7 +3,7 @@ * SPDX-License-Identifier: AGPL-3.0-only */ -import { Inject, Injectable } from '@nestjs/common'; +import { forwardRef, Inject, Injectable } from '@nestjs/common'; import { In } from 'typeorm'; import * as Bull from 'bullmq'; import { DI } from '@/di-symbols.js'; @@ -32,7 +32,11 @@ import { AbuseReportService } from '@/core/AbuseReportService.js'; import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; import { fromTuple } from '@/misc/from-tuple.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; -import { getApHrefNullable, getApId, getApIds, getApType, getNullableApId, isAccept, isActor, isAdd, isAnnounce, isApObject, isBlock, isCollection, isCollectionOrOrderedCollection, isCreate, isDelete, isFlag, isFollow, isLike, isDislike, isMove, isPost, isReject, isRemove, isTombstone, isUndo, isUpdate, validActor, validPost } from './type.js'; +import InstanceChart from '@/core/chart/charts/instance.js'; +import FederationChart from '@/core/chart/charts/federation.js'; +import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js'; +import { InboxProcessorService } from '@/queue/processors/InboxProcessorService.js'; +import { getApHrefNullable, getApId, getApIds, getApType, getNullableApId, isAccept, isActor, isAdd, isAnnounce, isApObject, isBlock, isCollection, isCollectionOrOrderedCollection, isCreate, isDelete, isFlag, isFollow, isLike, isDislike, isMove, isPost, isReject, isRemove, isTombstone, isUndo, isUpdate, validActor, validPost, isActivity } from './type.js'; import { ApNoteService } from './models/ApNoteService.js'; import { ApLoggerService } from './ApLoggerService.js'; import { ApDbResolverService } from './ApDbResolverService.js'; @@ -41,7 +45,7 @@ import { ApAudienceService } from './ApAudienceService.js'; import { ApPersonService } from './models/ApPersonService.js'; import { ApQuestionService } from './models/ApQuestionService.js'; import type { Resolver } from './ApResolverService.js'; -import type { IAccept, IAdd, IAnnounce, IBlock, ICreate, IDelete, IFlag, IFollow, ILike, IDislike, IObject, IReject, IRemove, IUndo, IUpdate, IMove, IPost } from './type.js'; +import type { IAccept, IAdd, IAnnounce, IBlock, ICreate, IDelete, IFlag, IFollow, ILike, IDislike, IObject, IReject, IRemove, IUndo, IUpdate, IMove, IPost, IActivity } from './type.js'; @Injectable() export class ApInboxService { @@ -88,7 +92,13 @@ export class ApInboxService { private apQuestionService: ApQuestionService, private queueService: QueueService, private globalEventService: GlobalEventService, - private federatedInstanceService: FederatedInstanceService, + private readonly federatedInstanceService: FederatedInstanceService, + private readonly fetchInstanceMetadataService: FetchInstanceMetadataService, + private readonly instanceChart: InstanceChart, + private readonly federationChart: FederationChart, + + @Inject(forwardRef(() => InboxProcessorService)) + private readonly inboxProcessorService: InboxProcessorService, ) { this.logger = this.apLoggerService.logger; } @@ -310,12 +320,15 @@ export class ApInboxService { const targetUri = getApId(activityObject); if (targetUri.startsWith('bear:')) return 'skip: bearcaps url not supported.'; - const target = await resolver.resolve(activityObject).catch(e => { + // Force a fetch by passing URL only, since the target object must be trusted for announceActivity. + // We cannot just re-fetch or the resolver will throw a recursion error. + const target = await resolver.resolve(targetUri).catch(e => { this.logger.error(`Resolution failed: ${e}`); throw e; }); if (isPost(target)) return await this.announceNote(actor, activity, target); + if (isActivity(target)) return await this.announceActivity(activity, target, resolver); return `skip: unknown object type ${getApType(target)}`; } @@ -383,6 +396,68 @@ export class ApInboxService { } } + private async announceActivity(announce: IAnnounce, activity: IActivity, resolver: Resolver): Promise { + // Shouldn't happen, but just in case + if (!activity.id) { + throw new Bull.UnrecoverableError(`Cannot announce an activity with no ID: ${announce.id}`); + } + + // Since this is a new activity, we need to get a new actor. + const actorId = getApId(activity.actor); + const actor = await this.apPersonService.resolvePerson(actorId, resolver); + + // Ignore announce of our own activities + // 1. No URI/host on an MiUser == local user + // 2. Local URI on activity == local activity + if (!actor.uri || !actor.host || this.utilityService.isUriLocal(activity.id)) { + throw new Bull.UnrecoverableError(`Cannot announce a local activity: ${activity.id} (from ${announce.id})`); + } + + // Make sure that actor matches activity host. + // Activity host is already verified by resolver when fetching the activity, so that is the source of truth. + const actorHost = this.utilityService.punyHostPSLDomain(actor.uri); + const activityHost = this.utilityService.punyHostPSLDomain(activity.id); + if (actorHost !== activityHost) { + throw new Bull.UnrecoverableError(`Actor host ${actorHost} does not activity host ${activityHost} in activity ${activity.id} (from ${announce.id})`); + } + + // Update stats (adapted from InboxProcessorService) + this.federationChart.inbox(actor.host).then(); + process.nextTick(async () => { + const i = await (this.meta.enableStatsForFederatedInstances + ? this.federatedInstanceService.fetchOrRegister(actor.host) + : this.federatedInstanceService.fetch(actor.host)); + + if (i == null) return; + + this.inboxProcessorService.updateInstanceQueue.enqueue(i.id, { + latestRequestReceivedAt: new Date(), + shouldUnsuspend: i.suspensionState === 'autoSuspendedForNotResponding', + }); + + if (this.meta.enableChartsForFederatedInstances) { + this.instanceChart.requestReceived(i.host).then(); + } + + this.fetchInstanceMetadataService.fetchInstanceMetadata(i).then(); + }); + + // Process it! + return await this.performOneActivity(actor, activity, resolver) + .finally(() => { + // Update user (adapted from performActivity) + if (actor.lastFetchedAt == null || Date.now() - actor.lastFetchedAt.getTime() > 1000 * 60 * 60 * 24) { + setImmediate(() => { + // Don't re-use the resolver, or it may throw recursion errors. + // Instead, create a new resolver with an appropriately-reduced recursion limit. + this.apPersonService.updatePerson(actor.uri, this.apResolverService.createResolver({ + recursionLimit: resolver.getRecursionLimit() - resolver.getHistory().length, + })); + }); + } + }); + } + @bindThis private async block(actor: MiRemoteUser, activity: IBlock): Promise { // ※ activity.objectにブロック対象があり、それは存在するローカルユーザーのはず diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index e128108779..839c8c183a 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -4,7 +4,7 @@ */ import { URL } from 'node:url'; -import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; +import { forwardRef, Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import httpSignature from '@peertube/http-signature'; import * as Bull from 'bullmq'; import type Logger from '@/logger.js'; @@ -43,7 +43,7 @@ type UpdateInstanceJob = { @Injectable() export class InboxProcessorService implements OnApplicationShutdown { private logger: Logger; - private updateInstanceQueue: CollapsedQueue; + public readonly updateInstanceQueue: CollapsedQueue; constructor( @Inject(DI.meta) @@ -53,6 +53,8 @@ export class InboxProcessorService implements OnApplicationShutdown { private config: Config, private utilityService: UtilityService, + + @Inject(forwardRef(() => ApInboxService)) private apInboxService: ApInboxService, private federatedInstanceService: FederatedInstanceService, private fetchInstanceMetadataService: FetchInstanceMetadataService, -- cgit v1.2.3-freya From 1ed2f207f7b711972bb57b20cb6653270dc77c97 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Sun, 16 Feb 2025 19:13:08 -0500 Subject: fix startup crash caused by circular reference (SWC is not compatible with forwardRef) --- packages/backend/src/core/CoreModule.ts | 6 +++ packages/backend/src/core/UpdateInstanceQueue.ts | 52 ++++++++++++++++++++++ .../backend/src/core/activitypub/ApInboxService.ts | 10 ++--- .../src/queue/processors/InboxProcessorService.ts | 18 ++++---- 4 files changed, 70 insertions(+), 16 deletions(-) create mode 100644 packages/backend/src/core/UpdateInstanceQueue.ts (limited to 'packages/backend/src/queue') diff --git a/packages/backend/src/core/CoreModule.ts b/packages/backend/src/core/CoreModule.ts index 3c35dfc4ff..997d81facc 100644 --- a/packages/backend/src/core/CoreModule.ts +++ b/packages/backend/src/core/CoreModule.ts @@ -19,6 +19,7 @@ import { TimeService } from '@/core/TimeService.js'; import { EnvService } from '@/core/EnvService.js'; import { ApUtilityService } from '@/core/activitypub/ApUtilityService.js'; import { ApLogService } from '@/core/ApLogService.js'; +import { UpdateInstanceQueue } from '@/core/UpdateInstanceQueue.js'; import { AccountMoveService } from './AccountMoveService.js'; import { AccountUpdateService } from './AccountUpdateService.js'; import { AnnouncementService } from './AnnouncementService.js'; @@ -220,6 +221,7 @@ const $UserRenoteMutingService: Provider = { provide: 'UserRenoteMutingService', const $UserSearchService: Provider = { provide: 'UserSearchService', useExisting: UserSearchService }; const $UserSuspendService: Provider = { provide: 'UserSuspendService', useExisting: UserSuspendService }; const $UserAuthService: Provider = { provide: 'UserAuthService', useExisting: UserAuthService }; +const $UpdateInstanceQueue: Provider = { provide: 'UpdateInstanceQueue', useExisting: UpdateInstanceQueue }; const $VideoProcessingService: Provider = { provide: 'VideoProcessingService', useExisting: VideoProcessingService }; const $UserWebhookService: Provider = { provide: 'UserWebhookService', useExisting: UserWebhookService }; const $SystemWebhookService: Provider = { provide: 'SystemWebhookService', useExisting: SystemWebhookService }; @@ -378,6 +380,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp UserSearchService, UserSuspendService, UserAuthService, + UpdateInstanceQueue, VideoProcessingService, UserWebhookService, SystemWebhookService, @@ -532,6 +535,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp $UserSearchService, $UserSuspendService, $UserAuthService, + $UpdateInstanceQueue, $VideoProcessingService, $UserWebhookService, $SystemWebhookService, @@ -687,6 +691,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp UserSearchService, UserSuspendService, UserAuthService, + UpdateInstanceQueue, VideoProcessingService, UserWebhookService, SystemWebhookService, @@ -840,6 +845,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp $UserSearchService, $UserSuspendService, $UserAuthService, + $UpdateInstanceQueue, $VideoProcessingService, $UserWebhookService, $SystemWebhookService, diff --git a/packages/backend/src/core/UpdateInstanceQueue.ts b/packages/backend/src/core/UpdateInstanceQueue.ts new file mode 100644 index 0000000000..3fcd215ffa --- /dev/null +++ b/packages/backend/src/core/UpdateInstanceQueue.ts @@ -0,0 +1,52 @@ +/* + * SPDX-FileCopyrightText: syuilo and misskey-project + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Injectable, OnApplicationShutdown } from '@nestjs/common'; +import { CollapsedQueue } from '@/misc/collapsed-queue.js'; +import { bindThis } from '@/decorators.js'; +import { MiNote } from '@/models/Note.js'; +import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; + +type UpdateInstanceJob = { + latestRequestReceivedAt: Date, + shouldUnsuspend: boolean, +}; + +// Moved from InboxProcessorService to allow access from ApInboxService +@Injectable() +export class UpdateInstanceQueue extends CollapsedQueue implements OnApplicationShutdown { + constructor( + private readonly federatedInstanceService: FederatedInstanceService, + ) { + super(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, (id, job) => this.collapseUpdateInstanceJobs(id, job), (id, job) => this.performUpdateInstance(id, job)); + } + + @bindThis + private collapseUpdateInstanceJobs(oldJob: UpdateInstanceJob, newJob: UpdateInstanceJob) { + const latestRequestReceivedAt = oldJob.latestRequestReceivedAt < newJob.latestRequestReceivedAt + ? newJob.latestRequestReceivedAt + : oldJob.latestRequestReceivedAt; + const shouldUnsuspend = oldJob.shouldUnsuspend || newJob.shouldUnsuspend; + return { + latestRequestReceivedAt, + shouldUnsuspend, + }; + } + + @bindThis + private async performUpdateInstance(id: string, job: UpdateInstanceJob) { + await this.federatedInstanceService.update(id, { + latestRequestReceivedAt: new Date(), + isNotResponding: false, + // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる + suspensionState: job.shouldUnsuspend ? 'none' : undefined, + }); + } + + @bindThis + async onApplicationShutdown() { + await this.performAllNow(); + } +} diff --git a/packages/backend/src/core/activitypub/ApInboxService.ts b/packages/backend/src/core/activitypub/ApInboxService.ts index be1de3eab0..6e1359cbdc 100644 --- a/packages/backend/src/core/activitypub/ApInboxService.ts +++ b/packages/backend/src/core/activitypub/ApInboxService.ts @@ -3,7 +3,7 @@ * SPDX-License-Identifier: AGPL-3.0-only */ -import { forwardRef, Inject, Injectable } from '@nestjs/common'; +import { Inject, Injectable } from '@nestjs/common'; import { In } from 'typeorm'; import * as Bull from 'bullmq'; import { DI } from '@/di-symbols.js'; @@ -35,7 +35,7 @@ import { IdentifiableError } from '@/misc/identifiable-error.js'; import InstanceChart from '@/core/chart/charts/instance.js'; import FederationChart from '@/core/chart/charts/federation.js'; import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js'; -import { InboxProcessorService } from '@/queue/processors/InboxProcessorService.js'; +import { UpdateInstanceQueue } from '@/core/UpdateInstanceQueue.js'; import { getApHrefNullable, getApId, getApIds, getApType, getNullableApId, isAccept, isActor, isAdd, isAnnounce, isApObject, isBlock, isCollection, isCollectionOrOrderedCollection, isCreate, isDelete, isFlag, isFollow, isLike, isDislike, isMove, isPost, isReject, isRemove, isTombstone, isUndo, isUpdate, validActor, validPost, isActivity } from './type.js'; import { ApNoteService } from './models/ApNoteService.js'; import { ApLoggerService } from './ApLoggerService.js'; @@ -96,9 +96,7 @@ export class ApInboxService { private readonly fetchInstanceMetadataService: FetchInstanceMetadataService, private readonly instanceChart: InstanceChart, private readonly federationChart: FederationChart, - - @Inject(forwardRef(() => InboxProcessorService)) - private readonly inboxProcessorService: InboxProcessorService, + private readonly updateInstanceQueue: UpdateInstanceQueue, ) { this.logger = this.apLoggerService.logger; } @@ -430,7 +428,7 @@ export class ApInboxService { if (i == null) return; - this.inboxProcessorService.updateInstanceQueue.enqueue(i.id, { + this.updateInstanceQueue.enqueue(i.id, { latestRequestReceivedAt: new Date(), shouldUnsuspend: i.suspensionState === 'autoSuspendedForNotResponding', }); diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 839c8c183a..fc7c66591a 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -4,7 +4,7 @@ */ import { URL } from 'node:url'; -import { forwardRef, Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; +import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import httpSignature from '@peertube/http-signature'; import * as Bull from 'bullmq'; import type Logger from '@/logger.js'; @@ -25,13 +25,14 @@ import { JsonLdService } from '@/core/activitypub/JsonLdService.js'; import { ApInboxService } from '@/core/activitypub/ApInboxService.js'; import { bindThis } from '@/decorators.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; -import { CollapsedQueue } from '@/misc/collapsed-queue.js'; -import { MiNote } from '@/models/Note.js'; +//import { CollapsedQueue } from '@/misc/collapsed-queue.js'; +//import { MiNote } from '@/models/Note.js'; import { MiMeta } from '@/models/Meta.js'; import { DI } from '@/di-symbols.js'; import { SkApInboxLog } from '@/models/_.js'; import type { Config } from '@/config.js'; import { ApLogService, calculateDurationSince } from '@/core/ApLogService.js'; +import { UpdateInstanceQueue } from '@/core/UpdateInstanceQueue.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { InboxJobData } from '../types.js'; @@ -43,7 +44,7 @@ type UpdateInstanceJob = { @Injectable() export class InboxProcessorService implements OnApplicationShutdown { private logger: Logger; - public readonly updateInstanceQueue: CollapsedQueue; + //private updateInstanceQueue: CollapsedQueue; constructor( @Inject(DI.meta) @@ -53,8 +54,6 @@ export class InboxProcessorService implements OnApplicationShutdown { private config: Config, private utilityService: UtilityService, - - @Inject(forwardRef(() => ApInboxService)) private apInboxService: ApInboxService, private federatedInstanceService: FederatedInstanceService, private fetchInstanceMetadataService: FetchInstanceMetadataService, @@ -66,9 +65,10 @@ export class InboxProcessorService implements OnApplicationShutdown { private federationChart: FederationChart, private queueLoggerService: QueueLoggerService, private readonly apLogService: ApLogService, + private readonly updateInstanceQueue: UpdateInstanceQueue, ) { this.logger = this.queueLoggerService.logger.createSubLogger('inbox'); - this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance); + //this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance); } @bindThis @@ -335,9 +335,7 @@ export class InboxProcessorService implements OnApplicationShutdown { } @bindThis - public async dispose(): Promise { - await this.updateInstanceQueue.performAllNow(); - } + public async dispose(): Promise {} @bindThis async onApplicationShutdown(signal?: string) { -- cgit v1.2.3-freya