diff options
| author | dakkar <dakkar@thenautilus.net> | 2025-03-02 18:57:27 +0000 |
|---|---|---|
| committer | dakkar <dakkar@thenautilus.net> | 2025-03-02 18:57:27 +0000 |
| commit | 0b5e197afb5f9c2519c22324706c3b27d5d3eea3 (patch) | |
| tree | c73d3940938e4fd8cc515377f9334c4a48679c7b /packages/backend/src/queue | |
| parent | merge: pin corepack version (!885) (diff) | |
| parent | merge: Add/fix moderation logs for many endpoints (resolves #911 and #969) (!... (diff) | |
| download | sharkey-0b5e197afb5f9c2519c22324706c3b27d5d3eea3.tar.gz sharkey-0b5e197afb5f9c2519c22324706c3b27d5d3eea3.tar.bz2 sharkey-0b5e197afb5f9c2519c22324706c3b27d5d3eea3.zip | |
Merge branch 'develop' into release/2025.2.2
Diffstat (limited to 'packages/backend/src/queue')
9 files changed, 128 insertions, 31 deletions
diff --git a/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts b/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts index b81987cc15..ef21b6142e 100644 --- a/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts +++ b/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts @@ -215,15 +215,10 @@ export class CheckModeratorsActivityProcessorService { // -- SystemWebhook - const systemWebhooks = await this.systemWebhookService.fetchActiveSystemWebhooks() - .then(it => it.filter(it => it.on.includes('inactiveModeratorsWarning'))); - for (const systemWebhook of systemWebhooks) { - this.systemWebhookService.enqueueSystemWebhook( - systemWebhook, - 'inactiveModeratorsWarning', - { remainingTime: remainingTime }, - ); - } + return this.systemWebhookService.enqueueSystemWebhook( + 'inactiveModeratorsWarning', + { remainingTime: remainingTime }, + ); } @bindThis @@ -253,15 +248,10 @@ export class CheckModeratorsActivityProcessorService { // -- SystemWebhook - const systemWebhooks = await this.systemWebhookService.fetchActiveSystemWebhooks() - .then(it => it.filter(it => it.on.includes('inactiveModeratorsInvitationOnlyChanged'))); - for (const systemWebhook of systemWebhooks) { - this.systemWebhookService.enqueueSystemWebhook( - systemWebhook, - 'inactiveModeratorsInvitationOnlyChanged', - {}, - ); - } + return this.systemWebhookService.enqueueSystemWebhook( + 'inactiveModeratorsInvitationOnlyChanged', + {}, + ); } @bindThis diff --git a/packages/backend/src/queue/processors/CleanChartsProcessorService.ts b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts index 19f98c0d51..8c5faa8d07 100644 --- a/packages/backend/src/queue/processors/CleanChartsProcessorService.ts +++ b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts @@ -48,6 +48,7 @@ export class CleanChartsProcessorService { public async process(): Promise<void> { this.logger.info('Clean charts...'); + // DBへの同時接続を避けるためにPromise.allを使わずひとつずつ実行する await this.federationChart.clean(); await this.notesChart.clean(); await this.usersChart.clean(); diff --git a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts index 0e604a0501..0c70829132 100644 --- a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts +++ b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts @@ -6,7 +6,7 @@ import { Inject, Injectable } from '@nestjs/common'; import { MoreThan } from 'typeorm'; import { DI } from '@/di-symbols.js'; -import type { DriveFilesRepository, NoteReactionsRepository, NotesRepository, UserProfilesRepository, UsersRepository } from '@/models/_.js'; +import type { DriveFilesRepository, NoteReactionsRepository, NotesRepository, UserProfilesRepository, UsersRepository, NoteScheduleRepository, MiNoteSchedule } from '@/models/_.js'; import type Logger from '@/logger.js'; import { DriveService } from '@/core/DriveService.js'; import type { MiDriveFile } from '@/models/DriveFile.js'; @@ -15,10 +15,12 @@ import type { MiNoteReaction } from '@/models/NoteReaction.js'; import { EmailService } from '@/core/EmailService.js'; import { bindThis } from '@/decorators.js'; import { SearchService } from '@/core/SearchService.js'; +import { ApLogService } from '@/core/ApLogService.js'; +import { ReactionService } from '@/core/ReactionService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; import type { DbUserDeleteJobData } from '../types.js'; -import { ReactionService } from '@/core/ReactionService.js'; +import { QueueService } from '@/core/QueueService.js'; @Injectable() export class DeleteAccountProcessorService { @@ -40,11 +42,16 @@ export class DeleteAccountProcessorService { @Inject(DI.noteReactionsRepository) private noteReactionsRepository: NoteReactionsRepository, + @Inject(DI.noteScheduleRepository) + private noteScheduleRepository: NoteScheduleRepository, + + private queueService: QueueService, private driveService: DriveService, private emailService: EmailService, private queueLoggerService: QueueLoggerService, private searchService: SearchService, private reactionService: ReactionService, + private readonly apLogService: ApLogService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('delete-account'); } @@ -58,6 +65,22 @@ export class DeleteAccountProcessorService { return; } + { // Delete scheduled notes + const scheduledNotes = await this.noteScheduleRepository.findBy({ + userId: user.id, + }) as MiNoteSchedule[]; + + for (const note of scheduledNotes) { + await this.queueService.ScheduleNotePostQueue.remove(`schedNote:${note.id}`); + } + + await this.noteScheduleRepository.delete({ + userId: user.id, + }); + + this.logger.succ('All scheduled notes deleted'); + } + { // Delete notes let cursor: MiNote['id'] | null = null; @@ -84,6 +107,13 @@ export class DeleteAccountProcessorService { for (const note of notes) { await this.searchService.unindexNote(note); } + + // Delete note AP logs + const noteUris = notes.map(n => n.uri).filter(u => !!u) as string[]; + if (noteUris.length > 0) { + await this.apLogService.deleteObjectLogs(noteUris) + .catch(err => this.logger.error(err, `Failed to delete AP logs for notes of user '${user.uri ?? user.id}'`)); + } } this.logger.succ('All of notes deleted'); @@ -149,6 +179,13 @@ export class DeleteAccountProcessorService { this.logger.succ('All of files deleted'); } + { // Delete actor logs + if (user.uri) { + await this.apLogService.deleteObjectLogs(user.uri) + .catch(err => this.logger.error(err, `Failed to delete AP logs for user '${user.uri}'`)); + } + } + { // Send email notification const profile = await this.userProfilesRepository.findOneByOrFail({ userId: user.id }); if (profile.email && profile.emailVerified) { diff --git a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts index 17ba71df3d..383fa0c26a 100644 --- a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts @@ -14,6 +14,7 @@ import { createTempDir } from '@/misc/create-temp.js'; import { DriveService } from '@/core/DriveService.js'; import { DownloadService } from '@/core/DownloadService.js'; import { bindThis } from '@/decorators.js'; +import type { Config } from '@/config.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; import type { DbUserImportJobData } from '../types.js'; @@ -24,6 +25,9 @@ export class ImportCustomEmojisProcessorService { private logger: Logger; constructor( + @Inject(DI.config) + private config: Config, + @Inject(DI.driveFilesRepository) private driveFilesRepository: DriveFilesRepository, @@ -57,7 +61,7 @@ export class ImportCustomEmojisProcessorService { try { fs.writeFileSync(destPath, '', 'binary'); - await this.downloadService.downloadUrl(file.url, destPath); + await this.downloadService.downloadUrl(file.url, destPath, { operationTimeout: this.config.import?.downloadTimeout, maxSize: this.config.import?.maxFileSize }); } catch (e) { // TODO: 何度か再試行 if (e instanceof Error || typeof e === 'string') { this.logger.error(e); @@ -88,6 +92,7 @@ export class ImportCustomEmojisProcessorService { await this.emojisRepository.delete({ name: nameNfc, }); + try { const driveFile = await this.driveService.addFile({ user: null, @@ -96,11 +101,13 @@ export class ImportCustomEmojisProcessorService { force: true, }); await this.customEmojiService.add({ + originalUrl: driveFile.url, + publicUrl: driveFile.webpublicUrl ?? driveFile.url, + fileType: driveFile.webpublicType ?? driveFile.type, name: nameNfc, category: emojiInfo.category?.normalize('NFC'), host: null, aliases: emojiInfo.aliases?.map((a: string) => a.normalize('NFC')), - driveFile, license: emojiInfo.license, isSensitive: emojiInfo.isSensitive, localOnly: emojiInfo.localOnly, diff --git a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts index f89dc46722..ee9819b29f 100644 --- a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts @@ -626,7 +626,7 @@ export class ImportNotesProcessorService { if (!exists) { try { - await this.downloadService.downloadUrl(videos[0].url, filePath); + await this.downloadUrl(videos[0].url, filePath); } catch (e) { // TODO: 何度か再試行 this.logger.error(e instanceof Error ? e : new Error(e as string)); } @@ -651,7 +651,7 @@ export class ImportNotesProcessorService { if (!exists) { try { - await this.downloadService.downloadUrl(file.media_url_https, filePath); + await this.downloadUrl(file.media_url_https, filePath); } catch (e) { // TODO: 何度か再試行 this.logger.error(e instanceof Error ? e : new Error(e as string)); } diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 7727a3e985..35a0bf095d 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -29,6 +29,9 @@ import { CollapsedQueue } from '@/misc/collapsed-queue.js'; import { MiNote } from '@/models/Note.js'; import { MiMeta } from '@/models/Meta.js'; import { DI } from '@/di-symbols.js'; +import { SkApInboxLog } from '@/models/_.js'; +import type { Config } from '@/config.js'; +import { ApLogService, calculateDurationSince } from '@/core/ApLogService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { InboxJobData } from '../types.js'; @@ -46,6 +49,9 @@ export class InboxProcessorService implements OnApplicationShutdown { @Inject(DI.meta) private meta: MiMeta, + @Inject(DI.config) + private config: Config, + private utilityService: UtilityService, private apInboxService: ApInboxService, private federatedInstanceService: FederatedInstanceService, @@ -57,6 +63,7 @@ export class InboxProcessorService implements OnApplicationShutdown { private apRequestChart: ApRequestChart, private federationChart: FederationChart, private queueLoggerService: QueueLoggerService, + private readonly apLogService: ApLogService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('inbox'); this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance); @@ -64,6 +71,41 @@ export class InboxProcessorService implements OnApplicationShutdown { @bindThis public async process(job: Bull.Job<InboxJobData>): Promise<string> { + if (this.config.activityLogging.enabled) { + return await this._processLogged(job); + } else { + return await this._process(job); + } + } + + private async _processLogged(job: Bull.Job<InboxJobData>): Promise<string> { + const startTime = process.hrtime.bigint(); + const activity = job.data.activity; + const keyId = job.data.signature.keyId; + const log = await this.apLogService.createInboxLog({ activity, keyId }); + + try { + const result = await this._process(job, log); + + log.accepted = result.startsWith('ok'); + log.result = result; + + return result; + } catch (err) { + log.accepted = false; + log.result = String(err); + + throw err; + } finally { + log.duration = calculateDurationSince(startTime); + + // Save or finalize asynchronously + this.apLogService.saveInboxLog(log) + .catch(err => this.logger.error('Failed to record AP activity:', err)); + } + } + + private async _process(job: Bull.Job<InboxJobData>, log?: SkApInboxLog): Promise<string> { const signature = job.data.signature; // HTTP-signature let activity = job.data.activity; @@ -197,6 +239,13 @@ export class InboxProcessorService implements OnApplicationShutdown { delete activity.id; } + // Record verified user in log + if (log) { + log.verified = true; + log.authUser = authUser.user; + log.authUserId = authUser.user.id; + } + this.apRequestChart.inbox(); this.federationChart.inbox(authUser.user.host); @@ -248,6 +297,14 @@ export class InboxProcessorService implements OnApplicationShutdown { return `skip: permanent error ${e.statusCode}`; } + if (e instanceof IdentifiableError && !e.isRetryable) { + if (e.message) { + return `skip: permanent error ${e.id}: ${e.message}`; + } else { + return `skip: permanent error ${e.id}`; + } + } + throw e; } return 'ok'; diff --git a/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts index 46e1adf173..0c47fdedb3 100644 --- a/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts +++ b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts @@ -29,6 +29,7 @@ export class ResyncChartsProcessorService { public async process(): Promise<void> { this.logger.info('Resync charts...'); + // DBへの同時接続を避けるためにPromise.allを使わずひとつずつ実行する // TODO: ユーザーごとのチャートも更新する // TODO: インスタンスごとのチャートも更新する await this.driveChart.resync(); diff --git a/packages/backend/src/queue/processors/TickChartsProcessorService.ts b/packages/backend/src/queue/processors/TickChartsProcessorService.ts index c09cbccc57..fc8856a271 100644 --- a/packages/backend/src/queue/processors/TickChartsProcessorService.ts +++ b/packages/backend/src/queue/processors/TickChartsProcessorService.ts @@ -48,6 +48,7 @@ export class TickChartsProcessorService { public async process(): Promise<void> { this.logger.info('Tick charts...'); + // DBへの同時接続を避けるためにPromise.allを使わずひとつずつ実行する await this.federationChart.tick(false); await this.notesChart.tick(false); await this.usersChart.tick(false); diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index 9433392df5..a900675a86 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -6,9 +6,12 @@ import type { Antenna } from '@/server/api/endpoints/i/import-antennas.js'; import type { MiDriveFile } from '@/models/DriveFile.js'; import type { MiNote } from '@/models/Note.js'; +import type { SystemWebhookEventType } from '@/models/SystemWebhook.js'; import type { MiUser } from '@/models/User.js'; -import type { MiWebhook } from '@/models/Webhook.js'; +import type { MiWebhook, WebhookEventTypes } from '@/models/Webhook.js'; import type { IActivity } from '@/core/activitypub/type.js'; +import type { SystemWebhookPayload } from '@/core/SystemWebhookService.js'; +import type { UserWebhookPayload } from '@/core/UserWebhookService.js'; import type httpSignature from '@peertube/http-signature'; export type DeliverJobData = { @@ -131,9 +134,9 @@ export type EndedPollNotificationJobData = { noteId: MiNote['id']; }; -export type SystemWebhookDeliverJobData = { - type: string; - content: unknown; +export type SystemWebhookDeliverJobData<T extends SystemWebhookEventType = SystemWebhookEventType> = { + type: T; + content: SystemWebhookPayload<T>; webhookId: MiWebhook['id']; to: string; secret: string; @@ -141,9 +144,9 @@ export type SystemWebhookDeliverJobData = { eventId: string; }; -export type UserWebhookDeliverJobData = { - type: string; - content: unknown; +export type UserWebhookDeliverJobData<T extends WebhookEventTypes = WebhookEventTypes> = { + type: T; + content: UserWebhookPayload<T>; webhookId: MiWebhook['id']; userId: MiUser['id']; to: string; |