diff options
| author | dakkar <dakkar@thenautilus.net> | 2024-03-02 17:28:34 +0000 |
|---|---|---|
| committer | dakkar <dakkar@thenautilus.net> | 2024-03-02 17:28:34 +0000 |
| commit | 23f476dbf32ef9a2fc7d2ed7aab9ce706a2409d0 (patch) | |
| tree | 0b9e79c2f18f4a206811561fa255f2510f60c175 /packages/backend/src/queue/processors | |
| parent | merge: Add missing IMPORTANT_NOTES.md from Sharkey/OldJoinSharkey (!443) (diff) | |
| parent | merge: put back the readme (!447) (diff) | |
| download | sharkey-23f476dbf32ef9a2fc7d2ed7aab9ce706a2409d0.tar.gz sharkey-23f476dbf32ef9a2fc7d2ed7aab9ce706a2409d0.tar.bz2 sharkey-23f476dbf32ef9a2fc7d2ed7aab9ce706a2409d0.zip | |
Merge branch 'develop' into release/2024.3.1
Diffstat (limited to 'packages/backend/src/queue/processors')
31 files changed, 380 insertions, 143 deletions
diff --git a/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts b/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts index 9f49d85c7f..4769cccabf 100644 --- a/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts +++ b/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts b/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts index 9b07389dc3..448fc9c763 100644 --- a/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts +++ b/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/CleanChartsProcessorService.ts b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts index 55c444eee6..110468801c 100644 --- a/packages/backend/src/queue/processors/CleanChartsProcessorService.ts +++ b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/CleanProcessorService.ts b/packages/backend/src/queue/processors/CleanProcessorService.ts index e252c5d8a1..a26b69cd2b 100644 --- a/packages/backend/src/queue/processors/CleanProcessorService.ts +++ b/packages/backend/src/queue/processors/CleanProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ @@ -11,6 +11,7 @@ import type Logger from '@/logger.js'; import { bindThis } from '@/decorators.js'; import { IdService } from '@/core/IdService.js'; import type { Config } from '@/config.js'; +import { ReversiService } from '@/core/ReversiService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; @@ -32,6 +33,7 @@ export class CleanProcessorService { private roleAssignmentsRepository: RoleAssignmentsRepository, private queueLoggerService: QueueLoggerService, + private reversiService: ReversiService, private idService: IdService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('clean'); @@ -65,6 +67,8 @@ export class CleanProcessorService { }); } + this.reversiService.cleanOutdatedGames(); + this.logger.succ('Cleaned.'); } } diff --git a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts index b62cc8a8fd..917de8b72c 100644 --- a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts +++ b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts index 56369f3a7a..0e604a0501 100644 --- a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts +++ b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts b/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts index 6d0a45bcc0..291fa4a6d8 100644 --- a/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts +++ b/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/DeleteFileProcessorService.ts b/packages/backend/src/queue/processors/DeleteFileProcessorService.ts index a4638bfaaf..fc1dd93ce7 100644 --- a/packages/backend/src/queue/processors/DeleteFileProcessorService.ts +++ b/packages/backend/src/queue/processors/DeleteFileProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts index 4a1d9f28b4..5fed070929 100644 --- a/packages/backend/src/queue/processors/DeliverProcessorService.ts +++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ @@ -72,7 +72,7 @@ export class DeliverProcessorService { } try { - await this.apRequestService.signedPost(job.data.user, job.data.to, job.data.content); + await this.apRequestService.signedPost(job.data.user, job.data.to, job.data.content, job.data.digest); // Update stats this.federatedInstanceService.fetch(host).then(i => { @@ -111,7 +111,7 @@ export class DeliverProcessorService { if (res instanceof StatusError) { // 4xx - if (res.isClientError) { + if (!res.isRetryable) { // 相手が閉鎖していることを明示しているため、配送停止する if (job.data.isSharedInbox && res.statusCode === 410) { this.federatedInstanceService.fetch(host).then(i => { diff --git a/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts b/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts index 4a48084436..29c1f27bb1 100644 --- a/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts +++ b/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts index d0968d2923..af48bad417 100644 --- a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts index 0a37e3ca1e..6ec3c18786 100644 --- a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/ExportClipsProcessorService.ts b/packages/backend/src/queue/processors/ExportClipsProcessorService.ts new file mode 100644 index 0000000000..01eab26e96 --- /dev/null +++ b/packages/backend/src/queue/processors/ExportClipsProcessorService.ts @@ -0,0 +1,206 @@ +/* + * SPDX-FileCopyrightText: syuilo and misskey-project + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import * as fs from 'node:fs'; +import { Writable } from 'node:stream'; +import { Inject, Injectable, StreamableFile } from '@nestjs/common'; +import { MoreThan } from 'typeorm'; +import { format as dateFormat } from 'date-fns'; +import { DI } from '@/di-symbols.js'; +import type { ClipNotesRepository, ClipsRepository, MiClip, MiClipNote, MiUser, NotesRepository, PollsRepository, UsersRepository } from '@/models/_.js'; +import type Logger from '@/logger.js'; +import { DriveService } from '@/core/DriveService.js'; +import { createTemp } from '@/misc/create-temp.js'; +import type { MiPoll } from '@/models/Poll.js'; +import type { MiNote } from '@/models/Note.js'; +import { bindThis } from '@/decorators.js'; +import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js'; +import { Packed } from '@/misc/json-schema.js'; +import { IdService } from '@/core/IdService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; +import type { DbJobDataWithUser } from '../types.js'; + +@Injectable() +export class ExportClipsProcessorService { + private logger: Logger; + + constructor( + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + + @Inject(DI.pollsRepository) + private pollsRepository: PollsRepository, + + @Inject(DI.clipsRepository) + private clipsRepository: ClipsRepository, + + @Inject(DI.clipNotesRepository) + private clipNotesRepository: ClipNotesRepository, + + private driveService: DriveService, + private queueLoggerService: QueueLoggerService, + private idService: IdService, + ) { + this.logger = this.queueLoggerService.logger.createSubLogger('export-clips'); + } + + @bindThis + public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> { + this.logger.info(`Exporting clips of ${job.data.user.id} ...`); + + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + return; + } + + // Create temp file + const [path, cleanup] = await createTemp(); + + this.logger.info(`Temp file is ${path}`); + + try { + const stream = Writable.toWeb(fs.createWriteStream(path, { flags: 'a' })); + const writer = stream.getWriter(); + writer.closed.catch(this.logger.error); + + await writer.write('['); + + await this.processClips(writer, user, job); + + await writer.write(']'); + await writer.close(); + + this.logger.succ(`Exported to: ${path}`); + + const fileName = 'clips-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json'; + const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' }); + + this.logger.succ(`Exported to: ${driveFile.id}`); + } finally { + cleanup(); + } + } + + async processClips(writer: WritableStreamDefaultWriter, user: MiUser, job: Bull.Job<DbJobDataWithUser>) { + let exportedClipsCount = 0; + let cursor: MiClip['id'] | null = null; + + while (true) { + const clips = await this.clipsRepository.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}), + }, + take: 100, + order: { + id: 1, + }, + }); + + if (clips.length === 0) { + job.updateProgress(100); + break; + } + + cursor = clips.at(-1)?.id ?? null; + + for (const clip of clips) { + // Stringify but remove the last `]}` + const content = JSON.stringify(this.serializeClip(clip)).slice(0, -2); + const isFirst = exportedClipsCount === 0; + await writer.write(isFirst ? content : ',\n' + content); + + await this.processClipNotes(writer, clip.id); + + await writer.write(']}'); + exportedClipsCount++; + } + + const total = await this.clipsRepository.countBy({ + userId: user.id, + }); + + job.updateProgress(exportedClipsCount / total); + } + } + + async processClipNotes(writer: WritableStreamDefaultWriter, clipId: string): Promise<void> { + let exportedClipNotesCount = 0; + let cursor: MiClipNote['id'] | null = null; + + while (true) { + const clipNotes = await this.clipNotesRepository.find({ + where: { + clipId, + ...(cursor ? { id: MoreThan(cursor) } : {}), + }, + take: 100, + order: { + id: 1, + }, + relations: ['note', 'note.user'], + }) as (MiClipNote & { note: MiNote & { user: MiUser } })[]; + + if (clipNotes.length === 0) { + break; + } + + cursor = clipNotes.at(-1)?.id ?? null; + + for (const clipNote of clipNotes) { + let poll: MiPoll | undefined; + if (clipNote.note.hasPoll) { + poll = await this.pollsRepository.findOneByOrFail({ noteId: clipNote.note.id }); + } + const content = JSON.stringify(this.serializeClipNote(clipNote, poll)); + const isFirst = exportedClipNotesCount === 0; + await writer.write(isFirst ? content : ',\n' + content); + + exportedClipNotesCount++; + } + } + } + + private serializeClip(clip: MiClip): Record<string, unknown> { + return { + id: clip.id, + name: clip.name, + description: clip.description, + lastClippedAt: clip.lastClippedAt?.toISOString(), + clipNotes: [], + }; + } + + private serializeClipNote(clip: MiClipNote & { note: MiNote & { user: MiUser } }, poll: MiPoll | undefined): Record<string, unknown> { + return { + id: clip.id, + createdAt: this.idService.parse(clip.id).date.toISOString(), + note: { + id: clip.note.id, + text: clip.note.text, + createdAt: this.idService.parse(clip.note.id).date.toISOString(), + fileIds: clip.note.fileIds, + replyId: clip.note.replyId, + renoteId: clip.note.renoteId, + poll: poll, + cw: clip.note.cw, + visibility: clip.note.visibility, + visibleUserIds: clip.note.visibleUserIds, + localOnly: clip.note.localOnly, + reactionAcceptance: clip.note.reactionAcceptance, + uri: clip.note.uri, + url: clip.note.url, + user: { + id: clip.note.user.id, + name: clip.note.user.name, + username: clip.note.user.username, + host: clip.note.user.host, + uri: clip.note.user.uri, + }, + }, + }; + } +} diff --git a/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts index d5387fe42e..e4eb4791bd 100644 --- a/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts index af2a3434a9..7bb626dd31 100644 --- a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts index c9739eb1cb..1cc80e66d7 100644 --- a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts index c8425c1f2d..243b74f2c2 100644 --- a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts index cd4ccb0b07..c7611012d7 100644 --- a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts @@ -1,9 +1,9 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ -import * as fs from 'node:fs'; +import { ReadableStream, TextEncoderStream } from 'node:stream/web'; import { Inject, Injectable } from '@nestjs/common'; import { MoreThan } from 'typeorm'; import { format as dateFormat } from 'date-fns'; @@ -18,10 +18,82 @@ import { bindThis } from '@/decorators.js'; import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js'; import { Packed } from '@/misc/json-schema.js'; import { IdService } from '@/core/IdService.js'; +import { JsonArrayStream } from '@/misc/JsonArrayStream.js'; +import { FileWriterStream } from '@/misc/FileWriterStream.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; import type { DbJobDataWithUser } from '../types.js'; +class NoteStream extends ReadableStream<Record<string, unknown>> { + constructor( + job: Bull.Job, + notesRepository: NotesRepository, + pollsRepository: PollsRepository, + driveFileEntityService: DriveFileEntityService, + idService: IdService, + userId: string, + ) { + let exportedNotesCount = 0; + let cursor: MiNote['id'] | null = null; + + const serialize = ( + note: MiNote, + poll: MiPoll | null, + files: Packed<'DriveFile'>[], + ): Record<string, unknown> => { + return { + id: note.id, + text: note.text, + createdAt: idService.parse(note.id).date.toISOString(), + fileIds: note.fileIds, + files: files, + replyId: note.replyId, + renoteId: note.renoteId, + poll: poll, + cw: note.cw, + visibility: note.visibility, + visibleUserIds: note.visibleUserIds, + localOnly: note.localOnly, + reactionAcceptance: note.reactionAcceptance, + }; + }; + + super({ + async pull(controller): Promise<void> { + const notes = await notesRepository.find({ + where: { + userId, + ...(cursor !== null ? { id: MoreThan(cursor) } : {}), + }, + take: 100, // 100件ずつ取得 + order: { id: 1 }, + }); + + if (notes.length === 0) { + job.updateProgress(100); + controller.close(); + } + + cursor = notes.at(-1)?.id ?? null; + + for (const note of notes) { + const poll = note.hasPoll + ? await pollsRepository.findOneByOrFail({ noteId: note.id }) // N+1 + : null; + const files = await driveFileEntityService.packManyByIds(note.fileIds); // N+1 + const content = serialize(note, poll, files); + + controller.enqueue(content); + exportedNotesCount++; + } + + const total = await notesRepository.countBy({ userId }); + job.updateProgress(exportedNotesCount / total); + }, + }); + } +} + @Injectable() export class ExportNotesProcessorService { private logger: Logger; @@ -59,67 +131,19 @@ export class ExportNotesProcessorService { this.logger.info(`Temp file is ${path}`); try { - const stream = fs.createWriteStream(path, { flags: 'a' }); - - const write = (text: string): Promise<void> => { - return new Promise<void>((res, rej) => { - stream.write(text, err => { - if (err) { - this.logger.error(err); - rej(err); - } else { - res(); - } - }); - }); - }; - - await write('['); - - let exportedNotesCount = 0; - let cursor: MiNote['id'] | null = null; - - while (true) { - const notes = await this.notesRepository.find({ - where: { - userId: user.id, - ...(cursor ? { id: MoreThan(cursor) } : {}), - }, - take: 100, - order: { - id: 1, - }, - }) as MiNote[]; - - if (notes.length === 0) { - job.updateProgress(100); - break; - } - - cursor = notes.at(-1)?.id ?? null; - - for (const note of notes) { - let poll: MiPoll | undefined; - if (note.hasPoll) { - poll = await this.pollsRepository.findOneByOrFail({ noteId: note.id }); - } - const files = await this.driveFileEntityService.packManyByIds(note.fileIds); - const content = JSON.stringify(this.serialize(note, poll, files)); - const isFirst = exportedNotesCount === 0; - await write(isFirst ? content : ',\n' + content); - exportedNotesCount++; - } - - const total = await this.notesRepository.countBy({ - userId: user.id, - }); + // メモリが足りなくならないようにストリームで処理する + await new NoteStream( + job, + this.notesRepository, + this.pollsRepository, + this.driveFileEntityService, + this.idService, + user.id, + ) + .pipeThrough(new JsonArrayStream()) + .pipeThrough(new TextEncoderStream()) + .pipeTo(new FileWriterStream(path)); - job.updateProgress(exportedNotesCount / total); - } - - await write(']'); - - stream.end(); this.logger.succ(`Exported to: ${path}`); const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json'; @@ -130,22 +154,4 @@ export class ExportNotesProcessorService { cleanup(); } } - - private serialize(note: MiNote, poll: MiPoll | null = null, files: Packed<'DriveFile'>[]): Record<string, unknown> { - return { - id: note.id, - text: note.text, - createdAt: this.idService.parse(note.id).date.toISOString(), - fileIds: note.fileIds, - files: files, - replyId: note.replyId, - renoteId: note.renoteId, - poll: poll, - cw: note.cw, - visibility: note.visibility, - visibleUserIds: note.visibleUserIds, - localOnly: note.localOnly, - reactionAcceptance: note.reactionAcceptance, - }; - } } diff --git a/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts index a3f9441dc2..ee87cff5d3 100644 --- a/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts index 291ea14b67..951b560597 100644 --- a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts b/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts index 64520b770b..b78229c648 100644 --- a/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts index a52af54a39..171809d25c 100644 --- a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts b/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts index e75499a56f..70c9f3a096 100644 --- a/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/ImportMutingProcessorService.ts b/packages/backend/src/queue/processors/ImportMutingProcessorService.ts index 9db4e5d8e0..ec9d2b6c4c 100644 --- a/packages/backend/src/queue/processors/ImportMutingProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportMutingProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts index d64a861b03..10cd90c4ad 100644 --- a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts @@ -1,5 +1,5 @@ import * as fs from 'node:fs'; -import * as vm from 'node:vm'; +import * as fsp from 'node:fs/promises'; import * as crypto from 'node:crypto'; import { Inject, Injectable } from '@nestjs/common'; import { ZipReader } from 'slacc'; @@ -51,7 +51,7 @@ export class ImportNotesProcessorService { @bindThis private async uploadFiles(dir: string, user: MiUser, folder?: MiDriveFolder['id']) { - const fileList = fs.readdirSync(dir); + const fileList = await fsp.readdir(dir); for await (const file of fileList) { const name = `${dir}/${file}`; if (fs.statSync(name).isDirectory()) { @@ -132,7 +132,7 @@ export class ImportNotesProcessorService { private parseTwitterFile(str : string) : null | [{ tweet: any }] { const removed = str.replace(new RegExp('window\\.YTD\\.tweets\\.part0 = ', 'g'), ''); - + try { return JSON.parse(removed); } catch (error) { @@ -142,6 +142,19 @@ export class ImportNotesProcessorService { } @bindThis + private parseTwitterFile(str : string) : { tweet: object }[] { + const jsonStr = str.replace(/^\s*window\.YTD\.tweets\.part0\s*=\s*/, ''); + + try { + return JSON.parse(jsonStr); + } catch (error) { + //The format is not what we expected. Either this file was tampered with or twitters exports changed + this.logger.warn('Failed to import twitter notes due to malformed file'); + throw error; + } + } + + @bindThis public async process(job: Bull.Job<DbNoteImportJobData>): Promise<void> { this.logger.info(`Starting note import of ${job.data.user.id} ...`); @@ -173,7 +186,7 @@ export class ImportNotesProcessorService { const destPath = path + '/twitter.zip'; try { - fs.writeFileSync(destPath, '', 'binary'); + await fsp.writeFile(destPath, '', 'binary'); await this.downloadService.downloadUrl(file.url, destPath); } catch (e) { // TODO: 何度か再試行 if (e instanceof Error || typeof e === 'string') { @@ -185,21 +198,13 @@ export class ImportNotesProcessorService { const outputPath = path + '/twitter'; try { this.logger.succ(`Unzipping to ${outputPath}`); - ZipReader.withDestinationPath(outputPath).viaBuffer(await fs.promises.readFile(destPath)); + ZipReader.withDestinationPath(outputPath).viaBuffer(await fsp.readFile(destPath)); - const unprocessedTweetJson = this.parseTwitterFile(fs.readFileSync(outputPath + '/data/tweets.js', 'utf-8')); + const unprocessedTweets = this.parseTwitterFile(await fsp.readFile(outputPath + '/data/tweets.js', 'utf-8')); - //Make sure that it isnt null (because if something went wrong in parseTwitterFile it returns null) - if (unprocessedTweetJson) { - const tweets = Object.keys(unprocessedTweetJson).reduce((m, key, i, obj) => { - return m.concat(unprocessedTweetJson[i].tweet); - }, []); - - const processedTweets = await this.recreateChain(['id_str'], ['in_reply_to_status_id_str'], tweets, false); - this.queueService.createImportTweetsToDbJob(job.data.user, processedTweets, null); - } else { - this.logger.warn('Failed to import twitter notes due to malformed file'); - } + const tweets = unprocessedTweets.map(e => e.tweet); + const processedTweets = await this.recreateChain(['id_str'], ['in_reply_to_status_id_str'], tweets, false); + this.queueService.createImportTweetsToDbJob(job.data.user, processedTweets, null); } finally { cleanup(); } @@ -211,7 +216,7 @@ export class ImportNotesProcessorService { const destPath = path + '/facebook.zip'; try { - fs.writeFileSync(destPath, '', 'binary'); + await fsp.writeFile(destPath, '', 'binary'); await this.downloadService.downloadUrl(file.url, destPath); } catch (e) { // TODO: 何度か再試行 if (e instanceof Error || typeof e === 'string') { @@ -223,8 +228,8 @@ export class ImportNotesProcessorService { const outputPath = path + '/facebook'; try { this.logger.succ(`Unzipping to ${outputPath}`); - ZipReader.withDestinationPath(outputPath).viaBuffer(await fs.promises.readFile(destPath)); - const postsJson = fs.readFileSync(outputPath + '/your_activity_across_facebook/posts/your_posts__check_ins__photos_and_videos_1.json', 'utf-8'); + ZipReader.withDestinationPath(outputPath).viaBuffer(await fsp.readFile(destPath)); + const postsJson = await fsp.readFile(outputPath + '/your_activity_across_facebook/posts/your_posts__check_ins__photos_and_videos_1.json', 'utf-8'); const posts = JSON.parse(postsJson); const facebookFolder = await this.driveFoldersRepository.findOneBy({ name: 'Facebook', userId: job.data.user.id, parentId: folder?.id }); if (facebookFolder == null && folder) { @@ -244,7 +249,7 @@ export class ImportNotesProcessorService { const destPath = path + '/unknown.zip'; try { - fs.writeFileSync(destPath, '', 'binary'); + await fsp.writeFile(destPath, '', 'binary'); await this.downloadService.downloadUrl(file.url, destPath); } catch (e) { // TODO: 何度か再試行 if (e instanceof Error || typeof e === 'string') { @@ -256,11 +261,11 @@ export class ImportNotesProcessorService { const outputPath = path + '/unknown'; try { this.logger.succ(`Unzipping to ${outputPath}`); - ZipReader.withDestinationPath(outputPath).viaBuffer(await fs.promises.readFile(destPath)); + ZipReader.withDestinationPath(outputPath).viaBuffer(await fsp.readFile(destPath)); const isInstagram = type === 'Instagram' || fs.existsSync(outputPath + '/instagram_live') || fs.existsSync(outputPath + '/instagram_ads_and_businesses'); const isOutbox = type === 'Mastodon' || fs.existsSync(outputPath + '/outbox.json'); if (isInstagram) { - const postsJson = fs.readFileSync(outputPath + '/content/posts_1.json', 'utf-8'); + const postsJson = await fsp.readFile(outputPath + '/content/posts_1.json', 'utf-8'); const posts = JSON.parse(postsJson); const igFolder = await this.driveFoldersRepository.findOneBy({ name: 'Instagram', userId: job.data.user.id, parentId: folder?.id }); if (igFolder == null && folder) { @@ -270,16 +275,16 @@ export class ImportNotesProcessorService { } this.queueService.createImportIGToDbJob(job.data.user, posts); } else if (isOutbox) { - const actorJson = fs.readFileSync(outputPath + '/actor.json', 'utf-8'); + const actorJson = await fsp.readFile(outputPath + '/actor.json', 'utf-8'); const actor = JSON.parse(actorJson); const isPleroma = actor['@context'].some((v: any) => typeof v === 'string' && v.match(/litepub(.*)/)); if (isPleroma) { - const outboxJson = fs.readFileSync(outputPath + '/outbox.json', 'utf-8'); + const outboxJson = await fsp.readFile(outputPath + '/outbox.json', 'utf-8'); const outbox = JSON.parse(outboxJson); const processedToots = await this.recreateChain(['object', 'id'], ['object', 'inReplyTo'], outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note'), true); this.queueService.createImportPleroToDbJob(job.data.user, processedToots, null); } else { - const outboxJson = fs.readFileSync(outputPath + '/outbox.json', 'utf-8'); + const outboxJson = await fsp.readFile(outputPath + '/outbox.json', 'utf-8'); const outbox = JSON.parse(outboxJson); let mastoFolder = await this.driveFoldersRepository.findOneBy({ name: 'Mastodon', userId: job.data.user.id, parentId: folder?.id }); if (mastoFolder == null && folder) { @@ -302,7 +307,7 @@ export class ImportNotesProcessorService { this.logger.info(`Temp dir is ${path}`); try { - fs.writeFileSync(path, '', 'utf-8'); + await fsp.writeFile(path, '', 'utf-8'); await this.downloadService.downloadUrl(file.url, path); } catch (e) { // TODO: 何度か再試行 if (e instanceof Error || typeof e === 'string') { @@ -311,7 +316,7 @@ export class ImportNotesProcessorService { throw e; } - const notesJson = fs.readFileSync(path, 'utf-8'); + const notesJson = await fsp.readFile(path, 'utf-8'); const notes = JSON.parse(notesJson); const processedNotes = await this.recreateChain(['id'], ['replyId'], notes, false); this.queueService.createImportKeyNotesToDbJob(job.data.user, processedNotes, null); @@ -424,6 +429,10 @@ export class ImportNotesProcessorService { const name = file.url.substring(slashdex + 1); const exists = await this.driveFilesRepository.findOneBy({ name: name, userId: user.id }); if (exists) { + if (file.name) { + this.driveService.updateFile(exists, { comment: file.name }, user); + } + files.push(exists); } } @@ -583,14 +592,15 @@ export class ImportNotesProcessorService { async function replaceTwitterMentions(full_text: string, mentions: any) { let full_textedit = full_text; mentions.forEach((mention: any) => { - full_textedit = full_textedit.replaceAll(`@${mention.screen_name}`, `[@${mention.screen_name}](https://nitter.net/${mention.screen_name})`); + full_textedit = full_textedit.replaceAll(`@${mention.screen_name}`, `[@${mention.screen_name}](https://twitter.com/${mention.screen_name})`); }); return full_textedit; } try { const date = new Date(tweet.created_at); - const textReplaceURLs = tweet.entities.urls && tweet.entities.urls.length > 0 ? await replaceTwitterUrls(tweet.full_text, tweet.entities.urls) : tweet.full_text; + const decodedText = tweet.full_text.replaceAll('>', '>').replaceAll('<', '<').replaceAll('&', '&'); + const textReplaceURLs = tweet.entities.urls && tweet.entities.urls.length > 0 ? await replaceTwitterUrls(decodedText, tweet.entities.urls) : decodedText; const text = tweet.entities.user_mentions && tweet.entities.user_mentions.length > 0 ? await replaceTwitterMentions(textReplaceURLs, tweet.entities.user_mentions) : textReplaceURLs; const files: MiDriveFile[] = []; diff --git a/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts b/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts index 5dd3fbe887..a5992c28c8 100644 --- a/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index f69634968d..ad1d9799a7 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ @@ -24,6 +24,7 @@ import { ApPersonService } from '@/core/activitypub/models/ApPersonService.js'; import { LdSignatureService } from '@/core/activitypub/LdSignatureService.js'; import { ApInboxService } from '@/core/activitypub/ApInboxService.js'; import { bindThis } from '@/decorators.js'; +import { IdentifiableError } from '@/misc/identifiable-error.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { InboxJobData } from '../types.js'; @@ -85,7 +86,7 @@ export class InboxProcessorService { } catch (err) { // 対象が4xxならスキップ if (err instanceof StatusError) { - if (err.isClientError) { + if (!err.isRetryable) { throw new Bull.UnrecoverableError(`skip: Ignored deleted actors on both ends ${activity.actor} - ${err.statusCode}`); } throw new Error(`Error in actor ${activity.actor} - ${err.statusCode}`); @@ -191,7 +192,17 @@ export class InboxProcessorService { }); // アクティビティを処理 - await this.apInboxService.performActivity(authUser.user, activity); + try { + await this.apInboxService.performActivity(authUser.user, activity); + } catch (e) { + if (e instanceof IdentifiableError) { + if (e.id === '689ee33f-f97c-479a-ac49-1b9f8140af99') { + return 'blocked notes with prohibited words'; + } + if (e.id === '85ab9bd7-3a41-4530-959d-f07073900109') return 'actor has been suspended'; + } + throw e; + } return 'ok'; } } diff --git a/packages/backend/src/queue/processors/RelationshipProcessorService.ts b/packages/backend/src/queue/processors/RelationshipProcessorService.ts index b2d8e3631f..408b02fb38 100644 --- a/packages/backend/src/queue/processors/RelationshipProcessorService.ts +++ b/packages/backend/src/queue/processors/RelationshipProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts index b3b055ef8c..570cdf9a75 100644 --- a/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts +++ b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/TickChartsProcessorService.ts b/packages/backend/src/queue/processors/TickChartsProcessorService.ts index 7b1efb71e0..93ec34162d 100644 --- a/packages/backend/src/queue/processors/TickChartsProcessorService.ts +++ b/packages/backend/src/queue/processors/TickChartsProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ diff --git a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts b/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts index a41f5565c8..8c260c0137 100644 --- a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts +++ b/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ @@ -71,7 +71,7 @@ export class WebhookDeliverProcessorService { if (res instanceof StatusError) { // 4xx - if (res.isClientError) { + if (!res.isRetryable) { throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`); } |