diff options
Diffstat (limited to 'packages/backend/src/queue')
7 files changed, 218 insertions, 5 deletions
diff --git a/packages/backend/src/queue/QueueProcessorModule.ts b/packages/backend/src/queue/QueueProcessorModule.ts index 29dc78605b..d547a498a1 100644 --- a/packages/backend/src/queue/QueueProcessorModule.ts +++ b/packages/backend/src/queue/QueueProcessorModule.ts @@ -25,6 +25,7 @@ import { ExportCustomEmojisProcessorService } from './processors/ExportCustomEmo import { ExportFollowingProcessorService } from './processors/ExportFollowingProcessorService.js'; import { ExportMutingProcessorService } from './processors/ExportMutingProcessorService.js'; import { ExportNotesProcessorService } from './processors/ExportNotesProcessorService.js'; +import { ExportClipsProcessorService } from './processors/ExportClipsProcessorService.js'; import { ExportUserListsProcessorService } from './processors/ExportUserListsProcessorService.js'; import { ExportAntennasProcessorService } from './processors/ExportAntennasProcessorService.js'; import { ImportBlockingProcessorService } from './processors/ImportBlockingProcessorService.js'; @@ -56,6 +57,7 @@ import { RelationshipProcessorService } from './processors/RelationshipProcessor ExportAccountDataProcessorService, ExportCustomEmojisProcessorService, ExportNotesProcessorService, + ExportClipsProcessorService, ExportFavoritesProcessorService, ExportFollowingProcessorService, ExportMutingProcessorService, diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index ea3ecd4ded..cdca744b88 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -17,6 +17,7 @@ import { DeleteDriveFilesProcessorService } from './processors/DeleteDriveFilesP import { ExportAccountDataProcessorService } from './processors/ExportAccountDataProcessorService.js'; import { ExportCustomEmojisProcessorService } from './processors/ExportCustomEmojisProcessorService.js'; import { ExportNotesProcessorService } from './processors/ExportNotesProcessorService.js'; +import { ExportClipsProcessorService } from './processors/ExportClipsProcessorService.js'; import { ExportFollowingProcessorService } from './processors/ExportFollowingProcessorService.js'; import { ExportMutingProcessorService } from './processors/ExportMutingProcessorService.js'; import { ExportBlockingProcessorService } from './processors/ExportBlockingProcessorService.js'; @@ -94,6 +95,7 @@ export class QueueProcessorService implements OnApplicationShutdown { private exportAccountDataProcessorService: ExportAccountDataProcessorService, private exportCustomEmojisProcessorService: ExportCustomEmojisProcessorService, private exportNotesProcessorService: ExportNotesProcessorService, + private exportClipsProcessorService: ExportClipsProcessorService, private exportFavoritesProcessorService: ExportFavoritesProcessorService, private exportFollowingProcessorService: ExportFollowingProcessorService, private exportMutingProcessorService: ExportMutingProcessorService, @@ -169,6 +171,7 @@ export class QueueProcessorService implements OnApplicationShutdown { case 'exportAccountData': return this.exportAccountDataProcessorService.process(job); case 'exportCustomEmojis': return this.exportCustomEmojisProcessorService.process(job); case 'exportNotes': return this.exportNotesProcessorService.process(job); + case 'exportClips': return this.exportClipsProcessorService.process(job); case 'exportFavorites': return this.exportFavoritesProcessorService.process(job); case 'exportFollowing': return this.exportFollowingProcessorService.process(job); case 'exportMuting': return this.exportMutingProcessorService.process(job); diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts index 4a1d9f28b4..64c3445552 100644 --- a/packages/backend/src/queue/processors/DeliverProcessorService.ts +++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts @@ -72,7 +72,7 @@ export class DeliverProcessorService { } try { - await this.apRequestService.signedPost(job.data.user, job.data.to, job.data.content); + await this.apRequestService.signedPost(job.data.user, job.data.to, job.data.content, job.data.digest); // Update stats this.federatedInstanceService.fetch(host).then(i => { @@ -111,7 +111,7 @@ export class DeliverProcessorService { if (res instanceof StatusError) { // 4xx - if (res.isClientError) { + if (!res.isRetryable) { // 相手が閉鎖していることを明示しているため、配送停止する if (job.data.isSharedInbox && res.statusCode === 410) { this.federatedInstanceService.fetch(host).then(i => { diff --git a/packages/backend/src/queue/processors/ExportClipsProcessorService.ts b/packages/backend/src/queue/processors/ExportClipsProcessorService.ts new file mode 100644 index 0000000000..5221497bd3 --- /dev/null +++ b/packages/backend/src/queue/processors/ExportClipsProcessorService.ts @@ -0,0 +1,206 @@ +/* + * SPDX-FileCopyrightText: syuilo and other misskey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import * as fs from 'node:fs'; +import { Writable } from 'node:stream'; +import { Inject, Injectable, StreamableFile } from '@nestjs/common'; +import { MoreThan } from 'typeorm'; +import { format as dateFormat } from 'date-fns'; +import { DI } from '@/di-symbols.js'; +import type { ClipNotesRepository, ClipsRepository, MiClip, MiClipNote, MiUser, NotesRepository, PollsRepository, UsersRepository } from '@/models/_.js'; +import type Logger from '@/logger.js'; +import { DriveService } from '@/core/DriveService.js'; +import { createTemp } from '@/misc/create-temp.js'; +import type { MiPoll } from '@/models/Poll.js'; +import type { MiNote } from '@/models/Note.js'; +import { bindThis } from '@/decorators.js'; +import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js'; +import { Packed } from '@/misc/json-schema.js'; +import { IdService } from '@/core/IdService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; +import type { DbJobDataWithUser } from '../types.js'; + +@Injectable() +export class ExportClipsProcessorService { + private logger: Logger; + + constructor( + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + + @Inject(DI.pollsRepository) + private pollsRepository: PollsRepository, + + @Inject(DI.clipsRepository) + private clipsRepository: ClipsRepository, + + @Inject(DI.clipNotesRepository) + private clipNotesRepository: ClipNotesRepository, + + private driveService: DriveService, + private queueLoggerService: QueueLoggerService, + private idService: IdService, + ) { + this.logger = this.queueLoggerService.logger.createSubLogger('export-clips'); + } + + @bindThis + public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> { + this.logger.info(`Exporting clips of ${job.data.user.id} ...`); + + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + return; + } + + // Create temp file + const [path, cleanup] = await createTemp(); + + this.logger.info(`Temp file is ${path}`); + + try { + const stream = Writable.toWeb(fs.createWriteStream(path, { flags: 'a' })); + const writer = stream.getWriter(); + writer.closed.catch(this.logger.error); + + await writer.write('['); + + await this.processClips(writer, user, job); + + await writer.write(']'); + await writer.close(); + + this.logger.succ(`Exported to: ${path}`); + + const fileName = 'clips-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json'; + const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' }); + + this.logger.succ(`Exported to: ${driveFile.id}`); + } finally { + cleanup(); + } + } + + async processClips(writer: WritableStreamDefaultWriter, user: MiUser, job: Bull.Job<DbJobDataWithUser>) { + let exportedClipsCount = 0; + let cursor: MiClip['id'] | null = null; + + while (true) { + const clips = await this.clipsRepository.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}), + }, + take: 100, + order: { + id: 1, + }, + }); + + if (clips.length === 0) { + job.updateProgress(100); + break; + } + + cursor = clips.at(-1)?.id ?? null; + + for (const clip of clips) { + // Stringify but remove the last `]}` + const content = JSON.stringify(this.serializeClip(clip)).slice(0, -2); + const isFirst = exportedClipsCount === 0; + await writer.write(isFirst ? content : ',\n' + content); + + await this.processClipNotes(writer, clip.id); + + await writer.write(']}'); + exportedClipsCount++; + } + + const total = await this.clipsRepository.countBy({ + userId: user.id, + }); + + job.updateProgress(exportedClipsCount / total); + } + } + + async processClipNotes(writer: WritableStreamDefaultWriter, clipId: string): Promise<void> { + let exportedClipNotesCount = 0; + let cursor: MiClipNote['id'] | null = null; + + while (true) { + const clipNotes = await this.clipNotesRepository.find({ + where: { + clipId, + ...(cursor ? { id: MoreThan(cursor) } : {}), + }, + take: 100, + order: { + id: 1, + }, + relations: ['note', 'note.user'], + }) as (MiClipNote & { note: MiNote & { user: MiUser } })[]; + + if (clipNotes.length === 0) { + break; + } + + cursor = clipNotes.at(-1)?.id ?? null; + + for (const clipNote of clipNotes) { + let poll: MiPoll | undefined; + if (clipNote.note.hasPoll) { + poll = await this.pollsRepository.findOneByOrFail({ noteId: clipNote.note.id }); + } + const content = JSON.stringify(this.serializeClipNote(clipNote, poll)); + const isFirst = exportedClipNotesCount === 0; + await writer.write(isFirst ? content : ',\n' + content); + + exportedClipNotesCount++; + } + } + } + + private serializeClip(clip: MiClip): Record<string, unknown> { + return { + id: clip.id, + name: clip.name, + description: clip.description, + lastClippedAt: clip.lastClippedAt?.toISOString(), + clipNotes: [], + }; + } + + private serializeClipNote(clip: MiClipNote & { note: MiNote & { user: MiUser } }, poll: MiPoll | undefined): Record<string, unknown> { + return { + id: clip.id, + createdAt: this.idService.parse(clip.id).date.toISOString(), + note: { + id: clip.note.id, + text: clip.note.text, + createdAt: this.idService.parse(clip.note.id).date.toISOString(), + fileIds: clip.note.fileIds, + replyId: clip.note.replyId, + renoteId: clip.note.renoteId, + poll: poll, + cw: clip.note.cw, + visibility: clip.note.visibility, + visibleUserIds: clip.note.visibleUserIds, + localOnly: clip.note.localOnly, + reactionAcceptance: clip.note.reactionAcceptance, + uri: clip.note.uri, + url: clip.note.url, + user: { + id: clip.note.user.id, + name: clip.note.user.name, + username: clip.note.user.username, + host: clip.note.user.host, + uri: clip.note.user.uri, + }, + }, + }; + } +} diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index f69634968d..971e9f4971 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -85,7 +85,7 @@ export class InboxProcessorService { } catch (err) { // 対象が4xxならスキップ if (err instanceof StatusError) { - if (err.isClientError) { + if (!err.isRetryable) { throw new Bull.UnrecoverableError(`skip: Ignored deleted actors on both ends ${activity.actor} - ${err.statusCode}`); } throw new Error(`Error in actor ${activity.actor} - ${err.statusCode}`); diff --git a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts b/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts index a41f5565c8..7a0d533846 100644 --- a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts +++ b/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts @@ -71,7 +71,7 @@ export class WebhookDeliverProcessorService { if (res instanceof StatusError) { // 4xx - if (res.isClientError) { + if (!res.isRetryable) { throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`); } diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index 432b3d364f..372829a825 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -15,7 +15,9 @@ export type DeliverJobData = { /** Actor */ user: ThinUser; /** Activity */ - content: unknown; + content: string; + /** Digest header */ + digest: string; /** inbox URL to deliver */ to: string; /** whether it is sharedInbox */ |