summaryrefslogtreecommitdiff
path: root/packages/backend/src/core/QueueService.ts
diff options
context:
space:
mode:
authorsyuilo <4439005+syuilo@users.noreply.github.com>2025-04-16 16:47:03 +0900
committersyuilo <4439005+syuilo@users.noreply.github.com>2025-04-16 16:47:03 +0900
commiteda2f587a389ecc67642b855c70d7aa65b41a38b (patch)
tree781a096b3245dd34709bcd1683ea66bce38774ea /packages/backend/src/core/QueueService.ts
parentBump version to 2025.4.1-alpha.1 (diff)
downloadsharkey-eda2f587a389ecc67642b855c70d7aa65b41a38b.tar.gz
sharkey-eda2f587a389ecc67642b855c70d7aa65b41a38b.tar.bz2
sharkey-eda2f587a389ecc67642b855c70d7aa65b41a38b.zip
enhance: コントロールパネルでジョブキューをクリアできるように
Diffstat (limited to 'packages/backend/src/core/QueueService.ts')
-rw-r--r--packages/backend/src/core/QueueService.ts50
1 files changed, 41 insertions, 9 deletions
diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts
index da76dd1284..eb02355625 100644
--- a/packages/backend/src/core/QueueService.ts
+++ b/packages/backend/src/core/QueueService.ts
@@ -38,6 +38,18 @@ import type {
import type httpSignature from '@peertube/http-signature';
import type * as Bull from 'bullmq';
+export const QUEUE_TYPES = [
+ 'system',
+ 'endedPollNotification',
+ 'deliver',
+ 'inbox',
+ 'db',
+ 'relationship',
+ 'objectStorage',
+ 'userWebhookDeliver',
+ 'systemWebhookDeliver',
+] as const;
+
@Injectable()
export class QueueService {
constructor(
@@ -529,15 +541,35 @@ export class QueueService {
}
@bindThis
- public destroy() {
- this.deliverQueue.once('cleaned', (jobs, status) => {
- //deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
- });
- this.deliverQueue.clean(0, 0, 'delayed');
+ private getQueue(type: typeof QUEUE_TYPES[number]) {
+ switch (type) {
+ case 'system': return this.systemQueue;
+ case 'endedPollNotification': return this.endedPollNotificationQueue;
+ case 'deliver': return this.deliverQueue;
+ case 'inbox': return this.inboxQueue;
+ case 'db': return this.dbQueue;
+ case 'relationship': return this.relationshipQueue;
+ case 'objectStorage': return this.objectStorageQueue;
+ case 'userWebhookDeliver': return this.userWebhookDeliverQueue;
+ case 'systemWebhookDeliver': return this.systemWebhookDeliverQueue;
+ default: throw new Error(`Unrecognized queue type: ${type}`);
+ }
+ }
- this.inboxQueue.once('cleaned', (jobs, status) => {
- //inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
- });
- this.inboxQueue.clean(0, 0, 'delayed');
+ @bindThis
+ public clearQueue(queueType: typeof QUEUE_TYPES[number], state: '*' | 'completed' | 'wait' | 'active' | 'paused' | 'prioritized' | 'delayed' | 'failed') {
+ const queue = this.getQueue(queueType);
+
+ if (state === '*') {
+ queue.clean(0, 0, 'completed');
+ queue.clean(0, 0, 'wait');
+ queue.clean(0, 0, 'active');
+ queue.clean(0, 0, 'paused');
+ queue.clean(0, 0, 'prioritized');
+ queue.clean(0, 0, 'delayed');
+ queue.clean(0, 0, 'failed');
+ } else {
+ queue.clean(0, 0, state);
+ }
}
}