summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/queue')
-rw-r--r--packages/backend/src/queue/QueueProcessorModule.ts2
-rw-r--r--packages/backend/src/queue/QueueProcessorService.ts87
-rw-r--r--packages/backend/src/queue/processors/BakeBufferedReactionsProcessorService.ts42
-rw-r--r--packages/backend/src/queue/processors/DeliverProcessorService.ts15
-rw-r--r--packages/backend/src/queue/processors/ExportAntennasProcessorService.ts7
-rw-r--r--packages/backend/src/queue/processors/ExportBlockingProcessorService.ts7
-rw-r--r--packages/backend/src/queue/processors/ExportClipsProcessorService.ts7
-rw-r--r--packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts8
-rw-r--r--packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts7
-rw-r--r--packages/backend/src/queue/processors/ExportFollowingProcessorService.ts7
-rw-r--r--packages/backend/src/queue/processors/ExportMutingProcessorService.ts7
-rw-r--r--packages/backend/src/queue/processors/ExportNotesProcessorService.ts7
-rw-r--r--packages/backend/src/queue/processors/ExportUserListsProcessorService.ts7
-rw-r--r--packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts41
-rw-r--r--packages/backend/src/queue/processors/InboxProcessorService.ts86
15 files changed, 250 insertions, 87 deletions
diff --git a/packages/backend/src/queue/QueueProcessorModule.ts b/packages/backend/src/queue/QueueProcessorModule.ts
index 7daca687a1..7c6675b15d 100644
--- a/packages/backend/src/queue/QueueProcessorModule.ts
+++ b/packages/backend/src/queue/QueueProcessorModule.ts
@@ -14,6 +14,7 @@ import { InboxProcessorService } from './processors/InboxProcessorService.js';
import { UserWebhookDeliverProcessorService } from './processors/UserWebhookDeliverProcessorService.js';
import { SystemWebhookDeliverProcessorService } from './processors/SystemWebhookDeliverProcessorService.js';
import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMutingsProcessorService.js';
+import { BakeBufferedReactionsProcessorService } from './processors/BakeBufferedReactionsProcessorService.js';
import { CleanChartsProcessorService } from './processors/CleanChartsProcessorService.js';
import { CleanProcessorService } from './processors/CleanProcessorService.js';
import { CleanRemoteFilesProcessorService } from './processors/CleanRemoteFilesProcessorService.js';
@@ -53,6 +54,7 @@ import { RelationshipProcessorService } from './processors/RelationshipProcessor
ResyncChartsProcessorService,
CleanChartsProcessorService,
CheckExpiredMutingsProcessorService,
+ BakeBufferedReactionsProcessorService,
CleanProcessorService,
DeleteDriveFilesProcessorService,
ExportAccountDataProcessorService,
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts
index 7a6169bf9c..eaeb6d58df 100644
--- a/packages/backend/src/queue/QueueProcessorService.ts
+++ b/packages/backend/src/queue/QueueProcessorService.ts
@@ -40,6 +40,7 @@ import { TickChartsProcessorService } from './processors/TickChartsProcessorServ
import { ResyncChartsProcessorService } from './processors/ResyncChartsProcessorService.js';
import { CleanChartsProcessorService } from './processors/CleanChartsProcessorService.js';
import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMutingsProcessorService.js';
+import { BakeBufferedReactionsProcessorService } from './processors/BakeBufferedReactionsProcessorService.js';
import { CleanProcessorService } from './processors/CleanProcessorService.js';
import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js';
import { QueueLoggerService } from './QueueLoggerService.js';
@@ -122,24 +123,35 @@ export class QueueProcessorService implements OnApplicationShutdown {
private cleanChartsProcessorService: CleanChartsProcessorService,
private aggregateRetentionProcessorService: AggregateRetentionProcessorService,
private checkExpiredMutingsProcessorService: CheckExpiredMutingsProcessorService,
+ private bakeBufferedReactionsProcessorService: BakeBufferedReactionsProcessorService,
private cleanProcessorService: CleanProcessorService,
) {
this.logger = this.queueLoggerService.logger;
- function renderError(e: Error): any {
- if (e) { // 何故かeがundefinedで来ることがある
- return {
- stack: e.stack,
- message: e.message,
- name: e.name,
- };
- } else {
- return {
- stack: '?',
- message: '?',
- name: '?',
- };
+ function renderError(e?: Error) {
+ // 何故かeがundefinedで来ることがある
+ if (!e) return '?';
+
+ if (e instanceof Bull.UnrecoverableError || e.name === 'AbortError') {
+ return `${e.name}: ${e.message}`;
}
+
+ return {
+ stack: e.stack,
+ message: e.message,
+ name: e.name,
+ };
+ }
+
+ function renderJob(job?: Bull.Job) {
+ if (!job) return '?';
+
+ return {
+ name: job.name || undefined,
+ info: getJobInfo(job),
+ failedReason: job.failedReason || undefined,
+ data: job.data,
+ };
}
//#region system
@@ -151,6 +163,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
case 'cleanCharts': return this.cleanChartsProcessorService.process();
case 'aggregateRetention': return this.aggregateRetentionProcessorService.process();
case 'checkExpiredMutings': return this.checkExpiredMutingsProcessorService.process();
+ case 'bakeBufferedReactions': return this.bakeBufferedReactionsProcessorService.process();
case 'clean': return this.cleanProcessorService.process();
default: throw new Error(`unrecognized job type ${job.name} for system`);
}
@@ -173,15 +186,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err: Error) => {
- logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) });
+ logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
if (config.sentryForBackend) {
- Sentry.captureMessage(`Queue: System: ${job?.name ?? '?'}: ${err.message}`, {
+ Sentry.captureMessage(`Queue: System: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
+ .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -238,15 +251,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => {
- logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) });
+ logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
if (config.sentryForBackend) {
- Sentry.captureMessage(`Queue: DB: ${job?.name ?? '?'}: ${err.message}`, {
+ Sentry.captureMessage(`Queue: DB: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
+ .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -278,15 +291,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => {
- logger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
+ logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
if (config.sentryForBackend) {
- Sentry.captureMessage(`Queue: Deliver: ${err.message}`, {
+ Sentry.captureMessage(`Queue: Deliver: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
+ .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -318,15 +331,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)}`))
.on('failed', (job, err) => {
- logger.error(`failed(${err.stack}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job, e: renderError(err) });
+ logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job: renderJob(job), e: renderError(err) });
if (config.sentryForBackend) {
- Sentry.captureMessage(`Queue: Inbox: ${err.message}`, {
+ Sentry.captureMessage(`Queue: Inbox: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
+ .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -358,15 +371,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => {
- logger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
+ logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
if (config.sentryForBackend) {
- Sentry.captureMessage(`Queue: UserWebhookDeliver: ${err.message}`, {
+ Sentry.captureMessage(`Queue: UserWebhookDeliver: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
+ .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -398,15 +411,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => {
- logger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
+ logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
if (config.sentryForBackend) {
- Sentry.captureMessage(`Queue: SystemWebhookDeliver: ${err.message}`, {
+ Sentry.captureMessage(`Queue: SystemWebhookDeliver: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
+ .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -445,15 +458,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => {
- logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) });
+ logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
if (config.sentryForBackend) {
- Sentry.captureMessage(`Queue: Relationship: ${job?.name ?? '?'}: ${err.message}`, {
+ Sentry.captureMessage(`Queue: Relationship: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
+ .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -486,15 +499,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => {
- logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) });
+ logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
if (config.sentryForBackend) {
- Sentry.captureMessage(`Queue: ObjectStorage: ${job?.name ?? '?'}: ${err.message}`, {
+ Sentry.captureMessage(`Queue: ObjectStorage: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
});
}
})
- .on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
+ .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
diff --git a/packages/backend/src/queue/processors/BakeBufferedReactionsProcessorService.ts b/packages/backend/src/queue/processors/BakeBufferedReactionsProcessorService.ts
new file mode 100644
index 0000000000..d49c99f694
--- /dev/null
+++ b/packages/backend/src/queue/processors/BakeBufferedReactionsProcessorService.ts
@@ -0,0 +1,42 @@
+/*
+ * SPDX-FileCopyrightText: syuilo and misskey-project
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+import { Inject, Injectable } from '@nestjs/common';
+import type Logger from '@/logger.js';
+import { bindThis } from '@/decorators.js';
+import { ReactionsBufferingService } from '@/core/ReactionsBufferingService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type * as Bull from 'bullmq';
+import { MiMeta } from '@/models/_.js';
+import { DI } from '@/di-symbols.js';
+
+@Injectable()
+export class BakeBufferedReactionsProcessorService {
+ private logger: Logger;
+
+ constructor(
+ @Inject(DI.meta)
+ private meta: MiMeta,
+
+ private reactionsBufferingService: ReactionsBufferingService,
+ private queueLoggerService: QueueLoggerService,
+ ) {
+ this.logger = this.queueLoggerService.logger.createSubLogger('bake-buffered-reactions');
+ }
+
+ @bindThis
+ public async process(): Promise<void> {
+ if (!this.meta.enableReactionsBuffering) {
+ this.logger.info('Reactions buffering is disabled. Skipping...');
+ return;
+ }
+
+ this.logger.info('Baking buffered reactions...');
+
+ await this.reactionsBufferingService.bake();
+
+ this.logger.succ('All buffered reactions baked.');
+ }
+}
diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts
index 4076e9da90..9590a4fe71 100644
--- a/packages/backend/src/queue/processors/DeliverProcessorService.ts
+++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts
@@ -7,9 +7,8 @@ import { Inject, Injectable } from '@nestjs/common';
import * as Bull from 'bullmq';
import { Not } from 'typeorm';
import { DI } from '@/di-symbols.js';
-import type { InstancesRepository } from '@/models/_.js';
+import type { InstancesRepository, MiMeta } from '@/models/_.js';
import type Logger from '@/logger.js';
-import { MetaService } from '@/core/MetaService.js';
import { ApRequestService } from '@/core/activitypub/ApRequestService.js';
import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js';
@@ -31,10 +30,12 @@ export class DeliverProcessorService {
private latest: string | null;
constructor(
+ @Inject(DI.meta)
+ private meta: MiMeta,
+
@Inject(DI.instancesRepository)
private instancesRepository: InstancesRepository,
- private metaService: MetaService,
private utilityService: UtilityService,
private federatedInstanceService: FederatedInstanceService,
private fetchInstanceMetadataService: FetchInstanceMetadataService,
@@ -52,9 +53,7 @@ export class DeliverProcessorService {
public async process(job: Bull.Job<DeliverJobData>): Promise<string> {
const { host } = new URL(job.data.to);
- // ブロックしてたら中断
- const meta = await this.metaService.fetch();
- if (this.utilityService.isBlockedHost(meta.blockedHosts, this.utilityService.toPuny(host))) {
+ if (!this.utilityService.isFederationAllowedUri(job.data.to)) {
return 'skip (blocked)';
}
@@ -88,7 +87,7 @@ export class DeliverProcessorService {
this.apRequestChart.deliverSucc();
this.federationChart.deliverd(i.host, true);
- if (meta.enableChartsForFederatedInstances) {
+ if (this.meta.enableChartsForFederatedInstances) {
this.instanceChart.requestSent(i.host, true);
}
});
@@ -120,7 +119,7 @@ export class DeliverProcessorService {
this.apRequestChart.deliverFail();
this.federationChart.deliverd(i.host, false);
- if (meta.enableChartsForFederatedInstances) {
+ if (this.meta.enableChartsForFederatedInstances) {
this.instanceChart.requestSent(i.host, false);
}
});
diff --git a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
index 88c4ea29c0..b3111865ad 100644
--- a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
@@ -14,6 +14,7 @@ import { DriveService } from '@/core/DriveService.js';
import { bindThis } from '@/decorators.js';
import { createTemp } from '@/misc/create-temp.js';
import { UtilityService } from '@/core/UtilityService.js';
+import { NotificationService } from '@/core/NotificationService.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type { DBExportAntennasData } from '../types.js';
import type * as Bull from 'bullmq';
@@ -35,6 +36,7 @@ export class ExportAntennasProcessorService {
private driveService: DriveService,
private utilityService: UtilityService,
private queueLoggerService: QueueLoggerService,
+ private notificationService: NotificationService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('export-antennas');
}
@@ -95,6 +97,11 @@ export class ExportAntennasProcessorService {
const fileName = 'antennas-' + DateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json';
const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' });
this.logger.succ('Exported to: ' + driveFile.id);
+
+ this.notificationService.createNotification(user.id, 'exportCompleted', {
+ exportedEntity: 'antenna',
+ fileId: driveFile.id,
+ });
} finally {
cleanup();
}
diff --git a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts
index 6ec3c18786..ecc439db69 100644
--- a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts
@@ -13,6 +13,7 @@ import type Logger from '@/logger.js';
import { DriveService } from '@/core/DriveService.js';
import { createTemp } from '@/misc/create-temp.js';
import { UtilityService } from '@/core/UtilityService.js';
+import { NotificationService } from '@/core/NotificationService.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
@@ -30,6 +31,7 @@ export class ExportBlockingProcessorService {
private blockingsRepository: BlockingsRepository,
private utilityService: UtilityService,
+ private notificationService: NotificationService,
private driveService: DriveService,
private queueLoggerService: QueueLoggerService,
) {
@@ -109,6 +111,11 @@ export class ExportBlockingProcessorService {
const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' });
this.logger.succ(`Exported to: ${driveFile.id}`);
+
+ this.notificationService.createNotification(user.id, 'exportCompleted', {
+ exportedEntity: 'blocking',
+ fileId: driveFile.id,
+ });
} finally {
cleanup();
}
diff --git a/packages/backend/src/queue/processors/ExportClipsProcessorService.ts b/packages/backend/src/queue/processors/ExportClipsProcessorService.ts
index 01eab26e96..583ddbb745 100644
--- a/packages/backend/src/queue/processors/ExportClipsProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportClipsProcessorService.ts
@@ -19,6 +19,7 @@ import { bindThis } from '@/decorators.js';
import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js';
import { Packed } from '@/misc/json-schema.js';
import { IdService } from '@/core/IdService.js';
+import { NotificationService } from '@/core/NotificationService.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
import type { DbJobDataWithUser } from '../types.js';
@@ -43,6 +44,7 @@ export class ExportClipsProcessorService {
private driveService: DriveService,
private queueLoggerService: QueueLoggerService,
private idService: IdService,
+ private notificationService: NotificationService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('export-clips');
}
@@ -79,6 +81,11 @@ export class ExportClipsProcessorService {
const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' });
this.logger.succ(`Exported to: ${driveFile.id}`);
+
+ this.notificationService.createNotification(user.id, 'exportCompleted', {
+ exportedEntity: 'clip',
+ fileId: driveFile.id,
+ });
} finally {
cleanup();
}
diff --git a/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts
index 45087927a5..14d32e78b3 100644
--- a/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts
@@ -16,6 +16,7 @@ import type Logger from '@/logger.js';
import { DriveService } from '@/core/DriveService.js';
import { createTemp, createTempDir } from '@/misc/create-temp.js';
import { DownloadService } from '@/core/DownloadService.js';
+import { NotificationService } from '@/core/NotificationService.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
@@ -37,6 +38,7 @@ export class ExportCustomEmojisProcessorService {
private driveService: DriveService,
private downloadService: DownloadService,
private queueLoggerService: QueueLoggerService,
+ private notificationService: NotificationService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('export-custom-emojis');
}
@@ -134,6 +136,12 @@ export class ExportCustomEmojisProcessorService {
const driveFile = await this.driveService.addFile({ user, path: archivePath, name: fileName, force: true });
this.logger.succ(`Exported to: ${driveFile.id}`);
+
+ this.notificationService.createNotification(user.id, 'exportCompleted', {
+ exportedEntity: 'customEmoji',
+ fileId: driveFile.id,
+ });
+
cleanup();
archiveCleanup();
resolve();
diff --git a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts
index 7bb626dd31..b81feece01 100644
--- a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts
@@ -16,6 +16,7 @@ import type { MiPoll } from '@/models/Poll.js';
import type { MiNote } from '@/models/Note.js';
import { bindThis } from '@/decorators.js';
import { IdService } from '@/core/IdService.js';
+import { NotificationService } from '@/core/NotificationService.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
import type { DbJobDataWithUser } from '../types.js';
@@ -37,6 +38,7 @@ export class ExportFavoritesProcessorService {
private driveService: DriveService,
private queueLoggerService: QueueLoggerService,
private idService: IdService,
+ private notificationService: NotificationService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('export-favorites');
}
@@ -123,6 +125,11 @@ export class ExportFavoritesProcessorService {
const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' });
this.logger.succ(`Exported to: ${driveFile.id}`);
+
+ this.notificationService.createNotification(user.id, 'exportCompleted', {
+ exportedEntity: 'favorite',
+ fileId: driveFile.id,
+ });
} finally {
cleanup();
}
diff --git a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts
index 1cc80e66d7..903f962515 100644
--- a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts
@@ -14,6 +14,7 @@ import { DriveService } from '@/core/DriveService.js';
import { createTemp } from '@/misc/create-temp.js';
import type { MiFollowing } from '@/models/Following.js';
import { UtilityService } from '@/core/UtilityService.js';
+import { NotificationService } from '@/core/NotificationService.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
@@ -36,6 +37,7 @@ export class ExportFollowingProcessorService {
private utilityService: UtilityService,
private driveService: DriveService,
private queueLoggerService: QueueLoggerService,
+ private notificationService: NotificationService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('export-following');
}
@@ -113,6 +115,11 @@ export class ExportFollowingProcessorService {
const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' });
this.logger.succ(`Exported to: ${driveFile.id}`);
+
+ this.notificationService.createNotification(user.id, 'exportCompleted', {
+ exportedEntity: 'following',
+ fileId: driveFile.id,
+ });
} finally {
cleanup();
}
diff --git a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts
index 243b74f2c2..f9867ade29 100644
--- a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts
@@ -13,6 +13,7 @@ import type Logger from '@/logger.js';
import { DriveService } from '@/core/DriveService.js';
import { createTemp } from '@/misc/create-temp.js';
import { UtilityService } from '@/core/UtilityService.js';
+import { NotificationService } from '@/core/NotificationService.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
@@ -32,6 +33,7 @@ export class ExportMutingProcessorService {
private utilityService: UtilityService,
private driveService: DriveService,
private queueLoggerService: QueueLoggerService,
+ private notificationService: NotificationService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('export-muting');
}
@@ -110,6 +112,11 @@ export class ExportMutingProcessorService {
const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' });
this.logger.succ(`Exported to: ${driveFile.id}`);
+
+ this.notificationService.createNotification(user.id, 'exportCompleted', {
+ exportedEntity: 'muting',
+ fileId: driveFile.id,
+ });
} finally {
cleanup();
}
diff --git a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
index c7611012d7..9e2b678219 100644
--- a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
@@ -18,6 +18,7 @@ import { bindThis } from '@/decorators.js';
import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js';
import { Packed } from '@/misc/json-schema.js';
import { IdService } from '@/core/IdService.js';
+import { NotificationService } from '@/core/NotificationService.js';
import { JsonArrayStream } from '@/misc/JsonArrayStream.js';
import { FileWriterStream } from '@/misc/FileWriterStream.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
@@ -112,6 +113,7 @@ export class ExportNotesProcessorService {
private queueLoggerService: QueueLoggerService,
private driveFileEntityService: DriveFileEntityService,
private idService: IdService,
+ private notificationService: NotificationService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('export-notes');
}
@@ -150,6 +152,11 @@ export class ExportNotesProcessorService {
const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' });
this.logger.succ(`Exported to: ${driveFile.id}`);
+
+ this.notificationService.createNotification(user.id, 'exportCompleted', {
+ exportedEntity: 'note',
+ fileId: driveFile.id,
+ });
} finally {
cleanup();
}
diff --git a/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts
index ee87cff5d3..c483d79854 100644
--- a/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts
@@ -13,6 +13,7 @@ import type Logger from '@/logger.js';
import { DriveService } from '@/core/DriveService.js';
import { createTemp } from '@/misc/create-temp.js';
import { UtilityService } from '@/core/UtilityService.js';
+import { NotificationService } from '@/core/NotificationService.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
@@ -35,6 +36,7 @@ export class ExportUserListsProcessorService {
private utilityService: UtilityService,
private driveService: DriveService,
private queueLoggerService: QueueLoggerService,
+ private notificationService: NotificationService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('export-user-lists');
}
@@ -89,6 +91,11 @@ export class ExportUserListsProcessorService {
const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' });
this.logger.succ(`Exported to: ${driveFile.id}`);
+
+ this.notificationService.createNotification(user.id, 'exportCompleted', {
+ exportedEntity: 'userList',
+ fileId: driveFile.id,
+ });
} finally {
cleanup();
}
diff --git a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts
index 04ad74ee01..17ba71df3d 100644
--- a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts
@@ -88,23 +88,30 @@ export class ImportCustomEmojisProcessorService {
await this.emojisRepository.delete({
name: nameNfc,
});
- const driveFile = await this.driveService.addFile({
- user: null,
- path: emojiPath,
- name: record.fileName,
- force: true,
- });
- await this.customEmojiService.add({
- name: nameNfc,
- category: emojiInfo.category?.normalize('NFC'),
- host: null,
- aliases: emojiInfo.aliases?.map((a: string) => a.normalize('NFC')),
- driveFile,
- license: emojiInfo.license,
- isSensitive: emojiInfo.isSensitive,
- localOnly: emojiInfo.localOnly,
- roleIdsThatCanBeUsedThisEmojiAsReaction: [],
- });
+ try {
+ const driveFile = await this.driveService.addFile({
+ user: null,
+ path: emojiPath,
+ name: record.fileName,
+ force: true,
+ });
+ await this.customEmojiService.add({
+ name: nameNfc,
+ category: emojiInfo.category?.normalize('NFC'),
+ host: null,
+ aliases: emojiInfo.aliases?.map((a: string) => a.normalize('NFC')),
+ driveFile,
+ license: emojiInfo.license,
+ isSensitive: emojiInfo.isSensitive,
+ localOnly: emojiInfo.localOnly,
+ roleIdsThatCanBeUsedThisEmojiAsReaction: [],
+ });
+ } catch (e) {
+ if (e instanceof Error || typeof e === 'string') {
+ this.logger.error(`couldn't import ${emojiPath} for ${emojiInfo.name}: ${e}`);
+ }
+ continue;
+ }
}
cleanup();
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts
index 641b8b8607..11b00bb683 100644
--- a/packages/backend/src/queue/processors/InboxProcessorService.ts
+++ b/packages/backend/src/queue/processors/InboxProcessorService.ts
@@ -4,11 +4,10 @@
*/
import { URL } from 'node:url';
-import { Injectable } from '@nestjs/common';
+import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import httpSignature from '@peertube/http-signature';
import * as Bull from 'bullmq';
import type Logger from '@/logger.js';
-import { MetaService } from '@/core/MetaService.js';
import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js';
import InstanceChart from '@/core/chart/charts/instance.js';
@@ -26,16 +25,28 @@ import { JsonLdService } from '@/core/activitypub/JsonLdService.js';
import { ApInboxService } from '@/core/activitypub/ApInboxService.js';
import { bindThis } from '@/decorators.js';
import { IdentifiableError } from '@/misc/identifiable-error.js';
+import { CollapsedQueue } from '@/misc/collapsed-queue.js';
+import { MiNote } from '@/models/Note.js';
+import { MiMeta } from '@/models/Meta.js';
+import { DI } from '@/di-symbols.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type { InboxJobData } from '../types.js';
+type UpdateInstanceJob = {
+ latestRequestReceivedAt: Date,
+ shouldUnsuspend: boolean,
+};
+
@Injectable()
-export class InboxProcessorService {
+export class InboxProcessorService implements OnApplicationShutdown {
private logger: Logger;
+ private updateInstanceQueue: CollapsedQueue<MiNote['id'], UpdateInstanceJob>;
constructor(
+ @Inject(DI.meta)
+ private meta: MiMeta,
+
private utilityService: UtilityService,
- private metaService: MetaService,
private apInboxService: ApInboxService,
private federatedInstanceService: FederatedInstanceService,
private fetchInstanceMetadataService: FetchInstanceMetadataService,
@@ -48,6 +59,7 @@ export class InboxProcessorService {
private queueLoggerService: QueueLoggerService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('inbox');
+ this.updateInstanceQueue = new CollapsedQueue(60 * 1000 * 5, this.collapseUpdateInstanceJobs, this.performUpdateInstance);
}
@bindThis
@@ -63,9 +75,7 @@ export class InboxProcessorService {
const host = this.utilityService.toPuny(new URL(signature.keyId).hostname);
- // ブロックしてたら中断
- const meta = await this.metaService.fetch();
- if (this.utilityService.isBlockedHost(meta.blockedHosts, host)) {
+ if (!this.utilityService.isFederationAllowedHost(host)) {
return `Blocked request: ${host}`;
}
@@ -108,19 +118,16 @@ export class InboxProcessorService {
// HTTP-Signatureの検証
let httpSignatureValidated = httpSignature.verifySignature(signature, authUser.key.keyPem);
- // また、signatureのsignerは、activity.actorと一致する必要がある
- if (!httpSignatureValidated || authUser.user.uri !== activity.actor) {
- let renewKeyFailed = true;
-
- if (!httpSignatureValidated) {
- authUser.key = await this.apDbResolverService.refetchPublicKeyForApId(authUser.user);
-
- if (authUser.key != null) {
- httpSignatureValidated = httpSignature.verifySignature(signature, authUser.key.keyPem);
- renewKeyFailed = false;
- }
+ // maybe they changed their key? refetch it
+ if (!httpSignatureValidated) {
+ authUser.key = await this.apDbResolverService.refetchPublicKeyForApId(authUser.user);
+ if (authUser.key != null) {
+ httpSignatureValidated = httpSignature.verifySignature(signature, authUser.key.keyPem);
}
+ }
+ // また、signatureのsignerは、activity.actorと一致する必要がある
+ if (!httpSignatureValidated || authUser.user.uri !== getApId(activity.actor)) {
// 一致しなくても、でもLD-Signatureがありそうならそっちも見る
const ldSignature = activity.signature;
if (ldSignature) {
@@ -169,9 +176,8 @@ export class InboxProcessorService {
throw new Bull.UnrecoverableError(`skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`);
}
- // ブロックしてたら中断
const ldHost = this.utilityService.extractDbHost(authUser.user.uri);
- if (this.utilityService.isBlockedHost(meta.blockedHosts, ldHost)) {
+ if (!this.utilityService.isFederationAllowedHost(ldHost)) {
throw new Bull.UnrecoverableError(`Blocked request: ${ldHost}`);
}
} else {
@@ -190,11 +196,9 @@ export class InboxProcessorService {
// Update stats
this.federatedInstanceService.fetch(authUser.user.host).then(i => {
- this.federatedInstanceService.update(i.id, {
+ this.updateInstanceQueue.enqueue(i.id, {
latestRequestReceivedAt: new Date(),
- isNotResponding: false,
- // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる
- suspensionState: i.suspensionState === 'autoSuspendedForNotResponding' ? 'none' : undefined,
+ shouldUnsuspend: i.suspensionState === 'autoSuspendedForNotResponding',
});
this.fetchInstanceMetadataService.fetchInstanceMetadata(i);
@@ -202,7 +206,7 @@ export class InboxProcessorService {
this.apRequestChart.inbox();
this.federationChart.inbox(i.host);
- if (meta.enableChartsForFederatedInstances) {
+ if (this.meta.enableChartsForFederatedInstances) {
this.instanceChart.requestReceived(i.host);
}
});
@@ -230,4 +234,36 @@ export class InboxProcessorService {
}
return 'ok';
}
+
+ @bindThis
+ public collapseUpdateInstanceJobs(oldJob: UpdateInstanceJob, newJob: UpdateInstanceJob) {
+ const latestRequestReceivedAt = oldJob.latestRequestReceivedAt < newJob.latestRequestReceivedAt
+ ? newJob.latestRequestReceivedAt
+ : oldJob.latestRequestReceivedAt;
+ const shouldUnsuspend = oldJob.shouldUnsuspend || newJob.shouldUnsuspend;
+ return {
+ latestRequestReceivedAt,
+ shouldUnsuspend,
+ };
+ }
+
+ @bindThis
+ public async performUpdateInstance(id: string, job: UpdateInstanceJob) {
+ await this.federatedInstanceService.update(id, {
+ latestRequestReceivedAt: new Date(),
+ isNotResponding: false,
+ // もしサーバーが死んでるために配信が止まっていた場合には自動的に復活させてあげる
+ suspensionState: job.shouldUnsuspend ? 'none' : undefined,
+ });
+ }
+
+ @bindThis
+ public async dispose(): Promise<void> {
+ await this.updateInstanceQueue.performAllNow();
+ }
+
+ @bindThis
+ async onApplicationShutdown(signal?: string) {
+ await this.dispose();
+ }
}