diff options
Diffstat (limited to 'packages/backend/src/queue/processors')
13 files changed, 58 insertions, 40 deletions
diff --git a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts index c54bf59ae4..6f887089eb 100644 --- a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts +++ b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts @@ -1,7 +1,7 @@ import { Inject, Injectable } from '@nestjs/common'; import { IsNull, MoreThan, Not } from 'typeorm'; import { DI } from '@/di-symbols.js'; -import type { DriveFilesRepository } from '@/models/index.js'; +import type { DriveFile, DriveFilesRepository } from '@/models/index.js'; import type { Config } from '@/config.js'; import type Logger from '@/logger.js'; import { DriveService } from '@/core/DriveService.js'; @@ -31,7 +31,7 @@ export class CleanRemoteFilesProcessorService { this.logger.info('Deleting cached remote files...'); let deletedCount = 0; - let cursor: any = null; + let cursor: DriveFile['id'] | null = null; while (true) { const files = await this.driveFilesRepository.find({ @@ -51,7 +51,7 @@ export class CleanRemoteFilesProcessorService { break; } - cursor = files[files.length - 1].id; + cursor = files.at(-1)?.id ?? null; await Promise.all(files.map(file => this.driveService.deleteFileSync(file, true))); diff --git a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts index 39dd801af0..3b7db5f05c 100644 --- a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts +++ b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts @@ -9,6 +9,7 @@ import type { DriveFile } from '@/models/entities/DriveFile.js'; import type { Note } from '@/models/entities/Note.js'; import { EmailService } from '@/core/EmailService.js'; import { bindThis } from '@/decorators.js'; +import { SearchService } from '@/core/SearchService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; import type { DbUserDeleteJobData } from '../types.js'; @@ -36,6 +37,7 @@ export class DeleteAccountProcessorService { private driveService: DriveService, private emailService: EmailService, private queueLoggerService: QueueLoggerService, + private searchService: SearchService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('delete-account'); } @@ -68,9 +70,13 @@ export class DeleteAccountProcessorService { break; } - cursor = notes[notes.length - 1].id; + cursor = notes.at(-1)?.id ?? null; await this.notesRepository.delete(notes.map(note => note.id)); + + for (const note of notes) { + await this.searchService.unindexNote(note); + } } this.logger.succ('All of notes deleted'); @@ -95,7 +101,7 @@ export class DeleteAccountProcessorService { break; } - cursor = files[files.length - 1].id; + cursor = files.at(-1)?.id ?? null; for (const file of files) { await this.driveService.deleteFileSync(file); diff --git a/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts b/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts index 6772c5dc76..07e3762330 100644 --- a/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts +++ b/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts @@ -1,7 +1,7 @@ import { Inject, Injectable } from '@nestjs/common'; import { MoreThan } from 'typeorm'; import { DI } from '@/di-symbols.js'; -import type { UsersRepository, DriveFilesRepository } from '@/models/index.js'; +import type { UsersRepository, DriveFilesRepository, DriveFile } from '@/models/index.js'; import type { Config } from '@/config.js'; import type Logger from '@/logger.js'; import { DriveService } from '@/core/DriveService.js'; @@ -40,7 +40,7 @@ export class DeleteDriveFilesProcessorService { } let deletedCount = 0; - let cursor: any = null; + let cursor: DriveFile['id'] | null = null; while (true) { const files = await this.driveFilesRepository.find({ @@ -59,7 +59,7 @@ export class DeleteDriveFilesProcessorService { break; } - cursor = files[files.length - 1].id; + cursor = files.at(-1)?.id ?? null; for (const file of files) { await this.driveService.deleteFileSync(file); diff --git a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts index ac52325c8d..21c0bfe80e 100644 --- a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts @@ -30,7 +30,7 @@ export class ExportAntennasProcessorService { @Inject(DI.userListJoiningsRepository) private userListJoiningsRepository: UserListJoiningsRepository, - + private driveService: DriveService, private utilityService: UtilityService, private queueLoggerService: QueueLoggerService, diff --git a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts index eb758e162d..d100c6d09f 100644 --- a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts @@ -3,7 +3,7 @@ import { Inject, Injectable } from '@nestjs/common'; import { MoreThan } from 'typeorm'; import { format as dateFormat } from 'date-fns'; import { DI } from '@/di-symbols.js'; -import type { UsersRepository, BlockingsRepository } from '@/models/index.js'; +import type { UsersRepository, BlockingsRepository, Blocking } from '@/models/index.js'; import type { Config } from '@/config.js'; import type Logger from '@/logger.js'; import { DriveService } from '@/core/DriveService.js'; @@ -53,7 +53,7 @@ export class ExportBlockingProcessorService { const stream = fs.createWriteStream(path, { flags: 'a' }); let exportedCount = 0; - let cursor: any = null; + let cursor: Blocking['id'] | null = null; while (true) { const blockings = await this.blockingsRepository.find({ @@ -72,7 +72,7 @@ export class ExportBlockingProcessorService { break; } - cursor = blockings[blockings.length - 1].id; + cursor = blockings.at(-1)?.id ?? null; for (const block of blockings) { const u = await this.usersRepository.findOneBy({ id: block.blockeeId }); diff --git a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts index 76c38a6b86..2be42b1a7a 100644 --- a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts @@ -94,7 +94,7 @@ export class ExportFavoritesProcessorService { break; } - cursor = favorites[favorites.length - 1].id; + cursor = favorites.at(-1)?.id ?? null; for (const favorite of favorites) { let poll: Poll | undefined; diff --git a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts index 8726cb1402..d54e5e0b34 100644 --- a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts @@ -79,7 +79,7 @@ export class ExportFollowingProcessorService { break; } - cursor = followings[followings.length - 1].id; + cursor = followings.at(-1)?.id ?? null; for (const following of followings) { const u = await this.usersRepository.findOneBy({ id: following.followeeId }); diff --git a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts index 0f11a9e843..030e38931e 100644 --- a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts @@ -3,7 +3,7 @@ import { Inject, Injectable } from '@nestjs/common'; import { IsNull, MoreThan } from 'typeorm'; import { format as dateFormat } from 'date-fns'; import { DI } from '@/di-symbols.js'; -import type { MutingsRepository, UsersRepository, BlockingsRepository } from '@/models/index.js'; +import type { MutingsRepository, UsersRepository, BlockingsRepository, Muting } from '@/models/index.js'; import type { Config } from '@/config.js'; import type Logger from '@/logger.js'; import { DriveService } from '@/core/DriveService.js'; @@ -56,7 +56,7 @@ export class ExportMutingProcessorService { const stream = fs.createWriteStream(path, { flags: 'a' }); let exportedCount = 0; - let cursor: any = null; + let cursor: Muting['id'] | null = null; while (true) { const mutes = await this.mutingsRepository.find({ @@ -76,7 +76,7 @@ export class ExportMutingProcessorService { break; } - cursor = mutes[mutes.length - 1].id; + cursor = mutes.at(-1)?.id ?? null; for (const mute of mutes) { const u = await this.usersRepository.findOneBy({ id: mute.muteeId }); diff --git a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts index 24fb331883..94c81a3cf8 100644 --- a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts @@ -11,6 +11,8 @@ import { createTemp } from '@/misc/create-temp.js'; import type { Poll } from '@/models/entities/Poll.js'; import type { Note } from '@/models/entities/Note.js'; import { bindThis } from '@/decorators.js'; +import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js'; +import { Packed } from '@/misc/json-schema.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; import type { DbJobDataWithUser } from '../types.js'; @@ -34,6 +36,8 @@ export class ExportNotesProcessorService { private driveService: DriveService, private queueLoggerService: QueueLoggerService, + + private driveFileEntityService: DriveFileEntityService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('export-notes'); } @@ -90,14 +94,15 @@ export class ExportNotesProcessorService { break; } - cursor = notes[notes.length - 1].id; + cursor = notes.at(-1)?.id ?? null; for (const note of notes) { let poll: Poll | undefined; if (note.hasPoll) { poll = await this.pollsRepository.findOneByOrFail({ noteId: note.id }); } - const content = JSON.stringify(serialize(note, poll)); + const files = await this.driveFileEntityService.packManyByIds(note.fileIds); + const content = JSON.stringify(serialize(note, poll, files)); const isFirst = exportedNotesCount === 0; await write(isFirst ? content : ',\n' + content); exportedNotesCount++; @@ -125,12 +130,13 @@ export class ExportNotesProcessorService { } } -function serialize(note: Note, poll: Poll | null = null): Record<string, unknown> { +function serialize(note: Note, poll: Poll | null = null, files: Packed<'DriveFile'>[]): Record<string, unknown> { return { id: note.id, text: note.text, createdAt: note.createdAt, fileIds: note.fileIds, + files: files, replyId: note.replyId, renoteId: note.renoteId, poll: poll, diff --git a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts index 575cad69d5..74ef20fdd8 100644 --- a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts @@ -1,5 +1,5 @@ import { Injectable, Inject } from '@nestjs/common'; -import Ajv from 'ajv'; +import _Ajv from 'ajv'; import { IdService } from '@/core/IdService.js'; import { GlobalEventService } from '@/core/GlobalEventService.js'; import Logger from '@/logger.js'; @@ -10,16 +10,18 @@ import { QueueLoggerService } from '../QueueLoggerService.js'; import { DBAntennaImportJobData } from '../types.js'; import type * as Bull from 'bullmq'; +const Ajv = _Ajv.default; + const validate = new Ajv().compile({ type: 'object', properties: { name: { type: 'string', minLength: 1, maxLength: 100 }, src: { type: 'string', enum: ['home', 'all', 'users', 'list'] }, - userListAccts: { - type: 'array', + userListAccts: { + type: 'array', items: { type: 'string', - }, + }, nullable: true, }, keywords: { type: 'array', items: { diff --git a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts index d862567871..37b929cb03 100644 --- a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts @@ -1,7 +1,7 @@ import * as fs from 'node:fs'; import { Inject, Injectable } from '@nestjs/common'; +import { ZipReader } from 'slacc'; import { DataSource } from 'typeorm'; -import unzipper from 'unzipper'; import { DI } from '@/di-symbols.js'; import type { EmojisRepository, DriveFilesRepository, UsersRepository } from '@/models/index.js'; import type { Config } from '@/config.js'; @@ -72,9 +72,9 @@ export class ImportCustomEmojisProcessorService { } const outputPath = path + '/emojis'; - const unzipStream = fs.createReadStream(destPath); - const extractor = unzipper.Extract({ path: outputPath }); - extractor.on('close', async () => { + try { + this.logger.succ(`Unzipping to ${outputPath}`); + ZipReader.withDestinationPath(outputPath).viaBuffer(await fs.promises.readFile(destPath)); const metaRaw = fs.readFileSync(outputPath + '/meta.json', 'utf-8'); const meta = JSON.parse(metaRaw); @@ -113,10 +113,14 @@ export class ImportCustomEmojisProcessorService { } cleanup(); - + this.logger.succ('Imported'); - }); - unzipStream.pipe(extractor); - this.logger.succ(`Unzipping to ${outputPath}`); + } catch (e) { + if (e instanceof Error || typeof e === 'string') { + this.logger.error(e); + } + cleanup(); + throw e; + } } } diff --git a/packages/backend/src/queue/processors/RelationshipProcessorService.ts b/packages/backend/src/queue/processors/RelationshipProcessorService.ts index 722260d948..816c5fc5ec 100644 --- a/packages/backend/src/queue/processors/RelationshipProcessorService.ts +++ b/packages/backend/src/queue/processors/RelationshipProcessorService.ts @@ -1,16 +1,16 @@ import { Inject, Injectable } from '@nestjs/common'; -import type * as Bull from 'bullmq'; import { UserFollowingService } from '@/core/UserFollowingService.js'; import { UserBlockingService } from '@/core/UserBlockingService.js'; import { bindThis } from '@/decorators.js'; import type Logger from '@/logger.js'; -import { QueueLoggerService } from '../QueueLoggerService.js'; -import { RelationshipJobData } from '../types.js'; import type { UsersRepository } from '@/models/index.js'; import { DI } from '@/di-symbols.js'; import { LocalUser, RemoteUser } from '@/models/entities/User.js'; +import { RelationshipJobData } from '../types.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type * as Bull from 'bullmq'; @Injectable() export class RelationshipProcessorService { diff --git a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts b/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts index 8b40c16749..25e91761ef 100644 --- a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts +++ b/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts @@ -31,7 +31,7 @@ export class WebhookDeliverProcessorService { public async process(job: Bull.Job<WebhookDeliverJobData>): Promise<string> { try { this.logger.debug(`delivering ${job.data.webhookId}`); - + const res = await this.httpRequestService.send(job.data.to, { method: 'POST', headers: { @@ -50,25 +50,25 @@ export class WebhookDeliverProcessorService { body: job.data.content, }), }); - + this.webhooksRepository.update({ id: job.data.webhookId }, { latestSentAt: new Date(), latestStatus: res.status, }); - + return 'Success'; } catch (res) { this.webhooksRepository.update({ id: job.data.webhookId }, { latestSentAt: new Date(), latestStatus: res instanceof StatusError ? res.statusCode : 1, }); - + if (res instanceof StatusError) { // 4xx if (res.isClientError) { throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`); } - + // 5xx etc. throw new Error(`${res.statusCode} ${res.statusMessage}`); } else { |