summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue
diff options
context:
space:
mode:
authorHazelnoot <acomputerdog@gmail.com>2025-05-07 16:33:18 +0000
committerHazelnoot <acomputerdog@gmail.com>2025-05-07 16:33:18 +0000
commitd39a56c1b7d74dd07cc78b4c82a6fb6e51036252 (patch)
tree24f9c6baa07fadc11c791f1a59bee2c3149cbf56 /packages/backend/src/queue
parentmerge: Add BunnyCDN Edge Storage support (!952) (diff)
parentisNotUserHome > isUserHome (diff)
downloadsharkey-d39a56c1b7d74dd07cc78b4c82a6fb6e51036252.tar.gz
sharkey-d39a56c1b7d74dd07cc78b4c82a6fb6e51036252.tar.bz2
sharkey-d39a56c1b7d74dd07cc78b4c82a6fb6e51036252.zip
merge: Merge upstream 2025.4.1 (!955)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/955 Closes #638, #1037, #734, and #766 Approved-by: dakkar <dakkar@thenautilus.net> Approved-by: Marie <github@yuugi.dev>
Diffstat (limited to 'packages/backend/src/queue')
-rw-r--r--packages/backend/src/queue/QueueProcessorService.ts22
-rw-r--r--packages/backend/src/queue/const.ts10
-rw-r--r--packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/DeleteAccountProcessorService.ts5
-rw-r--r--packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ImportNotesProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/InboxProcessorService.ts7
-rw-r--r--packages/backend/src/queue/types.ts12
8 files changed, 38 insertions, 24 deletions
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts
index 297edfd545..7f7ce2452c 100644
--- a/packages/backend/src/queue/QueueProcessorService.ts
+++ b/packages/backend/src/queue/QueueProcessorService.ts
@@ -47,7 +47,7 @@ import { CleanProcessorService } from './processors/CleanProcessorService.js';
import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js';
import { ScheduleNotePostProcessorService } from './processors/ScheduleNotePostProcessorService.js';
import { QueueLoggerService } from './QueueLoggerService.js';
-import { QUEUE, baseQueueOptions } from './const.js';
+import { QUEUE, baseWorkerOptions } from './const.js';
import { ImportNotesProcessorService } from './processors/ImportNotesProcessorService.js';
// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019
@@ -186,7 +186,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return processer(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.SYSTEM),
+ ...baseWorkerOptions(this.config, QUEUE.SYSTEM),
autorun: false,
});
@@ -251,7 +251,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return processer(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.DB),
+ ...baseWorkerOptions(this.config, QUEUE.DB),
autorun: false,
});
@@ -283,7 +283,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return this.deliverProcessorService.process(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.DELIVER),
+ ...baseWorkerOptions(this.config, QUEUE.DELIVER),
autorun: false,
concurrency: this.config.deliverJobConcurrency ?? 128,
limiter: {
@@ -323,7 +323,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return this.inboxProcessorService.process(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.INBOX),
+ ...baseWorkerOptions(this.config, QUEUE.INBOX),
autorun: false,
concurrency: this.config.inboxJobConcurrency ?? 16,
limiter: {
@@ -363,7 +363,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return this.userWebhookDeliverProcessorService.process(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.USER_WEBHOOK_DELIVER),
+ ...baseWorkerOptions(this.config, QUEUE.USER_WEBHOOK_DELIVER),
autorun: false,
concurrency: 64,
limiter: {
@@ -403,7 +403,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return this.systemWebhookDeliverProcessorService.process(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.SYSTEM_WEBHOOK_DELIVER),
+ ...baseWorkerOptions(this.config, QUEUE.SYSTEM_WEBHOOK_DELIVER),
autorun: false,
concurrency: 16,
limiter: {
@@ -453,7 +453,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return processer(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.RELATIONSHIP),
+ ...baseWorkerOptions(this.config, QUEUE.RELATIONSHIP),
autorun: false,
concurrency: this.config.relationshipJobConcurrency ?? 16,
limiter: {
@@ -498,7 +498,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return processer(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.OBJECT_STORAGE),
+ ...baseWorkerOptions(this.config, QUEUE.OBJECT_STORAGE),
autorun: false,
concurrency: 16,
});
@@ -531,7 +531,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
return this.endedPollNotificationProcessorService.process(job);
}
}, {
- ...baseQueueOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION),
+ ...baseWorkerOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION),
autorun: false,
});
}
@@ -540,7 +540,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
//#region schedule note post
{
this.schedulerNotePostQueueWorker = new Bull.Worker(QUEUE.SCHEDULE_NOTE_POST, (job) => this.scheduleNotePostProcessorService.process(job), {
- ...baseQueueOptions(this.config, QUEUE.SCHEDULE_NOTE_POST),
+ ...baseWorkerOptions(this.config, QUEUE.SCHEDULE_NOTE_POST),
autorun: false,
});
}
diff --git a/packages/backend/src/queue/const.ts b/packages/backend/src/queue/const.ts
index fdf012f149..17c6b81736 100644
--- a/packages/backend/src/queue/const.ts
+++ b/packages/backend/src/queue/const.ts
@@ -3,6 +3,7 @@
* SPDX-License-Identifier: AGPL-3.0-only
*/
+import { MetricsTime } from 'bullmq';
import { Config } from '@/config.js';
import type * as Bull from 'bullmq';
@@ -28,3 +29,12 @@ export function baseQueueOptions(config: Config, queueName: typeof QUEUE[keyof t
prefix: config.redisForJobQueue.prefix ? `${config.redisForJobQueue.prefix}:queue:${queueName}` : `queue:${queueName}`,
};
}
+
+export function baseWorkerOptions(config: Config, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.WorkerOptions {
+ return {
+ ...baseQueueOptions(config, queueName),
+ metrics: {
+ maxDataPoints: MetricsTime.ONE_WEEK,
+ },
+ };
+}
diff --git a/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts b/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts
index ef21b6142e..db8d2e789e 100644
--- a/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts
+++ b/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts
@@ -29,7 +29,7 @@ export type ModeratorInactivityEvaluationResult = {
isModeratorsInactive: boolean;
inactiveModerators: MiUser[];
remainingTime: ModeratorInactivityRemainingTime;
-}
+};
export type ModeratorInactivityRemainingTime = {
time: number;
diff --git a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts
index 0c70829132..46cee096cf 100644
--- a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts
+++ b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts
@@ -184,6 +184,11 @@ export class DeleteAccountProcessorService {
await this.apLogService.deleteObjectLogs(user.uri)
.catch(err => this.logger.error(err, `Failed to delete AP logs for user '${user.uri}'`));
}
+
+ await this.apLogService.deleteInboxLogs(user.id)
+ .catch(err => this.logger.error(err, `Failed to delete AP logs for user '${user.uri}'`));
+
+ this.logger.succ('All AP logs deleted');
}
{ // Send email notification
diff --git a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts
index 383fa0c26a..d08cadd378 100644
--- a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts
@@ -6,6 +6,7 @@
import * as fs from 'node:fs';
import { Inject, Injectable } from '@nestjs/common';
import { ZipReader } from 'slacc';
+import { IsNull } from 'typeorm';
import { DI } from '@/di-symbols.js';
import type { EmojisRepository, DriveFilesRepository } from '@/models/_.js';
import type Logger from '@/logger.js';
@@ -91,6 +92,7 @@ export class ImportCustomEmojisProcessorService {
const emojiPath = outputPath + '/' + record.fileName;
await this.emojisRepository.delete({
name: nameNfc,
+ host: IsNull(),
});
try {
diff --git a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts
index ee9819b29f..5e660e8081 100644
--- a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts
@@ -83,7 +83,7 @@ export class ImportNotesProcessorService {
}
@bindThis
- private downloadUrl(url: string, path:string): Promise<{filename: string}> {
+ private downloadUrl(url: string, path:string): Promise<{ filename: string }> {
return this.downloadService.downloadUrl(url, path, { operationTimeout: this.config.import?.downloadTimeout, maxSize: this.config.import?.maxFileSize });
}
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts
index fc7c66591a..9564724c62 100644
--- a/packages/backend/src/queue/processors/InboxProcessorService.ts
+++ b/packages/backend/src/queue/processors/InboxProcessorService.ts
@@ -25,8 +25,6 @@ import { JsonLdService } from '@/core/activitypub/JsonLdService.js';
import { ApInboxService } from '@/core/activitypub/ApInboxService.js';
import { bindThis } from '@/decorators.js';
import { IdentifiableError } from '@/misc/identifiable-error.js';
-//import { CollapsedQueue } from '@/misc/collapsed-queue.js';
-//import { MiNote } from '@/models/Note.js';
import { MiMeta } from '@/models/Meta.js';
import { DI } from '@/di-symbols.js';
import { SkApInboxLog } from '@/models/_.js';
@@ -68,7 +66,6 @@ export class InboxProcessorService implements OnApplicationShutdown {
private readonly updateInstanceQueue: UpdateInstanceQueue,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('inbox');
- //this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance);
}
@bindThis
@@ -151,12 +148,12 @@ export class InboxProcessorService implements OnApplicationShutdown {
// それでもわからなければ終了
if (authUser == null) {
- throw new Bull.UnrecoverableError('skip: failed to resolve user');
+ throw new Bull.UnrecoverableError(`skip: failed to resolve user ${getApId(activity.actor)}`);
}
// publicKey がなくても終了
if (authUser.key == null) {
- throw new Bull.UnrecoverableError('skip: failed to resolve user publicKey');
+ throw new Bull.UnrecoverableError(`skip: failed to resolve user publicKey ${getApId(activity.actor)}`);
}
// HTTP-Signatureの検証
diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts
index a900675a86..1bd9f7a0ab 100644
--- a/packages/backend/src/queue/types.ts
+++ b/packages/backend/src/queue/types.ts
@@ -38,7 +38,7 @@ export type RelationshipJobData = {
silent?: boolean;
requestId?: string;
withReplies?: boolean;
-}
+};
export type DbJobData<T extends keyof DbJobMap> = DbJobMap[T];
@@ -69,11 +69,11 @@ export type DbJobMap = {
importUserLists: DbUserImportJobData;
importCustomEmojis: DbUserImportJobData;
deleteAccount: DbUserDeleteJobData;
-}
+};
export type DbJobDataWithUser = {
user: ThinUser;
-}
+};
export type DbExportFollowingData = {
user: ThinUser;
@@ -83,7 +83,7 @@ export type DbExportFollowingData = {
export type DBExportAntennasData = {
user: ThinUser
-}
+};
export type DbUserDeleteJobData = {
user: ThinUser;
@@ -105,7 +105,7 @@ export type DbNoteImportJobData = {
export type DBAntennaImportJobData = {
user: ThinUser,
antenna: Antenna
-}
+};
export type DbUserImportToDbJobData = {
user: ThinUser;
@@ -161,4 +161,4 @@ export type ThinUser = {
export type ScheduleNotePostJobData = {
scheduleNoteId: MiNote['id'];
-}
+};