diff options
| author | Hazelnoot <acomputerdog@gmail.com> | 2025-01-30 22:36:19 -0500 |
|---|---|---|
| committer | Hazelnoot <acomputerdog@gmail.com> | 2025-02-16 19:25:22 -0500 |
| commit | 81944b3bdf49cf95294adcefc265a568b921dee0 (patch) | |
| tree | 3693a235357d9d9576b694128e03a065d36921b9 /packages/backend/src/queue/processors/InboxProcessorService.ts | |
| parent | rename activity_log and activity_context to ap_inbox_log and ap_context (diff) | |
| download | sharkey-81944b3bdf49cf95294adcefc265a568b921dee0.tar.gz sharkey-81944b3bdf49cf95294adcefc265a568b921dee0.tar.bz2 sharkey-81944b3bdf49cf95294adcefc265a568b921dee0.zip | |
implement AP fetch logs
Diffstat (limited to 'packages/backend/src/queue/processors/InboxProcessorService.ts')
| -rw-r--r-- | packages/backend/src/queue/processors/InboxProcessorService.ts | 82 |
1 files changed, 10 insertions, 72 deletions
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 4182f3e090..557a759136 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -4,7 +4,6 @@ */ import { URL } from 'node:url'; -import { createHash } from 'crypto'; import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import httpSignature from '@peertube/http-signature'; import * as Bull from 'bullmq'; @@ -30,11 +29,9 @@ import { CollapsedQueue } from '@/misc/collapsed-queue.js'; import { MiNote } from '@/models/Note.js'; import { MiMeta } from '@/models/Meta.js'; import { DI } from '@/di-symbols.js'; -import { IdService } from '@/core/IdService.js'; -import { JsonValue } from '@/misc/json-value.js'; -import { SkApInboxLog, SkApContext } from '@/models/_.js'; -import type { ApInboxLogsRepository, ApContextsRepository } from '@/models/_.js'; +import { SkApInboxLog } from '@/models/_.js'; import type { Config } from '@/config.js'; +import { ApLogService, calculateDurationSince } from '@/core/ApLogService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { InboxJobData } from '../types.js'; @@ -66,13 +63,7 @@ export class InboxProcessorService implements OnApplicationShutdown { private apRequestChart: ApRequestChart, private federationChart: FederationChart, private queueLoggerService: QueueLoggerService, - private idService: IdService, - - @Inject(DI.apContextsRepository) - private apContextsRepository: ApContextsRepository, - - @Inject(DI.apInboxLogsRepository) - private apInboxLogsRepository: ApInboxLogsRepository, + private readonly apLogService: ApLogService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('inbox'); this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance); @@ -89,14 +80,9 @@ export class InboxProcessorService implements OnApplicationShutdown { private async _processLogged(job: Bull.Job<InboxJobData>): Promise<string> { const startTime = process.hrtime.bigint(); - const payload = job.data.activity; + const activity = job.data.activity; const keyId = job.data.signature.keyId; - const log = this.createLog(payload, keyId); - - // Pre-save the activity in case it leads to a hard-crash. - if (this.config.activityLogging.preSave) { - await this.recordLog(log); - } + const log = await this.apLogService.createInboxLog({ activity, keyId }); try { const result = await this._process(job, log); @@ -111,24 +97,18 @@ export class InboxProcessorService implements OnApplicationShutdown { throw err; } finally { - // Calculate the activity processing time with correct rounding and decimals. - // 1. Truncate nanoseconds to microseconds - // 2. Scale to 1/10 millisecond ticks. - // 3. Round to nearest tick. - // 4. Sale to milliseconds - // Example: 123,456,789 ns -> 123,456 us -> 12,345.6 ticks -> 12,346 ticks -> 123.46 ms - const endTime = process.hrtime.bigint(); - const duration = Math.round(Number((endTime - startTime) / 1000n) / 10) / 100; - log.duration = duration; + const duration = log.duration = calculateDurationSince(startTime); + // TODO remove this // Activities should time out after roughly 5 seconds. // A runtime longer than 10 seconds could indicate a problem or attack. if (duration > 10000) { - this.logger.warn(`Activity ${JSON.stringify(payload.id)} by "${keyId}" took ${(duration / 1000).toFixed(1)} seconds to complete`); + this.logger.warn(`Activity ${JSON.stringify(activity.id)} by "${keyId}" took ${(duration / 1000).toFixed(1)} seconds to complete`); } // Save or finalize asynchronously - this.recordLog(log).catch(err => this.logger.error('Failed to record AP activity:', err)); + this.apLogService.saveInboxLog(log) + .catch(err => this.logger.error('Failed to record AP activity:', err)); } } @@ -368,46 +348,4 @@ export class InboxProcessorService implements OnApplicationShutdown { async onApplicationShutdown(signal?: string) { await this.dispose(); } - - private createLog(payload: IActivity, keyId: string): SkApInboxLog { - const activity = Object.assign({}, payload, { '@context': undefined }) as unknown as JsonValue; - const host = this.utilityService.extractDbHost(keyId); - - const log = new SkApInboxLog({ - id: this.idService.gen(), - at: new Date(), - verified: false, - accepted: false, - activity, - keyId, - host, - }); - - const context = payload['@context']; - if (context) { - const md5 = createHash('md5').update(JSON.stringify(context)).digest('base64'); - log.contextHash = md5; - log.context = new SkApContext({ - md5, - json: context, - }); - } - - return log; - } - - private async recordLog(log: SkApInboxLog): Promise<void> { - if (log.context) { - // https://stackoverflow.com/a/47064558 - await this.apContextsRepository - .createQueryBuilder('activity_context') - .insert() - .into(SkApContext) - .values(log.context) - .orIgnore('md5') - .execute(); - } - - await this.apInboxLogsRepository.upsert(log, ['id']); - } } |