summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue/processors
diff options
context:
space:
mode:
authorJulia <julia@insertdomain.name>2025-03-02 19:54:32 +0000
committerJulia <julia@insertdomain.name>2025-03-02 19:54:32 +0000
commit9e13c375c5ef4103ad5ee87fea583b154e9e16f3 (patch)
treefe9e7b1a474e22fb0c37bd68cfd260f7ba39be74 /packages/backend/src/queue/processors
parentmerge: pin corepack version (!885) (diff)
parentbump version (diff)
downloadsharkey-9e13c375c5ef4103ad5ee87fea583b154e9e16f3.tar.gz
sharkey-9e13c375c5ef4103ad5ee87fea583b154e9e16f3.tar.bz2
sharkey-9e13c375c5ef4103ad5ee87fea583b154e9e16f3.zip
merge: 2025.2.2 (!927)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/927 Approved-by: Marie <github@yuugi.dev> Approved-by: Julia <julia@insertdomain.name>
Diffstat (limited to 'packages/backend/src/queue/processors')
-rw-r--r--packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts26
-rw-r--r--packages/backend/src/queue/processors/CleanChartsProcessorService.ts1
-rw-r--r--packages/backend/src/queue/processors/DeleteAccountProcessorService.ts41
-rw-r--r--packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts11
-rw-r--r--packages/backend/src/queue/processors/ImportNotesProcessorService.ts4
-rw-r--r--packages/backend/src/queue/processors/InboxProcessorService.ts57
-rw-r--r--packages/backend/src/queue/processors/ResyncChartsProcessorService.ts1
-rw-r--r--packages/backend/src/queue/processors/TickChartsProcessorService.ts1
8 files changed, 118 insertions, 24 deletions
diff --git a/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts b/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts
index b81987cc15..ef21b6142e 100644
--- a/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts
+++ b/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts
@@ -215,15 +215,10 @@ export class CheckModeratorsActivityProcessorService {
// -- SystemWebhook
- const systemWebhooks = await this.systemWebhookService.fetchActiveSystemWebhooks()
- .then(it => it.filter(it => it.on.includes('inactiveModeratorsWarning')));
- for (const systemWebhook of systemWebhooks) {
- this.systemWebhookService.enqueueSystemWebhook(
- systemWebhook,
- 'inactiveModeratorsWarning',
- { remainingTime: remainingTime },
- );
- }
+ return this.systemWebhookService.enqueueSystemWebhook(
+ 'inactiveModeratorsWarning',
+ { remainingTime: remainingTime },
+ );
}
@bindThis
@@ -253,15 +248,10 @@ export class CheckModeratorsActivityProcessorService {
// -- SystemWebhook
- const systemWebhooks = await this.systemWebhookService.fetchActiveSystemWebhooks()
- .then(it => it.filter(it => it.on.includes('inactiveModeratorsInvitationOnlyChanged')));
- for (const systemWebhook of systemWebhooks) {
- this.systemWebhookService.enqueueSystemWebhook(
- systemWebhook,
- 'inactiveModeratorsInvitationOnlyChanged',
- {},
- );
- }
+ return this.systemWebhookService.enqueueSystemWebhook(
+ 'inactiveModeratorsInvitationOnlyChanged',
+ {},
+ );
}
@bindThis
diff --git a/packages/backend/src/queue/processors/CleanChartsProcessorService.ts b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts
index 19f98c0d51..8c5faa8d07 100644
--- a/packages/backend/src/queue/processors/CleanChartsProcessorService.ts
+++ b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts
@@ -48,6 +48,7 @@ export class CleanChartsProcessorService {
public async process(): Promise<void> {
this.logger.info('Clean charts...');
+ // DBへの同時接続を避けるためにPromise.allを使わずひとつずつ実行する
await this.federationChart.clean();
await this.notesChart.clean();
await this.usersChart.clean();
diff --git a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts
index 0e604a0501..0c70829132 100644
--- a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts
+++ b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts
@@ -6,7 +6,7 @@
import { Inject, Injectable } from '@nestjs/common';
import { MoreThan } from 'typeorm';
import { DI } from '@/di-symbols.js';
-import type { DriveFilesRepository, NoteReactionsRepository, NotesRepository, UserProfilesRepository, UsersRepository } from '@/models/_.js';
+import type { DriveFilesRepository, NoteReactionsRepository, NotesRepository, UserProfilesRepository, UsersRepository, NoteScheduleRepository, MiNoteSchedule } from '@/models/_.js';
import type Logger from '@/logger.js';
import { DriveService } from '@/core/DriveService.js';
import type { MiDriveFile } from '@/models/DriveFile.js';
@@ -15,10 +15,12 @@ import type { MiNoteReaction } from '@/models/NoteReaction.js';
import { EmailService } from '@/core/EmailService.js';
import { bindThis } from '@/decorators.js';
import { SearchService } from '@/core/SearchService.js';
+import { ApLogService } from '@/core/ApLogService.js';
+import { ReactionService } from '@/core/ReactionService.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
import type { DbUserDeleteJobData } from '../types.js';
-import { ReactionService } from '@/core/ReactionService.js';
+import { QueueService } from '@/core/QueueService.js';
@Injectable()
export class DeleteAccountProcessorService {
@@ -40,11 +42,16 @@ export class DeleteAccountProcessorService {
@Inject(DI.noteReactionsRepository)
private noteReactionsRepository: NoteReactionsRepository,
+ @Inject(DI.noteScheduleRepository)
+ private noteScheduleRepository: NoteScheduleRepository,
+
+ private queueService: QueueService,
private driveService: DriveService,
private emailService: EmailService,
private queueLoggerService: QueueLoggerService,
private searchService: SearchService,
private reactionService: ReactionService,
+ private readonly apLogService: ApLogService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('delete-account');
}
@@ -58,6 +65,22 @@ export class DeleteAccountProcessorService {
return;
}
+ { // Delete scheduled notes
+ const scheduledNotes = await this.noteScheduleRepository.findBy({
+ userId: user.id,
+ }) as MiNoteSchedule[];
+
+ for (const note of scheduledNotes) {
+ await this.queueService.ScheduleNotePostQueue.remove(`schedNote:${note.id}`);
+ }
+
+ await this.noteScheduleRepository.delete({
+ userId: user.id,
+ });
+
+ this.logger.succ('All scheduled notes deleted');
+ }
+
{ // Delete notes
let cursor: MiNote['id'] | null = null;
@@ -84,6 +107,13 @@ export class DeleteAccountProcessorService {
for (const note of notes) {
await this.searchService.unindexNote(note);
}
+
+ // Delete note AP logs
+ const noteUris = notes.map(n => n.uri).filter(u => !!u) as string[];
+ if (noteUris.length > 0) {
+ await this.apLogService.deleteObjectLogs(noteUris)
+ .catch(err => this.logger.error(err, `Failed to delete AP logs for notes of user '${user.uri ?? user.id}'`));
+ }
}
this.logger.succ('All of notes deleted');
@@ -149,6 +179,13 @@ export class DeleteAccountProcessorService {
this.logger.succ('All of files deleted');
}
+ { // Delete actor logs
+ if (user.uri) {
+ await this.apLogService.deleteObjectLogs(user.uri)
+ .catch(err => this.logger.error(err, `Failed to delete AP logs for user '${user.uri}'`));
+ }
+ }
+
{ // Send email notification
const profile = await this.userProfilesRepository.findOneByOrFail({ userId: user.id });
if (profile.email && profile.emailVerified) {
diff --git a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts
index 17ba71df3d..383fa0c26a 100644
--- a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts
@@ -14,6 +14,7 @@ import { createTempDir } from '@/misc/create-temp.js';
import { DriveService } from '@/core/DriveService.js';
import { DownloadService } from '@/core/DownloadService.js';
import { bindThis } from '@/decorators.js';
+import type { Config } from '@/config.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
import type { DbUserImportJobData } from '../types.js';
@@ -24,6 +25,9 @@ export class ImportCustomEmojisProcessorService {
private logger: Logger;
constructor(
+ @Inject(DI.config)
+ private config: Config,
+
@Inject(DI.driveFilesRepository)
private driveFilesRepository: DriveFilesRepository,
@@ -57,7 +61,7 @@ export class ImportCustomEmojisProcessorService {
try {
fs.writeFileSync(destPath, '', 'binary');
- await this.downloadService.downloadUrl(file.url, destPath);
+ await this.downloadService.downloadUrl(file.url, destPath, { operationTimeout: this.config.import?.downloadTimeout, maxSize: this.config.import?.maxFileSize });
} catch (e) { // TODO: 何度か再試行
if (e instanceof Error || typeof e === 'string') {
this.logger.error(e);
@@ -88,6 +92,7 @@ export class ImportCustomEmojisProcessorService {
await this.emojisRepository.delete({
name: nameNfc,
});
+
try {
const driveFile = await this.driveService.addFile({
user: null,
@@ -96,11 +101,13 @@ export class ImportCustomEmojisProcessorService {
force: true,
});
await this.customEmojiService.add({
+ originalUrl: driveFile.url,
+ publicUrl: driveFile.webpublicUrl ?? driveFile.url,
+ fileType: driveFile.webpublicType ?? driveFile.type,
name: nameNfc,
category: emojiInfo.category?.normalize('NFC'),
host: null,
aliases: emojiInfo.aliases?.map((a: string) => a.normalize('NFC')),
- driveFile,
license: emojiInfo.license,
isSensitive: emojiInfo.isSensitive,
localOnly: emojiInfo.localOnly,
diff --git a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts
index f89dc46722..ee9819b29f 100644
--- a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts
@@ -626,7 +626,7 @@ export class ImportNotesProcessorService {
if (!exists) {
try {
- await this.downloadService.downloadUrl(videos[0].url, filePath);
+ await this.downloadUrl(videos[0].url, filePath);
} catch (e) { // TODO: 何度か再試行
this.logger.error(e instanceof Error ? e : new Error(e as string));
}
@@ -651,7 +651,7 @@ export class ImportNotesProcessorService {
if (!exists) {
try {
- await this.downloadService.downloadUrl(file.media_url_https, filePath);
+ await this.downloadUrl(file.media_url_https, filePath);
} catch (e) { // TODO: 何度か再試行
this.logger.error(e instanceof Error ? e : new Error(e as string));
}
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts
index 7727a3e985..35a0bf095d 100644
--- a/packages/backend/src/queue/processors/InboxProcessorService.ts
+++ b/packages/backend/src/queue/processors/InboxProcessorService.ts
@@ -29,6 +29,9 @@ import { CollapsedQueue } from '@/misc/collapsed-queue.js';
import { MiNote } from '@/models/Note.js';
import { MiMeta } from '@/models/Meta.js';
import { DI } from '@/di-symbols.js';
+import { SkApInboxLog } from '@/models/_.js';
+import type { Config } from '@/config.js';
+import { ApLogService, calculateDurationSince } from '@/core/ApLogService.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type { InboxJobData } from '../types.js';
@@ -46,6 +49,9 @@ export class InboxProcessorService implements OnApplicationShutdown {
@Inject(DI.meta)
private meta: MiMeta,
+ @Inject(DI.config)
+ private config: Config,
+
private utilityService: UtilityService,
private apInboxService: ApInboxService,
private federatedInstanceService: FederatedInstanceService,
@@ -57,6 +63,7 @@ export class InboxProcessorService implements OnApplicationShutdown {
private apRequestChart: ApRequestChart,
private federationChart: FederationChart,
private queueLoggerService: QueueLoggerService,
+ private readonly apLogService: ApLogService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('inbox');
this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance);
@@ -64,6 +71,41 @@ export class InboxProcessorService implements OnApplicationShutdown {
@bindThis
public async process(job: Bull.Job<InboxJobData>): Promise<string> {
+ if (this.config.activityLogging.enabled) {
+ return await this._processLogged(job);
+ } else {
+ return await this._process(job);
+ }
+ }
+
+ private async _processLogged(job: Bull.Job<InboxJobData>): Promise<string> {
+ const startTime = process.hrtime.bigint();
+ const activity = job.data.activity;
+ const keyId = job.data.signature.keyId;
+ const log = await this.apLogService.createInboxLog({ activity, keyId });
+
+ try {
+ const result = await this._process(job, log);
+
+ log.accepted = result.startsWith('ok');
+ log.result = result;
+
+ return result;
+ } catch (err) {
+ log.accepted = false;
+ log.result = String(err);
+
+ throw err;
+ } finally {
+ log.duration = calculateDurationSince(startTime);
+
+ // Save or finalize asynchronously
+ this.apLogService.saveInboxLog(log)
+ .catch(err => this.logger.error('Failed to record AP activity:', err));
+ }
+ }
+
+ private async _process(job: Bull.Job<InboxJobData>, log?: SkApInboxLog): Promise<string> {
const signature = job.data.signature; // HTTP-signature
let activity = job.data.activity;
@@ -197,6 +239,13 @@ export class InboxProcessorService implements OnApplicationShutdown {
delete activity.id;
}
+ // Record verified user in log
+ if (log) {
+ log.verified = true;
+ log.authUser = authUser.user;
+ log.authUserId = authUser.user.id;
+ }
+
this.apRequestChart.inbox();
this.federationChart.inbox(authUser.user.host);
@@ -248,6 +297,14 @@ export class InboxProcessorService implements OnApplicationShutdown {
return `skip: permanent error ${e.statusCode}`;
}
+ if (e instanceof IdentifiableError && !e.isRetryable) {
+ if (e.message) {
+ return `skip: permanent error ${e.id}: ${e.message}`;
+ } else {
+ return `skip: permanent error ${e.id}`;
+ }
+ }
+
throw e;
}
return 'ok';
diff --git a/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts
index 46e1adf173..0c47fdedb3 100644
--- a/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts
+++ b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts
@@ -29,6 +29,7 @@ export class ResyncChartsProcessorService {
public async process(): Promise<void> {
this.logger.info('Resync charts...');
+ // DBへの同時接続を避けるためにPromise.allを使わずひとつずつ実行する
// TODO: ユーザーごとのチャートも更新する
// TODO: インスタンスごとのチャートも更新する
await this.driveChart.resync();
diff --git a/packages/backend/src/queue/processors/TickChartsProcessorService.ts b/packages/backend/src/queue/processors/TickChartsProcessorService.ts
index c09cbccc57..fc8856a271 100644
--- a/packages/backend/src/queue/processors/TickChartsProcessorService.ts
+++ b/packages/backend/src/queue/processors/TickChartsProcessorService.ts
@@ -48,6 +48,7 @@ export class TickChartsProcessorService {
public async process(): Promise<void> {
this.logger.info('Tick charts...');
+ // DBへの同時接続を避けるためにPromise.allを使わずひとつずつ実行する
await this.federationChart.tick(false);
await this.notesChart.tick(false);
await this.usersChart.tick(false);