summaryrefslogtreecommitdiff
path: root/packages/backend/src/core/QueueService.ts
diff options
context:
space:
mode:
authorsyuilo <Syuilotan@yahoo.co.jp>2023-05-29 11:54:49 +0900
committerGitHub <noreply@github.com>2023-05-29 11:54:49 +0900
commitfd7b77c542b51313d8b8ea60124725fe65a295d5 (patch)
tree78893fdfe273496831124414789376b1e2a18997 /packages/backend/src/core/QueueService.ts
parentpnpm devでCtrl+Cで終了させてもプロセスが完全に殺せないの... (diff)
downloadsharkey-fd7b77c542b51313d8b8ea60124725fe65a295d5.tar.gz
sharkey-fd7b77c542b51313d8b8ea60124725fe65a295d5.tar.bz2
sharkey-fd7b77c542b51313d8b8ea60124725fe65a295d5.zip
enhance(backend): migrate bull to bullmq (#10910)
* wip * wip * Update QueueService.ts * wip * refactor * :v: * fix * Update QueueStatsService.ts * refactor * Update ApNoteService.ts * Update mock-resolver.ts * refactor * Update mock-resolver.ts
Diffstat (limited to 'packages/backend/src/core/QueueService.ts')
-rw-r--r--packages/backend/src/core/QueueService.ts65
1 files changed, 49 insertions, 16 deletions
diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts
index b4ffffecc0..2ae8a2b754 100644
--- a/packages/backend/src/core/QueueService.ts
+++ b/packages/backend/src/core/QueueService.ts
@@ -1,6 +1,5 @@
import { Inject, Injectable } from '@nestjs/common';
import { v4 as uuid } from 'uuid';
-import Bull from 'bull';
import type { IActivity } from '@/core/activitypub/type.js';
import type { DriveFile } from '@/models/entities/DriveFile.js';
import type { Webhook, webhookEventTypes } from '@/models/entities/Webhook.js';
@@ -11,6 +10,7 @@ import type { Antenna } from '@/server/api/endpoints/i/import-antennas.js';
import type { DbQueue, DeliverQueue, EndedPollNotificationQueue, InboxQueue, ObjectStorageQueue, RelationshipQueue, SystemQueue, WebhookDeliverQueue } from './QueueModule.js';
import type { DbJobData, RelationshipJobData, ThinUser } from '../queue/types.js';
import type httpSignature from '@peertube/http-signature';
+import type * as Bull from 'bullmq';
@Injectable()
export class QueueService {
@@ -26,7 +26,43 @@ export class QueueService {
@Inject('queue:relationship') public relationshipQueue: RelationshipQueue,
@Inject('queue:objectStorage') public objectStorageQueue: ObjectStorageQueue,
@Inject('queue:webhookDeliver') public webhookDeliverQueue: WebhookDeliverQueue,
- ) {}
+ ) {
+ this.systemQueue.add('tickCharts', {
+ }, {
+ repeat: { pattern: '55 * * * *' },
+ removeOnComplete: true,
+ });
+
+ this.systemQueue.add('resyncCharts', {
+ }, {
+ repeat: { pattern: '0 0 * * *' },
+ removeOnComplete: true,
+ });
+
+ this.systemQueue.add('cleanCharts', {
+ }, {
+ repeat: { pattern: '0 0 * * *' },
+ removeOnComplete: true,
+ });
+
+ this.systemQueue.add('aggregateRetention', {
+ }, {
+ repeat: { pattern: '0 0 * * *' },
+ removeOnComplete: true,
+ });
+
+ this.systemQueue.add('clean', {
+ }, {
+ repeat: { pattern: '0 0 * * *' },
+ removeOnComplete: true,
+ });
+
+ this.systemQueue.add('checkExpiredMutings', {
+ }, {
+ repeat: { pattern: '*/5 * * * *' },
+ removeOnComplete: true,
+ });
+ }
@bindThis
public deliver(user: ThinUser, content: IActivity | null, to: string | null, isSharedInbox: boolean) {
@@ -42,11 +78,10 @@ export class QueueService {
isSharedInbox,
};
- return this.deliverQueue.add(data, {
+ return this.deliverQueue.add(to, data, {
attempts: this.config.deliverJobMaxAttempts ?? 12,
- timeout: 1 * 60 * 1000, // 1min
backoff: {
- type: 'apBackoff',
+ type: 'custom',
},
removeOnComplete: true,
removeOnFail: true,
@@ -60,11 +95,10 @@ export class QueueService {
signature,
};
- return this.inboxQueue.add(data, {
+ return this.inboxQueue.add('', data, {
attempts: this.config.inboxJobMaxAttempts ?? 8,
- timeout: 5 * 60 * 1000, // 5min
backoff: {
- type: 'apBackoff',
+ type: 'custom',
},
removeOnComplete: true,
removeOnFail: true,
@@ -212,7 +246,7 @@ export class QueueService {
private generateToDbJobData<T extends 'importFollowingToDb' | 'importBlockingToDb', D extends DbJobData<T>>(name: T, data: D): {
name: string,
data: D,
- opts: Bull.JobOptions,
+ opts: Bull.JobsOptions,
} {
return {
name,
@@ -299,10 +333,10 @@ export class QueueService {
}
@bindThis
- private generateRelationshipJobData(name: 'follow' | 'unfollow' | 'block' | 'unblock', data: RelationshipJobData, opts: Bull.JobOptions = {}): {
+ private generateRelationshipJobData(name: 'follow' | 'unfollow' | 'block' | 'unblock', data: RelationshipJobData, opts: Bull.JobsOptions = {}): {
name: string,
data: RelationshipJobData,
- opts: Bull.JobOptions,
+ opts: Bull.JobsOptions,
} {
return {
name,
@@ -351,11 +385,10 @@ export class QueueService {
eventId: uuid(),
};
- return this.webhookDeliverQueue.add(data, {
+ return this.webhookDeliverQueue.add(webhook.id, data, {
attempts: 4,
- timeout: 1 * 60 * 1000, // 1min
backoff: {
- type: 'apBackoff',
+ type: 'custom',
},
removeOnComplete: true,
removeOnFail: true,
@@ -367,11 +400,11 @@ export class QueueService {
this.deliverQueue.once('cleaned', (jobs, status) => {
//deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});
- this.deliverQueue.clean(0, 'delayed');
+ this.deliverQueue.clean(0, Infinity, 'delayed');
this.inboxQueue.once('cleaned', (jobs, status) => {
//inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});
- this.inboxQueue.clean(0, 'delayed');
+ this.inboxQueue.clean(0, Infinity, 'delayed');
}
}