summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue/processors
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/queue/processors')
-rw-r--r--packages/backend/src/queue/processors/InboxProcessorService.ts102
1 files changed, 102 insertions, 0 deletions
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts
index 87d4bf52fa..d40104ee9b 100644
--- a/packages/backend/src/queue/processors/InboxProcessorService.ts
+++ b/packages/backend/src/queue/processors/InboxProcessorService.ts
@@ -4,6 +4,7 @@
*/
import { URL } from 'node:url';
+import { createHash } from 'crypto';
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import httpSignature from '@peertube/http-signature';
import * as Bull from 'bullmq';
@@ -29,6 +30,11 @@ import { CollapsedQueue } from '@/misc/collapsed-queue.js';
import { MiNote } from '@/models/Note.js';
import { MiMeta } from '@/models/Meta.js';
import { DI } from '@/di-symbols.js';
+import { IdService } from '@/core/IdService.js';
+import { JsonValue } from '@/misc/json-value.js';
+import { SkActivityLog, SkActivityContext } from '@/models/_.js';
+import type { ActivityLogsRepository, ActivityContextRepository } from '@/models/_.js';
+import type { Config } from '@/config.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type { InboxJobData } from '../types.js';
@@ -46,6 +52,9 @@ export class InboxProcessorService implements OnApplicationShutdown {
@Inject(DI.meta)
private meta: MiMeta,
+ @Inject(DI.config)
+ private config: Config,
+
private utilityService: UtilityService,
private apInboxService: ApInboxService,
private federatedInstanceService: FederatedInstanceService,
@@ -57,6 +66,13 @@ export class InboxProcessorService implements OnApplicationShutdown {
private apRequestChart: ApRequestChart,
private federationChart: FederationChart,
private queueLoggerService: QueueLoggerService,
+ private idService: IdService,
+
+ @Inject(DI.activityContextRepository)
+ private activityContextRepository: ActivityContextRepository,
+
+ @Inject(DI.activityLogsRepository)
+ private activityLogsRepository: ActivityLogsRepository,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('inbox');
this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance);
@@ -64,6 +80,42 @@ export class InboxProcessorService implements OnApplicationShutdown {
@bindThis
public async process(job: Bull.Job<InboxJobData>): Promise<string> {
+ if (this.config.activityLogging.enabled) {
+ return await this._processLogged(job);
+ } else {
+ return await this._process(job);
+ }
+ }
+
+ private async _processLogged(job: Bull.Job<InboxJobData>): Promise<string> {
+ const payload = job.data.activity;
+ const keyId = job.data.signature.keyId;
+ const log = this.createLog(payload, keyId);
+
+ // Pre-save the activity in case it leads to a hard-crash.
+ if (this.config.activityLogging.preSave) {
+ await this.recordLog(log);
+ }
+
+ try {
+ const result = await this._process(job, log);
+
+ log.accepted = result.startsWith('ok');
+ log.result = result;
+
+ return result;
+ } catch (err) {
+ log.accepted = false;
+ log.result = String(err);
+
+ throw err;
+ } finally {
+ // Save or finalize asynchronously
+ this.recordLog(log).catch(err => this.logger.error('Failed to record AP activity:', err));
+ }
+ }
+
+ private async _process(job: Bull.Job<InboxJobData>, log?: SkActivityLog): Promise<string> {
const signature = job.data.signature; // HTTP-signature
let activity = job.data.activity;
@@ -197,6 +249,13 @@ export class InboxProcessorService implements OnApplicationShutdown {
delete activity.id;
}
+ // Attach log to verified user
+ if (log) {
+ log.verified = true;
+ log.authUser = authUser.user;
+ log.authUserId = authUser.user.id;
+ }
+
this.apRequestChart.inbox();
this.federationChart.inbox(authUser.user.host);
@@ -292,4 +351,47 @@ export class InboxProcessorService implements OnApplicationShutdown {
async onApplicationShutdown(signal?: string) {
await this.dispose();
}
+
+ private createLog(payload: IActivity, keyId: string): SkActivityLog {
+ const activity = Object.assign({}, payload, { '@context': undefined }) as unknown as JsonValue;
+ const host = this.utilityService.extractDbHost(keyId);
+
+ const log = new SkActivityLog({
+ id: this.idService.gen(),
+ at: new Date(),
+ verified: false,
+ accepted: false,
+ result: 'not processed',
+ activity,
+ keyId,
+ host,
+ });
+
+ const context = payload['@context'];
+ if (context) {
+ const md5 = createHash('md5').update(JSON.stringify(context)).digest('base64');
+ log.contextHash = md5;
+ log.context = new SkActivityContext({
+ md5,
+ json: context,
+ });
+ }
+
+ return log;
+ }
+
+ private async recordLog(log: SkActivityLog): Promise<void> {
+ if (log.context) {
+ // https://stackoverflow.com/a/47064558
+ await this.activityContextRepository
+ .createQueryBuilder('context_body')
+ .insert()
+ .into(SkActivityContext)
+ .values(log.context)
+ .orIgnore('md5')
+ .execute();
+ }
+
+ await this.activityLogsRepository.upsert(log, ['id']);
+ }
}