summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue/processors
diff options
context:
space:
mode:
authorokayurisotto <47853651+okayurisotto@users.noreply.github.com>2024-02-28 15:34:58 +0900
committerGitHub <noreply@github.com>2024-02-28 15:34:58 +0900
commitb7d9d1620161a728e34ab20d8ea99160b3eb4196 (patch)
tree6e59e3cd1a98f8ed96ab943c64223f65b9bb7e14 /packages/backend/src/queue/processors
parentenhance(backend): フォロー・フォロワー関連の通知の受信設... (diff)
downloadsharkey-b7d9d1620161a728e34ab20d8ea99160b3eb4196.tar.gz
sharkey-b7d9d1620161a728e34ab20d8ea99160b3eb4196.tar.bz2
sharkey-b7d9d1620161a728e34ab20d8ea99160b3eb4196.zip
refactor(backend): ノートのエクスポート処理でStreams APIを使うように (#13465)
* refactor(backend): ノートのエクスポート処理でStreams APIを使うように * fixup! refactor(backend): ノートのエクスポート処理でStreams APIを使うように `await`忘れにより、ジョブがすぐに完了したことになり削除されてしまっていた。 それによって、`NoteStream`内での`updateProgress`メソッドの呼び出しで、`Missing key for job`のエラーが発生することがあった。 --------- Co-authored-by: syuilo <4439005+syuilo@users.noreply.github.com>
Diffstat (limited to 'packages/backend/src/queue/processors')
-rw-r--r--packages/backend/src/queue/processors/ExportNotesProcessorService.ts164
1 files changed, 85 insertions, 79 deletions
diff --git a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
index f2ae0ce4b4..c7611012d7 100644
--- a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
@@ -3,7 +3,7 @@
* SPDX-License-Identifier: AGPL-3.0-only
*/
-import * as fs from 'node:fs';
+import { ReadableStream, TextEncoderStream } from 'node:stream/web';
import { Inject, Injectable } from '@nestjs/common';
import { MoreThan } from 'typeorm';
import { format as dateFormat } from 'date-fns';
@@ -18,10 +18,82 @@ import { bindThis } from '@/decorators.js';
import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js';
import { Packed } from '@/misc/json-schema.js';
import { IdService } from '@/core/IdService.js';
+import { JsonArrayStream } from '@/misc/JsonArrayStream.js';
+import { FileWriterStream } from '@/misc/FileWriterStream.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
import type { DbJobDataWithUser } from '../types.js';
+class NoteStream extends ReadableStream<Record<string, unknown>> {
+ constructor(
+ job: Bull.Job,
+ notesRepository: NotesRepository,
+ pollsRepository: PollsRepository,
+ driveFileEntityService: DriveFileEntityService,
+ idService: IdService,
+ userId: string,
+ ) {
+ let exportedNotesCount = 0;
+ let cursor: MiNote['id'] | null = null;
+
+ const serialize = (
+ note: MiNote,
+ poll: MiPoll | null,
+ files: Packed<'DriveFile'>[],
+ ): Record<string, unknown> => {
+ return {
+ id: note.id,
+ text: note.text,
+ createdAt: idService.parse(note.id).date.toISOString(),
+ fileIds: note.fileIds,
+ files: files,
+ replyId: note.replyId,
+ renoteId: note.renoteId,
+ poll: poll,
+ cw: note.cw,
+ visibility: note.visibility,
+ visibleUserIds: note.visibleUserIds,
+ localOnly: note.localOnly,
+ reactionAcceptance: note.reactionAcceptance,
+ };
+ };
+
+ super({
+ async pull(controller): Promise<void> {
+ const notes = await notesRepository.find({
+ where: {
+ userId,
+ ...(cursor !== null ? { id: MoreThan(cursor) } : {}),
+ },
+ take: 100, // 100件ずつ取得
+ order: { id: 1 },
+ });
+
+ if (notes.length === 0) {
+ job.updateProgress(100);
+ controller.close();
+ }
+
+ cursor = notes.at(-1)?.id ?? null;
+
+ for (const note of notes) {
+ const poll = note.hasPoll
+ ? await pollsRepository.findOneByOrFail({ noteId: note.id }) // N+1
+ : null;
+ const files = await driveFileEntityService.packManyByIds(note.fileIds); // N+1
+ const content = serialize(note, poll, files);
+
+ controller.enqueue(content);
+ exportedNotesCount++;
+ }
+
+ const total = await notesRepository.countBy({ userId });
+ job.updateProgress(exportedNotesCount / total);
+ },
+ });
+ }
+}
+
@Injectable()
export class ExportNotesProcessorService {
private logger: Logger;
@@ -59,67 +131,19 @@ export class ExportNotesProcessorService {
this.logger.info(`Temp file is ${path}`);
try {
- const stream = fs.createWriteStream(path, { flags: 'a' });
-
- const write = (text: string): Promise<void> => {
- return new Promise<void>((res, rej) => {
- stream.write(text, err => {
- if (err) {
- this.logger.error(err);
- rej(err);
- } else {
- res();
- }
- });
- });
- };
-
- await write('[');
-
- let exportedNotesCount = 0;
- let cursor: MiNote['id'] | null = null;
-
- while (true) {
- const notes = await this.notesRepository.find({
- where: {
- userId: user.id,
- ...(cursor ? { id: MoreThan(cursor) } : {}),
- },
- take: 100,
- order: {
- id: 1,
- },
- }) as MiNote[];
-
- if (notes.length === 0) {
- job.updateProgress(100);
- break;
- }
-
- cursor = notes.at(-1)?.id ?? null;
-
- for (const note of notes) {
- let poll: MiPoll | undefined;
- if (note.hasPoll) {
- poll = await this.pollsRepository.findOneByOrFail({ noteId: note.id });
- }
- const files = await this.driveFileEntityService.packManyByIds(note.fileIds);
- const content = JSON.stringify(this.serialize(note, poll, files));
- const isFirst = exportedNotesCount === 0;
- await write(isFirst ? content : ',\n' + content);
- exportedNotesCount++;
- }
-
- const total = await this.notesRepository.countBy({
- userId: user.id,
- });
+ // メモリが足りなくならないようにストリームで処理する
+ await new NoteStream(
+ job,
+ this.notesRepository,
+ this.pollsRepository,
+ this.driveFileEntityService,
+ this.idService,
+ user.id,
+ )
+ .pipeThrough(new JsonArrayStream())
+ .pipeThrough(new TextEncoderStream())
+ .pipeTo(new FileWriterStream(path));
- job.updateProgress(exportedNotesCount / total);
- }
-
- await write(']');
-
- stream.end();
this.logger.succ(`Exported to: ${path}`);
const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json';
@@ -130,22 +154,4 @@ export class ExportNotesProcessorService {
cleanup();
}
}
-
- private serialize(note: MiNote, poll: MiPoll | null = null, files: Packed<'DriveFile'>[]): Record<string, unknown> {
- return {
- id: note.id,
- text: note.text,
- createdAt: this.idService.parse(note.id).date.toISOString(),
- fileIds: note.fileIds,
- files: files,
- replyId: note.replyId,
- renoteId: note.renoteId,
- poll: poll,
- cw: note.cw,
- visibility: note.visibility,
- visibleUserIds: note.visibleUserIds,
- localOnly: note.localOnly,
- reactionAcceptance: note.reactionAcceptance,
- };
- }
}