diff options
| author | syuilo <4439005+syuilo@users.noreply.github.com> | 2025-04-19 14:00:38 +0900 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-04-19 14:00:38 +0900 |
| commit | 7b38806413b84bd20070f3c81b899e5b890b1a8b (patch) | |
| tree | 2385b32546d410ff2e5e208793f7ad82ad6623ab /packages/backend/src/core | |
| parent | fix(storybook): implement missing stories (#15862) (diff) | |
| download | sharkey-7b38806413b84bd20070f3c81b899e5b890b1a8b.tar.gz sharkey-7b38806413b84bd20070f3c81b899e5b890b1a8b.tar.bz2 sharkey-7b38806413b84bd20070f3c81b899e5b890b1a8b.zip | |
feat: Job queue inspector (#15856)
* wip
* wip
* Update job-queue.vue
* wip
* wip
* Update job-queue.vue
* wip
* Update job-queue.vue
* wip
* Update QueueService.ts
* Update QueueService.ts
* Update QueueService.ts
* Update job-queue.vue
* wip
* wip
* wip
* Update job-queue.vue
* wip
* Update MkTl.vue
* wip
* Update index.vue
* wip
* wip
* Update MkTl.vue
* 🎨
* jobs search
* wip
* Update job-queue.vue
* wip
* wip
* Update job-queue.vue
* Update job-queue.vue
* Update job-queue.vue
* Update job-queue.vue
* wip
* Update job-queue.job.vue
* wip
* wip
* wip
* Update MkCode.vue
* wip
* Update job-queue.job.vue
* wip
* Update job-queue.job.vue
* Update misskey-js.api.md
* Update CHANGELOG.md
* Update job-queue.job.vue
Diffstat (limited to 'packages/backend/src/core')
| -rw-r--r-- | packages/backend/src/core/NoteCreateService.ts | 9 | ||||
| -rw-r--r-- | packages/backend/src/core/QueueService.ts | 470 |
2 files changed, 405 insertions, 74 deletions
diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index 1ddb2b173d..469426f87e 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -576,7 +576,14 @@ export class NoteCreateService implements OnApplicationShutdown { noteId: note.id, }, { delay, - removeOnComplete: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts index eb02355625..a1e806816b 100644 --- a/packages/backend/src/core/QueueService.ts +++ b/packages/backend/src/core/QueueService.ts @@ -5,6 +5,8 @@ import { randomUUID } from 'node:crypto'; import { Inject, Injectable } from '@nestjs/common'; +import { MetricsTime, type JobType } from 'bullmq'; +import { parse as parseRedisInfo } from 'redis-info'; import type { IActivity } from '@/core/activitypub/type.js'; import type { MiDriveFile } from '@/models/DriveFile.js'; import type { MiWebhook, WebhookEventTypes } from '@/models/Webhook.js'; @@ -69,50 +71,58 @@ export class QueueService { this.systemQueue.add('tickCharts', { }, { repeat: { pattern: '55 * * * *' }, - removeOnComplete: true, + removeOnComplete: 10, + removeOnFail: 30, }); this.systemQueue.add('resyncCharts', { }, { repeat: { pattern: '0 0 * * *' }, - removeOnComplete: true, + removeOnComplete: 10, + removeOnFail: 30, }); this.systemQueue.add('cleanCharts', { }, { repeat: { pattern: '0 0 * * *' }, - removeOnComplete: true, + removeOnComplete: 10, + removeOnFail: 30, }); this.systemQueue.add('aggregateRetention', { }, { repeat: { pattern: '0 0 * * *' }, - removeOnComplete: true, + removeOnComplete: 10, + removeOnFail: 30, }); this.systemQueue.add('clean', { }, { repeat: { pattern: '0 0 * * *' }, - removeOnComplete: true, + removeOnComplete: 10, + removeOnFail: 30, }); this.systemQueue.add('checkExpiredMutings', { }, { repeat: { pattern: '*/5 * * * *' }, - removeOnComplete: true, + removeOnComplete: 10, + removeOnFail: 30, }); this.systemQueue.add('bakeBufferedReactions', { }, { repeat: { pattern: '0 0 * * *' }, - removeOnComplete: true, + removeOnComplete: 10, + removeOnFail: 30, }); this.systemQueue.add('checkModeratorsActivity', { }, { // 毎時30分に起動 repeat: { pattern: '30 * * * *' }, - removeOnComplete: true, + removeOnComplete: 10, + removeOnFail: 30, }); } @@ -134,13 +144,21 @@ export class QueueService { isSharedInbox, }; - return this.deliverQueue.add(to, data, { + const label = to.replace('https://', '').replace('/inbox', ''); + + return this.deliverQueue.add(label, data, { attempts: this.config.deliverJobMaxAttempts ?? 12, backoff: { type: 'custom', }, - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -162,12 +180,18 @@ export class QueueService { backoff: { type: 'custom', }, - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }; await this.deliverQueue.addBulk(Array.from(inboxes.entries(), d => ({ - name: d[0], + name: d[0].replace('https://', '').replace('/inbox', ''), data: { user, content: contentBody, @@ -188,13 +212,21 @@ export class QueueService { signature, }; - return this.inboxQueue.add('', data, { + const label = (activity.id ?? '').replace('https://', '').replace('/activity', ''); + + return this.inboxQueue.add(label, data, { attempts: this.config.inboxJobMaxAttempts ?? 8, backoff: { type: 'custom', }, - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -203,8 +235,14 @@ export class QueueService { return this.dbQueue.add('deleteDriveFiles', { user: { id: user.id }, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -213,8 +251,14 @@ export class QueueService { return this.dbQueue.add('exportCustomEmojis', { user: { id: user.id }, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -223,8 +267,14 @@ export class QueueService { return this.dbQueue.add('exportNotes', { user: { id: user.id }, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -233,8 +283,14 @@ export class QueueService { return this.dbQueue.add('exportClips', { user: { id: user.id }, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -243,8 +299,14 @@ export class QueueService { return this.dbQueue.add('exportFavorites', { user: { id: user.id }, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -255,8 +317,14 @@ export class QueueService { excludeMuting, excludeInactive, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -265,8 +333,14 @@ export class QueueService { return this.dbQueue.add('exportMuting', { user: { id: user.id }, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -275,8 +349,14 @@ export class QueueService { return this.dbQueue.add('exportBlocking', { user: { id: user.id }, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -285,8 +365,14 @@ export class QueueService { return this.dbQueue.add('exportUserLists', { user: { id: user.id }, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -295,8 +381,14 @@ export class QueueService { return this.dbQueue.add('exportAntennas', { user: { id: user.id }, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -307,8 +399,14 @@ export class QueueService { fileId: fileId, withReplies, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -324,8 +422,14 @@ export class QueueService { user: { id: user.id }, fileId: fileId, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -335,8 +439,14 @@ export class QueueService { user: { id: user.id }, fileId: fileId, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -356,8 +466,14 @@ export class QueueService { name, data, opts: { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }, }; } @@ -368,8 +484,14 @@ export class QueueService { user: { id: user.id }, fileId: fileId, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -379,8 +501,14 @@ export class QueueService { user: { id: user.id }, fileId: fileId, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -390,8 +518,14 @@ export class QueueService { user: { id: user.id }, antenna, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -401,8 +535,14 @@ export class QueueService { user: { id: user.id }, soft: opts.soft, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -452,8 +592,14 @@ export class QueueService { withReplies: data.withReplies, }, opts: { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, ...opts, }, }; @@ -464,16 +610,28 @@ export class QueueService { return this.objectStorageQueue.add('deleteFile', { key: key, }, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @bindThis public createCleanRemoteFilesJob() { return this.objectStorageQueue.add('cleanRemoteFiles', {}, { - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -504,8 +662,14 @@ export class QueueService { backoff: { type: 'custom', }, - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @@ -535,13 +699,19 @@ export class QueueService { backoff: { type: 'custom', }, - removeOnComplete: true, - removeOnFail: true, + removeOnComplete: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 30, + }, + removeOnFail: { + age: 3600 * 24 * 7, // keep up to 7 days + count: 100, + }, }); } @bindThis - private getQueue(type: typeof QUEUE_TYPES[number]) { + private getQueue(type: typeof QUEUE_TYPES[number]): Bull.Queue { switch (type) { case 'system': return this.systemQueue; case 'endedPollNotification': return this.endedPollNotificationQueue; @@ -557,19 +727,173 @@ export class QueueService { } @bindThis - public clearQueue(queueType: typeof QUEUE_TYPES[number], state: '*' | 'completed' | 'wait' | 'active' | 'paused' | 'prioritized' | 'delayed' | 'failed') { + public async queueClear(queueType: typeof QUEUE_TYPES[number], state: '*' | 'completed' | 'wait' | 'active' | 'paused' | 'prioritized' | 'delayed' | 'failed') { const queue = this.getQueue(queueType); if (state === '*') { - queue.clean(0, 0, 'completed'); - queue.clean(0, 0, 'wait'); - queue.clean(0, 0, 'active'); - queue.clean(0, 0, 'paused'); - queue.clean(0, 0, 'prioritized'); - queue.clean(0, 0, 'delayed'); - queue.clean(0, 0, 'failed'); + await Promise.all([ + queue.clean(0, 0, 'completed'), + queue.clean(0, 0, 'wait'), + queue.clean(0, 0, 'active'), + queue.clean(0, 0, 'paused'), + queue.clean(0, 0, 'prioritized'), + queue.clean(0, 0, 'delayed'), + queue.clean(0, 0, 'failed'), + ]); + } else { + await queue.clean(0, 0, state); + } + } + + @bindThis + public async queuePromoteJobs(queueType: typeof QUEUE_TYPES[number]) { + const queue = this.getQueue(queueType); + await queue.promoteJobs(); + } + + @bindThis + public async queueRetryJob(queueType: typeof QUEUE_TYPES[number], jobId: string) { + const queue = this.getQueue(queueType); + const job: Bull.Job | null = await queue.getJob(jobId); + if (job) { + if (job.finishedOn != null) { + await job.retry(); + } else { + await job.promote(); + } + } + } + + @bindThis + public async queueRemoveJob(queueType: typeof QUEUE_TYPES[number], jobId: string) { + const queue = this.getQueue(queueType); + const job: Bull.Job | null = await queue.getJob(jobId); + if (job) { + await job.remove(); + } + } + + @bindThis + private packJobData(job: Bull.Job) { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + const stacktrace = job.stacktrace ? job.stacktrace.filter(Boolean) : []; + stacktrace.reverse(); + + return { + id: job.id, + name: job.name, + data: job.data, + opts: job.opts, + timestamp: job.timestamp, + processedOn: job.processedOn, + processedBy: job.processedBy, + finishedOn: job.finishedOn, + progress: job.progress, + attempts: job.attemptsMade, + delay: job.delay, + failedReason: job.failedReason, + stacktrace: stacktrace, + returnValue: job.returnvalue, + isFailed: !!job.failedReason || (Array.isArray(stacktrace) && stacktrace.length > 0), + }; + } + + @bindThis + public async queueGetJob(queueType: typeof QUEUE_TYPES[number], jobId: string) { + const queue = this.getQueue(queueType); + const job: Bull.Job | null = await queue.getJob(jobId); + if (job) { + return this.packJobData(job); } else { - queue.clean(0, 0, state); + throw new Error(`Job not found: ${jobId}`); } } + + @bindThis + public async queueGetJobs(queueType: typeof QUEUE_TYPES[number], jobTypes: JobType[], search?: string) { + const RETURN_LIMIT = 100; + const queue = this.getQueue(queueType); + let jobs: Bull.Job[]; + + if (search) { + jobs = await queue.getJobs(jobTypes, 0, 1000); + + jobs = jobs.filter(job => { + const jobString = JSON.stringify(job).toLowerCase(); + return search.toLowerCase().split(' ').every(term => { + return jobString.includes(term); + }); + }); + + jobs = jobs.slice(0, RETURN_LIMIT); + } else { + jobs = await queue.getJobs(jobTypes, 0, RETURN_LIMIT); + } + + return jobs.map(job => this.packJobData(job)); + } + + @bindThis + public async queueGetQueues() { + const fetchings = QUEUE_TYPES.map(async type => { + const queue = this.getQueue(type); + + const counts = await queue.getJobCounts(); + const isPaused = await queue.isPaused(); + const metrics_completed = await queue.getMetrics('completed', 0, MetricsTime.ONE_WEEK); + const metrics_failed = await queue.getMetrics('failed', 0, MetricsTime.ONE_WEEK); + + return { + name: type, + counts: counts, + isPaused, + metrics: { + completed: metrics_completed, + failed: metrics_failed, + }, + }; + }); + + return await Promise.all(fetchings); + } + + @bindThis + public async queueGetQueue(queueType: typeof QUEUE_TYPES[number]) { + const queue = this.getQueue(queueType); + const counts = await queue.getJobCounts(); + const isPaused = await queue.isPaused(); + const metrics_completed = await queue.getMetrics('completed', 0, MetricsTime.ONE_WEEK); + const metrics_failed = await queue.getMetrics('failed', 0, MetricsTime.ONE_WEEK); + const db = parseRedisInfo(await (await queue.client).info()); + + return { + name: queueType, + qualifiedName: queue.qualifiedName, + counts: counts, + isPaused, + metrics: { + completed: metrics_completed, + failed: metrics_failed, + }, + db: { + version: db.redis_version, + mode: db.redis_mode, + runId: db.run_id, + processId: db.process_id, + port: parseInt(db.tcp_port), + os: db.os, + uptime: parseInt(db.uptime_in_seconds), + memory: { + total: parseInt(db.total_system_memory) || parseInt(db.maxmemory), + used: parseInt(db.used_memory), + fragmentationRatio: parseInt(db.mem_fragmentation_ratio), + peak: parseInt(db.used_memory_peak), + }, + clients: { + connected: parseInt(db.connected_clients), + blocked: parseInt(db.blocked_clients), + }, + }, + }; + } } |