diff options
Diffstat (limited to 'packages/backend/src/queue/processors')
| -rw-r--r-- | packages/backend/src/queue/processors/InboxProcessorService.ts | 102 |
1 files changed, 102 insertions, 0 deletions
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 87d4bf52fa..d40104ee9b 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -4,6 +4,7 @@ */ import { URL } from 'node:url'; +import { createHash } from 'crypto'; import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import httpSignature from '@peertube/http-signature'; import * as Bull from 'bullmq'; @@ -29,6 +30,11 @@ import { CollapsedQueue } from '@/misc/collapsed-queue.js'; import { MiNote } from '@/models/Note.js'; import { MiMeta } from '@/models/Meta.js'; import { DI } from '@/di-symbols.js'; +import { IdService } from '@/core/IdService.js'; +import { JsonValue } from '@/misc/json-value.js'; +import { SkActivityLog, SkActivityContext } from '@/models/_.js'; +import type { ActivityLogsRepository, ActivityContextRepository } from '@/models/_.js'; +import type { Config } from '@/config.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { InboxJobData } from '../types.js'; @@ -46,6 +52,9 @@ export class InboxProcessorService implements OnApplicationShutdown { @Inject(DI.meta) private meta: MiMeta, + @Inject(DI.config) + private config: Config, + private utilityService: UtilityService, private apInboxService: ApInboxService, private federatedInstanceService: FederatedInstanceService, @@ -57,6 +66,13 @@ export class InboxProcessorService implements OnApplicationShutdown { private apRequestChart: ApRequestChart, private federationChart: FederationChart, private queueLoggerService: QueueLoggerService, + private idService: IdService, + + @Inject(DI.activityContextRepository) + private activityContextRepository: ActivityContextRepository, + + @Inject(DI.activityLogsRepository) + private activityLogsRepository: ActivityLogsRepository, ) { this.logger = this.queueLoggerService.logger.createSubLogger('inbox'); this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance); @@ -64,6 +80,42 @@ export class InboxProcessorService implements OnApplicationShutdown { @bindThis public async process(job: Bull.Job<InboxJobData>): Promise<string> { + if (this.config.activityLogging.enabled) { + return await this._processLogged(job); + } else { + return await this._process(job); + } + } + + private async _processLogged(job: Bull.Job<InboxJobData>): Promise<string> { + const payload = job.data.activity; + const keyId = job.data.signature.keyId; + const log = this.createLog(payload, keyId); + + // Pre-save the activity in case it leads to a hard-crash. + if (this.config.activityLogging.preSave) { + await this.recordLog(log); + } + + try { + const result = await this._process(job, log); + + log.accepted = result.startsWith('ok'); + log.result = result; + + return result; + } catch (err) { + log.accepted = false; + log.result = String(err); + + throw err; + } finally { + // Save or finalize asynchronously + this.recordLog(log).catch(err => this.logger.error('Failed to record AP activity:', err)); + } + } + + private async _process(job: Bull.Job<InboxJobData>, log?: SkActivityLog): Promise<string> { const signature = job.data.signature; // HTTP-signature let activity = job.data.activity; @@ -197,6 +249,13 @@ export class InboxProcessorService implements OnApplicationShutdown { delete activity.id; } + // Attach log to verified user + if (log) { + log.verified = true; + log.authUser = authUser.user; + log.authUserId = authUser.user.id; + } + this.apRequestChart.inbox(); this.federationChart.inbox(authUser.user.host); @@ -292,4 +351,47 @@ export class InboxProcessorService implements OnApplicationShutdown { async onApplicationShutdown(signal?: string) { await this.dispose(); } + + private createLog(payload: IActivity, keyId: string): SkActivityLog { + const activity = Object.assign({}, payload, { '@context': undefined }) as unknown as JsonValue; + const host = this.utilityService.extractDbHost(keyId); + + const log = new SkActivityLog({ + id: this.idService.gen(), + at: new Date(), + verified: false, + accepted: false, + result: 'not processed', + activity, + keyId, + host, + }); + + const context = payload['@context']; + if (context) { + const md5 = createHash('md5').update(JSON.stringify(context)).digest('base64'); + log.contextHash = md5; + log.context = new SkActivityContext({ + md5, + json: context, + }); + } + + return log; + } + + private async recordLog(log: SkActivityLog): Promise<void> { + if (log.context) { + // https://stackoverflow.com/a/47064558 + await this.activityContextRepository + .createQueryBuilder('context_body') + .insert() + .into(SkActivityContext) + .values(log.context) + .orIgnore('md5') + .execute(); + } + + await this.activityLogsRepository.upsert(log, ['id']); + } } |