diff options
| author | Hazelnoot <acomputerdog@gmail.com> | 2025-05-07 16:33:18 +0000 |
|---|---|---|
| committer | Hazelnoot <acomputerdog@gmail.com> | 2025-05-07 16:33:18 +0000 |
| commit | d39a56c1b7d74dd07cc78b4c82a6fb6e51036252 (patch) | |
| tree | 24f9c6baa07fadc11c791f1a59bee2c3149cbf56 /packages/backend/src/queue | |
| parent | merge: Add BunnyCDN Edge Storage support (!952) (diff) | |
| parent | isNotUserHome > isUserHome (diff) | |
| download | sharkey-d39a56c1b7d74dd07cc78b4c82a6fb6e51036252.tar.gz sharkey-d39a56c1b7d74dd07cc78b4c82a6fb6e51036252.tar.bz2 sharkey-d39a56c1b7d74dd07cc78b4c82a6fb6e51036252.zip | |
merge: Merge upstream 2025.4.1 (!955)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/955
Closes #638, #1037, #734, and #766
Approved-by: dakkar <dakkar@thenautilus.net>
Approved-by: Marie <github@yuugi.dev>
Diffstat (limited to 'packages/backend/src/queue')
8 files changed, 38 insertions, 24 deletions
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index 297edfd545..7f7ce2452c 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -47,7 +47,7 @@ import { CleanProcessorService } from './processors/CleanProcessorService.js'; import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js'; import { ScheduleNotePostProcessorService } from './processors/ScheduleNotePostProcessorService.js'; import { QueueLoggerService } from './QueueLoggerService.js'; -import { QUEUE, baseQueueOptions } from './const.js'; +import { QUEUE, baseWorkerOptions } from './const.js'; import { ImportNotesProcessorService } from './processors/ImportNotesProcessorService.js'; // ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019 @@ -186,7 +186,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return processer(job); } }, { - ...baseQueueOptions(this.config, QUEUE.SYSTEM), + ...baseWorkerOptions(this.config, QUEUE.SYSTEM), autorun: false, }); @@ -251,7 +251,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return processer(job); } }, { - ...baseQueueOptions(this.config, QUEUE.DB), + ...baseWorkerOptions(this.config, QUEUE.DB), autorun: false, }); @@ -283,7 +283,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return this.deliverProcessorService.process(job); } }, { - ...baseQueueOptions(this.config, QUEUE.DELIVER), + ...baseWorkerOptions(this.config, QUEUE.DELIVER), autorun: false, concurrency: this.config.deliverJobConcurrency ?? 128, limiter: { @@ -323,7 +323,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return this.inboxProcessorService.process(job); } }, { - ...baseQueueOptions(this.config, QUEUE.INBOX), + ...baseWorkerOptions(this.config, QUEUE.INBOX), autorun: false, concurrency: this.config.inboxJobConcurrency ?? 16, limiter: { @@ -363,7 +363,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return this.userWebhookDeliverProcessorService.process(job); } }, { - ...baseQueueOptions(this.config, QUEUE.USER_WEBHOOK_DELIVER), + ...baseWorkerOptions(this.config, QUEUE.USER_WEBHOOK_DELIVER), autorun: false, concurrency: 64, limiter: { @@ -403,7 +403,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return this.systemWebhookDeliverProcessorService.process(job); } }, { - ...baseQueueOptions(this.config, QUEUE.SYSTEM_WEBHOOK_DELIVER), + ...baseWorkerOptions(this.config, QUEUE.SYSTEM_WEBHOOK_DELIVER), autorun: false, concurrency: 16, limiter: { @@ -453,7 +453,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return processer(job); } }, { - ...baseQueueOptions(this.config, QUEUE.RELATIONSHIP), + ...baseWorkerOptions(this.config, QUEUE.RELATIONSHIP), autorun: false, concurrency: this.config.relationshipJobConcurrency ?? 16, limiter: { @@ -498,7 +498,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return processer(job); } }, { - ...baseQueueOptions(this.config, QUEUE.OBJECT_STORAGE), + ...baseWorkerOptions(this.config, QUEUE.OBJECT_STORAGE), autorun: false, concurrency: 16, }); @@ -531,7 +531,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return this.endedPollNotificationProcessorService.process(job); } }, { - ...baseQueueOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION), + ...baseWorkerOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION), autorun: false, }); } @@ -540,7 +540,7 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region schedule note post { this.schedulerNotePostQueueWorker = new Bull.Worker(QUEUE.SCHEDULE_NOTE_POST, (job) => this.scheduleNotePostProcessorService.process(job), { - ...baseQueueOptions(this.config, QUEUE.SCHEDULE_NOTE_POST), + ...baseWorkerOptions(this.config, QUEUE.SCHEDULE_NOTE_POST), autorun: false, }); } diff --git a/packages/backend/src/queue/const.ts b/packages/backend/src/queue/const.ts index fdf012f149..17c6b81736 100644 --- a/packages/backend/src/queue/const.ts +++ b/packages/backend/src/queue/const.ts @@ -3,6 +3,7 @@ * SPDX-License-Identifier: AGPL-3.0-only */ +import { MetricsTime } from 'bullmq'; import { Config } from '@/config.js'; import type * as Bull from 'bullmq'; @@ -28,3 +29,12 @@ export function baseQueueOptions(config: Config, queueName: typeof QUEUE[keyof t prefix: config.redisForJobQueue.prefix ? `${config.redisForJobQueue.prefix}:queue:${queueName}` : `queue:${queueName}`, }; } + +export function baseWorkerOptions(config: Config, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.WorkerOptions { + return { + ...baseQueueOptions(config, queueName), + metrics: { + maxDataPoints: MetricsTime.ONE_WEEK, + }, + }; +} diff --git a/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts b/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts index ef21b6142e..db8d2e789e 100644 --- a/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts +++ b/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts @@ -29,7 +29,7 @@ export type ModeratorInactivityEvaluationResult = { isModeratorsInactive: boolean; inactiveModerators: MiUser[]; remainingTime: ModeratorInactivityRemainingTime; -} +}; export type ModeratorInactivityRemainingTime = { time: number; diff --git a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts index 0c70829132..46cee096cf 100644 --- a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts +++ b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts @@ -184,6 +184,11 @@ export class DeleteAccountProcessorService { await this.apLogService.deleteObjectLogs(user.uri) .catch(err => this.logger.error(err, `Failed to delete AP logs for user '${user.uri}'`)); } + + await this.apLogService.deleteInboxLogs(user.id) + .catch(err => this.logger.error(err, `Failed to delete AP logs for user '${user.uri}'`)); + + this.logger.succ('All AP logs deleted'); } { // Send email notification diff --git a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts index 383fa0c26a..d08cadd378 100644 --- a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts @@ -6,6 +6,7 @@ import * as fs from 'node:fs'; import { Inject, Injectable } from '@nestjs/common'; import { ZipReader } from 'slacc'; +import { IsNull } from 'typeorm'; import { DI } from '@/di-symbols.js'; import type { EmojisRepository, DriveFilesRepository } from '@/models/_.js'; import type Logger from '@/logger.js'; @@ -91,6 +92,7 @@ export class ImportCustomEmojisProcessorService { const emojiPath = outputPath + '/' + record.fileName; await this.emojisRepository.delete({ name: nameNfc, + host: IsNull(), }); try { diff --git a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts index ee9819b29f..5e660e8081 100644 --- a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts @@ -83,7 +83,7 @@ export class ImportNotesProcessorService { } @bindThis - private downloadUrl(url: string, path:string): Promise<{filename: string}> { + private downloadUrl(url: string, path:string): Promise<{ filename: string }> { return this.downloadService.downloadUrl(url, path, { operationTimeout: this.config.import?.downloadTimeout, maxSize: this.config.import?.maxFileSize }); } diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index fc7c66591a..9564724c62 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -25,8 +25,6 @@ import { JsonLdService } from '@/core/activitypub/JsonLdService.js'; import { ApInboxService } from '@/core/activitypub/ApInboxService.js'; import { bindThis } from '@/decorators.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; -//import { CollapsedQueue } from '@/misc/collapsed-queue.js'; -//import { MiNote } from '@/models/Note.js'; import { MiMeta } from '@/models/Meta.js'; import { DI } from '@/di-symbols.js'; import { SkApInboxLog } from '@/models/_.js'; @@ -68,7 +66,6 @@ export class InboxProcessorService implements OnApplicationShutdown { private readonly updateInstanceQueue: UpdateInstanceQueue, ) { this.logger = this.queueLoggerService.logger.createSubLogger('inbox'); - //this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance); } @bindThis @@ -151,12 +148,12 @@ export class InboxProcessorService implements OnApplicationShutdown { // それでもわからなければ終了 if (authUser == null) { - throw new Bull.UnrecoverableError('skip: failed to resolve user'); + throw new Bull.UnrecoverableError(`skip: failed to resolve user ${getApId(activity.actor)}`); } // publicKey がなくても終了 if (authUser.key == null) { - throw new Bull.UnrecoverableError('skip: failed to resolve user publicKey'); + throw new Bull.UnrecoverableError(`skip: failed to resolve user publicKey ${getApId(activity.actor)}`); } // HTTP-Signatureの検証 diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index a900675a86..1bd9f7a0ab 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -38,7 +38,7 @@ export type RelationshipJobData = { silent?: boolean; requestId?: string; withReplies?: boolean; -} +}; export type DbJobData<T extends keyof DbJobMap> = DbJobMap[T]; @@ -69,11 +69,11 @@ export type DbJobMap = { importUserLists: DbUserImportJobData; importCustomEmojis: DbUserImportJobData; deleteAccount: DbUserDeleteJobData; -} +}; export type DbJobDataWithUser = { user: ThinUser; -} +}; export type DbExportFollowingData = { user: ThinUser; @@ -83,7 +83,7 @@ export type DbExportFollowingData = { export type DBExportAntennasData = { user: ThinUser -} +}; export type DbUserDeleteJobData = { user: ThinUser; @@ -105,7 +105,7 @@ export type DbNoteImportJobData = { export type DBAntennaImportJobData = { user: ThinUser, antenna: Antenna -} +}; export type DbUserImportToDbJobData = { user: ThinUser; @@ -161,4 +161,4 @@ export type ThinUser = { export type ScheduleNotePostJobData = { scheduleNoteId: MiNote['id']; -} +}; |