summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue/processors/InboxProcessorService.ts
diff options
context:
space:
mode:
authorJulia <julia@insertdomain.name>2025-03-02 19:54:32 +0000
committerJulia <julia@insertdomain.name>2025-03-02 19:54:32 +0000
commit9e13c375c5ef4103ad5ee87fea583b154e9e16f3 (patch)
treefe9e7b1a474e22fb0c37bd68cfd260f7ba39be74 /packages/backend/src/queue/processors/InboxProcessorService.ts
parentmerge: pin corepack version (!885) (diff)
parentbump version (diff)
downloadsharkey-9e13c375c5ef4103ad5ee87fea583b154e9e16f3.tar.gz
sharkey-9e13c375c5ef4103ad5ee87fea583b154e9e16f3.tar.bz2
sharkey-9e13c375c5ef4103ad5ee87fea583b154e9e16f3.zip
merge: 2025.2.2 (!927)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/927 Approved-by: Marie <github@yuugi.dev> Approved-by: Julia <julia@insertdomain.name>
Diffstat (limited to 'packages/backend/src/queue/processors/InboxProcessorService.ts')
-rw-r--r--packages/backend/src/queue/processors/InboxProcessorService.ts57
1 files changed, 57 insertions, 0 deletions
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts
index 7727a3e985..35a0bf095d 100644
--- a/packages/backend/src/queue/processors/InboxProcessorService.ts
+++ b/packages/backend/src/queue/processors/InboxProcessorService.ts
@@ -29,6 +29,9 @@ import { CollapsedQueue } from '@/misc/collapsed-queue.js';
import { MiNote } from '@/models/Note.js';
import { MiMeta } from '@/models/Meta.js';
import { DI } from '@/di-symbols.js';
+import { SkApInboxLog } from '@/models/_.js';
+import type { Config } from '@/config.js';
+import { ApLogService, calculateDurationSince } from '@/core/ApLogService.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type { InboxJobData } from '../types.js';
@@ -46,6 +49,9 @@ export class InboxProcessorService implements OnApplicationShutdown {
@Inject(DI.meta)
private meta: MiMeta,
+ @Inject(DI.config)
+ private config: Config,
+
private utilityService: UtilityService,
private apInboxService: ApInboxService,
private federatedInstanceService: FederatedInstanceService,
@@ -57,6 +63,7 @@ export class InboxProcessorService implements OnApplicationShutdown {
private apRequestChart: ApRequestChart,
private federationChart: FederationChart,
private queueLoggerService: QueueLoggerService,
+ private readonly apLogService: ApLogService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('inbox');
this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance);
@@ -64,6 +71,41 @@ export class InboxProcessorService implements OnApplicationShutdown {
@bindThis
public async process(job: Bull.Job<InboxJobData>): Promise<string> {
+ if (this.config.activityLogging.enabled) {
+ return await this._processLogged(job);
+ } else {
+ return await this._process(job);
+ }
+ }
+
+ private async _processLogged(job: Bull.Job<InboxJobData>): Promise<string> {
+ const startTime = process.hrtime.bigint();
+ const activity = job.data.activity;
+ const keyId = job.data.signature.keyId;
+ const log = await this.apLogService.createInboxLog({ activity, keyId });
+
+ try {
+ const result = await this._process(job, log);
+
+ log.accepted = result.startsWith('ok');
+ log.result = result;
+
+ return result;
+ } catch (err) {
+ log.accepted = false;
+ log.result = String(err);
+
+ throw err;
+ } finally {
+ log.duration = calculateDurationSince(startTime);
+
+ // Save or finalize asynchronously
+ this.apLogService.saveInboxLog(log)
+ .catch(err => this.logger.error('Failed to record AP activity:', err));
+ }
+ }
+
+ private async _process(job: Bull.Job<InboxJobData>, log?: SkApInboxLog): Promise<string> {
const signature = job.data.signature; // HTTP-signature
let activity = job.data.activity;
@@ -197,6 +239,13 @@ export class InboxProcessorService implements OnApplicationShutdown {
delete activity.id;
}
+ // Record verified user in log
+ if (log) {
+ log.verified = true;
+ log.authUser = authUser.user;
+ log.authUserId = authUser.user.id;
+ }
+
this.apRequestChart.inbox();
this.federationChart.inbox(authUser.user.host);
@@ -248,6 +297,14 @@ export class InboxProcessorService implements OnApplicationShutdown {
return `skip: permanent error ${e.statusCode}`;
}
+ if (e instanceof IdentifiableError && !e.isRetryable) {
+ if (e.message) {
+ return `skip: permanent error ${e.id}: ${e.message}`;
+ } else {
+ return `skip: permanent error ${e.id}`;
+ }
+ }
+
throw e;
}
return 'ok';