summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue/processors
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/queue/processors')
-rw-r--r--packages/backend/src/queue/processors/InboxProcessorService.ts82
1 files changed, 10 insertions, 72 deletions
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts
index 4182f3e090..557a759136 100644
--- a/packages/backend/src/queue/processors/InboxProcessorService.ts
+++ b/packages/backend/src/queue/processors/InboxProcessorService.ts
@@ -4,7 +4,6 @@
*/
import { URL } from 'node:url';
-import { createHash } from 'crypto';
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import httpSignature from '@peertube/http-signature';
import * as Bull from 'bullmq';
@@ -30,11 +29,9 @@ import { CollapsedQueue } from '@/misc/collapsed-queue.js';
import { MiNote } from '@/models/Note.js';
import { MiMeta } from '@/models/Meta.js';
import { DI } from '@/di-symbols.js';
-import { IdService } from '@/core/IdService.js';
-import { JsonValue } from '@/misc/json-value.js';
-import { SkApInboxLog, SkApContext } from '@/models/_.js';
-import type { ApInboxLogsRepository, ApContextsRepository } from '@/models/_.js';
+import { SkApInboxLog } from '@/models/_.js';
import type { Config } from '@/config.js';
+import { ApLogService, calculateDurationSince } from '@/core/ApLogService.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type { InboxJobData } from '../types.js';
@@ -66,13 +63,7 @@ export class InboxProcessorService implements OnApplicationShutdown {
private apRequestChart: ApRequestChart,
private federationChart: FederationChart,
private queueLoggerService: QueueLoggerService,
- private idService: IdService,
-
- @Inject(DI.apContextsRepository)
- private apContextsRepository: ApContextsRepository,
-
- @Inject(DI.apInboxLogsRepository)
- private apInboxLogsRepository: ApInboxLogsRepository,
+ private readonly apLogService: ApLogService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('inbox');
this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance);
@@ -89,14 +80,9 @@ export class InboxProcessorService implements OnApplicationShutdown {
private async _processLogged(job: Bull.Job<InboxJobData>): Promise<string> {
const startTime = process.hrtime.bigint();
- const payload = job.data.activity;
+ const activity = job.data.activity;
const keyId = job.data.signature.keyId;
- const log = this.createLog(payload, keyId);
-
- // Pre-save the activity in case it leads to a hard-crash.
- if (this.config.activityLogging.preSave) {
- await this.recordLog(log);
- }
+ const log = await this.apLogService.createInboxLog({ activity, keyId });
try {
const result = await this._process(job, log);
@@ -111,24 +97,18 @@ export class InboxProcessorService implements OnApplicationShutdown {
throw err;
} finally {
- // Calculate the activity processing time with correct rounding and decimals.
- // 1. Truncate nanoseconds to microseconds
- // 2. Scale to 1/10 millisecond ticks.
- // 3. Round to nearest tick.
- // 4. Sale to milliseconds
- // Example: 123,456,789 ns -> 123,456 us -> 12,345.6 ticks -> 12,346 ticks -> 123.46 ms
- const endTime = process.hrtime.bigint();
- const duration = Math.round(Number((endTime - startTime) / 1000n) / 10) / 100;
- log.duration = duration;
+ const duration = log.duration = calculateDurationSince(startTime);
+ // TODO remove this
// Activities should time out after roughly 5 seconds.
// A runtime longer than 10 seconds could indicate a problem or attack.
if (duration > 10000) {
- this.logger.warn(`Activity ${JSON.stringify(payload.id)} by "${keyId}" took ${(duration / 1000).toFixed(1)} seconds to complete`);
+ this.logger.warn(`Activity ${JSON.stringify(activity.id)} by "${keyId}" took ${(duration / 1000).toFixed(1)} seconds to complete`);
}
// Save or finalize asynchronously
- this.recordLog(log).catch(err => this.logger.error('Failed to record AP activity:', err));
+ this.apLogService.saveInboxLog(log)
+ .catch(err => this.logger.error('Failed to record AP activity:', err));
}
}
@@ -368,46 +348,4 @@ export class InboxProcessorService implements OnApplicationShutdown {
async onApplicationShutdown(signal?: string) {
await this.dispose();
}
-
- private createLog(payload: IActivity, keyId: string): SkApInboxLog {
- const activity = Object.assign({}, payload, { '@context': undefined }) as unknown as JsonValue;
- const host = this.utilityService.extractDbHost(keyId);
-
- const log = new SkApInboxLog({
- id: this.idService.gen(),
- at: new Date(),
- verified: false,
- accepted: false,
- activity,
- keyId,
- host,
- });
-
- const context = payload['@context'];
- if (context) {
- const md5 = createHash('md5').update(JSON.stringify(context)).digest('base64');
- log.contextHash = md5;
- log.context = new SkApContext({
- md5,
- json: context,
- });
- }
-
- return log;
- }
-
- private async recordLog(log: SkApInboxLog): Promise<void> {
- if (log.context) {
- // https://stackoverflow.com/a/47064558
- await this.apContextsRepository
- .createQueryBuilder('activity_context')
- .insert()
- .into(SkApContext)
- .values(log.context)
- .orIgnore('md5')
- .execute();
- }
-
- await this.apInboxLogsRepository.upsert(log, ['id']);
- }
}