summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue
diff options
context:
space:
mode:
authordakkar <dakkar@thenautilus.net>2024-03-02 16:36:49 +0000
committerAmelia Yukii <amelia.yukii@shourai.de>2024-03-02 16:36:49 +0000
commitaf548d05ca821725eabd5a96241d3228b92186f0 (patch)
tree6d23f6739482466abcc71965cd83f9bbfb2c3ae0 /packages/backend/src/queue
parentmerge: Fix Images in ReadMe (!445) (diff)
downloadsharkey-af548d05ca821725eabd5a96241d3228b92186f0.tar.gz
sharkey-af548d05ca821725eabd5a96241d3228b92186f0.tar.bz2
sharkey-af548d05ca821725eabd5a96241d3228b92186f0.zip
merge upstream for 2024.2.1
Diffstat (limited to 'packages/backend/src/queue')
-rw-r--r--packages/backend/src/queue/processors/ExportNotesProcessorService.ts164
-rw-r--r--packages/backend/src/queue/processors/ImportNotesProcessorService.ts1
-rw-r--r--packages/backend/src/queue/processors/InboxProcessorService.ts5
-rw-r--r--packages/backend/src/queue/processors/RelationshipProcessorService.ts2
4 files changed, 90 insertions, 82 deletions
diff --git a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
index f2ae0ce4b4..c7611012d7 100644
--- a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
@@ -3,7 +3,7 @@
* SPDX-License-Identifier: AGPL-3.0-only
*/
-import * as fs from 'node:fs';
+import { ReadableStream, TextEncoderStream } from 'node:stream/web';
import { Inject, Injectable } from '@nestjs/common';
import { MoreThan } from 'typeorm';
import { format as dateFormat } from 'date-fns';
@@ -18,10 +18,82 @@ import { bindThis } from '@/decorators.js';
import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js';
import { Packed } from '@/misc/json-schema.js';
import { IdService } from '@/core/IdService.js';
+import { JsonArrayStream } from '@/misc/JsonArrayStream.js';
+import { FileWriterStream } from '@/misc/FileWriterStream.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
import type { DbJobDataWithUser } from '../types.js';
+class NoteStream extends ReadableStream<Record<string, unknown>> {
+ constructor(
+ job: Bull.Job,
+ notesRepository: NotesRepository,
+ pollsRepository: PollsRepository,
+ driveFileEntityService: DriveFileEntityService,
+ idService: IdService,
+ userId: string,
+ ) {
+ let exportedNotesCount = 0;
+ let cursor: MiNote['id'] | null = null;
+
+ const serialize = (
+ note: MiNote,
+ poll: MiPoll | null,
+ files: Packed<'DriveFile'>[],
+ ): Record<string, unknown> => {
+ return {
+ id: note.id,
+ text: note.text,
+ createdAt: idService.parse(note.id).date.toISOString(),
+ fileIds: note.fileIds,
+ files: files,
+ replyId: note.replyId,
+ renoteId: note.renoteId,
+ poll: poll,
+ cw: note.cw,
+ visibility: note.visibility,
+ visibleUserIds: note.visibleUserIds,
+ localOnly: note.localOnly,
+ reactionAcceptance: note.reactionAcceptance,
+ };
+ };
+
+ super({
+ async pull(controller): Promise<void> {
+ const notes = await notesRepository.find({
+ where: {
+ userId,
+ ...(cursor !== null ? { id: MoreThan(cursor) } : {}),
+ },
+ take: 100, // 100件ずつ取得
+ order: { id: 1 },
+ });
+
+ if (notes.length === 0) {
+ job.updateProgress(100);
+ controller.close();
+ }
+
+ cursor = notes.at(-1)?.id ?? null;
+
+ for (const note of notes) {
+ const poll = note.hasPoll
+ ? await pollsRepository.findOneByOrFail({ noteId: note.id }) // N+1
+ : null;
+ const files = await driveFileEntityService.packManyByIds(note.fileIds); // N+1
+ const content = serialize(note, poll, files);
+
+ controller.enqueue(content);
+ exportedNotesCount++;
+ }
+
+ const total = await notesRepository.countBy({ userId });
+ job.updateProgress(exportedNotesCount / total);
+ },
+ });
+ }
+}
+
@Injectable()
export class ExportNotesProcessorService {
private logger: Logger;
@@ -59,67 +131,19 @@ export class ExportNotesProcessorService {
this.logger.info(`Temp file is ${path}`);
try {
- const stream = fs.createWriteStream(path, { flags: 'a' });
-
- const write = (text: string): Promise<void> => {
- return new Promise<void>((res, rej) => {
- stream.write(text, err => {
- if (err) {
- this.logger.error(err);
- rej(err);
- } else {
- res();
- }
- });
- });
- };
-
- await write('[');
-
- let exportedNotesCount = 0;
- let cursor: MiNote['id'] | null = null;
-
- while (true) {
- const notes = await this.notesRepository.find({
- where: {
- userId: user.id,
- ...(cursor ? { id: MoreThan(cursor) } : {}),
- },
- take: 100,
- order: {
- id: 1,
- },
- }) as MiNote[];
-
- if (notes.length === 0) {
- job.updateProgress(100);
- break;
- }
-
- cursor = notes.at(-1)?.id ?? null;
-
- for (const note of notes) {
- let poll: MiPoll | undefined;
- if (note.hasPoll) {
- poll = await this.pollsRepository.findOneByOrFail({ noteId: note.id });
- }
- const files = await this.driveFileEntityService.packManyByIds(note.fileIds);
- const content = JSON.stringify(this.serialize(note, poll, files));
- const isFirst = exportedNotesCount === 0;
- await write(isFirst ? content : ',\n' + content);
- exportedNotesCount++;
- }
-
- const total = await this.notesRepository.countBy({
- userId: user.id,
- });
+ // メモリが足りなくならないようにストリームで処理する
+ await new NoteStream(
+ job,
+ this.notesRepository,
+ this.pollsRepository,
+ this.driveFileEntityService,
+ this.idService,
+ user.id,
+ )
+ .pipeThrough(new JsonArrayStream())
+ .pipeThrough(new TextEncoderStream())
+ .pipeTo(new FileWriterStream(path));
- job.updateProgress(exportedNotesCount / total);
- }
-
- await write(']');
-
- stream.end();
this.logger.succ(`Exported to: ${path}`);
const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json';
@@ -130,22 +154,4 @@ export class ExportNotesProcessorService {
cleanup();
}
}
-
- private serialize(note: MiNote, poll: MiPoll | null = null, files: Packed<'DriveFile'>[]): Record<string, unknown> {
- return {
- id: note.id,
- text: note.text,
- createdAt: this.idService.parse(note.id).date.toISOString(),
- fileIds: note.fileIds,
- files: files,
- replyId: note.replyId,
- renoteId: note.renoteId,
- poll: poll,
- cw: note.cw,
- visibility: note.visibility,
- visibleUserIds: note.visibleUserIds,
- localOnly: note.localOnly,
- reactionAcceptance: note.reactionAcceptance,
- };
- }
}
diff --git a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts
index f79ecb419a..67e2eb7407 100644
--- a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts
@@ -1,6 +1,5 @@
import * as fs from 'node:fs';
import * as fsp from 'node:fs/promises';
-import * as vm from 'node:vm';
import * as crypto from 'node:crypto';
import { Inject, Injectable } from '@nestjs/common';
import { ZipReader } from 'slacc';
diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts
index d5e103abe3..ad1d9799a7 100644
--- a/packages/backend/src/queue/processors/InboxProcessorService.ts
+++ b/packages/backend/src/queue/processors/InboxProcessorService.ts
@@ -196,7 +196,10 @@ export class InboxProcessorService {
await this.apInboxService.performActivity(authUser.user, activity);
} catch (e) {
if (e instanceof IdentifiableError) {
- if (e.id === '689ee33f-f97c-479a-ac49-1b9f8140af99') return 'blocked notes with prohibited words';
+ if (e.id === '689ee33f-f97c-479a-ac49-1b9f8140af99') {
+ return 'blocked notes with prohibited words';
+ }
+ if (e.id === '85ab9bd7-3a41-4530-959d-f07073900109') return 'actor has been suspended';
}
throw e;
}
diff --git a/packages/backend/src/queue/processors/RelationshipProcessorService.ts b/packages/backend/src/queue/processors/RelationshipProcessorService.ts
index 53dbb42169..408b02fb38 100644
--- a/packages/backend/src/queue/processors/RelationshipProcessorService.ts
+++ b/packages/backend/src/queue/processors/RelationshipProcessorService.ts
@@ -35,7 +35,7 @@ export class RelationshipProcessorService {
@bindThis
public async processFollow(job: Bull.Job<RelationshipJobData>): Promise<string> {
this.logger.info(`${job.data.from.id} is trying to follow ${job.data.to.id} ${job.data.withReplies ? "with replies" : "without replies"}`);
- await this.userFollowingService.followByThinUser(job.data.from, job.data.to, {
+ await this.userFollowingService.follow(job.data.from, job.data.to, {
requestId: job.data.requestId,
silent: job.data.silent,
withReplies: job.data.withReplies,