diff options
Diffstat (limited to 'packages/backend/src/queue')
13 files changed, 1955 insertions, 63 deletions
diff --git a/packages/backend/src/queue/QueueProcessorModule.ts b/packages/backend/src/queue/QueueProcessorModule.ts index 9044285bf6..51fd97dc97 100644 --- a/packages/backend/src/queue/QueueProcessorModule.ts +++ b/packages/backend/src/queue/QueueProcessorModule.ts @@ -22,6 +22,7 @@ import { CleanRemoteFilesProcessorService } from './processors/CleanRemoteFilesP import { DeleteAccountProcessorService } from './processors/DeleteAccountProcessorService.js'; import { DeleteDriveFilesProcessorService } from './processors/DeleteDriveFilesProcessorService.js'; import { DeleteFileProcessorService } from './processors/DeleteFileProcessorService.js'; +import { ExportAccountDataProcessorService } from './processors/ExportAccountDataProcessorService.js'; import { ExportBlockingProcessorService } from './processors/ExportBlockingProcessorService.js'; import { ExportCustomEmojisProcessorService } from './processors/ExportCustomEmojisProcessorService.js'; import { ExportFollowingProcessorService } from './processors/ExportFollowingProcessorService.js'; @@ -32,6 +33,7 @@ import { ExportUserListsProcessorService } from './processors/ExportUserListsPro import { ExportAntennasProcessorService } from './processors/ExportAntennasProcessorService.js'; import { ImportBlockingProcessorService } from './processors/ImportBlockingProcessorService.js'; import { ImportCustomEmojisProcessorService } from './processors/ImportCustomEmojisProcessorService.js'; +import { ImportNotesProcessorService } from './processors/ImportNotesProcessorService.js'; import { ImportFollowingProcessorService } from './processors/ImportFollowingProcessorService.js'; import { ImportMutingProcessorService } from './processors/ImportMutingProcessorService.js'; import { ImportUserListsProcessorService } from './processors/ImportUserListsProcessorService.js'; @@ -41,6 +43,7 @@ import { TickChartsProcessorService } from './processors/TickChartsProcessorServ import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js'; import { ExportFavoritesProcessorService } from './processors/ExportFavoritesProcessorService.js'; import { RelationshipProcessorService } from './processors/RelationshipProcessorService.js'; +import { ScheduleNotePostProcessorService } from './processors/ScheduleNotePostProcessorService.js'; @Module({ imports: [ @@ -56,6 +59,7 @@ import { RelationshipProcessorService } from './processors/RelationshipProcessor BakeBufferedReactionsProcessorService, CleanProcessorService, DeleteDriveFilesProcessorService, + ExportAccountDataProcessorService, ExportCustomEmojisProcessorService, ExportNotesProcessorService, ExportClipsProcessorService, @@ -65,6 +69,7 @@ import { RelationshipProcessorService } from './processors/RelationshipProcessor ExportBlockingProcessorService, ExportUserListsProcessorService, ExportAntennasProcessorService, + ImportNotesProcessorService, ImportFollowingProcessorService, ImportMutingProcessorService, ImportBlockingProcessorService, @@ -84,6 +89,7 @@ import { RelationshipProcessorService } from './processors/RelationshipProcessor CheckExpiredMutingsProcessorService, CheckModeratorsActivityProcessorService, QueueProcessorService, + ScheduleNotePostProcessorService, ], exports: [ QueueProcessorService, diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index c98ebcdcd9..90aaf67a84 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -11,12 +11,14 @@ import { DI } from '@/di-symbols.js'; import type Logger from '@/logger.js'; import { bindThis } from '@/decorators.js'; import { CheckModeratorsActivityProcessorService } from '@/queue/processors/CheckModeratorsActivityProcessorService.js'; +import { StatusError } from '@/misc/status-error.js'; import { UserWebhookDeliverProcessorService } from './processors/UserWebhookDeliverProcessorService.js'; import { SystemWebhookDeliverProcessorService } from './processors/SystemWebhookDeliverProcessorService.js'; import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js'; import { DeliverProcessorService } from './processors/DeliverProcessorService.js'; import { InboxProcessorService } from './processors/InboxProcessorService.js'; import { DeleteDriveFilesProcessorService } from './processors/DeleteDriveFilesProcessorService.js'; +import { ExportAccountDataProcessorService } from './processors/ExportAccountDataProcessorService.js'; import { ExportCustomEmojisProcessorService } from './processors/ExportCustomEmojisProcessorService.js'; import { ExportNotesProcessorService } from './processors/ExportNotesProcessorService.js'; import { ExportClipsProcessorService } from './processors/ExportClipsProcessorService.js'; @@ -43,8 +45,10 @@ import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMu import { BakeBufferedReactionsProcessorService } from './processors/BakeBufferedReactionsProcessorService.js'; import { CleanProcessorService } from './processors/CleanProcessorService.js'; import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js'; +import { ScheduleNotePostProcessorService } from './processors/ScheduleNotePostProcessorService.js'; import { QueueLoggerService } from './QueueLoggerService.js'; import { QUEUE, baseWorkerOptions } from './const.js'; +import { ImportNotesProcessorService } from './processors/ImportNotesProcessorService.js'; // ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019 function httpRelatedBackoff(attemptsMade: number) { @@ -84,6 +88,7 @@ export class QueueProcessorService implements OnApplicationShutdown { private relationshipQueueWorker: Bull.Worker; private objectStorageQueueWorker: Bull.Worker; private endedPollNotificationQueueWorker: Bull.Worker; + private schedulerNotePostQueueWorker: Bull.Worker; constructor( @Inject(DI.config) @@ -96,6 +101,7 @@ export class QueueProcessorService implements OnApplicationShutdown { private deliverProcessorService: DeliverProcessorService, private inboxProcessorService: InboxProcessorService, private deleteDriveFilesProcessorService: DeleteDriveFilesProcessorService, + private exportAccountDataProcessorService: ExportAccountDataProcessorService, private exportCustomEmojisProcessorService: ExportCustomEmojisProcessorService, private exportNotesProcessorService: ExportNotesProcessorService, private exportClipsProcessorService: ExportClipsProcessorService, @@ -106,6 +112,7 @@ export class QueueProcessorService implements OnApplicationShutdown { private exportUserListsProcessorService: ExportUserListsProcessorService, private exportAntennasProcessorService: ExportAntennasProcessorService, private importFollowingProcessorService: ImportFollowingProcessorService, + private importNotesProcessorService: ImportNotesProcessorService, private importMutingProcessorService: ImportMutingProcessorService, private importBlockingProcessorService: ImportBlockingProcessorService, private importUserListsProcessorService: ImportUserListsProcessorService, @@ -123,6 +130,7 @@ export class QueueProcessorService implements OnApplicationShutdown { private bakeBufferedReactionsProcessorService: BakeBufferedReactionsProcessorService, private checkModeratorsActivityProcessorService: CheckModeratorsActivityProcessorService, private cleanProcessorService: CleanProcessorService, + private scheduleNotePostProcessorService: ScheduleNotePostProcessorService, ) { this.logger = this.queueLoggerService.logger; @@ -130,7 +138,7 @@ export class QueueProcessorService implements OnApplicationShutdown { // 何故かeがundefinedで来ることがある if (!e) return '?'; - if (e instanceof Bull.UnrecoverableError || e.name === 'AbortError') { + if (e instanceof Bull.UnrecoverableError || e.name === 'AbortError' || e instanceof StatusError) { return `${e.name}: ${e.message}`; } @@ -144,12 +152,15 @@ export class QueueProcessorService implements OnApplicationShutdown { function renderJob(job?: Bull.Job) { if (!job) return '?'; - return { - name: job.name || undefined, + const info: Record<string, string> = { info: getJobInfo(job), - failedReason: job.failedReason || undefined, data: job.data, }; + + if (job.name) info.name = job.name; + if (job.failedReason) info.failedReason = job.failedReason; + + return info; } //#region system @@ -212,6 +223,7 @@ export class QueueProcessorService implements OnApplicationShutdown { case 'exportBlocking': return this.exportBlockingProcessorService.process(job); case 'exportUserLists': return this.exportUserListsProcessorService.process(job); case 'exportAntennas': return this.exportAntennasProcessorService.process(job); + case 'exportAccountData': return this.exportAccountDataProcessorService.process(job); case 'importFollowing': return this.importFollowingProcessorService.process(job); case 'importFollowingToDb': return this.importFollowingProcessorService.processDb(job); case 'importMuting': return this.importMutingProcessorService.process(job); @@ -220,6 +232,13 @@ export class QueueProcessorService implements OnApplicationShutdown { case 'importUserLists': return this.importUserListsProcessorService.process(job); case 'importCustomEmojis': return this.importCustomEmojisProcessorService.process(job); case 'importAntennas': return this.importAntennasProcessorService.process(job); + case 'importNotes': return this.importNotesProcessorService.process(job); + case 'importTweetsToDb': return this.importNotesProcessorService.processTwitterDb(job); + case 'importIGToDb': return this.importNotesProcessorService.processIGDb(job); + case 'importFBToDb': return this.importNotesProcessorService.processFBDb(job); + case 'importMastoToDb': return this.importNotesProcessorService.processMastoToDb(job); + case 'importPleroToDb': return this.importNotesProcessorService.processPleroToDb(job); + case 'importKeyNotesToDb': return this.importNotesProcessorService.processKeyNotesToDb(job); case 'deleteAccount': return this.deleteAccountProcessorService.process(job); default: throw new Error(`unrecognized job type ${job.name} for db`); } @@ -330,7 +349,7 @@ export class QueueProcessorService implements OnApplicationShutdown { }); } }) - .on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) })) + .on('error', (err: Error) => logger.error('inbox error:', renderError(err))) .on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`)); } //#endregion @@ -517,6 +536,15 @@ export class QueueProcessorService implements OnApplicationShutdown { }); } //#endregion + + //#region schedule note post + { + this.schedulerNotePostQueueWorker = new Bull.Worker(QUEUE.SCHEDULE_NOTE_POST, (job) => this.scheduleNotePostProcessorService.process(job), { + ...baseQueueOptions(this.config, QUEUE.SCHEDULE_NOTE_POST), + autorun: false, + }); + } + //#endregion } @bindThis @@ -531,6 +559,7 @@ export class QueueProcessorService implements OnApplicationShutdown { this.relationshipQueueWorker.run(), this.objectStorageQueueWorker.run(), this.endedPollNotificationQueueWorker.run(), + this.schedulerNotePostQueueWorker.run(), ]); } @@ -546,6 +575,7 @@ export class QueueProcessorService implements OnApplicationShutdown { this.relationshipQueueWorker.close(), this.objectStorageQueueWorker.close(), this.endedPollNotificationQueueWorker.close(), + this.schedulerNotePostQueueWorker.close(), ]); } diff --git a/packages/backend/src/queue/const.ts b/packages/backend/src/queue/const.ts index 7e146a7e03..17c6b81736 100644 --- a/packages/backend/src/queue/const.ts +++ b/packages/backend/src/queue/const.ts @@ -17,6 +17,7 @@ export const QUEUE = { OBJECT_STORAGE: 'objectStorage', USER_WEBHOOK_DELIVER: 'userWebhookDeliver', SYSTEM_WEBHOOK_DELIVER: 'systemWebhookDeliver', + SCHEDULE_NOTE_POST: 'scheduleNotePost', }; export function baseQueueOptions(config: Config, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions { diff --git a/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts b/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts index c9fe4fca73..db8d2e789e 100644 --- a/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts +++ b/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts @@ -38,23 +38,15 @@ export type ModeratorInactivityRemainingTime = { }; function generateModeratorInactivityMail(remainingTime: ModeratorInactivityRemainingTime) { - const subject = 'Moderator Inactivity Warning / モデレーター不在の通知'; + const subject = 'Moderator Inactivity Warning'; const timeVariant = remainingTime.asDays === 0 ? `${remainingTime.asHours} hours` : `${remainingTime.asDays} days`; const timeVariantJa = remainingTime.asDays === 0 ? `${remainingTime.asHours} 時間` : `${remainingTime.asDays} 日間`; const message = [ 'To Moderators,', '', - `A moderator has been inactive for a period of time. If there are ${timeVariant} of inactivity left, it will switch to invitation only.`, - 'If you do not wish to move to invitation only, you must log into Misskey and update your last active date and time.', - '', - '---------------', - '', - 'To モデレーター各位', - '', - `モデレーターが一定期間活動していないようです。あと${timeVariantJa}活動していない状態が続くと招待制に切り替わります。`, - '招待制に切り替わることを望まない場合は、Misskeyにログインして最終アクティブ日時を更新してください。', - '', + `No moderator has been active for a period of time. After further ${timeVariant} of inactivity, the instance will switch to invitation only.`, + 'If you do not wish that to happen, please log into Sharkey to update your last active date and time.', ]; const html = message.join('<br>'); @@ -68,21 +60,13 @@ function generateModeratorInactivityMail(remainingTime: ModeratorInactivityRemai } function generateInvitationOnlyChangedMail() { - const subject = 'Change to Invitation-Only / 招待制に変更されました'; + const subject = 'Switch to invitation only'; const message = [ 'To Moderators,', '', - `Changed to invitation only because no moderator activity was detected for ${MODERATOR_INACTIVITY_LIMIT_DAYS} days.`, - 'To cancel the invitation only, you need to access the control panel.', - '', - '---------------', - '', - 'To モデレーター各位', - '', - `モデレーターの活動が${MODERATOR_INACTIVITY_LIMIT_DAYS}日間検出されなかったため、招待制に変更されました。`, - '招待制を解除するには、コントロールパネルにアクセスする必要があります。', - '', + `The instance has been switched to invitation only, because no moderator activity was detected for ${MODERATOR_INACTIVITY_LIMIT_DAYS} days.`, + 'To change this, please log in and use the control panel.', ]; const html = message.join('<br>'); diff --git a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts index 728fc9e72b..81842b221f 100644 --- a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts +++ b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts @@ -33,6 +33,12 @@ export class CleanRemoteFilesProcessorService { let deletedCount = 0; let cursor: MiDriveFile['id'] | null = null; + let errorCount = 0; + + const total = await this.driveFilesRepository.countBy({ + userHost: Not(IsNull()), + isLink: false, + }); while (true) { const files = await this.driveFilesRepository.find({ @@ -41,7 +47,7 @@ export class CleanRemoteFilesProcessorService { isLink: false, ...(cursor ? { id: MoreThan(cursor) } : {}), }, - take: 8, + take: 256, order: { id: 1, }, @@ -54,18 +60,21 @@ export class CleanRemoteFilesProcessorService { cursor = files.at(-1)?.id ?? null; - await Promise.all(files.map(file => this.driveService.deleteFileSync(file, true))); - - deletedCount += 8; + // Handle deletion in a batch + const results = await Promise.allSettled(files.map(file => this.driveService.deleteFileSync(file, true))); - const total = await this.driveFilesRepository.countBy({ - userHost: Not(IsNull()), - isLink: false, + results.forEach((result, index) => { + if (result.status === 'fulfilled') { + deletedCount++; + } else { + this.logger.error(`Failed to delete file ID ${files[index].id}: ${result.reason}`); + errorCount++; + } }); - job.updateProgress(100 / total * deletedCount); + await job.updateProgress(100 / total * deletedCount); } - this.logger.succ('All cached remote files has been deleted.'); + this.logger.succ(`All cached remote files processed. Total deleted: ${deletedCount}, Failed: ${errorCount}.`); } } diff --git a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts index 14a53e0c42..46cee096cf 100644 --- a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts +++ b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts @@ -6,17 +6,21 @@ import { Inject, Injectable } from '@nestjs/common'; import { MoreThan } from 'typeorm'; import { DI } from '@/di-symbols.js'; -import type { DriveFilesRepository, NotesRepository, UserProfilesRepository, UsersRepository } from '@/models/_.js'; +import type { DriveFilesRepository, NoteReactionsRepository, NotesRepository, UserProfilesRepository, UsersRepository, NoteScheduleRepository, MiNoteSchedule } from '@/models/_.js'; import type Logger from '@/logger.js'; import { DriveService } from '@/core/DriveService.js'; import type { MiDriveFile } from '@/models/DriveFile.js'; import type { MiNote } from '@/models/Note.js'; +import type { MiNoteReaction } from '@/models/NoteReaction.js'; import { EmailService } from '@/core/EmailService.js'; import { bindThis } from '@/decorators.js'; import { SearchService } from '@/core/SearchService.js'; +import { ApLogService } from '@/core/ApLogService.js'; +import { ReactionService } from '@/core/ReactionService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; import type { DbUserDeleteJobData } from '../types.js'; +import { QueueService } from '@/core/QueueService.js'; @Injectable() export class DeleteAccountProcessorService { @@ -35,10 +39,19 @@ export class DeleteAccountProcessorService { @Inject(DI.driveFilesRepository) private driveFilesRepository: DriveFilesRepository, + @Inject(DI.noteReactionsRepository) + private noteReactionsRepository: NoteReactionsRepository, + + @Inject(DI.noteScheduleRepository) + private noteScheduleRepository: NoteScheduleRepository, + + private queueService: QueueService, private driveService: DriveService, private emailService: EmailService, private queueLoggerService: QueueLoggerService, private searchService: SearchService, + private reactionService: ReactionService, + private readonly apLogService: ApLogService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('delete-account'); } @@ -52,6 +65,22 @@ export class DeleteAccountProcessorService { return; } + { // Delete scheduled notes + const scheduledNotes = await this.noteScheduleRepository.findBy({ + userId: user.id, + }) as MiNoteSchedule[]; + + for (const note of scheduledNotes) { + await this.queueService.ScheduleNotePostQueue.remove(`schedNote:${note.id}`); + } + + await this.noteScheduleRepository.delete({ + userId: user.id, + }); + + this.logger.succ('All scheduled notes deleted'); + } + { // Delete notes let cursor: MiNote['id'] | null = null; @@ -78,11 +107,49 @@ export class DeleteAccountProcessorService { for (const note of notes) { await this.searchService.unindexNote(note); } + + // Delete note AP logs + const noteUris = notes.map(n => n.uri).filter(u => !!u) as string[]; + if (noteUris.length > 0) { + await this.apLogService.deleteObjectLogs(noteUris) + .catch(err => this.logger.error(err, `Failed to delete AP logs for notes of user '${user.uri ?? user.id}'`)); + } } this.logger.succ('All of notes deleted'); } + { // Delete reactions + let cursor: MiNoteReaction['id'] | null = null; + + while (true) { + const reactions = await this.noteReactionsRepository.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}), + }, + take: 100, + order: { + id: 1, + }, + }) as MiNoteReaction[]; + + if (reactions.length === 0) { + break; + } + + cursor = reactions.at(-1)?.id ?? null; + + for (const reaction of reactions) { + const note = await this.notesRepository.findOneBy({ id: reaction.noteId }) as MiNote; + + await this.reactionService.delete(user, note); + } + } + + this.logger.succ('All reactions have been deleted'); + } + { // Delete files let cursor: MiDriveFile['id'] | null = null; @@ -112,6 +179,18 @@ export class DeleteAccountProcessorService { this.logger.succ('All of files deleted'); } + { // Delete actor logs + if (user.uri) { + await this.apLogService.deleteObjectLogs(user.uri) + .catch(err => this.logger.error(err, `Failed to delete AP logs for user '${user.uri}'`)); + } + + await this.apLogService.deleteInboxLogs(user.id) + .catch(err => this.logger.error(err, `Failed to delete AP logs for user '${user.uri}'`)); + + this.logger.succ('All AP logs deleted'); + } + { // Send email notification const profile = await this.userProfilesRepository.findOneByOrFail({ userId: user.id }); if (profile.email && profile.emailVerified) { diff --git a/packages/backend/src/queue/processors/ExportAccountDataProcessorService.ts b/packages/backend/src/queue/processors/ExportAccountDataProcessorService.ts new file mode 100644 index 0000000000..33a2362c4a --- /dev/null +++ b/packages/backend/src/queue/processors/ExportAccountDataProcessorService.ts @@ -0,0 +1,817 @@ +/* + * SPDX-FileCopyrightText: marie and other Sharkey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import * as fs from 'node:fs'; +import { Inject, Injectable } from '@nestjs/common'; +import { In, IsNull, MoreThan, Not } from 'typeorm'; +import { format as dateFormat } from 'date-fns'; +import mime from 'mime-types'; +import archiver from 'archiver'; +import { DI } from '@/di-symbols.js'; +import type { AntennasRepository, BlockingsRepository, DriveFilesRepository, FollowingsRepository, MiBlocking, MiFollowing, MiMuting, MiNote, MiNoteFavorite, MiPoll, MiUser, MutingsRepository, NoteFavoritesRepository, NotesRepository, PollsRepository, SigninsRepository, UserListMembershipsRepository, UserListsRepository, UserProfilesRepository, UsersRepository } from '@/models/_.js'; +import type { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import { DriveService } from '@/core/DriveService.js'; +import { IdService } from '@/core/IdService.js'; +import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js'; +import { createTemp, createTempDir } from '@/misc/create-temp.js'; +import { bindThis } from '@/decorators.js'; +import { Packed } from '@/misc/json-schema.js'; +import { UtilityService } from '@/core/UtilityService.js'; +import { DownloadService } from '@/core/DownloadService.js'; +import { EmailService } from '@/core/EmailService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; + +@Injectable() +export class ExportAccountDataProcessorService { + private logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + + @Inject(DI.userProfilesRepository) + private userProfilesRepository: UserProfilesRepository, + + @Inject(DI.notesRepository) + private notesRepository: NotesRepository, + + @Inject(DI.noteFavoritesRepository) + private noteFavoritesRepository: NoteFavoritesRepository, + + @Inject(DI.pollsRepository) + private pollsRepository: PollsRepository, + + @Inject(DI.followingsRepository) + private followingsRepository: FollowingsRepository, + + @Inject(DI.mutingsRepository) + private mutingsRepository: MutingsRepository, + + @Inject(DI.blockingsRepository) + private blockingsRepository: BlockingsRepository, + + @Inject(DI.driveFilesRepository) + private driveFilesRepository: DriveFilesRepository, + + @Inject(DI.antennasRepository) + private antennasRepository: AntennasRepository, + + @Inject(DI.userListsRepository) + private userListsRepository: UserListsRepository, + + @Inject(DI.userListMembershipsRepository) + private userListMembershipsRepository: UserListMembershipsRepository, + + @Inject(DI.signinsRepository) + private signinsRepository: SigninsRepository, + + private utilityService: UtilityService, + private driveService: DriveService, + private idService: IdService, + private driveFileEntityService: DriveFileEntityService, + private downloadService: DownloadService, + private emailService: EmailService, + private queueLoggerService: QueueLoggerService, + ) { + this.logger = this.queueLoggerService.logger.createSubLogger('export-account-data'); + } + + @bindThis + public async process(job: Bull.Job): Promise<void> { + this.logger.info('Exporting Account Data...'); + + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + return; + } + + const profile = await this.userProfilesRepository.findOneBy({ userId: job.data.user.id }); + if (profile == null) { + return; + } + + const [path, cleanup] = await createTempDir(); + + this.logger.info(`Temp dir is ${path}`); + + // User Export + + const userPath = path + '/user.json'; + + fs.writeFileSync(userPath, '', 'utf-8'); + + const userStream = fs.createWriteStream(userPath, { flags: 'a' }); + + const writeUser = (text: string): Promise<void> => { + return new Promise<void>((res, rej) => { + userStream.write(text, err => { + if (err) { + this.logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + }; + + await writeUser(`{"metaVersion":1,"host":"${this.config.host}","exportedAt":"${new Date().toString()}","user":[`); + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { host, uri, sharedInbox, followersUri, lastFetchedAt, inbox, ...userTrimmed } = user; + + await writeUser(JSON.stringify(userTrimmed)); + + await writeUser(']}'); + + userStream.end(); + + // Profile Export + + const profilePath = path + '/profile.json'; + + fs.writeFileSync(profilePath, '', 'utf-8'); + + const profileStream = fs.createWriteStream(profilePath, { flags: 'a' }); + + const writeProfile = (text: string): Promise<void> => { + return new Promise<void>((res, rej) => { + profileStream.write(text, err => { + if (err) { + this.logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + }; + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { emailVerifyCode, twoFactorBackupSecret, twoFactorSecret, password, twoFactorTempSecret, userHost, ...profileTrimmed } = profile; + + await writeProfile(`{"metaVersion":1,"host":"${this.config.host}","exportedAt":"${new Date().toString()}","profile":[`); + + await writeProfile(JSON.stringify(profileTrimmed)); + + await writeProfile(']}'); + + profileStream.end(); + + // Stored IPs export + + const signins = await this.signinsRepository.findBy({ userId: user.id }); + + const ipPath = path + '/ips.json'; + + fs.writeFileSync(ipPath, '', 'utf-8'); + + const ipStream = fs.createWriteStream(ipPath, { flags: 'a' }); + + const writeIPs = (text: string): Promise<void> => { + return new Promise<void>((res, rej) => { + ipStream.write(text, err => { + if (err) { + this.logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + }; + + await writeIPs(`{"metaVersion":1,"host":"${this.config.host}","exportedAt":"${new Date().toString()}","ips":[`); + + for (const signin of signins) { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { userId, id, user, ...signinTrimmed } = signin; + const isFirst = signins.indexOf(signin) === 0; + + await writeIPs(isFirst ? JSON.stringify(signinTrimmed) : ',\n' + JSON.stringify(signinTrimmed)); + } + + await writeIPs(']}'); + + ipStream.end(); + + // Note Export + + const notesPath = path + '/notes.json'; + + fs.writeFileSync(notesPath, '', 'utf-8'); + + const notesStream = fs.createWriteStream(notesPath, { flags: 'a' }); + + const writeNotes = (text: string): Promise<void> => { + return new Promise<void>((res, rej) => { + notesStream.write(text, err => { + if (err) { + this.logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + }; + + await writeNotes(`{"metaVersion":1,"host":"${this.config.host}","exportedAt":"${new Date().toString()}","notes":[`); + + let noteCursor: MiNote['id'] | null = null; + let exportedNotesCount = 0; + + while (true) { + const notes = await this.notesRepository.find({ + where: { + userId: user.id, + ...(noteCursor ? { id: MoreThan(noteCursor) } : {}), + }, + take: 100, + order: { + id: 1, + }, + }) as MiNote[]; + + if (notes.length === 0) { + break; + } + + noteCursor = notes.at(-1)?.id ?? null; + + for (const note of notes) { + let poll: MiPoll | undefined; + if (note.hasPoll) { + poll = await this.pollsRepository.findOneByOrFail({ noteId: note.id }); + } + const files = await this.driveFileEntityService.packManyByIds(note.fileIds); + const content = JSON.stringify(this.noteSerialize(note, poll, files)); + const isFirst = exportedNotesCount === 0; + await writeNotes(isFirst ? content : ',\n' + content); + exportedNotesCount++; + } + } + + await writeNotes(']}'); + + notesStream.end(); + + // Following Export + + const followingsPath = path + '/followings.json'; + + fs.writeFileSync(followingsPath, '', 'utf-8'); + + const followingStream = fs.createWriteStream(followingsPath, { flags: 'a' }); + + const writeFollowing = (text: string): Promise<void> => { + return new Promise<void>((res, rej) => { + followingStream.write(text, err => { + if (err) { + this.logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + }; + + await writeFollowing(`{"metaVersion":1,"host":"${this.config.host}","exportedAt":"${new Date().toString()}","followings":[`); + + let followingsCursor: MiFollowing['id'] | null = null; + let exportedFollowingsCount = 0; + + const mutings = await this.mutingsRepository.findBy({ + muterId: user.id, + }); + + while (true) { + const followings = await this.followingsRepository.find({ + where: { + followerId: user.id, + ...(mutings.length > 0 ? { followeeId: Not(In(mutings.map(x => x.muteeId))) } : {}), + ...(followingsCursor ? { id: MoreThan(followingsCursor) } : {}), + }, + take: 100, + order: { + id: 1, + }, + }) as MiFollowing[]; + + if (followings.length === 0) { + break; + } + + followingsCursor = followings.at(-1)?.id ?? null; + + for (const following of followings) { + const u = await this.usersRepository.findOneBy({ id: following.followeeId }); + if (u == null) { + continue; + } + + if (u.updatedAt && (Date.now() - u.updatedAt.getTime() > 1000 * 60 * 60 * 24 * 90)) { + continue; + } + + const isFirst = exportedFollowingsCount === 0; + const content = this.utilityService.getFullApAccount(u.username, u.host); + await writeFollowing(isFirst ? `"${content}"` : ',\n' + `"${content}"`); + exportedFollowingsCount++; + } + } + + await writeFollowing(']}'); + + followingStream.end(); + + // Followers Export + + const followersPath = path + '/followers.json'; + + fs.writeFileSync(followersPath, '', 'utf-8'); + + const followerStream = fs.createWriteStream(followersPath, { flags: 'a' }); + + const writeFollowers = (text: string): Promise<void> => { + return new Promise<void>((res, rej) => { + followerStream.write(text, err => { + if (err) { + this.logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + }; + + await writeFollowers(`{"metaVersion":1,"host":"${this.config.host}","exportedAt":"${new Date().toString()}","followers":[`); + + let followersCursor: MiFollowing['id'] | null = null; + let exportedFollowersCount = 0; + + while (true) { + const followers = await this.followingsRepository.find({ + where: { + followeeId: user.id, + ...(followersCursor ? { id: MoreThan(followersCursor) } : {}), + }, + take: 100, + order: { + id: 1, + }, + }) as MiFollowing[]; + + if (followers.length === 0) { + break; + } + + followersCursor = followers.at(-1)?.id ?? null; + + for (const follower of followers) { + const u = await this.usersRepository.findOneBy({ id: follower.followerId }); + if (u == null) { + continue; + } + + const isFirst = exportedFollowersCount === 0; + const content = this.utilityService.getFullApAccount(u.username, u.host); + await writeFollowers(isFirst ? `"${content}"` : ',\n' + `"${content}"`); + exportedFollowersCount++; + } + } + + await writeFollowers(']}'); + + followerStream.end(); + + // Drive Export + + const filesPath = path + '/drive.json'; + + fs.writeFileSync(filesPath, '', 'utf-8'); + + const filesStream = fs.createWriteStream(filesPath, { flags: 'a' }); + + const writeDrive = (text: string): Promise<void> => { + return new Promise<void>((res, rej) => { + filesStream.write(text, err => { + if (err) { + this.logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + }; + + fs.mkdirSync(`${path}/files`); + + await writeDrive(`{"metaVersion":1,"host":"${this.config.host}","exportedAt":"${new Date().toString()}","drive":[`); + + const driveFiles = await this.driveFilesRepository.find({ where: { userId: user.id } }); + + for (const file of driveFiles) { + const ext = mime.extension(file.type); + const fileName = file.name + '.' + ext; + const filePath = path + '/files/' + fileName; + fs.writeFileSync(filePath, '', 'binary'); + let downloaded = false; + + try { + await this.downloadService.downloadUrl(file.url, filePath); + downloaded = true; + } catch (e) { + this.logger.error(e instanceof Error ? e : new Error(e as string)); + } + + if (!downloaded) { + fs.unlinkSync(filePath); + } + + const content = JSON.stringify({ + fileName: fileName, + file: file, + }); + const isFirst = driveFiles.indexOf(file) === 0; + + await writeDrive(isFirst ? content : ',\n' + content); + } + + await writeDrive(']}'); + + filesStream.end(); + + // Muting Export + + const mutingPath = path + '/mutings.json'; + + fs.writeFileSync(mutingPath, '', 'utf-8'); + + const mutingStream = fs.createWriteStream(mutingPath, { flags: 'a' }); + + const writeMuting = (text: string): Promise<void> => { + return new Promise<void>((res, rej) => { + mutingStream.write(text, err => { + if (err) { + this.logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + }; + + await writeMuting(`{"metaVersion":1,"host":"${this.config.host}","exportedAt":"${new Date().toString()}","mutings":[`); + + let exportedMutingCount = 0; + let mutingCursor: MiMuting['id'] | null = null; + + while (true) { + const mutes = await this.mutingsRepository.find({ + where: { + muterId: user.id, + expiresAt: IsNull(), + ...(mutingCursor ? { id: MoreThan(mutingCursor) } : {}), + }, + take: 100, + order: { + id: 1, + }, + }); + + if (mutes.length === 0) { + break; + } + + mutingCursor = mutes.at(-1)?.id ?? null; + + for (const mute of mutes) { + const u = await this.usersRepository.findOneBy({ id: mute.muteeId }); + + if (u == null) { + exportedMutingCount++; continue; + } + + const content = this.utilityService.getFullApAccount(u.username, u.host); + const isFirst = exportedMutingCount === 0; + await writeMuting(isFirst ? `"${content}"` : ',\n' + `"${content}"`); + exportedMutingCount++; + } + } + + await writeMuting(']}'); + + mutingStream.end(); + + // Blockings Export + + const blockingPath = path + '/blockings.json'; + + fs.writeFileSync(blockingPath, '', 'utf-8'); + + const blockingStream = fs.createWriteStream(blockingPath, { flags: 'a' }); + + const writeBlocking = (text: string): Promise<void> => { + return new Promise<void>((res, rej) => { + blockingStream.write(text, err => { + if (err) { + this.logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + }; + + await writeBlocking(`{"metaVersion":1,"host":"${this.config.host}","exportedAt":"${new Date().toString()}","blockings":[`); + + let exportedBlockingCount = 0; + let blockingCursor: MiBlocking['id'] | null = null; + + while (true) { + const blockings = await this.blockingsRepository.find({ + where: { + blockerId: user.id, + ...(blockingCursor ? { id: MoreThan(blockingCursor) } : {}), + }, + take: 100, + order: { + id: 1, + }, + }); + + if (blockings.length === 0) { + break; + } + + blockingCursor = blockings.at(-1)?.id ?? null; + + for (const block of blockings) { + const u = await this.usersRepository.findOneBy({ id: block.blockeeId }); + + if (u == null) { + exportedBlockingCount++; continue; + } + + const content = this.utilityService.getFullApAccount(u.username, u.host); + const isFirst = exportedBlockingCount === 0; + await writeBlocking(isFirst ? `"${content}"` : ',\n' + `"${content}"`); + exportedBlockingCount++; + } + } + + await writeBlocking(']}'); + + blockingStream.end(); + + // Favorites export + + const favoritePath = path + '/favorites.json'; + + fs.writeFileSync(favoritePath, '', 'utf-8'); + + const favoriteStream = fs.createWriteStream(favoritePath, { flags: 'a' }); + + const writeFavorite = (text: string): Promise<void> => { + return new Promise<void>((res, rej) => { + favoriteStream.write(text, err => { + if (err) { + this.logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + }; + + await writeFavorite(`{"metaVersion":1,"host":"${this.config.host}","exportedAt":"${new Date().toString()}","favorites":[`); + + let exportedFavoritesCount = 0; + let favoriteCursor: MiNoteFavorite['id'] | null = null; + + while (true) { + const favorites = await this.noteFavoritesRepository.find({ + where: { + userId: user.id, + ...(favoriteCursor ? { id: MoreThan(favoriteCursor) } : {}), + }, + take: 100, + order: { + id: 1, + }, + relations: ['note', 'note.user'], + }) as (MiNoteFavorite & { note: MiNote & { user: MiUser } })[]; + + if (favorites.length === 0) { + break; + } + + favoriteCursor = favorites.at(-1)?.id ?? null; + + for (const favorite of favorites) { + let poll: MiPoll | undefined; + if (favorite.note.hasPoll) { + poll = await this.pollsRepository.findOneByOrFail({ noteId: favorite.note.id }); + } + const content = JSON.stringify(this.favoriteSerialize(favorite, poll)); + const isFirst = exportedFavoritesCount === 0; + await writeFavorite(isFirst ? content : ',\n' + content); + exportedFavoritesCount++; + } + } + + await writeFavorite(']}'); + + favoriteStream.end(); + + // Antennas export + + const antennaPath = path + '/antennas.json'; + + fs.writeFileSync(antennaPath, '', 'utf-8'); + + const antennaStream = fs.createWriteStream(antennaPath, { flags: 'a' }); + + const writeAntenna = (text: string): Promise<void> => { + return new Promise<void>((res, rej) => { + antennaStream.write(text, err => { + if (err) { + this.logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + }; + + await writeAntenna(`{"metaVersion":1,"host":"${this.config.host}","exportedAt":"${new Date().toString()}","antennas":[`); + + const antennas = await this.antennasRepository.findBy({ userId: user.id }); + + for (const [index, antenna] of antennas.entries()) { + let users: MiUser[] | undefined; + if (antenna.userListId !== null) { + const memberships = await this.userListMembershipsRepository.findBy({ userListId: antenna.userListId }); + users = await this.usersRepository.findBy({ + id: In(memberships.map(j => j.userId)), + }); + } + + await writeAntenna(JSON.stringify({ + name: antenna.name, + src: antenna.src, + keywords: antenna.keywords, + excludeKeywords: antenna.excludeKeywords, + users: antenna.users, + userListAccts: typeof users !== 'undefined' ? users.map((u) => { + return this.utilityService.getFullApAccount(u.username, u.host); // acct + }) : null, + caseSensitive: antenna.caseSensitive, + localOnly: antenna.localOnly, + withReplies: antenna.withReplies, + withFile: antenna.withFile, + })); + + if (antennas.length - 1 !== index) { + await writeAntenna(', '); + } + } + + await writeAntenna(']}'); + + antennaStream.end(); + + // Lists export + + const listPath = path + '/lists.csv'; + + fs.writeFileSync(listPath, '', 'utf-8'); + + const listStream = fs.createWriteStream(listPath, { flags: 'a' }); + + const writeList = (text: string): Promise<void> => { + return new Promise<void>((res, rej) => { + listStream.write(text, err => { + if (err) { + this.logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + }; + + const lists = await this.userListsRepository.findBy({ + userId: user.id, + }); + + for (const list of lists) { + const memberships = await this.userListMembershipsRepository.findBy({ userListId: list.id }); + const users = await this.usersRepository.findBy({ + id: In(memberships.map(j => j.userId)), + }); + + for (const u of users) { + const acct = this.utilityService.getFullApAccount(u.username, u.host); + const content = `${list.name},${acct}`; + await writeList(content + '\n'); + } + } + + listStream.end(); + + // Create archive + await new Promise<void>(async (resolve) => { + const [archivePath, archiveCleanup] = await createTemp(); + const archiveStream = fs.createWriteStream(archivePath); + const archive = archiver('zip', { + zlib: { level: 0 }, + }); + archiveStream.on('close', async () => { + this.logger.succ(`Exported to: ${archivePath}`); + + const fileName = 'data-request-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.zip'; + const driveFile = await this.driveService.addFile({ user, path: archivePath, name: fileName, force: true }); + + this.logger.succ(`Exported to: ${driveFile.id}`); + cleanup(); + archiveCleanup(); + if (profile.email) { + this.emailService.sendEmail(profile.email, + 'Your data archive is ready', + `Click the following link to download the archive: ${driveFile.url}<br/>It is also available in your drive.`, + `Click the following link to download the archive: ${driveFile.url}\r\n\r\nIt is also available in your drive.`, + ); + } + resolve(); + }); + archive.pipe(archiveStream); + archive.directory(path, false); + archive.finalize(); + }); + } + + private noteSerialize(note: MiNote, poll: MiPoll | null = null, files: Packed<'DriveFile'>[]): Record<string, unknown> { + return { + id: note.id, + text: note.text, + createdAt: this.idService.parse(note.id).date.toISOString(), + fileIds: note.fileIds, + files: files, + replyId: note.replyId, + renoteId: note.renoteId, + poll: poll, + cw: note.cw, + visibility: note.visibility, + visibleUserIds: note.visibleUserIds, + localOnly: note.localOnly, + reactionAcceptance: note.reactionAcceptance, + }; + } + + private favoriteSerialize(favorite: MiNoteFavorite & { note: MiNote & { user: MiUser } }, poll: MiPoll | null = null): Record<string, unknown> { + return { + id: favorite.id, + createdAt: this.idService.parse(favorite.id).date.toISOString(), + note: { + id: favorite.note.id, + text: favorite.note.text, + createdAt: this.idService.parse(favorite.note.id).date.toISOString(), + fileIds: favorite.note.fileIds, + replyId: favorite.note.replyId, + renoteId: favorite.note.renoteId, + poll: poll, + cw: favorite.note.cw, + visibility: favorite.note.visibility, + visibleUserIds: favorite.note.visibleUserIds, + localOnly: favorite.note.localOnly, + reactionAcceptance: favorite.note.reactionAcceptance, + uri: favorite.note.uri, + url: favorite.note.url, + user: { + id: favorite.note.user.id, + name: favorite.note.user.name, + username: favorite.note.user.username, + host: favorite.note.user.host, + uri: favorite.note.user.uri, + }, + }, + }; + } +} diff --git a/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts index e237cd4975..14d32e78b3 100644 --- a/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts @@ -87,7 +87,7 @@ export class ExportCustomEmojisProcessorService { }); for (const emoji of customEmojis) { - if (!/^[a-zA-Z0-9_]+$/.test(emoji.name)) { + if (!/^[\p{Letter}\p{Number}\p{Mark}_+-]+$/u.test(emoji.name)) { this.logger.error(`invalid emoji name: ${emoji.name}`); continue; } diff --git a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts index 95fe0a2c6a..d08cadd378 100644 --- a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts @@ -15,6 +15,7 @@ import { createTempDir } from '@/misc/create-temp.js'; import { DriveService } from '@/core/DriveService.js'; import { DownloadService } from '@/core/DownloadService.js'; import { bindThis } from '@/decorators.js'; +import type { Config } from '@/config.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; import type { DbUserImportJobData } from '../types.js'; @@ -25,6 +26,9 @@ export class ImportCustomEmojisProcessorService { private logger: Logger; constructor( + @Inject(DI.config) + private config: Config, + @Inject(DI.driveFilesRepository) private driveFilesRepository: DriveFilesRepository, @@ -58,7 +62,7 @@ export class ImportCustomEmojisProcessorService { try { fs.writeFileSync(destPath, '', 'binary'); - await this.downloadService.downloadUrl(file.url, destPath); + await this.downloadService.downloadUrl(file.url, destPath, { operationTimeout: this.config.import?.downloadTimeout, maxSize: this.config.import?.maxFileSize }); } catch (e) { // TODO: 何度か再試行 if (e instanceof Error || typeof e === 'string') { this.logger.error(e); @@ -80,13 +84,14 @@ export class ImportCustomEmojisProcessorService { continue; } const emojiInfo = record.emoji; - if (!/^[a-zA-Z0-9_]+$/.test(emojiInfo.name)) { - this.logger.error(`invalid emojiname: ${emojiInfo.name}`); + const nameNfc = emojiInfo.name.normalize('NFC'); + if (!/^[\p{Letter}\p{Number}\p{Mark}_+-]+$/u.test(nameNfc)) { + this.logger.error(`invalid emojiname: ${nameNfc}`); continue; } const emojiPath = outputPath + '/' + record.fileName; await this.emojisRepository.delete({ - name: emojiInfo.name, + name: nameNfc, host: IsNull(), }); @@ -101,10 +106,10 @@ export class ImportCustomEmojisProcessorService { originalUrl: driveFile.url, publicUrl: driveFile.webpublicUrl ?? driveFile.url, fileType: driveFile.webpublicType ?? driveFile.type, - name: emojiInfo.name, - category: emojiInfo.category, + name: nameNfc, + category: emojiInfo.category?.normalize('NFC'), host: null, - aliases: emojiInfo.aliases, + aliases: emojiInfo.aliases?.map((a: string) => a.normalize('NFC')), license: emojiInfo.license, isSensitive: emojiInfo.isSensitive, localOnly: emojiInfo.localOnly, diff --git a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts new file mode 100644 index 0000000000..5e660e8081 --- /dev/null +++ b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts @@ -0,0 +1,722 @@ +/* + * SPDX-FileCopyrightText: marie and other Sharkey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import * as fs from 'node:fs'; +import * as fsp from 'node:fs/promises'; +import * as crypto from 'node:crypto'; +import { Inject, Injectable } from '@nestjs/common'; +import { ZipReader } from 'slacc'; +import { DI } from '@/di-symbols.js'; +import type { UsersRepository, DriveFilesRepository, MiDriveFile, MiNote, NotesRepository, MiUser, DriveFoldersRepository, MiDriveFolder } from '@/models/_.js'; +import type Logger from '@/logger.js'; +import { DownloadService } from '@/core/DownloadService.js'; +import { bindThis } from '@/decorators.js'; +import { QueueService } from '@/core/QueueService.js'; +import { createTemp, createTempDir } from '@/misc/create-temp.js'; +import { NoteCreateService } from '@/core/NoteCreateService.js'; +import { DriveService } from '@/core/DriveService.js'; +import { MfmService } from '@/core/MfmService.js'; +import { ApNoteService } from '@/core/activitypub/models/ApNoteService.js'; +import { extractApHashtagObjects } from '@/core/activitypub/models/tag.js'; +import { IdService } from '@/core/IdService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; +import type { DbNoteImportToDbJobData, DbNoteImportJobData, DbNoteWithParentImportToDbJobData } from '../types.js'; +import type { Config } from '@/config.js'; + +@Injectable() +export class ImportNotesProcessorService { + private logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + + @Inject(DI.driveFilesRepository) + private driveFilesRepository: DriveFilesRepository, + + @Inject(DI.driveFoldersRepository) + private driveFoldersRepository: DriveFoldersRepository, + + @Inject(DI.notesRepository) + private notesRepository: NotesRepository, + + private queueService: QueueService, + private noteCreateService: NoteCreateService, + private mfmService: MfmService, + private apNoteService: ApNoteService, + private driveService: DriveService, + private downloadService: DownloadService, + private idService: IdService, + private queueLoggerService: QueueLoggerService, + ) { + this.logger = this.queueLoggerService.logger.createSubLogger('import-notes'); + } + + @bindThis + private async uploadFiles(dir: string, user: MiUser, folder?: MiDriveFolder['id']) { + const fileList = await fsp.readdir(dir); + for await (const file of fileList) { + const name = `${dir}/${file}`; + if (fs.statSync(name).isDirectory()) { + await this.uploadFiles(name, user, folder); + } else { + const exists = await this.driveFilesRepository.findOneBy({ name: file, userId: user.id, folderId: folder }); + + if (file.endsWith('.srt')) return; + + if (!exists) { + await this.driveService.addFile({ + user: user, + path: name, + name: file, + folderId: folder, + }); + } + } + } + } + + @bindThis + private downloadUrl(url: string, path:string): Promise<{ filename: string }> { + return this.downloadService.downloadUrl(url, path, { operationTimeout: this.config.import?.downloadTimeout, maxSize: this.config.import?.maxFileSize }); + } + + @bindThis + private async recreateChain(idFieldPath: string[], replyFieldPath: string[], arr: any[], includeOrphans: boolean): Promise<any[]> { + type NotesMap = { + [id: string]: any; + }; + const notesTree: any[] = []; + const noteById: NotesMap = {}; + const notesWaitingForParent: NotesMap = {}; + + for await (const note of arr) { + const noteId = idFieldPath.reduce( + (obj, step) => obj[step], + note, + ); + + noteById[noteId] = note; + note.childNotes = []; + + const children = notesWaitingForParent[noteId]; + if (children) { + note.childNotes.push(...children); + delete notesWaitingForParent[noteId]; + } + + const noteReplyId = replyFieldPath.reduce( + (obj, step) => obj[step], + note, + ); + if (noteReplyId == null) { + notesTree.push(note); + continue; + } + + const parent = noteById[noteReplyId]; + if (parent) { + parent.childNotes.push(note); + } else { + notesWaitingForParent[noteReplyId] ||= []; + notesWaitingForParent[noteReplyId].push(note); + } + } + + if (includeOrphans) { + notesTree.push(...Object.values(notesWaitingForParent).flat(1)); + } + + return notesTree; + } + + @bindThis + private isIterable(obj: any) { + if (obj == null) { + return false; + } + return typeof obj[Symbol.iterator] === 'function'; + } + + @bindThis + private parseTwitterFile(str : string) : { tweet: object }[] { + const jsonStr = str.replace(/^\s*window\.YTD\.tweets\.part0\s*=\s*/, ''); + + try { + return JSON.parse(jsonStr); + } catch (error) { + //The format is not what we expected. Either this file was tampered with or twitters exports changed + this.logger.warn('Failed to import twitter notes due to malformed file'); + throw error; + } + } + + @bindThis + public async process(job: Bull.Job<DbNoteImportJobData>): Promise<void> { + this.logger.info(`Starting note import of ${job.data.user.id} ...`); + + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + return; + } + + const file = await this.driveFilesRepository.findOneBy({ + id: job.data.fileId, + }); + if (file == null) { + return; + } + + let folder = await this.driveFoldersRepository.findOneBy({ name: 'Imports', userId: job.data.user.id }); + if (folder == null) { + await this.driveFoldersRepository.insert({ id: this.idService.gen(), name: 'Imports', userId: job.data.user.id }); + folder = await this.driveFoldersRepository.findOneBy({ name: 'Imports', userId: job.data.user.id }); + } + + const type = job.data.type; + + if (type === 'Twitter' || file.name.startsWith('twitter') && file.name.endsWith('.zip')) { + const [path, cleanup] = await createTempDir(); + + this.logger.info(`Temp dir is ${path}`); + + const destPath = path + '/twitter.zip'; + + try { + await fsp.writeFile(destPath, '', 'binary'); + await this.downloadUrl(file.url, destPath); + } catch (e) { // TODO: 何度か再試行 + if (e instanceof Error || typeof e === 'string') { + this.logger.error(e); + } + throw e; + } + + const outputPath = path + '/twitter'; + try { + this.logger.succ(`Unzipping to ${outputPath}`); + ZipReader.withDestinationPath(outputPath).viaBuffer(await fsp.readFile(destPath)); + + const unprocessedTweets = this.parseTwitterFile(await fsp.readFile(outputPath + '/data/tweets.js', 'utf-8')); + + const tweets = unprocessedTweets.map(e => e.tweet); + const processedTweets = await this.recreateChain(['id_str'], ['in_reply_to_status_id_str'], tweets, false); + this.queueService.createImportTweetsToDbJob(job.data.user, processedTweets, null); + } finally { + cleanup(); + } + } else if (type === 'Facebook' || file.name.startsWith('facebook-') && file.name.endsWith('.zip')) { + const [path, cleanup] = await createTempDir(); + + this.logger.info(`Temp dir is ${path}`); + + const destPath = path + '/facebook.zip'; + + try { + await fsp.writeFile(destPath, '', 'binary'); + await this.downloadUrl(file.url, destPath); + } catch (e) { // TODO: 何度か再試行 + if (e instanceof Error || typeof e === 'string') { + this.logger.error(e); + } + throw e; + } + + const outputPath = path + '/facebook'; + try { + this.logger.succ(`Unzipping to ${outputPath}`); + ZipReader.withDestinationPath(outputPath).viaBuffer(await fsp.readFile(destPath)); + const postsJson = await fsp.readFile(outputPath + '/your_activity_across_facebook/posts/your_posts__check_ins__photos_and_videos_1.json', 'utf-8'); + const posts = JSON.parse(postsJson); + const facebookFolder = await this.driveFoldersRepository.findOneBy({ name: 'Facebook', userId: job.data.user.id, parentId: folder?.id }); + if (facebookFolder == null && folder) { + await this.driveFoldersRepository.insert({ id: this.idService.gen(), name: 'Facebook', userId: job.data.user.id, parentId: folder.id }); + const createdFolder = await this.driveFoldersRepository.findOneBy({ name: 'Facebook', userId: job.data.user.id, parentId: folder.id }); + if (createdFolder) await this.uploadFiles(outputPath + '/your_activity_across_facebook/posts/media', user, createdFolder.id); + } + this.queueService.createImportFBToDbJob(job.data.user, posts); + } finally { + cleanup(); + } + } else if (file.name.endsWith('.zip')) { + const [path, cleanup] = await createTempDir(); + + this.logger.info(`Temp dir is ${path}`); + + const destPath = path + '/unknown.zip'; + + try { + await fsp.writeFile(destPath, '', 'binary'); + await this.downloadUrl(file.url, destPath); + } catch (e) { // TODO: 何度か再試行 + if (e instanceof Error || typeof e === 'string') { + this.logger.error(e); + } + throw e; + } + + const outputPath = path + '/unknown'; + try { + this.logger.succ(`Unzipping to ${outputPath}`); + ZipReader.withDestinationPath(outputPath).viaBuffer(await fsp.readFile(destPath)); + const isInstagram = type === 'Instagram' || fs.existsSync(outputPath + '/instagram_live') || fs.existsSync(outputPath + '/instagram_ads_and_businesses'); + const isOutbox = type === 'Mastodon' || fs.existsSync(outputPath + '/outbox.json'); + if (isInstagram) { + const postsJson = await fsp.readFile(outputPath + '/content/posts_1.json', 'utf-8'); + const posts = JSON.parse(postsJson); + const igFolder = await this.driveFoldersRepository.findOneBy({ name: 'Instagram', userId: job.data.user.id, parentId: folder?.id }); + if (igFolder == null && folder) { + await this.driveFoldersRepository.insert({ id: this.idService.gen(), name: 'Instagram', userId: job.data.user.id, parentId: folder.id }); + const createdFolder = await this.driveFoldersRepository.findOneBy({ name: 'Instagram', userId: job.data.user.id, parentId: folder.id }); + if (createdFolder) await this.uploadFiles(outputPath + '/media/posts', user, createdFolder.id); + } + this.queueService.createImportIGToDbJob(job.data.user, posts); + } else if (isOutbox) { + const actorJson = await fsp.readFile(outputPath + '/actor.json', 'utf-8'); + const actor = JSON.parse(actorJson); + const isPleroma = actor['@context'].some((v: any) => typeof v === 'string' && v.match(/litepub(.*)/)); + if (isPleroma) { + const outboxJson = await fsp.readFile(outputPath + '/outbox.json', 'utf-8'); + const outbox = JSON.parse(outboxJson); + const processedToots = await this.recreateChain(['object', 'id'], ['object', 'inReplyTo'], outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note'), true); + this.queueService.createImportPleroToDbJob(job.data.user, processedToots, null); + } else { + const outboxJson = await fsp.readFile(outputPath + '/outbox.json', 'utf-8'); + const outbox = JSON.parse(outboxJson); + let mastoFolder = await this.driveFoldersRepository.findOneBy({ name: 'Mastodon', userId: job.data.user.id, parentId: folder?.id }); + if (mastoFolder == null && folder) { + await this.driveFoldersRepository.insert({ id: this.idService.gen(), name: 'Mastodon', userId: job.data.user.id, parentId: folder.id }); + mastoFolder = await this.driveFoldersRepository.findOneBy({ name: 'Mastodon', userId: job.data.user.id, parentId: folder.id }); + } + if (fs.existsSync(outputPath + '/media_attachments/files') && mastoFolder) { + await this.uploadFiles(outputPath + '/media_attachments/files', user, mastoFolder.id); + } + const processedToots = await this.recreateChain(['object', 'id'], ['object', 'inReplyTo'], outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note'), true); + this.queueService.createImportMastoToDbJob(job.data.user, processedToots, null); + } + } + } finally { + cleanup(); + } + } else if (job.data.type === 'Misskey' || file.name.startsWith('notes-') && file.name.endsWith('.json')) { + const [path, cleanup] = await createTemp(); + + this.logger.info(`Temp dir is ${path}`); + + try { + await fsp.writeFile(path, '', 'utf-8'); + await this.downloadUrl(file.url, path); + } catch (e) { // TODO: 何度か再試行 + if (e instanceof Error || typeof e === 'string') { + this.logger.error(e); + } + throw e; + } + + const notesJson = await fsp.readFile(path, 'utf-8'); + const notes = JSON.parse(notesJson); + const processedNotes = await this.recreateChain(['id'], ['replyId'], notes, false); + this.queueService.createImportKeyNotesToDbJob(job.data.user, processedNotes, null); + cleanup(); + } + + this.logger.succ('Import jobs created'); + } + + @bindThis + public async processKeyNotesToDb(job: Bull.Job<DbNoteWithParentImportToDbJobData>): Promise<void> { + const note = job.data.target; + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + return; + } + + if (note.renoteId) return; + + const parentNote = job.data.note ? await this.notesRepository.findOneBy({ id: job.data.note }) : null; + + const folder = await this.driveFoldersRepository.findOneBy({ name: 'Imports', userId: job.data.user.id }); + if (folder == null) return; + + const files: MiDriveFile[] = []; + const date = new Date(note.createdAt); + + if (note.files && this.isIterable(note.files)) { + let keyFolder = await this.driveFoldersRepository.findOneBy({ name: 'Misskey', userId: job.data.user.id, parentId: folder.id }); + if (keyFolder == null) { + await this.driveFoldersRepository.insert({ id: this.idService.gen(), name: 'Misskey', userId: job.data.user.id, parentId: folder.id }); + keyFolder = await this.driveFoldersRepository.findOneBy({ name: 'Misskey', userId: job.data.user.id, parentId: folder.id }); + } + + for await (const file of note.files) { + const [filePath, cleanup] = await createTemp(); + const slashdex = file.url.lastIndexOf('/'); + const name = file.url.substring(slashdex + 1); + + const exists = await this.driveFilesRepository.findOneBy({ name: name, userId: user.id }) ?? await this.driveFilesRepository.findOneBy({ name: name, userId: user.id, folderId: keyFolder?.id }); + + if (!exists) { + try { + await this.downloadUrl(file.url, filePath); + } catch (e) { // TODO: 何度か再試行 + this.logger.error(e instanceof Error ? e : new Error(e as string)); + } + const driveFile = await this.driveService.addFile({ + user: user, + path: filePath, + name: name, + folderId: keyFolder?.id, + }); + files.push(driveFile); + } else { + files.push(exists); + } + + cleanup(); + } + } + + const createdNote = await this.noteCreateService.import(user, { createdAt: date, reply: parentNote, text: note.text, apMentions: new Array(0), visibility: note.visibility, localOnly: note.localOnly, files: files, cw: note.cw }); + if (note.childNotes) this.queueService.createImportKeyNotesToDbJob(user, note.childNotes, createdNote.id); + } + + @bindThis + public async processMastoToDb(job: Bull.Job<DbNoteWithParentImportToDbJobData>): Promise<void> { + const toot = job.data.target; + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + return; + } + + const followers = toot.to.some((str: string) => str.includes('/followers')); + + if (toot.directMessage || !toot.to.includes('https://www.w3.org/ns/activitystreams#Public') && !followers) return; + + const visibility = followers ? toot.cc.includes('https://www.w3.org/ns/activitystreams#Public') ? 'home' : 'followers' : 'public'; + + const date = new Date(toot.object.published); + let text = undefined; + const files: MiDriveFile[] = []; + let reply: MiNote | null = null; + + if (toot.object.inReplyTo != null) { + const parentNote = job.data.note ? await this.notesRepository.findOneBy({ id: job.data.note }) : null; + if (parentNote) { + reply = parentNote; + } else { + try { + reply = await this.apNoteService.resolveNote(toot.object.inReplyTo); + } catch (error) { + reply = null; + } + } + } + + const hashtags = extractApHashtagObjects(toot.object.tag).map((x) => x.name).filter((x): x is string => x != null); + + try { + text = await this.mfmService.fromHtml(toot.object.content, hashtags); + } catch (error) { + text = undefined; + } + + if (toot.object.attachment && this.isIterable(toot.object.attachment)) { + for await (const file of toot.object.attachment) { + const slashdex = file.url.lastIndexOf('/'); + const name = file.url.substring(slashdex + 1); + const exists = await this.driveFilesRepository.findOneBy({ name: name, userId: user.id }); + if (exists) { + if (file.name) { + this.driveService.updateFile(exists, { comment: file.name }, user); + } + + files.push(exists); + } + } + } + + const createdNote = await this.noteCreateService.import(user, { createdAt: date, text: text, files: files, visibility: visibility, apMentions: new Array(0), cw: toot.object.sensitive ? toot.object.summary : null, reply: reply }); + if (toot.childNotes) this.queueService.createImportMastoToDbJob(user, toot.childNotes, createdNote.id); + } + + @bindThis + public async processPleroToDb(job: Bull.Job<DbNoteWithParentImportToDbJobData>): Promise<void> { + const post = job.data.target; + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + return; + } + + if (post.directMessage) return; + + const date = new Date(post.object.published); + let text = undefined; + const files: MiDriveFile[] = []; + let reply: MiNote | null = null; + + const folder = await this.driveFoldersRepository.findOneBy({ name: 'Imports', userId: job.data.user.id }); + if (folder == null) return; + + if (post.object.inReplyTo != null) { + const parentNote = job.data.note ? await this.notesRepository.findOneBy({ id: job.data.note }) : null; + if (parentNote) { + reply = parentNote; + } else { + try { + reply = await this.apNoteService.resolveNote(post.object.inReplyTo); + } catch (error) { + reply = null; + } + } + } + + const hashtags = extractApHashtagObjects(post.object.tag).map((x) => x.name).filter((x): x is string => x != null); + + try { + text = await this.mfmService.fromHtml(post.object.content, hashtags); + } catch (error) { + text = undefined; + } + + if (post.object.attachment && this.isIterable(post.object.attachment)) { + let pleroFolder = await this.driveFoldersRepository.findOneBy({ name: 'Pleroma', userId: job.data.user.id, parentId: folder.id }); + if (pleroFolder == null) { + await this.driveFoldersRepository.insert({ id: this.idService.gen(), name: 'Pleroma', userId: job.data.user.id, parentId: folder.id }); + pleroFolder = await this.driveFoldersRepository.findOneBy({ name: 'Pleroma', userId: job.data.user.id, parentId: folder.id }); + } + + for await (const file of post.object.attachment) { + const slashdex = file.url.lastIndexOf('/'); + const filename = file.url.substring(slashdex + 1); + const hash = crypto.createHash('md5').update(file.url).digest('base64url'); + const name = `${hash}-${filename}`; + const [filePath, cleanup] = await createTemp(); + + const exists = await this.driveFilesRepository.findOneBy({ name: name, userId: user.id }) ?? await this.driveFilesRepository.findOneBy({ name: name, userId: user.id, folderId: pleroFolder?.id }); + + if (!exists) { + try { + await this.downloadUrl(file.url, filePath); + } catch (e) { // TODO: 何度か再試行 + this.logger.error(e instanceof Error ? e : new Error(e as string)); + } + const driveFile = await this.driveService.addFile({ + user: user, + path: filePath, + name: name, + comment: file.name, + folderId: pleroFolder?.id, + }); + files.push(driveFile); + } else { + files.push(exists); + } + + cleanup(); + } + } + + const createdNote = await this.noteCreateService.import(user, { createdAt: date, text: text, files: files, apMentions: new Array(0), cw: post.object.sensitive ? post.object.summary : null, reply: reply }); + if (post.childNotes) this.queueService.createImportPleroToDbJob(user, post.childNotes, createdNote.id); + } + + @bindThis + public async processIGDb(job: Bull.Job<DbNoteImportToDbJobData>): Promise<void> { + const post = job.data.target; + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + return; + } + + let date; + let title; + const files: MiDriveFile[] = []; + + function decodeIGString(str: string) { + const arr = []; + for (let i = 0; i < str.length; i++) { + arr.push(str.charCodeAt(i)); + } + return Buffer.from(arr).toString('utf8'); + } + + if (post.media && this.isIterable(post.media) && post.media.length > 1) { + date = new Date(post.creation_timestamp * 1000); + title = decodeIGString(post.title); + for await (const file of post.media) { + const slashdex = file.uri.lastIndexOf('/'); + const name = file.uri.substring(slashdex + 1); + const exists = await this.driveFilesRepository.findOneBy({ name: name, userId: user.id }) ?? await this.driveFilesRepository.findOneBy({ name: `${name}.jpg`, userId: user.id }) ?? await this.driveFilesRepository.findOneBy({ name: `${name}.mp4`, userId: user.id }); + if (exists) { + files.push(exists); + } + } + } else if (post.media && this.isIterable(post.media) && !(post.media.length > 1)) { + date = new Date(post.media[0].creation_timestamp * 1000); + title = decodeIGString(post.media[0].title); + const slashdex = post.media[0].uri.lastIndexOf('/'); + const name = post.media[0].uri.substring(slashdex + 1); + const exists = await this.driveFilesRepository.findOneBy({ name: name, userId: user.id }) ?? await this.driveFilesRepository.findOneBy({ name: `${name}.jpg`, userId: user.id }) ?? await this.driveFilesRepository.findOneBy({ name: `${name}.mp4`, userId: user.id }); + if (exists) { + files.push(exists); + } + } + + await this.noteCreateService.import(user, { createdAt: date, text: title, files: files }); + } + + @bindThis + public async processTwitterDb(job: Bull.Job<DbNoteWithParentImportToDbJobData>): Promise<void> { + const tweet = job.data.target; + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + return; + } + + const folder = await this.driveFoldersRepository.findOneBy({ name: 'Imports', userId: job.data.user.id }); + if (folder == null) return; + + const parentNote = job.data.note ? await this.notesRepository.findOneBy({ id: job.data.note }) : null; + + async function replaceTwitterUrls(full_text: string, urls: any) { + let full_textedit = full_text; + urls.forEach((url: any) => { + full_textedit = full_textedit.replaceAll(url.url, url.expanded_url); + }); + return full_textedit; + } + + async function replaceTwitterMentions(full_text: string, mentions: any) { + let full_textedit = full_text; + mentions.forEach((mention: any) => { + full_textedit = full_textedit.replaceAll(`@${mention.screen_name}`, `[@${mention.screen_name}](https://twitter.com/${mention.screen_name})`); + }); + return full_textedit; + } + + try { + const date = new Date(tweet.created_at); + const decodedText = tweet.full_text.replaceAll('>', '>').replaceAll('<', '<').replaceAll('&', '&'); + const textReplaceURLs = tweet.entities.urls && tweet.entities.urls.length > 0 ? await replaceTwitterUrls(decodedText, tweet.entities.urls) : decodedText; + const text = tweet.entities.user_mentions && tweet.entities.user_mentions.length > 0 ? await replaceTwitterMentions(textReplaceURLs, tweet.entities.user_mentions) : textReplaceURLs; + const files: MiDriveFile[] = []; + + if (tweet.extended_entities && this.isIterable(tweet.extended_entities.media)) { + let twitFolder = await this.driveFoldersRepository.findOneBy({ name: 'Twitter', userId: job.data.user.id, parentId: folder.id }); + if (twitFolder == null) { + await this.driveFoldersRepository.insert({ id: this.idService.gen(), name: 'Twitter', userId: job.data.user.id, parentId: folder.id }); + twitFolder = await this.driveFoldersRepository.findOneBy({ name: 'Twitter', userId: job.data.user.id, parentId: folder.id }); + } + + for await (const file of tweet.extended_entities.media) { + if (file.video_info) { + const [filePath, cleanup] = await createTemp(); + const slashdex = file.video_info.variants[0].url.lastIndexOf('/'); + const name = file.video_info.variants[0].url.substring(slashdex + 1); + + const exists = await this.driveFilesRepository.findOneBy({ name: name, userId: user.id }) ?? await this.driveFilesRepository.findOneBy({ name: name, userId: user.id, folderId: twitFolder?.id }); + + const videos = file.video_info.variants.filter((x: any) => x.content_type === 'video/mp4'); + + if (!exists) { + try { + await this.downloadUrl(videos[0].url, filePath); + } catch (e) { // TODO: 何度か再試行 + this.logger.error(e instanceof Error ? e : new Error(e as string)); + } + const driveFile = await this.driveService.addFile({ + user: user, + path: filePath, + name: name, + folderId: twitFolder?.id, + }); + files.push(driveFile); + } else { + files.push(exists); + } + + cleanup(); + } else if (file.media_url_https) { + const [filePath, cleanup] = await createTemp(); + const slashdex = file.media_url_https.lastIndexOf('/'); + const name = file.media_url_https.substring(slashdex + 1); + + const exists = await this.driveFilesRepository.findOneBy({ name: name, userId: user.id }); + + if (!exists) { + try { + await this.downloadUrl(file.media_url_https, filePath); + } catch (e) { // TODO: 何度か再試行 + this.logger.error(e instanceof Error ? e : new Error(e as string)); + } + + const driveFile = await this.driveService.addFile({ + user: user, + path: filePath, + name: name, + folderId: twitFolder?.id, + }); + files.push(driveFile); + } else { + files.push(exists); + } + cleanup(); + } + } + } + const createdNote = await this.noteCreateService.import(user, { createdAt: date, reply: parentNote, text: text, files: files }); + if (tweet.childNotes) this.queueService.createImportTweetsToDbJob(user, tweet.childNotes, createdNote.id); + } catch (e) { + this.logger.warn(`Error: ${e}`); + } + } + + @bindThis + public async processFBDb(job: Bull.Job<DbNoteImportToDbJobData>): Promise<void> { + const post = job.data.target; + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + return; + } + + if (!this.isIterable(post.data) || this.isIterable(post.data) && post.data[0].post === undefined) return; + + const date = new Date(post.timestamp * 1000); + const title = decodeFBString(post.data[0].post); + const files: MiDriveFile[] = []; + + function decodeFBString(str: string) { + const arr = []; + for (let i = 0; i < str.length; i++) { + arr.push(str.charCodeAt(i)); + } + return Buffer.from(arr).toString('utf8'); + } + + if (post.attachments && this.isIterable(post.attachments)) { + const media = []; + for await (const data of post.attachments[0].data) { + if (data.media) { + media.push(data.media); + } + } + + for await (const file of media) { + const slashdex = file.uri.lastIndexOf('/'); + const name = file.uri.substring(slashdex + 1); + const exists = await this.driveFilesRepository.findOneBy({ name: name, userId: user.id }); + if (exists) { + files.push(exists); + } + } + } + + await this.noteCreateService.import(user, { createdAt: date, text: title, files: files }); + } +} diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 079e014da8..9564724c62 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -25,10 +25,12 @@ import { JsonLdService } from '@/core/activitypub/JsonLdService.js'; import { ApInboxService } from '@/core/activitypub/ApInboxService.js'; import { bindThis } from '@/decorators.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; -import { CollapsedQueue } from '@/misc/collapsed-queue.js'; -import { MiNote } from '@/models/Note.js'; import { MiMeta } from '@/models/Meta.js'; import { DI } from '@/di-symbols.js'; +import { SkApInboxLog } from '@/models/_.js'; +import type { Config } from '@/config.js'; +import { ApLogService, calculateDurationSince } from '@/core/ApLogService.js'; +import { UpdateInstanceQueue } from '@/core/UpdateInstanceQueue.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { InboxJobData } from '../types.js'; @@ -40,12 +42,15 @@ type UpdateInstanceJob = { @Injectable() export class InboxProcessorService implements OnApplicationShutdown { private logger: Logger; - private updateInstanceQueue: CollapsedQueue<MiNote['id'], UpdateInstanceJob>; + //private updateInstanceQueue: CollapsedQueue<MiNote['id'], UpdateInstanceJob>; constructor( @Inject(DI.meta) private meta: MiMeta, + @Inject(DI.config) + private config: Config, + private utilityService: UtilityService, private apInboxService: ApInboxService, private federatedInstanceService: FederatedInstanceService, @@ -57,13 +62,49 @@ export class InboxProcessorService implements OnApplicationShutdown { private apRequestChart: ApRequestChart, private federationChart: FederationChart, private queueLoggerService: QueueLoggerService, + private readonly apLogService: ApLogService, + private readonly updateInstanceQueue: UpdateInstanceQueue, ) { this.logger = this.queueLoggerService.logger.createSubLogger('inbox'); - this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance); } @bindThis public async process(job: Bull.Job<InboxJobData>): Promise<string> { + if (this.config.activityLogging.enabled) { + return await this._processLogged(job); + } else { + return await this._process(job); + } + } + + private async _processLogged(job: Bull.Job<InboxJobData>): Promise<string> { + const startTime = process.hrtime.bigint(); + const activity = job.data.activity; + const keyId = job.data.signature.keyId; + const log = await this.apLogService.createInboxLog({ activity, keyId }); + + try { + const result = await this._process(job, log); + + log.accepted = result.startsWith('ok'); + log.result = result; + + return result; + } catch (err) { + log.accepted = false; + log.result = String(err); + + throw err; + } finally { + log.duration = calculateDurationSince(startTime); + + // Save or finalize asynchronously + this.apLogService.saveInboxLog(log) + .catch(err => this.logger.error('Failed to record AP activity:', err)); + } + } + + private async _process(job: Bull.Job<InboxJobData>, log?: SkApInboxLog): Promise<string> { const signature = job.data.signature; // HTTP-signature let activity = job.data.activity; @@ -116,10 +157,18 @@ export class InboxProcessorService implements OnApplicationShutdown { } // HTTP-Signatureの検証 - const httpSignatureValidated = httpSignature.verifySignature(signature, authUser.key.keyPem); + let httpSignatureValidated = httpSignature.verifySignature(signature, authUser.key.keyPem); + + // maybe they changed their key? refetch it + if (!httpSignatureValidated) { + authUser.key = await this.apDbResolverService.refetchPublicKeyForApId(authUser.user); + if (authUser.key != null) { + httpSignatureValidated = httpSignature.verifySignature(signature, authUser.key.keyPem); + } + } // また、signatureのsignerは、activity.actorと一致する必要がある - if (!httpSignatureValidated || authUser.user.uri !== activity.actor) { + if (!httpSignatureValidated || authUser.user.uri !== getApId(activity.actor)) { // 一致しなくても、でもLD-Signatureがありそうならそっちも見る const ldSignature = activity.signature; if (ldSignature) { @@ -163,12 +212,6 @@ export class InboxProcessorService implements OnApplicationShutdown { // https://github.com/mastodon/mastodon/blob/664b0ca/app/services/activitypub/process_collection_service.rb#L24-L29 activity.signature = ldSignature; - //#region Log - const compactedInfo = Object.assign({}, activity); - delete compactedInfo['@context']; - this.logger.debug(`compacted: ${JSON.stringify(compactedInfo, null, 2)}`); - //#endregion - // もう一度actorチェック if (authUser.user.uri !== activity.actor) { throw new Bull.UnrecoverableError(`skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`); @@ -188,10 +231,18 @@ export class InboxProcessorService implements OnApplicationShutdown { const signerHost = this.utilityService.extractDbHost(authUser.user.uri!); const activityIdHost = this.utilityService.extractDbHost(activity.id); if (signerHost !== activityIdHost) { - throw new Bull.UnrecoverableError(`skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`); + throw new Bull.UnrecoverableError(`skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost})`); } } else { - throw new Bull.UnrecoverableError('skip: activity id is not a string'); + // Activity ID should only be string or undefined. + delete activity.id; + } + + // Record verified user in log + if (log) { + log.verified = true; + log.authUser = authUser.user; + log.authUserId = authUser.user.id; } this.apRequestChart.inbox(); @@ -221,7 +272,11 @@ export class InboxProcessorService implements OnApplicationShutdown { try { const result = await this.apInboxService.performActivity(authUser.user, activity); if (result && !result.startsWith('ok')) { - this.logger.warn(`inbox activity ignored (maybe): id=${activity.id} reason=${result}`); + if (result.startsWith('skip:')) { + this.logger.info(`inbox activity ignored: id=${activity.id} reason=${result}`); + } else { + this.logger.warn(`inbox activity failed: id=${activity.id} reason=${result}`); + } return result; } } catch (e) { @@ -236,6 +291,19 @@ export class InboxProcessorService implements OnApplicationShutdown { return e.message; } } + + if (e instanceof StatusError && !e.isRetryable) { + return `skip: permanent error ${e.statusCode}`; + } + + if (e instanceof IdentifiableError && !e.isRetryable) { + if (e.message) { + return `skip: permanent error ${e.id}: ${e.message}`; + } else { + return `skip: permanent error ${e.id}`; + } + } + throw e; } return 'ok'; @@ -264,9 +332,7 @@ export class InboxProcessorService implements OnApplicationShutdown { } @bindThis - public async dispose(): Promise<void> { - await this.updateInstanceQueue.performAllNow(); - } + public async dispose(): Promise<void> {} @bindThis async onApplicationShutdown(signal?: string) { diff --git a/packages/backend/src/queue/processors/ScheduleNotePostProcessorService.ts b/packages/backend/src/queue/processors/ScheduleNotePostProcessorService.ts new file mode 100644 index 0000000000..d823d98ef1 --- /dev/null +++ b/packages/backend/src/queue/processors/ScheduleNotePostProcessorService.ts @@ -0,0 +1,144 @@ +/* + * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Inject, Injectable } from '@nestjs/common'; +import type Logger from '@/logger.js'; +import { bindThis } from '@/decorators.js'; +import { NoteCreateService } from '@/core/NoteCreateService.js'; +import type { ChannelsRepository, DriveFilesRepository, MiDriveFile, NoteScheduleRepository, NotesRepository, UsersRepository } from '@/models/_.js'; +import { DI } from '@/di-symbols.js'; +import { NotificationService } from '@/core/NotificationService.js'; +import { IdentifiableError } from '@/misc/identifiable-error.js'; +import type { MiScheduleNoteType } from '@/models/NoteSchedule.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; +import type { ScheduleNotePostJobData } from '../types.js'; + +@Injectable() +export class ScheduleNotePostProcessorService { + private logger: Logger; + + constructor( + @Inject(DI.noteScheduleRepository) + private noteScheduleRepository: NoteScheduleRepository, + + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + @Inject(DI.driveFilesRepository) + private driveFilesRepository: DriveFilesRepository, + @Inject(DI.notesRepository) + private notesRepository: NotesRepository, + @Inject(DI.channelsRepository) + private channelsRepository: ChannelsRepository, + + private noteCreateService: NoteCreateService, + private queueLoggerService: QueueLoggerService, + private notificationService: NotificationService, + ) { + this.logger = this.queueLoggerService.logger.createSubLogger('schedule-note-post'); + } + + @bindThis + private async isValidNoteSchedule(note: MiScheduleNoteType, id: string): Promise<boolean> { + const reply = note.reply ? await this.notesRepository.findOneBy({ id: note.reply }) : undefined; + const renote = note.renote ? await this.notesRepository.findOneBy({ id: note.renote }) : undefined; + const channel = note.channel ? await this.channelsRepository.findOneBy({ id: note.channel, isArchived: false }) : undefined; + if (note.reply && !reply) { + this.logger.warn('Schedule Note Failed Reason: parent note to reply does not exist'); + this.notificationService.createNotification(id, 'scheduledNoteFailed', { + reason: 'Replied to note on your scheduled note no longer exists', + }); + return false; + } + if (note.renote && !renote) { + this.logger.warn('Schedule Note Failed Reason: attached quote note no longer exists'); + this.notificationService.createNotification(id, 'scheduledNoteFailed', { + reason: 'A quoted note from one of your scheduled notes no longer exists', + }); + return false; + } + if (note.channel && !channel) { + this.logger.warn('Schedule Note Failed Reason: Channel does not exist'); + this.notificationService.createNotification(id, 'scheduledNoteFailed', { + reason: 'An attached channel on your scheduled note no longer exists', + }); + return false; + } + return true; + } + + @bindThis + public async process(job: Bull.Job<ScheduleNotePostJobData>): Promise<void> { + this.noteScheduleRepository.findOneBy({ id: job.data.scheduleNoteId }).then(async (data) => { + if (!data) { + this.logger.warn(`Schedule note ${job.data.scheduleNoteId} not found`); + } else { + const me = await this.usersRepository.findOneBy({ id: data.userId }); + const note = data.note; + const reply = note.reply ? await this.notesRepository.findOneBy({ id: note.reply }) : undefined; + const renote = note.renote ? await this.notesRepository.findOneBy({ id: note.renote }) : undefined; + const channel = note.channel ? await this.channelsRepository.findOneBy({ id: note.channel, isArchived: false }) : undefined; + + let files: MiDriveFile[] = []; + const fileIds = note.files; + + if (fileIds.length > 0 && me) { + files = await this.driveFilesRepository.createQueryBuilder('file') + .where('file.userId = :userId AND file.id IN (:...fileIds)', { + userId: me.id, + fileIds, + }) + .orderBy('array_position(ARRAY[:...fileIds], "id"::text)') + .setParameters({ fileIds }) + .getMany(); + } + + if (!data.userId || !me) { + this.logger.warn('Schedule Note Failed Reason: User Not Found'); + await this.noteScheduleRepository.remove(data); + return; + } + + if (!await this.isValidNoteSchedule(note, me.id)) { + await this.noteScheduleRepository.remove(data); + return; + } + + if (note.files.length !== files.length) { + this.logger.warn('Schedule Note Failed Reason: files are missing in the user\'s drive'); + this.notificationService.createNotification(me.id, 'scheduledNoteFailed', { + reason: 'Some attached files on your scheduled note no longer exist', + }); + await this.noteScheduleRepository.remove(data); + return; + } + + const createdNote = await this.noteCreateService.create(me, { + ...note, + createdAt: new Date(), + files, + poll: note.poll ? { + choices: note.poll.choices, + multiple: note.poll.multiple, + expiresAt: note.poll.expiresAt ? new Date(note.poll.expiresAt) : null, + } : undefined, + reply, + renote, + channel, + }).catch(async (err: IdentifiableError) => { + this.notificationService.createNotification(me.id, 'scheduledNoteFailed', { + reason: err.message, + }); + await this.noteScheduleRepository.remove(data); + throw this.logger.error(`Schedule Note Failed Reason: ${err.message}`); + }); + await this.noteScheduleRepository.remove(data); + this.notificationService.createNotification(me.id, 'scheduledNotePosted', { + noteId: createdNote.id, + }); + } + }); + } +} diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index 757daea88b..1bd9f7a0ab 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -44,6 +44,7 @@ export type DbJobData<T extends keyof DbJobMap> = DbJobMap[T]; export type DbJobMap = { deleteDriveFiles: DbJobDataWithUser; + exportAccountData: DbJobDataWithUser; exportCustomEmojis: DbJobDataWithUser; exportAntennas: DBExportAntennasData; exportNotes: DbJobDataWithUser; @@ -53,6 +54,13 @@ export type DbJobMap = { exportBlocking: DbJobDataWithUser; exportUserLists: DbJobDataWithUser; importAntennas: DBAntennaImportJobData; + importNotes: DbNoteImportJobData; + importTweetsToDb: DbNoteWithParentImportToDbJobData; + importIGToDb: DbNoteImportToDbJobData; + importFBToDb: DbNoteImportToDbJobData; + importMastoToDb: DbNoteWithParentImportToDbJobData; + importPleroToDb: DbNoteWithParentImportToDbJobData; + importKeyNotesToDb: DbNoteWithParentImportToDbJobData; importFollowing: DbUserImportJobData; importFollowingToDb: DbUserImportToDbJobData; importMuting: DbUserImportJobData; @@ -88,6 +96,12 @@ export type DbUserImportJobData = { withReplies?: boolean; }; +export type DbNoteImportJobData = { + user: ThinUser; + fileId: MiDriveFile['id']; + type?: string; +}; + export type DBAntennaImportJobData = { user: ThinUser, antenna: Antenna @@ -99,6 +113,17 @@ export type DbUserImportToDbJobData = { withReplies?: boolean; }; +export type DbNoteImportToDbJobData = { + user: ThinUser; + target: any; +}; + +export type DbNoteWithParentImportToDbJobData = { + user: ThinUser; + target: any; + note: MiNote['id'] | null; +}; + export type ObjectStorageJobData = ObjectStorageFileJobData | Record<string, unknown>; export type ObjectStorageFileJobData = { @@ -133,3 +158,7 @@ export type UserWebhookDeliverJobData<T extends WebhookEventTypes = WebhookEvent export type ThinUser = { id: MiUser['id']; }; + +export type ScheduleNotePostJobData = { + scheduleNoteId: MiNote['id']; +}; |