diff options
| author | syuilo <Syuilotan@yahoo.co.jp> | 2022-09-18 03:27:08 +0900 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-09-18 03:27:08 +0900 |
| commit | b75184ec8e3436200bacdcd832e3324702553d20 (patch) | |
| tree | 8b7e316f29e95df921db57289c8b8da476d18f07 /packages/backend/src/core/QueueService.ts | |
| parent | Update ROADMAP.md (diff) | |
| download | sharkey-b75184ec8e3436200bacdcd832e3324702553d20.tar.gz sharkey-b75184ec8e3436200bacdcd832e3324702553d20.tar.bz2 sharkey-b75184ec8e3436200bacdcd832e3324702553d20.zip | |
なんかもうめっちゃ変えた
Diffstat (limited to 'packages/backend/src/core/QueueService.ts')
| -rw-r--r-- | packages/backend/src/core/QueueService.ts | 242 |
1 files changed, 242 insertions, 0 deletions
diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts new file mode 100644 index 0000000000..7e771c100f --- /dev/null +++ b/packages/backend/src/core/QueueService.ts @@ -0,0 +1,242 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { v4 as uuid } from 'uuid'; +import type { IActivity } from '@/core/remote/activitypub/type.js'; +import type { DriveFile } from '@/models/entities/DriveFile.js'; +import type { Webhook, webhookEventTypes } from '@/models/entities/Webhook.js'; +import { Config } from '@/config.js'; +import { DI } from '@/di-symbols.js'; +import { DbQueue, DeliverQueue, EndedPollNotificationQueue, InboxQueue, ObjectStorageQueue, SystemQueue, WebhookDeliverQueue } from './queue/QueueModule.js'; +import type { ThinUser } from '../queue/types.js'; +import type httpSignature from '@peertube/http-signature'; + +@Injectable() +export class QueueService { + constructor( + @Inject(DI.config) + private config: Config, + + @Inject('queue:system') public systemQueue: SystemQueue, + @Inject('queue:endedPollNotification') public endedPollNotificationQueue: EndedPollNotificationQueue, + @Inject('queue:deliver') public deliverQueue: DeliverQueue, + @Inject('queue:inbox') public inboxQueue: InboxQueue, + @Inject('queue:db') public dbQueue: DbQueue, + @Inject('queue:objectStorage') public objectStorageQueue: ObjectStorageQueue, + @Inject('queue:webhookDeliver') public webhookDeliverQueue: WebhookDeliverQueue, + ) {} + + public deliver(user: ThinUser, content: IActivity, to: string | null) { + if (content == null) return null; + if (to == null) return null; + + const data = { + user: { + id: user.id, + }, + content, + to, + }; + + return this.deliverQueue.add(data, { + attempts: this.config.deliverJobMaxAttempts ?? 12, + timeout: 1 * 60 * 1000, // 1min + backoff: { + type: 'apBackoff', + }, + removeOnComplete: true, + removeOnFail: true, + }); + } + + public inbox(activity: IActivity, signature: httpSignature.IParsedSignature) { + const data = { + activity: activity, + signature, + }; + + return this.inboxQueue.add(data, { + attempts: this.config.inboxJobMaxAttempts ?? 8, + timeout: 5 * 60 * 1000, // 5min + backoff: { + type: 'apBackoff', + }, + removeOnComplete: true, + removeOnFail: true, + }); + } + + public createDeleteDriveFilesJob(user: ThinUser) { + return this.dbQueue.add('deleteDriveFiles', { + user: user, + }, { + removeOnComplete: true, + removeOnFail: true, + }); + } + + public createExportCustomEmojisJob(user: ThinUser) { + return this.dbQueue.add('exportCustomEmojis', { + user: user, + }, { + removeOnComplete: true, + removeOnFail: true, + }); + } + + public createExportNotesJob(user: ThinUser) { + return this.dbQueue.add('exportNotes', { + user: user, + }, { + removeOnComplete: true, + removeOnFail: true, + }); + } + + public createExportFollowingJob(user: ThinUser, excludeMuting = false, excludeInactive = false) { + return this.dbQueue.add('exportFollowing', { + user: user, + excludeMuting, + excludeInactive, + }, { + removeOnComplete: true, + removeOnFail: true, + }); + } + + public createExportMuteJob(user: ThinUser) { + return this.dbQueue.add('exportMuting', { + user: user, + }, { + removeOnComplete: true, + removeOnFail: true, + }); + } + + public createExportBlockingJob(user: ThinUser) { + return this.dbQueue.add('exportBlocking', { + user: user, + }, { + removeOnComplete: true, + removeOnFail: true, + }); + } + + public createExportUserListsJob(user: ThinUser) { + return this.dbQueue.add('exportUserLists', { + user: user, + }, { + removeOnComplete: true, + removeOnFail: true, + }); + } + + public createImportFollowingJob(user: ThinUser, fileId: DriveFile['id']) { + return this.dbQueue.add('importFollowing', { + user: user, + fileId: fileId, + }, { + removeOnComplete: true, + removeOnFail: true, + }); + } + + public createImportMutingJob(user: ThinUser, fileId: DriveFile['id']) { + return this.dbQueue.add('importMuting', { + user: user, + fileId: fileId, + }, { + removeOnComplete: true, + removeOnFail: true, + }); + } + + public createImportBlockingJob(user: ThinUser, fileId: DriveFile['id']) { + return this.dbQueue.add('importBlocking', { + user: user, + fileId: fileId, + }, { + removeOnComplete: true, + removeOnFail: true, + }); + } + + public createImportUserListsJob(user: ThinUser, fileId: DriveFile['id']) { + return this.dbQueue.add('importUserLists', { + user: user, + fileId: fileId, + }, { + removeOnComplete: true, + removeOnFail: true, + }); + } + + public createImportCustomEmojisJob(user: ThinUser, fileId: DriveFile['id']) { + return this.dbQueue.add('importCustomEmojis', { + user: user, + fileId: fileId, + }, { + removeOnComplete: true, + removeOnFail: true, + }); + } + + public createDeleteAccountJob(user: ThinUser, opts: { soft?: boolean; } = {}) { + return this.dbQueue.add('deleteAccount', { + user: user, + soft: opts.soft, + }, { + removeOnComplete: true, + removeOnFail: true, + }); + } + + public createDeleteObjectStorageFileJob(key: string) { + return this.objectStorageQueue.add('deleteFile', { + key: key, + }, { + removeOnComplete: true, + removeOnFail: true, + }); + } + + public createCleanRemoteFilesJob() { + return this.objectStorageQueue.add('cleanRemoteFiles', {}, { + removeOnComplete: true, + removeOnFail: true, + }); + } + + public webhookDeliver(webhook: Webhook, type: typeof webhookEventTypes[number], content: unknown) { + const data = { + type, + content, + webhookId: webhook.id, + userId: webhook.userId, + to: webhook.url, + secret: webhook.secret, + createdAt: Date.now(), + eventId: uuid(), + }; + + return this.webhookDeliverQueue.add(data, { + attempts: 4, + timeout: 1 * 60 * 1000, // 1min + backoff: { + type: 'apBackoff', + }, + removeOnComplete: true, + removeOnFail: true, + }); + } + + public destroy() { + this.deliverQueue.once('cleaned', (jobs, status) => { + //deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); + }); + this.deliverQueue.clean(0, 'delayed'); + + this.inboxQueue.once('cleaned', (jobs, status) => { + //inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); + }); + this.inboxQueue.clean(0, 'delayed'); + } +} |