diff options
| author | Julia <julia@insertdomain.name> | 2025-06-19 21:35:18 +0000 |
|---|---|---|
| committer | Julia <julia@insertdomain.name> | 2025-06-19 21:35:18 +0000 |
| commit | a77c32b17da63d3932b219f74152cce023a30f4a (patch) | |
| tree | d2a05796e942c8f250bbd01369eab0cbe5a14531 /packages/backend/src/queue/processors | |
| parent | merge: release 2025.4.2 (!1051) (diff) | |
| parent | Merge branch 'develop' into release/2025.4.3 (diff) | |
| download | sharkey-a77c32b17da63d3932b219f74152cce023a30f4a.tar.gz sharkey-a77c32b17da63d3932b219f74152cce023a30f4a.tar.bz2 sharkey-a77c32b17da63d3932b219f74152cce023a30f4a.zip | |
merge: prepare release 2025.4.3 (!1125)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/1125
Approved-by: Marie <github@yuugi.dev>
Approved-by: Julia <julia@insertdomain.name>
Diffstat (limited to 'packages/backend/src/queue/processors')
33 files changed, 246 insertions, 212 deletions
diff --git a/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts b/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts index 4769cccabf..30bdd6ccca 100644 --- a/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts +++ b/packages/backend/src/queue/processors/AggregateRetentionProcessorService.ts @@ -62,7 +62,7 @@ export class AggregateRetentionProcessorService { }); } catch (err) { if (isDuplicateKeyValueError(err)) { - this.logger.succ('Skip because it has already been processed by another worker.'); + this.logger.debug('Skip because it has already been processed by another worker.'); return; } throw err; @@ -87,6 +87,6 @@ export class AggregateRetentionProcessorService { }); } - this.logger.succ('Retention aggregated.'); + this.logger.info('Retention aggregated.'); } } diff --git a/packages/backend/src/queue/processors/BakeBufferedReactionsProcessorService.ts b/packages/backend/src/queue/processors/BakeBufferedReactionsProcessorService.ts index d49c99f694..83b375de3f 100644 --- a/packages/backend/src/queue/processors/BakeBufferedReactionsProcessorService.ts +++ b/packages/backend/src/queue/processors/BakeBufferedReactionsProcessorService.ts @@ -37,6 +37,6 @@ export class BakeBufferedReactionsProcessorService { await this.reactionsBufferingService.bake(); - this.logger.succ('All buffered reactions baked.'); + this.logger.info('All buffered reactions baked.'); } } diff --git a/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts b/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts index 448fc9c763..76d0cb4304 100644 --- a/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts +++ b/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts @@ -41,6 +41,6 @@ export class CheckExpiredMutingsProcessorService { await this.userMutingService.unmute(expired); } - this.logger.succ('All expired mutings checked.'); + this.logger.info('All expired mutings checked.'); } } diff --git a/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts b/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts index db8d2e789e..7821cd3d1d 100644 --- a/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts +++ b/packages/backend/src/queue/processors/CheckModeratorsActivityProcessorService.ts @@ -98,16 +98,16 @@ export class CheckModeratorsActivityProcessorService { @bindThis public async process(): Promise<void> { - this.logger.info('start.'); + this.logger.debug('start.'); const meta = await this.metaService.fetch(false); if (!meta.disableRegistration) { await this.processImpl(); } else { - this.logger.info('is already invitation only.'); + this.logger.debug('is already invitation only.'); } - this.logger.succ('finish.'); + this.logger.debug('finish.'); } @bindThis diff --git a/packages/backend/src/queue/processors/CleanChartsProcessorService.ts b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts index 8c5faa8d07..c11682b0fe 100644 --- a/packages/backend/src/queue/processors/CleanChartsProcessorService.ts +++ b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts @@ -62,6 +62,6 @@ export class CleanChartsProcessorService { await this.perUserDriveChart.clean(); await this.apRequestChart.clean(); - this.logger.succ('All charts successfully cleaned.'); + this.logger.info('All charts successfully cleaned.'); } } diff --git a/packages/backend/src/queue/processors/CleanProcessorService.ts b/packages/backend/src/queue/processors/CleanProcessorService.ts index a26b69cd2b..104d19103f 100644 --- a/packages/backend/src/queue/processors/CleanProcessorService.ts +++ b/packages/backend/src/queue/processors/CleanProcessorService.ts @@ -69,6 +69,6 @@ export class CleanProcessorService { this.reversiService.cleanOutdatedGames(); - this.logger.succ('Cleaned.'); + this.logger.info('Cleaned.'); } } diff --git a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts index 81842b221f..2eddae95c8 100644 --- a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts +++ b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts @@ -75,6 +75,6 @@ export class CleanRemoteFilesProcessorService { await job.updateProgress(100 / total * deletedCount); } - this.logger.succ(`All cached remote files processed. Total deleted: ${deletedCount}, Failed: ${errorCount}.`); + this.logger.info(`All cached remote files processed. Total deleted: ${deletedCount}, Failed: ${errorCount}.`); } } diff --git a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts index 4e9779a41b..5bf64e4f04 100644 --- a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts +++ b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts @@ -18,6 +18,7 @@ import { SearchService } from '@/core/SearchService.js'; import { ApLogService } from '@/core/ApLogService.js'; import { ReactionService } from '@/core/ReactionService.js'; import { QueueService } from '@/core/QueueService.js'; +import { CacheService } from '@/core/CacheService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; import type { DbUserDeleteJobData } from '../types.js'; @@ -94,6 +95,7 @@ export class DeleteAccountProcessorService { private searchService: SearchService, private reactionService: ReactionService, private readonly apLogService: ApLogService, + private readonly cacheService: CacheService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('delete-account'); } @@ -128,7 +130,7 @@ export class DeleteAccountProcessorService { userId: user.id, }); - this.logger.succ('All clips have been deleted.'); + this.logger.info('All clips have been deleted.'); } { // Delete favorites @@ -136,10 +138,26 @@ export class DeleteAccountProcessorService { userId: user.id, }); - this.logger.succ('All favorites have been deleted.'); + this.logger.info('All favorites have been deleted.'); } { // Delete user relations + await this.cacheService.refreshFollowRelationsFor(user.id); + await this.cacheService.userFollowingsCache.delete(user.id); + await this.cacheService.userFollowingsCache.delete(user.id); + await this.cacheService.userBlockingCache.delete(user.id); + await this.cacheService.userBlockedCache.delete(user.id); + await this.cacheService.userMutingsCache.delete(user.id); + await this.cacheService.userMutingsCache.delete(user.id); + await this.cacheService.hibernatedUserCache.delete(user.id); + await this.cacheService.renoteMutingsCache.delete(user.id); + await this.cacheService.userProfileCache.delete(user.id); + this.cacheService.userByIdCache.delete(user.id); + this.cacheService.localUserByIdCache.delete(user.id); + if (user.token) { + this.cacheService.localUserByNativeTokenCache.delete(user.token); + } + await this.followingsRepository.delete({ followerId: user.id, }); @@ -172,7 +190,7 @@ export class DeleteAccountProcessorService { muteeId: user.id, }); - this.logger.succ('All user relations have been deleted.'); + this.logger.info('All user relations have been deleted.'); } { // Delete reactions @@ -206,7 +224,7 @@ export class DeleteAccountProcessorService { } } - this.logger.succ('All reactions have been deleted'); + this.logger.info('All reactions have been deleted'); } { // Poll votes @@ -238,7 +256,7 @@ export class DeleteAccountProcessorService { }); } - this.logger.succ('All poll votes have been deleted'); + this.logger.info('All poll votes have been deleted'); } { // Delete scheduled notes @@ -254,7 +272,7 @@ export class DeleteAccountProcessorService { userId: user.id, }); - this.logger.succ('All scheduled notes deleted'); + this.logger.info('All scheduled notes deleted'); } { // Delete notes @@ -312,7 +330,7 @@ export class DeleteAccountProcessorService { } } - this.logger.succ('All of notes deleted'); + this.logger.info('All of notes deleted'); } { // Delete files @@ -341,7 +359,7 @@ export class DeleteAccountProcessorService { } } - this.logger.succ('All of files deleted'); + this.logger.info('All of files deleted'); } { // Delete actor logs @@ -353,7 +371,7 @@ export class DeleteAccountProcessorService { await this.apLogService.deleteInboxLogs(user.id) .catch(err => this.logger.error(err, `Failed to delete AP logs for user '${user.uri}'`)); - this.logger.succ('All AP logs deleted'); + this.logger.info('All AP logs deleted'); } // Do this BEFORE deleting the account! @@ -379,7 +397,7 @@ export class DeleteAccountProcessorService { await this.usersRepository.delete(user.id); } - this.logger.succ('Account data deleted'); + this.logger.info('Account data deleted'); } { // Send email notification diff --git a/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts b/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts index 291fa4a6d8..ac3cddbed0 100644 --- a/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts +++ b/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts @@ -74,6 +74,6 @@ export class DeleteDriveFilesProcessorService { job.updateProgress(deletedCount / total); } - this.logger.succ(`All drive files (${deletedCount}) of ${user.id} has been deleted.`); + this.logger.info(`All drive files (${deletedCount}) of ${user.id} has been deleted.`); } } diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts index 5a16496011..fc4c8bb814 100644 --- a/packages/backend/src/queue/processors/DeliverProcessorService.ts +++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts @@ -133,23 +133,18 @@ export class DeliverProcessorService { } }); - if (res instanceof StatusError) { + if (res instanceof StatusError && !res.isRetryable) { // 4xx - if (!res.isRetryable) { - // 相手が閉鎖していることを明示しているため、配送停止する - if (job.data.isSharedInbox && res.statusCode === 410) { - this.federatedInstanceService.fetchOrRegister(host).then(i => { - this.federatedInstanceService.update(i.id, { - suspensionState: 'goneSuspended', - }); + // 相手が閉鎖していることを明示しているため、配送停止する + if (job.data.isSharedInbox && res.statusCode === 410) { + this.federatedInstanceService.fetchOrRegister(host).then(i => { + this.federatedInstanceService.update(i.id, { + suspensionState: 'goneSuspended', }); - throw new Bull.UnrecoverableError(`${host} is gone`); - } - throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`); + }); + throw new Bull.UnrecoverableError(`${host} is gone`); } - - // 5xx etc. - throw new Error(`${res.statusCode} ${res.statusMessage}`); + throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`); } else { // DNS error, socket error, timeout ... throw res; diff --git a/packages/backend/src/queue/processors/ExportAccountDataProcessorService.ts b/packages/backend/src/queue/processors/ExportAccountDataProcessorService.ts index 33a2362c4a..58d542635f 100644 --- a/packages/backend/src/queue/processors/ExportAccountDataProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportAccountDataProcessorService.ts @@ -22,6 +22,7 @@ import { Packed } from '@/misc/json-schema.js'; import { UtilityService } from '@/core/UtilityService.js'; import { DownloadService } from '@/core/DownloadService.js'; import { EmailService } from '@/core/EmailService.js'; +import { renderInlineError } from '@/misc/render-inline-error.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; @@ -85,21 +86,23 @@ export class ExportAccountDataProcessorService { @bindThis public async process(job: Bull.Job): Promise<void> { - this.logger.info('Exporting Account Data...'); - const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { + this.logger.debug(`Skip: user ${job.data.user.id} does not exist`); return; } const profile = await this.userProfilesRepository.findOneBy({ userId: job.data.user.id }); if (profile == null) { + this.logger.debug(`Skip: user ${job.data.user.id} has no profile`); return; } + this.logger.info(`Exporting account data for ${job.data.user.id} ...`); + const [path, cleanup] = await createTempDir(); - this.logger.info(`Temp dir is ${path}`); + this.logger.debug(`Temp dir is ${path}`); // User Export @@ -113,7 +116,7 @@ export class ExportAccountDataProcessorService { return new Promise<void>((res, rej) => { userStream.write(text, err => { if (err) { - this.logger.error(err); + this.logger.error('Error writing user:', err); rej(err); } else { res(); @@ -145,7 +148,7 @@ export class ExportAccountDataProcessorService { return new Promise<void>((res, rej) => { profileStream.write(text, err => { if (err) { - this.logger.error(err); + this.logger.error('Error writing profile:', err); rej(err); } else { res(); @@ -179,7 +182,7 @@ export class ExportAccountDataProcessorService { return new Promise<void>((res, rej) => { ipStream.write(text, err => { if (err) { - this.logger.error(err); + this.logger.error('Error writing IPs:', err); rej(err); } else { res(); @@ -214,7 +217,7 @@ export class ExportAccountDataProcessorService { return new Promise<void>((res, rej) => { notesStream.write(text, err => { if (err) { - this.logger.error(err); + this.logger.error('Error writing notes:', err); rej(err); } else { res(); @@ -275,7 +278,7 @@ export class ExportAccountDataProcessorService { return new Promise<void>((res, rej) => { followingStream.write(text, err => { if (err) { - this.logger.error(err); + this.logger.error('Error writing following:', err); rej(err); } else { res(); @@ -345,7 +348,7 @@ export class ExportAccountDataProcessorService { return new Promise<void>((res, rej) => { followerStream.write(text, err => { if (err) { - this.logger.error(err); + this.logger.error('Error writing followers:', err); rej(err); } else { res(); @@ -406,7 +409,7 @@ export class ExportAccountDataProcessorService { return new Promise<void>((res, rej) => { filesStream.write(text, err => { if (err) { - this.logger.error(err); + this.logger.error('Error writing drive:', err); rej(err); } else { res(); @@ -432,7 +435,7 @@ export class ExportAccountDataProcessorService { await this.downloadService.downloadUrl(file.url, filePath); downloaded = true; } catch (e) { - this.logger.error(e instanceof Error ? e : new Error(e as string)); + this.logger.error(`Error writing drive file ${file.id} (${file.name}): ${renderInlineError(e)}`); } if (!downloaded) { @@ -464,7 +467,7 @@ export class ExportAccountDataProcessorService { return new Promise<void>((res, rej) => { mutingStream.write(text, err => { if (err) { - this.logger.error(err); + this.logger.error('Error writing mutings:', err); rej(err); } else { res(); @@ -527,7 +530,7 @@ export class ExportAccountDataProcessorService { return new Promise<void>((res, rej) => { blockingStream.write(text, err => { if (err) { - this.logger.error(err); + this.logger.error('Error writing blockings:', err); rej(err); } else { res(); @@ -589,7 +592,7 @@ export class ExportAccountDataProcessorService { return new Promise<void>((res, rej) => { favoriteStream.write(text, err => { if (err) { - this.logger.error(err); + this.logger.error('Error writing favorites:', err); rej(err); } else { res(); @@ -650,7 +653,7 @@ export class ExportAccountDataProcessorService { return new Promise<void>((res, rej) => { antennaStream.write(text, err => { if (err) { - this.logger.error(err); + this.logger.error('Error writing antennas:', err); rej(err); } else { res(); @@ -708,7 +711,7 @@ export class ExportAccountDataProcessorService { return new Promise<void>((res, rej) => { listStream.write(text, err => { if (err) { - this.logger.error(err); + this.logger.error('Error writing lists:', err); rej(err); } else { res(); @@ -744,12 +747,12 @@ export class ExportAccountDataProcessorService { zlib: { level: 0 }, }); archiveStream.on('close', async () => { - this.logger.succ(`Exported to: ${archivePath}`); + this.logger.debug(`Exported to path: ${archivePath}`); const fileName = 'data-request-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.zip'; const driveFile = await this.driveService.addFile({ user, path: archivePath, name: fileName, force: true }); - this.logger.succ(`Exported to: ${driveFile.id}`); + this.logger.debug(`Exported to drive: ${driveFile.id}`); cleanup(); archiveCleanup(); if (profile.email) { diff --git a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts index b3111865ad..61d76da5ac 100644 --- a/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportAntennasProcessorService.ts @@ -45,15 +45,19 @@ export class ExportAntennasProcessorService { public async process(job: Bull.Job<DBExportAntennasData>): Promise<void> { const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { + this.logger.debug(`Skip: user ${job.data.user.id} does not exist`); return; } + + this.logger.info(`Exporting antennas of ${job.data.user.id} ...`); + const [path, cleanup] = await createTemp(); const stream = fs.createWriteStream(path, { flags: 'a' }); const write = (input: string): Promise<void> => { return new Promise((resolve, reject) => { stream.write(input, err => { if (err) { - this.logger.error(err); + this.logger.error('Error exporting antennas:', err); reject(); } else { resolve(); @@ -96,7 +100,7 @@ export class ExportAntennasProcessorService { const fileName = 'antennas-' + DateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json'; const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' }); - this.logger.succ('Exported to: ' + driveFile.id); + this.logger.debug('Exported to: ' + driveFile.id); this.notificationService.createNotification(user.id, 'exportCompleted', { exportedEntity: 'antenna', diff --git a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts index ecc439db69..4c17c3f718 100644 --- a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts @@ -40,17 +40,18 @@ export class ExportBlockingProcessorService { @bindThis public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> { - this.logger.info(`Exporting blocking of ${job.data.user.id} ...`); - const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { + this.logger.debug(`Skip: user ${job.data.user.id} does not exist`); return; } + this.logger.info(`Exporting blocking of ${job.data.user.id} ...`); + // Create temp file const [path, cleanup] = await createTemp(); - this.logger.info(`Temp file is ${path}`); + this.logger.debug(`Temp file is ${path}`); try { const stream = fs.createWriteStream(path, { flags: 'a' }); @@ -87,7 +88,7 @@ export class ExportBlockingProcessorService { await new Promise<void>((res, rej) => { stream.write(content + '\n', err => { if (err) { - this.logger.error(err); + this.logger.error('Error exporting blocking:', err); rej(err); } else { res(); @@ -105,12 +106,12 @@ export class ExportBlockingProcessorService { } stream.end(); - this.logger.succ(`Exported to: ${path}`); + this.logger.debug(`Exported to: ${path}`); const fileName = 'blocking-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv'; const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' }); - this.logger.succ(`Exported to: ${driveFile.id}`); + this.logger.debug(`Exported to: ${driveFile.id}`); this.notificationService.createNotification(user.id, 'exportCompleted', { exportedEntity: 'blocking', diff --git a/packages/backend/src/queue/processors/ExportClipsProcessorService.ts b/packages/backend/src/queue/processors/ExportClipsProcessorService.ts index 583ddbb745..1d34d2b4e6 100644 --- a/packages/backend/src/queue/processors/ExportClipsProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportClipsProcessorService.ts @@ -51,17 +51,18 @@ export class ExportClipsProcessorService { @bindThis public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> { - this.logger.info(`Exporting clips of ${job.data.user.id} ...`); - const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { + this.logger.debug(`Skip: user ${job.data.user.id} does not exist`); return; } + this.logger.info(`Exporting clips of ${job.data.user.id} ...`); + // Create temp file const [path, cleanup] = await createTemp(); - this.logger.info(`Temp file is ${path}`); + this.logger.debug(`Temp file is ${path}`); try { const stream = Writable.toWeb(fs.createWriteStream(path, { flags: 'a' })); @@ -75,12 +76,12 @@ export class ExportClipsProcessorService { await writer.write(']'); await writer.close(); - this.logger.succ(`Exported to: ${path}`); + this.logger.debug(`Exported to: ${path}`); const fileName = 'clips-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json'; const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' }); - this.logger.succ(`Exported to: ${driveFile.id}`); + this.logger.debug(`Exported to: ${driveFile.id}`); this.notificationService.createNotification(user.id, 'exportCompleted', { exportedEntity: 'clip', diff --git a/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts index 14d32e78b3..b8f208bbfc 100644 --- a/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts @@ -45,16 +45,17 @@ export class ExportCustomEmojisProcessorService { @bindThis public async process(job: Bull.Job): Promise<void> { - this.logger.info('Exporting custom emojis ...'); - const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { + this.logger.debug(`Skip: user ${job.data.user.id} does not exist`); return; } + this.logger.info(`Exporting custom emojis of ${job.data.user.id} ...`); + const [path, cleanup] = await createTempDir(); - this.logger.info(`Temp dir is ${path}`); + this.logger.debug(`Temp dir is ${path}`); const metaPath = path + '/meta.json'; @@ -66,7 +67,7 @@ export class ExportCustomEmojisProcessorService { return new Promise<void>((res, rej) => { metaStream.write(text, err => { if (err) { - this.logger.error(err); + this.logger.error('Error exporting custom emojis:', err); rej(err); } else { res(); @@ -101,7 +102,7 @@ export class ExportCustomEmojisProcessorService { await this.downloadService.downloadUrl(emoji.originalUrl, emojiPath); downloaded = true; } catch (e) { // TODO: 何度か再試行 - this.logger.error(e instanceof Error ? e : new Error(e as string)); + this.logger.error('Error exporting custom emojis:', e as Error); } if (!downloaded) { @@ -130,12 +131,12 @@ export class ExportCustomEmojisProcessorService { zlib: { level: 0 }, }); archiveStream.on('close', async () => { - this.logger.succ(`Exported to: ${archivePath}`); + this.logger.debug(`Exported to: ${archivePath}`); const fileName = 'custom-emojis-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.zip'; const driveFile = await this.driveService.addFile({ user, path: archivePath, name: fileName, force: true }); - this.logger.succ(`Exported to: ${driveFile.id}`); + this.logger.debug(`Exported to: ${driveFile.id}`); this.notificationService.createNotification(user.id, 'exportCompleted', { exportedEntity: 'customEmoji', diff --git a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts index b81feece01..b5716f2d49 100644 --- a/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportFavoritesProcessorService.ts @@ -45,17 +45,18 @@ export class ExportFavoritesProcessorService { @bindThis public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> { - this.logger.info(`Exporting favorites of ${job.data.user.id} ...`); - const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { + this.logger.debug(`Skip: user ${job.data.user.id} does not exist`); return; } + this.logger.info(`Exporting favorites of ${job.data.user.id} ...`); + // Create temp file const [path, cleanup] = await createTemp(); - this.logger.info(`Temp file is ${path}`); + this.logger.debug(`Temp file is ${path}`); try { const stream = fs.createWriteStream(path, { flags: 'a' }); @@ -64,7 +65,7 @@ export class ExportFavoritesProcessorService { return new Promise<void>((res, rej) => { stream.write(text, err => { if (err) { - this.logger.error(err); + this.logger.error('Error exporting favorites:', err); rej(err); } else { res(); @@ -119,12 +120,12 @@ export class ExportFavoritesProcessorService { await write(']'); stream.end(); - this.logger.succ(`Exported to: ${path}`); + this.logger.debug(`Exported to: ${path}`); const fileName = 'favorites-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json'; const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' }); - this.logger.succ(`Exported to: ${driveFile.id}`); + this.logger.debug(`Exported to: ${driveFile.id}`); this.notificationService.createNotification(user.id, 'exportCompleted', { exportedEntity: 'favorite', diff --git a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts index 903f962515..883f35e366 100644 --- a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts @@ -44,17 +44,18 @@ export class ExportFollowingProcessorService { @bindThis public async process(job: Bull.Job<DbExportFollowingData>): Promise<void> { - this.logger.info(`Exporting following of ${job.data.user.id} ...`); - const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { + this.logger.debug(`Skip: user ${job.data.user.id} does not exist`); return; } + this.logger.info(`Exporting following of ${job.data.user.id} ...`); + // Create temp file const [path, cleanup] = await createTemp(); - this.logger.info(`Temp file is ${path}`); + this.logger.debug(`Temp file is ${path}`); try { const stream = fs.createWriteStream(path, { flags: 'a' }); @@ -98,7 +99,7 @@ export class ExportFollowingProcessorService { await new Promise<void>((res, rej) => { stream.write(content + '\n', err => { if (err) { - this.logger.error(err); + this.logger.error('Error exporting following:', err); rej(err); } else { res(); @@ -109,12 +110,12 @@ export class ExportFollowingProcessorService { } stream.end(); - this.logger.succ(`Exported to: ${path}`); + this.logger.debug(`Exported to: ${path}`); const fileName = 'following-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv'; const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' }); - this.logger.succ(`Exported to: ${driveFile.id}`); + this.logger.debug(`Exported to: ${driveFile.id}`); this.notificationService.createNotification(user.id, 'exportCompleted', { exportedEntity: 'following', diff --git a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts index f9867ade29..9cdb94beaf 100644 --- a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts @@ -40,17 +40,18 @@ export class ExportMutingProcessorService { @bindThis public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> { - this.logger.info(`Exporting muting of ${job.data.user.id} ...`); - const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { + this.logger.debug(`Skip: user ${job.data.user.id} does not exist`); return; } + this.logger.debug(`Exporting muting of ${job.data.user.id} ...`); + // Create temp file const [path, cleanup] = await createTemp(); - this.logger.info(`Temp file is ${path}`); + this.logger.debug(`Temp file is ${path}`); try { const stream = fs.createWriteStream(path, { flags: 'a' }); @@ -88,7 +89,7 @@ export class ExportMutingProcessorService { await new Promise<void>((res, rej) => { stream.write(content + '\n', err => { if (err) { - this.logger.error(err); + this.logger.error('Error exporting mutings:', err); rej(err); } else { res(); @@ -106,12 +107,12 @@ export class ExportMutingProcessorService { } stream.end(); - this.logger.succ(`Exported to: ${path}`); + this.logger.debug(`Exported to: ${path}`); const fileName = 'mute-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv'; const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' }); - this.logger.succ(`Exported to: ${driveFile.id}`); + this.logger.debug(`Exported to: ${driveFile.id}`); this.notificationService.createNotification(user.id, 'exportCompleted', { exportedEntity: 'muting', diff --git a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts index 9e2b678219..7d49a8dab2 100644 --- a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts @@ -120,17 +120,18 @@ export class ExportNotesProcessorService { @bindThis public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> { - this.logger.info(`Exporting notes of ${job.data.user.id} ...`); - const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { + this.logger.debug(`Skip: user ${job.data.user.id} does not exist`); return; } + this.logger.info(`Exporting notes of ${job.data.user.id} ...`); + // Create temp file const [path, cleanup] = await createTemp(); - this.logger.info(`Temp file is ${path}`); + this.logger.debug(`Temp file is ${path}`); try { // メモリが足りなくならないようにストリームで処理する @@ -146,12 +147,12 @@ export class ExportNotesProcessorService { .pipeThrough(new TextEncoderStream()) .pipeTo(new FileWriterStream(path)); - this.logger.succ(`Exported to: ${path}`); + this.logger.debug(`Exported to: ${path}`); const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json'; const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'json' }); - this.logger.succ(`Exported to: ${driveFile.id}`); + this.logger.debug(`Exported to: ${driveFile.id}`); this.notificationService.createNotification(user.id, 'exportCompleted', { exportedEntity: 'note', diff --git a/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts index c483d79854..43043e3a26 100644 --- a/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts @@ -43,13 +43,14 @@ export class ExportUserListsProcessorService { @bindThis public async process(job: Bull.Job<DbJobDataWithUser>): Promise<void> { - this.logger.info(`Exporting user lists of ${job.data.user.id} ...`); - const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { + this.logger.debug(`Skip: user ${job.data.user.id} does not exist`); return; } + this.logger.info(`Exporting user lists of ${job.data.user.id} ...`); + const lists = await this.userListsRepository.findBy({ userId: user.id, }); @@ -57,7 +58,7 @@ export class ExportUserListsProcessorService { // Create temp file const [path, cleanup] = await createTemp(); - this.logger.info(`Temp file is ${path}`); + this.logger.debug(`Temp file is ${path}`); try { const stream = fs.createWriteStream(path, { flags: 'a' }); @@ -74,7 +75,7 @@ export class ExportUserListsProcessorService { await new Promise<void>((res, rej) => { stream.write(content + '\n', err => { if (err) { - this.logger.error(err); + this.logger.error('Error exporting lists:', err); rej(err); } else { res(); @@ -85,12 +86,12 @@ export class ExportUserListsProcessorService { } stream.end(); - this.logger.succ(`Exported to: ${path}`); + this.logger.debug(`Exported to: ${path}`); const fileName = 'user-lists-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv'; const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true, ext: 'csv' }); - this.logger.succ(`Exported to: ${driveFile.id}`); + this.logger.debug(`Exported to: ${driveFile.id}`); this.notificationService.createNotification(user.id, 'exportCompleted', { exportedEntity: 'userList', diff --git a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts index 9c033b73e2..f29a19ce66 100644 --- a/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportAntennasProcessorService.ts @@ -8,7 +8,7 @@ import _Ajv from 'ajv'; import { IdService } from '@/core/IdService.js'; import { GlobalEventService } from '@/core/GlobalEventService.js'; import Logger from '@/logger.js'; -import type { AntennasRepository } from '@/models/_.js'; +import type { AntennasRepository, UsersRepository } from '@/models/_.js'; import { DI } from '@/di-symbols.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; @@ -59,6 +59,9 @@ export class ImportAntennasProcessorService { @Inject(DI.antennasRepository) private antennasRepository: AntennasRepository, + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + private queueLoggerService: QueueLoggerService, private idService: IdService, private globalEventService: GlobalEventService, @@ -68,12 +71,20 @@ export class ImportAntennasProcessorService { @bindThis public async process(job: Bull.Job<DBAntennaImportJobData>): Promise<void> { + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + this.logger.debug(`Skip: user ${job.data.user.id} does not exist`); + return; + } + + this.logger.debug(`Importing blocking of ${job.data.user.id} ...`); + const now = new Date(); try { for (const antenna of job.data.antenna) { if (antenna.keywords.length === 0 || antenna.keywords[0].every(x => x === '')) continue; if (!validate(antenna)) { - this.logger.warn('Validation Failed'); + this.logger.warn('Antenna validation failed'); continue; } const result = await this.antennasRepository.insertOne({ @@ -92,11 +103,11 @@ export class ImportAntennasProcessorService { withReplies: antenna.withReplies, withFile: antenna.withFile, }); - this.logger.succ('Antenna created: ' + result.id); + this.logger.debug('Antenna created: ' + result.id); this.globalEventService.publishInternalEvent('antennaCreated', result); } } catch (err: any) { - this.logger.error(err); + this.logger.error('Error importing antennas:', err); } } } diff --git a/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts b/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts index b78229c648..e2de9532eb 100644 --- a/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts @@ -40,10 +40,9 @@ export class ImportBlockingProcessorService { @bindThis public async process(job: Bull.Job<DbUserImportJobData>): Promise<void> { - this.logger.info(`Importing blocking of ${job.data.user.id} ...`); - const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { + this.logger.debug(`Skip: user ${job.data.user.id} does not exist`); return; } @@ -51,14 +50,17 @@ export class ImportBlockingProcessorService { id: job.data.fileId, }); if (file == null) { + this.logger.debug(`Skip: file ${job.data.fileId} does not exist`); return; } + this.logger.debug(`Importing blocking of ${job.data.user.id} ...`); + const csv = await this.downloadService.downloadTextFile(file.url); const targets = csv.trim().split('\n'); this.queueService.createImportBlockingToDbJob({ id: user.id }, targets); - this.logger.succ('Import jobs created'); + this.logger.debug('Import jobs created'); } @bindThis @@ -93,11 +95,11 @@ export class ImportBlockingProcessorService { // skip myself if (target.id === job.data.user.id) return; - this.logger.info(`Block ${target.id} ...`); + this.logger.debug(`Block ${target.id} ...`); this.queueService.createBlockJob([{ from: { id: user.id }, to: { id: target.id }, silent: true }]); } catch (e) { - this.logger.warn(`Error: ${e}`); + this.logger.error('Error importing blockings:', e as Error); } } } diff --git a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts index d08cadd378..4b909328cd 100644 --- a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts @@ -16,6 +16,7 @@ import { DriveService } from '@/core/DriveService.js'; import { DownloadService } from '@/core/DownloadService.js'; import { bindThis } from '@/decorators.js'; import type { Config } from '@/config.js'; +import { renderInlineError } from '@/misc/render-inline-error.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; import type { DbUserImportJobData } from '../types.js'; @@ -45,18 +46,19 @@ export class ImportCustomEmojisProcessorService { @bindThis public async process(job: Bull.Job<DbUserImportJobData>): Promise<void> { - this.logger.info('Importing custom emojis ...'); - const file = await this.driveFilesRepository.findOneBy({ id: job.data.fileId, }); if (file == null) { + this.logger.debug(`Skip: file ${job.data.fileId} does not exist`); return; } + this.logger.info(`Importing custom emojis from ${file.id} (${file.name}) ...`); + const [path, cleanup] = await createTempDir(); - this.logger.info(`Temp dir is ${path}`); + this.logger.debug(`Temp dir is ${path}`); const destPath = path + '/emojis.zip'; @@ -65,14 +67,14 @@ export class ImportCustomEmojisProcessorService { await this.downloadService.downloadUrl(file.url, destPath, { operationTimeout: this.config.import?.downloadTimeout, maxSize: this.config.import?.maxFileSize }); } catch (e) { // TODO: 何度か再試行 if (e instanceof Error || typeof e === 'string') { - this.logger.error(e); + this.logger.error('Error importing custom emojis:', e as Error); } throw e; } const outputPath = path + '/emojis'; try { - this.logger.succ(`Unzipping to ${outputPath}`); + this.logger.debug(`Unzipping to ${outputPath}`); ZipReader.withDestinationPath(outputPath).viaBuffer(await fs.promises.readFile(destPath)); const metaRaw = fs.readFileSync(outputPath + '/meta.json', 'utf-8'); const meta = JSON.parse(metaRaw); @@ -117,7 +119,7 @@ export class ImportCustomEmojisProcessorService { }); } catch (e) { if (e instanceof Error || typeof e === 'string') { - this.logger.error(`couldn't import ${emojiPath} for ${emojiInfo.name}: ${e}`); + this.logger.error(`couldn't import ${emojiPath} for ${emojiInfo.name}: ${renderInlineError(e)}`); } continue; } @@ -125,11 +127,9 @@ export class ImportCustomEmojisProcessorService { cleanup(); - this.logger.succ('Imported'); + this.logger.debug('Imported'); } catch (e) { - if (e instanceof Error || typeof e === 'string') { - this.logger.error(e); - } + this.logger.error('Error importing custom emojis:', e as Error); cleanup(); throw e; } diff --git a/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts b/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts index 70c9f3a096..816d5cf65a 100644 --- a/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts @@ -40,10 +40,9 @@ export class ImportFollowingProcessorService { @bindThis public async process(job: Bull.Job<DbUserImportJobData>): Promise<void> { - this.logger.info(`Importing following of ${job.data.user.id} ...`); - const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { + this.logger.debug(`Skip: user ${job.data.user.id} does not exist`); return; } @@ -51,14 +50,17 @@ export class ImportFollowingProcessorService { id: job.data.fileId, }); if (file == null) { + this.logger.debug(`Skip: file ${job.data.fileId} does not exist`); return; } + this.logger.info(`Importing following of ${job.data.user.id} ...`); + const csv = await this.downloadService.downloadTextFile(file.url); const targets = csv.trim().split('\n'); this.queueService.createImportFollowingToDbJob({ id: user.id }, targets, job.data.withReplies); - this.logger.succ('Import jobs created'); + this.logger.debug('Import jobs created'); } @bindThis @@ -93,11 +95,11 @@ export class ImportFollowingProcessorService { // skip myself if (target.id === job.data.user.id) return; - this.logger.info(`Follow ${target.id} ${job.data.withReplies ? 'with replies' : 'without replies'} ...`); + this.logger.debug(`Follow ${target.id} ${job.data.withReplies ? 'with replies' : 'without replies'} ...`); this.queueService.createFollowJob([{ from: user, to: { id: target.id }, silent: true, withReplies: job.data.withReplies }]); } catch (e) { - this.logger.warn(`Error: ${e}`); + this.logger.error('Error importing followings:', e as Error); } } } diff --git a/packages/backend/src/queue/processors/ImportMutingProcessorService.ts b/packages/backend/src/queue/processors/ImportMutingProcessorService.ts index ec9d2b6c4c..d3827b12fd 100644 --- a/packages/backend/src/queue/processors/ImportMutingProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportMutingProcessorService.ts @@ -14,6 +14,7 @@ import { DownloadService } from '@/core/DownloadService.js'; import { UserMutingService } from '@/core/UserMutingService.js'; import { UtilityService } from '@/core/UtilityService.js'; import { bindThis } from '@/decorators.js'; +import { renderInlineError } from '@/misc/render-inline-error.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; import type { DbUserImportJobData } from '../types.js'; @@ -40,10 +41,9 @@ export class ImportMutingProcessorService { @bindThis public async process(job: Bull.Job<DbUserImportJobData>): Promise<void> { - this.logger.info(`Importing muting of ${job.data.user.id} ...`); - const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { + this.logger.debug(`Skip: user ${job.data.user.id} does not exist`); return; } @@ -51,9 +51,12 @@ export class ImportMutingProcessorService { id: job.data.fileId, }); if (file == null) { + this.logger.debug(`Skip: file ${job.data.fileId} does not exist`); return; } + this.logger.info(`Importing muting of ${job.data.user.id} ...`); + const csv = await this.downloadService.downloadTextFile(file.url); let linenum = 0; @@ -88,14 +91,14 @@ export class ImportMutingProcessorService { // skip myself if (target.id === job.data.user.id) continue; - this.logger.info(`Mute[${linenum}] ${target.id} ...`); + this.logger.debug(`Mute[${linenum}] ${target.id} ...`); await this.userMutingService.mute(user, target); } catch (e) { - this.logger.warn(`Error in line:${linenum} ${e}`); + this.logger.warn(`Error in line:${linenum} ${renderInlineError(e)}`); } } - this.logger.succ('Imported'); + this.logger.debug('Imported'); } } diff --git a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts index 5e660e8081..e209855720 100644 --- a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts @@ -159,10 +159,9 @@ export class ImportNotesProcessorService { @bindThis public async process(job: Bull.Job<DbNoteImportJobData>): Promise<void> { - this.logger.info(`Starting note import of ${job.data.user.id} ...`); - const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { + this.logger.debug(`Skip: user ${job.data.user.id} does not exist`); return; } @@ -170,9 +169,12 @@ export class ImportNotesProcessorService { id: job.data.fileId, }); if (file == null) { + this.logger.debug(`Skip: file ${job.data.fileId} does not exist`); return; } + this.logger.info(`Starting note import of ${job.data.user.id} ...`); + let folder = await this.driveFoldersRepository.findOneBy({ name: 'Imports', userId: job.data.user.id }); if (folder == null) { await this.driveFoldersRepository.insert({ id: this.idService.gen(), name: 'Imports', userId: job.data.user.id }); @@ -184,7 +186,7 @@ export class ImportNotesProcessorService { if (type === 'Twitter' || file.name.startsWith('twitter') && file.name.endsWith('.zip')) { const [path, cleanup] = await createTempDir(); - this.logger.info(`Temp dir is ${path}`); + this.logger.debug(`Temp dir is ${path}`); const destPath = path + '/twitter.zip'; @@ -192,15 +194,13 @@ export class ImportNotesProcessorService { await fsp.writeFile(destPath, '', 'binary'); await this.downloadUrl(file.url, destPath); } catch (e) { // TODO: 何度か再試行 - if (e instanceof Error || typeof e === 'string') { - this.logger.error(e); - } + this.logger.error('Error importing notes:', e as Error); throw e; } const outputPath = path + '/twitter'; try { - this.logger.succ(`Unzipping to ${outputPath}`); + this.logger.debug(`Unzipping to ${outputPath}`); ZipReader.withDestinationPath(outputPath).viaBuffer(await fsp.readFile(destPath)); const unprocessedTweets = this.parseTwitterFile(await fsp.readFile(outputPath + '/data/tweets.js', 'utf-8')); @@ -214,7 +214,7 @@ export class ImportNotesProcessorService { } else if (type === 'Facebook' || file.name.startsWith('facebook-') && file.name.endsWith('.zip')) { const [path, cleanup] = await createTempDir(); - this.logger.info(`Temp dir is ${path}`); + this.logger.debug(`Temp dir is ${path}`); const destPath = path + '/facebook.zip'; @@ -222,15 +222,13 @@ export class ImportNotesProcessorService { await fsp.writeFile(destPath, '', 'binary'); await this.downloadUrl(file.url, destPath); } catch (e) { // TODO: 何度か再試行 - if (e instanceof Error || typeof e === 'string') { - this.logger.error(e); - } + this.logger.error('Error importing notes:', e as Error); throw e; } const outputPath = path + '/facebook'; try { - this.logger.succ(`Unzipping to ${outputPath}`); + this.logger.debug(`Unzipping to ${outputPath}`); ZipReader.withDestinationPath(outputPath).viaBuffer(await fsp.readFile(destPath)); const postsJson = await fsp.readFile(outputPath + '/your_activity_across_facebook/posts/your_posts__check_ins__photos_and_videos_1.json', 'utf-8'); const posts = JSON.parse(postsJson); @@ -247,7 +245,7 @@ export class ImportNotesProcessorService { } else if (file.name.endsWith('.zip')) { const [path, cleanup] = await createTempDir(); - this.logger.info(`Temp dir is ${path}`); + this.logger.debug(`Temp dir is ${path}`); const destPath = path + '/unknown.zip'; @@ -255,15 +253,13 @@ export class ImportNotesProcessorService { await fsp.writeFile(destPath, '', 'binary'); await this.downloadUrl(file.url, destPath); } catch (e) { // TODO: 何度か再試行 - if (e instanceof Error || typeof e === 'string') { - this.logger.error(e); - } + this.logger.error('Error importing notes:', e as Error); throw e; } const outputPath = path + '/unknown'; try { - this.logger.succ(`Unzipping to ${outputPath}`); + this.logger.debug(`Unzipping to ${outputPath}`); ZipReader.withDestinationPath(outputPath).viaBuffer(await fsp.readFile(destPath)); const isInstagram = type === 'Instagram' || fs.existsSync(outputPath + '/instagram_live') || fs.existsSync(outputPath + '/instagram_ads_and_businesses'); const isOutbox = type === 'Mastodon' || fs.existsSync(outputPath + '/outbox.json'); @@ -307,15 +303,13 @@ export class ImportNotesProcessorService { } else if (job.data.type === 'Misskey' || file.name.startsWith('notes-') && file.name.endsWith('.json')) { const [path, cleanup] = await createTemp(); - this.logger.info(`Temp dir is ${path}`); + this.logger.debug(`Temp dir is ${path}`); try { await fsp.writeFile(path, '', 'utf-8'); await this.downloadUrl(file.url, path); } catch (e) { // TODO: 何度か再試行 - if (e instanceof Error || typeof e === 'string') { - this.logger.error(e); - } + this.logger.error('Error importing notes:', e as Error); throw e; } @@ -326,7 +320,7 @@ export class ImportNotesProcessorService { cleanup(); } - this.logger.succ('Import jobs created'); + this.logger.debug('Import jobs created'); } @bindThis @@ -365,7 +359,7 @@ export class ImportNotesProcessorService { try { await this.downloadUrl(file.url, filePath); } catch (e) { // TODO: 何度か再試行 - this.logger.error(e instanceof Error ? e : new Error(e as string)); + this.logger.error('Error importing notes:', e as Error); } const driveFile = await this.driveService.addFile({ user: user, @@ -504,7 +498,7 @@ export class ImportNotesProcessorService { try { await this.downloadUrl(file.url, filePath); } catch (e) { // TODO: 何度か再試行 - this.logger.error(e instanceof Error ? e : new Error(e as string)); + this.logger.error('Error importing notes:', e as Error); } const driveFile = await this.driveService.addFile({ user: user, @@ -628,7 +622,7 @@ export class ImportNotesProcessorService { try { await this.downloadUrl(videos[0].url, filePath); } catch (e) { // TODO: 何度か再試行 - this.logger.error(e instanceof Error ? e : new Error(e as string)); + this.logger.error('Error importing notes:', e as Error); } const driveFile = await this.driveService.addFile({ user: user, @@ -653,7 +647,7 @@ export class ImportNotesProcessorService { try { await this.downloadUrl(file.media_url_https, filePath); } catch (e) { // TODO: 何度か再試行 - this.logger.error(e instanceof Error ? e : new Error(e as string)); + this.logger.error('Error importing notes:', e as Error); } const driveFile = await this.driveService.addFile({ @@ -673,7 +667,7 @@ export class ImportNotesProcessorService { const createdNote = await this.noteCreateService.import(user, { createdAt: date, reply: parentNote, text: text, files: files }); if (tweet.childNotes) this.queueService.createImportTweetsToDbJob(user, tweet.childNotes, createdNote.id); } catch (e) { - this.logger.warn(`Error: ${e}`); + this.logger.error('Error importing notes:', e as Error); } } diff --git a/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts b/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts index db9255b35d..482054e52f 100644 --- a/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts @@ -15,6 +15,7 @@ import { UserListService } from '@/core/UserListService.js'; import { IdService } from '@/core/IdService.js'; import { UtilityService } from '@/core/UtilityService.js'; import { bindThis } from '@/decorators.js'; +import { renderInlineError } from '@/misc/render-inline-error.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; import type { DbUserImportJobData } from '../types.js'; @@ -48,10 +49,9 @@ export class ImportUserListsProcessorService { @bindThis public async process(job: Bull.Job<DbUserImportJobData>): Promise<void> { - this.logger.info(`Importing user lists of ${job.data.user.id} ...`); - const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { + this.logger.debug(`Skip: user ${job.data.user.id} does not exist`); return; } @@ -59,9 +59,12 @@ export class ImportUserListsProcessorService { id: job.data.fileId, }); if (file == null) { + this.logger.debug(`Skip: file ${job.data.fileId} does not exist`); return; } + this.logger.info(`Importing user lists of ${job.data.user.id} ...`); + const csv = await this.downloadService.downloadTextFile(file.url); let linenum = 0; @@ -102,10 +105,10 @@ export class ImportUserListsProcessorService { this.userListService.addMember(target, list!, user); } catch (e) { - this.logger.warn(`Error in line:${linenum} ${e}`); + this.logger.warn(`Error in line:${linenum} ${renderInlineError(e)}`); } } - this.logger.succ('Imported'); + this.logger.debug('Imported'); } } diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 9564724c62..5f82d558b3 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -21,7 +21,7 @@ import { ApDbResolverService } from '@/core/activitypub/ApDbResolverService.js'; import { StatusError } from '@/misc/status-error.js'; import { UtilityService } from '@/core/UtilityService.js'; import { ApPersonService } from '@/core/activitypub/models/ApPersonService.js'; -import { JsonLdService } from '@/core/activitypub/JsonLdService.js'; +import { isSigned, JsonLdService } from '@/core/activitypub/JsonLdService.js'; import { ApInboxService } from '@/core/activitypub/ApInboxService.js'; import { bindThis } from '@/decorators.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; @@ -31,6 +31,8 @@ import { SkApInboxLog } from '@/models/_.js'; import type { Config } from '@/config.js'; import { ApLogService, calculateDurationSince } from '@/core/ApLogService.js'; import { UpdateInstanceQueue } from '@/core/UpdateInstanceQueue.js'; +import { isRetryableError } from '@/misc/is-retryable-error.js'; +import { renderInlineError } from '@/misc/render-inline-error.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { InboxJobData } from '../types.js'; @@ -125,6 +127,14 @@ export class InboxProcessorService implements OnApplicationShutdown { return `Old keyId is no longer supported. ${keyIdLower}`; } + if (activity.actor as unknown == null || (Array.isArray(activity.actor) && activity.actor.length < 1)) { + return 'skip: activity has no actor'; + } + if (typeof(activity.actor) !== 'string' && typeof(activity.actor) !== 'object') { + return `skip: activity actor has invalid type: ${typeof(activity.actor)}`; + } + const actorId = getApId(activity.actor); + // HTTP-Signature keyIdを元にDBから取得 let authUser: { user: MiRemoteUser; @@ -134,26 +144,25 @@ export class InboxProcessorService implements OnApplicationShutdown { // keyIdでわからなければ、activity.actorを元にDBから取得 || activity.actorを元にリモートから取得 if (authUser == null) { try { - authUser = await this.apDbResolverService.getAuthUserFromApId(getApId(activity.actor)); + authUser = await this.apDbResolverService.getAuthUserFromApId(actorId); } catch (err) { // 対象が4xxならスキップ - if (err instanceof StatusError) { - if (!err.isRetryable) { - throw new Bull.UnrecoverableError(`skip: Ignored deleted actors on both ends ${activity.actor} - ${err.statusCode}`); - } - throw new Error(`Error in actor ${activity.actor} - ${err.statusCode}`); + if (!isRetryableError(err)) { + throw new Bull.UnrecoverableError(`skip: Ignored deleted actors on both ends ${actorId}`); } + + throw err; } } // それでもわからなければ終了 if (authUser == null) { - throw new Bull.UnrecoverableError(`skip: failed to resolve user ${getApId(activity.actor)}`); + throw new Bull.UnrecoverableError(`skip: failed to resolve user ${actorId}`); } // publicKey がなくても終了 if (authUser.key == null) { - throw new Bull.UnrecoverableError(`skip: failed to resolve user publicKey ${getApId(activity.actor)}`); + throw new Bull.UnrecoverableError(`skip: failed to resolve user publicKey ${actorId}`); } // HTTP-Signatureの検証 @@ -168,10 +177,10 @@ export class InboxProcessorService implements OnApplicationShutdown { } // また、signatureのsignerは、activity.actorと一致する必要がある - if (!httpSignatureValidated || authUser.user.uri !== getApId(activity.actor)) { + if (!httpSignatureValidated || authUser.user.uri !== actorId) { // 一致しなくても、でもLD-Signatureがありそうならそっちも見る - const ldSignature = activity.signature; - if (ldSignature) { + if (isSigned(activity)) { + const ldSignature = activity.signature; if (ldSignature.type !== 'RsaSignature2017') { throw new Bull.UnrecoverableError(`skip: unsupported LD-signature type ${ldSignature.type}`); } @@ -193,33 +202,30 @@ export class InboxProcessorService implements OnApplicationShutdown { throw new Bull.UnrecoverableError('skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした'); } - const jsonLd = this.jsonLdService.use(); - // LD-Signature検証 - const verified = await jsonLd.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false); + const verified = await this.jsonLdService.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false); if (!verified) { throw new Bull.UnrecoverableError('skip: LD-Signatureの検証に失敗しました'); } // アクティビティを正規化 - delete activity.signature; + const copy = { ...activity, signature: undefined }; try { - activity = await jsonLd.compact(activity) as IActivity; + activity = await this.jsonLdService.compact(copy) as IActivity; } catch (e) { throw new Bull.UnrecoverableError(`skip: failed to compact activity: ${e}`); } // TODO: 元のアクティビティと非互換な形に正規化される場合は転送をスキップする // https://github.com/mastodon/mastodon/blob/664b0ca/app/services/activitypub/process_collection_service.rb#L24-L29 - activity.signature = ldSignature; // もう一度actorチェック - if (authUser.user.uri !== activity.actor) { - throw new Bull.UnrecoverableError(`skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`); + if (authUser.user.uri !== actorId) { + throw new Bull.UnrecoverableError(`skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${actorId})`); } const ldHost = this.utilityService.extractDbHost(authUser.user.uri); if (!this.utilityService.isFederationAllowedHost(ldHost)) { - throw new Bull.UnrecoverableError(`Blocked request: ${ldHost}`); + throw new Bull.UnrecoverableError(`skip: request host is blocked: ${ldHost}`); } } else { throw new Bull.UnrecoverableError(`skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`); @@ -292,16 +298,8 @@ export class InboxProcessorService implements OnApplicationShutdown { } } - if (e instanceof StatusError && !e.isRetryable) { - return `skip: permanent error ${e.statusCode}`; - } - - if (e instanceof IdentifiableError && !e.isRetryable) { - if (e.message) { - return `skip: permanent error ${e.id}: ${e.message}`; - } else { - return `skip: permanent error ${e.id}`; - } + if (!isRetryableError(e)) { + return `skip: permanent error ${renderInlineError(e)}`; } throw e; diff --git a/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts index 0c47fdedb3..5b7a871af9 100644 --- a/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts +++ b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts @@ -36,6 +36,6 @@ export class ResyncChartsProcessorService { await this.notesChart.resync(); await this.usersChart.resync(); - this.logger.succ('All charts successfully resynced.'); + this.logger.info('All charts successfully resynced.'); } } diff --git a/packages/backend/src/queue/processors/ScheduleNotePostProcessorService.ts b/packages/backend/src/queue/processors/ScheduleNotePostProcessorService.ts index d823d98ef1..73088f3312 100644 --- a/packages/backend/src/queue/processors/ScheduleNotePostProcessorService.ts +++ b/packages/backend/src/queue/processors/ScheduleNotePostProcessorService.ts @@ -12,6 +12,7 @@ import { DI } from '@/di-symbols.js'; import { NotificationService } from '@/core/NotificationService.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; import type { MiScheduleNoteType } from '@/models/NoteSchedule.js'; +import { renderInlineError } from '@/misc/render-inline-error.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; import type { ScheduleNotePostJobData } from '../types.js'; @@ -129,10 +130,11 @@ export class ScheduleNotePostProcessorService { channel, }).catch(async (err: IdentifiableError) => { this.notificationService.createNotification(me.id, 'scheduledNoteFailed', { - reason: err.message, + reason: renderInlineError(err), }); await this.noteScheduleRepository.remove(data); - throw this.logger.error(`Schedule Note Failed Reason: ${err.message}`); + this.logger.error(`Scheduled note failed: ${renderInlineError(err)}`); + throw err; }); await this.noteScheduleRepository.remove(data); this.notificationService.createNotification(me.id, 'scheduledNotePosted', { diff --git a/packages/backend/src/queue/processors/SystemWebhookDeliverProcessorService.ts b/packages/backend/src/queue/processors/SystemWebhookDeliverProcessorService.ts index f6bef52684..f9fcd1e928 100644 --- a/packages/backend/src/queue/processors/SystemWebhookDeliverProcessorService.ts +++ b/packages/backend/src/queue/processors/SystemWebhookDeliverProcessorService.ts @@ -12,6 +12,7 @@ import type Logger from '@/logger.js'; import { HttpRequestService } from '@/core/HttpRequestService.js'; import { StatusError } from '@/misc/status-error.js'; import { bindThis } from '@/decorators.js'; +import { renderInlineError } from '@/misc/render-inline-error.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import { SystemWebhookDeliverJobData } from '../types.js'; @@ -63,21 +64,16 @@ export class SystemWebhookDeliverProcessorService { return 'Success'; } catch (res) { - this.logger.error(res as Error); + this.logger.error(`Failed to send webhook: ${renderInlineError(res)}`); this.systemWebhooksRepository.update({ id: job.data.webhookId }, { latestSentAt: new Date(), latestStatus: res instanceof StatusError ? res.statusCode : 1, }); - if (res instanceof StatusError) { + if (res instanceof StatusError && !res.isRetryable) { // 4xx - if (!res.isRetryable) { - throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`); - } - - // 5xx etc. - throw new Error(`${res.statusCode} ${res.statusMessage}`); + throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`); } else { // DNS error, socket error, timeout ... throw res; diff --git a/packages/backend/src/queue/processors/TickChartsProcessorService.ts b/packages/backend/src/queue/processors/TickChartsProcessorService.ts index fc8856a271..b4b8b1f205 100644 --- a/packages/backend/src/queue/processors/TickChartsProcessorService.ts +++ b/packages/backend/src/queue/processors/TickChartsProcessorService.ts @@ -62,6 +62,6 @@ export class TickChartsProcessorService { await this.perUserDriveChart.tick(false); await this.apRequestChart.tick(false); - this.logger.succ('All charts successfully ticked.'); + this.logger.info('All charts successfully ticked.'); } } diff --git a/packages/backend/src/queue/processors/UserWebhookDeliverProcessorService.ts b/packages/backend/src/queue/processors/UserWebhookDeliverProcessorService.ts index 9ec630ef70..0208ce6038 100644 --- a/packages/backend/src/queue/processors/UserWebhookDeliverProcessorService.ts +++ b/packages/backend/src/queue/processors/UserWebhookDeliverProcessorService.ts @@ -69,14 +69,9 @@ export class UserWebhookDeliverProcessorService { latestStatus: res instanceof StatusError ? res.statusCode : 1, }); - if (res instanceof StatusError) { + if (res instanceof StatusError && !res.isRetryable) { // 4xx - if (!res.isRetryable) { - throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`); - } - - // 5xx etc. - throw new Error(`${res.statusCode} ${res.statusMessage}`); + throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`); } else { // DNS error, socket error, timeout ... throw res; |