summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue
diff options
context:
space:
mode:
authorsyuilo <Syuilotan@yahoo.co.jp>2023-07-21 20:36:07 +0900
committerGitHub <noreply@github.com>2023-07-21 20:36:07 +0900
commite64a81aa1d2801516e8eac8dc69aac540489f20b (patch)
tree56accbc0f5f71db864e1e975920135fb0a957291 /packages/backend/src/queue
parentMerge pull request #10990 from misskey-dev/develop (diff)
parentNew Crowdin updates (#11336) (diff)
downloadmisskey-e64a81aa1d2801516e8eac8dc69aac540489f20b.tar.gz
misskey-e64a81aa1d2801516e8eac8dc69aac540489f20b.tar.bz2
misskey-e64a81aa1d2801516e8eac8dc69aac540489f20b.zip
Merge pull request #11301 from misskey-dev/develop
Release: 13.14.0
Diffstat (limited to '')
-rw-r--r--packages/backend/src/queue/QueueProcessorService.ts2
-rw-r--r--packages/backend/src/queue/const.ts7
-rw-r--r--packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts6
-rw-r--r--packages/backend/src/queue/processors/DeleteAccountProcessorService.ts10
-rw-r--r--packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts6
-rw-r--r--packages/backend/src/queue/processors/ExportAntennasProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ExportBlockingProcessorService.ts6
-rw-r--r--packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ExportFollowingProcessorService.ts2
-rw-r--r--packages/backend/src/queue/processors/ExportMutingProcessorService.ts6
-rw-r--r--packages/backend/src/queue/processors/ExportNotesProcessorService.ts12
-rw-r--r--packages/backend/src/queue/processors/ImportAntennasProcessorService.ts10
-rw-r--r--packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts20
-rw-r--r--packages/backend/src/queue/processors/RelationshipProcessorService.ts6
-rw-r--r--packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts10
15 files changed, 61 insertions, 46 deletions
diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts
index 42f9c1af7d..f575b1718e 100644
--- a/packages/backend/src/queue/QueueProcessorService.ts
+++ b/packages/backend/src/queue/QueueProcessorService.ts
@@ -283,7 +283,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
const relationshipLogger = this.logger.createSubLogger('relationship');
-
+
this.relationshipQueueWorker
.on('active', (job) => relationshipLogger.debug(`active id=${job.id}`))
.on('completed', (job, result) => relationshipLogger.debug(`completed(${result}) id=${job.id}`))
diff --git a/packages/backend/src/queue/const.ts b/packages/backend/src/queue/const.ts
index d240fe70e0..d49951a1c3 100644
--- a/packages/backend/src/queue/const.ts
+++ b/packages/backend/src/queue/const.ts
@@ -15,11 +15,8 @@ export const QUEUE = {
export function baseQueueOptions(config: Config, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions {
return {
connection: {
- port: config.redisForJobQueue.port,
- host: config.redisForJobQueue.host,
- family: config.redisForJobQueue.family == null ? 0 : config.redisForJobQueue.family,
- password: config.redisForJobQueue.pass,
- db: config.redisForJobQueue.db ?? 0,
+ ...config.redisForJobQueue,
+ keyPrefix: undefined
},
prefix: config.redisForJobQueue.prefix ? `${config.redisForJobQueue.prefix}:queue:${queueName}` : `queue:${queueName}`,
};
diff --git a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts
index c54bf59ae4..6f887089eb 100644
--- a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts
+++ b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts
@@ -1,7 +1,7 @@
import { Inject, Injectable } from '@nestjs/common';
import { IsNull, MoreThan, Not } from 'typeorm';
import { DI } from '@/di-symbols.js';
-import type { DriveFilesRepository } from '@/models/index.js';
+import type { DriveFile, DriveFilesRepository } from '@/models/index.js';
import type { Config } from '@/config.js';
import type Logger from '@/logger.js';
import { DriveService } from '@/core/DriveService.js';
@@ -31,7 +31,7 @@ export class CleanRemoteFilesProcessorService {
this.logger.info('Deleting cached remote files...');
let deletedCount = 0;
- let cursor: any = null;
+ let cursor: DriveFile['id'] | null = null;
while (true) {
const files = await this.driveFilesRepository.find({
@@ -51,7 +51,7 @@ export class CleanRemoteFilesProcessorService {
break;
}
- cursor = files[files.length - 1].id;
+ cursor = files.at(-1)?.id ?? null;
await Promise.all(files.map(file => this.driveService.deleteFileSync(file, true)));
diff --git a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts
index 39dd801af0..3b7db5f05c 100644
--- a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts
+++ b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts
@@ -9,6 +9,7 @@ import type { DriveFile } from '@/models/entities/DriveFile.js';
import type { Note } from '@/models/entities/Note.js';
import { EmailService } from '@/core/EmailService.js';
import { bindThis } from '@/decorators.js';
+import { SearchService } from '@/core/SearchService.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
import type { DbUserDeleteJobData } from '../types.js';
@@ -36,6 +37,7 @@ export class DeleteAccountProcessorService {
private driveService: DriveService,
private emailService: EmailService,
private queueLoggerService: QueueLoggerService,
+ private searchService: SearchService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('delete-account');
}
@@ -68,9 +70,13 @@ export class DeleteAccountProcessorService {
break;
}
- cursor = notes[notes.length - 1].id;
+ cursor = notes.at(-1)?.id ?? null;
await this.notesRepository.delete(notes.map(note => note.id));
+
+ for (const note of notes) {
+ await this.searchService.unindexNote(note);
+ }
}
this.logger.succ('All of notes deleted');
@@ -95,7 +101,7 @@ export class DeleteAccountProcessorService {
break;
}
- cursor = files[files.length - 1].id;
+ cursor = files.at(-1)?.id ?? null;
for (const file of files) {
await this.driveService.deleteFileSync(file);
diff --git a/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts b/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts
index 6772c5dc76..07e3762330 100644
--- a/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts
+++ b/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts
@@ -1,7 +1,7 @@
import { Inject, Injectable } from '@nestjs/common';
import { MoreThan } from 'typeorm';
import { DI } from '@/di-symbols.js';
-import type { UsersRepository, DriveFilesRepository } from '@/models/index.js';
+import type { UsersRepository, DriveFilesRepository, DriveFile } from '@/models/index.js';
import type { Config } from '@/config.js';
import type Logger from '@/logger.js';
import { DriveService } from '@/core/DriveService.js';
@@ -40,7 +40,7 @@ export class DeleteDriveFilesProcessorService {
}
let deletedCount = 0;
- let cursor: any = null;
+ let cursor: DriveFile['id'] | null = null;
while (true) {
const files = await this.driveFilesRepository.find({
@@ -59,7 +59,7 @@ export class DeleteDriveFilesProcessorService {
break;
}
- cursor = files[files.length - 1].id;
+ cursor = files.at(-1)?.id ?? null;
for (const file of files) {
await this.driveService.deleteFileSync(file);
diff --git a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
index ac52325c8d..21c0bfe80e 100644
--- a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts
@@ -30,7 +30,7 @@ export class ExportAntennasProcessorService {
@Inject(DI.userListJoiningsRepository)
private userListJoiningsRepository: UserListJoiningsRepository,
-
+
private driveService: DriveService,
private utilityService: UtilityService,
private queueLoggerService: QueueLoggerService,
diff --git a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts
index eb758e162d..d100c6d09f 100644
--- a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts
@@ -3,7 +3,7 @@ import { Inject, Injectable } from '@nestjs/common';
import { MoreThan } from 'typeorm';
import { format as dateFormat } from 'date-fns';
import { DI } from '@/di-symbols.js';
-import type { UsersRepository, BlockingsRepository } from '@/models/index.js';
+import type { UsersRepository, BlockingsRepository, Blocking } from '@/models/index.js';
import type { Config } from '@/config.js';
import type Logger from '@/logger.js';
import { DriveService } from '@/core/DriveService.js';
@@ -53,7 +53,7 @@ export class ExportBlockingProcessorService {
const stream = fs.createWriteStream(path, { flags: 'a' });
let exportedCount = 0;
- let cursor: any = null;
+ let cursor: Blocking['id'] | null = null;
while (true) {
const blockings = await this.blockingsRepository.find({
@@ -72,7 +72,7 @@ export class ExportBlockingProcessorService {
break;
}
- cursor = blockings[blockings.length - 1].id;
+ cursor = blockings.at(-1)?.id ?? null;
for (const block of blockings) {
const u = await this.usersRepository.findOneBy({ id: block.blockeeId });
diff --git a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts
index 76c38a6b86..2be42b1a7a 100644
--- a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts
@@ -94,7 +94,7 @@ export class ExportFavoritesProcessorService {
break;
}
- cursor = favorites[favorites.length - 1].id;
+ cursor = favorites.at(-1)?.id ?? null;
for (const favorite of favorites) {
let poll: Poll | undefined;
diff --git a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts
index 8726cb1402..d54e5e0b34 100644
--- a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts
@@ -79,7 +79,7 @@ export class ExportFollowingProcessorService {
break;
}
- cursor = followings[followings.length - 1].id;
+ cursor = followings.at(-1)?.id ?? null;
for (const following of followings) {
const u = await this.usersRepository.findOneBy({ id: following.followeeId });
diff --git a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts
index 0f11a9e843..030e38931e 100644
--- a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts
@@ -3,7 +3,7 @@ import { Inject, Injectable } from '@nestjs/common';
import { IsNull, MoreThan } from 'typeorm';
import { format as dateFormat } from 'date-fns';
import { DI } from '@/di-symbols.js';
-import type { MutingsRepository, UsersRepository, BlockingsRepository } from '@/models/index.js';
+import type { MutingsRepository, UsersRepository, BlockingsRepository, Muting } from '@/models/index.js';
import type { Config } from '@/config.js';
import type Logger from '@/logger.js';
import { DriveService } from '@/core/DriveService.js';
@@ -56,7 +56,7 @@ export class ExportMutingProcessorService {
const stream = fs.createWriteStream(path, { flags: 'a' });
let exportedCount = 0;
- let cursor: any = null;
+ let cursor: Muting['id'] | null = null;
while (true) {
const mutes = await this.mutingsRepository.find({
@@ -76,7 +76,7 @@ export class ExportMutingProcessorService {
break;
}
- cursor = mutes[mutes.length - 1].id;
+ cursor = mutes.at(-1)?.id ?? null;
for (const mute of mutes) {
const u = await this.usersRepository.findOneBy({ id: mute.muteeId });
diff --git a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
index 24fb331883..94c81a3cf8 100644
--- a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
+++ b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts
@@ -11,6 +11,8 @@ import { createTemp } from '@/misc/create-temp.js';
import type { Poll } from '@/models/entities/Poll.js';
import type { Note } from '@/models/entities/Note.js';
import { bindThis } from '@/decorators.js';
+import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js';
+import { Packed } from '@/misc/json-schema.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
import type { DbJobDataWithUser } from '../types.js';
@@ -34,6 +36,8 @@ export class ExportNotesProcessorService {
private driveService: DriveService,
private queueLoggerService: QueueLoggerService,
+
+ private driveFileEntityService: DriveFileEntityService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('export-notes');
}
@@ -90,14 +94,15 @@ export class ExportNotesProcessorService {
break;
}
- cursor = notes[notes.length - 1].id;
+ cursor = notes.at(-1)?.id ?? null;
for (const note of notes) {
let poll: Poll | undefined;
if (note.hasPoll) {
poll = await this.pollsRepository.findOneByOrFail({ noteId: note.id });
}
- const content = JSON.stringify(serialize(note, poll));
+ const files = await this.driveFileEntityService.packManyByIds(note.fileIds);
+ const content = JSON.stringify(serialize(note, poll, files));
const isFirst = exportedNotesCount === 0;
await write(isFirst ? content : ',\n' + content);
exportedNotesCount++;
@@ -125,12 +130,13 @@ export class ExportNotesProcessorService {
}
}
-function serialize(note: Note, poll: Poll | null = null): Record<string, unknown> {
+function serialize(note: Note, poll: Poll | null = null, files: Packed<'DriveFile'>[]): Record<string, unknown> {
return {
id: note.id,
text: note.text,
createdAt: note.createdAt,
fileIds: note.fileIds,
+ files: files,
replyId: note.replyId,
renoteId: note.renoteId,
poll: poll,
diff --git a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts
index 575cad69d5..74ef20fdd8 100644
--- a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts
@@ -1,5 +1,5 @@
import { Injectable, Inject } from '@nestjs/common';
-import Ajv from 'ajv';
+import _Ajv from 'ajv';
import { IdService } from '@/core/IdService.js';
import { GlobalEventService } from '@/core/GlobalEventService.js';
import Logger from '@/logger.js';
@@ -10,16 +10,18 @@ import { QueueLoggerService } from '../QueueLoggerService.js';
import { DBAntennaImportJobData } from '../types.js';
import type * as Bull from 'bullmq';
+const Ajv = _Ajv.default;
+
const validate = new Ajv().compile({
type: 'object',
properties: {
name: { type: 'string', minLength: 1, maxLength: 100 },
src: { type: 'string', enum: ['home', 'all', 'users', 'list'] },
- userListAccts: {
- type: 'array',
+ userListAccts: {
+ type: 'array',
items: {
type: 'string',
- },
+ },
nullable: true,
},
keywords: { type: 'array', items: {
diff --git a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts
index d862567871..37b929cb03 100644
--- a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts
+++ b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts
@@ -1,7 +1,7 @@
import * as fs from 'node:fs';
import { Inject, Injectable } from '@nestjs/common';
+import { ZipReader } from 'slacc';
import { DataSource } from 'typeorm';
-import unzipper from 'unzipper';
import { DI } from '@/di-symbols.js';
import type { EmojisRepository, DriveFilesRepository, UsersRepository } from '@/models/index.js';
import type { Config } from '@/config.js';
@@ -72,9 +72,9 @@ export class ImportCustomEmojisProcessorService {
}
const outputPath = path + '/emojis';
- const unzipStream = fs.createReadStream(destPath);
- const extractor = unzipper.Extract({ path: outputPath });
- extractor.on('close', async () => {
+ try {
+ this.logger.succ(`Unzipping to ${outputPath}`);
+ ZipReader.withDestinationPath(outputPath).viaBuffer(await fs.promises.readFile(destPath));
const metaRaw = fs.readFileSync(outputPath + '/meta.json', 'utf-8');
const meta = JSON.parse(metaRaw);
@@ -113,10 +113,14 @@ export class ImportCustomEmojisProcessorService {
}
cleanup();
-
+
this.logger.succ('Imported');
- });
- unzipStream.pipe(extractor);
- this.logger.succ(`Unzipping to ${outputPath}`);
+ } catch (e) {
+ if (e instanceof Error || typeof e === 'string') {
+ this.logger.error(e);
+ }
+ cleanup();
+ throw e;
+ }
}
}
diff --git a/packages/backend/src/queue/processors/RelationshipProcessorService.ts b/packages/backend/src/queue/processors/RelationshipProcessorService.ts
index 722260d948..816c5fc5ec 100644
--- a/packages/backend/src/queue/processors/RelationshipProcessorService.ts
+++ b/packages/backend/src/queue/processors/RelationshipProcessorService.ts
@@ -1,16 +1,16 @@
import { Inject, Injectable } from '@nestjs/common';
-import type * as Bull from 'bullmq';
import { UserFollowingService } from '@/core/UserFollowingService.js';
import { UserBlockingService } from '@/core/UserBlockingService.js';
import { bindThis } from '@/decorators.js';
import type Logger from '@/logger.js';
-import { QueueLoggerService } from '../QueueLoggerService.js';
-import { RelationshipJobData } from '../types.js';
import type { UsersRepository } from '@/models/index.js';
import { DI } from '@/di-symbols.js';
import { LocalUser, RemoteUser } from '@/models/entities/User.js';
+import { RelationshipJobData } from '../types.js';
+import { QueueLoggerService } from '../QueueLoggerService.js';
+import type * as Bull from 'bullmq';
@Injectable()
export class RelationshipProcessorService {
diff --git a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts b/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts
index 8b40c16749..25e91761ef 100644
--- a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts
+++ b/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts
@@ -31,7 +31,7 @@ export class WebhookDeliverProcessorService {
public async process(job: Bull.Job<WebhookDeliverJobData>): Promise<string> {
try {
this.logger.debug(`delivering ${job.data.webhookId}`);
-
+
const res = await this.httpRequestService.send(job.data.to, {
method: 'POST',
headers: {
@@ -50,25 +50,25 @@ export class WebhookDeliverProcessorService {
body: job.data.content,
}),
});
-
+
this.webhooksRepository.update({ id: job.data.webhookId }, {
latestSentAt: new Date(),
latestStatus: res.status,
});
-
+
return 'Success';
} catch (res) {
this.webhooksRepository.update({ id: job.data.webhookId }, {
latestSentAt: new Date(),
latestStatus: res instanceof StatusError ? res.statusCode : 1,
});
-
+
if (res instanceof StatusError) {
// 4xx
if (res.isClientError) {
throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`);
}
-
+
// 5xx etc.
throw new Error(`${res.statusCode} ${res.statusMessage}`);
} else {