summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue/processors
diff options
context:
space:
mode:
authordakkar <dakkar@thenautilus.net>2024-03-02 17:28:34 +0000
committerdakkar <dakkar@thenautilus.net>2024-03-02 17:28:34 +0000
commit23f476dbf32ef9a2fc7d2ed7aab9ce706a2409d0 (patch)
tree0b9e79c2f18f4a206811561fa255f2510f60c175 /packages/backend/src/queue/processors
parentmerge: Add missing IMPORTANT_NOTES.md from Sharkey/OldJoinSharkey (!443) (diff)
parentmerge: put back the readme (!447) (diff)
downloadsharkey-23f476dbf32ef9a2fc7d2ed7aab9ce706a2409d0.tar.gz
sharkey-23f476dbf32ef9a2fc7d2ed7aab9ce706a2409d0.tar.bz2
sharkey-23f476dbf32ef9a2fc7d2ed7aab9ce706a2409d0.zip
Merge branch 'develop' into release/2024.3.1
Diffstat (limited to 'packages/backend/src/queue/processors')
-rw-r--r--packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/CleanChartsProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/CleanProcessorService.ts6
-rw-r--r--packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/DeleteAccountProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/DeleteFileProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/DeliverProcessorService.ts6
-rw-r--r--packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ExportAntennasProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ExportBlockingProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ExportClipsProcessorService.ts206
-rw-r--r--packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ExportFollowingProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ExportMutingProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ExportNotesProcessorService.ts166
-rw-r--r--packages/backend/src/queue/processors/ExportUserListsProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ImportAntennasProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ImportBlockingProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ImportFollowingProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ImportMutingProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ImportNotesProcessorService.ts70
-rw-r--r--packages/backend/src/queue/processors/ImportUserListsProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/InboxProcessorService.ts17
-rw-r--r--packages/backend/src/queue/processors/RelationshipProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ResyncChartsProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/TickChartsProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts4
31 files changed, 380 insertions, 143 deletions
diff --git a/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts b/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts
index 9f49d85c7f..4769cccabf 100644
--- a/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts
+++ b/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts b/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts
index 9b07389dc3..448fc9c763 100644
--- a/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts
+++ b/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/CleanChartsProcessorService.ts b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts
index 55c444eee6..110468801c 100644
--- a/packages/backend/src/queue/processors/CleanChartsProcessorService.ts
+++ b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/CleanProcessorService.ts b/packages/backend/src/queue/processors/CleanProcessorService.ts
index e252c5d8a1..a26b69cd2b 100644
--- a/packages/backend/src/queue/processors/CleanProcessorService.ts
+++ b/packages/backend/src/queue/processors/CleanProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
@@ -11,6 +11,7 @@ import type Logger from '@/logger.js';
import { bindThis } from '@/decorators.js';
import { IdService } from '@/core/IdService.js';
import type { Config } from '@/config.js';
+import { ReversiService } from '@/core/ReversiService.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
@@ -32,6 +33,7 @@ export class CleanProcessorService {
private roleAssignmentsRepository: RoleAssignmentsRepository,
private queueLoggerService: QueueLoggerService,
+ private reversiService: ReversiService,
private idService: IdService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('clean');
@@ -65,6 +67,8 @@ export class CleanProcessorService {
});
}
+ this.reversiService.cleanOutdatedGames();
+
this.logger.succ('Cleaned.');
}
}
diff --git a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts
index b62cc8a8fd..917de8b72c 100644
--- a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts
+++ b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts
index 56369f3a7a..0e604a0501 100644
--- a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts
+++ b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts b/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts
index 6d0a45bcc0..291fa4a6d8 100644
--- a/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts
+++ b/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/DeleteFileProcessorService.ts b/packages/backend/src/queue/processors/DeleteFileProcessorService.ts
index a4638bfaaf..fc1dd93ce7 100644
--- a/packages/backend/src/queue/processors/DeleteFileProcessorService.ts
+++ b/packages/backend/src/queue/processors/DeleteFileProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts
index 4a1d9f28b4..5fed070929 100644
--- a/packages/backend/src/queue/processors/DeliverProcessorService.ts
+++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
@@ -72,7 +72,7 @@ export class DeliverProcessorService {
}
try {
- await this.apRequestService.signedPost(job.data.user, job.data.to, job.data.content);
+ await this.apRequestService.signedPost(job.data.user, job.data.to, job.data.content, job.data.digest);
// Update stats
this.federatedInstanceService.fetch(host).then(i => {
@@ -111,7 +111,7 @@ export class DeliverProcessorService {
if (res instanceof StatusError) {
// 4xx
- if (res.isClientError) {
+ if (!res.isRetryable) {
// 相手が閉鎖していることを明示しているため、配送停止する
if (job.data.isSharedInbox && res.statusCode === 410) {
this.federatedInstanceService.fetch(host).then(i => {
diff --git a/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts b/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts
index 4a48084436..29c1f27bb1 100644
--- a/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts
+++ b/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
index d0968d2923..af48bad417 100644
--- a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts
index 0a37e3ca1e..6ec3c18786 100644
--- a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/ExportClipsProcessorService.ts b/packages/backend/src/queue/processors/ExportClipsProcessorService.ts
new file mode 100644
index 0000000000..01eab26e96
--- /dev/null
+++ b/packages/backend/src/queue/processors/ExportClipsProcessorService.ts
@@ -0,0 +1,206 @@
+/*
+ * SPDX-FileCopyrightText: syuilo and misskey-project
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+import * as fs from 'node:fs';
+import { Writable } from 'node:stream';
+import { Inject, Injectable, StreamableFile } from '@nestjs/common';
+import { MoreThan } from 'typeorm';
+import { format as dateFormat } from 'date-fns';
+import { DI } from '@/di-symbols.js';
+import type { ClipNotesRepository, ClipsRepository, MiClip, MiClipNote, MiUser, NotesRepository, PollsRepository, UsersRepository } from '@/models/_.js';
+import type Logger from '@/logger.js';
+import { DriveService } from '@/core/DriveService.js';
+import { createTemp } from '@/misc/create-temp.js';
+import type { MiPoll } from '@/models/Poll.js';
+import type { MiNote } from '@/models/Note.js';
+import { bindThis } from '@/decorators.js';
+import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js';
+import { Packed } from '@/misc/json-schema.js';
+import { IdService } from '@/core/IdService.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type * as Bull from 'bullmq';
+import type { DbJobDataWithUser } from '../types.js';
+
+@Injectable()
+export class ExportClipsProcessorService {
+ private logger: Logger;
+
+ constructor(
+ @Inject(DI.usersRepository)
+ private usersRepository: UsersRepository,
+
+ @Inject(DI.pollsRepository)
+ private pollsRepository: PollsRepository,
+
+ @Inject(DI.clipsRepository)
+ private clipsRepository: ClipsRepository,
+
+ @Inject(DI.clipNotesRepository)
+ private clipNotesRepository: ClipNotesRepository,
+
+ private driveService: DriveService,
+ private queueLoggerService: QueueLoggerService,
+ private idService: IdService,
+ ) {
+ this.logger = this.queueLoggerService.logger.createSubLogger('export-clips');
+ }
+
+ @bindThis
+ public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> {
+ this.logger.info(`Exporting clips of ${job.data.user.id} ...`);
+
+ const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
+ if (user == null) {
+ return;
+ }
+
+ // Create temp file
+ const [path, cleanup] = await createTemp();
+
+ this.logger.info(`Temp file is ${path}`);
+
+ try {
+ const stream = Writable.toWeb(fs.createWriteStream(path, { flags: 'a' }));
+ const writer = stream.getWriter();
+ writer.closed.catch(this.logger.error);
+
+ await writer.write('[');
+
+ await this.processClips(writer, user, job);
+
+ await writer.write(']');
+ await writer.close();
+
+ this.logger.succ(`Exported to: ${path}`);
+
+ const fileName = 'clips-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json';
+ const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' });
+
+ this.logger.succ(`Exported to: ${driveFile.id}`);
+ } finally {
+ cleanup();
+ }
+ }
+
+ async processClips(writer: WritableStreamDefaultWriter, user: MiUser, job: Bull.Job<DbJobDataWithUser>) {
+ let exportedClipsCount = 0;
+ let cursor: MiClip['id'] | null = null;
+
+ while (true) {
+ const clips = await this.clipsRepository.find({
+ where: {
+ userId: user.id,
+ ...(cursor ? { id: MoreThan(cursor) } : {}),
+ },
+ take: 100,
+ order: {
+ id: 1,
+ },
+ });
+
+ if (clips.length === 0) {
+ job.updateProgress(100);
+ break;
+ }
+
+ cursor = clips.at(-1)?.id ?? null;
+
+ for (const clip of clips) {
+ // Stringify but remove the last `]}`
+ const content = JSON.stringify(this.serializeClip(clip)).slice(0, -2);
+ const isFirst = exportedClipsCount === 0;
+ await writer.write(isFirst ? content : ',\n' + content);
+
+ await this.processClipNotes(writer, clip.id);
+
+ await writer.write(']}');
+ exportedClipsCount++;
+ }
+
+ const total = await this.clipsRepository.countBy({
+ userId: user.id,
+ });
+
+ job.updateProgress(exportedClipsCount / total);
+ }
+ }
+
+ async processClipNotes(writer: WritableStreamDefaultWriter, clipId: string): Promise<void> {
+ let exportedClipNotesCount = 0;
+ let cursor: MiClipNote['id'] | null = null;
+
+ while (true) {
+ const clipNotes = await this.clipNotesRepository.find({
+ where: {
+ clipId,
+ ...(cursor ? { id: MoreThan(cursor) } : {}),
+ },
+ take: 100,
+ order: {
+ id: 1,
+ },
+ relations: ['note', 'note.user'],
+ }) as (MiClipNote & { note: MiNote & { user: MiUser } })[];
+
+ if (clipNotes.length === 0) {
+ break;
+ }
+
+ cursor = clipNotes.at(-1)?.id ?? null;
+
+ for (const clipNote of clipNotes) {
+ let poll: MiPoll | undefined;
+ if (clipNote.note.hasPoll) {
+ poll = await this.pollsRepository.findOneByOrFail({ noteId: clipNote.note.id });
+ }
+ const content = JSON.stringify(this.serializeClipNote(clipNote, poll));
+ const isFirst = exportedClipNotesCount === 0;
+ await writer.write(isFirst ? content : ',\n' + content);
+
+ exportedClipNotesCount++;
+ }
+ }
+ }
+
+ private serializeClip(clip: MiClip): Record<string, unknown> {
+ return {
+ id: clip.id,
+ name: clip.name,
+ description: clip.description,
+ lastClippedAt: clip.lastClippedAt?.toISOString(),
+ clipNotes: [],
+ };
+ }
+
+ private serializeClipNote(clip: MiClipNote & { note: MiNote & { user: MiUser } }, poll: MiPoll | undefined): Record<string, unknown> {
+ return {
+ id: clip.id,
+ createdAt: this.idService.parse(clip.id).date.toISOString(),
+ note: {
+ id: clip.note.id,
+ text: clip.note.text,
+ createdAt: this.idService.parse(clip.note.id).date.toISOString(),
+ fileIds: clip.note.fileIds,
+ replyId: clip.note.replyId,
+ renoteId: clip.note.renoteId,
+ poll: poll,
+ cw: clip.note.cw,
+ visibility: clip.note.visibility,
+ visibleUserIds: clip.note.visibleUserIds,
+ localOnly: clip.note.localOnly,
+ reactionAcceptance: clip.note.reactionAcceptance,
+ uri: clip.note.uri,
+ url: clip.note.url,
+ user: {
+ id: clip.note.user.id,
+ name: clip.note.user.name,
+ username: clip.note.user.username,
+ host: clip.note.user.host,
+ uri: clip.note.user.uri,
+ },
+ },
+ };
+ }
+}
diff --git a/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts
index d5387fe42e..e4eb4791bd 100644
--- a/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts
index af2a3434a9..7bb626dd31 100644
--- a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts
index c9739eb1cb..1cc80e66d7 100644
--- a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts
index c8425c1f2d..243b74f2c2 100644
--- a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
index cd4ccb0b07..c7611012d7 100644
--- a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
@@ -1,9 +1,9 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
-import * as fs from 'node:fs';
+import { ReadableStream, TextEncoderStream } from 'node:stream/web';
import { Inject, Injectable } from '@nestjs/common';
import { MoreThan } from 'typeorm';
import { format as dateFormat } from 'date-fns';
@@ -18,10 +18,82 @@ import { bindThis } from '@/decorators.js';
import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js';
import { Packed } from '@/misc/json-schema.js';
import { IdService } from '@/core/IdService.js';
+import { JsonArrayStream } from '@/misc/JsonArrayStream.js';
+import { FileWriterStream } from '@/misc/FileWriterStream.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
import type { DbJobDataWithUser } from '../types.js';
+class NoteStream extends ReadableStream<Record<string, unknown>> {
+ constructor(
+ job: Bull.Job,
+ notesRepository: NotesRepository,
+ pollsRepository: PollsRepository,
+ driveFileEntityService: DriveFileEntityService,
+ idService: IdService,
+ userId: string,
+ ) {
+ let exportedNotesCount = 0;
+ let cursor: MiNote['id'] | null = null;
+
+ const serialize = (
+ note: MiNote,
+ poll: MiPoll | null,
+ files: Packed<'DriveFile'>[],
+ ): Record<string, unknown> => {
+ return {
+ id: note.id,
+ text: note.text,
+ createdAt: idService.parse(note.id).date.toISOString(),
+ fileIds: note.fileIds,
+ files: files,
+ replyId: note.replyId,
+ renoteId: note.renoteId,
+ poll: poll,
+ cw: note.cw,
+ visibility: note.visibility,
+ visibleUserIds: note.visibleUserIds,
+ localOnly: note.localOnly,
+ reactionAcceptance: note.reactionAcceptance,
+ };
+ };
+
+ super({
+ async pull(controller): Promise<void> {
+ const notes = await notesRepository.find({
+ where: {
+ userId,
+ ...(cursor !== null ? { id: MoreThan(cursor) } : {}),
+ },
+ take: 100, // 100件ずつ取得
+ order: { id: 1 },
+ });
+
+ if (notes.length === 0) {
+ job.updateProgress(100);
+ controller.close();
+ }
+
+ cursor = notes.at(-1)?.id ?? null;
+
+ for (const note of notes) {
+ const poll = note.hasPoll
+ ? await pollsRepository.findOneByOrFail({ noteId: note.id }) // N+1
+ : null;
+ const files = await driveFileEntityService.packManyByIds(note.fileIds); // N+1
+ const content = serialize(note, poll, files);
+
+ controller.enqueue(content);
+ exportedNotesCount++;
+ }
+
+ const total = await notesRepository.countBy({ userId });
+ job.updateProgress(exportedNotesCount / total);
+ },
+ });
+ }
+}
+
@Injectable()
export class ExportNotesProcessorService {
private logger: Logger;
@@ -59,67 +131,19 @@ export class ExportNotesProcessorService {
this.logger.info(`Temp file is ${path}`);
try {
- const stream = fs.createWriteStream(path, { flags: 'a' });
-
- const write = (text: string): Promise<void> => {
- return new Promise<void>((res, rej) => {
- stream.write(text, err => {
- if (err) {
- this.logger.error(err);
- rej(err);
- } else {
- res();
- }
- });
- });
- };
-
- await write('[');
-
- let exportedNotesCount = 0;
- let cursor: MiNote['id'] | null = null;
-
- while (true) {
- const notes = await this.notesRepository.find({
- where: {
- userId: user.id,
- ...(cursor ? { id: MoreThan(cursor) } : {}),
- },
- take: 100,
- order: {
- id: 1,
- },
- }) as MiNote[];
-
- if (notes.length === 0) {
- job.updateProgress(100);
- break;
- }
-
- cursor = notes.at(-1)?.id ?? null;
-
- for (const note of notes) {
- let poll: MiPoll | undefined;
- if (note.hasPoll) {
- poll = await this.pollsRepository.findOneByOrFail({ noteId: note.id });
- }
- const files = await this.driveFileEntityService.packManyByIds(note.fileIds);
- const content = JSON.stringify(this.serialize(note, poll, files));
- const isFirst = exportedNotesCount === 0;
- await write(isFirst ? content : ',\n' + content);
- exportedNotesCount++;
- }
-
- const total = await this.notesRepository.countBy({
- userId: user.id,
- });
+ // メモリが足りなくならないようにストリームで処理する
+ await new NoteStream(
+ job,
+ this.notesRepository,
+ this.pollsRepository,
+ this.driveFileEntityService,
+ this.idService,
+ user.id,
+ )
+ .pipeThrough(new JsonArrayStream())
+ .pipeThrough(new TextEncoderStream())
+ .pipeTo(new FileWriterStream(path));
- job.updateProgress(exportedNotesCount / total);
- }
-
- await write(']');
-
- stream.end();
this.logger.succ(`Exported to: ${path}`);
const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json';
@@ -130,22 +154,4 @@ export class ExportNotesProcessorService {
cleanup();
}
}
-
- private serialize(note: MiNote, poll: MiPoll | null = null, files: Packed<'DriveFile'>[]): Record<string, unknown> {
- return {
- id: note.id,
- text: note.text,
- createdAt: this.idService.parse(note.id).date.toISOString(),
- fileIds: note.fileIds,
- files: files,
- replyId: note.replyId,
- renoteId: note.renoteId,
- poll: poll,
- cw: note.cw,
- visibility: note.visibility,
- visibleUserIds: note.visibleUserIds,
- localOnly: note.localOnly,
- reactionAcceptance: note.reactionAcceptance,
- };
- }
}
diff --git a/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts
index a3f9441dc2..ee87cff5d3 100644
--- a/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts
index 291ea14b67..951b560597 100644
--- a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts b/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts
index 64520b770b..b78229c648 100644
--- a/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts
index a52af54a39..171809d25c 100644
--- a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts b/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts
index e75499a56f..70c9f3a096 100644
--- a/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/ImportMutingProcessorService.ts b/packages/backend/src/queue/processors/ImportMutingProcessorService.ts
index 9db4e5d8e0..ec9d2b6c4c 100644
--- a/packages/backend/src/queue/processors/ImportMutingProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportMutingProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts
index d64a861b03..10cd90c4ad 100644
--- a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts
@@ -1,5 +1,5 @@
import * as fs from 'node:fs';
-import * as vm from 'node:vm';
+import * as fsp from 'node:fs/promises';
import * as crypto from 'node:crypto';
import { Inject, Injectable } from '@nestjs/common';
import { ZipReader } from 'slacc';
@@ -51,7 +51,7 @@ export class ImportNotesProcessorService {
@bindThis
private async uploadFiles(dir: string, user: MiUser, folder?: MiDriveFolder['id']) {
- const fileList = fs.readdirSync(dir);
+ const fileList = await fsp.readdir(dir);
for await (const file of fileList) {
const name = `${dir}/${file}`;
if (fs.statSync(name).isDirectory()) {
@@ -132,7 +132,7 @@ export class ImportNotesProcessorService {
private parseTwitterFile(str : string) : null | [{ tweet: any }] {
const removed = str.replace(new RegExp('window\\.YTD\\.tweets\\.part0 = ', 'g'), '');
-
+
try {
return JSON.parse(removed);
} catch (error) {
@@ -142,6 +142,19 @@ export class ImportNotesProcessorService {
}
@bindThis
+ private parseTwitterFile(str : string) : { tweet: object }[] {
+ const jsonStr = str.replace(/^\s*window\.YTD\.tweets\.part0\s*=\s*/, '');
+
+ try {
+ return JSON.parse(jsonStr);
+ } catch (error) {
+ //The format is not what we expected. Either this file was tampered with or twitters exports changed
+ this.logger.warn('Failed to import twitter notes due to malformed file');
+ throw error;
+ }
+ }
+
+ @bindThis
public async process(job: Bull.Job<DbNoteImportJobData>): Promise<void> {
this.logger.info(`Starting note import of ${job.data.user.id} ...`);
@@ -173,7 +186,7 @@ export class ImportNotesProcessorService {
const destPath = path + '/twitter.zip';
try {
- fs.writeFileSync(destPath, '', 'binary');
+ await fsp.writeFile(destPath, '', 'binary');
await this.downloadService.downloadUrl(file.url, destPath);
} catch (e) { // TODO: 何度か再試行
if (e instanceof Error || typeof e === 'string') {
@@ -185,21 +198,13 @@ export class ImportNotesProcessorService {
const outputPath = path + '/twitter';
try {
this.logger.succ(`Unzipping to ${outputPath}`);
- ZipReader.withDestinationPath(outputPath).viaBuffer(await fs.promises.readFile(destPath));
+ ZipReader.withDestinationPath(outputPath).viaBuffer(await fsp.readFile(destPath));
- const unprocessedTweetJson = this.parseTwitterFile(fs.readFileSync(outputPath + '/data/tweets.js', 'utf-8'));
+ const unprocessedTweets = this.parseTwitterFile(await fsp.readFile(outputPath + '/data/tweets.js', 'utf-8'));
- //Make sure that it isnt null (because if something went wrong in parseTwitterFile it returns null)
- if (unprocessedTweetJson) {
- const tweets = Object.keys(unprocessedTweetJson).reduce((m, key, i, obj) => {
- return m.concat(unprocessedTweetJson[i].tweet);
- }, []);
-
- const processedTweets = await this.recreateChain(['id_str'], ['in_reply_to_status_id_str'], tweets, false);
- this.queueService.createImportTweetsToDbJob(job.data.user, processedTweets, null);
- } else {
- this.logger.warn('Failed to import twitter notes due to malformed file');
- }
+ const tweets = unprocessedTweets.map(e => e.tweet);
+ const processedTweets = await this.recreateChain(['id_str'], ['in_reply_to_status_id_str'], tweets, false);
+ this.queueService.createImportTweetsToDbJob(job.data.user, processedTweets, null);
} finally {
cleanup();
}
@@ -211,7 +216,7 @@ export class ImportNotesProcessorService {
const destPath = path + '/facebook.zip';
try {
- fs.writeFileSync(destPath, '', 'binary');
+ await fsp.writeFile(destPath, '', 'binary');
await this.downloadService.downloadUrl(file.url, destPath);
} catch (e) { // TODO: 何度か再試行
if (e instanceof Error || typeof e === 'string') {
@@ -223,8 +228,8 @@ export class ImportNotesProcessorService {
const outputPath = path + '/facebook';
try {
this.logger.succ(`Unzipping to ${outputPath}`);
- ZipReader.withDestinationPath(outputPath).viaBuffer(await fs.promises.readFile(destPath));
- const postsJson = fs.readFileSync(outputPath + '/your_activity_across_facebook/posts/your_posts__check_ins__photos_and_videos_1.json', 'utf-8');
+ ZipReader.withDestinationPath(outputPath).viaBuffer(await fsp.readFile(destPath));
+ const postsJson = await fsp.readFile(outputPath + '/your_activity_across_facebook/posts/your_posts__check_ins__photos_and_videos_1.json', 'utf-8');
const posts = JSON.parse(postsJson);
const facebookFolder = await this.driveFoldersRepository.findOneBy({ name: 'Facebook', userId: job.data.user.id, parentId: folder?.id });
if (facebookFolder == null && folder) {
@@ -244,7 +249,7 @@ export class ImportNotesProcessorService {
const destPath = path + '/unknown.zip';
try {
- fs.writeFileSync(destPath, '', 'binary');
+ await fsp.writeFile(destPath, '', 'binary');
await this.downloadService.downloadUrl(file.url, destPath);
} catch (e) { // TODO: 何度か再試行
if (e instanceof Error || typeof e === 'string') {
@@ -256,11 +261,11 @@ export class ImportNotesProcessorService {
const outputPath = path + '/unknown';
try {
this.logger.succ(`Unzipping to ${outputPath}`);
- ZipReader.withDestinationPath(outputPath).viaBuffer(await fs.promises.readFile(destPath));
+ ZipReader.withDestinationPath(outputPath).viaBuffer(await fsp.readFile(destPath));
const isInstagram = type === 'Instagram' || fs.existsSync(outputPath + '/instagram_live') || fs.existsSync(outputPath + '/instagram_ads_and_businesses');
const isOutbox = type === 'Mastodon' || fs.existsSync(outputPath + '/outbox.json');
if (isInstagram) {
- const postsJson = fs.readFileSync(outputPath + '/content/posts_1.json', 'utf-8');
+ const postsJson = await fsp.readFile(outputPath + '/content/posts_1.json', 'utf-8');
const posts = JSON.parse(postsJson);
const igFolder = await this.driveFoldersRepository.findOneBy({ name: 'Instagram', userId: job.data.user.id, parentId: folder?.id });
if (igFolder == null && folder) {
@@ -270,16 +275,16 @@ export class ImportNotesProcessorService {
}
this.queueService.createImportIGToDbJob(job.data.user, posts);
} else if (isOutbox) {
- const actorJson = fs.readFileSync(outputPath + '/actor.json', 'utf-8');
+ const actorJson = await fsp.readFile(outputPath + '/actor.json', 'utf-8');
const actor = JSON.parse(actorJson);
const isPleroma = actor['@context'].some((v: any) => typeof v === 'string' && v.match(/litepub(.*)/));
if (isPleroma) {
- const outboxJson = fs.readFileSync(outputPath + '/outbox.json', 'utf-8');
+ const outboxJson = await fsp.readFile(outputPath + '/outbox.json', 'utf-8');
const outbox = JSON.parse(outboxJson);
const processedToots = await this.recreateChain(['object', 'id'], ['object', 'inReplyTo'], outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note'), true);
this.queueService.createImportPleroToDbJob(job.data.user, processedToots, null);
} else {
- const outboxJson = fs.readFileSync(outputPath + '/outbox.json', 'utf-8');
+ const outboxJson = await fsp.readFile(outputPath + '/outbox.json', 'utf-8');
const outbox = JSON.parse(outboxJson);
let mastoFolder = await this.driveFoldersRepository.findOneBy({ name: 'Mastodon', userId: job.data.user.id, parentId: folder?.id });
if (mastoFolder == null && folder) {
@@ -302,7 +307,7 @@ export class ImportNotesProcessorService {
this.logger.info(`Temp dir is ${path}`);
try {
- fs.writeFileSync(path, '', 'utf-8');
+ await fsp.writeFile(path, '', 'utf-8');
await this.downloadService.downloadUrl(file.url, path);
} catch (e) { // TODO: 何度か再試行
if (e instanceof Error || typeof e === 'string') {
@@ -311,7 +316,7 @@ export class ImportNotesProcessorService {
throw e;
}
- const notesJson = fs.readFileSync(path, 'utf-8');
+ const notesJson = await fsp.readFile(path, 'utf-8');
const notes = JSON.parse(notesJson);
const processedNotes = await this.recreateChain(['id'], ['replyId'], notes, false);
this.queueService.createImportKeyNotesToDbJob(job.data.user, processedNotes, null);
@@ -424,6 +429,10 @@ export class ImportNotesProcessorService {
const name = file.url.substring(slashdex + 1);
const exists = await this.driveFilesRepository.findOneBy({ name: name, userId: user.id });
if (exists) {
+ if (file.name) {
+ this.driveService.updateFile(exists, { comment: file.name }, user);
+ }
+
files.push(exists);
}
}
@@ -583,14 +592,15 @@ export class ImportNotesProcessorService {
async function replaceTwitterMentions(full_text: string, mentions: any) {
let full_textedit = full_text;
mentions.forEach((mention: any) => {
- full_textedit = full_textedit.replaceAll(`@${mention.screen_name}`, `[@${mention.screen_name}](https://nitter.net/${mention.screen_name})`);
+ full_textedit = full_textedit.replaceAll(`@${mention.screen_name}`, `[@${mention.screen_name}](https://twitter.com/${mention.screen_name})`);
});
return full_textedit;
}
try {
const date = new Date(tweet.created_at);
- const textReplaceURLs = tweet.entities.urls && tweet.entities.urls.length > 0 ? await replaceTwitterUrls(tweet.full_text, tweet.entities.urls) : tweet.full_text;
+ const decodedText = tweet.full_text.replaceAll('&gt;', '>').replaceAll('&lt;', '<').replaceAll('&amp;', '&');
+ const textReplaceURLs = tweet.entities.urls && tweet.entities.urls.length > 0 ? await replaceTwitterUrls(decodedText, tweet.entities.urls) : decodedText;
const text = tweet.entities.user_mentions && tweet.entities.user_mentions.length > 0 ? await replaceTwitterMentions(textReplaceURLs, tweet.entities.user_mentions) : textReplaceURLs;
const files: MiDriveFile[] = [];
diff --git a/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts b/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts
index 5dd3fbe887..a5992c28c8 100644
--- a/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts
index f69634968d..ad1d9799a7 100644
--- a/packages/backend/src/queue/processors/InboxProcessorService.ts
+++ b/packages/backend/src/queue/processors/InboxProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
@@ -24,6 +24,7 @@ import { ApPersonService } from '@/core/activitypub/models/ApPersonService.js';
import { LdSignatureService } from '@/core/activitypub/LdSignatureService.js';
import { ApInboxService } from '@/core/activitypub/ApInboxService.js';
import { bindThis } from '@/decorators.js';
+import { IdentifiableError } from '@/misc/identifiable-error.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type { InboxJobData } from '../types.js';
@@ -85,7 +86,7 @@ export class InboxProcessorService {
} catch (err) {
// 対象が4xxならスキップ
if (err instanceof StatusError) {
- if (err.isClientError) {
+ if (!err.isRetryable) {
throw new Bull.UnrecoverableError(`skip: Ignored deleted actors on both ends ${activity.actor} - ${err.statusCode}`);
}
throw new Error(`Error in actor ${activity.actor} - ${err.statusCode}`);
@@ -191,7 +192,17 @@ export class InboxProcessorService {
});
// アクティビティを処理
- await this.apInboxService.performActivity(authUser.user, activity);
+ try {
+ await this.apInboxService.performActivity(authUser.user, activity);
+ } catch (e) {
+ if (e instanceof IdentifiableError) {
+ if (e.id === '689ee33f-f97c-479a-ac49-1b9f8140af99') {
+ return 'blocked notes with prohibited words';
+ }
+ if (e.id === '85ab9bd7-3a41-4530-959d-f07073900109') return 'actor has been suspended';
+ }
+ throw e;
+ }
return 'ok';
}
}
diff --git a/packages/backend/src/queue/processors/RelationshipProcessorService.ts b/packages/backend/src/queue/processors/RelationshipProcessorService.ts
index b2d8e3631f..408b02fb38 100644
--- a/packages/backend/src/queue/processors/RelationshipProcessorService.ts
+++ b/packages/backend/src/queue/processors/RelationshipProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts
index b3b055ef8c..570cdf9a75 100644
--- a/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts
+++ b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/TickChartsProcessorService.ts b/packages/backend/src/queue/processors/TickChartsProcessorService.ts
index 7b1efb71e0..93ec34162d 100644
--- a/packages/backend/src/queue/processors/TickChartsProcessorService.ts
+++ b/packages/backend/src/queue/processors/TickChartsProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
diff --git a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts b/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts
index a41f5565c8..8c260c0137 100644
--- a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts
+++ b/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts
@@ -1,5 +1,5 @@
/*
- * SPDX-FileCopyrightText: syuilo and other misskey contributors
+ * SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
@@ -71,7 +71,7 @@ export class WebhookDeliverProcessorService {
if (res instanceof StatusError) {
// 4xx
- if (res.isClientError) {
+ if (!res.isRetryable) {
throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`);
}