summaryrefslogtreecommitdiff
path: root/packages/backend
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend')
-rw-r--r--packages/backend/package.json2
-rw-r--r--packages/backend/src/core/NoteCreateService.ts9
-rw-r--r--packages/backend/src/core/QueueService.ts470
-rw-r--r--packages/backend/src/queue/QueueProcessorService.ts20
-rw-r--r--packages/backend/src/queue/const.ts10
-rw-r--r--packages/backend/src/server/api/endpoint-list.ts8
-rw-r--r--packages/backend/src/server/api/endpoints/admin/queue/clear.ts8
-rw-r--r--packages/backend/src/server/api/endpoints/admin/queue/jobs.ts38
-rw-r--r--packages/backend/src/server/api/endpoints/admin/queue/promote-jobs.ts39
-rw-r--r--packages/backend/src/server/api/endpoints/admin/queue/promote.ts77
-rw-r--r--packages/backend/src/server/api/endpoints/admin/queue/queue-stats.ts36
-rw-r--r--packages/backend/src/server/api/endpoints/admin/queue/queues.ts35
-rw-r--r--packages/backend/src/server/api/endpoints/admin/queue/remove-job.ts38
-rw-r--r--packages/backend/src/server/api/endpoints/admin/queue/retry-job.ts38
-rw-r--r--packages/backend/src/server/api/endpoints/admin/queue/show-job.ts38
15 files changed, 700 insertions, 166 deletions
diff --git a/packages/backend/package.json b/packages/backend/package.json
index 5e8529c422..e8b3d74aa7 100644
--- a/packages/backend/package.json
+++ b/packages/backend/package.json
@@ -93,6 +93,7 @@
"@swc/cli": "0.6.0",
"@swc/core": "1.11.18",
"@twemoji/parser": "15.1.1",
+ "@types/redis-info": "3.0.3",
"accepts": "1.3.8",
"ajv": "8.17.1",
"archiver": "7.0.1",
@@ -159,6 +160,7 @@
"random-seed": "0.3.0",
"ratelimiter": "3.4.1",
"re2": "1.21.4",
+ "redis-info": "3.1.0",
"redis-lock": "0.1.4",
"reflect-metadata": "0.2.2",
"rename": "1.0.4",
diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts
index 1ddb2b173d..469426f87e 100644
--- a/packages/backend/src/core/NoteCreateService.ts
+++ b/packages/backend/src/core/NoteCreateService.ts
@@ -576,7 +576,14 @@ export class NoteCreateService implements OnApplicationShutdown {
noteId: note.id,
}, {
delay,
- removeOnComplete: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts
index eb02355625..a1e806816b 100644
--- a/packages/backend/src/core/QueueService.ts
+++ b/packages/backend/src/core/QueueService.ts
@@ -5,6 +5,8 @@
import { randomUUID } from 'node:crypto';
import { Inject, Injectable } from '@nestjs/common';
+import { MetricsTime, type JobType } from 'bullmq';
+import { parse as parseRedisInfo } from 'redis-info';
import type { IActivity } from '@/core/activitypub/type.js';
import type { MiDriveFile } from '@/models/DriveFile.js';
import type { MiWebhook, WebhookEventTypes } from '@/models/Webhook.js';
@@ -69,50 +71,58 @@ export class QueueService {
this.systemQueue.add('tickCharts', {
}, {
repeat: { pattern: '55 * * * *' },
- removeOnComplete: true,
+ removeOnComplete: 10,
+ removeOnFail: 30,
});
this.systemQueue.add('resyncCharts', {
}, {
repeat: { pattern: '0 0 * * *' },
- removeOnComplete: true,
+ removeOnComplete: 10,
+ removeOnFail: 30,
});
this.systemQueue.add('cleanCharts', {
}, {
repeat: { pattern: '0 0 * * *' },
- removeOnComplete: true,
+ removeOnComplete: 10,
+ removeOnFail: 30,
});
this.systemQueue.add('aggregateRetention', {
}, {
repeat: { pattern: '0 0 * * *' },
- removeOnComplete: true,
+ removeOnComplete: 10,
+ removeOnFail: 30,
});
this.systemQueue.add('clean', {
}, {
repeat: { pattern: '0 0 * * *' },
- removeOnComplete: true,
+ removeOnComplete: 10,
+ removeOnFail: 30,
});
this.systemQueue.add('checkExpiredMutings', {
}, {
repeat: { pattern: '*/5 * * * *' },
- removeOnComplete: true,
+ removeOnComplete: 10,
+ removeOnFail: 30,
});
this.systemQueue.add('bakeBufferedReactions', {
}, {
repeat: { pattern: '0 0 * * *' },
- removeOnComplete: true,
+ removeOnComplete: 10,
+ removeOnFail: 30,
});
this.systemQueue.add('checkModeratorsActivity', {
}, {
// 毎時30分に起動
repeat: { pattern: '30 * * * *' },
- removeOnComplete: true,
+ removeOnComplete: 10,
+ removeOnFail: 30,
});
}
@@ -134,13 +144,21 @@ export class QueueService {
isSharedInbox,
};
- return this.deliverQueue.add(to, data, {
+ const label = to.replace('https://', '').replace('/inbox', '');
+
+ return this.deliverQueue.add(label, data, {
attempts: this.config.deliverJobMaxAttempts ?? 12,
backoff: {
type: 'custom',
},
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -162,12 +180,18 @@ export class QueueService {
backoff: {
type: 'custom',
},
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
};
await this.deliverQueue.addBulk(Array.from(inboxes.entries(), d => ({
- name: d[0],
+ name: d[0].replace('https://', '').replace('/inbox', ''),
data: {
user,
content: contentBody,
@@ -188,13 +212,21 @@ export class QueueService {
signature,
};
- return this.inboxQueue.add('', data, {
+ const label = (activity.id ?? '').replace('https://', '').replace('/activity', '');
+
+ return this.inboxQueue.add(label, data, {
attempts: this.config.inboxJobMaxAttempts ?? 8,
backoff: {
type: 'custom',
},
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -203,8 +235,14 @@ export class QueueService {
return this.dbQueue.add('deleteDriveFiles', {
user: { id: user.id },
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -213,8 +251,14 @@ export class QueueService {
return this.dbQueue.add('exportCustomEmojis', {
user: { id: user.id },
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -223,8 +267,14 @@ export class QueueService {
return this.dbQueue.add('exportNotes', {
user: { id: user.id },
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -233,8 +283,14 @@ export class QueueService {
return this.dbQueue.add('exportClips', {
user: { id: user.id },
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -243,8 +299,14 @@ export class QueueService {
return this.dbQueue.add('exportFavorites', {
user: { id: user.id },
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -255,8 +317,14 @@ export class QueueService {
excludeMuting,
excludeInactive,
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -265,8 +333,14 @@ export class QueueService {
return this.dbQueue.add('exportMuting', {
user: { id: user.id },
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -275,8 +349,14 @@ export class QueueService {
return this.dbQueue.add('exportBlocking', {
user: { id: user.id },
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -285,8 +365,14 @@ export class QueueService {
return this.dbQueue.add('exportUserLists', {
user: { id: user.id },
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -295,8 +381,14 @@ export class QueueService {
return this.dbQueue.add('exportAntennas', {
user: { id: user.id },
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -307,8 +399,14 @@ export class QueueService {
fileId: fileId,
withReplies,
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -324,8 +422,14 @@ export class QueueService {
user: { id: user.id },
fileId: fileId,
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -335,8 +439,14 @@ export class QueueService {
user: { id: user.id },
fileId: fileId,
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -356,8 +466,14 @@ export class QueueService {
name,
data,
opts: {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
},
};
}
@@ -368,8 +484,14 @@ export class QueueService {
user: { id: user.id },
fileId: fileId,
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -379,8 +501,14 @@ export class QueueService {
user: { id: user.id },
fileId: fileId,
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -390,8 +518,14 @@ export class QueueService {
user: { id: user.id },
antenna,
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -401,8 +535,14 @@ export class QueueService {
user: { id: user.id },
soft: opts.soft,
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -452,8 +592,14 @@ export class QueueService {
withReplies: data.withReplies,
},
opts: {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
...opts,
},
};
@@ -464,16 +610,28 @@ export class QueueService {
return this.objectStorageQueue.add('deleteFile', {
key: key,
}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@bindThis
public createCleanRemoteFilesJob() {
return this.objectStorageQueue.add('cleanRemoteFiles', {}, {
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -504,8 +662,14 @@ export class QueueService {
backoff: {
type: 'custom',
},
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@@ -535,13 +699,19 @@ export class QueueService {
backoff: {
type: 'custom',
},
- removeOnComplete: true,
- removeOnFail: true,
+ removeOnComplete: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 30,
+ },
+ removeOnFail: {
+ age: 3600 * 24 * 7, // keep up to 7 days
+ count: 100,
+ },
});
}
@bindThis
- private getQueue(type: typeof QUEUE_TYPES[number]) {
+ private getQueue(type: typeof QUEUE_TYPES[number]): Bull.Queue {
switch (type) {
case 'system': return this.systemQueue;
case 'endedPollNotification': return this.endedPollNotificationQueue;
@@ -557,19 +727,173 @@ export class QueueService {
}
@bindThis
- public clearQueue(queueType: typeof QUEUE_TYPES[number], state: '*' | 'completed' | 'wait' | 'active' | 'paused' | 'prioritized' | 'delayed' | 'failed') {
+ public async queueClear(queueType: typeof QUEUE_TYPES[number], state: '*' | 'completed' | 'wait' | 'active' | 'paused' | 'prioritized' | 'delayed' | 'failed') {
const queue = this.getQueue(queueType);
if (state === '*') {
- queue.clean(0, 0, 'completed');
- queue.clean(0, 0, 'wait');
- queue.clean(0, 0, 'active');
- queue.clean(0, 0, 'paused');
- queue.clean(0, 0, 'prioritized');
- queue.clean(0, 0, 'delayed');
- queue.clean(0, 0, 'failed');
+ await Promise.all([
+ queue.clean(0, 0, 'completed'),
+ queue.clean(0, 0, 'wait'),
+ queue.clean(0, 0, 'active'),
+ queue.clean(0, 0, 'paused'),
+ queue.clean(0, 0, 'prioritized'),
+ queue.clean(0, 0, 'delayed'),
+ queue.clean(0, 0, 'failed'),
+ ]);
+ } else {
+ await queue.clean(0, 0, state);
+ }
+ }
+
+ @bindThis
+ public async queuePromoteJobs(queueType: typeof QUEUE_TYPES[number]) {
+ const queue = this.getQueue(queueType);
+ await queue.promoteJobs();
+ }
+
+ @bindThis
+ public async queueRetryJob(queueType: typeof QUEUE_TYPES[number], jobId: string) {
+ const queue = this.getQueue(queueType);
+ const job: Bull.Job | null = await queue.getJob(jobId);
+ if (job) {
+ if (job.finishedOn != null) {
+ await job.retry();
+ } else {
+ await job.promote();
+ }
+ }
+ }
+
+ @bindThis
+ public async queueRemoveJob(queueType: typeof QUEUE_TYPES[number], jobId: string) {
+ const queue = this.getQueue(queueType);
+ const job: Bull.Job | null = await queue.getJob(jobId);
+ if (job) {
+ await job.remove();
+ }
+ }
+
+ @bindThis
+ private packJobData(job: Bull.Job) {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ const stacktrace = job.stacktrace ? job.stacktrace.filter(Boolean) : [];
+ stacktrace.reverse();
+
+ return {
+ id: job.id,
+ name: job.name,
+ data: job.data,
+ opts: job.opts,
+ timestamp: job.timestamp,
+ processedOn: job.processedOn,
+ processedBy: job.processedBy,
+ finishedOn: job.finishedOn,
+ progress: job.progress,
+ attempts: job.attemptsMade,
+ delay: job.delay,
+ failedReason: job.failedReason,
+ stacktrace: stacktrace,
+ returnValue: job.returnvalue,
+ isFailed: !!job.failedReason || (Array.isArray(stacktrace) && stacktrace.length > 0),
+ };
+ }
+
+ @bindThis
+ public async queueGetJob(queueType: typeof QUEUE_TYPES[number], jobId: string) {
+ const queue = this.getQueue(queueType);
+ const job: Bull.Job | null = await queue.getJob(jobId);
+ if (job) {
+ return this.packJobData(job);
} else {
- queue.clean(0, 0, state);
+ throw new Error(`Job not found: ${jobId}`);
}
}
+
+ @bindThis
+ public async queueGetJobs(queueType: typeof QUEUE_TYPES[number], jobTypes: JobType[], search?: string) {
+ const RETURN_LIMIT = 100;
+ const queue = this.getQueue(queueType);
+ let jobs: Bull.Job[];
+
+ if (search) {
+ jobs = await queue.getJobs(jobTypes, 0, 1000);
+
+ jobs = jobs.filter(job => {
+ const jobString = JSON.stringify(job).toLowerCase();
+ return search.toLowerCase().split(' ').every(term => {
+ return jobString.includes(term);
+ });
+ });
+
+ jobs = jobs.slice(0, RETURN_LIMIT);
+ } else {
+ jobs = await queue.getJobs(jobTypes, 0, RETURN_LIMIT);
+ }
+
+ return jobs.map(job => this.packJobData(job));
+ }
+
+ @bindThis
+ public async queueGetQueues() {
+ const fetchings = QUEUE_TYPES.map(async type => {
+ const queue = this.getQueue(type);
+
+ const counts = await queue.getJobCounts();
+ const isPaused = await queue.isPaused();
+ const metrics_completed = await queue.getMetrics('completed', 0, MetricsTime.ONE_WEEK);
+ const metrics_failed = await queue.getMetrics('failed', 0, MetricsTime.ONE_WEEK);
+
+ return {
+ name: type,
+ counts: counts,
+ isPaused,
+ metrics: {
+ completed: metrics_completed,
+ failed: metrics_failed,
+ },
+ };
+ });
+
+ return await Promise.all(fetchings);
+ }
+
+ @bindThis
+ public async queueGetQueue(queueType: typeof QUEUE_TYPES[number]) {
+ const queue = this.getQueue(queueType);
+ const counts = await queue.getJobCounts();
+ const isPaused = await queue.isPaused();
+ const metrics_completed = await queue.getMetrics('completed', 0, MetricsTime.ONE_WEEK);
+ const metrics_failed = await queue.getMetrics('failed', 0, MetricsTime.ONE_WEEK);
+ const db = parseRedisInfo(await (await queue.client).info());
+
+ return {
+ name: queueType,
+ qualifiedName: queue.qualifiedName,
+ counts: counts,
+ isPaused,
+ metrics: {
+ completed: metrics_completed,
+ failed: metrics_failed,
+ },
+ db: {
+ version: db.redis_version,
+ mode: db.redis_mode,
+ runId: db.run_id,
+ processId: db.process_id,
+ port: parseInt(db.tcp_port),
+ os: db.os,
+ uptime: parseInt(db.uptime_in_seconds),
+ memory: {
+ total: parseInt(db.total_system_memory) || parseInt(db.maxmemory),
+ used: parseInt(db.used_memory),
+ fragmentationRatio: parseInt(db.mem_fragmentation_ratio),
+ peak: parseInt(db.used_memory_peak),
+ },
+ clients: {
+ connected: parseInt(db.connected_clients),
+ blocked: parseInt(db.blocked_clients),
+ },
+ },
+ };
+ }
}
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts
index 6940e1c188..c98ebcdcd9 100644
--- a/packages/backend/src/queue/QueueProcessorService.ts
+++ b/packages/backend/src/queue/QueueProcessorService.ts
@@ -44,7 +44,7 @@ import { BakeBufferedReactionsProcessorService } from './processors/BakeBuffered
import { CleanProcessorService } from './processors/CleanProcessorService.js';
import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js';
import { QueueLoggerService } from './QueueLoggerService.js';
-import { QUEUE, baseQueueOptions } from './const.js';
+import { QUEUE, baseWorkerOptions } from './const.js';
// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019
function httpRelatedBackoff(attemptsMade: number) {
@@ -175,7 +175,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return processer(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.SYSTEM),
+ ...baseWorkerOptions(this.config, QUEUE.SYSTEM),
autorun: false,
});
@@ -232,7 +232,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return processer(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.DB),
+ ...baseWorkerOptions(this.config, QUEUE.DB),
autorun: false,
});
@@ -264,7 +264,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return this.deliverProcessorService.process(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.DELIVER),
+ ...baseWorkerOptions(this.config, QUEUE.DELIVER),
autorun: false,
concurrency: this.config.deliverJobConcurrency ?? 128,
limiter: {
@@ -304,7 +304,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return this.inboxProcessorService.process(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.INBOX),
+ ...baseWorkerOptions(this.config, QUEUE.INBOX),
autorun: false,
concurrency: this.config.inboxJobConcurrency ?? 16,
limiter: {
@@ -344,7 +344,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return this.userWebhookDeliverProcessorService.process(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.USER_WEBHOOK_DELIVER),
+ ...baseWorkerOptions(this.config, QUEUE.USER_WEBHOOK_DELIVER),
autorun: false,
concurrency: 64,
limiter: {
@@ -384,7 +384,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return this.systemWebhookDeliverProcessorService.process(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.SYSTEM_WEBHOOK_DELIVER),
+ ...baseWorkerOptions(this.config, QUEUE.SYSTEM_WEBHOOK_DELIVER),
autorun: false,
concurrency: 16,
limiter: {
@@ -434,7 +434,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return processer(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.RELATIONSHIP),
+ ...baseWorkerOptions(this.config, QUEUE.RELATIONSHIP),
autorun: false,
concurrency: this.config.relationshipJobConcurrency ?? 16,
limiter: {
@@ -479,7 +479,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return processer(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.OBJECT_STORAGE),
+ ...baseWorkerOptions(this.config, QUEUE.OBJECT_STORAGE),
autorun: false,
concurrency: 16,
});
@@ -512,7 +512,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return this.endedPollNotificationProcessorService.process(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION),
+ ...baseWorkerOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION),
autorun: false,
});
}
diff --git a/packages/backend/src/queue/const.ts b/packages/backend/src/queue/const.ts
index 67f689b618..7e146a7e03 100644
--- a/packages/backend/src/queue/const.ts
+++ b/packages/backend/src/queue/const.ts
@@ -3,6 +3,7 @@
* SPDX-License-Identifier: AGPL-3.0-only
*/
+import { MetricsTime } from 'bullmq';
import { Config } from '@/config.js';
import type * as Bull from 'bullmq';
@@ -27,3 +28,12 @@ export function baseQueueOptions(config: Config, queueName: typeof QUEUE[keyof t
prefix: config.redisForJobQueue.prefix ? `${config.redisForJobQueue.prefix}:queue:${queueName}` : `queue:${queueName}`,
};
}
+
+export function baseWorkerOptions(config: Config, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.WorkerOptions {
+ return {
+ ...baseQueueOptions(config, queueName),
+ metrics: {
+ maxDataPoints: MetricsTime.ONE_WEEK,
+ },
+ };
+}
diff --git a/packages/backend/src/server/api/endpoint-list.ts b/packages/backend/src/server/api/endpoint-list.ts
index 34aaef3cc7..e5170aa2dc 100644
--- a/packages/backend/src/server/api/endpoint-list.ts
+++ b/packages/backend/src/server/api/endpoint-list.ts
@@ -67,8 +67,14 @@ export * as 'admin/promo/create' from './endpoints/admin/promo/create.js';
export * as 'admin/queue/clear' from './endpoints/admin/queue/clear.js';
export * as 'admin/queue/deliver-delayed' from './endpoints/admin/queue/deliver-delayed.js';
export * as 'admin/queue/inbox-delayed' from './endpoints/admin/queue/inbox-delayed.js';
-export * as 'admin/queue/promote' from './endpoints/admin/queue/promote.js';
+export * as 'admin/queue/retry-job' from './endpoints/admin/queue/retry-job.js';
+export * as 'admin/queue/remove-job' from './endpoints/admin/queue/remove-job.js';
+export * as 'admin/queue/show-job' from './endpoints/admin/queue/show-job.js';
+export * as 'admin/queue/promote-jobs' from './endpoints/admin/queue/promote-jobs.js';
+export * as 'admin/queue/jobs' from './endpoints/admin/queue/jobs.js';
export * as 'admin/queue/stats' from './endpoints/admin/queue/stats.js';
+export * as 'admin/queue/queues' from './endpoints/admin/queue/queues.js';
+export * as 'admin/queue/queue-stats' from './endpoints/admin/queue/queue-stats.js';
export * as 'admin/relays/add' from './endpoints/admin/relays/add.js';
export * as 'admin/relays/list' from './endpoints/admin/relays/list.js';
export * as 'admin/relays/remove' from './endpoints/admin/relays/remove.js';
diff --git a/packages/backend/src/server/api/endpoints/admin/queue/clear.ts b/packages/backend/src/server/api/endpoints/admin/queue/clear.ts
index 3978f14f2d..81cb4b8119 100644
--- a/packages/backend/src/server/api/endpoints/admin/queue/clear.ts
+++ b/packages/backend/src/server/api/endpoints/admin/queue/clear.ts
@@ -19,10 +19,10 @@ export const meta = {
export const paramDef = {
type: 'object',
properties: {
- type: { type: 'string', enum: QUEUE_TYPES },
- state: { type: 'string', enum: ['*', 'wait', 'delayed'] },
+ queue: { type: 'string', enum: QUEUE_TYPES },
+ state: { type: 'string', enum: ['*', 'completed', 'wait', 'active', 'paused', 'prioritized', 'delayed', 'failed'] },
},
- required: ['type', 'state'],
+ required: ['queue', 'state'],
} as const;
@Injectable()
@@ -32,7 +32,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
private queueService: QueueService,
) {
super(meta, paramDef, async (ps, me) => {
- this.queueService.clearQueue(ps.type, ps.state);
+ this.queueService.queueClear(ps.queue, ps.state);
this.moderationLogService.log(me, 'clearQueue');
});
diff --git a/packages/backend/src/server/api/endpoints/admin/queue/jobs.ts b/packages/backend/src/server/api/endpoints/admin/queue/jobs.ts
new file mode 100644
index 0000000000..79731c9786
--- /dev/null
+++ b/packages/backend/src/server/api/endpoints/admin/queue/jobs.ts
@@ -0,0 +1,38 @@
+/*
+ * SPDX-FileCopyrightText: syuilo and misskey-project
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+import { Injectable } from '@nestjs/common';
+import { Endpoint } from '@/server/api/endpoint-base.js';
+import { ModerationLogService } from '@/core/ModerationLogService.js';
+import { QUEUE_TYPES, QueueService } from '@/core/QueueService.js';
+
+export const meta = {
+ tags: ['admin'],
+
+ requireCredential: true,
+ requireModerator: true,
+ kind: 'read:admin:queue',
+} as const;
+
+export const paramDef = {
+ type: 'object',
+ properties: {
+ queue: { type: 'string', enum: QUEUE_TYPES },
+ state: { type: 'array', items: { type: 'string', enum: ['active', 'wait', 'delayed', 'completed', 'failed'] } },
+ search: { type: 'string' },
+ },
+ required: ['queue', 'state'],
+} as const;
+
+@Injectable()
+export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-disable-line import/no-default-export
+ constructor(
+ private queueService: QueueService,
+ ) {
+ super(meta, paramDef, async (ps, me) => {
+ return this.queueService.queueGetJobs(ps.queue, ps.state, ps.search);
+ });
+ }
+}
diff --git a/packages/backend/src/server/api/endpoints/admin/queue/promote-jobs.ts b/packages/backend/src/server/api/endpoints/admin/queue/promote-jobs.ts
new file mode 100644
index 0000000000..d22385e261
--- /dev/null
+++ b/packages/backend/src/server/api/endpoints/admin/queue/promote-jobs.ts
@@ -0,0 +1,39 @@
+/*
+ * SPDX-FileCopyrightText: syuilo and misskey-project
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+import { Injectable } from '@nestjs/common';
+import { Endpoint } from '@/server/api/endpoint-base.js';
+import { ModerationLogService } from '@/core/ModerationLogService.js';
+import { QUEUE_TYPES, QueueService } from '@/core/QueueService.js';
+
+export const meta = {
+ tags: ['admin'],
+
+ requireCredential: true,
+ requireModerator: true,
+ kind: 'write:admin:queue',
+} as const;
+
+export const paramDef = {
+ type: 'object',
+ properties: {
+ queue: { type: 'string', enum: QUEUE_TYPES },
+ },
+ required: ['queue'],
+} as const;
+
+@Injectable()
+export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-disable-line import/no-default-export
+ constructor(
+ private moderationLogService: ModerationLogService,
+ private queueService: QueueService,
+ ) {
+ super(meta, paramDef, async (ps, me) => {
+ this.queueService.queuePromoteJobs(ps.queue);
+
+ this.moderationLogService.log(me, 'promoteQueue');
+ });
+ }
+}
diff --git a/packages/backend/src/server/api/endpoints/admin/queue/promote.ts b/packages/backend/src/server/api/endpoints/admin/queue/promote.ts
deleted file mode 100644
index 7502d4e1f7..0000000000
--- a/packages/backend/src/server/api/endpoints/admin/queue/promote.ts
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * SPDX-FileCopyrightText: syuilo and misskey-project
- * SPDX-License-Identifier: AGPL-3.0-only
- */
-
-import { Injectable } from '@nestjs/common';
-import { Endpoint } from '@/server/api/endpoint-base.js';
-import { ModerationLogService } from '@/core/ModerationLogService.js';
-import { QueueService } from '@/core/QueueService.js';
-
-export const meta = {
- tags: ['admin'],
-
- requireCredential: true,
- requireModerator: true,
- kind: 'write:admin:queue',
-} as const;
-
-export const paramDef = {
- type: 'object',
- properties: {
- type: { type: 'string', enum: ['deliver', 'inbox'] },
- },
- required: ['type'],
-} as const;
-
-@Injectable()
-export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-disable-line import/no-default-export
- constructor(
- private moderationLogService: ModerationLogService,
- private queueService: QueueService,
- ) {
- super(meta, paramDef, async (ps, me) => {
- let delayedQueues;
-
- switch (ps.type) {
- case 'deliver':
- delayedQueues = await this.queueService.deliverQueue.getDelayed();
- for (let queueIndex = 0; queueIndex < delayedQueues.length; queueIndex++) {
- const queue = delayedQueues[queueIndex];
- try {
- await queue.promote();
- } catch (e) {
- if (e instanceof Error) {
- if (e.message.indexOf('not in a delayed state') !== -1) {
- throw e;
- }
- } else {
- throw e;
- }
- }
- }
- break;
-
- case 'inbox':
- delayedQueues = await this.queueService.inboxQueue.getDelayed();
- for (let queueIndex = 0; queueIndex < delayedQueues.length; queueIndex++) {
- const queue = delayedQueues[queueIndex];
- try {
- await queue.promote();
- } catch (e) {
- if (e instanceof Error) {
- if (e.message.indexOf('not in a delayed state') !== -1) {
- throw e;
- }
- } else {
- throw e;
- }
- }
- }
- break;
- }
-
- this.moderationLogService.log(me, 'promoteQueue');
- });
- }
-}
diff --git a/packages/backend/src/server/api/endpoints/admin/queue/queue-stats.ts b/packages/backend/src/server/api/endpoints/admin/queue/queue-stats.ts
new file mode 100644
index 0000000000..10ce48332a
--- /dev/null
+++ b/packages/backend/src/server/api/endpoints/admin/queue/queue-stats.ts
@@ -0,0 +1,36 @@
+/*
+ * SPDX-FileCopyrightText: syuilo and misskey-project
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+import { Injectable } from '@nestjs/common';
+import { Endpoint } from '@/server/api/endpoint-base.js';
+import { ModerationLogService } from '@/core/ModerationLogService.js';
+import { QUEUE_TYPES, QueueService } from '@/core/QueueService.js';
+
+export const meta = {
+ tags: ['admin'],
+
+ requireCredential: true,
+ requireModerator: true,
+ kind: 'read:admin:queue',
+} as const;
+
+export const paramDef = {
+ type: 'object',
+ properties: {
+ queue: { type: 'string', enum: QUEUE_TYPES },
+ },
+ required: ['queue'],
+} as const;
+
+@Injectable()
+export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-disable-line import/no-default-export
+ constructor(
+ private queueService: QueueService,
+ ) {
+ super(meta, paramDef, async (ps, me) => {
+ return this.queueService.queueGetQueue(ps.queue);
+ });
+ }
+}
diff --git a/packages/backend/src/server/api/endpoints/admin/queue/queues.ts b/packages/backend/src/server/api/endpoints/admin/queue/queues.ts
new file mode 100644
index 0000000000..3a38275f60
--- /dev/null
+++ b/packages/backend/src/server/api/endpoints/admin/queue/queues.ts
@@ -0,0 +1,35 @@
+/*
+ * SPDX-FileCopyrightText: syuilo and misskey-project
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+import { Injectable } from '@nestjs/common';
+import { Endpoint } from '@/server/api/endpoint-base.js';
+import { ModerationLogService } from '@/core/ModerationLogService.js';
+import { QUEUE_TYPES, QueueService } from '@/core/QueueService.js';
+
+export const meta = {
+ tags: ['admin'],
+
+ requireCredential: true,
+ requireModerator: true,
+ kind: 'read:admin:queue',
+} as const;
+
+export const paramDef = {
+ type: 'object',
+ properties: {
+ },
+ required: [],
+} as const;
+
+@Injectable()
+export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-disable-line import/no-default-export
+ constructor(
+ private queueService: QueueService,
+ ) {
+ super(meta, paramDef, async (ps, me) => {
+ return this.queueService.queueGetQueues();
+ });
+ }
+}
diff --git a/packages/backend/src/server/api/endpoints/admin/queue/remove-job.ts b/packages/backend/src/server/api/endpoints/admin/queue/remove-job.ts
new file mode 100644
index 0000000000..2c73f689d0
--- /dev/null
+++ b/packages/backend/src/server/api/endpoints/admin/queue/remove-job.ts
@@ -0,0 +1,38 @@
+/*
+ * SPDX-FileCopyrightText: syuilo and misskey-project
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+import { Injectable } from '@nestjs/common';
+import { Endpoint } from '@/server/api/endpoint-base.js';
+import { ModerationLogService } from '@/core/ModerationLogService.js';
+import { QUEUE_TYPES, QueueService } from '@/core/QueueService.js';
+
+export const meta = {
+ tags: ['admin'],
+
+ requireCredential: true,
+ requireModerator: true,
+ kind: 'write:admin:queue',
+} as const;
+
+export const paramDef = {
+ type: 'object',
+ properties: {
+ queue: { type: 'string', enum: QUEUE_TYPES },
+ jobId: { type: 'string' },
+ },
+ required: ['queue', 'jobId'],
+} as const;
+
+@Injectable()
+export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-disable-line import/no-default-export
+ constructor(
+ private moderationLogService: ModerationLogService,
+ private queueService: QueueService,
+ ) {
+ super(meta, paramDef, async (ps, me) => {
+ this.queueService.queueRemoveJob(ps.queue, ps.jobId);
+ });
+ }
+}
diff --git a/packages/backend/src/server/api/endpoints/admin/queue/retry-job.ts b/packages/backend/src/server/api/endpoints/admin/queue/retry-job.ts
new file mode 100644
index 0000000000..b2603128f8
--- /dev/null
+++ b/packages/backend/src/server/api/endpoints/admin/queue/retry-job.ts
@@ -0,0 +1,38 @@
+/*
+ * SPDX-FileCopyrightText: syuilo and misskey-project
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+import { Injectable } from '@nestjs/common';
+import { Endpoint } from '@/server/api/endpoint-base.js';
+import { ModerationLogService } from '@/core/ModerationLogService.js';
+import { QUEUE_TYPES, QueueService } from '@/core/QueueService.js';
+
+export const meta = {
+ tags: ['admin'],
+
+ requireCredential: true,
+ requireModerator: true,
+ kind: 'write:admin:queue',
+} as const;
+
+export const paramDef = {
+ type: 'object',
+ properties: {
+ queue: { type: 'string', enum: QUEUE_TYPES },
+ jobId: { type: 'string' },
+ },
+ required: ['queue', 'jobId'],
+} as const;
+
+@Injectable()
+export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-disable-line import/no-default-export
+ constructor(
+ private moderationLogService: ModerationLogService,
+ private queueService: QueueService,
+ ) {
+ super(meta, paramDef, async (ps, me) => {
+ this.queueService.queueRetryJob(ps.queue, ps.jobId);
+ });
+ }
+}
diff --git a/packages/backend/src/server/api/endpoints/admin/queue/show-job.ts b/packages/backend/src/server/api/endpoints/admin/queue/show-job.ts
new file mode 100644
index 0000000000..63747b5540
--- /dev/null
+++ b/packages/backend/src/server/api/endpoints/admin/queue/show-job.ts
@@ -0,0 +1,38 @@
+/*
+ * SPDX-FileCopyrightText: syuilo and misskey-project
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+import { Injectable } from '@nestjs/common';
+import { Endpoint } from '@/server/api/endpoint-base.js';
+import { ModerationLogService } from '@/core/ModerationLogService.js';
+import { QUEUE_TYPES, QueueService } from '@/core/QueueService.js';
+
+export const meta = {
+ tags: ['admin'],
+
+ requireCredential: true,
+ requireModerator: true,
+ kind: 'read:admin:queue',
+} as const;
+
+export const paramDef = {
+ type: 'object',
+ properties: {
+ queue: { type: 'string', enum: QUEUE_TYPES },
+ jobId: { type: 'string' },
+ },
+ required: ['queue', 'jobId'],
+} as const;
+
+@Injectable()
+export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-disable-line import/no-default-export
+ constructor(
+ private moderationLogService: ModerationLogService,
+ private queueService: QueueService,
+ ) {
+ super(meta, paramDef, async (ps, me) => {
+ return this.queueService.queueGetJob(ps.queue, ps.jobId);
+ });
+ }
+}