summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue/QueueProcessorService.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/queue/QueueProcessorService.ts')
-rw-r--r--packages/backend/src/queue/QueueProcessorService.ts108
1 files changed, 91 insertions, 17 deletions
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts
index 706110f6fc..dc025f9889 100644
--- a/packages/backend/src/queue/QueueProcessorService.ts
+++ b/packages/backend/src/queue/QueueProcessorService.ts
@@ -5,15 +5,36 @@ import type Logger from '@/logger.js';
import { QueueService } from '@/core/QueueService.js';
import { bindThis } from '@/decorators.js';
import { getJobInfo } from './get-job-info.js';
-import { SystemQueueProcessorsService } from './SystemQueueProcessorsService.js';
-import { ObjectStorageQueueProcessorsService } from './ObjectStorageQueueProcessorsService.js';
-import { DbQueueProcessorsService } from './DbQueueProcessorsService.js';
import { WebhookDeliverProcessorService } from './processors/WebhookDeliverProcessorService.js';
import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js';
import { DeliverProcessorService } from './processors/DeliverProcessorService.js';
import { InboxProcessorService } from './processors/InboxProcessorService.js';
+import { DeleteDriveFilesProcessorService } from './processors/DeleteDriveFilesProcessorService.js';
+import { ExportCustomEmojisProcessorService } from './processors/ExportCustomEmojisProcessorService.js';
+import { ExportNotesProcessorService } from './processors/ExportNotesProcessorService.js';
+import { ExportFollowingProcessorService } from './processors/ExportFollowingProcessorService.js';
+import { ExportMutingProcessorService } from './processors/ExportMutingProcessorService.js';
+import { ExportBlockingProcessorService } from './processors/ExportBlockingProcessorService.js';
+import { ExportUserListsProcessorService } from './processors/ExportUserListsProcessorService.js';
+import { ExportAntennasProcessorService } from './processors/ExportAntennasProcessorService.js';
+import { ImportFollowingProcessorService } from './processors/ImportFollowingProcessorService.js';
+import { ImportMutingProcessorService } from './processors/ImportMutingProcessorService.js';
+import { ImportBlockingProcessorService } from './processors/ImportBlockingProcessorService.js';
+import { ImportUserListsProcessorService } from './processors/ImportUserListsProcessorService.js';
+import { ImportCustomEmojisProcessorService } from './processors/ImportCustomEmojisProcessorService.js';
+import { ImportAntennasProcessorService } from './processors/ImportAntennasProcessorService.js';
+import { DeleteAccountProcessorService } from './processors/DeleteAccountProcessorService.js';
+import { ExportFavoritesProcessorService } from './processors/ExportFavoritesProcessorService.js';
+import { CleanRemoteFilesProcessorService } from './processors/CleanRemoteFilesProcessorService.js';
+import { DeleteFileProcessorService } from './processors/DeleteFileProcessorService.js';
+import { RelationshipProcessorService } from './processors/RelationshipProcessorService.js';
+import { TickChartsProcessorService } from './processors/TickChartsProcessorService.js';
+import { ResyncChartsProcessorService } from './processors/ResyncChartsProcessorService.js';
+import { CleanChartsProcessorService } from './processors/CleanChartsProcessorService.js';
+import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMutingsProcessorService.js';
+import { CleanProcessorService } from './processors/CleanProcessorService.js';
+import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js';
import { QueueLoggerService } from './QueueLoggerService.js';
-import { RelationshipQueueProcessorsService } from './RelationshipQueueProcessorsService.js';
@Injectable()
export class QueueProcessorService {
@@ -25,14 +46,35 @@ export class QueueProcessorService {
private queueLoggerService: QueueLoggerService,
private queueService: QueueService,
- private systemQueueProcessorsService: SystemQueueProcessorsService,
- private objectStorageQueueProcessorsService: ObjectStorageQueueProcessorsService,
- private dbQueueProcessorsService: DbQueueProcessorsService,
- private relationshipQueueProcessorsService: RelationshipQueueProcessorsService,
private webhookDeliverProcessorService: WebhookDeliverProcessorService,
private endedPollNotificationProcessorService: EndedPollNotificationProcessorService,
private deliverProcessorService: DeliverProcessorService,
private inboxProcessorService: InboxProcessorService,
+ private deleteDriveFilesProcessorService: DeleteDriveFilesProcessorService,
+ private exportCustomEmojisProcessorService: ExportCustomEmojisProcessorService,
+ private exportNotesProcessorService: ExportNotesProcessorService,
+ private exportFavoritesProcessorService: ExportFavoritesProcessorService,
+ private exportFollowingProcessorService: ExportFollowingProcessorService,
+ private exportMutingProcessorService: ExportMutingProcessorService,
+ private exportBlockingProcessorService: ExportBlockingProcessorService,
+ private exportUserListsProcessorService: ExportUserListsProcessorService,
+ private exportAntennasProcessorService: ExportAntennasProcessorService,
+ private importFollowingProcessorService: ImportFollowingProcessorService,
+ private importMutingProcessorService: ImportMutingProcessorService,
+ private importBlockingProcessorService: ImportBlockingProcessorService,
+ private importUserListsProcessorService: ImportUserListsProcessorService,
+ private importCustomEmojisProcessorService: ImportCustomEmojisProcessorService,
+ private importAntennasProcessorService: ImportAntennasProcessorService,
+ private deleteAccountProcessorService: DeleteAccountProcessorService,
+ private deleteFileProcessorService: DeleteFileProcessorService,
+ private cleanRemoteFilesProcessorService: CleanRemoteFilesProcessorService,
+ private relationshipProcessorService: RelationshipProcessorService,
+ private tickChartsProcessorService: TickChartsProcessorService,
+ private resyncChartsProcessorService: ResyncChartsProcessorService,
+ private cleanChartsProcessorService: CleanChartsProcessorService,
+ private aggregateRetentionProcessorService: AggregateRetentionProcessorService,
+ private checkExpiredMutingsProcessorService: CheckExpiredMutingsProcessorService,
+ private cleanProcessorService: CleanProcessorService,
) {
this.logger = this.queueLoggerService.logger;
}
@@ -119,14 +161,6 @@ export class QueueProcessorService {
.on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`, { job, e: renderError(err) }))
.on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
- this.queueService.deliverQueue.process(this.config.deliverJobConcurrency ?? 128, (job) => this.deliverProcessorService.process(job));
- this.queueService.inboxQueue.process(this.config.inboxJobConcurrency ?? 16, (job) => this.inboxProcessorService.process(job));
- this.queueService.endedPollNotificationQueue.process((job, done) => this.endedPollNotificationProcessorService.process(job, done));
- this.queueService.webhookDeliverQueue.process(64, (job) => this.webhookDeliverProcessorService.process(job));
- this.dbQueueProcessorsService.start(this.queueService.dbQueue);
- this.relationshipQueueProcessorsService.start(this.queueService.relationshipQueue);
- this.objectStorageQueueProcessorsService.start(this.queueService.objectStorageQueue);
-
this.queueService.systemQueue.add('tickCharts', {
}, {
repeat: { cron: '55 * * * *' },
@@ -163,6 +197,46 @@ export class QueueProcessorService {
removeOnComplete: true,
});
- this.systemQueueProcessorsService.start(this.queueService.systemQueue);
+ this.queueService.deliverQueue.process(this.config.deliverJobConcurrency ?? 128, (job) => this.deliverProcessorService.process(job));
+ this.queueService.inboxQueue.process(this.config.inboxJobConcurrency ?? 16, (job) => this.inboxProcessorService.process(job));
+ this.queueService.endedPollNotificationQueue.process((job, done) => this.endedPollNotificationProcessorService.process(job, done));
+ this.queueService.webhookDeliverQueue.process(64, (job) => this.webhookDeliverProcessorService.process(job));
+
+ this.queueService.dbQueue.process('deleteDriveFiles', (job, done) => this.deleteDriveFilesProcessorService.process(job, done));
+ this.queueService.dbQueue.process('exportCustomEmojis', (job, done) => this.exportCustomEmojisProcessorService.process(job, done));
+ this.queueService.dbQueue.process('exportNotes', (job, done) => this.exportNotesProcessorService.process(job, done));
+ this.queueService.dbQueue.process('exportFavorites', (job, done) => this.exportFavoritesProcessorService.process(job, done));
+ this.queueService.dbQueue.process('exportFollowing', (job, done) => this.exportFollowingProcessorService.process(job, done));
+ this.queueService.dbQueue.process('exportMuting', (job, done) => this.exportMutingProcessorService.process(job, done));
+ this.queueService.dbQueue.process('exportBlocking', (job, done) => this.exportBlockingProcessorService.process(job, done));
+ this.queueService.dbQueue.process('exportUserLists', (job, done) => this.exportUserListsProcessorService.process(job, done));
+ this.queueService.dbQueue.process('exportAntennas', (job, done) => this.exportAntennasProcessorService.process(job, done));
+ this.queueService.dbQueue.process('importFollowing', (job, done) => this.importFollowingProcessorService.process(job, done));
+ this.queueService.dbQueue.process('importFollowingToDb', (job) => this.importFollowingProcessorService.processDb(job));
+ this.queueService.dbQueue.process('importMuting', (job, done) => this.importMutingProcessorService.process(job, done));
+ this.queueService.dbQueue.process('importBlocking', (job, done) => this.importBlockingProcessorService.process(job, done));
+ this.queueService.dbQueue.process('importBlockingToDb', (job) => this.importBlockingProcessorService.processDb(job));
+ this.queueService.dbQueue.process('importUserLists', (job, done) => this.importUserListsProcessorService.process(job, done));
+ this.queueService.dbQueue.process('importCustomEmojis', (job, done) => this.importCustomEmojisProcessorService.process(job, done));
+ this.queueService.dbQueue.process('importAntennas', (job, done) => this.importAntennasProcessorService.process(job, done));
+ this.queueService.dbQueue.process('deleteAccount', (job) => this.deleteAccountProcessorService.process(job));
+
+ this.queueService.objectStorageQueue.process('deleteFile', 16, (job) => this.deleteFileProcessorService.process(job));
+ this.queueService.objectStorageQueue.process('cleanRemoteFiles', 16, (job, done) => this.cleanRemoteFilesProcessorService.process(job, done));
+
+ {
+ const maxJobs = this.config.relashionshipJobConcurrency ?? 16;
+ this.queueService.relationshipQueue.process('follow', maxJobs, (job) => this.relationshipProcessorService.processFollow(job));
+ this.queueService.relationshipQueue.process('unfollow', maxJobs, (job) => this.relationshipProcessorService.processUnfollow(job));
+ this.queueService.relationshipQueue.process('block', maxJobs, (job) => this.relationshipProcessorService.processBlock(job));
+ this.queueService.relationshipQueue.process('unblock', maxJobs, (job) => this.relationshipProcessorService.processUnblock(job));
+ }
+
+ this.queueService.systemQueue.process('tickCharts', (job, done) => this.tickChartsProcessorService.process(job, done));
+ this.queueService.systemQueue.process('resyncCharts', (job, done) => this.resyncChartsProcessorService.process(job, done));
+ this.queueService.systemQueue.process('cleanCharts', (job, done) => this.cleanChartsProcessorService.process(job, done));
+ this.queueService.systemQueue.process('aggregateRetention', (job, done) => this.aggregateRetentionProcessorService.process(job, done));
+ this.queueService.systemQueue.process('checkExpiredMutings', (job, done) => this.checkExpiredMutingsProcessorService.process(job, done));
+ this.queueService.systemQueue.process('clean', (job, done) => this.cleanProcessorService.process(job, done));
}
}