diff options
| author | syuilo <Syuilotan@yahoo.co.jp> | 2023-07-21 20:36:07 +0900 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-07-21 20:36:07 +0900 |
| commit | e64a81aa1d2801516e8eac8dc69aac540489f20b (patch) | |
| tree | 56accbc0f5f71db864e1e975920135fb0a957291 /packages/backend/src/core/QueueService.ts | |
| parent | Merge pull request #10990 from misskey-dev/develop (diff) | |
| parent | New Crowdin updates (#11336) (diff) | |
| download | misskey-e64a81aa1d2801516e8eac8dc69aac540489f20b.tar.gz misskey-e64a81aa1d2801516e8eac8dc69aac540489f20b.tar.bz2 misskey-e64a81aa1d2801516e8eac8dc69aac540489f20b.zip | |
Merge pull request #11301 from misskey-dev/develop
Release: 13.14.0
Diffstat (limited to 'packages/backend/src/core/QueueService.ts')
| -rw-r--r-- | packages/backend/src/core/QueueService.ts | 46 |
1 files changed, 40 insertions, 6 deletions
diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts index 2ae8a2b754..546b4cee1b 100644 --- a/packages/backend/src/core/QueueService.ts +++ b/packages/backend/src/core/QueueService.ts @@ -1,5 +1,5 @@ +import { randomUUID } from 'node:crypto'; import { Inject, Injectable } from '@nestjs/common'; -import { v4 as uuid } from 'uuid'; import type { IActivity } from '@/core/activitypub/type.js'; import type { DriveFile } from '@/models/entities/DriveFile.js'; import type { Webhook, webhookEventTypes } from '@/models/entities/Webhook.js'; @@ -8,7 +8,7 @@ import { DI } from '@/di-symbols.js'; import { bindThis } from '@/decorators.js'; import type { Antenna } from '@/server/api/endpoints/i/import-antennas.js'; import type { DbQueue, DeliverQueue, EndedPollNotificationQueue, InboxQueue, ObjectStorageQueue, RelationshipQueue, SystemQueue, WebhookDeliverQueue } from './QueueModule.js'; -import type { DbJobData, RelationshipJobData, ThinUser } from '../queue/types.js'; +import type { DbJobData, DeliverJobData, RelationshipJobData, ThinUser } from '../queue/types.js'; import type httpSignature from '@peertube/http-signature'; import type * as Bull from 'bullmq'; @@ -69,7 +69,7 @@ export class QueueService { if (content == null) return null; if (to == null) return null; - const data = { + const data: DeliverJobData = { user: { id: user.id, }, @@ -88,6 +88,40 @@ export class QueueService { }); } + /** + * ApDeliverManager-DeliverManager.execute()からinboxesを突っ込んでaddBulkしたい + * @param user `{ id: string; }` この関数ではThinUserに変換しないので前もって変換してください + * @param content IActivity | null + * @param inboxes `Map<string, boolean>` / key: to (inbox url), value: isSharedInbox (whether it is sharedInbox) + * @returns void + */ + @bindThis + public async deliverMany(user: ThinUser, content: IActivity | null, inboxes: Map<string, boolean>) { + if (content == null) return null; + + const opts = { + attempts: this.config.deliverJobMaxAttempts ?? 12, + backoff: { + type: 'custom', + }, + removeOnComplete: true, + removeOnFail: true, + }; + + await this.deliverQueue.addBulk(Array.from(inboxes.entries(), d => ({ + name: d[0], + data: { + user, + content, + to: d[0], + isSharedInbox: d[1], + } as DeliverJobData, + opts, + }))); + + return; + } + @bindThis public inbox(activity: IActivity, signature: httpSignature.IParsedSignature) { const data = { @@ -382,7 +416,7 @@ export class QueueService { to: webhook.url, secret: webhook.secret, createdAt: Date.now(), - eventId: uuid(), + eventId: randomUUID(), }; return this.webhookDeliverQueue.add(webhook.id, data, { @@ -400,11 +434,11 @@ export class QueueService { this.deliverQueue.once('cleaned', (jobs, status) => { //deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); }); - this.deliverQueue.clean(0, Infinity, 'delayed'); + this.deliverQueue.clean(0, 0, 'delayed'); this.inboxQueue.once('cleaned', (jobs, status) => { //inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); }); - this.inboxQueue.clean(0, Infinity, 'delayed'); + this.inboxQueue.clean(0, 0, 'delayed'); } } |