diff options
| author | Julia <julia@insertdomain.name> | 2025-05-29 00:07:38 +0000 |
|---|---|---|
| committer | Julia <julia@insertdomain.name> | 2025-05-29 00:07:38 +0000 |
| commit | 6b554c178b81f13f83a69b19d44b72b282a0c119 (patch) | |
| tree | f5537f1a56323a4dd57ba150b3cb84a2d8b5dc63 /packages/backend/src/queue | |
| parent | merge: Security fixes (!970) (diff) | |
| parent | bump version for release (diff) | |
| download | sharkey-6b554c178b81f13f83a69b19d44b72b282a0c119.tar.gz sharkey-6b554c178b81f13f83a69b19d44b72b282a0c119.tar.bz2 sharkey-6b554c178b81f13f83a69b19d44b72b282a0c119.zip | |
merge: release 2025.4.2 (!1051)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/1051
Approved-by: Hazelnoot <acomputerdog@gmail.com>
Approved-by: Marie <github@yuugi.dev>
Approved-by: Julia <julia@insertdomain.name>
Diffstat (limited to 'packages/backend/src/queue')
8 files changed, 276 insertions, 75 deletions
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index 297edfd545..7f7ce2452c 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -47,7 +47,7 @@ import { CleanProcessorService } from './processors/CleanProcessorService.js'; import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js'; import { ScheduleNotePostProcessorService } from './processors/ScheduleNotePostProcessorService.js'; import { QueueLoggerService } from './QueueLoggerService.js'; -import { QUEUE, baseQueueOptions } from './const.js'; +import { QUEUE, baseWorkerOptions } from './const.js'; import { ImportNotesProcessorService } from './processors/ImportNotesProcessorService.js'; // ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019 @@ -186,7 +186,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return processer(job); } }, { - ...baseQueueOptions(this.config, QUEUE.SYSTEM), + ...baseWorkerOptions(this.config, QUEUE.SYSTEM), autorun: false, }); @@ -251,7 +251,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return processer(job); } }, { - ...baseQueueOptions(this.config, QUEUE.DB), + ...baseWorkerOptions(this.config, QUEUE.DB), autorun: false, }); @@ -283,7 +283,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return this.deliverProcessorService.process(job); } }, { - ...baseQueueOptions(this.config, QUEUE.DELIVER), + ...baseWorkerOptions(this.config, QUEUE.DELIVER), autorun: false, concurrency: this.config.deliverJobConcurrency ?? 128, limiter: { @@ -323,7 +323,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return this.inboxProcessorService.process(job); } }, { - ...baseQueueOptions(this.config, QUEUE.INBOX), + ...baseWorkerOptions(this.config, QUEUE.INBOX), autorun: false, concurrency: this.config.inboxJobConcurrency ?? 16, limiter: { @@ -363,7 +363,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return this.userWebhookDeliverProcessorService.process(job); } }, { - ...baseQueueOptions(this.config, QUEUE.USER_WEBHOOK_DELIVER), + ...baseWorkerOptions(this.config, QUEUE.USER_WEBHOOK_DELIVER), autorun: false, concurrency: 64, limiter: { @@ -403,7 +403,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return this.systemWebhookDeliverProcessorService.process(job); } }, { - ...baseQueueOptions(this.config, QUEUE.SYSTEM_WEBHOOK_DELIVER), + ...baseWorkerOptions(this.config, QUEUE.SYSTEM_WEBHOOK_DELIVER), autorun: false, concurrency: 16, limiter: { @@ -453,7 +453,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return processer(job); } }, { - ...baseQueueOptions(this.config, QUEUE.RELATIONSHIP), + ...baseWorkerOptions(this.config, QUEUE.RELATIONSHIP), autorun: false, concurrency: this.config.relationshipJobConcurrency ?? 16, limiter: { @@ -498,7 +498,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return processer(job); } }, { - ...baseQueueOptions(this.config, QUEUE.OBJECT_STORAGE), + ...baseWorkerOptions(this.config, QUEUE.OBJECT_STORAGE), autorun: false, concurrency: 16, }); @@ -531,7 +531,7 @@ export class QueueProcessorService implements OnApplicationShutdown { return this.endedPollNotificationProcessorService.process(job); } }, { - ...baseQueueOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION), + ...baseWorkerOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION), autorun: false, }); } @@ -540,7 +540,7 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region schedule note post { this.schedulerNotePostQueueWorker = new Bull.Worker(QUEUE.SCHEDULE_NOTE_POST, (job) => this.scheduleNotePostProcessorService.process(job), { - ...baseQueueOptions(this.config, QUEUE.SCHEDULE_NOTE_POST), + ...baseWorkerOptions(this.config, QUEUE.SCHEDULE_NOTE_POST), autorun: false, }); } diff --git a/packages/backend/src/queue/const.ts b/packages/backend/src/queue/const.ts index fdf012f149..17c6b81736 100644 --- a/packages/backend/src/queue/const.ts +++ b/packages/backend/src/queue/const.ts @@ -3,6 +3,7 @@ * SPDX-License-Identifier: AGPL-3.0-only */ +import { MetricsTime } from 'bullmq'; import { Config } from '@/config.js'; import type * as Bull from 'bullmq'; @@ -28,3 +29,12 @@ export function baseQueueOptions(config: Config, queueName: typeof QUEUE[keyof t prefix: config.redisForJobQueue.prefix ? `${config.redisForJobQueue.prefix}:queue:${queueName}` : `queue:${queueName}`, }; } + +export function baseWorkerOptions(config: Config, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.WorkerOptions { + return { + ...baseQueueOptions(config, queueName), + metrics: { + maxDataPoints: MetricsTime.ONE_WEEK, + }, + }; +} diff --git a/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts b/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts index ef21b6142e..db8d2e789e 100644 --- a/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts +++ b/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts @@ -29,7 +29,7 @@ export type ModeratorInactivityEvaluationResult = { isModeratorsInactive: boolean; inactiveModerators: MiUser[]; remainingTime: ModeratorInactivityRemainingTime; -} +}; export type ModeratorInactivityRemainingTime = { time: number; diff --git a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts index 0c70829132..4e9779a41b 100644 --- a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts +++ b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts @@ -4,9 +4,9 @@ */ import { Inject, Injectable } from '@nestjs/common'; -import { MoreThan } from 'typeorm'; +import { In, MoreThan } from 'typeorm'; import { DI } from '@/di-symbols.js'; -import type { DriveFilesRepository, NoteReactionsRepository, NotesRepository, UserProfilesRepository, UsersRepository, NoteScheduleRepository, MiNoteSchedule } from '@/models/_.js'; +import type { DriveFilesRepository, NoteReactionsRepository, NotesRepository, UserProfilesRepository, UsersRepository, NoteScheduleRepository, MiNoteSchedule, FollowingsRepository, FollowRequestsRepository, BlockingsRepository, MutingsRepository, ClipsRepository, ClipNotesRepository, LatestNotesRepository, NoteEditRepository, NoteFavoritesRepository, PollVotesRepository, PollsRepository, SigninsRepository, UserIpsRepository, RegistryItemsRepository } from '@/models/_.js'; import type Logger from '@/logger.js'; import { DriveService } from '@/core/DriveService.js'; import type { MiDriveFile } from '@/models/DriveFile.js'; @@ -17,10 +17,10 @@ import { bindThis } from '@/decorators.js'; import { SearchService } from '@/core/SearchService.js'; import { ApLogService } from '@/core/ApLogService.js'; import { ReactionService } from '@/core/ReactionService.js'; +import { QueueService } from '@/core/QueueService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; import type { DbUserDeleteJobData } from '../types.js'; -import { QueueService } from '@/core/QueueService.js'; @Injectable() export class DeleteAccountProcessorService { @@ -45,6 +45,48 @@ export class DeleteAccountProcessorService { @Inject(DI.noteScheduleRepository) private noteScheduleRepository: NoteScheduleRepository, + @Inject(DI.followingsRepository) + private readonly followingsRepository: FollowingsRepository, + + @Inject(DI.followRequestsRepository) + private readonly followRequestsRepository: FollowRequestsRepository, + + @Inject(DI.blockingsRepository) + private readonly blockingsRepository: BlockingsRepository, + + @Inject(DI.mutingsRepository) + private readonly mutingsRepository: MutingsRepository, + + @Inject(DI.clipsRepository) + private readonly clipsRepository: ClipsRepository, + + @Inject(DI.clipNotesRepository) + private readonly clipNotesRepository: ClipNotesRepository, + + @Inject(DI.latestNotesRepository) + private readonly latestNotesRepository: LatestNotesRepository, + + @Inject(DI.noteEditRepository) + private readonly noteEditRepository: NoteEditRepository, + + @Inject(DI.noteFavoritesRepository) + private readonly noteFavoritesRepository: NoteFavoritesRepository, + + @Inject(DI.pollVotesRepository) + private readonly pollVotesRepository: PollVotesRepository, + + @Inject(DI.pollsRepository) + private readonly pollsRepository: PollsRepository, + + @Inject(DI.signinsRepository) + private readonly signinsRepository: SigninsRepository, + + @Inject(DI.userIpsRepository) + private readonly userIpsRepository: UserIpsRepository, + + @Inject(DI.registryItemsRepository) + private readonly registryItemsRepository: RegistryItemsRepository, + private queueService: QueueService, private driveService: DriveService, private emailService: EmailService, @@ -65,6 +107,140 @@ export class DeleteAccountProcessorService { return; } + { // Delete user clips + const userClips = await this.clipsRepository.find({ + select: { + id: true, + }, + where: { + userId: user.id, + }, + }) as { id: string }[]; + + // Delete one-at-a-time because there can be a lot + for (const clip of userClips) { + await this.clipNotesRepository.delete({ + id: clip.id, + }); + } + + await this.clipsRepository.delete({ + userId: user.id, + }); + + this.logger.succ('All clips have been deleted.'); + } + + { // Delete favorites + await this.noteFavoritesRepository.delete({ + userId: user.id, + }); + + this.logger.succ('All favorites have been deleted.'); + } + + { // Delete user relations + await this.followingsRepository.delete({ + followerId: user.id, + }); + + await this.followingsRepository.delete({ + followeeId: user.id, + }); + + await this.followRequestsRepository.delete({ + followerId: user.id, + }); + + await this.followRequestsRepository.delete({ + followeeId: user.id, + }); + + await this.blockingsRepository.delete({ + blockerId: user.id, + }); + + await this.blockingsRepository.delete({ + blockeeId: user.id, + }); + + await this.mutingsRepository.delete({ + muterId: user.id, + }); + + await this.mutingsRepository.delete({ + muteeId: user.id, + }); + + this.logger.succ('All user relations have been deleted.'); + } + + { // Delete reactions + let cursor: MiNoteReaction['id'] | null = null; + + while (true) { + const reactions = await this.noteReactionsRepository.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}), + }, + take: 100, + order: { + id: 1, + }, + relations: { + note: true, + }, + }) as MiNoteReaction[]; + + if (reactions.length === 0) { + break; + } + + cursor = reactions.at(-1)?.id ?? null; + + for (const reaction of reactions) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const note = reaction.note!; + await this.reactionService.delete(user, note, reaction); + } + } + + this.logger.succ('All reactions have been deleted'); + } + + { // Poll votes + let cursor: MiNoteReaction['id'] | null = null; + + while (true) { + const votes = await this.pollVotesRepository.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}), + }, + select: { + id: true, + }, + take: 100, + order: { + id: 1, + }, + }) as { id: string }[]; + + if (votes.length === 0) { + break; + } + + cursor = votes.at(-1)?.id ?? null; + + await this.pollVotesRepository.delete({ + id: In(votes.map(v => v.id)), + }); + } + + this.logger.succ('All poll votes have been deleted'); + } + { // Delete scheduled notes const scheduledNotes = await this.noteScheduleRepository.findBy({ userId: user.id, @@ -82,6 +258,10 @@ export class DeleteAccountProcessorService { } { // Delete notes + await this.latestNotesRepository.delete({ + userId: user.id, + }); + let cursor: MiNote['id'] | null = null; while (true) { @@ -102,7 +282,23 @@ export class DeleteAccountProcessorService { cursor = notes.at(-1)?.id ?? null; - await this.notesRepository.delete(notes.map(note => note.id)); + // Delete associated polls one-at-a-time, since it can cascade to a LOT of vote entries + for (const note of notes) { + if (note.hasPoll) { + await this.pollsRepository.delete({ + noteId: note.id, + }); + } + } + + const ids = notes.map(note => note.id); + + await this.noteEditRepository.delete({ + noteId: In(ids), + }); + await this.notesRepository.delete({ + id: In(ids), + }); for (const note of notes) { await this.searchService.unindexNote(note); @@ -119,37 +315,6 @@ export class DeleteAccountProcessorService { this.logger.succ('All of notes deleted'); } - { // Delete reactions - let cursor: MiNoteReaction['id'] | null = null; - - while (true) { - const reactions = await this.noteReactionsRepository.find({ - where: { - userId: user.id, - ...(cursor ? { id: MoreThan(cursor) } : {}), - }, - take: 100, - order: { - id: 1, - }, - }) as MiNoteReaction[]; - - if (reactions.length === 0) { - break; - } - - cursor = reactions.at(-1)?.id ?? null; - - for (const reaction of reactions) { - const note = await this.notesRepository.findOneBy({ id: reaction.noteId }) as MiNote; - - await this.reactionService.delete(user, note); - } - } - - this.logger.succ('All reactions have been deleted'); - } - { // Delete files let cursor: MiDriveFile['id'] | null = null; @@ -184,22 +349,49 @@ export class DeleteAccountProcessorService { await this.apLogService.deleteObjectLogs(user.uri) .catch(err => this.logger.error(err, `Failed to delete AP logs for user '${user.uri}'`)); } + + await this.apLogService.deleteInboxLogs(user.id) + .catch(err => this.logger.error(err, `Failed to delete AP logs for user '${user.uri}'`)); + + this.logger.succ('All AP logs deleted'); } - { // Send email notification - const profile = await this.userProfilesRepository.findOneByOrFail({ userId: user.id }); - if (profile.email && profile.emailVerified) { - this.emailService.sendEmail(profile.email, 'Account deleted', - 'Your account has been deleted.', - 'Your account has been deleted.'); + // Do this BEFORE deleting the account! + const profile = await this.userProfilesRepository.findOneBy({ userId: user.id }); + + { // Delete the actual account + await this.userIpsRepository.delete({ + userId: user.id, + }); + + await this.signinsRepository.delete({ + userId: user.id, + }); + + await this.registryItemsRepository.delete({ + userId: user.id, + }); + + // soft指定されている場合は物理削除しない + if (job.data.soft) { + // nop + } else { + await this.usersRepository.delete(user.id); } + + this.logger.succ('Account data deleted'); } - // soft指定されている場合は物理削除しない - if (job.data.soft) { - // nop - } else { - await this.usersRepository.delete(job.data.user.id); + { // Send email notification + if (profile && profile.email && profile.emailVerified) { + try { + await this.emailService.sendEmail(profile.email, 'Account deleted', + 'Your account has been deleted.', + 'Your account has been deleted.'); + } catch (e) { + this.logger.warn('Failed to send account deletion message:', { e }); + } + } } return 'Account deleted'; diff --git a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts index 383fa0c26a..d08cadd378 100644 --- a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts @@ -6,6 +6,7 @@ import * as fs from 'node:fs'; import { Inject, Injectable } from '@nestjs/common'; import { ZipReader } from 'slacc'; +import { IsNull } from 'typeorm'; import { DI } from '@/di-symbols.js'; import type { EmojisRepository, DriveFilesRepository } from '@/models/_.js'; import type Logger from '@/logger.js'; @@ -91,6 +92,7 @@ export class ImportCustomEmojisProcessorService { const emojiPath = outputPath + '/' + record.fileName; await this.emojisRepository.delete({ name: nameNfc, + host: IsNull(), }); try { diff --git a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts index ee9819b29f..5e660e8081 100644 --- a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts @@ -83,7 +83,7 @@ export class ImportNotesProcessorService { } @bindThis - private downloadUrl(url: string, path:string): Promise<{filename: string}> { + private downloadUrl(url: string, path:string): Promise<{ filename: string }> { return this.downloadService.downloadUrl(url, path, { operationTimeout: this.config.import?.downloadTimeout, maxSize: this.config.import?.maxFileSize }); } diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 35a0bf095d..9564724c62 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -25,13 +25,12 @@ import { JsonLdService } from '@/core/activitypub/JsonLdService.js'; import { ApInboxService } from '@/core/activitypub/ApInboxService.js'; import { bindThis } from '@/decorators.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; -import { CollapsedQueue } from '@/misc/collapsed-queue.js'; -import { MiNote } from '@/models/Note.js'; import { MiMeta } from '@/models/Meta.js'; import { DI } from '@/di-symbols.js'; import { SkApInboxLog } from '@/models/_.js'; import type { Config } from '@/config.js'; import { ApLogService, calculateDurationSince } from '@/core/ApLogService.js'; +import { UpdateInstanceQueue } from '@/core/UpdateInstanceQueue.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { InboxJobData } from '../types.js'; @@ -43,7 +42,7 @@ type UpdateInstanceJob = { @Injectable() export class InboxProcessorService implements OnApplicationShutdown { private logger: Logger; - private updateInstanceQueue: CollapsedQueue<MiNote['id'], UpdateInstanceJob>; + //private updateInstanceQueue: CollapsedQueue<MiNote['id'], UpdateInstanceJob>; constructor( @Inject(DI.meta) @@ -64,9 +63,9 @@ export class InboxProcessorService implements OnApplicationShutdown { private federationChart: FederationChart, private queueLoggerService: QueueLoggerService, private readonly apLogService: ApLogService, + private readonly updateInstanceQueue: UpdateInstanceQueue, ) { this.logger = this.queueLoggerService.logger.createSubLogger('inbox'); - this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance); } @bindThis @@ -149,12 +148,12 @@ export class InboxProcessorService implements OnApplicationShutdown { // それでもわからなければ終了 if (authUser == null) { - throw new Bull.UnrecoverableError('skip: failed to resolve user'); + throw new Bull.UnrecoverableError(`skip: failed to resolve user ${getApId(activity.actor)}`); } // publicKey がなくても終了 if (authUser.key == null) { - throw new Bull.UnrecoverableError('skip: failed to resolve user publicKey'); + throw new Bull.UnrecoverableError(`skip: failed to resolve user publicKey ${getApId(activity.actor)}`); } // HTTP-Signatureの検証 @@ -232,7 +231,7 @@ export class InboxProcessorService implements OnApplicationShutdown { const signerHost = this.utilityService.extractDbHost(authUser.user.uri!); const activityIdHost = this.utilityService.extractDbHost(activity.id); if (signerHost !== activityIdHost) { - throw new Bull.UnrecoverableError(`skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`); + throw new Bull.UnrecoverableError(`skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost})`); } } else { // Activity ID should only be string or undefined. @@ -333,9 +332,7 @@ export class InboxProcessorService implements OnApplicationShutdown { } @bindThis - public async dispose(): Promise<void> { - await this.updateInstanceQueue.performAllNow(); - } + public async dispose(): Promise<void> {} @bindThis async onApplicationShutdown(signal?: string) { diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index a900675a86..1bd9f7a0ab 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -38,7 +38,7 @@ export type RelationshipJobData = { silent?: boolean; requestId?: string; withReplies?: boolean; -} +}; export type DbJobData<T extends keyof DbJobMap> = DbJobMap[T]; @@ -69,11 +69,11 @@ export type DbJobMap = { importUserLists: DbUserImportJobData; importCustomEmojis: DbUserImportJobData; deleteAccount: DbUserDeleteJobData; -} +}; export type DbJobDataWithUser = { user: ThinUser; -} +}; export type DbExportFollowingData = { user: ThinUser; @@ -83,7 +83,7 @@ export type DbExportFollowingData = { export type DBExportAntennasData = { user: ThinUser -} +}; export type DbUserDeleteJobData = { user: ThinUser; @@ -105,7 +105,7 @@ export type DbNoteImportJobData = { export type DBAntennaImportJobData = { user: ThinUser, antenna: Antenna -} +}; export type DbUserImportToDbJobData = { user: ThinUser; @@ -161,4 +161,4 @@ export type ThinUser = { export type ScheduleNotePostJobData = { scheduleNoteId: MiNote['id']; -} +}; |