diff options
Diffstat (limited to 'packages/backend/src/queue/processors/InboxProcessorService.ts')
| -rw-r--r-- | packages/backend/src/queue/processors/InboxProcessorService.ts | 57 |
1 files changed, 57 insertions, 0 deletions
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 7727a3e985..35a0bf095d 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -29,6 +29,9 @@ import { CollapsedQueue } from '@/misc/collapsed-queue.js'; import { MiNote } from '@/models/Note.js'; import { MiMeta } from '@/models/Meta.js'; import { DI } from '@/di-symbols.js'; +import { SkApInboxLog } from '@/models/_.js'; +import type { Config } from '@/config.js'; +import { ApLogService, calculateDurationSince } from '@/core/ApLogService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { InboxJobData } from '../types.js'; @@ -46,6 +49,9 @@ export class InboxProcessorService implements OnApplicationShutdown { @Inject(DI.meta) private meta: MiMeta, + @Inject(DI.config) + private config: Config, + private utilityService: UtilityService, private apInboxService: ApInboxService, private federatedInstanceService: FederatedInstanceService, @@ -57,6 +63,7 @@ export class InboxProcessorService implements OnApplicationShutdown { private apRequestChart: ApRequestChart, private federationChart: FederationChart, private queueLoggerService: QueueLoggerService, + private readonly apLogService: ApLogService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('inbox'); this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance); @@ -64,6 +71,41 @@ export class InboxProcessorService implements OnApplicationShutdown { @bindThis public async process(job: Bull.Job<InboxJobData>): Promise<string> { + if (this.config.activityLogging.enabled) { + return await this._processLogged(job); + } else { + return await this._process(job); + } + } + + private async _processLogged(job: Bull.Job<InboxJobData>): Promise<string> { + const startTime = process.hrtime.bigint(); + const activity = job.data.activity; + const keyId = job.data.signature.keyId; + const log = await this.apLogService.createInboxLog({ activity, keyId }); + + try { + const result = await this._process(job, log); + + log.accepted = result.startsWith('ok'); + log.result = result; + + return result; + } catch (err) { + log.accepted = false; + log.result = String(err); + + throw err; + } finally { + log.duration = calculateDurationSince(startTime); + + // Save or finalize asynchronously + this.apLogService.saveInboxLog(log) + .catch(err => this.logger.error('Failed to record AP activity:', err)); + } + } + + private async _process(job: Bull.Job<InboxJobData>, log?: SkApInboxLog): Promise<string> { const signature = job.data.signature; // HTTP-signature let activity = job.data.activity; @@ -197,6 +239,13 @@ export class InboxProcessorService implements OnApplicationShutdown { delete activity.id; } + // Record verified user in log + if (log) { + log.verified = true; + log.authUser = authUser.user; + log.authUserId = authUser.user.id; + } + this.apRequestChart.inbox(); this.federationChart.inbox(authUser.user.host); @@ -248,6 +297,14 @@ export class InboxProcessorService implements OnApplicationShutdown { return `skip: permanent error ${e.statusCode}`; } + if (e instanceof IdentifiableError && !e.isRetryable) { + if (e.message) { + return `skip: permanent error ${e.id}: ${e.message}`; + } else { + return `skip: permanent error ${e.id}`; + } + } + throw e; } return 'ok'; |