summaryrefslogtreecommitdiff
path: root/packages/backend/src/core/QueueService.ts
diff options
context:
space:
mode:
authorsyuilo <4439005+syuilo@users.noreply.github.com>2025-04-19 14:00:38 +0900
committerGitHub <noreply@github.com>2025-04-19 14:00:38 +0900
commit7b38806413b84bd20070f3c81b899e5b890b1a8b (patch)
tree2385b32546d410ff2e5e208793f7ad82ad6623ab /packages/backend/src/core/QueueService.ts
parentfix(storybook): implement missing stories (#15862) (diff)
downloadsharkey-7b38806413b84bd20070f3c81b899e5b890b1a8b.tar.gz
sharkey-7b38806413b84bd20070f3c81b899e5b890b1a8b.tar.bz2
sharkey-7b38806413b84bd20070f3c81b899e5b890b1a8b.zip
feat: Job queue inspector (#15856)
* wip * wip * Update job-queue.vue * wip * wip * Update job-queue.vue * wip * Update job-queue.vue * wip * Update QueueService.ts * Update QueueService.ts * Update QueueService.ts * Update job-queue.vue * wip * wip * wip * Update job-queue.vue * wip * Update MkTl.vue * wip * Update index.vue * wip * wip * Update MkTl.vue * 🎨 * jobs search * wip * Update job-queue.vue * wip * wip * Update job-queue.vue * Update job-queue.vue * Update job-queue.vue * Update job-queue.vue * wip * Update job-queue.job.vue * wip * wip * wip * Update MkCode.vue * wip * Update job-queue.job.vue * wip * Update job-queue.job.vue * Update misskey-js.api.md * Update CHANGELOG.md * Update job-queue.job.vue
Diffstat (limited to 'packages/backend/src/core/QueueService.ts')
-rw-r--r--packages/backend/src/core/QueueService.ts470
1 files changed, 397 insertions, 73 deletions
diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts
index eb02355625..a1e806816b 100644
--- a/packages/backend/src/core/QueueService.ts
+++ b/packages/backend/src/core/QueueService.ts
@@ -5,6 +5,8 @@
import { randomUUID } from 'node:crypto';
import { Inject, Injectable } from '@nestjs/common';
+import { MetricsTime, type JobType } from 'bullmq';
+import { parse as parseRedisInfo } from 'redis-info';
import type { IActivity } from '@/core/activitypub/type.js';
import type { MiDriveFile } from '@/models/DriveFile.js';
import type { MiWebhook, WebhookEventTypes } from '@/models/Webhook.js';
@@ -69,50 +71,58 @@ export class QueueService {
this.systemQueue.add('tickCharts', {
}, {
repeat: { pattern: '55 * * * *' },
- removeOnComplete: true,
+ removeOnComplete: 10,
+ removeOnFail: 30,
});
this.systemQueue.add('resyncCharts', {
}, {
repeat: { pattern: '0 0 * * *' },
- removeOnComplete: true,
+ removeOnComplete: 10,
+ removeOnFail: 30,
});
this.systemQueue.add('cleanCharts', {
}, {
repeat: { pattern: '0 0 * * *' },
- removeOnComplete: true,
+ removeOnComplete: 10,
+ removeOnFail: 30,
});
this.systemQueue.add('aggregateRetention', {
}, {
repeat: { pattern: '0 0 * * *' },
- removeOnComplete: true,
+ removeOnComplete: 10,
+ removeOnFail: 30,
});
this.systemQueue.add('clean', {
}, {
repeat: { pattern: '0 0 * * *' },
- removeOnComplete: true,
+ removeOnComplete: 10,
+ removeOnFail: 30,
});
this.systemQueue.add('checkExpiredMutings', {
}, {
repeat: { pattern: '*/5 * * * *' },
- removeOnComplete: true,
+ removeOnComplete: 10,
+ removeOnFail: 30,
});
this.systemQueue.add('bakeBufferedReactions', {
}, {
repeat: { pattern: '0 0 * * *' },
- removeOnComplete: true,
+ removeOnComplete: 10,
+ removeOnFail: 30,
});
this.systemQueue.add('checkModeratorsActivity', {
}, {
// 毎時30分に起動
repeat: { pattern: '30 * * * *' },
- removeOnComplete: true,
+ removeOnComplete: 10,
+ removeOnFail: 30,
});
}
@@ -134,13 +144,21 @@ export class QueueService {
isSharedInbox,
};
- return this.deliverQueue.add(to, data, {
+ const label = to.replace('https://', '').replace('/inbox', '');
+
+ return this.deliverQueue.add(label, data, {
attempts: this.config.deliverJobMaxAttempts ?? 12,
backoff: {
type: 'custom',
},
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -162,12 +180,18 @@ export class QueueService {
backoff: {
type: 'custom',
},
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
};
await this.deliverQueue.addBulk(Array.from(inboxes.entries(), d => ({
- name: d[0],
+ name: d[0].replace('https://', '').replace('/inbox', ''),
data: {
user,
content: contentBody,
@@ -188,13 +212,21 @@ export class QueueService {
signature,
};
- return this.inboxQueue.add('', data, {
+ const label = (activity.id ?? '').replace('https://', '').replace('/activity', '');
+
+ return this.inboxQueue.add(label, data, {
attempts: this.config.inboxJobMaxAttempts ?? 8,
backoff: {
type: 'custom',
},
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -203,8 +235,14 @@ export class QueueService {
return this.dbQueue.add('deleteDriveFiles', {
user: { id: user.id },
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -213,8 +251,14 @@ export class QueueService {
return this.dbQueue.add('exportCustomEmojis', {
user: { id: user.id },
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -223,8 +267,14 @@ export class QueueService {
return this.dbQueue.add('exportNotes', {
user: { id: user.id },
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -233,8 +283,14 @@ export class QueueService {
return this.dbQueue.add('exportClips', {
user: { id: user.id },
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -243,8 +299,14 @@ export class QueueService {
return this.dbQueue.add('exportFavorites', {
user: { id: user.id },
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -255,8 +317,14 @@ export class QueueService {
excludeMuting,
excludeInactive,
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -265,8 +333,14 @@ export class QueueService {
return this.dbQueue.add('exportMuting', {
user: { id: user.id },
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -275,8 +349,14 @@ export class QueueService {
return this.dbQueue.add('exportBlocking', {
user: { id: user.id },
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -285,8 +365,14 @@ export class QueueService {
return this.dbQueue.add('exportUserLists', {
user: { id: user.id },
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -295,8 +381,14 @@ export class QueueService {
return this.dbQueue.add('exportAntennas', {
user: { id: user.id },
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -307,8 +399,14 @@ export class QueueService {
fileId: fileId,
withReplies,
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -324,8 +422,14 @@ export class QueueService {
user: { id: user.id },
fileId: fileId,
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -335,8 +439,14 @@ export class QueueService {
user: { id: user.id },
fileId: fileId,
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -356,8 +466,14 @@ export class QueueService {
name,
data,
opts: {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
},
};
}
@@ -368,8 +484,14 @@ export class QueueService {
user: { id: user.id },
fileId: fileId,
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -379,8 +501,14 @@ export class QueueService {
user: { id: user.id },
fileId: fileId,
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -390,8 +518,14 @@ export class QueueService {
user: { id: user.id },
antenna,
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -401,8 +535,14 @@ export class QueueService {
user: { id: user.id },
soft: opts.soft,
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -452,8 +592,14 @@ export class QueueService {
withReplies: data.withReplies,
},
opts: {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
...opts,
},
};
@@ -464,16 +610,28 @@ export class QueueService {
return this.objectStorageQueue.add('deleteFile', {
key: key,
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@bindThis
public createCleanRemoteFilesJob() {
return this.objectStorageQueue.add('cleanRemoteFiles', {}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -504,8 +662,14 @@ export class QueueService {
backoff: {
type: 'custom',
},
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -535,13 +699,19 @@ export class QueueService {
backoff: {
type: 'custom',
},
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@bindThis
- private getQueue(type: typeof QUEUE_TYPES[number]) {
+ private getQueue(type: typeof QUEUE_TYPES[number]): Bull.Queue {
switch (type) {
case 'system': return this.systemQueue;
case 'endedPollNotification': return this.endedPollNotificationQueue;
@@ -557,19 +727,173 @@ export class QueueService {
}
@bindThis
- public clearQueue(queueType: typeof QUEUE_TYPES[number], state: '*' | 'completed' | 'wait' | 'active' | 'paused' | 'prioritized' | 'delayed' | 'failed') {
+ public async queueClear(queueType: typeof QUEUE_TYPES[number], state: '*' | 'completed' | 'wait' | 'active' | 'paused' | 'prioritized' | 'delayed' | 'failed') {
const queue = this.getQueue(queueType);
if (state === '*') {
- queue.clean(0, 0, 'completed');
- queue.clean(0, 0, 'wait');
- queue.clean(0, 0, 'active');
- queue.clean(0, 0, 'paused');
- queue.clean(0, 0, 'prioritized');
- queue.clean(0, 0, 'delayed');
- queue.clean(0, 0, 'failed');
+ await Promise.all([
+ queue.clean(0, 0, 'completed'),
+ queue.clean(0, 0, 'wait'),
+ queue.clean(0, 0, 'active'),
+ queue.clean(0, 0, 'paused'),
+ queue.clean(0, 0, 'prioritized'),
+ queue.clean(0, 0, 'delayed'),
+ queue.clean(0, 0, 'failed'),
+ ]);
+ } else {
+ await queue.clean(0, 0, state);
+ }
+ }
+
+ @bindThis
+ public async queuePromoteJobs(queueType: typeof QUEUE_TYPES[number]) {
+ const queue = this.getQueue(queueType);
+ await queue.promoteJobs();
+ }
+
+ @bindThis
+ public async queueRetryJob(queueType: typeof QUEUE_TYPES[number], jobId: string) {
+ const queue = this.getQueue(queueType);
+ const job: Bull.Job | null = await queue.getJob(jobId);
+ if (job) {
+ if (job.finishedOn != null) {
+ await job.retry();
+ } else {
+ await job.promote();
+ }
+ }
+ }
+
+ @bindThis
+ public async queueRemoveJob(queueType: typeof QUEUE_TYPES[number], jobId: string) {
+ const queue = this.getQueue(queueType);
+ const job: Bull.Job | null = await queue.getJob(jobId);
+ if (job) {
+ await job.remove();
+ }
+ }
+
+ @bindThis
+ private packJobData(job: Bull.Job) {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ const stacktrace = job.stacktrace ? job.stacktrace.filter(Boolean) : [];
+ stacktrace.reverse();
+
+ return {
+ id: job.id,
+ name: job.name,
+ data: job.data,
+ opts: job.opts,
+ timestamp: job.timestamp,
+ processedOn: job.processedOn,
+ processedBy: job.processedBy,
+ finishedOn: job.finishedOn,
+ progress: job.progress,
+ attempts: job.attemptsMade,
+ delay: job.delay,
+ failedReason: job.failedReason,
+ stacktrace: stacktrace,
+ returnValue: job.returnvalue,
+ isFailed: !!job.failedReason || (Array.isArray(stacktrace) && stacktrace.length > 0),
+ };
+ }
+
+ @bindThis
+ public async queueGetJob(queueType: typeof QUEUE_TYPES[number], jobId: string) {
+ const queue = this.getQueue(queueType);
+ const job: Bull.Job | null = await queue.getJob(jobId);
+ if (job) {
+ return this.packJobData(job);
} else {
- queue.clean(0, 0, state);
+ throw new Error(`Job not found: ${jobId}`);
}
}
+
+ @bindThis
+ public async queueGetJobs(queueType: typeof QUEUE_TYPES[number], jobTypes: JobType[], search?: string) {
+ const RETURN_LIMIT = 100;
+ const queue = this.getQueue(queueType);
+ let jobs: Bull.Job[];
+
+ if (search) {
+ jobs = await queue.getJobs(jobTypes, 0, 1000);
+
+ jobs = jobs.filter(job => {
+ const jobString = JSON.stringify(job).toLowerCase();
+ return search.toLowerCase().split(' ').every(term => {
+ return jobString.includes(term);
+ });
+ });
+
+ jobs = jobs.slice(0, RETURN_LIMIT);
+ } else {
+ jobs = await queue.getJobs(jobTypes, 0, RETURN_LIMIT);
+ }
+
+ return jobs.map(job => this.packJobData(job));
+ }
+
+ @bindThis
+ public async queueGetQueues() {
+ const fetchings = QUEUE_TYPES.map(async type => {
+ const queue = this.getQueue(type);
+
+ const counts = await queue.getJobCounts();
+ const isPaused = await queue.isPaused();
+ const metrics_completed = await queue.getMetrics('completed', 0, MetricsTime.ONE_WEEK);
+ const metrics_failed = await queue.getMetrics('failed', 0, MetricsTime.ONE_WEEK);
+
+ return {
+ name: type,
+ counts: counts,
+ isPaused,
+ metrics: {
+ completed: metrics_completed,
+ failed: metrics_failed,
+ },
+ };
+ });
+
+ return await Promise.all(fetchings);
+ }
+
+ @bindThis
+ public async queueGetQueue(queueType: typeof QUEUE_TYPES[number]) {
+ const queue = this.getQueue(queueType);
+ const counts = await queue.getJobCounts();
+ const isPaused = await queue.isPaused();
+ const metrics_completed = await queue.getMetrics('completed', 0, MetricsTime.ONE_WEEK);
+ const metrics_failed = await queue.getMetrics('failed', 0, MetricsTime.ONE_WEEK);
+ const db = parseRedisInfo(await (await queue.client).info());
+
+ return {
+ name: queueType,
+ qualifiedName: queue.qualifiedName,
+ counts: counts,
+ isPaused,
+ metrics: {
+ completed: metrics_completed,
+ failed: metrics_failed,
+ },
+ db: {
+ version: db.redis_version,
+ mode: db.redis_mode,
+ runId: db.run_id,
+ processId: db.process_id,
+ port: parseInt(db.tcp_port),
+ os: db.os,
+ uptime: parseInt(db.uptime_in_seconds),
+ memory: {
+ total: parseInt(db.total_system_memory) || parseInt(db.maxmemory),
+ used: parseInt(db.used_memory),
+ fragmentationRatio: parseInt(db.mem_fragmentation_ratio),
+ peak: parseInt(db.used_memory_peak),
+ },
+ clients: {
+ connected: parseInt(db.connected_clients),
+ blocked: parseInt(db.blocked_clients),
+ },
+ },
+ };
+ }
}