summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue
diff options
context:
space:
mode:
authorsyuilo <Syuilotan@yahoo.co.jp>2022-09-18 03:27:08 +0900
committerGitHub <noreply@github.com>2022-09-18 03:27:08 +0900
commitb75184ec8e3436200bacdcd832e3324702553d20 (patch)
tree8b7e316f29e95df921db57289c8b8da476d18f07 /packages/backend/src/queue
parentUpdate ROADMAP.md (diff)
downloadsharkey-b75184ec8e3436200bacdcd832e3324702553d20.tar.gz
sharkey-b75184ec8e3436200bacdcd832e3324702553d20.tar.bz2
sharkey-b75184ec8e3436200bacdcd832e3324702553d20.zip
なんかもうめっちゃ変えた
Diffstat (limited to 'packages/backend/src/queue')
-rw-r--r--packages/backend/src/queue/DbQueueProcessorsService.ts63
-rw-r--r--packages/backend/src/queue/ObjectStorageQueueProcessorsService.ts30
-rw-r--r--packages/backend/src/queue/QueueLoggerService.ts12
-rw-r--r--packages/backend/src/queue/QueueProcessorModule.ts72
-rw-r--r--packages/backend/src/queue/QueueProcessorService.ts141
-rw-r--r--packages/backend/src/queue/SystemQueueProcessorsService.ts38
-rw-r--r--packages/backend/src/queue/index.ts342
-rw-r--r--packages/backend/src/queue/initialize.ts34
-rw-r--r--packages/backend/src/queue/logger.ts3
-rw-r--r--packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts50
-rw-r--r--packages/backend/src/queue/processors/CleanChartsProcessorService.ts68
-rw-r--r--packages/backend/src/queue/processors/CleanProcessorService.ts36
-rw-r--r--packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts69
-rw-r--r--packages/backend/src/queue/processors/DeleteAccountProcessorService.ts124
-rw-r--r--packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts78
-rw-r--r--packages/backend/src/queue/processors/DeleteFileProcessorService.ts31
-rw-r--r--packages/backend/src/queue/processors/DeliverProcessorService.ts130
-rw-r--r--packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts56
-rw-r--r--packages/backend/src/queue/processors/ExportBlockingProcessorService.ts117
-rw-r--r--packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts135
-rw-r--r--packages/backend/src/queue/processors/ExportFollowingProcessorService.ts120
-rw-r--r--packages/backend/src/queue/processors/ExportMutingProcessorService.ts120
-rw-r--r--packages/backend/src/queue/processors/ExportNotesProcessorService.ts143
-rw-r--r--packages/backend/src/queue/processors/ExportUserListsProcessorService.ts96
-rw-r--r--packages/backend/src/queue/processors/ImportBlockingProcessorService.ts102
-rw-r--r--packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts110
-rw-r--r--packages/backend/src/queue/processors/ImportFollowingProcessorService.ts99
-rw-r--r--packages/backend/src/queue/processors/ImportMutingProcessorService.ts100
-rw-r--r--packages/backend/src/queue/processors/ImportUserListsProcessorService.ts112
-rw-r--r--packages/backend/src/queue/processors/InboxProcessorService.ts195
-rw-r--r--packages/backend/src/queue/processors/ResyncChartsProcessorService.ts61
-rw-r--r--packages/backend/src/queue/processors/TickChartsProcessorService.ts68
-rw-r--r--packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts79
-rw-r--r--packages/backend/src/queue/processors/db/delete-account.ts94
-rw-r--r--packages/backend/src/queue/processors/db/delete-drive-files.ts56
-rw-r--r--packages/backend/src/queue/processors/db/export-blocking.ts93
-rw-r--r--packages/backend/src/queue/processors/db/export-custom-emojis.ts114
-rw-r--r--packages/backend/src/queue/processors/db/export-following.ts94
-rw-r--r--packages/backend/src/queue/processors/db/export-mute.ts94
-rw-r--r--packages/backend/src/queue/processors/db/export-notes.ts118
-rw-r--r--packages/backend/src/queue/processors/db/export-user-lists.ts70
-rw-r--r--packages/backend/src/queue/processors/db/import-blocking.ts75
-rw-r--r--packages/backend/src/queue/processors/db/import-custom-emojis.ts81
-rw-r--r--packages/backend/src/queue/processors/db/import-following.ts74
-rw-r--r--packages/backend/src/queue/processors/db/import-muting.ts84
-rw-r--r--packages/backend/src/queue/processors/db/import-user-lists.ts80
-rw-r--r--packages/backend/src/queue/processors/db/index.ts37
-rw-r--r--packages/backend/src/queue/processors/deliver.ts98
-rw-r--r--packages/backend/src/queue/processors/ended-poll-notification.ts33
-rw-r--r--packages/backend/src/queue/processors/inbox.ts157
-rw-r--r--packages/backend/src/queue/processors/object-storage/clean-remote-files.ts50
-rw-r--r--packages/backend/src/queue/processors/object-storage/delete-file.ts11
-rw-r--r--packages/backend/src/queue/processors/object-storage/index.ts15
-rw-r--r--packages/backend/src/queue/processors/system/check-expired-mutings.ts30
-rw-r--r--packages/backend/src/queue/processors/system/clean-charts.ts28
-rw-r--r--packages/backend/src/queue/processors/system/clean.ts18
-rw-r--r--packages/backend/src/queue/processors/system/index.ts20
-rw-r--r--packages/backend/src/queue/processors/system/resync-charts.ts21
-rw-r--r--packages/backend/src/queue/processors/system/tick-charts.ts28
-rw-r--r--packages/backend/src/queue/processors/webhook-deliver.ts59
-rw-r--r--packages/backend/src/queue/queues.ts21
-rw-r--r--packages/backend/src/queue/types.ts12
62 files changed, 2661 insertions, 2138 deletions
diff --git a/packages/backend/src/queue/DbQueueProcessorsService.ts b/packages/backend/src/queue/DbQueueProcessorsService.ts
new file mode 100644
index 0000000000..fcc9873a6f
--- /dev/null
+++ b/packages/backend/src/queue/DbQueueProcessorsService.ts
@@ -0,0 +1,63 @@
+import { Inject, Injectable } from '@nestjs/common';
+import type { DbJobData } from '@/queue/types.js';
+import { DI } from '@/di-symbols.js';
+import { Config } from '@/config.js';
+import { DeleteDriveFilesProcessorService } from './processors/DeleteDriveFilesProcessorService.js';
+import { ExportCustomEmojisProcessorService } from './processors/ExportCustomEmojisProcessorService.js';
+import { ExportNotesProcessorService } from './processors/ExportNotesProcessorService.js';
+import { ExportFollowingProcessorService } from './processors/ExportFollowingProcessorService.js';
+import { ExportMutingProcessorService } from './processors/ExportMutingProcessorService.js';
+import { ExportBlockingProcessorService } from './processors/ExportBlockingProcessorService.js';
+import { ExportUserListsProcessorService } from './processors/ExportUserListsProcessorService.js';
+import { ImportFollowingProcessorService } from './processors/ImportFollowingProcessorService.js';
+import { ImportMutingProcessorService } from './processors/ImportMutingProcessorService.js';
+import { ImportBlockingProcessorService } from './processors/ImportBlockingProcessorService.js';
+import { ImportUserListsProcessorService } from './processors/ImportUserListsProcessorService.js';
+import { ImportCustomEmojisProcessorService } from './processors/ImportCustomEmojisProcessorService.js';
+import { DeleteAccountProcessorService } from './processors/DeleteAccountProcessorService.js';
+import type Bull from 'bull';
+
+@Injectable()
+export class DbQueueProcessorsService {
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ private deleteDriveFilesProcessorService: DeleteDriveFilesProcessorService,
+ private exportCustomEmojisProcessorService: ExportCustomEmojisProcessorService,
+ private exportNotesProcessorService: ExportNotesProcessorService,
+ private exportFollowingProcessorService: ExportFollowingProcessorService,
+ private exportMutingProcessorService: ExportMutingProcessorService,
+ private exportBlockingProcessorService: ExportBlockingProcessorService,
+ private exportUserListsProcessorService: ExportUserListsProcessorService,
+ private importFollowingProcessorService: ImportFollowingProcessorService,
+ private importMutingProcessorService: ImportMutingProcessorService,
+ private importBlockingProcessorService: ImportBlockingProcessorService,
+ private importUserListsProcessorService: ImportUserListsProcessorService,
+ private importCustomEmojisProcessorService: ImportCustomEmojisProcessorService,
+ private deleteAccountProcessorService: DeleteAccountProcessorService,
+ ) {
+ }
+
+ public start(dbQueue: Bull.Queue<DbJobData>) {
+ const jobs = {
+ deleteDriveFiles: (job, done) => this.deleteDriveFilesProcessorService.process(job, done),
+ exportCustomEmojis: (job, done) => this.exportCustomEmojisProcessorService.process(job, done),
+ exportNotes: (job, done) => this.exportNotesProcessorService.process(job, done),
+ exportFollowing: (job, done) => this.exportFollowingProcessorService.process(job, done),
+ exportMuting: (job, done) => this.exportMutingProcessorService.process(job, done),
+ exportBlocking: (job, done) => this.exportBlockingProcessorService.process(job, done),
+ exportUserLists: (job, done) => this.exportUserListsProcessorService.process(job, done),
+ importFollowing: (job, done) => this.importFollowingProcessorService.process(job, done),
+ importMuting: (job, done) => this.importMutingProcessorService.process(job, done),
+ importBlocking: (job, done) => this.importBlockingProcessorService.process(job, done),
+ importUserLists: (job, done) => this.importUserListsProcessorService.process(job, done),
+ importCustomEmojis: (job, done) => this.importCustomEmojisProcessorService.process(job, done),
+ deleteAccount: (job, done) => this.deleteAccountProcessorService.process(job, done),
+ } as Record<string, Bull.ProcessCallbackFunction<DbJobData | Bull.ProcessPromiseFunction<DbJobData>>>;
+
+ for (const [k, v] of Object.entries(jobs)) {
+ dbQueue.process(k, v);
+ }
+ }
+}
diff --git a/packages/backend/src/queue/ObjectStorageQueueProcessorsService.ts b/packages/backend/src/queue/ObjectStorageQueueProcessorsService.ts
new file mode 100644
index 0000000000..402c038be0
--- /dev/null
+++ b/packages/backend/src/queue/ObjectStorageQueueProcessorsService.ts
@@ -0,0 +1,30 @@
+import { Inject, Injectable } from '@nestjs/common';
+import type { ObjectStorageJobData } from '@/queue/types.js';
+import { DI } from '@/di-symbols.js';
+import { Config } from '@/config.js';
+import { CleanRemoteFilesProcessorService } from './processors/CleanRemoteFilesProcessorService.js';
+import { DeleteFileProcessorService } from './processors/DeleteFileProcessorService.js';
+import type Bull from 'bull';
+
+@Injectable()
+export class ObjectStorageQueueProcessorsService {
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ private deleteFileProcessorService: DeleteFileProcessorService,
+ private cleanRemoteFilesProcessorService: CleanRemoteFilesProcessorService,
+ ) {
+ }
+
+ public start(q: Bull.Queue) {
+ const jobs = {
+ deleteFile: (job, done) => this.deleteFileProcessorService.process(job, done),
+ cleanRemoteFiles: (job, done) => this.cleanRemoteFilesProcessorService.process(job, done),
+ } as Record<string, Bull.ProcessCallbackFunction<ObjectStorageJobData | Bull.ProcessPromiseFunction<ObjectStorageJobData>>>;
+
+ for (const [k, v] of Object.entries(jobs)) {
+ q.process(k, 16, v);
+ }
+ }
+}
diff --git a/packages/backend/src/queue/QueueLoggerService.ts b/packages/backend/src/queue/QueueLoggerService.ts
new file mode 100644
index 0000000000..4cdd4edfbb
--- /dev/null
+++ b/packages/backend/src/queue/QueueLoggerService.ts
@@ -0,0 +1,12 @@
+import { Inject, Injectable } from '@nestjs/common';
+import Logger from '@/logger.js';
+
+@Injectable()
+export class QueueLoggerService {
+ public logger: Logger;
+
+ constructor(
+ ) {
+ this.logger = new Logger('queue', 'orange');
+ }
+}
diff --git a/packages/backend/src/queue/QueueProcessorModule.ts b/packages/backend/src/queue/QueueProcessorModule.ts
new file mode 100644
index 0000000000..f13dd3ef19
--- /dev/null
+++ b/packages/backend/src/queue/QueueProcessorModule.ts
@@ -0,0 +1,72 @@
+import { Module } from '@nestjs/common';
+import { CoreModule } from '@/core/CoreModule.js';
+import { QueueLoggerService } from './QueueLoggerService.js';
+import { QueueProcessorService } from './QueueProcessorService.js';
+import { DbQueueProcessorsService } from './DbQueueProcessorsService.js';
+import { ObjectStorageQueueProcessorsService } from './ObjectStorageQueueProcessorsService.js';
+import { DeliverProcessorService } from './processors/DeliverProcessorService.js';
+import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js';
+import { InboxProcessorService } from './processors/InboxProcessorService.js';
+import { WebhookDeliverProcessorService } from './processors/WebhookDeliverProcessorService.js';
+import { SystemQueueProcessorsService } from './SystemQueueProcessorsService.js';
+import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMutingsProcessorService.js';
+import { CleanChartsProcessorService } from './processors/CleanChartsProcessorService.js';
+import { CleanProcessorService } from './processors/CleanProcessorService.js';
+import { CleanRemoteFilesProcessorService } from './processors/CleanRemoteFilesProcessorService.js';
+import { DeleteAccountProcessorService } from './processors/DeleteAccountProcessorService.js';
+import { DeleteDriveFilesProcessorService } from './processors/DeleteDriveFilesProcessorService.js';
+import { DeleteFileProcessorService } from './processors/DeleteFileProcessorService.js';
+import { ExportBlockingProcessorService } from './processors/ExportBlockingProcessorService.js';
+import { ExportCustomEmojisProcessorService } from './processors/ExportCustomEmojisProcessorService.js';
+import { ExportFollowingProcessorService } from './processors/ExportFollowingProcessorService.js';
+import { ExportMutingProcessorService } from './processors/ExportMutingProcessorService.js';
+import { ExportNotesProcessorService } from './processors/ExportNotesProcessorService.js';
+import { ExportUserListsProcessorService } from './processors/ExportUserListsProcessorService.js';
+import { ImportBlockingProcessorService } from './processors/ImportBlockingProcessorService.js';
+import { ImportCustomEmojisProcessorService } from './processors/ImportCustomEmojisProcessorService.js';
+import { ImportFollowingProcessorService } from './processors/ImportFollowingProcessorService.js';
+import { ImportMutingProcessorService } from './processors/ImportMutingProcessorService.js';
+import { ImportUserListsProcessorService } from './processors/ImportUserListsProcessorService.js';
+import { ResyncChartsProcessorService } from './processors/ResyncChartsProcessorService.js';
+import { TickChartsProcessorService } from './processors/TickChartsProcessorService.js';
+
+@Module({
+ imports: [
+ CoreModule,
+ ],
+ providers: [
+ QueueLoggerService,
+ TickChartsProcessorService,
+ ResyncChartsProcessorService,
+ CleanChartsProcessorService,
+ CheckExpiredMutingsProcessorService,
+ CleanProcessorService,
+ DeleteDriveFilesProcessorService,
+ ExportCustomEmojisProcessorService,
+ ExportNotesProcessorService,
+ ExportFollowingProcessorService,
+ ExportMutingProcessorService,
+ ExportBlockingProcessorService,
+ ExportUserListsProcessorService,
+ ImportFollowingProcessorService,
+ ImportMutingProcessorService,
+ ImportBlockingProcessorService,
+ ImportUserListsProcessorService,
+ ImportCustomEmojisProcessorService,
+ DeleteAccountProcessorService,
+ DeleteFileProcessorService,
+ CleanRemoteFilesProcessorService,
+ SystemQueueProcessorsService,
+ ObjectStorageQueueProcessorsService,
+ DbQueueProcessorsService,
+ WebhookDeliverProcessorService,
+ EndedPollNotificationProcessorService,
+ DeliverProcessorService,
+ InboxProcessorService,
+ QueueProcessorService,
+ ],
+ exports: [
+ QueueProcessorService,
+ ],
+})
+export class QueueProcessorModule {}
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts
new file mode 100644
index 0000000000..27afce0824
--- /dev/null
+++ b/packages/backend/src/queue/QueueProcessorService.ts
@@ -0,0 +1,141 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { ModuleRef } from '@nestjs/core';
+import { Config } from '@/config.js';
+import { DI } from '@/di-symbols.js';
+import type Logger from '@/logger.js';
+import { QueueService } from '@/core/QueueService.js';
+import { getJobInfo } from './get-job-info.js';
+import { SystemQueueProcessorsService } from './SystemQueueProcessorsService.js';
+import { ObjectStorageQueueProcessorsService } from './ObjectStorageQueueProcessorsService.js';
+import { DbQueueProcessorsService } from './DbQueueProcessorsService.js';
+import { WebhookDeliverProcessorService } from './processors/WebhookDeliverProcessorService.js';
+import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js';
+import { DeliverProcessorService } from './processors/DeliverProcessorService.js';
+import { InboxProcessorService } from './processors/InboxProcessorService.js';
+import { QueueLoggerService } from './QueueLoggerService.js';
+
+@Injectable()
+export class QueueProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ private queueLoggerService: QueueLoggerService,
+ private queueService: QueueService,
+ private systemQueueProcessorsService: SystemQueueProcessorsService,
+ private objectStorageQueueProcessorsService: ObjectStorageQueueProcessorsService,
+ private dbQueueProcessorsService: DbQueueProcessorsService,
+ private webhookDeliverProcessorService: WebhookDeliverProcessorService,
+ private endedPollNotificationProcessorService: EndedPollNotificationProcessorService,
+ private deliverProcessorService: DeliverProcessorService,
+ private inboxProcessorService: InboxProcessorService,
+ ) {
+ this.#logger = this.queueLoggerService.logger;
+ }
+
+ public start() {
+ function renderError(e: Error): any {
+ return {
+ stack: e.stack,
+ message: e.message,
+ name: e.name,
+ };
+ }
+
+ const systemLogger = this.#logger.createSubLogger('system');
+ const deliverLogger = this.#logger.createSubLogger('deliver');
+ const webhookLogger = this.#logger.createSubLogger('webhook');
+ const inboxLogger = this.#logger.createSubLogger('inbox');
+ const dbLogger = this.#logger.createSubLogger('db');
+ const objectStorageLogger = this.#logger.createSubLogger('objectStorage');
+
+ this.queueService.systemQueue
+ .on('waiting', (jobId) => systemLogger.debug(`waiting id=${jobId}`))
+ .on('active', (job) => systemLogger.debug(`active id=${job.id}`))
+ .on('completed', (job, result) => systemLogger.debug(`completed(${result}) id=${job.id}`))
+ .on('failed', (job, err) => systemLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
+ .on('error', (job: any, err: Error) => systemLogger.error(`error ${err}`, { job, e: renderError(err) }))
+ .on('stalled', (job) => systemLogger.warn(`stalled id=${job.id}`));
+
+ this.queueService.deliverQueue
+ .on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`))
+ .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
+ .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
+ .on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`))
+ .on('error', (job: any, err: Error) => deliverLogger.error(`error ${err}`, { job, e: renderError(err) }))
+ .on('stalled', (job) => deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
+
+ this.queueService.inboxQueue
+ .on('waiting', (jobId) => inboxLogger.debug(`waiting id=${jobId}`))
+ .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`))
+ .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`))
+ .on('failed', (job, err) => inboxLogger.warn(`failed(${err}) ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`, { job, e: renderError(err) }))
+ .on('error', (job: any, err: Error) => inboxLogger.error(`error ${err}`, { job, e: renderError(err) }))
+ .on('stalled', (job) => inboxLogger.warn(`stalled ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`));
+
+ this.queueService.dbQueue
+ .on('waiting', (jobId) => dbLogger.debug(`waiting id=${jobId}`))
+ .on('active', (job) => dbLogger.debug(`active id=${job.id}`))
+ .on('completed', (job, result) => dbLogger.debug(`completed(${result}) id=${job.id}`))
+ .on('failed', (job, err) => dbLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
+ .on('error', (job: any, err: Error) => dbLogger.error(`error ${err}`, { job, e: renderError(err) }))
+ .on('stalled', (job) => dbLogger.warn(`stalled id=${job.id}`));
+
+ this.queueService.objectStorageQueue
+ .on('waiting', (jobId) => objectStorageLogger.debug(`waiting id=${jobId}`))
+ .on('active', (job) => objectStorageLogger.debug(`active id=${job.id}`))
+ .on('completed', (job, result) => objectStorageLogger.debug(`completed(${result}) id=${job.id}`))
+ .on('failed', (job, err) => objectStorageLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
+ .on('error', (job: any, err: Error) => objectStorageLogger.error(`error ${err}`, { job, e: renderError(err) }))
+ .on('stalled', (job) => objectStorageLogger.warn(`stalled id=${job.id}`));
+
+ this.queueService.webhookDeliverQueue
+ .on('waiting', (jobId) => webhookLogger.debug(`waiting id=${jobId}`))
+ .on('active', (job) => webhookLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
+ .on('completed', (job, result) => webhookLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
+ .on('failed', (job, err) => webhookLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`))
+ .on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`, { job, e: renderError(err) }))
+ .on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
+
+ this.queueService.deliverQueue.process(this.config.deliverJobConcurrency ?? 128, (job, done) => this.deliverProcessorService.process(job));
+ this.queueService.inboxQueue.process(this.config.inboxJobConcurrency ?? 16, (job, done) => this.inboxProcessorService.process(job));
+ this.queueService.endedPollNotificationQueue.process((job, done) => this.endedPollNotificationProcessorService.process(job, done));
+ this.queueService.webhookDeliverQueue.process(64, (job, done) => this.webhookDeliverProcessorService.process(job));
+ this.dbQueueProcessorsService.start(this.queueService.dbQueue);
+ this.objectStorageQueueProcessorsService.start(this.queueService.objectStorageQueue);
+
+ this.queueService.systemQueue.add('tickCharts', {
+ }, {
+ repeat: { cron: '55 * * * *' },
+ removeOnComplete: true,
+ });
+
+ this.queueService.systemQueue.add('resyncCharts', {
+ }, {
+ repeat: { cron: '0 0 * * *' },
+ removeOnComplete: true,
+ });
+
+ this.queueService.systemQueue.add('cleanCharts', {
+ }, {
+ repeat: { cron: '0 0 * * *' },
+ removeOnComplete: true,
+ });
+
+ this.queueService.systemQueue.add('clean', {
+ }, {
+ repeat: { cron: '0 0 * * *' },
+ removeOnComplete: true,
+ });
+
+ this.queueService.systemQueue.add('checkExpiredMutings', {
+ }, {
+ repeat: { cron: '*/5 * * * *' },
+ removeOnComplete: true,
+ });
+
+ this.systemQueueProcessorsService.start(this.queueService.systemQueue);
+ }
+}
diff --git a/packages/backend/src/queue/SystemQueueProcessorsService.ts b/packages/backend/src/queue/SystemQueueProcessorsService.ts
new file mode 100644
index 0000000000..7c227296e7
--- /dev/null
+++ b/packages/backend/src/queue/SystemQueueProcessorsService.ts
@@ -0,0 +1,38 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { DI } from '@/di-symbols.js';
+import { Config } from '@/config.js';
+import { TickChartsProcessorService } from './processors/TickChartsProcessorService.js';
+import { ResyncChartsProcessorService } from './processors/ResyncChartsProcessorService.js';
+import { CleanChartsProcessorService } from './processors/CleanChartsProcessorService.js';
+import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMutingsProcessorService.js';
+import { CleanProcessorService } from './processors/CleanProcessorService.js';
+import type Bull from 'bull';
+
+@Injectable()
+export class SystemQueueProcessorsService {
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ private tickChartsProcessorService: TickChartsProcessorService,
+ private resyncChartsProcessorService: ResyncChartsProcessorService,
+ private cleanChartsProcessorService: CleanChartsProcessorService,
+ private checkExpiredMutingsProcessorService: CheckExpiredMutingsProcessorService,
+ private cleanProcessorService: CleanProcessorService,
+ ) {
+ }
+
+ public start(dbQueue: Bull.Queue<Record<string, unknown>>) {
+ const jobs = {
+ tickCharts: (job, done) => this.tickChartsProcessorService.process(job, done),
+ resyncCharts: (job, done) => this.resyncChartsProcessorService.process(job, done),
+ cleanCharts: (job, done) => this.cleanChartsProcessorService.process(job, done),
+ checkExpiredMutings: (job, done) => this.checkExpiredMutingsProcessorService.process(job, done),
+ clean: (job, done) => this.cleanProcessorService.process(job, done),
+ } as Record<string, Bull.ProcessCallbackFunction<Record<string, unknown>> | Bull.ProcessPromiseFunction<Record<string, unknown>>>;
+
+ for (const [k, v] of Object.entries(jobs)) {
+ dbQueue.process(k, v);
+ }
+ }
+}
diff --git a/packages/backend/src/queue/index.ts b/packages/backend/src/queue/index.ts
deleted file mode 100644
index ebb3a77cab..0000000000
--- a/packages/backend/src/queue/index.ts
+++ /dev/null
@@ -1,342 +0,0 @@
-import httpSignature from '@peertube/http-signature';
-import { v4 as uuid } from 'uuid';
-
-import config from '@/config/index.js';
-import { DriveFile } from '@/models/entities/drive-file.js';
-import { IActivity } from '@/remote/activitypub/type.js';
-import { Webhook, webhookEventTypes } from '@/models/entities/webhook.js';
-import { envOption } from '../env.js';
-
-import processDeliver from './processors/deliver.js';
-import processInbox from './processors/inbox.js';
-import processDb from './processors/db/index.js';
-import processObjectStorage from './processors/object-storage/index.js';
-import processSystemQueue from './processors/system/index.js';
-import processWebhookDeliver from './processors/webhook-deliver.js';
-import { endedPollNotification } from './processors/ended-poll-notification.js';
-import { queueLogger } from './logger.js';
-import { getJobInfo } from './get-job-info.js';
-import { systemQueue, dbQueue, deliverQueue, inboxQueue, objectStorageQueue, endedPollNotificationQueue, webhookDeliverQueue } from './queues.js';
-import { ThinUser } from './types.js';
-
-function renderError(e: Error): any {
- return {
- stack: e.stack,
- message: e.message,
- name: e.name,
- };
-}
-
-const systemLogger = queueLogger.createSubLogger('system');
-const deliverLogger = queueLogger.createSubLogger('deliver');
-const webhookLogger = queueLogger.createSubLogger('webhook');
-const inboxLogger = queueLogger.createSubLogger('inbox');
-const dbLogger = queueLogger.createSubLogger('db');
-const objectStorageLogger = queueLogger.createSubLogger('objectStorage');
-
-systemQueue
- .on('waiting', (jobId) => systemLogger.debug(`waiting id=${jobId}`))
- .on('active', (job) => systemLogger.debug(`active id=${job.id}`))
- .on('completed', (job, result) => systemLogger.debug(`completed(${result}) id=${job.id}`))
- .on('failed', (job, err) => systemLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
- .on('error', (job: any, err: Error) => systemLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => systemLogger.warn(`stalled id=${job.id}`));
-
-deliverQueue
- .on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`))
- .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
- .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
- .on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`))
- .on('error', (job: any, err: Error) => deliverLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
-
-inboxQueue
- .on('waiting', (jobId) => inboxLogger.debug(`waiting id=${jobId}`))
- .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`))
- .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`))
- .on('failed', (job, err) => inboxLogger.warn(`failed(${err}) ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`, { job, e: renderError(err) }))
- .on('error', (job: any, err: Error) => inboxLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => inboxLogger.warn(`stalled ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`));
-
-dbQueue
- .on('waiting', (jobId) => dbLogger.debug(`waiting id=${jobId}`))
- .on('active', (job) => dbLogger.debug(`active id=${job.id}`))
- .on('completed', (job, result) => dbLogger.debug(`completed(${result}) id=${job.id}`))
- .on('failed', (job, err) => dbLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
- .on('error', (job: any, err: Error) => dbLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => dbLogger.warn(`stalled id=${job.id}`));
-
-objectStorageQueue
- .on('waiting', (jobId) => objectStorageLogger.debug(`waiting id=${jobId}`))
- .on('active', (job) => objectStorageLogger.debug(`active id=${job.id}`))
- .on('completed', (job, result) => objectStorageLogger.debug(`completed(${result}) id=${job.id}`))
- .on('failed', (job, err) => objectStorageLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
- .on('error', (job: any, err: Error) => objectStorageLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => objectStorageLogger.warn(`stalled id=${job.id}`));
-
-webhookDeliverQueue
- .on('waiting', (jobId) => webhookLogger.debug(`waiting id=${jobId}`))
- .on('active', (job) => webhookLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
- .on('completed', (job, result) => webhookLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
- .on('failed', (job, err) => webhookLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`))
- .on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`, { job, e: renderError(err) }))
- .on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
-
-export function deliver(user: ThinUser, content: unknown, to: string | null) {
- if (content == null) return null;
- if (to == null) return null;
-
- const data = {
- user: {
- id: user.id,
- },
- content,
- to,
- };
-
- return deliverQueue.add(data, {
- attempts: config.deliverJobMaxAttempts || 12,
- timeout: 1 * 60 * 1000, // 1min
- backoff: {
- type: 'apBackoff',
- },
- removeOnComplete: true,
- removeOnFail: true,
- });
-}
-
-export function inbox(activity: IActivity, signature: httpSignature.IParsedSignature) {
- const data = {
- activity: activity,
- signature,
- };
-
- return inboxQueue.add(data, {
- attempts: config.inboxJobMaxAttempts || 8,
- timeout: 5 * 60 * 1000, // 5min
- backoff: {
- type: 'apBackoff',
- },
- removeOnComplete: true,
- removeOnFail: true,
- });
-}
-
-export function createDeleteDriveFilesJob(user: ThinUser) {
- return dbQueue.add('deleteDriveFiles', {
- user: user,
- }, {
- removeOnComplete: true,
- removeOnFail: true,
- });
-}
-
-export function createExportCustomEmojisJob(user: ThinUser) {
- return dbQueue.add('exportCustomEmojis', {
- user: user,
- }, {
- removeOnComplete: true,
- removeOnFail: true,
- });
-}
-
-export function createExportNotesJob(user: ThinUser) {
- return dbQueue.add('exportNotes', {
- user: user,
- }, {
- removeOnComplete: true,
- removeOnFail: true,
- });
-}
-
-export function createExportFollowingJob(user: ThinUser, excludeMuting = false, excludeInactive = false) {
- return dbQueue.add('exportFollowing', {
- user: user,
- excludeMuting,
- excludeInactive,
- }, {
- removeOnComplete: true,
- removeOnFail: true,
- });
-}
-
-export function createExportMuteJob(user: ThinUser) {
- return dbQueue.add('exportMute', {
- user: user,
- }, {
- removeOnComplete: true,
- removeOnFail: true,
- });
-}
-
-export function createExportBlockingJob(user: ThinUser) {
- return dbQueue.add('exportBlocking', {
- user: user,
- }, {
- removeOnComplete: true,
- removeOnFail: true,
- });
-}
-
-export function createExportUserListsJob(user: ThinUser) {
- return dbQueue.add('exportUserLists', {
- user: user,
- }, {
- removeOnComplete: true,
- removeOnFail: true,
- });
-}
-
-export function createImportFollowingJob(user: ThinUser, fileId: DriveFile['id']) {
- return dbQueue.add('importFollowing', {
- user: user,
- fileId: fileId,
- }, {
- removeOnComplete: true,
- removeOnFail: true,
- });
-}
-
-export function createImportMutingJob(user: ThinUser, fileId: DriveFile['id']) {
- return dbQueue.add('importMuting', {
- user: user,
- fileId: fileId,
- }, {
- removeOnComplete: true,
- removeOnFail: true,
- });
-}
-
-export function createImportBlockingJob(user: ThinUser, fileId: DriveFile['id']) {
- return dbQueue.add('importBlocking', {
- user: user,
- fileId: fileId,
- }, {
- removeOnComplete: true,
- removeOnFail: true,
- });
-}
-
-export function createImportUserListsJob(user: ThinUser, fileId: DriveFile['id']) {
- return dbQueue.add('importUserLists', {
- user: user,
- fileId: fileId,
- }, {
- removeOnComplete: true,
- removeOnFail: true,
- });
-}
-
-export function createImportCustomEmojisJob(user: ThinUser, fileId: DriveFile['id']) {
- return dbQueue.add('importCustomEmojis', {
- user: user,
- fileId: fileId,
- }, {
- removeOnComplete: true,
- removeOnFail: true,
- });
-}
-
-export function createDeleteAccountJob(user: ThinUser, opts: { soft?: boolean; } = {}) {
- return dbQueue.add('deleteAccount', {
- user: user,
- soft: opts.soft,
- }, {
- removeOnComplete: true,
- removeOnFail: true,
- });
-}
-
-export function createDeleteObjectStorageFileJob(key: string) {
- return objectStorageQueue.add('deleteFile', {
- key: key,
- }, {
- removeOnComplete: true,
- removeOnFail: true,
- });
-}
-
-export function createCleanRemoteFilesJob() {
- return objectStorageQueue.add('cleanRemoteFiles', {}, {
- removeOnComplete: true,
- removeOnFail: true,
- });
-}
-
-export function webhookDeliver(webhook: Webhook, type: typeof webhookEventTypes[number], content: unknown) {
- const data = {
- type,
- content,
- webhookId: webhook.id,
- userId: webhook.userId,
- to: webhook.url,
- secret: webhook.secret,
- createdAt: Date.now(),
- eventId: uuid(),
- };
-
- return webhookDeliverQueue.add(data, {
- attempts: 4,
- timeout: 1 * 60 * 1000, // 1min
- backoff: {
- type: 'apBackoff',
- },
- removeOnComplete: true,
- removeOnFail: true,
- });
-}
-
-export default function() {
- if (envOption.onlyServer) return;
-
- deliverQueue.process(config.deliverJobConcurrency || 128, processDeliver);
- inboxQueue.process(config.inboxJobConcurrency || 16, processInbox);
- endedPollNotificationQueue.process(endedPollNotification);
- webhookDeliverQueue.process(64, processWebhookDeliver);
- processDb(dbQueue);
- processObjectStorage(objectStorageQueue);
-
- systemQueue.add('tickCharts', {
- }, {
- repeat: { cron: '55 * * * *' },
- removeOnComplete: true,
- });
-
- systemQueue.add('resyncCharts', {
- }, {
- repeat: { cron: '0 0 * * *' },
- removeOnComplete: true,
- });
-
- systemQueue.add('cleanCharts', {
- }, {
- repeat: { cron: '0 0 * * *' },
- removeOnComplete: true,
- });
-
- systemQueue.add('clean', {
- }, {
- repeat: { cron: '0 0 * * *' },
- removeOnComplete: true,
- });
-
- systemQueue.add('checkExpiredMutings', {
- }, {
- repeat: { cron: '*/5 * * * *' },
- removeOnComplete: true,
- });
-
- processSystemQueue(systemQueue);
-}
-
-export function destroy() {
- deliverQueue.once('cleaned', (jobs, status) => {
- deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
- });
- deliverQueue.clean(0, 'delayed');
-
- inboxQueue.once('cleaned', (jobs, status) => {
- inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
- });
- inboxQueue.clean(0, 'delayed');
-}
diff --git a/packages/backend/src/queue/initialize.ts b/packages/backend/src/queue/initialize.ts
deleted file mode 100644
index eef4080af3..0000000000
--- a/packages/backend/src/queue/initialize.ts
+++ /dev/null
@@ -1,34 +0,0 @@
-import Bull from 'bull';
-import config from '@/config/index.js';
-
-export function initialize<T>(name: string, limitPerSec = -1) {
- return new Bull<T>(name, {
- redis: {
- port: config.redis.port,
- host: config.redis.host,
- family: config.redis.family == null ? 0 : config.redis.family,
- password: config.redis.pass,
- db: config.redis.db || 0,
- },
- prefix: config.redis.prefix ? `${config.redis.prefix}:queue` : 'queue',
- limiter: limitPerSec > 0 ? {
- max: limitPerSec,
- duration: 1000,
- } : undefined,
- settings: {
- backoffStrategies: {
- apBackoff,
- },
- },
- });
-}
-
-// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019
-function apBackoff(attemptsMade: number, err: Error) {
- const baseDelay = 60 * 1000; // 1min
- const maxBackoff = 8 * 60 * 60 * 1000; // 8hours
- let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay;
- backoff = Math.min(backoff, maxBackoff);
- backoff += Math.round(backoff * Math.random() * 0.2);
- return backoff;
-}
diff --git a/packages/backend/src/queue/logger.ts b/packages/backend/src/queue/logger.ts
deleted file mode 100644
index 2843a3c263..0000000000
--- a/packages/backend/src/queue/logger.ts
+++ /dev/null
@@ -1,3 +0,0 @@
-import Logger from '@/services/logger.js';
-
-export const queueLogger = new Logger('queue', 'orange');
diff --git a/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts b/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts
new file mode 100644
index 0000000000..514dc1dcf3
--- /dev/null
+++ b/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts
@@ -0,0 +1,50 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { In, MoreThan } from 'typeorm';
+import { DI } from '@/di-symbols.js';
+import { MutingsRepository } from '@/models/index.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import { GlobalEventService } from '@/core/GlobalEventService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+
+@Injectable()
+export class CheckExpiredMutingsProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.mutingsRepository)
+ private mutingsRepository: MutingsRepository,
+
+ private globalEventService: GlobalEventService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('check-expired-mutings');
+ }
+
+ public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> {
+ this.#logger.info('Checking expired mutings...');
+
+ const expired = await this.mutingsRepository.createQueryBuilder('muting')
+ .where('muting.expiresAt IS NOT NULL')
+ .andWhere('muting.expiresAt < :now', { now: new Date() })
+ .innerJoinAndSelect('muting.mutee', 'mutee')
+ .getMany();
+
+ if (expired.length > 0) {
+ await this.mutingsRepository.delete({
+ id: In(expired.map(m => m.id)),
+ });
+
+ for (const m of expired) {
+ this.globalEventService.publishUserEvent(m.muterId, 'unmute', m.mutee!);
+ }
+ }
+
+ this.#logger.succ('All expired mutings checked.');
+ done();
+ }
+}
diff --git a/packages/backend/src/queue/processors/CleanChartsProcessorService.ts b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts
new file mode 100644
index 0000000000..0eaad9b9ed
--- /dev/null
+++ b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts
@@ -0,0 +1,68 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { In, MoreThan } from 'typeorm';
+import { DI } from '@/di-symbols.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import FederationChart from '@/core/chart/charts/federation.js';
+import NotesChart from '@/core/chart/charts/notes.js';
+import UsersChart from '@/core/chart/charts/users.js';
+import ActiveUsersChart from '@/core/chart/charts/active-users.js';
+import InstanceChart from '@/core/chart/charts/instance.js';
+import PerUserNotesChart from '@/core/chart/charts/per-user-notes.js';
+import DriveChart from '@/core/chart/charts/drive.js';
+import PerUserReactionsChart from '@/core/chart/charts/per-user-reactions.js';
+import HashtagChart from '@/core/chart/charts/hashtag.js';
+import PerUserFollowingChart from '@/core/chart/charts/per-user-following.js';
+import PerUserDriveChart from '@/core/chart/charts/per-user-drive.js';
+import ApRequestChart from '@/core/chart/charts/ap-request.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+
+@Injectable()
+export class CleanChartsProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ private federationChart: FederationChart,
+ private notesChart: NotesChart,
+ private usersChart: UsersChart,
+ private activeUsersChart: ActiveUsersChart,
+ private instanceChart: InstanceChart,
+ private perUserNotesChart: PerUserNotesChart,
+ private driveChart: DriveChart,
+ private perUserReactionsChart: PerUserReactionsChart,
+ private hashtagChart: HashtagChart,
+ private perUserFollowingChart: PerUserFollowingChart,
+ private perUserDriveChart: PerUserDriveChart,
+ private apRequestChart: ApRequestChart,
+
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('clean-charts');
+ }
+
+ public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> {
+ this.#logger.info('Clean charts...');
+
+ await Promise.all([
+ this.federationChart.clean(),
+ this.notesChart.clean(),
+ this.usersChart.clean(),
+ this.activeUsersChart.clean(),
+ this.instanceChart.clean(),
+ this.perUserNotesChart.clean(),
+ this.driveChart.clean(),
+ this.perUserReactionsChart.clean(),
+ this.hashtagChart.clean(),
+ this.perUserFollowingChart.clean(),
+ this.perUserDriveChart.clean(),
+ this.apRequestChart.clean(),
+ ]);
+
+ this.#logger.succ('All charts successfully cleaned.');
+ done();
+ }
+}
diff --git a/packages/backend/src/queue/processors/CleanProcessorService.ts b/packages/backend/src/queue/processors/CleanProcessorService.ts
new file mode 100644
index 0000000000..6150120806
--- /dev/null
+++ b/packages/backend/src/queue/processors/CleanProcessorService.ts
@@ -0,0 +1,36 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { In, LessThan, MoreThan } from 'typeorm';
+import { DI } from '@/di-symbols.js';
+import { UserIpsRepository } from '@/models/index.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+
+@Injectable()
+export class CleanProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.userIpsRepository)
+ private userIpsRepository: UserIpsRepository,
+
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('clean');
+ }
+
+ public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> {
+ this.#logger.info('Cleaning...');
+
+ this.userIpsRepository.delete({
+ createdAt: LessThan(new Date(Date.now() - (1000 * 60 * 60 * 24 * 90))),
+ });
+
+ this.#logger.succ('Cleaned.');
+ done();
+ }
+}
diff --git a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts
new file mode 100644
index 0000000000..8c53632563
--- /dev/null
+++ b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts
@@ -0,0 +1,69 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { IsNull, MoreThan, Not } from 'typeorm';
+import { DI } from '@/di-symbols.js';
+import { DriveFilesRepository } from '@/models/index.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import { DriveService } from '@/core/DriveService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+
+@Injectable()
+export class CleanRemoteFilesProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.driveFilesRepository)
+ private driveFilesRepository: DriveFilesRepository,
+
+ private driveService: DriveService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('clean-remote-files');
+ }
+
+ public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> {
+ this.#logger.info('Deleting cached remote files...');
+
+ let deletedCount = 0;
+ let cursor: any = null;
+
+ while (true) {
+ const files = await this.driveFilesRepository.find({
+ where: {
+ userHost: Not(IsNull()),
+ isLink: false,
+ ...(cursor ? { id: MoreThan(cursor) } : {}),
+ },
+ take: 8,
+ order: {
+ id: 1,
+ },
+ });
+
+ if (files.length === 0) {
+ job.progress(100);
+ break;
+ }
+
+ cursor = files[files.length - 1].id;
+
+ await Promise.all(files.map(file => this.driveService.deleteFileSync(file, true)));
+
+ deletedCount += 8;
+
+ const total = await this.driveFilesRepository.countBy({
+ userHost: Not(IsNull()),
+ isLink: false,
+ });
+
+ job.progress(deletedCount / total);
+ }
+
+ this.#logger.succ('All cahced remote files has been deleted.');
+ done();
+ }
+}
diff --git a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts
new file mode 100644
index 0000000000..a5255c5c05
--- /dev/null
+++ b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts
@@ -0,0 +1,124 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { MoreThan } from 'typeorm';
+import { DI } from '@/di-symbols.js';
+import { DriveFilesRepository, UserProfilesRepository } from '@/models/index.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import { DriveService } from '@/core/DriveService.js';
+import type { DriveFile } from '@/models/entities/DriveFile.js';
+import type { Note } from '@/models/entities/Note.js';
+import { EmailService } from '@/core/EmailService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+import type { DbUserDeleteJobData } from '../types.js';
+
+@Injectable()
+export class DeleteAccountProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.usersRepository)
+ private usersRepository: UsersRepository,
+
+ @Inject(DI.userProfilesRepository)
+ private userProfilesRepository: UserProfilesRepository,
+
+ @Inject(DI.notesRepository)
+ private notesRepository: NotesRepository,
+
+ @Inject(DI.driveFilesRepository)
+ private driveFilesRepository: DriveFilesRepository,
+
+ private driveService: DriveService,
+ private emailService: EmailService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('delete-account');
+ }
+
+ public async process(job: Bull.Job<DbUserDeleteJobData>): Promise<string | void> {
+ this.#logger.info(`Deleting account of ${job.data.user.id} ...`);
+
+ const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
+ if (user == null) {
+ return;
+ }
+
+ { // Delete notes
+ let cursor: Note['id'] | null = null;
+
+ while (true) {
+ const notes = await this.notesRepository.find({
+ where: {
+ userId: user.id,
+ ...(cursor ? { id: MoreThan(cursor) } : {}),
+ },
+ take: 100,
+ order: {
+ id: 1,
+ },
+ }) as Note[];
+
+ if (notes.length === 0) {
+ break;
+ }
+
+ cursor = notes[notes.length - 1].id;
+
+ await this.notesRepository.delete(notes.map(note => note.id));
+ }
+
+ this.#logger.succ('All of notes deleted');
+ }
+
+ { // Delete files
+ let cursor: DriveFile['id'] | null = null;
+
+ while (true) {
+ const files = await this.driveFilesRepository.find({
+ where: {
+ userId: user.id,
+ ...(cursor ? { id: MoreThan(cursor) } : {}),
+ },
+ take: 10,
+ order: {
+ id: 1,
+ },
+ }) as DriveFile[];
+
+ if (files.length === 0) {
+ break;
+ }
+
+ cursor = files[files.length - 1].id;
+
+ for (const file of files) {
+ await this.driveService.deleteFileSync(file);
+ }
+ }
+
+ this.#logger.succ('All of files deleted');
+ }
+
+ { // Send email notification
+ const profile = await this.userProfilesRepository.findOneByOrFail({ userId: user.id });
+ if (profile.email && profile.emailVerified) {
+ this.emailService.sendEmail(profile.email, 'Account deleted',
+ 'Your account has been deleted.',
+ 'Your account has been deleted.');
+ }
+ }
+
+ // soft指定されている場合は物理削除しない
+ if (job.data.soft) {
+ // nop
+ } else {
+ await this.usersRepository.delete(job.data.user.id);
+ }
+
+ return 'Account deleted';
+ }
+}
diff --git a/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts b/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts
new file mode 100644
index 0000000000..80814bb5a2
--- /dev/null
+++ b/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts
@@ -0,0 +1,78 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { MoreThan } from 'typeorm';
+import { DI } from '@/di-symbols.js';
+import { UsersRepository, DriveFilesRepository } from '@/models/index.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import { DriveService } from '@/core/DriveService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+import type { DbUserJobData } from '../types.js';
+
+@Injectable()
+export class DeleteDriveFilesProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.usersRepository)
+ private usersRepository: UsersRepository,
+
+ @Inject(DI.driveFilesRepository)
+ private driveFilesRepository: DriveFilesRepository,
+
+ private driveService: DriveService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('delete-drive-files');
+ }
+
+ public async process(job: Bull.Job<DbUserJobData>, done: () => void): Promise<void> {
+ this.#logger.info(`Deleting drive files of ${job.data.user.id} ...`);
+
+ const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
+ if (user == null) {
+ done();
+ return;
+ }
+
+ let deletedCount = 0;
+ let cursor: any = null;
+
+ while (true) {
+ const files = await this.driveFilesRepository.find({
+ where: {
+ userId: user.id,
+ ...(cursor ? { id: MoreThan(cursor) } : {}),
+ },
+ take: 100,
+ order: {
+ id: 1,
+ },
+ });
+
+ if (files.length === 0) {
+ job.progress(100);
+ break;
+ }
+
+ cursor = files[files.length - 1].id;
+
+ for (const file of files) {
+ await this.driveService.deleteFileSync(file);
+ deletedCount++;
+ }
+
+ const total = await this.driveFilesRepository.countBy({
+ userId: user.id,
+ });
+
+ job.progress(deletedCount / total);
+ }
+
+ this.#logger.succ(`All drive files (${deletedCount}) of ${user.id} has been deleted.`);
+ done();
+ }
+}
diff --git a/packages/backend/src/queue/processors/DeleteFileProcessorService.ts b/packages/backend/src/queue/processors/DeleteFileProcessorService.ts
new file mode 100644
index 0000000000..55424f6444
--- /dev/null
+++ b/packages/backend/src/queue/processors/DeleteFileProcessorService.ts
@@ -0,0 +1,31 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { DI } from '@/di-symbols.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import { DriveService } from '@/core/DriveService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+import type { ObjectStorageFileJobData } from '../types.js';
+
+@Injectable()
+export class DeleteFileProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ private driveService: DriveService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('delete-file');
+ }
+
+ public async process(job: Bull.Job<ObjectStorageFileJobData>): Promise<string> {
+ const key: string = job.data.key;
+
+ await this.driveService.deleteObjectStorageFile(key);
+
+ return 'Success';
+ }
+}
diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts
new file mode 100644
index 0000000000..3403ec83a9
--- /dev/null
+++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts
@@ -0,0 +1,130 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { MoreThan } from 'typeorm';
+import { DI } from '@/di-symbols.js';
+import { DriveFilesRepository, InstancesRepository } from '@/models/index.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import { MetaService } from '@/core/MetaService.js';
+import { ApRequestService } from '@/core/remote/activitypub/ApRequestService.js';
+import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
+import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js';
+import { Cache } from '@/misc/cache.js';
+import type { Instance } from '@/models/entities/Instance.js';
+import InstanceChart from '@/core/chart/charts/instance.js';
+import ApRequestChart from '@/core/chart/charts/ap-request.js';
+import FederationChart from '@/core/chart/charts/federation.js';
+import { StatusError } from '@/misc/status-error.js';
+import { UtilityService } from '@/core/UtilityService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+import type { DeliverJobData } from '../types.js';
+
+@Injectable()
+export class DeliverProcessorService {
+ #logger: Logger;
+ #suspendedHostsCache: Cache<Instance[]>;
+ #latest: string | null;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.instancesRepository)
+ private instancesRepository: InstancesRepository,
+
+ @Inject(DI.driveFilesRepository)
+ private driveFilesRepository: DriveFilesRepository,
+
+ private metaService: MetaService,
+ private utilityService: UtilityService,
+ private federatedInstanceService: FederatedInstanceService,
+ private fetchInstanceMetadataService: FetchInstanceMetadataService,
+ private apRequestService: ApRequestService,
+ private instanceChart: InstanceChart,
+ private apRequestChart: ApRequestChart,
+ private federationChart: FederationChart,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('deliver');
+ this.#suspendedHostsCache = new Cache<Instance[]>(1000 * 60 * 60);
+ this.#latest = null;
+ }
+
+ public async process(job: Bull.Job<DeliverJobData>): Promise<string> {
+ const { host } = new URL(job.data.to);
+
+ // ブロックしてたら中断
+ const meta = await this.metaService.fetch();
+ if (meta.blockedHosts.includes(this.utilityService.toPuny(host))) {
+ return 'skip (blocked)';
+ }
+
+ // isSuspendedなら中断
+ let suspendedHosts = this.#suspendedHostsCache.get(null);
+ if (suspendedHosts == null) {
+ suspendedHosts = await this.instancesRepository.find({
+ where: {
+ isSuspended: true,
+ },
+ });
+ this.#suspendedHostsCache.set(null, suspendedHosts);
+ }
+ if (suspendedHosts.map(x => x.host).includes(this.utilityService.toPuny(host))) {
+ return 'skip (suspended)';
+ }
+
+ try {
+ if (this.#latest !== (this.#latest = JSON.stringify(job.data.content, null, 2))) {
+ this.#logger.debug(`delivering ${this.#latest}`);
+ }
+
+ await this.apRequestService.signedPost(job.data.user, job.data.to, job.data.content);
+
+ // Update stats
+ this.federatedInstanceService.registerOrFetchInstanceDoc(host).then(i => {
+ this.instancesRepository.update(i.id, {
+ latestRequestSentAt: new Date(),
+ latestStatus: 200,
+ lastCommunicatedAt: new Date(),
+ isNotResponding: false,
+ });
+
+ this.fetchInstanceMetadataService.fetchInstanceMetadata(i);
+
+ this.instanceChart.requestSent(i.host, true);
+ this.apRequestChart.deliverSucc();
+ this.federationChart.deliverd(i.host, true);
+ });
+
+ return 'Success';
+ } catch (res) {
+ // Update stats
+ this.federatedInstanceService.registerOrFetchInstanceDoc(host).then(i => {
+ this.instancesRepository.update(i.id, {
+ latestRequestSentAt: new Date(),
+ latestStatus: res instanceof StatusError ? res.statusCode : null,
+ isNotResponding: true,
+ });
+
+ this.instanceChart.requestSent(i.host, false);
+ this.apRequestChart.deliverFail();
+ this.federationChart.deliverd(i.host, false);
+ });
+
+ if (res instanceof StatusError) {
+ // 4xx
+ if (res.isClientError) {
+ // HTTPステータスコード4xxはクライアントエラーであり、それはつまり
+ // 何回再送しても成功することはないということなのでエラーにはしないでおく
+ return `${res.statusCode} ${res.statusMessage}`;
+ }
+
+ // 5xx etc.
+ throw `${res.statusCode} ${res.statusMessage}`;
+ } else {
+ // DNS error, socket error, timeout ...
+ throw res;
+ }
+ }
+ }
+}
diff --git a/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts b/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts
new file mode 100644
index 0000000000..b90c7be629
--- /dev/null
+++ b/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts
@@ -0,0 +1,56 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { MoreThan } from 'typeorm';
+import { DI } from '@/di-symbols.js';
+import { PollVotesRepository, NotesRepository } from '@/models/index.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import { CreateNotificationService } from '@/core/CreateNotificationService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+import type { EndedPollNotificationJobData } from '../types.js';
+
+@Injectable()
+export class EndedPollNotificationProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.notesRepository)
+ private notesRepository: NotesRepository,
+
+ @Inject(DI.pollVotesRepository)
+ private pollVotesRepository: PollVotesRepository,
+
+ private createNotificationService: CreateNotificationService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('ended-poll-notification');
+ }
+
+ public async process(job: Bull.Job<EndedPollNotificationJobData>, done: () => void): Promise<void> {
+ const note = await this.notesRepository.findOneBy({ id: job.data.noteId });
+ if (note == null || !note.hasPoll) {
+ done();
+ return;
+ }
+
+ const votes = await this.pollVotesRepository.createQueryBuilder('vote')
+ .select('vote.userId')
+ .where('vote.noteId = :noteId', { noteId: note.id })
+ .innerJoinAndSelect('vote.user', 'user')
+ .andWhere('user.host IS NULL')
+ .getMany();
+
+ const userIds = [...new Set([note.userId, ...votes.map(v => v.userId)])];
+
+ for (const userId of userIds) {
+ this.createNotificationService.createNotification(userId, 'pollEnded', {
+ noteId: note.id,
+ });
+ }
+
+ done();
+ }
+}
diff --git a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts
new file mode 100644
index 0000000000..9b520be06e
--- /dev/null
+++ b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts
@@ -0,0 +1,117 @@
+import * as fs from 'node:fs';
+import { Inject, Injectable } from '@nestjs/common';
+import { MoreThan } from 'typeorm';
+import { format as dateFormat } from 'date-fns';
+import { DI } from '@/di-symbols.js';
+import { UsersRepository, BlockingsRepository } from '@/models/index.js';
+import type { DriveFilesRepository, UserProfilesRepository, NotesRepository } from '@/models/index.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import { DriveService } from '@/core/DriveService.js';
+import { createTemp } from '@/misc/create-temp.js';
+import { UtilityService } from '@/core/UtilityService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+import type { DbUserJobData } from '../types.js';
+
+@Injectable()
+export class ExportBlockingProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.usersRepository)
+ private usersRepository: UsersRepository,
+
+ @Inject(DI.blockingsRepository)
+ private blockingsRepository: BlockingsRepository,
+
+ private utilityService: UtilityService,
+ private driveService: DriveService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('export-blocking');
+ }
+
+ public async process(job: Bull.Job<DbUserJobData>, done: () => void): Promise<void> {
+ this.#logger.info(`Exporting blocking of ${job.data.user.id} ...`);
+
+ const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
+ if (user == null) {
+ done();
+ return;
+ }
+
+ // Create temp file
+ const [path, cleanup] = await createTemp();
+
+ this.#logger.info(`Temp file is ${path}`);
+
+ try {
+ const stream = fs.createWriteStream(path, { flags: 'a' });
+
+ let exportedCount = 0;
+ let cursor: any = null;
+
+ while (true) {
+ const blockings = await this.blockingsRepository.find({
+ where: {
+ blockerId: user.id,
+ ...(cursor ? { id: MoreThan(cursor) } : {}),
+ },
+ take: 100,
+ order: {
+ id: 1,
+ },
+ });
+
+ if (blockings.length === 0) {
+ job.progress(100);
+ break;
+ }
+
+ cursor = blockings[blockings.length - 1].id;
+
+ for (const block of blockings) {
+ const u = await this.usersRepository.findOneBy({ id: block.blockeeId });
+ if (u == null) {
+ exportedCount++; continue;
+ }
+
+ const content = this.utilityService.getFullApAccount(u.username, u.host);
+ await new Promise<void>((res, rej) => {
+ stream.write(content + '\n', err => {
+ if (err) {
+ this.#logger.error(err);
+ rej(err);
+ } else {
+ res();
+ }
+ });
+ });
+ exportedCount++;
+ }
+
+ const total = await this.blockingsRepository.countBy({
+ blockerId: user.id,
+ });
+
+ job.progress(exportedCount / total);
+ }
+
+ stream.end();
+ this.#logger.succ(`Exported to: ${path}`);
+
+ const fileName = 'blocking-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv';
+ const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true });
+
+ this.#logger.succ(`Exported to: ${driveFile.id}`);
+ } finally {
+ cleanup();
+ }
+
+ done();
+ }
+}
diff --git a/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts
new file mode 100644
index 0000000000..93341c2c63
--- /dev/null
+++ b/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts
@@ -0,0 +1,135 @@
+import * as fs from 'node:fs';
+import { Inject, Injectable } from '@nestjs/common';
+import { IsNull, MoreThan } from 'typeorm';
+import { format as dateFormat } from 'date-fns';
+import { ulid } from 'ulid';
+import mime from 'mime-types';
+import archiver from 'archiver';
+import { DI } from '@/di-symbols.js';
+import { EmojisRepository, UsersRepository } from '@/models/index.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import { DriveService } from '@/core/DriveService.js';
+import { createTemp, createTempDir } from '@/misc/create-temp.js';
+import { DownloadService } from '@/core/DownloadService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+
+@Injectable()
+export class ExportCustomEmojisProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.usersRepository)
+ private usersRepository: UsersRepository,
+
+ @Inject(DI.emojisRepository)
+ private emojisRepository: EmojisRepository,
+
+ private driveService: DriveService,
+ private downloadService: DownloadService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('export-custom-emojis');
+ }
+
+ public async process(job: Bull.Job, done: () => void): Promise<void> {
+ this.#logger.info('Exporting custom emojis ...');
+
+ const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
+ if (user == null) {
+ done();
+ return;
+ }
+
+ const [path, cleanup] = await createTempDir();
+
+ this.#logger.info(`Temp dir is ${path}`);
+
+ const metaPath = path + '/meta.json';
+
+ fs.writeFileSync(metaPath, '', 'utf-8');
+
+ const metaStream = fs.createWriteStream(metaPath, { flags: 'a' });
+
+ const writeMeta = (text: string): Promise<void> => {
+ return new Promise<void>((res, rej) => {
+ metaStream.write(text, err => {
+ if (err) {
+ this.#logger.error(err);
+ rej(err);
+ } else {
+ res();
+ }
+ });
+ });
+ };
+
+ await writeMeta(`{"metaVersion":2,"host":"${this.config.host}","exportedAt":"${new Date().toString()}","emojis":[`);
+
+ const customEmojis = await this.emojisRepository.find({
+ where: {
+ host: IsNull(),
+ },
+ order: {
+ id: 'ASC',
+ },
+ });
+
+ for (const emoji of customEmojis) {
+ const ext = mime.extension(emoji.type);
+ const fileName = emoji.name + (ext ? '.' + ext : '');
+ const emojiPath = path + '/' + fileName;
+ fs.writeFileSync(emojiPath, '', 'binary');
+ let downloaded = false;
+
+ try {
+ await this.downloadService.downloadUrl(emoji.originalUrl, emojiPath);
+ downloaded = true;
+ } catch (e) { // TODO: 何度か再試行
+ this.#logger.error(e instanceof Error ? e : new Error(e as string));
+ }
+
+ if (!downloaded) {
+ fs.unlinkSync(emojiPath);
+ }
+
+ const content = JSON.stringify({
+ fileName: fileName,
+ downloaded: downloaded,
+ emoji: emoji,
+ });
+ const isFirst = customEmojis.indexOf(emoji) === 0;
+
+ await writeMeta(isFirst ? content : ',\n' + content);
+ }
+
+ await writeMeta(']}');
+
+ metaStream.end();
+
+ // Create archive
+ const [archivePath, archiveCleanup] = await createTemp();
+ const archiveStream = fs.createWriteStream(archivePath);
+ const archive = archiver('zip', {
+ zlib: { level: 0 },
+ });
+ archiveStream.on('close', async () => {
+ this.#logger.succ(`Exported to: ${archivePath}`);
+
+ const fileName = 'custom-emojis-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.zip';
+ const driveFile = await this.driveService.addFile({ user, path: archivePath, name: fileName, force: true });
+
+ this.#logger.succ(`Exported to: ${driveFile.id}`);
+ cleanup();
+ archiveCleanup();
+ done();
+ });
+ archive.pipe(archiveStream);
+ archive.directory(path, false);
+ archive.finalize();
+ }
+}
diff --git a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts
new file mode 100644
index 0000000000..9946015ff7
--- /dev/null
+++ b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts
@@ -0,0 +1,120 @@
+import * as fs from 'node:fs';
+import { Inject, Injectable } from '@nestjs/common';
+import { In, MoreThan, Not } from 'typeorm';
+import { format as dateFormat } from 'date-fns';
+import { DI } from '@/di-symbols.js';
+import { FollowingsRepository, MutingsRepository } from '@/models/index.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import { DriveService } from '@/core/DriveService.js';
+import { createTemp } from '@/misc/create-temp.js';
+import type { Following } from '@/models/entities/Following.js';
+import { UtilityService } from '@/core/UtilityService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+import type { DbUserJobData } from '../types.js';
+
+@Injectable()
+export class ExportFollowingProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.usersRepository)
+ private usersRepository: UsersRepository,
+
+ @Inject(DI.followingsRepository)
+ private followingsRepository: FollowingsRepository,
+
+ @Inject(DI.mutingsRepository)
+ private mutingsRepository: MutingsRepository,
+
+ private utilityService: UtilityService,
+ private driveService: DriveService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('export-following');
+ }
+
+ public async process(job: Bull.Job<DbUserJobData>, done: () => void): Promise<void> {
+ this.#logger.info(`Exporting following of ${job.data.user.id} ...`);
+
+ const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
+ if (user == null) {
+ done();
+ return;
+ }
+
+ // Create temp file
+ const [path, cleanup] = await createTemp();
+
+ this.#logger.info(`Temp file is ${path}`);
+
+ try {
+ const stream = fs.createWriteStream(path, { flags: 'a' });
+
+ let cursor: Following['id'] | null = null;
+
+ const mutings = job.data.excludeMuting ? await this.mutingsRepository.findBy({
+ muterId: user.id,
+ }) : [];
+
+ while (true) {
+ const followings = await this.followingsRepository.find({
+ where: {
+ followerId: user.id,
+ ...(mutings.length > 0 ? { followeeId: Not(In(mutings.map(x => x.muteeId))) } : {}),
+ ...(cursor ? { id: MoreThan(cursor) } : {}),
+ },
+ take: 100,
+ order: {
+ id: 1,
+ },
+ }) as Following[];
+
+ if (followings.length === 0) {
+ break;
+ }
+
+ cursor = followings[followings.length - 1].id;
+
+ for (const following of followings) {
+ const u = await this.usersRepository.findOneBy({ id: following.followeeId });
+ if (u == null) {
+ continue;
+ }
+
+ if (job.data.excludeInactive && u.updatedAt && (Date.now() - u.updatedAt.getTime() > 1000 * 60 * 60 * 24 * 90)) {
+ continue;
+ }
+
+ const content = this.utilityService.getFullApAccount(u.username, u.host);
+ await new Promise<void>((res, rej) => {
+ stream.write(content + '\n', err => {
+ if (err) {
+ this.#logger.error(err);
+ rej(err);
+ } else {
+ res();
+ }
+ });
+ });
+ }
+ }
+
+ stream.end();
+ this.#logger.succ(`Exported to: ${path}`);
+
+ const fileName = 'following-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv';
+ const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true });
+
+ this.#logger.succ(`Exported to: ${driveFile.id}`);
+ } finally {
+ cleanup();
+ }
+
+ done();
+ }
+}
diff --git a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts
new file mode 100644
index 0000000000..a34cea0f41
--- /dev/null
+++ b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts
@@ -0,0 +1,120 @@
+import * as fs from 'node:fs';
+import { Inject, Injectable } from '@nestjs/common';
+import { IsNull, MoreThan } from 'typeorm';
+import { format as dateFormat } from 'date-fns';
+import { DI } from '@/di-symbols.js';
+import { MutingsRepository, UsersRepository, BlockingsRepository } from '@/models/index.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import { DriveService } from '@/core/DriveService.js';
+import { createTemp } from '@/misc/create-temp.js';
+import { UtilityService } from '@/core/UtilityService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+import type { DbUserJobData } from '../types.js';
+
+@Injectable()
+export class ExportMutingProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.usersRepository)
+ private usersRepository: UsersRepository,
+
+ @Inject(DI.blockingsRepository)
+ private blockingsRepository: BlockingsRepository,
+
+ @Inject(DI.mutingsRepository)
+ private mutingsRepository: MutingsRepository,
+
+ private utilityService: UtilityService,
+ private driveService: DriveService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('export-muting');
+ }
+
+ public async process(job: Bull.Job<DbUserJobData>, done: () => void): Promise<void> {
+ this.#logger.info(`Exporting muting of ${job.data.user.id} ...`);
+
+ const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
+ if (user == null) {
+ done();
+ return;
+ }
+
+ // Create temp file
+ const [path, cleanup] = await createTemp();
+
+ this.#logger.info(`Temp file is ${path}`);
+
+ try {
+ const stream = fs.createWriteStream(path, { flags: 'a' });
+
+ let exportedCount = 0;
+ let cursor: any = null;
+
+ while (true) {
+ const mutes = await this.mutingsRepository.find({
+ where: {
+ muterId: user.id,
+ expiresAt: IsNull(),
+ ...(cursor ? { id: MoreThan(cursor) } : {}),
+ },
+ take: 100,
+ order: {
+ id: 1,
+ },
+ });
+
+ if (mutes.length === 0) {
+ job.progress(100);
+ break;
+ }
+
+ cursor = mutes[mutes.length - 1].id;
+
+ for (const mute of mutes) {
+ const u = await this.usersRepository.findOneBy({ id: mute.muteeId });
+ if (u == null) {
+ exportedCount++; continue;
+ }
+
+ const content = this.utilityService.getFullApAccount(u.username, u.host);
+ await new Promise<void>((res, rej) => {
+ stream.write(content + '\n', err => {
+ if (err) {
+ this.#logger.error(err);
+ rej(err);
+ } else {
+ res();
+ }
+ });
+ });
+ exportedCount++;
+ }
+
+ const total = await this.mutingsRepository.countBy({
+ muterId: user.id,
+ });
+
+ job.progress(exportedCount / total);
+ }
+
+ stream.end();
+ this.#logger.succ(`Exported to: ${path}`);
+
+ const fileName = 'mute-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv';
+ const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true });
+
+ this.#logger.succ(`Exported to: ${driveFile.id}`);
+ } finally {
+ cleanup();
+ }
+
+ done();
+ }
+}
diff --git a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
new file mode 100644
index 0000000000..24fcc1a8ad
--- /dev/null
+++ b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
@@ -0,0 +1,143 @@
+import * as fs from 'node:fs';
+import { Inject, Injectable } from '@nestjs/common';
+import { IsNull, MoreThan } from 'typeorm';
+import { format as dateFormat } from 'date-fns';
+import { DI } from '@/di-symbols.js';
+import { NotesRepository, PollsRepository, UsersRepository } from '@/models/index.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import { DriveService } from '@/core/DriveService.js';
+import { createTemp } from '@/misc/create-temp.js';
+import type { Poll } from '@/models/entities/Poll.js';
+import type { Note } from '@/models/entities/Note.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+import type { DbUserJobData } from '../types.js';
+
+@Injectable()
+export class ExportNotesProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.usersRepository)
+ private usersRepository: UsersRepository,
+
+ @Inject(DI.pollsRepository)
+ private pollsRepository: PollsRepository,
+
+ @Inject(DI.notesRepository)
+ private notesRepository: NotesRepository,
+
+ private driveService: DriveService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('export-notes');
+ }
+
+ public async process(job: Bull.Job<DbUserJobData>, done: () => void): Promise<void> {
+ this.#logger.info(`Exporting notes of ${job.data.user.id} ...`);
+
+ const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
+ if (user == null) {
+ done();
+ return;
+ }
+
+ // Create temp file
+ const [path, cleanup] = await createTemp();
+
+ this.#logger.info(`Temp file is ${path}`);
+
+ try {
+ const stream = fs.createWriteStream(path, { flags: 'a' });
+
+ const write = (text: string): Promise<void> => {
+ return new Promise<void>((res, rej) => {
+ stream.write(text, err => {
+ if (err) {
+ this.#logger.error(err);
+ rej(err);
+ } else {
+ res();
+ }
+ });
+ });
+ };
+
+ await write('[');
+
+ let exportedNotesCount = 0;
+ let cursor: Note['id'] | null = null;
+
+ while (true) {
+ const notes = await this.notesRepository.find({
+ where: {
+ userId: user.id,
+ ...(cursor ? { id: MoreThan(cursor) } : {}),
+ },
+ take: 100,
+ order: {
+ id: 1,
+ },
+ }) as Note[];
+
+ if (notes.length === 0) {
+ job.progress(100);
+ break;
+ }
+
+ cursor = notes[notes.length - 1].id;
+
+ for (const note of notes) {
+ let poll: Poll | undefined;
+ if (note.hasPoll) {
+ poll = await this.pollsRepository.findOneByOrFail({ noteId: note.id });
+ }
+ const content = JSON.stringify(serialize(note, poll));
+ const isFirst = exportedNotesCount === 0;
+ await write(isFirst ? content : ',\n' + content);
+ exportedNotesCount++;
+ }
+
+ const total = await this.notesRepository.countBy({
+ userId: user.id,
+ });
+
+ job.progress(exportedNotesCount / total);
+ }
+
+ await write(']');
+
+ stream.end();
+ this.#logger.succ(`Exported to: ${path}`);
+
+ const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json';
+ const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true });
+
+ this.#logger.succ(`Exported to: ${driveFile.id}`);
+ } finally {
+ cleanup();
+ }
+
+ done();
+ }
+}
+
+function serialize(note: Note, poll: Poll | null = null): Record<string, unknown> {
+ return {
+ id: note.id,
+ text: note.text,
+ createdAt: note.createdAt,
+ fileIds: note.fileIds,
+ replyId: note.replyId,
+ renoteId: note.renoteId,
+ poll: poll,
+ cw: note.cw,
+ visibility: note.visibility,
+ visibleUserIds: note.visibleUserIds,
+ localOnly: note.localOnly,
+ };
+}
diff --git a/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts
new file mode 100644
index 0000000000..a02e9bdee4
--- /dev/null
+++ b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts
@@ -0,0 +1,96 @@
+import * as fs from 'node:fs';
+import { Inject, Injectable } from '@nestjs/common';
+import { In, IsNull, MoreThan } from 'typeorm';
+import { format as dateFormat } from 'date-fns';
+import { DI } from '@/di-symbols.js';
+import { UserListJoiningsRepository, UserListsRepository, UsersRepository } from '@/models/index.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import { DriveService } from '@/core/DriveService.js';
+import { createTemp } from '@/misc/create-temp.js';
+import { UtilityService } from '@/core/UtilityService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+import type { DbUserJobData } from '../types.js';
+
+@Injectable()
+export class ExportUserListsProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.usersRepository)
+ private usersRepository: UsersRepository,
+
+ @Inject(DI.userListsRepository)
+ private userListsRepository: UserListsRepository,
+
+ @Inject(DI.userListJoiningsRepository)
+ private userListJoiningsRepository: UserListJoiningsRepository,
+
+ private utilityService: UtilityService,
+ private driveService: DriveService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('export-user-lists');
+ }
+
+ public async process(job: Bull.Job<DbUserJobData>, done: () => void): Promise<void> {
+ this.#logger.info(`Exporting user lists of ${job.data.user.id} ...`);
+
+ const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
+ if (user == null) {
+ done();
+ return;
+ }
+
+ const lists = await this.userListsRepository.findBy({
+ userId: user.id,
+ });
+
+ // Create temp file
+ const [path, cleanup] = await createTemp();
+
+ this.#logger.info(`Temp file is ${path}`);
+
+ try {
+ const stream = fs.createWriteStream(path, { flags: 'a' });
+
+ for (const list of lists) {
+ const joinings = await this.userListJoiningsRepository.findBy({ userListId: list.id });
+ const users = await this.usersRepository.findBy({
+ id: In(joinings.map(j => j.userId)),
+ });
+
+ for (const u of users) {
+ const acct = this.utilityService.getFullApAccount(u.username, u.host);
+ const content = `${list.name},${acct}`;
+ await new Promise<void>((res, rej) => {
+ stream.write(content + '\n', err => {
+ if (err) {
+ this.#logger.error(err);
+ rej(err);
+ } else {
+ res();
+ }
+ });
+ });
+ }
+ }
+
+ stream.end();
+ this.#logger.succ(`Exported to: ${path}`);
+
+ const fileName = 'user-lists-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv';
+ const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true });
+
+ this.#logger.succ(`Exported to: ${driveFile.id}`);
+ } finally {
+ cleanup();
+ }
+
+ done();
+ }
+}
diff --git a/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts b/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts
new file mode 100644
index 0000000000..abae196299
--- /dev/null
+++ b/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts
@@ -0,0 +1,102 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { IsNull, MoreThan } from 'typeorm';
+import { DI } from '@/di-symbols.js';
+import { BlockingsRepository, DriveFilesRepository } from '@/models/index.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import * as Acct from '@/misc/acct.js';
+import { ResolveUserService } from '@/core/remote/ResolveUserService.js';
+import { UserBlockingService } from '@/core/UserBlockingService.js';
+import { DownloadService } from '@/core/DownloadService.js';
+import { UtilityService } from '@/core/UtilityService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+import type { DbUserImportJobData } from '../types.js';
+
+@Injectable()
+export class ImportBlockingProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.usersRepository)
+ private usersRepository: UsersRepository,
+
+ @Inject(DI.blockingsRepository)
+ private blockingsRepository: BlockingsRepository,
+
+ @Inject(DI.driveFilesRepository)
+ private driveFilesRepository: DriveFilesRepository,
+
+ private utilityService: UtilityService,
+ private userBlockingService: UserBlockingService,
+ private resolveUserService: ResolveUserService,
+ private downloadService: DownloadService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('import-blocking');
+ }
+
+ public async process(job: Bull.Job<DbUserImportJobData>, done: () => void): Promise<void> {
+ this.#logger.info(`Importing blocking of ${job.data.user.id} ...`);
+
+ const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
+ if (user == null) {
+ done();
+ return;
+ }
+
+ const file = await this.driveFilesRepository.findOneBy({
+ id: job.data.fileId,
+ });
+ if (file == null) {
+ done();
+ return;
+ }
+
+ const csv = await this.downloadService.downloadTextFile(file.url);
+
+ let linenum = 0;
+
+ for (const line of csv.trim().split('\n')) {
+ linenum++;
+
+ try {
+ const acct = line.split(',')[0].trim();
+ const { username, host } = Acct.parse(acct);
+
+ let target = this.utilityService.isSelfHost(host!) ? await this.usersRepository.findOneBy({
+ host: IsNull(),
+ usernameLower: username.toLowerCase(),
+ }) : await this.usersRepository.findOneBy({
+ host: this.utilityService.toPuny(host!),
+ usernameLower: username.toLowerCase(),
+ });
+
+ if (host == null && target == null) continue;
+
+ if (target == null) {
+ target = await this.resolveUserService.resolveUser(username, host);
+ }
+
+ if (target == null) {
+ throw `cannot resolve user: @${username}@${host}`;
+ }
+
+ // skip myself
+ if (target.id === job.data.user.id) continue;
+
+ this.#logger.info(`Block[${linenum}] ${target.id} ...`);
+
+ await this.userBlockingService.block(user, target);
+ } catch (e) {
+ this.#logger.warn(`Error in line:${linenum} ${e}`);
+ }
+ }
+
+ this.#logger.succ('Imported');
+ done();
+ }
+}
diff --git a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts
new file mode 100644
index 0000000000..6f86589aec
--- /dev/null
+++ b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts
@@ -0,0 +1,110 @@
+import * as fs from 'node:fs';
+import { Inject, Injectable } from '@nestjs/common';
+import { IsNull, MoreThan, DataSource } from 'typeorm';
+import unzipper from 'unzipper';
+import { DI } from '@/di-symbols.js';
+import { EmojisRepository, DriveFilesRepository, UsersRepository } from '@/models/index.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import { CustomEmojiService } from '@/core/CustomEmojiService.js';
+import { createTempDir } from '@/misc/create-temp.js';
+import { DriveService } from '@/core/DriveService.js';
+import { DownloadService } from '@/core/DownloadService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+import type { DbUserImportJobData } from '../types.js';
+
+// TODO: 名前衝突時の動作を選べるようにする
+@Injectable()
+export class ImportCustomEmojisProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.db)
+ private db: DataSource,
+
+ @Inject(DI.usersRepository)
+ private usersRepository: UsersRepository,
+
+ @Inject(DI.driveFilesRepository)
+ private driveFilesRepository: DriveFilesRepository,
+
+ @Inject(DI.emojisRepository)
+ private emojisRepository: EmojisRepository,
+
+ private customEmojiService: CustomEmojiService,
+ private driveService: DriveService,
+ private downloadService: DownloadService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('import-custom-emojis');
+ }
+
+ public async process(job: Bull.Job<DbUserImportJobData>, done: () => void): Promise<void> {
+ this.#logger.info('Importing custom emojis ...');
+
+ const file = await this.driveFilesRepository.findOneBy({
+ id: job.data.fileId,
+ });
+ if (file == null) {
+ done();
+ return;
+ }
+
+ const [path, cleanup] = await createTempDir();
+
+ this.#logger.info(`Temp dir is ${path}`);
+
+ const destPath = path + '/emojis.zip';
+
+ try {
+ fs.writeFileSync(destPath, '', 'binary');
+ await this.downloadService.downloadUrl(file.url, destPath);
+ } catch (e) { // TODO: 何度か再試行
+ if (e instanceof Error || typeof e === 'string') {
+ this.#logger.error(e);
+ }
+ throw e;
+ }
+
+ const outputPath = path + '/emojis';
+ const unzipStream = fs.createReadStream(destPath);
+ const extractor = unzipper.Extract({ path: outputPath });
+ extractor.on('close', async () => {
+ const metaRaw = fs.readFileSync(outputPath + '/meta.json', 'utf-8');
+ const meta = JSON.parse(metaRaw);
+
+ for (const record of meta.emojis) {
+ if (!record.downloaded) continue;
+ const emojiInfo = record.emoji;
+ const emojiPath = outputPath + '/' + record.fileName;
+ await this.emojisRepository.delete({
+ name: emojiInfo.name,
+ });
+ const driveFile = await this.driveService.addFile({
+ user: null,
+ path: emojiPath,
+ name: record.fileName,
+ force: true,
+ });
+ await this.customEmojiService.add({
+ name: emojiInfo.name,
+ category: emojiInfo.category,
+ host: null,
+ aliases: emojiInfo.aliases,
+ driveFile,
+ });
+ }
+
+ cleanup();
+
+ this.#logger.succ('Imported');
+ done();
+ });
+ unzipStream.pipe(extractor);
+ this.#logger.succ(`Unzipping to ${outputPath}`);
+ }
+}
diff --git a/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts b/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts
new file mode 100644
index 0000000000..087e0baf96
--- /dev/null
+++ b/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts
@@ -0,0 +1,99 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { IsNull, MoreThan } from 'typeorm';
+import { DI } from '@/di-symbols.js';
+import { DriveFilesRepository } from '@/models/index.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import * as Acct from '@/misc/acct.js';
+import { ResolveUserService } from '@/core/remote/ResolveUserService.js';
+import { DownloadService } from '@/core/DownloadService.js';
+import { UserFollowingService } from '@/core/UserFollowingService.js';
+import { UtilityService } from '@/core/UtilityService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+import type { DbUserImportJobData } from '../types.js';
+
+@Injectable()
+export class ImportFollowingProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.usersRepository)
+ private usersRepository: UsersRepository,
+
+ @Inject(DI.driveFilesRepository)
+ private driveFilesRepository: DriveFilesRepository,
+
+ private utilityService: UtilityService,
+ private userFollowingService: UserFollowingService,
+ private resolveUserService: ResolveUserService,
+ private downloadService: DownloadService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('import-following');
+ }
+
+ public async process(job: Bull.Job<DbUserImportJobData>, done: () => void): Promise<void> {
+ this.#logger.info(`Importing following of ${job.data.user.id} ...`);
+
+ const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
+ if (user == null) {
+ done();
+ return;
+ }
+
+ const file = await this.driveFilesRepository.findOneBy({
+ id: job.data.fileId,
+ });
+ if (file == null) {
+ done();
+ return;
+ }
+
+ const csv = await this.downloadService.downloadTextFile(file.url);
+
+ let linenum = 0;
+
+ for (const line of csv.trim().split('\n')) {
+ linenum++;
+
+ try {
+ const acct = line.split(',')[0].trim();
+ const { username, host } = Acct.parse(acct);
+
+ let target = this.utilityService.isSelfHost(host!) ? await this.usersRepository.findOneBy({
+ host: IsNull(),
+ usernameLower: username.toLowerCase(),
+ }) : await this.usersRepository.findOneBy({
+ host: this.utilityService.toPuny(host!),
+ usernameLower: username.toLowerCase(),
+ });
+
+ if (host == null && target == null) continue;
+
+ if (target == null) {
+ target = await this.resolveUserService.resolveUser(username, host);
+ }
+
+ if (target == null) {
+ throw `cannot resolve user: @${username}@${host}`;
+ }
+
+ // skip myself
+ if (target.id === job.data.user.id) continue;
+
+ this.#logger.info(`Follow[${linenum}] ${target.id} ...`);
+
+ this.userFollowingService.follow(user, target);
+ } catch (e) {
+ this.#logger.warn(`Error in line:${linenum} ${e}`);
+ }
+ }
+
+ this.#logger.succ('Imported');
+ done();
+ }
+}
diff --git a/packages/backend/src/queue/processors/ImportMutingProcessorService.ts b/packages/backend/src/queue/processors/ImportMutingProcessorService.ts
new file mode 100644
index 0000000000..404091e8ca
--- /dev/null
+++ b/packages/backend/src/queue/processors/ImportMutingProcessorService.ts
@@ -0,0 +1,100 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { IsNull, MoreThan } from 'typeorm';
+import { DI } from '@/di-symbols.js';
+import { DriveFilesRepository } from '@/models/index.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import * as Acct from '@/misc/acct.js';
+import { ResolveUserService } from '@/core/remote/ResolveUserService.js';
+import { DownloadService } from '@/core/DownloadService.js';
+import type { UserFollowingService } from '@/core/UserFollowingService.js';
+import { UserMutingService } from '@/core/UserMutingService.js';
+import { UtilityService } from '@/core/UtilityService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+import type { DbUserImportJobData } from '../types.js';
+
+@Injectable()
+export class ImportMutingProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.usersRepository)
+ private usersRepository: UsersRepository,
+
+ @Inject(DI.driveFilesRepository)
+ private driveFilesRepository: DriveFilesRepository,
+
+ private utilityService: UtilityService,
+ private userMutingService: UserMutingService,
+ private resolveUserService: ResolveUserService,
+ private downloadService: DownloadService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('import-muting');
+ }
+
+ public async process(job: Bull.Job<DbUserImportJobData>, done: () => void): Promise<void> {
+ this.#logger.info(`Importing muting of ${job.data.user.id} ...`);
+
+ const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
+ if (user == null) {
+ done();
+ return;
+ }
+
+ const file = await this.driveFilesRepository.findOneBy({
+ id: job.data.fileId,
+ });
+ if (file == null) {
+ done();
+ return;
+ }
+
+ const csv = await this.downloadService.downloadTextFile(file.url);
+
+ let linenum = 0;
+
+ for (const line of csv.trim().split('\n')) {
+ linenum++;
+
+ try {
+ const acct = line.split(',')[0].trim();
+ const { username, host } = Acct.parse(acct);
+
+ let target = this.utilityService.isSelfHost(host!) ? await this.usersRepository.findOneBy({
+ host: IsNull(),
+ usernameLower: username.toLowerCase(),
+ }) : await this.usersRepository.findOneBy({
+ host: this.utilityService.toPuny(host!),
+ usernameLower: username.toLowerCase(),
+ });
+
+ if (host == null && target == null) continue;
+
+ if (target == null) {
+ target = await this.resolveUserService.resolveUser(username, host);
+ }
+
+ if (target == null) {
+ throw `cannot resolve user: @${username}@${host}`;
+ }
+
+ // skip myself
+ if (target.id === job.data.user.id) continue;
+
+ this.#logger.info(`Mute[${linenum}] ${target.id} ...`);
+
+ await this.userMutingService.mute(user, target);
+ } catch (e) {
+ this.#logger.warn(`Error in line:${linenum} ${e}`);
+ }
+ }
+
+ this.#logger.succ('Imported');
+ done();
+ }
+}
diff --git a/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts b/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts
new file mode 100644
index 0000000000..aed1a4cde5
--- /dev/null
+++ b/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts
@@ -0,0 +1,112 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { IsNull, MoreThan } from 'typeorm';
+import { DI } from '@/di-symbols.js';
+import { DriveFilesRepository, UserListJoiningsRepository, UserListsRepository } from '@/models/index.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import * as Acct from '@/misc/acct.js';
+import { ResolveUserService } from '@/core/remote/ResolveUserService.js';
+import { DownloadService } from '@/core/DownloadService.js';
+import { UserListService } from '@/core/UserListService.js';
+import { IdService } from '@/core/IdService.js';
+import { UtilityService } from '@/core/UtilityService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+import type { DbUserImportJobData } from '../types.js';
+
+@Injectable()
+export class ImportUserListsProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.usersRepository)
+ private usersRepository: UsersRepository,
+
+ @Inject(DI.driveFilesRepository)
+ private driveFilesRepository: DriveFilesRepository,
+
+ @Inject(DI.userListsRepository)
+ private userListsRepository: UserListsRepository,
+
+ @Inject(DI.userListJoiningsRepository)
+ private userListJoiningsRepository: UserListJoiningsRepository,
+
+ private utilityService: UtilityService,
+ private idService: IdService,
+ private userListService: UserListService,
+ private resolveUserService: ResolveUserService,
+ private downloadService: DownloadService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('import-user-lists');
+ }
+
+ public async process(job: Bull.Job<DbUserImportJobData>, done: () => void): Promise<void> {
+ this.#logger.info(`Importing user lists of ${job.data.user.id} ...`);
+
+ const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
+ if (user == null) {
+ done();
+ return;
+ }
+
+ const file = await this.driveFilesRepository.findOneBy({
+ id: job.data.fileId,
+ });
+ if (file == null) {
+ done();
+ return;
+ }
+
+ const csv = await this.downloadService.downloadTextFile(file.url);
+
+ let linenum = 0;
+
+ for (const line of csv.trim().split('\n')) {
+ linenum++;
+
+ try {
+ const listName = line.split(',')[0].trim();
+ const { username, host } = Acct.parse(line.split(',')[1].trim());
+
+ let list = await this.userListsRepository.findOneBy({
+ userId: user.id,
+ name: listName,
+ });
+
+ if (list == null) {
+ list = await this.userListsRepository.insert({
+ id: this.idService.genId(),
+ createdAt: new Date(),
+ userId: user.id,
+ name: listName,
+ }).then(x => this.userListsRepository.findOneByOrFail(x.identifiers[0]));
+ }
+
+ let target = this.utilityService.isSelfHost(host!) ? await this.usersRepository.findOneBy({
+ host: IsNull(),
+ usernameLower: username.toLowerCase(),
+ }) : await this.usersRepository.findOneBy({
+ host: this.utilityService.toPuny(host!),
+ usernameLower: username.toLowerCase(),
+ });
+
+ if (target == null) {
+ target = await this.resolveUserService.resolveUser(username, host);
+ }
+
+ if (await this.userListJoiningsRepository.findOneBy({ userListId: list!.id, userId: target.id }) != null) continue;
+
+ this.userListService.push(target, list!);
+ } catch (e) {
+ this.#logger.warn(`Error in line:${linenum} ${e}`);
+ }
+ }
+
+ this.#logger.succ('Imported');
+ done();
+ }
+}
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts
new file mode 100644
index 0000000000..5733f5d0a9
--- /dev/null
+++ b/packages/backend/src/queue/processors/InboxProcessorService.ts
@@ -0,0 +1,195 @@
+import { URL } from 'node:url';
+import { Inject, Injectable } from '@nestjs/common';
+import { MoreThan } from 'typeorm';
+import httpSignature from '@peertube/http-signature';
+import { DI } from '@/di-symbols.js';
+import { DriveFilesRepository } from '@/models/index.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import { MetaService } from '@/core/MetaService.js';
+import { ApRequestService } from '@/core/remote/activitypub/ApRequestService.js';
+import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
+import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js';
+import { Cache } from '@/misc/cache.js';
+import type { Instance } from '@/models/entities/Instance.js';
+import InstanceChart from '@/core/chart/charts/instance.js';
+import ApRequestChart from '@/core/chart/charts/ap-request.js';
+import FederationChart from '@/core/chart/charts/federation.js';
+import { getApId } from '@/core/remote/activitypub/type.js';
+import type { CacheableRemoteUser } from '@/models/entities/User.js';
+import type { UserPublickey } from '@/models/entities/UserPublickey.js';
+import { ApDbResolverService } from '@/core/remote/activitypub/ApDbResolverService.js';
+import { StatusError } from '@/misc/status-error.js';
+import { UtilityService } from '@/core/UtilityService.js';
+import { ApPersonService } from '@/core/remote/activitypub/models/ApPersonService.js';
+import { LdSignatureService } from '@/core/remote/activitypub/LdSignatureService.js';
+import { ApInboxService } from '@/core/remote/activitypub/ApInboxService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+import type { DeliverJobData, InboxJobData } from '../types.js';
+
+// ユーザーのinboxにアクティビティが届いた時の処理
+@Injectable()
+export class InboxProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.instancesRepository)
+ private instancesRepository: InstancesRepository,
+
+ @Inject(DI.driveFilesRepository)
+ private driveFilesRepository: DriveFilesRepository,
+
+ private utilityService: UtilityService,
+ private metaService: MetaService,
+ private apInboxService: ApInboxService,
+ private federatedInstanceService: FederatedInstanceService,
+ private fetchInstanceMetadataService: FetchInstanceMetadataService,
+ private ldSignatureService: LdSignatureService,
+ private apRequestService: ApRequestService,
+ private apPersonService: ApPersonService,
+ private apDbResolverService: ApDbResolverService,
+ private instanceChart: InstanceChart,
+ private apRequestChart: ApRequestChart,
+ private federationChart: FederationChart,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('inbox');
+ }
+
+ public async process(job: Bull.Job<InboxJobData>): Promise<string> {
+ const signature = job.data.signature; // HTTP-signature
+ const activity = job.data.activity;
+
+ //#region Log
+ const info = Object.assign({}, activity) as any;
+ delete info['@context'];
+ this.#logger.debug(JSON.stringify(info, null, 2));
+ //#endregion
+
+ const host = this.utilityService.toPuny(new URL(signature.keyId).hostname);
+
+ // ブロックしてたら中断
+ const meta = await this.metaService.fetch();
+ if (meta.blockedHosts.includes(host)) {
+ return `Blocked request: ${host}`;
+ }
+
+ const keyIdLower = signature.keyId.toLowerCase();
+ if (keyIdLower.startsWith('acct:')) {
+ return `Old keyId is no longer supported. ${keyIdLower}`;
+ }
+
+ // HTTP-Signature keyIdを元にDBから取得
+ let authUser: {
+ user: CacheableRemoteUser;
+ key: UserPublickey | null;
+ } | null = await this.apDbResolverService.getAuthUserFromKeyId(signature.keyId);
+
+ // keyIdでわからなければ、activity.actorを元にDBから取得 || activity.actorを元にリモートから取得
+ if (authUser == null) {
+ try {
+ authUser = await this.apDbResolverService.getAuthUserFromApId(getApId(activity.actor));
+ } catch (err) {
+ // 対象が4xxならスキップ
+ if (err instanceof StatusError) {
+ if (err.isClientError) {
+ return `skip: Ignored deleted actors on both ends ${activity.actor} - ${err.statusCode}`;
+ }
+ throw `Error in actor ${activity.actor} - ${err.statusCode ?? err}`;
+ }
+ }
+ }
+
+ // それでもわからなければ終了
+ if (authUser == null) {
+ return 'skip: failed to resolve user';
+ }
+
+ // publicKey がなくても終了
+ if (authUser.key == null) {
+ return 'skip: failed to resolve user publicKey';
+ }
+
+ // HTTP-Signatureの検証
+ const httpSignatureValidated = httpSignature.verifySignature(signature, authUser.key.keyPem);
+
+ // また、signatureのsignerは、activity.actorと一致する必要がある
+ if (!httpSignatureValidated || authUser.user.uri !== activity.actor) {
+ // 一致しなくても、でもLD-Signatureがありそうならそっちも見る
+ if (activity.signature) {
+ if (activity.signature.type !== 'RsaSignature2017') {
+ return `skip: unsupported LD-signature type ${activity.signature.type}`;
+ }
+
+ // activity.signature.creator: https://example.oom/users/user#main-key
+ // みたいになっててUserを引っ張れば公開キーも入ることを期待する
+ if (activity.signature.creator) {
+ const candicate = activity.signature.creator.replace(/#.*/, '');
+ await this.apPersonService.resolvePerson(candicate).catch(() => null);
+ }
+
+ // keyIdからLD-Signatureのユーザーを取得
+ authUser = await this.apDbResolverService.getAuthUserFromKeyId(activity.signature.creator);
+ if (authUser == null) {
+ return 'skip: LD-Signatureのユーザーが取得できませんでした';
+ }
+
+ if (authUser.key == null) {
+ return 'skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした';
+ }
+
+ // LD-Signature検証
+ const ldSignature = this.ldSignatureService.use();
+ const verified = await ldSignature.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false);
+ if (!verified) {
+ return 'skip: LD-Signatureの検証に失敗しました';
+ }
+
+ // もう一度actorチェック
+ if (authUser.user.uri !== activity.actor) {
+ return `skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`;
+ }
+
+ // ブロックしてたら中断
+ const ldHost = this.utilityService.extractDbHost(authUser.user.uri);
+ if (meta.blockedHosts.includes(ldHost)) {
+ return `Blocked request: ${ldHost}`;
+ }
+ } else {
+ return `skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`;
+ }
+ }
+
+ // activity.idがあればホストが署名者のホストであることを確認する
+ if (typeof activity.id === 'string') {
+ const signerHost = this.utilityService.extractDbHost(authUser.user.uri!);
+ const activityIdHost = this.utilityService.extractDbHost(activity.id);
+ if (signerHost !== activityIdHost) {
+ return `skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`;
+ }
+ }
+
+ // Update stats
+ this.federatedInstanceService.registerOrFetchInstanceDoc(authUser.user.host).then(i => {
+ this.instancesRepository.update(i.id, {
+ latestRequestReceivedAt: new Date(),
+ lastCommunicatedAt: new Date(),
+ isNotResponding: false,
+ });
+
+ this.fetchInstanceMetadataService.fetchInstanceMetadata(i);
+
+ this.instanceChart.requestReceived(i.host);
+ this.apRequestChart.inbox();
+ this.federationChart.inbox(i.host);
+ });
+
+ // アクティビティを処理
+ await this.apInboxService.performActivity(authUser.user, activity);
+ return 'ok';
+ }
+}
diff --git a/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts
new file mode 100644
index 0000000000..f976232a24
--- /dev/null
+++ b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts
@@ -0,0 +1,61 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { In, MoreThan } from 'typeorm';
+import { DI } from '@/di-symbols.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import FederationChart from '@/core/chart/charts/federation.js';
+import NotesChart from '@/core/chart/charts/notes.js';
+import UsersChart from '@/core/chart/charts/users.js';
+import ActiveUsersChart from '@/core/chart/charts/active-users.js';
+import InstanceChart from '@/core/chart/charts/instance.js';
+import PerUserNotesChart from '@/core/chart/charts/per-user-notes.js';
+import DriveChart from '@/core/chart/charts/drive.js';
+import PerUserReactionsChart from '@/core/chart/charts/per-user-reactions.js';
+import HashtagChart from '@/core/chart/charts/hashtag.js';
+import PerUserFollowingChart from '@/core/chart/charts/per-user-following.js';
+import PerUserDriveChart from '@/core/chart/charts/per-user-drive.js';
+import ApRequestChart from '@/core/chart/charts/ap-request.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+
+@Injectable()
+export class ResyncChartsProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ private federationChart: FederationChart,
+ private notesChart: NotesChart,
+ private usersChart: UsersChart,
+ private activeUsersChart: ActiveUsersChart,
+ private instanceChart: InstanceChart,
+ private perUserNotesChart: PerUserNotesChart,
+ private driveChart: DriveChart,
+ private perUserReactionsChart: PerUserReactionsChart,
+ private hashtagChart: HashtagChart,
+ private perUserFollowingChart: PerUserFollowingChart,
+ private perUserDriveChart: PerUserDriveChart,
+ private apRequestChart: ApRequestChart,
+
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('resync-charts');
+ }
+
+ public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> {
+ this.#logger.info('Resync charts...');
+
+ // TODO: ユーザーごとのチャートも更新する
+ // TODO: インスタンスごとのチャートも更新する
+ await Promise.all([
+ this.driveChart.resync(),
+ this.notesChart.resync(),
+ this.usersChart.resync(),
+ ]);
+
+ this.#logger.succ('All charts successfully resynced.');
+ done();
+ }
+}
diff --git a/packages/backend/src/queue/processors/TickChartsProcessorService.ts b/packages/backend/src/queue/processors/TickChartsProcessorService.ts
new file mode 100644
index 0000000000..d1ca3c4576
--- /dev/null
+++ b/packages/backend/src/queue/processors/TickChartsProcessorService.ts
@@ -0,0 +1,68 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { In, MoreThan } from 'typeorm';
+import { DI } from '@/di-symbols.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import FederationChart from '@/core/chart/charts/federation.js';
+import NotesChart from '@/core/chart/charts/notes.js';
+import UsersChart from '@/core/chart/charts/users.js';
+import ActiveUsersChart from '@/core/chart/charts/active-users.js';
+import InstanceChart from '@/core/chart/charts/instance.js';
+import PerUserNotesChart from '@/core/chart/charts/per-user-notes.js';
+import DriveChart from '@/core/chart/charts/drive.js';
+import PerUserReactionsChart from '@/core/chart/charts/per-user-reactions.js';
+import HashtagChart from '@/core/chart/charts/hashtag.js';
+import PerUserFollowingChart from '@/core/chart/charts/per-user-following.js';
+import PerUserDriveChart from '@/core/chart/charts/per-user-drive.js';
+import ApRequestChart from '@/core/chart/charts/ap-request.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+
+@Injectable()
+export class TickChartsProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ private federationChart: FederationChart,
+ private notesChart: NotesChart,
+ private usersChart: UsersChart,
+ private activeUsersChart: ActiveUsersChart,
+ private instanceChart: InstanceChart,
+ private perUserNotesChart: PerUserNotesChart,
+ private driveChart: DriveChart,
+ private perUserReactionsChart: PerUserReactionsChart,
+ private hashtagChart: HashtagChart,
+ private perUserFollowingChart: PerUserFollowingChart,
+ private perUserDriveChart: PerUserDriveChart,
+ private apRequestChart: ApRequestChart,
+
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('tick-charts');
+ }
+
+ public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> {
+ this.#logger.info('Tick charts...');
+
+ await Promise.all([
+ this.federationChart.tick(false),
+ this.notesChart.tick(false),
+ this.usersChart.tick(false),
+ this.activeUsersChart.tick(false),
+ this.instanceChart.tick(false),
+ this.perUserNotesChart.tick(false),
+ this.driveChart.tick(false),
+ this.perUserReactionsChart.tick(false),
+ this.hashtagChart.tick(false),
+ this.perUserFollowingChart.tick(false),
+ this.perUserDriveChart.tick(false),
+ this.apRequestChart.tick(false),
+ ]);
+
+ this.#logger.succ('All charts successfully ticked.');
+ done();
+ }
+}
diff --git a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts b/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts
new file mode 100644
index 0000000000..5723fe2eeb
--- /dev/null
+++ b/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts
@@ -0,0 +1,79 @@
+import { Inject, Injectable } from '@nestjs/common';
+import { IsNull, MoreThan } from 'typeorm';
+import { DI } from '@/di-symbols.js';
+import { WebhooksRepository } from '@/models/index.js';
+import { Config } from '@/config.js';
+import type Logger from '@/logger.js';
+import { HttpRequestService } from '@/core/HttpRequestService.js';
+import { StatusError } from '@/misc/status-error.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type Bull from 'bull';
+import type { WebhookDeliverJobData } from '../types.js';
+
+@Injectable()
+export class WebhookDeliverProcessorService {
+ #logger: Logger;
+
+ constructor(
+ @Inject(DI.config)
+ private config: Config,
+
+ @Inject(DI.webhooksRepository)
+ private webhooksRepository: WebhooksRepository,
+
+ private httpRequestService: HttpRequestService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.#logger = this.queueLoggerService.logger.createSubLogger('webhook');
+ }
+
+ public async process(job: Bull.Job<WebhookDeliverJobData>): Promise<string> {
+ try {
+ this.#logger.debug(`delivering ${job.data.webhookId}`);
+
+ const res = await this.httpRequestService.getResponse({
+ url: job.data.to,
+ method: 'POST',
+ headers: {
+ 'User-Agent': 'Misskey-Hooks',
+ 'X-Misskey-Host': this.config.host,
+ 'X-Misskey-Hook-Id': job.data.webhookId,
+ 'X-Misskey-Hook-Secret': job.data.secret,
+ },
+ body: JSON.stringify({
+ hookId: job.data.webhookId,
+ userId: job.data.userId,
+ eventId: job.data.eventId,
+ createdAt: job.data.createdAt,
+ type: job.data.type,
+ body: job.data.content,
+ }),
+ });
+
+ this.webhooksRepository.update({ id: job.data.webhookId }, {
+ latestSentAt: new Date(),
+ latestStatus: res.status,
+ });
+
+ return 'Success';
+ } catch (res) {
+ this.webhooksRepository.update({ id: job.data.webhookId }, {
+ latestSentAt: new Date(),
+ latestStatus: res instanceof StatusError ? res.statusCode : 1,
+ });
+
+ if (res instanceof StatusError) {
+ // 4xx
+ if (res.isClientError) {
+ return `${res.statusCode} ${res.statusMessage}`;
+ }
+
+ // 5xx etc.
+ throw `${res.statusCode} ${res.statusMessage}`;
+ } else {
+ // DNS error, socket error, timeout ...
+ throw res;
+ }
+ }
+ }
+}
diff --git a/packages/backend/src/queue/processors/db/delete-account.ts b/packages/backend/src/queue/processors/db/delete-account.ts
deleted file mode 100644
index c1657b4be6..0000000000
--- a/packages/backend/src/queue/processors/db/delete-account.ts
+++ /dev/null
@@ -1,94 +0,0 @@
-import Bull from 'bull';
-import { queueLogger } from '../../logger.js';
-import { DriveFiles, Notes, UserProfiles, Users } from '@/models/index.js';
-import { DbUserDeleteJobData } from '@/queue/types.js';
-import { Note } from '@/models/entities/note.js';
-import { DriveFile } from '@/models/entities/drive-file.js';
-import { MoreThan } from 'typeorm';
-import { deleteFileSync } from '@/services/drive/delete-file.js';
-import { sendEmail } from '@/services/send-email.js';
-
-const logger = queueLogger.createSubLogger('delete-account');
-
-export async function deleteAccount(job: Bull.Job<DbUserDeleteJobData>): Promise<string | void> {
- logger.info(`Deleting account of ${job.data.user.id} ...`);
-
- const user = await Users.findOneBy({ id: job.data.user.id });
- if (user == null) {
- return;
- }
-
- { // Delete notes
- let cursor: Note['id'] | null = null;
-
- while (true) {
- const notes = await Notes.find({
- where: {
- userId: user.id,
- ...(cursor ? { id: MoreThan(cursor) } : {}),
- },
- take: 100,
- order: {
- id: 1,
- },
- }) as Note[];
-
- if (notes.length === 0) {
- break;
- }
-
- cursor = notes[notes.length - 1].id;
-
- await Notes.delete(notes.map(note => note.id));
- }
-
- logger.succ(`All of notes deleted`);
- }
-
- { // Delete files
- let cursor: DriveFile['id'] | null = null;
-
- while (true) {
- const files = await DriveFiles.find({
- where: {
- userId: user.id,
- ...(cursor ? { id: MoreThan(cursor) } : {}),
- },
- take: 10,
- order: {
- id: 1,
- },
- }) as DriveFile[];
-
- if (files.length === 0) {
- break;
- }
-
- cursor = files[files.length - 1].id;
-
- for (const file of files) {
- await deleteFileSync(file);
- }
- }
-
- logger.succ(`All of files deleted`);
- }
-
- { // Send email notification
- const profile = await UserProfiles.findOneByOrFail({ userId: user.id });
- if (profile.email && profile.emailVerified) {
- sendEmail(profile.email, 'Account deleted',
- `Your account has been deleted.`,
- `Your account has been deleted.`);
- }
- }
-
- // soft指定されている場合は物理削除しない
- if (job.data.soft) {
- // nop
- } else {
- await Users.delete(job.data.user.id);
- }
-
- return 'Account deleted';
-}
diff --git a/packages/backend/src/queue/processors/db/delete-drive-files.ts b/packages/backend/src/queue/processors/db/delete-drive-files.ts
deleted file mode 100644
index b3832d9f04..0000000000
--- a/packages/backend/src/queue/processors/db/delete-drive-files.ts
+++ /dev/null
@@ -1,56 +0,0 @@
-import Bull from 'bull';
-
-import { queueLogger } from '../../logger.js';
-import { deleteFileSync } from '@/services/drive/delete-file.js';
-import { Users, DriveFiles } from '@/models/index.js';
-import { MoreThan } from 'typeorm';
-import { DbUserJobData } from '@/queue/types.js';
-
-const logger = queueLogger.createSubLogger('delete-drive-files');
-
-export async function deleteDriveFiles(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
- logger.info(`Deleting drive files of ${job.data.user.id} ...`);
-
- const user = await Users.findOneBy({ id: job.data.user.id });
- if (user == null) {
- done();
- return;
- }
-
- let deletedCount = 0;
- let cursor: any = null;
-
- while (true) {
- const files = await DriveFiles.find({
- where: {
- userId: user.id,
- ...(cursor ? { id: MoreThan(cursor) } : {}),
- },
- take: 100,
- order: {
- id: 1,
- },
- });
-
- if (files.length === 0) {
- job.progress(100);
- break;
- }
-
- cursor = files[files.length - 1].id;
-
- for (const file of files) {
- await deleteFileSync(file);
- deletedCount++;
- }
-
- const total = await DriveFiles.countBy({
- userId: user.id,
- });
-
- job.progress(deletedCount / total);
- }
-
- logger.succ(`All drive files (${deletedCount}) of ${user.id} has been deleted.`);
- done();
-}
diff --git a/packages/backend/src/queue/processors/db/export-blocking.ts b/packages/backend/src/queue/processors/db/export-blocking.ts
deleted file mode 100644
index f5e0424a79..0000000000
--- a/packages/backend/src/queue/processors/db/export-blocking.ts
+++ /dev/null
@@ -1,93 +0,0 @@
-import Bull from 'bull';
-import * as fs from 'node:fs';
-
-import { queueLogger } from '../../logger.js';
-import { addFile } from '@/services/drive/add-file.js';
-import { format as dateFormat } from 'date-fns';
-import { getFullApAccount } from '@/misc/convert-host.js';
-import { createTemp } from '@/misc/create-temp.js';
-import { Users, Blockings } from '@/models/index.js';
-import { MoreThan } from 'typeorm';
-import { DbUserJobData } from '@/queue/types.js';
-
-const logger = queueLogger.createSubLogger('export-blocking');
-
-export async function exportBlocking(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
- logger.info(`Exporting blocking of ${job.data.user.id} ...`);
-
- const user = await Users.findOneBy({ id: job.data.user.id });
- if (user == null) {
- done();
- return;
- }
-
- // Create temp file
- const [path, cleanup] = await createTemp();
-
- logger.info(`Temp file is ${path}`);
-
- try {
- const stream = fs.createWriteStream(path, { flags: 'a' });
-
- let exportedCount = 0;
- let cursor: any = null;
-
- while (true) {
- const blockings = await Blockings.find({
- where: {
- blockerId: user.id,
- ...(cursor ? { id: MoreThan(cursor) } : {}),
- },
- take: 100,
- order: {
- id: 1,
- },
- });
-
- if (blockings.length === 0) {
- job.progress(100);
- break;
- }
-
- cursor = blockings[blockings.length - 1].id;
-
- for (const block of blockings) {
- const u = await Users.findOneBy({ id: block.blockeeId });
- if (u == null) {
- exportedCount++; continue;
- }
-
- const content = getFullApAccount(u.username, u.host);
- await new Promise<void>((res, rej) => {
- stream.write(content + '\n', err => {
- if (err) {
- logger.error(err);
- rej(err);
- } else {
- res();
- }
- });
- });
- exportedCount++;
- }
-
- const total = await Blockings.countBy({
- blockerId: user.id,
- });
-
- job.progress(exportedCount / total);
- }
-
- stream.end();
- logger.succ(`Exported to: ${path}`);
-
- const fileName = 'blocking-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv';
- const driveFile = await addFile({ user, path, name: fileName, force: true });
-
- logger.succ(`Exported to: ${driveFile.id}`);
- } finally {
- cleanup();
- }
-
- done();
-}
diff --git a/packages/backend/src/queue/processors/db/export-custom-emojis.ts b/packages/backend/src/queue/processors/db/export-custom-emojis.ts
deleted file mode 100644
index 3da887cda2..0000000000
--- a/packages/backend/src/queue/processors/db/export-custom-emojis.ts
+++ /dev/null
@@ -1,114 +0,0 @@
-import Bull from 'bull';
-import * as fs from 'node:fs';
-
-import { ulid } from 'ulid';
-import mime from 'mime-types';
-import archiver from 'archiver';
-import { queueLogger } from '../../logger.js';
-import { addFile } from '@/services/drive/add-file.js';
-import { format as dateFormat } from 'date-fns';
-import { Users, Emojis } from '@/models/index.js';
-import { } from '@/queue/types.js';
-import { createTemp, createTempDir } from '@/misc/create-temp.js';
-import { downloadUrl } from '@/misc/download-url.js';
-import config from '@/config/index.js';
-import { IsNull } from 'typeorm';
-
-const logger = queueLogger.createSubLogger('export-custom-emojis');
-
-export async function exportCustomEmojis(job: Bull.Job, done: () => void): Promise<void> {
- logger.info(`Exporting custom emojis ...`);
-
- const user = await Users.findOneBy({ id: job.data.user.id });
- if (user == null) {
- done();
- return;
- }
-
- const [path, cleanup] = await createTempDir();
-
- logger.info(`Temp dir is ${path}`);
-
- const metaPath = path + '/meta.json';
-
- fs.writeFileSync(metaPath, '', 'utf-8');
-
- const metaStream = fs.createWriteStream(metaPath, { flags: 'a' });
-
- const writeMeta = (text: string): Promise<void> => {
- return new Promise<void>((res, rej) => {
- metaStream.write(text, err => {
- if (err) {
- logger.error(err);
- rej(err);
- } else {
- res();
- }
- });
- });
- };
-
- await writeMeta(`{"metaVersion":2,"host":"${config.host}","exportedAt":"${new Date().toString()}","emojis":[`);
-
- const customEmojis = await Emojis.find({
- where: {
- host: IsNull(),
- },
- order: {
- id: 'ASC',
- },
- });
-
- for (const emoji of customEmojis) {
- const ext = mime.extension(emoji.type);
- const fileName = emoji.name + (ext ? '.' + ext : '');
- const emojiPath = path + '/' + fileName;
- fs.writeFileSync(emojiPath, '', 'binary');
- let downloaded = false;
-
- try {
- await downloadUrl(emoji.originalUrl, emojiPath);
- downloaded = true;
- } catch (e) { // TODO: 何度か再試行
- logger.error(e instanceof Error ? e : new Error(e as string));
- }
-
- if (!downloaded) {
- fs.unlinkSync(emojiPath);
- }
-
- const content = JSON.stringify({
- fileName: fileName,
- downloaded: downloaded,
- emoji: emoji,
- });
- const isFirst = customEmojis.indexOf(emoji) === 0;
-
- await writeMeta(isFirst ? content : ',\n' + content);
- }
-
- await writeMeta(']}');
-
- metaStream.end();
-
- // Create archive
- const [archivePath, archiveCleanup] = await createTemp();
- const archiveStream = fs.createWriteStream(archivePath);
- const archive = archiver('zip', {
- zlib: { level: 0 },
- });
- archiveStream.on('close', async () => {
- logger.succ(`Exported to: ${archivePath}`);
-
- const fileName = 'custom-emojis-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.zip';
- const driveFile = await addFile({ user, path: archivePath, name: fileName, force: true });
-
- logger.succ(`Exported to: ${driveFile.id}`);
- cleanup();
- archiveCleanup();
- done();
- });
- archive.pipe(archiveStream);
- archive.directory(path, false);
- archive.finalize();
-}
diff --git a/packages/backend/src/queue/processors/db/export-following.ts b/packages/backend/src/queue/processors/db/export-following.ts
deleted file mode 100644
index 4ac165567b..0000000000
--- a/packages/backend/src/queue/processors/db/export-following.ts
+++ /dev/null
@@ -1,94 +0,0 @@
-import Bull from 'bull';
-import * as fs from 'node:fs';
-
-import { queueLogger } from '../../logger.js';
-import { addFile } from '@/services/drive/add-file.js';
-import { format as dateFormat } from 'date-fns';
-import { getFullApAccount } from '@/misc/convert-host.js';
-import { createTemp } from '@/misc/create-temp.js';
-import { Users, Followings, Mutings } from '@/models/index.js';
-import { In, MoreThan, Not } from 'typeorm';
-import { DbUserJobData } from '@/queue/types.js';
-import { Following } from '@/models/entities/following.js';
-
-const logger = queueLogger.createSubLogger('export-following');
-
-export async function exportFollowing(job: Bull.Job<DbUserJobData>, done: () => void): Promise<void> {
- logger.info(`Exporting following of ${job.data.user.id} ...`);
-
- const user = await Users.findOneBy({ id: job.data.user.id });
- if (user == null) {
- done();
- return;
- }
-
- // Create temp file
- const [path, cleanup] = await createTemp();
-
- logger.info(`Temp file is ${path}`);
-
- try {
- const stream = fs.createWriteStream(path, { flags: 'a' });
-
- let cursor: Following['id'] | null = null;
-
- const mutings = job.data.excludeMuting ? await Mutings.findBy({
- muterId: user.id,
- }) : [];
-
- while (true) {
- const followings = await Followings.find({
- where: {
- followerId: user.id,
- ...(mutings.length > 0 ? { followeeId: Not(In(mutings.map(x => x.muteeId))) } : {}),
- ...(cursor ? { id: MoreThan(cursor) } : {}),
- },
- take: 100,
- order: {
- id: 1,
- },
- }) as Following[];
-
- if (followings.length === 0) {
- break;
- }
-
- cursor = followings[followings.length - 1].id;
-
- for (const following of followings) {
- const u = await Users.findOneBy({ id: following.followeeId });
- if (u == null) {
- continue;
- }
-
- if (job.data.excludeInactive && u.updatedAt && (Date.now() - u.updatedAt.getTime() > 1000 * 60 * 60 * 24 * 90)) {
- continue;
- }
-
- const content = getFullApAccount(u.username, u.host);
- await new Promise<void>((res, rej) => {
- stream.write(content + '\n', err => {
- if (err) {
- logger.error(err);
- rej(err);
- } else {
- res();
- }
- });
- });
- }
- }
-
- stream.end();
- logger.succ(`Exported to: ${path}`);
-
- const fileName = 'following-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv';
- const driveFile = await addFile({ user, path, name: fileName, force: true });
-
- logger.succ(`Exported to: ${driveFile.id}`);
- } finally {
- cleanup();
- }
-
- done();
-}
diff --git a/packages/backend/src/queue/processors/db/export-mute.ts b/packages/backend/src/queue/processors/db/export-mute.ts
deleted file mode 100644
index 6a36cfa072..0000000000
--- a/packages/backend/src/queue/processors/db/export-mute.ts
+++ /dev/null
@@ -1,94 +0,0 @@
-import Bull from 'bull';
-import * as fs from 'node:fs';
-
-import { queueLogger } from '../../logger.js';
-import { addFile } from '@/services/drive/add-file.js';
-import { format as dateFormat } from 'date-fns';
-import { getFullApAccount } from '@/misc/convert-host.js';
-import { createTemp } from '@/misc/create-temp.js';
-import { Users, Mutings } from '@/models/index.js';
-import { IsNull, MoreThan } from 'typeorm';
-import { DbUserJobData } from '@/queue/types.js';
-
-const logger = queueLogger.createSubLogger('export-mute');
-
-export async function exportMute(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
- logger.info(`Exporting mute of ${job.data.user.id} ...`);
-
- const user = await Users.findOneBy({ id: job.data.user.id });
- if (user == null) {
- done();
- return;
- }
-
- // Create temp file
- const [path, cleanup] = await createTemp();
-
- logger.info(`Temp file is ${path}`);
-
- try {
- const stream = fs.createWriteStream(path, { flags: 'a' });
-
- let exportedCount = 0;
- let cursor: any = null;
-
- while (true) {
- const mutes = await Mutings.find({
- where: {
- muterId: user.id,
- expiresAt: IsNull(),
- ...(cursor ? { id: MoreThan(cursor) } : {}),
- },
- take: 100,
- order: {
- id: 1,
- },
- });
-
- if (mutes.length === 0) {
- job.progress(100);
- break;
- }
-
- cursor = mutes[mutes.length - 1].id;
-
- for (const mute of mutes) {
- const u = await Users.findOneBy({ id: mute.muteeId });
- if (u == null) {
- exportedCount++; continue;
- }
-
- const content = getFullApAccount(u.username, u.host);
- await new Promise<void>((res, rej) => {
- stream.write(content + '\n', err => {
- if (err) {
- logger.error(err);
- rej(err);
- } else {
- res();
- }
- });
- });
- exportedCount++;
- }
-
- const total = await Mutings.countBy({
- muterId: user.id,
- });
-
- job.progress(exportedCount / total);
- }
-
- stream.end();
- logger.succ(`Exported to: ${path}`);
-
- const fileName = 'mute-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv';
- const driveFile = await addFile({ user, path, name: fileName, force: true });
-
- logger.succ(`Exported to: ${driveFile.id}`);
- } finally {
- cleanup();
- }
-
- done();
-}
diff --git a/packages/backend/src/queue/processors/db/export-notes.ts b/packages/backend/src/queue/processors/db/export-notes.ts
deleted file mode 100644
index 051fcdf385..0000000000
--- a/packages/backend/src/queue/processors/db/export-notes.ts
+++ /dev/null
@@ -1,118 +0,0 @@
-import Bull from 'bull';
-import * as fs from 'node:fs';
-
-import { queueLogger } from '../../logger.js';
-import { addFile } from '@/services/drive/add-file.js';
-import { format as dateFormat } from 'date-fns';
-import { Users, Notes, Polls } from '@/models/index.js';
-import { MoreThan } from 'typeorm';
-import { Note } from '@/models/entities/note.js';
-import { Poll } from '@/models/entities/poll.js';
-import { DbUserJobData } from '@/queue/types.js';
-import { createTemp } from '@/misc/create-temp.js';
-
-const logger = queueLogger.createSubLogger('export-notes');
-
-export async function exportNotes(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
- logger.info(`Exporting notes of ${job.data.user.id} ...`);
-
- const user = await Users.findOneBy({ id: job.data.user.id });
- if (user == null) {
- done();
- return;
- }
-
- // Create temp file
- const [path, cleanup] = await createTemp();
-
- logger.info(`Temp file is ${path}`);
-
- try {
- const stream = fs.createWriteStream(path, { flags: 'a' });
-
- const write = (text: string): Promise<void> => {
- return new Promise<void>((res, rej) => {
- stream.write(text, err => {
- if (err) {
- logger.error(err);
- rej(err);
- } else {
- res();
- }
- });
- });
- };
-
- await write('[');
-
- let exportedNotesCount = 0;
- let cursor: Note['id'] | null = null;
-
- while (true) {
- const notes = await Notes.find({
- where: {
- userId: user.id,
- ...(cursor ? { id: MoreThan(cursor) } : {}),
- },
- take: 100,
- order: {
- id: 1,
- },
- }) as Note[];
-
- if (notes.length === 0) {
- job.progress(100);
- break;
- }
-
- cursor = notes[notes.length - 1].id;
-
- for (const note of notes) {
- let poll: Poll | undefined;
- if (note.hasPoll) {
- poll = await Polls.findOneByOrFail({ noteId: note.id });
- }
- const content = JSON.stringify(serialize(note, poll));
- const isFirst = exportedNotesCount === 0;
- await write(isFirst ? content : ',\n' + content);
- exportedNotesCount++;
- }
-
- const total = await Notes.countBy({
- userId: user.id,
- });
-
- job.progress(exportedNotesCount / total);
- }
-
- await write(']');
-
- stream.end();
- logger.succ(`Exported to: ${path}`);
-
- const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json';
- const driveFile = await addFile({ user, path, name: fileName, force: true });
-
- logger.succ(`Exported to: ${driveFile.id}`);
- } finally {
- cleanup();
- }
-
- done();
-}
-
-function serialize(note: Note, poll: Poll | null = null): Record<string, unknown> {
- return {
- id: note.id,
- text: note.text,
- createdAt: note.createdAt,
- fileIds: note.fileIds,
- replyId: note.replyId,
- renoteId: note.renoteId,
- poll: poll,
- cw: note.cw,
- visibility: note.visibility,
- visibleUserIds: note.visibleUserIds,
- localOnly: note.localOnly,
- };
-}
diff --git a/packages/backend/src/queue/processors/db/export-user-lists.ts b/packages/backend/src/queue/processors/db/export-user-lists.ts
deleted file mode 100644
index 71dd72df27..0000000000
--- a/packages/backend/src/queue/processors/db/export-user-lists.ts
+++ /dev/null
@@ -1,70 +0,0 @@
-import Bull from 'bull';
-import * as fs from 'node:fs';
-
-import { queueLogger } from '../../logger.js';
-import { addFile } from '@/services/drive/add-file.js';
-import { format as dateFormat } from 'date-fns';
-import { getFullApAccount } from '@/misc/convert-host.js';
-import { createTemp } from '@/misc/create-temp.js';
-import { Users, UserLists, UserListJoinings } from '@/models/index.js';
-import { In } from 'typeorm';
-import { DbUserJobData } from '@/queue/types.js';
-
-const logger = queueLogger.createSubLogger('export-user-lists');
-
-export async function exportUserLists(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
- logger.info(`Exporting user lists of ${job.data.user.id} ...`);
-
- const user = await Users.findOneBy({ id: job.data.user.id });
- if (user == null) {
- done();
- return;
- }
-
- const lists = await UserLists.findBy({
- userId: user.id,
- });
-
- // Create temp file
- const [path, cleanup] = await createTemp();
-
- logger.info(`Temp file is ${path}`);
-
- try {
- const stream = fs.createWriteStream(path, { flags: 'a' });
-
- for (const list of lists) {
- const joinings = await UserListJoinings.findBy({ userListId: list.id });
- const users = await Users.findBy({
- id: In(joinings.map(j => j.userId)),
- });
-
- for (const u of users) {
- const acct = getFullApAccount(u.username, u.host);
- const content = `${list.name},${acct}`;
- await new Promise<void>((res, rej) => {
- stream.write(content + '\n', err => {
- if (err) {
- logger.error(err);
- rej(err);
- } else {
- res();
- }
- });
- });
- }
- }
-
- stream.end();
- logger.succ(`Exported to: ${path}`);
-
- const fileName = 'user-lists-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv';
- const driveFile = await addFile({ user, path, name: fileName, force: true });
-
- logger.succ(`Exported to: ${driveFile.id}`);
- } finally {
- cleanup();
- }
-
- done();
-}
diff --git a/packages/backend/src/queue/processors/db/import-blocking.ts b/packages/backend/src/queue/processors/db/import-blocking.ts
deleted file mode 100644
index 8bddf34bc2..0000000000
--- a/packages/backend/src/queue/processors/db/import-blocking.ts
+++ /dev/null
@@ -1,75 +0,0 @@
-import Bull from 'bull';
-
-import { queueLogger } from '../../logger.js';
-import * as Acct from '@/misc/acct.js';
-import { resolveUser } from '@/remote/resolve-user.js';
-import { downloadTextFile } from '@/misc/download-text-file.js';
-import { isSelfHost, toPuny } from '@/misc/convert-host.js';
-import { Users, DriveFiles, Blockings } from '@/models/index.js';
-import { DbUserImportJobData } from '@/queue/types.js';
-import block from '@/services/blocking/create.js';
-import { IsNull } from 'typeorm';
-
-const logger = queueLogger.createSubLogger('import-blocking');
-
-export async function importBlocking(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> {
- logger.info(`Importing blocking of ${job.data.user.id} ...`);
-
- const user = await Users.findOneBy({ id: job.data.user.id });
- if (user == null) {
- done();
- return;
- }
-
- const file = await DriveFiles.findOneBy({
- id: job.data.fileId,
- });
- if (file == null) {
- done();
- return;
- }
-
- const csv = await downloadTextFile(file.url);
-
- let linenum = 0;
-
- for (const line of csv.trim().split('\n')) {
- linenum++;
-
- try {
- const acct = line.split(',')[0].trim();
- const { username, host } = Acct.parse(acct);
-
- let target = isSelfHost(host!) ? await Users.findOneBy({
- host: IsNull(),
- usernameLower: username.toLowerCase(),
- }) : await Users.findOneBy({
- host: toPuny(host!),
- usernameLower: username.toLowerCase(),
- });
-
- if (host == null && target == null) continue;
-
- if (target == null) {
- target = await resolveUser(username, host);
- }
-
- if (target == null) {
- throw `cannot resolve user: @${username}@${host}`;
- }
-
- // skip myself
- if (target.id === job.data.user.id) continue;
-
- logger.info(`Block[${linenum}] ${target.id} ...`);
-
- await block(user, target);
- } catch (e) {
- logger.warn(`Error in line:${linenum} ${e}`);
- }
- }
-
- logger.succ('Imported');
- done();
-}
-
diff --git a/packages/backend/src/queue/processors/db/import-custom-emojis.ts b/packages/backend/src/queue/processors/db/import-custom-emojis.ts
deleted file mode 100644
index 64dfe85374..0000000000
--- a/packages/backend/src/queue/processors/db/import-custom-emojis.ts
+++ /dev/null
@@ -1,81 +0,0 @@
-import Bull from 'bull';
-import * as fs from 'node:fs';
-import unzipper from 'unzipper';
-
-import { queueLogger } from '../../logger.js';
-import { createTempDir } from '@/misc/create-temp.js';
-import { downloadUrl } from '@/misc/download-url.js';
-import { DriveFiles, Emojis } from '@/models/index.js';
-import { DbUserImportJobData } from '@/queue/types.js';
-import { addFile } from '@/services/drive/add-file.js';
-import { genId } from '@/misc/gen-id.js';
-import { db } from '@/db/postgre.js';
-
-const logger = queueLogger.createSubLogger('import-custom-emojis');
-
-// TODO: 名前衝突時の動作を選べるようにする
-export async function importCustomEmojis(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> {
- logger.info(`Importing custom emojis ...`);
-
- const file = await DriveFiles.findOneBy({
- id: job.data.fileId,
- });
- if (file == null) {
- done();
- return;
- }
-
- const [path, cleanup] = await createTempDir();
-
- logger.info(`Temp dir is ${path}`);
-
- const destPath = path + '/emojis.zip';
-
- try {
- fs.writeFileSync(destPath, '', 'binary');
- await downloadUrl(file.url, destPath);
- } catch (e) { // TODO: 何度か再試行
- if (e instanceof Error || typeof e === 'string') {
- logger.error(e);
- }
- throw e;
- }
-
- const outputPath = path + '/emojis';
- const unzipStream = fs.createReadStream(destPath);
- const extractor = unzipper.Extract({ path: outputPath });
- extractor.on('close', async () => {
- const metaRaw = fs.readFileSync(outputPath + '/meta.json', 'utf-8');
- const meta = JSON.parse(metaRaw);
-
- for (const record of meta.emojis) {
- if (!record.downloaded) continue;
- const emojiInfo = record.emoji;
- const emojiPath = outputPath + '/' + record.fileName;
- await Emojis.delete({
- name: emojiInfo.name,
- });
- const driveFile = await addFile({ user: null, path: emojiPath, name: record.fileName, force: true });
- const emoji = await Emojis.insert({
- id: genId(),
- updatedAt: new Date(),
- name: emojiInfo.name,
- category: emojiInfo.category,
- host: null,
- aliases: emojiInfo.aliases,
- originalUrl: driveFile.url,
- publicUrl: driveFile.webpublicUrl ?? driveFile.url,
- type: driveFile.webpublicType ?? driveFile.type,
- }).then(x => Emojis.findOneByOrFail(x.identifiers[0]));
- }
-
- await db.queryResultCache!.remove(['meta_emojis']);
-
- cleanup();
-
- logger.succ('Imported');
- done();
- });
- unzipStream.pipe(extractor);
- logger.succ(`Unzipping to ${outputPath}`);
-}
diff --git a/packages/backend/src/queue/processors/db/import-following.ts b/packages/backend/src/queue/processors/db/import-following.ts
deleted file mode 100644
index 8ce2c367d6..0000000000
--- a/packages/backend/src/queue/processors/db/import-following.ts
+++ /dev/null
@@ -1,74 +0,0 @@
-import Bull from 'bull';
-
-import { queueLogger } from '../../logger.js';
-import follow from '@/services/following/create.js';
-import * as Acct from '@/misc/acct.js';
-import { resolveUser } from '@/remote/resolve-user.js';
-import { downloadTextFile } from '@/misc/download-text-file.js';
-import { isSelfHost, toPuny } from '@/misc/convert-host.js';
-import { Users, DriveFiles } from '@/models/index.js';
-import { DbUserImportJobData } from '@/queue/types.js';
-import { IsNull } from 'typeorm';
-
-const logger = queueLogger.createSubLogger('import-following');
-
-export async function importFollowing(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> {
- logger.info(`Importing following of ${job.data.user.id} ...`);
-
- const user = await Users.findOneBy({ id: job.data.user.id });
- if (user == null) {
- done();
- return;
- }
-
- const file = await DriveFiles.findOneBy({
- id: job.data.fileId,
- });
- if (file == null) {
- done();
- return;
- }
-
- const csv = await downloadTextFile(file.url);
-
- let linenum = 0;
-
- for (const line of csv.trim().split('\n')) {
- linenum++;
-
- try {
- const acct = line.split(',')[0].trim();
- const { username, host } = Acct.parse(acct);
-
- let target = isSelfHost(host!) ? await Users.findOneBy({
- host: IsNull(),
- usernameLower: username.toLowerCase(),
- }) : await Users.findOneBy({
- host: toPuny(host!),
- usernameLower: username.toLowerCase(),
- });
-
- if (host == null && target == null) continue;
-
- if (target == null) {
- target = await resolveUser(username, host);
- }
-
- if (target == null) {
- throw `cannot resolve user: @${username}@${host}`;
- }
-
- // skip myself
- if (target.id === job.data.user.id) continue;
-
- logger.info(`Follow[${linenum}] ${target.id} ...`);
-
- follow(user, target);
- } catch (e) {
- logger.warn(`Error in line:${linenum} ${e}`);
- }
- }
-
- logger.succ('Imported');
- done();
-}
diff --git a/packages/backend/src/queue/processors/db/import-muting.ts b/packages/backend/src/queue/processors/db/import-muting.ts
deleted file mode 100644
index 8552b797be..0000000000
--- a/packages/backend/src/queue/processors/db/import-muting.ts
+++ /dev/null
@@ -1,84 +0,0 @@
-import Bull from 'bull';
-
-import { queueLogger } from '../../logger.js';
-import * as Acct from '@/misc/acct.js';
-import { resolveUser } from '@/remote/resolve-user.js';
-import { downloadTextFile } from '@/misc/download-text-file.js';
-import { isSelfHost, toPuny } from '@/misc/convert-host.js';
-import { Users, DriveFiles, Mutings } from '@/models/index.js';
-import { DbUserImportJobData } from '@/queue/types.js';
-import { User } from '@/models/entities/user.js';
-import { genId } from '@/misc/gen-id.js';
-import { IsNull } from 'typeorm';
-
-const logger = queueLogger.createSubLogger('import-muting');
-
-export async function importMuting(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> {
- logger.info(`Importing muting of ${job.data.user.id} ...`);
-
- const user = await Users.findOneBy({ id: job.data.user.id });
- if (user == null) {
- done();
- return;
- }
-
- const file = await DriveFiles.findOneBy({
- id: job.data.fileId,
- });
- if (file == null) {
- done();
- return;
- }
-
- const csv = await downloadTextFile(file.url);
-
- let linenum = 0;
-
- for (const line of csv.trim().split('\n')) {
- linenum++;
-
- try {
- const acct = line.split(',')[0].trim();
- const { username, host } = Acct.parse(acct);
-
- let target = isSelfHost(host!) ? await Users.findOneBy({
- host: IsNull(),
- usernameLower: username.toLowerCase(),
- }) : await Users.findOneBy({
- host: toPuny(host!),
- usernameLower: username.toLowerCase(),
- });
-
- if (host == null && target == null) continue;
-
- if (target == null) {
- target = await resolveUser(username, host);
- }
-
- if (target == null) {
- throw `cannot resolve user: @${username}@${host}`;
- }
-
- // skip myself
- if (target.id === job.data.user.id) continue;
-
- logger.info(`Mute[${linenum}] ${target.id} ...`);
-
- await mute(user, target);
- } catch (e) {
- logger.warn(`Error in line:${linenum} ${e}`);
- }
- }
-
- logger.succ('Imported');
- done();
-}
-
-async function mute(user: User, target: User) {
- await Mutings.insert({
- id: genId(),
- createdAt: new Date(),
- muterId: user.id,
- muteeId: target.id,
- });
-}
diff --git a/packages/backend/src/queue/processors/db/import-user-lists.ts b/packages/backend/src/queue/processors/db/import-user-lists.ts
deleted file mode 100644
index 9919b7c53c..0000000000
--- a/packages/backend/src/queue/processors/db/import-user-lists.ts
+++ /dev/null
@@ -1,80 +0,0 @@
-import Bull from 'bull';
-
-import { queueLogger } from '../../logger.js';
-import * as Acct from '@/misc/acct.js';
-import { resolveUser } from '@/remote/resolve-user.js';
-import { pushUserToUserList } from '@/services/user-list/push.js';
-import { downloadTextFile } from '@/misc/download-text-file.js';
-import { isSelfHost, toPuny } from '@/misc/convert-host.js';
-import { DriveFiles, Users, UserLists, UserListJoinings } from '@/models/index.js';
-import { genId } from '@/misc/gen-id.js';
-import { DbUserImportJobData } from '@/queue/types.js';
-import { IsNull } from 'typeorm';
-
-const logger = queueLogger.createSubLogger('import-user-lists');
-
-export async function importUserLists(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> {
- logger.info(`Importing user lists of ${job.data.user.id} ...`);
-
- const user = await Users.findOneBy({ id: job.data.user.id });
- if (user == null) {
- done();
- return;
- }
-
- const file = await DriveFiles.findOneBy({
- id: job.data.fileId,
- });
- if (file == null) {
- done();
- return;
- }
-
- const csv = await downloadTextFile(file.url);
-
- let linenum = 0;
-
- for (const line of csv.trim().split('\n')) {
- linenum++;
-
- try {
- const listName = line.split(',')[0].trim();
- const { username, host } = Acct.parse(line.split(',')[1].trim());
-
- let list = await UserLists.findOneBy({
- userId: user.id,
- name: listName,
- });
-
- if (list == null) {
- list = await UserLists.insert({
- id: genId(),
- createdAt: new Date(),
- userId: user.id,
- name: listName,
- }).then(x => UserLists.findOneByOrFail(x.identifiers[0]));
- }
-
- let target = isSelfHost(host!) ? await Users.findOneBy({
- host: IsNull(),
- usernameLower: username.toLowerCase(),
- }) : await Users.findOneBy({
- host: toPuny(host!),
- usernameLower: username.toLowerCase(),
- });
-
- if (target == null) {
- target = await resolveUser(username, host);
- }
-
- if (await UserListJoinings.findOneBy({ userListId: list!.id, userId: target.id }) != null) continue;
-
- pushUserToUserList(target, list!);
- } catch (e) {
- logger.warn(`Error in line:${linenum} ${e}`);
- }
- }
-
- logger.succ('Imported');
- done();
-}
diff --git a/packages/backend/src/queue/processors/db/index.ts b/packages/backend/src/queue/processors/db/index.ts
deleted file mode 100644
index e91d569779..0000000000
--- a/packages/backend/src/queue/processors/db/index.ts
+++ /dev/null
@@ -1,37 +0,0 @@
-import Bull from 'bull';
-import { DbJobData } from '@/queue/types.js';
-import { deleteDriveFiles } from './delete-drive-files.js';
-import { exportCustomEmojis } from './export-custom-emojis.js';
-import { exportNotes } from './export-notes.js';
-import { exportFollowing } from './export-following.js';
-import { exportMute } from './export-mute.js';
-import { exportBlocking } from './export-blocking.js';
-import { exportUserLists } from './export-user-lists.js';
-import { importFollowing } from './import-following.js';
-import { importUserLists } from './import-user-lists.js';
-import { deleteAccount } from './delete-account.js';
-import { importMuting } from './import-muting.js';
-import { importBlocking } from './import-blocking.js';
-import { importCustomEmojis } from './import-custom-emojis.js';
-
-const jobs = {
- deleteDriveFiles,
- exportCustomEmojis,
- exportNotes,
- exportFollowing,
- exportMute,
- exportBlocking,
- exportUserLists,
- importFollowing,
- importMuting,
- importBlocking,
- importUserLists,
- importCustomEmojis,
- deleteAccount,
-} as Record<string, Bull.ProcessCallbackFunction<DbJobData> | Bull.ProcessPromiseFunction<DbJobData>>;
-
-export default function(dbQueue: Bull.Queue<DbJobData>) {
- for (const [k, v] of Object.entries(jobs)) {
- dbQueue.process(k, v);
- }
-}
diff --git a/packages/backend/src/queue/processors/deliver.ts b/packages/backend/src/queue/processors/deliver.ts
deleted file mode 100644
index 291c05766e..0000000000
--- a/packages/backend/src/queue/processors/deliver.ts
+++ /dev/null
@@ -1,98 +0,0 @@
-import { URL } from 'node:url';
-import Bull from 'bull';
-import request from '@/remote/activitypub/request.js';
-import { registerOrFetchInstanceDoc } from '@/services/register-or-fetch-instance-doc.js';
-import Logger from '@/services/logger.js';
-import { Instances } from '@/models/index.js';
-import { apRequestChart, federationChart, instanceChart } from '@/services/chart/index.js';
-import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata.js';
-import { fetchMeta } from '@/misc/fetch-meta.js';
-import { toPuny } from '@/misc/convert-host.js';
-import { Cache } from '@/misc/cache.js';
-import { Instance } from '@/models/entities/instance.js';
-import { DeliverJobData } from '../types.js';
-import { StatusError } from '@/misc/fetch.js';
-
-const logger = new Logger('deliver');
-
-let latest: string | null = null;
-
-const suspendedHostsCache = new Cache<Instance[]>(1000 * 60 * 60);
-
-export default async (job: Bull.Job<DeliverJobData>) => {
- const { host } = new URL(job.data.to);
-
- // ブロックしてたら中断
- const meta = await fetchMeta();
- if (meta.blockedHosts.includes(toPuny(host))) {
- return 'skip (blocked)';
- }
-
- // isSuspendedなら中断
- let suspendedHosts = suspendedHostsCache.get(null);
- if (suspendedHosts == null) {
- suspendedHosts = await Instances.find({
- where: {
- isSuspended: true,
- },
- });
- suspendedHostsCache.set(null, suspendedHosts);
- }
- if (suspendedHosts.map(x => x.host).includes(toPuny(host))) {
- return 'skip (suspended)';
- }
-
- try {
- if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) {
- logger.debug(`delivering ${latest}`);
- }
-
- await request(job.data.user, job.data.to, job.data.content);
-
- // Update stats
- registerOrFetchInstanceDoc(host).then(i => {
- Instances.update(i.id, {
- latestRequestSentAt: new Date(),
- latestStatus: 200,
- lastCommunicatedAt: new Date(),
- isNotResponding: false,
- });
-
- fetchInstanceMetadata(i);
-
- instanceChart.requestSent(i.host, true);
- apRequestChart.deliverSucc();
- federationChart.deliverd(i.host, true);
- });
-
- return 'Success';
- } catch (res) {
- // Update stats
- registerOrFetchInstanceDoc(host).then(i => {
- Instances.update(i.id, {
- latestRequestSentAt: new Date(),
- latestStatus: res instanceof StatusError ? res.statusCode : null,
- isNotResponding: true,
- });
-
- instanceChart.requestSent(i.host, false);
- apRequestChart.deliverFail();
- federationChart.deliverd(i.host, false);
- });
-
- if (res instanceof StatusError) {
- // 4xx
- if (res.isClientError) {
- // HTTPステータスコード4xxはクライアントエラーであり、それはつまり
- // 何回再送しても成功することはないということなのでエラーにはしないでおく
- return `${res.statusCode} ${res.statusMessage}`;
- }
-
- // 5xx etc.
- throw `${res.statusCode} ${res.statusMessage}`;
- } else {
- // DNS error, socket error, timeout ...
- throw res;
- }
- }
-};
diff --git a/packages/backend/src/queue/processors/ended-poll-notification.ts b/packages/backend/src/queue/processors/ended-poll-notification.ts
deleted file mode 100644
index 6151c96ad6..0000000000
--- a/packages/backend/src/queue/processors/ended-poll-notification.ts
+++ /dev/null
@@ -1,33 +0,0 @@
-import Bull from 'bull';
-import { In } from 'typeorm';
-import { Notes, Polls, PollVotes } from '@/models/index.js';
-import { queueLogger } from '../logger.js';
-import { EndedPollNotificationJobData } from '@/queue/types.js';
-import { createNotification } from '@/services/create-notification.js';
-
-const logger = queueLogger.createSubLogger('ended-poll-notification');
-
-export async function endedPollNotification(job: Bull.Job<EndedPollNotificationJobData>, done: any): Promise<void> {
- const note = await Notes.findOneBy({ id: job.data.noteId });
- if (note == null || !note.hasPoll) {
- done();
- return;
- }
-
- const votes = await PollVotes.createQueryBuilder('vote')
- .select('vote.userId')
- .where('vote.noteId = :noteId', { noteId: note.id })
- .innerJoinAndSelect('vote.user', 'user')
- .andWhere('user.host IS NULL')
- .getMany();
-
- const userIds = [...new Set([note.userId, ...votes.map(v => v.userId)])];
-
- for (const userId of userIds) {
- createNotification(userId, 'pollEnded', {
- noteId: note.id,
- });
- }
-
- done();
-}
diff --git a/packages/backend/src/queue/processors/inbox.ts b/packages/backend/src/queue/processors/inbox.ts
deleted file mode 100644
index 198dde6050..0000000000
--- a/packages/backend/src/queue/processors/inbox.ts
+++ /dev/null
@@ -1,157 +0,0 @@
-import { URL } from 'node:url';
-import Bull from 'bull';
-import httpSignature from '@peertube/http-signature';
-import perform from '@/remote/activitypub/perform.js';
-import Logger from '@/services/logger.js';
-import { registerOrFetchInstanceDoc } from '@/services/register-or-fetch-instance-doc.js';
-import { Instances } from '@/models/index.js';
-import { apRequestChart, federationChart, instanceChart } from '@/services/chart/index.js';
-import { fetchMeta } from '@/misc/fetch-meta.js';
-import { toPuny, extractDbHost } from '@/misc/convert-host.js';
-import { getApId } from '@/remote/activitypub/type.js';
-import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata.js';
-import { InboxJobData } from '../types.js';
-import DbResolver from '@/remote/activitypub/db-resolver.js';
-import { resolvePerson } from '@/remote/activitypub/models/person.js';
-import { LdSignature } from '@/remote/activitypub/misc/ld-signature.js';
-import { StatusError } from '@/misc/fetch.js';
-import { CacheableRemoteUser } from '@/models/entities/user.js';
-import { UserPublickey } from '@/models/entities/user-publickey.js';
-
-const logger = new Logger('inbox');
-
-// ユーザーのinboxにアクティビティが届いた時の処理
-export default async (job: Bull.Job<InboxJobData>): Promise<string> => {
- const signature = job.data.signature; // HTTP-signature
- const activity = job.data.activity;
-
- //#region Log
- const info = Object.assign({}, activity) as any;
- delete info['@context'];
- logger.debug(JSON.stringify(info, null, 2));
- //#endregion
-
- const host = toPuny(new URL(signature.keyId).hostname);
-
- // ブロックしてたら中断
- const meta = await fetchMeta();
- if (meta.blockedHosts.includes(host)) {
- return `Blocked request: ${host}`;
- }
-
- const keyIdLower = signature.keyId.toLowerCase();
- if (keyIdLower.startsWith('acct:')) {
- return `Old keyId is no longer supported. ${keyIdLower}`;
- }
-
- const dbResolver = new DbResolver();
-
- // HTTP-Signature keyIdを元にDBから取得
- let authUser: {
- user: CacheableRemoteUser;
- key: UserPublickey | null;
- } | null = await dbResolver.getAuthUserFromKeyId(signature.keyId);
-
- // keyIdでわからなければ、activity.actorを元にDBから取得 || activity.actorを元にリモートから取得
- if (authUser == null) {
- try {
- authUser = await dbResolver.getAuthUserFromApId(getApId(activity.actor));
- } catch (e) {
- // 対象が4xxならスキップ
- if (e instanceof StatusError) {
- if (e.isClientError) {
- return `skip: Ignored deleted actors on both ends ${activity.actor} - ${e.statusCode}`;
- }
- throw `Error in actor ${activity.actor} - ${e.statusCode || e}`;
- }
- }
- }
-
- // それでもわからなければ終了
- if (authUser == null) {
- return `skip: failed to resolve user`;
- }
-
- // publicKey がなくても終了
- if (authUser.key == null) {
- return `skip: failed to resolve user publicKey`;
- }
-
- // HTTP-Signatureの検証
- const httpSignatureValidated = httpSignature.verifySignature(signature, authUser.key.keyPem);
-
- // また、signatureのsignerは、activity.actorと一致する必要がある
- if (!httpSignatureValidated || authUser.user.uri !== activity.actor) {
- // 一致しなくても、でもLD-Signatureがありそうならそっちも見る
- if (activity.signature) {
- if (activity.signature.type !== 'RsaSignature2017') {
- return `skip: unsupported LD-signature type ${activity.signature.type}`;
- }
-
- // activity.signature.creator: https://example.oom/users/user#main-key
- // みたいになっててUserを引っ張れば公開キーも入ることを期待する
- if (activity.signature.creator) {
- const candicate = activity.signature.creator.replace(/#.*/, '');
- await resolvePerson(candicate).catch(() => null);
- }
-
- // keyIdからLD-Signatureのユーザーを取得
- authUser = await dbResolver.getAuthUserFromKeyId(activity.signature.creator);
- if (authUser == null) {
- return `skip: LD-Signatureのユーザーが取得できませんでした`;
- }
-
- if (authUser.key == null) {
- return `skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした`;
- }
-
- // LD-Signature検証
- const ldSignature = new LdSignature();
- const verified = await ldSignature.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false);
- if (!verified) {
- return `skip: LD-Signatureの検証に失敗しました`;
- }
-
- // もう一度actorチェック
- if (authUser.user.uri !== activity.actor) {
- return `skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`;
- }
-
- // ブロックしてたら中断
- const ldHost = extractDbHost(authUser.user.uri);
- if (meta.blockedHosts.includes(ldHost)) {
- return `Blocked request: ${ldHost}`;
- }
- } else {
- return `skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`;
- }
- }
-
- // activity.idがあればホストが署名者のホストであることを確認する
- if (typeof activity.id === 'string') {
- const signerHost = extractDbHost(authUser.user.uri!);
- const activityIdHost = extractDbHost(activity.id);
- if (signerHost !== activityIdHost) {
- return `skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`;
- }
- }
-
- // Update stats
- registerOrFetchInstanceDoc(authUser.user.host).then(i => {
- Instances.update(i.id, {
- latestRequestReceivedAt: new Date(),
- lastCommunicatedAt: new Date(),
- isNotResponding: false,
- });
-
- fetchInstanceMetadata(i);
-
- instanceChart.requestReceived(i.host);
- apRequestChart.inbox();
- federationChart.inbox(i.host);
- });
-
- // アクティビティを処理
- await perform(authUser.user, activity);
- return `ok`;
-};
diff --git a/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts b/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts
deleted file mode 100644
index 77da162f6e..0000000000
--- a/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts
+++ /dev/null
@@ -1,50 +0,0 @@
-import Bull from 'bull';
-
-import { queueLogger } from '../../logger.js';
-import { deleteFileSync } from '@/services/drive/delete-file.js';
-import { DriveFiles } from '@/models/index.js';
-import { MoreThan, Not, IsNull } from 'typeorm';
-
-const logger = queueLogger.createSubLogger('clean-remote-files');
-
-export default async function cleanRemoteFiles(job: Bull.Job<Record<string, unknown>>, done: any): Promise<void> {
- logger.info(`Deleting cached remote files...`);
-
- let deletedCount = 0;
- let cursor: any = null;
-
- while (true) {
- const files = await DriveFiles.find({
- where: {
- userHost: Not(IsNull()),
- isLink: false,
- ...(cursor ? { id: MoreThan(cursor) } : {}),
- },
- take: 8,
- order: {
- id: 1,
- },
- });
-
- if (files.length === 0) {
- job.progress(100);
- break;
- }
-
- cursor = files[files.length - 1].id;
-
- await Promise.all(files.map(file => deleteFileSync(file, true)));
-
- deletedCount += 8;
-
- const total = await DriveFiles.countBy({
- userHost: Not(IsNull()),
- isLink: false,
- });
-
- job.progress(deletedCount / total);
- }
-
- logger.succ(`All cahced remote files has been deleted.`);
- done();
-}
diff --git a/packages/backend/src/queue/processors/object-storage/delete-file.ts b/packages/backend/src/queue/processors/object-storage/delete-file.ts
deleted file mode 100644
index c271e3ddd4..0000000000
--- a/packages/backend/src/queue/processors/object-storage/delete-file.ts
+++ /dev/null
@@ -1,11 +0,0 @@
-import { ObjectStorageFileJobData } from '@/queue/types.js';
-import Bull from 'bull';
-import { deleteObjectStorageFile } from '@/services/drive/delete-file.js';
-
-export default async (job: Bull.Job<ObjectStorageFileJobData>) => {
- const key: string = job.data.key;
-
- await deleteObjectStorageFile(key);
-
- return 'Success';
-};
diff --git a/packages/backend/src/queue/processors/object-storage/index.ts b/packages/backend/src/queue/processors/object-storage/index.ts
deleted file mode 100644
index ae6c481fea..0000000000
--- a/packages/backend/src/queue/processors/object-storage/index.ts
+++ /dev/null
@@ -1,15 +0,0 @@
-import Bull from 'bull';
-import { ObjectStorageJobData } from '@/queue/types.js';
-import deleteFile from './delete-file.js';
-import cleanRemoteFiles from './clean-remote-files.js';
-
-const jobs = {
- deleteFile,
- cleanRemoteFiles,
-} as Record<string, Bull.ProcessCallbackFunction<ObjectStorageJobData> | Bull.ProcessPromiseFunction<ObjectStorageJobData>>;
-
-export default function(q: Bull.Queue) {
- for (const [k, v] of Object.entries(jobs)) {
- q.process(k, 16, v);
- }
-}
diff --git a/packages/backend/src/queue/processors/system/check-expired-mutings.ts b/packages/backend/src/queue/processors/system/check-expired-mutings.ts
deleted file mode 100644
index 621269e7e1..0000000000
--- a/packages/backend/src/queue/processors/system/check-expired-mutings.ts
+++ /dev/null
@@ -1,30 +0,0 @@
-import Bull from 'bull';
-import { In } from 'typeorm';
-import { Mutings } from '@/models/index.js';
-import { queueLogger } from '../../logger.js';
-import { publishUserEvent } from '@/services/stream.js';
-
-const logger = queueLogger.createSubLogger('check-expired-mutings');
-
-export async function checkExpiredMutings(job: Bull.Job<Record<string, unknown>>, done: any): Promise<void> {
- logger.info(`Checking expired mutings...`);
-
- const expired = await Mutings.createQueryBuilder('muting')
- .where('muting.expiresAt IS NOT NULL')
- .andWhere('muting.expiresAt < :now', { now: new Date() })
- .innerJoinAndSelect('muting.mutee', 'mutee')
- .getMany();
-
- if (expired.length > 0) {
- await Mutings.delete({
- id: In(expired.map(m => m.id)),
- });
-
- for (const m of expired) {
- publishUserEvent(m.muterId, 'unmute', m.mutee!);
- }
- }
-
- logger.succ(`All expired mutings checked.`);
- done();
-}
diff --git a/packages/backend/src/queue/processors/system/clean-charts.ts b/packages/backend/src/queue/processors/system/clean-charts.ts
deleted file mode 100644
index c9169d5acf..0000000000
--- a/packages/backend/src/queue/processors/system/clean-charts.ts
+++ /dev/null
@@ -1,28 +0,0 @@
-import Bull from 'bull';
-
-import { queueLogger } from '../../logger.js';
-import { activeUsersChart, driveChart, federationChart, hashtagChart, instanceChart, notesChart, perUserDriveChart, perUserFollowingChart, perUserNotesChart, perUserReactionsChart, usersChart, apRequestChart } from '@/services/chart/index.js';
-
-const logger = queueLogger.createSubLogger('clean-charts');
-
-export async function cleanCharts(job: Bull.Job<Record<string, unknown>>, done: any): Promise<void> {
- logger.info(`Clean charts...`);
-
- await Promise.all([
- federationChart.clean(),
- notesChart.clean(),
- usersChart.clean(),
- activeUsersChart.clean(),
- instanceChart.clean(),
- perUserNotesChart.clean(),
- driveChart.clean(),
- perUserReactionsChart.clean(),
- hashtagChart.clean(),
- perUserFollowingChart.clean(),
- perUserDriveChart.clean(),
- apRequestChart.clean(),
- ]);
-
- logger.succ(`All charts successfully cleaned.`);
- done();
-}
diff --git a/packages/backend/src/queue/processors/system/clean.ts b/packages/backend/src/queue/processors/system/clean.ts
deleted file mode 100644
index c4f978d7c9..0000000000
--- a/packages/backend/src/queue/processors/system/clean.ts
+++ /dev/null
@@ -1,18 +0,0 @@
-import Bull from 'bull';
-import { LessThan } from 'typeorm';
-import { UserIps } from '@/models/index.js';
-
-import { queueLogger } from '../../logger.js';
-
-const logger = queueLogger.createSubLogger('clean');
-
-export async function clean(job: Bull.Job<Record<string, unknown>>, done: any): Promise<void> {
- logger.info('Cleaning...');
-
- UserIps.delete({
- createdAt: LessThan(new Date(Date.now() - (1000 * 60 * 60 * 24 * 90))),
- });
-
- logger.succ('Cleaned.');
- done();
-}
diff --git a/packages/backend/src/queue/processors/system/index.ts b/packages/backend/src/queue/processors/system/index.ts
deleted file mode 100644
index 9527d40b0f..0000000000
--- a/packages/backend/src/queue/processors/system/index.ts
+++ /dev/null
@@ -1,20 +0,0 @@
-import Bull from 'bull';
-import { tickCharts } from './tick-charts.js';
-import { resyncCharts } from './resync-charts.js';
-import { cleanCharts } from './clean-charts.js';
-import { checkExpiredMutings } from './check-expired-mutings.js';
-import { clean } from './clean.js';
-
-const jobs = {
- tickCharts,
- resyncCharts,
- cleanCharts,
- checkExpiredMutings,
- clean,
-} as Record<string, Bull.ProcessCallbackFunction<Record<string, unknown>> | Bull.ProcessPromiseFunction<Record<string, unknown>>>;
-
-export default function(dbQueue: Bull.Queue<Record<string, unknown>>) {
- for (const [k, v] of Object.entries(jobs)) {
- dbQueue.process(k, v);
- }
-}
diff --git a/packages/backend/src/queue/processors/system/resync-charts.ts b/packages/backend/src/queue/processors/system/resync-charts.ts
deleted file mode 100644
index 20012513af..0000000000
--- a/packages/backend/src/queue/processors/system/resync-charts.ts
+++ /dev/null
@@ -1,21 +0,0 @@
-import Bull from 'bull';
-
-import { queueLogger } from '../../logger.js';
-import { driveChart, notesChart, usersChart } from '@/services/chart/index.js';
-
-const logger = queueLogger.createSubLogger('resync-charts');
-
-export async function resyncCharts(job: Bull.Job<Record<string, unknown>>, done: any): Promise<void> {
- logger.info(`Resync charts...`);
-
- // TODO: ユーザーごとのチャートも更新する
- // TODO: インスタンスごとのチャートも更新する
- await Promise.all([
- driveChart.resync(),
- notesChart.resync(),
- usersChart.resync(),
- ]);
-
- logger.succ(`All charts successfully resynced.`);
- done();
-}
diff --git a/packages/backend/src/queue/processors/system/tick-charts.ts b/packages/backend/src/queue/processors/system/tick-charts.ts
deleted file mode 100644
index 13403f8f73..0000000000
--- a/packages/backend/src/queue/processors/system/tick-charts.ts
+++ /dev/null
@@ -1,28 +0,0 @@
-import Bull from 'bull';
-
-import { queueLogger } from '../../logger.js';
-import { activeUsersChart, driveChart, federationChart, hashtagChart, instanceChart, notesChart, perUserDriveChart, perUserFollowingChart, perUserNotesChart, perUserReactionsChart, usersChart, apRequestChart } from '@/services/chart/index.js';
-
-const logger = queueLogger.createSubLogger('tick-charts');
-
-export async function tickCharts(job: Bull.Job<Record<string, unknown>>, done: any): Promise<void> {
- logger.info(`Tick charts...`);
-
- await Promise.all([
- federationChart.tick(false),
- notesChart.tick(false),
- usersChart.tick(false),
- activeUsersChart.tick(false),
- instanceChart.tick(false),
- perUserNotesChart.tick(false),
- driveChart.tick(false),
- perUserReactionsChart.tick(false),
- hashtagChart.tick(false),
- perUserFollowingChart.tick(false),
- perUserDriveChart.tick(false),
- apRequestChart.tick(false),
- ]);
-
- logger.succ(`All charts successfully ticked.`);
- done();
-}
diff --git a/packages/backend/src/queue/processors/webhook-deliver.ts b/packages/backend/src/queue/processors/webhook-deliver.ts
deleted file mode 100644
index d49206f68f..0000000000
--- a/packages/backend/src/queue/processors/webhook-deliver.ts
+++ /dev/null
@@ -1,59 +0,0 @@
-import { URL } from 'node:url';
-import Bull from 'bull';
-import Logger from '@/services/logger.js';
-import { WebhookDeliverJobData } from '../types.js';
-import { getResponse, StatusError } from '@/misc/fetch.js';
-import { Webhooks } from '@/models/index.js';
-import config from '@/config/index.js';
-
-const logger = new Logger('webhook');
-
-export default async (job: Bull.Job<WebhookDeliverJobData>) => {
- try {
- logger.debug(`delivering ${job.data.webhookId}`);
-
- const res = await getResponse({
- url: job.data.to,
- method: 'POST',
- headers: {
- 'User-Agent': 'Misskey-Hooks',
- 'X-Misskey-Host': config.host,
- 'X-Misskey-Hook-Id': job.data.webhookId,
- 'X-Misskey-Hook-Secret': job.data.secret,
- },
- body: JSON.stringify({
- hookId: job.data.webhookId,
- userId: job.data.userId,
- eventId: job.data.eventId,
- createdAt: job.data.createdAt,
- type: job.data.type,
- body: job.data.content,
- }),
- });
-
- Webhooks.update({ id: job.data.webhookId }, {
- latestSentAt: new Date(),
- latestStatus: res.status,
- });
-
- return 'Success';
- } catch (res) {
- Webhooks.update({ id: job.data.webhookId }, {
- latestSentAt: new Date(),
- latestStatus: res instanceof StatusError ? res.statusCode : 1,
- });
-
- if (res instanceof StatusError) {
- // 4xx
- if (res.isClientError) {
- return `${res.statusCode} ${res.statusMessage}`;
- }
-
- // 5xx etc.
- throw `${res.statusCode} ${res.statusMessage}`;
- } else {
- // DNS error, socket error, timeout ...
- throw res;
- }
- }
-};
diff --git a/packages/backend/src/queue/queues.ts b/packages/backend/src/queue/queues.ts
deleted file mode 100644
index f3a267790c..0000000000
--- a/packages/backend/src/queue/queues.ts
+++ /dev/null
@@ -1,21 +0,0 @@
-import config from '@/config/index.js';
-import { initialize as initializeQueue } from './initialize.js';
-import { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData, EndedPollNotificationJobData, WebhookDeliverJobData } from './types.js';
-
-export const systemQueue = initializeQueue<Record<string, unknown>>('system');
-export const endedPollNotificationQueue = initializeQueue<EndedPollNotificationJobData>('endedPollNotification');
-export const deliverQueue = initializeQueue<DeliverJobData>('deliver', config.deliverJobPerSec || 128);
-export const inboxQueue = initializeQueue<InboxJobData>('inbox', config.inboxJobPerSec || 16);
-export const dbQueue = initializeQueue<DbJobData>('db');
-export const objectStorageQueue = initializeQueue<ObjectStorageJobData>('objectStorage');
-export const webhookDeliverQueue = initializeQueue<WebhookDeliverJobData>('webhookDeliver', 64);
-
-export const queues = [
- systemQueue,
- endedPollNotificationQueue,
- deliverQueue,
- inboxQueue,
- dbQueue,
- objectStorageQueue,
- webhookDeliverQueue,
-];
diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts
index 5ea4725561..18ec997a1b 100644
--- a/packages/backend/src/queue/types.ts
+++ b/packages/backend/src/queue/types.ts
@@ -1,9 +1,9 @@
-import { DriveFile } from '@/models/entities/drive-file.js';
-import { Note } from '@/models/entities/note';
-import { User } from '@/models/entities/user.js';
-import { Webhook } from '@/models/entities/webhook';
-import { IActivity } from '@/remote/activitypub/type.js';
-import httpSignature from '@peertube/http-signature';
+import type { DriveFile } from '@/models/entities/DriveFile.js';
+import type { Note } from '@/models/entities/Note.js';
+import type { User } from '@/models/entities/User.js';
+import type { Webhook } from '@/models/entities/Webhook.js';
+import type { IActivity } from '@/core/remote/activitypub/type.js';
+import type httpSignature from '@peertube/http-signature';
export type DeliverJobData = {
/** Actor */