summaryrefslogtreecommitdiff
path: root/packages/backend/src
diff options
context:
space:
mode:
authorsyuilo <Syuilotan@yahoo.co.jp>2023-05-10 15:05:08 +0900
committersyuilo <Syuilotan@yahoo.co.jp>2023-05-10 15:05:08 +0900
commit341c42ebb9c30428fdc7527dd3d22b2d25885ed6 (patch)
treebe605c167249125ab88e644ba07edec701b73763 /packages/backend/src
parentMerge branch 'develop' of https://github.com/misskey-dev/misskey into develop (diff)
downloadsharkey-341c42ebb9c30428fdc7527dd3d22b2d25885ed6.tar.gz
sharkey-341c42ebb9c30428fdc7527dd3d22b2d25885ed6.tar.bz2
sharkey-341c42ebb9c30428fdc7527dd3d22b2d25885ed6.zip
enhance(backend): graceful shutdown for job queue and refactor
Diffstat (limited to 'packages/backend/src')
-rw-r--r--packages/backend/src/core/QueueModule.ts41
-rw-r--r--packages/backend/src/queue/DbQueueProcessorsService.ts69
-rw-r--r--packages/backend/src/queue/ObjectStorageQueueProcessorsService.ts25
-rw-r--r--packages/backend/src/queue/QueueProcessorModule.ts8
-rw-r--r--packages/backend/src/queue/QueueProcessorService.ts108
-rw-r--r--packages/backend/src/queue/RelationshipQueueProcessorsService.ts26
-rw-r--r--packages/backend/src/queue/SystemQueueProcessorsService.ts37
7 files changed, 128 insertions, 186 deletions
diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts
index d4905a5f88..1d73947776 100644
--- a/packages/backend/src/core/QueueModule.ts
+++ b/packages/backend/src/core/QueueModule.ts
@@ -1,4 +1,5 @@
-import { Module } from '@nestjs/common';
+import { setTimeout } from 'node:timers/promises';
+import { Inject, Module, OnApplicationShutdown } from '@nestjs/common';
import Bull from 'bull';
import { DI } from '@/di-symbols.js';
import type { Config } from '@/config.js';
@@ -41,9 +42,9 @@ export type SystemQueue = Bull.Queue<Record<string, unknown>>;
export type EndedPollNotificationQueue = Bull.Queue<EndedPollNotificationJobData>;
export type DeliverQueue = Bull.Queue<DeliverJobData>;
export type InboxQueue = Bull.Queue<InboxJobData>;
-export type DbQueue = Bull.Queue<DbJobData<keyof DbJobMap>>;
+export type DbQueue = Bull.Queue;
export type RelationshipQueue = Bull.Queue<RelationshipJobData>;
-export type ObjectStorageQueue = Bull.Queue<ObjectStorageJobData>;
+export type ObjectStorageQueue = Bull.Queue;
export type WebhookDeliverQueue = Bull.Queue<WebhookDeliverJobData>;
const $system: Provider = {
@@ -118,4 +119,36 @@ const $webhookDeliver: Provider = {
$webhookDeliver,
],
})
-export class QueueModule {}
+export class QueueModule implements OnApplicationShutdown {
+ constructor(
+ @Inject('queue:system') public systemQueue: SystemQueue,
+ @Inject('queue:endedPollNotification') public endedPollNotificationQueue: EndedPollNotificationQueue,
+ @Inject('queue:deliver') public deliverQueue: DeliverQueue,
+ @Inject('queue:inbox') public inboxQueue: InboxQueue,
+ @Inject('queue:db') public dbQueue: DbQueue,
+ @Inject('queue:relationship') public relationshipQueue: RelationshipQueue,
+ @Inject('queue:objectStorage') public objectStorageQueue: ObjectStorageQueue,
+ @Inject('queue:webhookDeliver') public webhookDeliverQueue: WebhookDeliverQueue,
+ ) {}
+
+ async onApplicationShutdown(signal: string): Promise<void> {
+ if (process.env.NODE_ENV === 'test') {
+ // XXX:
+ // Shutting down the existing connections causes errors on Jest as
+ // Misskey has asynchronous postgres/redis connections that are not
+ // awaited.
+ // Let's wait for some random time for them to finish.
+ await setTimeout(5000);
+ }
+ await Promise.all([
+ this.systemQueue.close(),
+ this.endedPollNotificationQueue.close(),
+ this.deliverQueue.close(),
+ this.inboxQueue.close(),
+ this.dbQueue.close(),
+ this.relationshipQueue.close(),
+ this.objectStorageQueue.close(),
+ this.webhookDeliverQueue.close(),
+ ]);
+ }
+}
diff --git a/packages/backend/src/queue/DbQueueProcessorsService.ts b/packages/backend/src/queue/DbQueueProcessorsService.ts
deleted file mode 100644
index df8ac3a301..0000000000
--- a/packages/backend/src/queue/DbQueueProcessorsService.ts
+++ /dev/null
@@ -1,69 +0,0 @@
-import { Inject, Injectable } from '@nestjs/common';
-import { DI } from '@/di-symbols.js';
-import type { Config } from '@/config.js';
-import { bindThis } from '@/decorators.js';
-import { DeleteDriveFilesProcessorService } from './processors/DeleteDriveFilesProcessorService.js';
-import { ExportCustomEmojisProcessorService } from './processors/ExportCustomEmojisProcessorService.js';
-import { ExportNotesProcessorService } from './processors/ExportNotesProcessorService.js';
-import { ExportFollowingProcessorService } from './processors/ExportFollowingProcessorService.js';
-import { ExportMutingProcessorService } from './processors/ExportMutingProcessorService.js';
-import { ExportBlockingProcessorService } from './processors/ExportBlockingProcessorService.js';
-import { ExportUserListsProcessorService } from './processors/ExportUserListsProcessorService.js';
-import { ExportAntennasProcessorService } from './processors/ExportAntennasProcessorService.js';
-import { ImportFollowingProcessorService } from './processors/ImportFollowingProcessorService.js';
-import { ImportMutingProcessorService } from './processors/ImportMutingProcessorService.js';
-import { ImportBlockingProcessorService } from './processors/ImportBlockingProcessorService.js';
-import { ImportUserListsProcessorService } from './processors/ImportUserListsProcessorService.js';
-import { ImportCustomEmojisProcessorService } from './processors/ImportCustomEmojisProcessorService.js';
-import { ImportAntennasProcessorService } from './processors/ImportAntennasProcessorService.js';
-import { DeleteAccountProcessorService } from './processors/DeleteAccountProcessorService.js';
-import { ExportFavoritesProcessorService } from './processors/ExportFavoritesProcessorService.js';
-import type Bull from 'bull';
-
-@Injectable()
-export class DbQueueProcessorsService {
- constructor(
- @Inject(DI.config)
- private config: Config,
-
- private deleteDriveFilesProcessorService: DeleteDriveFilesProcessorService,
- private exportCustomEmojisProcessorService: ExportCustomEmojisProcessorService,
- private exportNotesProcessorService: ExportNotesProcessorService,
- private exportFavoritesProcessorService: ExportFavoritesProcessorService,
- private exportFollowingProcessorService: ExportFollowingProcessorService,
- private exportMutingProcessorService: ExportMutingProcessorService,
- private exportBlockingProcessorService: ExportBlockingProcessorService,
- private exportUserListsProcessorService: ExportUserListsProcessorService,
- private exportAntennasProcessorService: ExportAntennasProcessorService,
- private importFollowingProcessorService: ImportFollowingProcessorService,
- private importMutingProcessorService: ImportMutingProcessorService,
- private importBlockingProcessorService: ImportBlockingProcessorService,
- private importUserListsProcessorService: ImportUserListsProcessorService,
- private importCustomEmojisProcessorService: ImportCustomEmojisProcessorService,
- private importAntennasProcessorService: ImportAntennasProcessorService,
- private deleteAccountProcessorService: DeleteAccountProcessorService,
- ) {
- }
-
- @bindThis
- public start(q: Bull.Queue): void {
- q.process('deleteDriveFiles', (job, done) => this.deleteDriveFilesProcessorService.process(job, done));
- q.process('exportCustomEmojis', (job, done) => this.exportCustomEmojisProcessorService.process(job, done));
- q.process('exportNotes', (job, done) => this.exportNotesProcessorService.process(job, done));
- q.process('exportFavorites', (job, done) => this.exportFavoritesProcessorService.process(job, done));
- q.process('exportFollowing', (job, done) => this.exportFollowingProcessorService.process(job, done));
- q.process('exportMuting', (job, done) => this.exportMutingProcessorService.process(job, done));
- q.process('exportBlocking', (job, done) => this.exportBlockingProcessorService.process(job, done));
- q.process('exportUserLists', (job, done) => this.exportUserListsProcessorService.process(job, done));
- q.process('exportAntennas', (job, done) => this.exportAntennasProcessorService.process(job, done));
- q.process('importFollowing', (job, done) => this.importFollowingProcessorService.process(job, done));
- q.process('importFollowingToDb', (job) => this.importFollowingProcessorService.processDb(job));
- q.process('importMuting', (job, done) => this.importMutingProcessorService.process(job, done));
- q.process('importBlocking', (job, done) => this.importBlockingProcessorService.process(job, done));
- q.process('importBlockingToDb', (job) => this.importBlockingProcessorService.processDb(job));
- q.process('importUserLists', (job, done) => this.importUserListsProcessorService.process(job, done));
- q.process('importCustomEmojis', (job, done) => this.importCustomEmojisProcessorService.process(job, done));
- q.process('importAntennas', (job, done) => this.importAntennasProcessorService.process(job, done));
- q.process('deleteAccount', (job) => this.deleteAccountProcessorService.process(job));
- }
-}
diff --git a/packages/backend/src/queue/ObjectStorageQueueProcessorsService.ts b/packages/backend/src/queue/ObjectStorageQueueProcessorsService.ts
deleted file mode 100644
index 865e47c3f8..0000000000
--- a/packages/backend/src/queue/ObjectStorageQueueProcessorsService.ts
+++ /dev/null
@@ -1,25 +0,0 @@
-import { Inject, Injectable } from '@nestjs/common';
-import { DI } from '@/di-symbols.js';
-import type { Config } from '@/config.js';
-import { CleanRemoteFilesProcessorService } from './processors/CleanRemoteFilesProcessorService.js';
-import { DeleteFileProcessorService } from './processors/DeleteFileProcessorService.js';
-import type Bull from 'bull';
-import { bindThis } from '@/decorators.js';
-
-@Injectable()
-export class ObjectStorageQueueProcessorsService {
- constructor(
- @Inject(DI.config)
- private config: Config,
-
- private deleteFileProcessorService: DeleteFileProcessorService,
- private cleanRemoteFilesProcessorService: CleanRemoteFilesProcessorService,
- ) {
- }
-
- @bindThis
- public start(q: Bull.Queue): void {
- q.process('deleteFile', 16, (job) => this.deleteFileProcessorService.process(job));
- q.process('cleanRemoteFiles', 16, (job, done) => this.cleanRemoteFilesProcessorService.process(job, done));
- }
-}
diff --git a/packages/backend/src/queue/QueueProcessorModule.ts b/packages/backend/src/queue/QueueProcessorModule.ts
index 3d4cc77321..e1c6b93d9b 100644
--- a/packages/backend/src/queue/QueueProcessorModule.ts
+++ b/packages/backend/src/queue/QueueProcessorModule.ts
@@ -3,14 +3,10 @@ import { CoreModule } from '@/core/CoreModule.js';
import { GlobalModule } from '@/GlobalModule.js';
import { QueueLoggerService } from './QueueLoggerService.js';
import { QueueProcessorService } from './QueueProcessorService.js';
-import { DbQueueProcessorsService } from './DbQueueProcessorsService.js';
-import { RelationshipQueueProcessorsService } from './RelationshipQueueProcessorsService.js';
-import { ObjectStorageQueueProcessorsService } from './ObjectStorageQueueProcessorsService.js';
import { DeliverProcessorService } from './processors/DeliverProcessorService.js';
import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js';
import { InboxProcessorService } from './processors/InboxProcessorService.js';
import { WebhookDeliverProcessorService } from './processors/WebhookDeliverProcessorService.js';
-import { SystemQueueProcessorsService } from './SystemQueueProcessorsService.js';
import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMutingsProcessorService.js';
import { CleanChartsProcessorService } from './processors/CleanChartsProcessorService.js';
import { CleanProcessorService } from './processors/CleanProcessorService.js';
@@ -68,10 +64,6 @@ import { RelationshipProcessorService } from './processors/RelationshipProcessor
DeleteFileProcessorService,
CleanRemoteFilesProcessorService,
RelationshipProcessorService,
- SystemQueueProcessorsService,
- ObjectStorageQueueProcessorsService,
- DbQueueProcessorsService,
- RelationshipQueueProcessorsService,
WebhookDeliverProcessorService,
EndedPollNotificationProcessorService,
DeliverProcessorService,
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts
index 706110f6fc..dc025f9889 100644
--- a/packages/backend/src/queue/QueueProcessorService.ts
+++ b/packages/backend/src/queue/QueueProcessorService.ts
@@ -5,15 +5,36 @@ import type Logger from '@/logger.js';
import { QueueService } from '@/core/QueueService.js';
import { bindThis } from '@/decorators.js';
import { getJobInfo } from './get-job-info.js';
-import { SystemQueueProcessorsService } from './SystemQueueProcessorsService.js';
-import { ObjectStorageQueueProcessorsService } from './ObjectStorageQueueProcessorsService.js';
-import { DbQueueProcessorsService } from './DbQueueProcessorsService.js';
import { WebhookDeliverProcessorService } from './processors/WebhookDeliverProcessorService.js';
import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js';
import { DeliverProcessorService } from './processors/DeliverProcessorService.js';
import { InboxProcessorService } from './processors/InboxProcessorService.js';
+import { DeleteDriveFilesProcessorService } from './processors/DeleteDriveFilesProcessorService.js';
+import { ExportCustomEmojisProcessorService } from './processors/ExportCustomEmojisProcessorService.js';
+import { ExportNotesProcessorService } from './processors/ExportNotesProcessorService.js';
+import { ExportFollowingProcessorService } from './processors/ExportFollowingProcessorService.js';
+import { ExportMutingProcessorService } from './processors/ExportMutingProcessorService.js';
+import { ExportBlockingProcessorService } from './processors/ExportBlockingProcessorService.js';
+import { ExportUserListsProcessorService } from './processors/ExportUserListsProcessorService.js';
+import { ExportAntennasProcessorService } from './processors/ExportAntennasProcessorService.js';
+import { ImportFollowingProcessorService } from './processors/ImportFollowingProcessorService.js';
+import { ImportMutingProcessorService } from './processors/ImportMutingProcessorService.js';
+import { ImportBlockingProcessorService } from './processors/ImportBlockingProcessorService.js';
+import { ImportUserListsProcessorService } from './processors/ImportUserListsProcessorService.js';
+import { ImportCustomEmojisProcessorService } from './processors/ImportCustomEmojisProcessorService.js';
+import { ImportAntennasProcessorService } from './processors/ImportAntennasProcessorService.js';
+import { DeleteAccountProcessorService } from './processors/DeleteAccountProcessorService.js';
+import { ExportFavoritesProcessorService } from './processors/ExportFavoritesProcessorService.js';
+import { CleanRemoteFilesProcessorService } from './processors/CleanRemoteFilesProcessorService.js';
+import { DeleteFileProcessorService } from './processors/DeleteFileProcessorService.js';
+import { RelationshipProcessorService } from './processors/RelationshipProcessorService.js';
+import { TickChartsProcessorService } from './processors/TickChartsProcessorService.js';
+import { ResyncChartsProcessorService } from './processors/ResyncChartsProcessorService.js';
+import { CleanChartsProcessorService } from './processors/CleanChartsProcessorService.js';
+import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMutingsProcessorService.js';
+import { CleanProcessorService } from './processors/CleanProcessorService.js';
+import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js';
import { QueueLoggerService } from './QueueLoggerService.js';
-import { RelationshipQueueProcessorsService } from './RelationshipQueueProcessorsService.js';
@Injectable()
export class QueueProcessorService {
@@ -25,14 +46,35 @@ export class QueueProcessorService {
private queueLoggerService: QueueLoggerService,
private queueService: QueueService,
- private systemQueueProcessorsService: SystemQueueProcessorsService,
- private objectStorageQueueProcessorsService: ObjectStorageQueueProcessorsService,
- private dbQueueProcessorsService: DbQueueProcessorsService,
- private relationshipQueueProcessorsService: RelationshipQueueProcessorsService,
private webhookDeliverProcessorService: WebhookDeliverProcessorService,
private endedPollNotificationProcessorService: EndedPollNotificationProcessorService,
private deliverProcessorService: DeliverProcessorService,
private inboxProcessorService: InboxProcessorService,
+ private deleteDriveFilesProcessorService: DeleteDriveFilesProcessorService,
+ private exportCustomEmojisProcessorService: ExportCustomEmojisProcessorService,
+ private exportNotesProcessorService: ExportNotesProcessorService,
+ private exportFavoritesProcessorService: ExportFavoritesProcessorService,
+ private exportFollowingProcessorService: ExportFollowingProcessorService,
+ private exportMutingProcessorService: ExportMutingProcessorService,
+ private exportBlockingProcessorService: ExportBlockingProcessorService,
+ private exportUserListsProcessorService: ExportUserListsProcessorService,
+ private exportAntennasProcessorService: ExportAntennasProcessorService,
+ private importFollowingProcessorService: ImportFollowingProcessorService,
+ private importMutingProcessorService: ImportMutingProcessorService,
+ private importBlockingProcessorService: ImportBlockingProcessorService,
+ private importUserListsProcessorService: ImportUserListsProcessorService,
+ private importCustomEmojisProcessorService: ImportCustomEmojisProcessorService,
+ private importAntennasProcessorService: ImportAntennasProcessorService,
+ private deleteAccountProcessorService: DeleteAccountProcessorService,
+ private deleteFileProcessorService: DeleteFileProcessorService,
+ private cleanRemoteFilesProcessorService: CleanRemoteFilesProcessorService,
+ private relationshipProcessorService: RelationshipProcessorService,
+ private tickChartsProcessorService: TickChartsProcessorService,
+ private resyncChartsProcessorService: ResyncChartsProcessorService,
+ private cleanChartsProcessorService: CleanChartsProcessorService,
+ private aggregateRetentionProcessorService: AggregateRetentionProcessorService,
+ private checkExpiredMutingsProcessorService: CheckExpiredMutingsProcessorService,
+ private cleanProcessorService: CleanProcessorService,
) {
this.logger = this.queueLoggerService.logger;
}
@@ -119,14 +161,6 @@ export class QueueProcessorService {
.on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`, { job, e: renderError(err) }))
.on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
- this.queueService.deliverQueue.process(this.config.deliverJobConcurrency ?? 128, (job) => this.deliverProcessorService.process(job));
- this.queueService.inboxQueue.process(this.config.inboxJobConcurrency ?? 16, (job) => this.inboxProcessorService.process(job));
- this.queueService.endedPollNotificationQueue.process((job, done) => this.endedPollNotificationProcessorService.process(job, done));
- this.queueService.webhookDeliverQueue.process(64, (job) => this.webhookDeliverProcessorService.process(job));
- this.dbQueueProcessorsService.start(this.queueService.dbQueue);
- this.relationshipQueueProcessorsService.start(this.queueService.relationshipQueue);
- this.objectStorageQueueProcessorsService.start(this.queueService.objectStorageQueue);
-
this.queueService.systemQueue.add('tickCharts', {
}, {
repeat: { cron: '55 * * * *' },
@@ -163,6 +197,46 @@ export class QueueProcessorService {
removeOnComplete: true,
});
- this.systemQueueProcessorsService.start(this.queueService.systemQueue);
+ this.queueService.deliverQueue.process(this.config.deliverJobConcurrency ?? 128, (job) => this.deliverProcessorService.process(job));
+ this.queueService.inboxQueue.process(this.config.inboxJobConcurrency ?? 16, (job) => this.inboxProcessorService.process(job));
+ this.queueService.endedPollNotificationQueue.process((job, done) => this.endedPollNotificationProcessorService.process(job, done));
+ this.queueService.webhookDeliverQueue.process(64, (job) => this.webhookDeliverProcessorService.process(job));
+
+ this.queueService.dbQueue.process('deleteDriveFiles', (job, done) => this.deleteDriveFilesProcessorService.process(job, done));
+ this.queueService.dbQueue.process('exportCustomEmojis', (job, done) => this.exportCustomEmojisProcessorService.process(job, done));
+ this.queueService.dbQueue.process('exportNotes', (job, done) => this.exportNotesProcessorService.process(job, done));
+ this.queueService.dbQueue.process('exportFavorites', (job, done) => this.exportFavoritesProcessorService.process(job, done));
+ this.queueService.dbQueue.process('exportFollowing', (job, done) => this.exportFollowingProcessorService.process(job, done));
+ this.queueService.dbQueue.process('exportMuting', (job, done) => this.exportMutingProcessorService.process(job, done));
+ this.queueService.dbQueue.process('exportBlocking', (job, done) => this.exportBlockingProcessorService.process(job, done));
+ this.queueService.dbQueue.process('exportUserLists', (job, done) => this.exportUserListsProcessorService.process(job, done));
+ this.queueService.dbQueue.process('exportAntennas', (job, done) => this.exportAntennasProcessorService.process(job, done));
+ this.queueService.dbQueue.process('importFollowing', (job, done) => this.importFollowingProcessorService.process(job, done));
+ this.queueService.dbQueue.process('importFollowingToDb', (job) => this.importFollowingProcessorService.processDb(job));
+ this.queueService.dbQueue.process('importMuting', (job, done) => this.importMutingProcessorService.process(job, done));
+ this.queueService.dbQueue.process('importBlocking', (job, done) => this.importBlockingProcessorService.process(job, done));
+ this.queueService.dbQueue.process('importBlockingToDb', (job) => this.importBlockingProcessorService.processDb(job));
+ this.queueService.dbQueue.process('importUserLists', (job, done) => this.importUserListsProcessorService.process(job, done));
+ this.queueService.dbQueue.process('importCustomEmojis', (job, done) => this.importCustomEmojisProcessorService.process(job, done));
+ this.queueService.dbQueue.process('importAntennas', (job, done) => this.importAntennasProcessorService.process(job, done));
+ this.queueService.dbQueue.process('deleteAccount', (job) => this.deleteAccountProcessorService.process(job));
+
+ this.queueService.objectStorageQueue.process('deleteFile', 16, (job) => this.deleteFileProcessorService.process(job));
+ this.queueService.objectStorageQueue.process('cleanRemoteFiles', 16, (job, done) => this.cleanRemoteFilesProcessorService.process(job, done));
+
+ {
+ const maxJobs = this.config.relashionshipJobConcurrency ?? 16;
+ this.queueService.relationshipQueue.process('follow', maxJobs, (job) => this.relationshipProcessorService.processFollow(job));
+ this.queueService.relationshipQueue.process('unfollow', maxJobs, (job) => this.relationshipProcessorService.processUnfollow(job));
+ this.queueService.relationshipQueue.process('block', maxJobs, (job) => this.relationshipProcessorService.processBlock(job));
+ this.queueService.relationshipQueue.process('unblock', maxJobs, (job) => this.relationshipProcessorService.processUnblock(job));
+ }
+
+ this.queueService.systemQueue.process('tickCharts', (job, done) => this.tickChartsProcessorService.process(job, done));
+ this.queueService.systemQueue.process('resyncCharts', (job, done) => this.resyncChartsProcessorService.process(job, done));
+ this.queueService.systemQueue.process('cleanCharts', (job, done) => this.cleanChartsProcessorService.process(job, done));
+ this.queueService.systemQueue.process('aggregateRetention', (job, done) => this.aggregateRetentionProcessorService.process(job, done));
+ this.queueService.systemQueue.process('checkExpiredMutings', (job, done) => this.checkExpiredMutingsProcessorService.process(job, done));
+ this.queueService.systemQueue.process('clean', (job, done) => this.cleanProcessorService.process(job, done));
}
}
diff --git a/packages/backend/src/queue/RelationshipQueueProcessorsService.ts b/packages/backend/src/queue/RelationshipQueueProcessorsService.ts
deleted file mode 100644
index 736b4fa80d..0000000000
--- a/packages/backend/src/queue/RelationshipQueueProcessorsService.ts
+++ /dev/null
@@ -1,26 +0,0 @@
-import { Inject, Injectable } from '@nestjs/common';
-import { bindThis } from '@/decorators.js';
-import { RelationshipProcessorService } from './processors/RelationshipProcessorService.js';
-import type Bull from 'bull';
-import { DI } from '@/di-symbols.js';
-import type { Config } from '@/config.js';
-
-@Injectable()
-export class RelationshipQueueProcessorsService {
- constructor(
- @Inject(DI.config)
- private config: Config,
-
- private relationshipProcessorService: RelationshipProcessorService,
- ) {
- }
-
- @bindThis
- public start(q: Bull.Queue): void {
- const maxJobs = this.config.relashionshipJobConcurrency ?? 16;
- q.process('follow', maxJobs, (job) => this.relationshipProcessorService.processFollow(job));
- q.process('unfollow', maxJobs, (job) => this.relationshipProcessorService.processUnfollow(job));
- q.process('block', maxJobs, (job) => this.relationshipProcessorService.processBlock(job));
- q.process('unblock', maxJobs, (job) => this.relationshipProcessorService.processUnblock(job));
- }
-}
diff --git a/packages/backend/src/queue/SystemQueueProcessorsService.ts b/packages/backend/src/queue/SystemQueueProcessorsService.ts
deleted file mode 100644
index 7fb0da4b10..0000000000
--- a/packages/backend/src/queue/SystemQueueProcessorsService.ts
+++ /dev/null
@@ -1,37 +0,0 @@
-import { Inject, Injectable } from '@nestjs/common';
-import { DI } from '@/di-symbols.js';
-import type { Config } from '@/config.js';
-import { bindThis } from '@/decorators.js';
-import { TickChartsProcessorService } from './processors/TickChartsProcessorService.js';
-import { ResyncChartsProcessorService } from './processors/ResyncChartsProcessorService.js';
-import { CleanChartsProcessorService } from './processors/CleanChartsProcessorService.js';
-import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMutingsProcessorService.js';
-import { CleanProcessorService } from './processors/CleanProcessorService.js';
-import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js';
-import type Bull from 'bull';
-
-@Injectable()
-export class SystemQueueProcessorsService {
- constructor(
- @Inject(DI.config)
- private config: Config,
-
- private tickChartsProcessorService: TickChartsProcessorService,
- private resyncChartsProcessorService: ResyncChartsProcessorService,
- private cleanChartsProcessorService: CleanChartsProcessorService,
- private aggregateRetentionProcessorService: AggregateRetentionProcessorService,
- private checkExpiredMutingsProcessorService: CheckExpiredMutingsProcessorService,
- private cleanProcessorService: CleanProcessorService,
- ) {
- }
-
- @bindThis
- public start(q: Bull.Queue): void {
- q.process('tickCharts', (job, done) => this.tickChartsProcessorService.process(job, done));
- q.process('resyncCharts', (job, done) => this.resyncChartsProcessorService.process(job, done));
- q.process('cleanCharts', (job, done) => this.cleanChartsProcessorService.process(job, done));
- q.process('aggregateRetention', (job, done) => this.aggregateRetentionProcessorService.process(job, done));
- q.process('checkExpiredMutings', (job, done) => this.checkExpiredMutingsProcessorService.process(job, done));
- q.process('clean', (job, done) => this.cleanProcessorService.process(job, done));
- }
-}