summaryrefslogtreecommitdiff
path: root/packages/backend/src/core/QueueService.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/core/QueueService.ts')
-rw-r--r--packages/backend/src/core/QueueService.ts46
1 files changed, 40 insertions, 6 deletions
diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts
index 2ae8a2b754..546b4cee1b 100644
--- a/packages/backend/src/core/QueueService.ts
+++ b/packages/backend/src/core/QueueService.ts
@@ -1,5 +1,5 @@
+import { randomUUID } from 'node:crypto';
import { Inject, Injectable } from '@nestjs/common';
-import { v4 as uuid } from 'uuid';
import type { IActivity } from '@/core/activitypub/type.js';
import type { DriveFile } from '@/models/entities/DriveFile.js';
import type { Webhook, webhookEventTypes } from '@/models/entities/Webhook.js';
@@ -8,7 +8,7 @@ import { DI } from '@/di-symbols.js';
import { bindThis } from '@/decorators.js';
import type { Antenna } from '@/server/api/endpoints/i/import-antennas.js';
import type { DbQueue, DeliverQueue, EndedPollNotificationQueue, InboxQueue, ObjectStorageQueue, RelationshipQueue, SystemQueue, WebhookDeliverQueue } from './QueueModule.js';
-import type { DbJobData, RelationshipJobData, ThinUser } from '../queue/types.js';
+import type { DbJobData, DeliverJobData, RelationshipJobData, ThinUser } from '../queue/types.js';
import type httpSignature from '@peertube/http-signature';
import type * as Bull from 'bullmq';
@@ -69,7 +69,7 @@ export class QueueService {
if (content == null) return null;
if (to == null) return null;
- const data = {
+ const data: DeliverJobData = {
user: {
id: user.id,
},
@@ -88,6 +88,40 @@ export class QueueService {
});
}
+ /**
+ * ApDeliverManager-DeliverManager.execute()からinboxesを突っ込んでaddBulkしたい
+ * @param user `{ id: string; }` この関数ではThinUserに変換しないので前もって変換してください
+ * @param content IActivity | null
+ * @param inboxes `Map<string, boolean>` / key: to (inbox url), value: isSharedInbox (whether it is sharedInbox)
+ * @returns void
+ */
+ @bindThis
+ public async deliverMany(user: ThinUser, content: IActivity | null, inboxes: Map<string, boolean>) {
+ if (content == null) return null;
+
+ const opts = {
+ attempts: this.config.deliverJobMaxAttempts ?? 12,
+ backoff: {
+ type: 'custom',
+ },
+ removeOnComplete: true,
+ removeOnFail: true,
+ };
+
+ await this.deliverQueue.addBulk(Array.from(inboxes.entries(), d => ({
+ name: d[0],
+ data: {
+ user,
+ content,
+ to: d[0],
+ isSharedInbox: d[1],
+ } as DeliverJobData,
+ opts,
+ })));
+
+ return;
+ }
+
@bindThis
public inbox(activity: IActivity, signature: httpSignature.IParsedSignature) {
const data = {
@@ -382,7 +416,7 @@ export class QueueService {
to: webhook.url,
secret: webhook.secret,
createdAt: Date.now(),
- eventId: uuid(),
+ eventId: randomUUID(),
};
return this.webhookDeliverQueue.add(webhook.id, data, {
@@ -400,11 +434,11 @@ export class QueueService {
this.deliverQueue.once('cleaned', (jobs, status) => {
//deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});
- this.deliverQueue.clean(0, Infinity, 'delayed');
+ this.deliverQueue.clean(0, 0, 'delayed');
this.inboxQueue.once('cleaned', (jobs, status) => {
//inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});
- this.inboxQueue.clean(0, Infinity, 'delayed');
+ this.inboxQueue.clean(0, 0, 'delayed');
}
}