summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue
diff options
context:
space:
mode:
authorHazelnoot <acomputerdog@gmail.com>2025-02-18 19:51:13 +0000
committerHazelnoot <acomputerdog@gmail.com>2025-02-18 19:51:13 +0000
commitc28b27b57f2329ab72a768bb6a1074adeb4dc2e7 (patch)
tree65be25d7ca2da4306033448d628807acf0518e74 /packages/backend/src/queue
parentmerge: Fix error message when a peertube object is rejected for bad ID / URL ... (diff)
parentdelete fetch logs when a note or user is deleted (diff)
downloadsharkey-c28b27b57f2329ab72a768bb6a1074adeb4dc2e7.tar.gz
sharkey-c28b27b57f2329ab72a768bb6a1074adeb4dc2e7.tar.bz2
sharkey-c28b27b57f2329ab72a768bb6a1074adeb4dc2e7.zip
merge: Optionally log remote ActivityPub objects to database (!833)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/833 Approved-by: dakkar <dakkar@thenautilus.net> Approved-by: Marie <github@yuugi.dev>
Diffstat (limited to 'packages/backend/src/queue')
-rw-r--r--packages/backend/src/queue/processors/DeleteAccountProcessorService.ts16
-rw-r--r--packages/backend/src/queue/processors/InboxProcessorService.ts49
2 files changed, 65 insertions, 0 deletions
diff --git a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts
index e350b97f53..66bed72f18 100644
--- a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts
+++ b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts
@@ -15,6 +15,7 @@ import type { MiNoteReaction } from '@/models/NoteReaction.js';
import { EmailService } from '@/core/EmailService.js';
import { bindThis } from '@/decorators.js';
import { SearchService } from '@/core/SearchService.js';
+import { ApLogService } from '@/core/ApLogService.js';
import { ReactionService } from '@/core/ReactionService.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
@@ -45,6 +46,7 @@ export class DeleteAccountProcessorService {
private queueLoggerService: QueueLoggerService,
private searchService: SearchService,
private reactionService: ReactionService,
+ private readonly apLogService: ApLogService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('delete-account');
}
@@ -84,6 +86,13 @@ export class DeleteAccountProcessorService {
for (const note of notes) {
await this.searchService.unindexNote(note);
}
+
+ // Delete note AP logs
+ const noteUris = notes.map(n => n.uri).filter(u => !!u) as string[];
+ if (noteUris.length > 0) {
+ await this.apLogService.deleteObjectLogs(noteUris)
+ .catch(err => this.logger.error(err, `Failed to delete AP logs for notes of user '${user.uri ?? user.id}'`));
+ }
}
this.logger.succ('All of notes deleted');
@@ -149,6 +158,13 @@ export class DeleteAccountProcessorService {
this.logger.succ('All of files deleted');
}
+ { // Delete actor logs
+ if (user.uri) {
+ await this.apLogService.deleteObjectLogs(user.uri)
+ .catch(err => this.logger.error(err, `Failed to delete AP logs for user '${user.uri}'`));
+ }
+ }
+
{ // Send email notification
const profile = await this.userProfilesRepository.findOneByOrFail({ userId: user.id });
if (profile.email && profile.emailVerified) {
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts
index 87d4bf52fa..35a0bf095d 100644
--- a/packages/backend/src/queue/processors/InboxProcessorService.ts
+++ b/packages/backend/src/queue/processors/InboxProcessorService.ts
@@ -29,6 +29,9 @@ import { CollapsedQueue } from '@/misc/collapsed-queue.js';
import { MiNote } from '@/models/Note.js';
import { MiMeta } from '@/models/Meta.js';
import { DI } from '@/di-symbols.js';
+import { SkApInboxLog } from '@/models/_.js';
+import type { Config } from '@/config.js';
+import { ApLogService, calculateDurationSince } from '@/core/ApLogService.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type { InboxJobData } from '../types.js';
@@ -46,6 +49,9 @@ export class InboxProcessorService implements OnApplicationShutdown {
@Inject(DI.meta)
private meta: MiMeta,
+ @Inject(DI.config)
+ private config: Config,
+
private utilityService: UtilityService,
private apInboxService: ApInboxService,
private federatedInstanceService: FederatedInstanceService,
@@ -57,6 +63,7 @@ export class InboxProcessorService implements OnApplicationShutdown {
private apRequestChart: ApRequestChart,
private federationChart: FederationChart,
private queueLoggerService: QueueLoggerService,
+ private readonly apLogService: ApLogService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('inbox');
this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance);
@@ -64,6 +71,41 @@ export class InboxProcessorService implements OnApplicationShutdown {
@bindThis
public async process(job: Bull.Job<InboxJobData>): Promise<string> {
+ if (this.config.activityLogging.enabled) {
+ return await this._processLogged(job);
+ } else {
+ return await this._process(job);
+ }
+ }
+
+ private async _processLogged(job: Bull.Job<InboxJobData>): Promise<string> {
+ const startTime = process.hrtime.bigint();
+ const activity = job.data.activity;
+ const keyId = job.data.signature.keyId;
+ const log = await this.apLogService.createInboxLog({ activity, keyId });
+
+ try {
+ const result = await this._process(job, log);
+
+ log.accepted = result.startsWith('ok');
+ log.result = result;
+
+ return result;
+ } catch (err) {
+ log.accepted = false;
+ log.result = String(err);
+
+ throw err;
+ } finally {
+ log.duration = calculateDurationSince(startTime);
+
+ // Save or finalize asynchronously
+ this.apLogService.saveInboxLog(log)
+ .catch(err => this.logger.error('Failed to record AP activity:', err));
+ }
+ }
+
+ private async _process(job: Bull.Job<InboxJobData>, log?: SkApInboxLog): Promise<string> {
const signature = job.data.signature; // HTTP-signature
let activity = job.data.activity;
@@ -197,6 +239,13 @@ export class InboxProcessorService implements OnApplicationShutdown {
delete activity.id;
}
+ // Record verified user in log
+ if (log) {
+ log.verified = true;
+ log.authUser = authUser.user;
+ log.authUserId = authUser.user.id;
+ }
+
this.apRequestChart.inbox();
this.federationChart.inbox(authUser.user.host);