summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue
diff options
context:
space:
mode:
Diffstat (limited to 'packages/backend/src/queue')
-rw-r--r--packages/backend/src/queue/QueueProcessorModule.ts2
-rw-r--r--packages/backend/src/queue/QueueProcessorService.ts3
-rw-r--r--packages/backend/src/queue/processors/DeliverProcessorService.ts4
-rw-r--r--packages/backend/src/queue/processors/ExportClipsProcessorService.ts206
-rw-r--r--packages/backend/src/queue/processors/InboxProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts2
-rw-r--r--packages/backend/src/queue/types.ts4
7 files changed, 218 insertions, 5 deletions
diff --git a/packages/backend/src/queue/QueueProcessorModule.ts b/packages/backend/src/queue/QueueProcessorModule.ts
index 29dc78605b..d547a498a1 100644
--- a/packages/backend/src/queue/QueueProcessorModule.ts
+++ b/packages/backend/src/queue/QueueProcessorModule.ts
@@ -25,6 +25,7 @@ import { ExportCustomEmojisProcessorService } from './processors/ExportCustomEmo
import { ExportFollowingProcessorService } from './processors/ExportFollowingProcessorService.js';
import { ExportMutingProcessorService } from './processors/ExportMutingProcessorService.js';
import { ExportNotesProcessorService } from './processors/ExportNotesProcessorService.js';
+import { ExportClipsProcessorService } from './processors/ExportClipsProcessorService.js';
import { ExportUserListsProcessorService } from './processors/ExportUserListsProcessorService.js';
import { ExportAntennasProcessorService } from './processors/ExportAntennasProcessorService.js';
import { ImportBlockingProcessorService } from './processors/ImportBlockingProcessorService.js';
@@ -56,6 +57,7 @@ import { RelationshipProcessorService } from './processors/RelationshipProcessor
ExportAccountDataProcessorService,
ExportCustomEmojisProcessorService,
ExportNotesProcessorService,
+ ExportClipsProcessorService,
ExportFavoritesProcessorService,
ExportFollowingProcessorService,
ExportMutingProcessorService,
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts
index ea3ecd4ded..cdca744b88 100644
--- a/packages/backend/src/queue/QueueProcessorService.ts
+++ b/packages/backend/src/queue/QueueProcessorService.ts
@@ -17,6 +17,7 @@ import { DeleteDriveFilesProcessorService } from './processors/DeleteDriveFilesP
import { ExportAccountDataProcessorService } from './processors/ExportAccountDataProcessorService.js';
import { ExportCustomEmojisProcessorService } from './processors/ExportCustomEmojisProcessorService.js';
import { ExportNotesProcessorService } from './processors/ExportNotesProcessorService.js';
+import { ExportClipsProcessorService } from './processors/ExportClipsProcessorService.js';
import { ExportFollowingProcessorService } from './processors/ExportFollowingProcessorService.js';
import { ExportMutingProcessorService } from './processors/ExportMutingProcessorService.js';
import { ExportBlockingProcessorService } from './processors/ExportBlockingProcessorService.js';
@@ -94,6 +95,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
private exportAccountDataProcessorService: ExportAccountDataProcessorService,
private exportCustomEmojisProcessorService: ExportCustomEmojisProcessorService,
private exportNotesProcessorService: ExportNotesProcessorService,
+ private exportClipsProcessorService: ExportClipsProcessorService,
private exportFavoritesProcessorService: ExportFavoritesProcessorService,
private exportFollowingProcessorService: ExportFollowingProcessorService,
private exportMutingProcessorService: ExportMutingProcessorService,
@@ -169,6 +171,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
case 'exportAccountData': return this.exportAccountDataProcessorService.process(job);
case 'exportCustomEmojis': return this.exportCustomEmojisProcessorService.process(job);
case 'exportNotes': return this.exportNotesProcessorService.process(job);
+ case 'exportClips': return this.exportClipsProcessorService.process(job);
case 'exportFavorites': return this.exportFavoritesProcessorService.process(job);
case 'exportFollowing': return this.exportFollowingProcessorService.process(job);
case 'exportMuting': return this.exportMutingProcessorService.process(job);
diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts
index 4a1d9f28b4..64c3445552 100644
--- a/packages/backend/src/queue/processors/DeliverProcessorService.ts
+++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts
@@ -72,7 +72,7 @@ export class DeliverProcessorService {
}
try {
- await this.apRequestService.signedPost(job.data.user, job.data.to, job.data.content);
+ await this.apRequestService.signedPost(job.data.user, job.data.to, job.data.content, job.data.digest);
// Update stats
this.federatedInstanceService.fetch(host).then(i => {
@@ -111,7 +111,7 @@ export class DeliverProcessorService {
if (res instanceof StatusError) {
// 4xx
- if (res.isClientError) {
+ if (!res.isRetryable) {
// 相手が閉鎖していることを明示しているため、配送停止する
if (job.data.isSharedInbox && res.statusCode === 410) {
this.federatedInstanceService.fetch(host).then(i => {
diff --git a/packages/backend/src/queue/processors/ExportClipsProcessorService.ts b/packages/backend/src/queue/processors/ExportClipsProcessorService.ts
new file mode 100644
index 0000000000..5221497bd3
--- /dev/null
+++ b/packages/backend/src/queue/processors/ExportClipsProcessorService.ts
@@ -0,0 +1,206 @@
+/*
+ * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+import * as fs from 'node:fs';
+import { Writable } from 'node:stream';
+import { Inject, Injectable, StreamableFile } from '@nestjs/common';
+import { MoreThan } from 'typeorm';
+import { format as dateFormat } from 'date-fns';
+import { DI } from '@/di-symbols.js';
+import type { ClipNotesRepository, ClipsRepository, MiClip, MiClipNote, MiUser, NotesRepository, PollsRepository, UsersRepository } from '@/models/_.js';
+import type Logger from '@/logger.js';
+import { DriveService } from '@/core/DriveService.js';
+import { createTemp } from '@/misc/create-temp.js';
+import type { MiPoll } from '@/models/Poll.js';
+import type { MiNote } from '@/models/Note.js';
+import { bindThis } from '@/decorators.js';
+import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js';
+import { Packed } from '@/misc/json-schema.js';
+import { IdService } from '@/core/IdService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type * as Bull from 'bullmq';
+import type { DbJobDataWithUser } from '../types.js';
+
+@Injectable()
+export class ExportClipsProcessorService {
+ private logger: Logger;
+
+ constructor(
+ @Inject(DI.usersRepository)
+ private usersRepository: UsersRepository,
+
+ @Inject(DI.pollsRepository)
+ private pollsRepository: PollsRepository,
+
+ @Inject(DI.clipsRepository)
+ private clipsRepository: ClipsRepository,
+
+ @Inject(DI.clipNotesRepository)
+ private clipNotesRepository: ClipNotesRepository,
+
+ private driveService: DriveService,
+ private queueLoggerService: QueueLoggerService,
+ private idService: IdService,
+ ) {
+ this.logger = this.queueLoggerService.logger.createSubLogger('export-clips');
+ }
+
+ @bindThis
+ public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> {
+ this.logger.info(`Exporting clips of ${job.data.user.id} ...`);
+
+ const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
+ if (user == null) {
+ return;
+ }
+
+ // Create temp file
+ const [path, cleanup] = await createTemp();
+
+ this.logger.info(`Temp file is ${path}`);
+
+ try {
+ const stream = Writable.toWeb(fs.createWriteStream(path, { flags: 'a' }));
+ const writer = stream.getWriter();
+ writer.closed.catch(this.logger.error);
+
+ await writer.write('[');
+
+ await this.processClips(writer, user, job);
+
+ await writer.write(']');
+ await writer.close();
+
+ this.logger.succ(`Exported to: ${path}`);
+
+ const fileName = 'clips-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json';
+ const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' });
+
+ this.logger.succ(`Exported to: ${driveFile.id}`);
+ } finally {
+ cleanup();
+ }
+ }
+
+ async processClips(writer: WritableStreamDefaultWriter, user: MiUser, job: Bull.Job<DbJobDataWithUser>) {
+ let exportedClipsCount = 0;
+ let cursor: MiClip['id'] | null = null;
+
+ while (true) {
+ const clips = await this.clipsRepository.find({
+ where: {
+ userId: user.id,
+ ...(cursor ? { id: MoreThan(cursor) } : {}),
+ },
+ take: 100,
+ order: {
+ id: 1,
+ },
+ });
+
+ if (clips.length === 0) {
+ job.updateProgress(100);
+ break;
+ }
+
+ cursor = clips.at(-1)?.id ?? null;
+
+ for (const clip of clips) {
+ // Stringify but remove the last `]}`
+ const content = JSON.stringify(this.serializeClip(clip)).slice(0, -2);
+ const isFirst = exportedClipsCount === 0;
+ await writer.write(isFirst ? content : ',\n' + content);
+
+ await this.processClipNotes(writer, clip.id);
+
+ await writer.write(']}');
+ exportedClipsCount++;
+ }
+
+ const total = await this.clipsRepository.countBy({
+ userId: user.id,
+ });
+
+ job.updateProgress(exportedClipsCount / total);
+ }
+ }
+
+ async processClipNotes(writer: WritableStreamDefaultWriter, clipId: string): Promise<void> {
+ let exportedClipNotesCount = 0;
+ let cursor: MiClipNote['id'] | null = null;
+
+ while (true) {
+ const clipNotes = await this.clipNotesRepository.find({
+ where: {
+ clipId,
+ ...(cursor ? { id: MoreThan(cursor) } : {}),
+ },
+ take: 100,
+ order: {
+ id: 1,
+ },
+ relations: ['note', 'note.user'],
+ }) as (MiClipNote & { note: MiNote & { user: MiUser } })[];
+
+ if (clipNotes.length === 0) {
+ break;
+ }
+
+ cursor = clipNotes.at(-1)?.id ?? null;
+
+ for (const clipNote of clipNotes) {
+ let poll: MiPoll | undefined;
+ if (clipNote.note.hasPoll) {
+ poll = await this.pollsRepository.findOneByOrFail({ noteId: clipNote.note.id });
+ }
+ const content = JSON.stringify(this.serializeClipNote(clipNote, poll));
+ const isFirst = exportedClipNotesCount === 0;
+ await writer.write(isFirst ? content : ',\n' + content);
+
+ exportedClipNotesCount++;
+ }
+ }
+ }
+
+ private serializeClip(clip: MiClip): Record<string, unknown> {
+ return {
+ id: clip.id,
+ name: clip.name,
+ description: clip.description,
+ lastClippedAt: clip.lastClippedAt?.toISOString(),
+ clipNotes: [],
+ };
+ }
+
+ private serializeClipNote(clip: MiClipNote & { note: MiNote & { user: MiUser } }, poll: MiPoll | undefined): Record<string, unknown> {
+ return {
+ id: clip.id,
+ createdAt: this.idService.parse(clip.id).date.toISOString(),
+ note: {
+ id: clip.note.id,
+ text: clip.note.text,
+ createdAt: this.idService.parse(clip.note.id).date.toISOString(),
+ fileIds: clip.note.fileIds,
+ replyId: clip.note.replyId,
+ renoteId: clip.note.renoteId,
+ poll: poll,
+ cw: clip.note.cw,
+ visibility: clip.note.visibility,
+ visibleUserIds: clip.note.visibleUserIds,
+ localOnly: clip.note.localOnly,
+ reactionAcceptance: clip.note.reactionAcceptance,
+ uri: clip.note.uri,
+ url: clip.note.url,
+ user: {
+ id: clip.note.user.id,
+ name: clip.note.user.name,
+ username: clip.note.user.username,
+ host: clip.note.user.host,
+ uri: clip.note.user.uri,
+ },
+ },
+ };
+ }
+}
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts
index f69634968d..971e9f4971 100644
--- a/packages/backend/src/queue/processors/InboxProcessorService.ts
+++ b/packages/backend/src/queue/processors/InboxProcessorService.ts
@@ -85,7 +85,7 @@ export class InboxProcessorService {
} catch (err) {
// 対象が4xxならスキップ
if (err instanceof StatusError) {
- if (err.isClientError) {
+ if (!err.isRetryable) {
throw new Bull.UnrecoverableError(`skip: Ignored deleted actors on both ends ${activity.actor} - ${err.statusCode}`);
}
throw new Error(`Error in actor ${activity.actor} - ${err.statusCode}`);
diff --git a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts b/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts
index a41f5565c8..7a0d533846 100644
--- a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts
+++ b/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts
@@ -71,7 +71,7 @@ export class WebhookDeliverProcessorService {
if (res instanceof StatusError) {
// 4xx
- if (res.isClientError) {
+ if (!res.isRetryable) {
throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`);
}
diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts
index 432b3d364f..372829a825 100644
--- a/packages/backend/src/queue/types.ts
+++ b/packages/backend/src/queue/types.ts
@@ -15,7 +15,9 @@ export type DeliverJobData = {
/** Actor */
user: ThinUser;
/** Activity */
- content: unknown;
+ content: string;
+ /** Digest header */
+ digest: string;
/** inbox URL to deliver */
to: string;
/** whether it is sharedInbox */