diff options
Diffstat (limited to 'packages/backend/src/queue/processors')
51 files changed, 2299 insertions, 1732 deletions
diff --git a/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts b/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts new file mode 100644 index 0000000000..514dc1dcf3 --- /dev/null +++ b/packages/backend/src/queue/processors/CheckExpiredMutingsProcessorService.ts @@ -0,0 +1,50 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { In, MoreThan } from 'typeorm'; +import { DI } from '@/di-symbols.js'; +import { MutingsRepository } from '@/models/index.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import { GlobalEventService } from '@/core/GlobalEventService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; + +@Injectable() +export class CheckExpiredMutingsProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.mutingsRepository) + private mutingsRepository: MutingsRepository, + + private globalEventService: GlobalEventService, + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('check-expired-mutings'); + } + + public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> { + this.#logger.info('Checking expired mutings...'); + + const expired = await this.mutingsRepository.createQueryBuilder('muting') + .where('muting.expiresAt IS NOT NULL') + .andWhere('muting.expiresAt < :now', { now: new Date() }) + .innerJoinAndSelect('muting.mutee', 'mutee') + .getMany(); + + if (expired.length > 0) { + await this.mutingsRepository.delete({ + id: In(expired.map(m => m.id)), + }); + + for (const m of expired) { + this.globalEventService.publishUserEvent(m.muterId, 'unmute', m.mutee!); + } + } + + this.#logger.succ('All expired mutings checked.'); + done(); + } +} diff --git a/packages/backend/src/queue/processors/CleanChartsProcessorService.ts b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts new file mode 100644 index 0000000000..0eaad9b9ed --- /dev/null +++ b/packages/backend/src/queue/processors/CleanChartsProcessorService.ts @@ -0,0 +1,68 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { In, MoreThan } from 'typeorm'; +import { DI } from '@/di-symbols.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import FederationChart from '@/core/chart/charts/federation.js'; +import NotesChart from '@/core/chart/charts/notes.js'; +import UsersChart from '@/core/chart/charts/users.js'; +import ActiveUsersChart from '@/core/chart/charts/active-users.js'; +import InstanceChart from '@/core/chart/charts/instance.js'; +import PerUserNotesChart from '@/core/chart/charts/per-user-notes.js'; +import DriveChart from '@/core/chart/charts/drive.js'; +import PerUserReactionsChart from '@/core/chart/charts/per-user-reactions.js'; +import HashtagChart from '@/core/chart/charts/hashtag.js'; +import PerUserFollowingChart from '@/core/chart/charts/per-user-following.js'; +import PerUserDriveChart from '@/core/chart/charts/per-user-drive.js'; +import ApRequestChart from '@/core/chart/charts/ap-request.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; + +@Injectable() +export class CleanChartsProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + private federationChart: FederationChart, + private notesChart: NotesChart, + private usersChart: UsersChart, + private activeUsersChart: ActiveUsersChart, + private instanceChart: InstanceChart, + private perUserNotesChart: PerUserNotesChart, + private driveChart: DriveChart, + private perUserReactionsChart: PerUserReactionsChart, + private hashtagChart: HashtagChart, + private perUserFollowingChart: PerUserFollowingChart, + private perUserDriveChart: PerUserDriveChart, + private apRequestChart: ApRequestChart, + + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('clean-charts'); + } + + public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> { + this.#logger.info('Clean charts...'); + + await Promise.all([ + this.federationChart.clean(), + this.notesChart.clean(), + this.usersChart.clean(), + this.activeUsersChart.clean(), + this.instanceChart.clean(), + this.perUserNotesChart.clean(), + this.driveChart.clean(), + this.perUserReactionsChart.clean(), + this.hashtagChart.clean(), + this.perUserFollowingChart.clean(), + this.perUserDriveChart.clean(), + this.apRequestChart.clean(), + ]); + + this.#logger.succ('All charts successfully cleaned.'); + done(); + } +} diff --git a/packages/backend/src/queue/processors/CleanProcessorService.ts b/packages/backend/src/queue/processors/CleanProcessorService.ts new file mode 100644 index 0000000000..6150120806 --- /dev/null +++ b/packages/backend/src/queue/processors/CleanProcessorService.ts @@ -0,0 +1,36 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { In, LessThan, MoreThan } from 'typeorm'; +import { DI } from '@/di-symbols.js'; +import { UserIpsRepository } from '@/models/index.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; + +@Injectable() +export class CleanProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.userIpsRepository) + private userIpsRepository: UserIpsRepository, + + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('clean'); + } + + public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> { + this.#logger.info('Cleaning...'); + + this.userIpsRepository.delete({ + createdAt: LessThan(new Date(Date.now() - (1000 * 60 * 60 * 24 * 90))), + }); + + this.#logger.succ('Cleaned.'); + done(); + } +} diff --git a/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts new file mode 100644 index 0000000000..8c53632563 --- /dev/null +++ b/packages/backend/src/queue/processors/CleanRemoteFilesProcessorService.ts @@ -0,0 +1,69 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { IsNull, MoreThan, Not } from 'typeorm'; +import { DI } from '@/di-symbols.js'; +import { DriveFilesRepository } from '@/models/index.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import { DriveService } from '@/core/DriveService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; + +@Injectable() +export class CleanRemoteFilesProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.driveFilesRepository) + private driveFilesRepository: DriveFilesRepository, + + private driveService: DriveService, + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('clean-remote-files'); + } + + public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> { + this.#logger.info('Deleting cached remote files...'); + + let deletedCount = 0; + let cursor: any = null; + + while (true) { + const files = await this.driveFilesRepository.find({ + where: { + userHost: Not(IsNull()), + isLink: false, + ...(cursor ? { id: MoreThan(cursor) } : {}), + }, + take: 8, + order: { + id: 1, + }, + }); + + if (files.length === 0) { + job.progress(100); + break; + } + + cursor = files[files.length - 1].id; + + await Promise.all(files.map(file => this.driveService.deleteFileSync(file, true))); + + deletedCount += 8; + + const total = await this.driveFilesRepository.countBy({ + userHost: Not(IsNull()), + isLink: false, + }); + + job.progress(deletedCount / total); + } + + this.#logger.succ('All cahced remote files has been deleted.'); + done(); + } +} diff --git a/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts new file mode 100644 index 0000000000..a5255c5c05 --- /dev/null +++ b/packages/backend/src/queue/processors/DeleteAccountProcessorService.ts @@ -0,0 +1,124 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { MoreThan } from 'typeorm'; +import { DI } from '@/di-symbols.js'; +import { DriveFilesRepository, UserProfilesRepository } from '@/models/index.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import { DriveService } from '@/core/DriveService.js'; +import type { DriveFile } from '@/models/entities/DriveFile.js'; +import type { Note } from '@/models/entities/Note.js'; +import { EmailService } from '@/core/EmailService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; +import type { DbUserDeleteJobData } from '../types.js'; + +@Injectable() +export class DeleteAccountProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + + @Inject(DI.userProfilesRepository) + private userProfilesRepository: UserProfilesRepository, + + @Inject(DI.notesRepository) + private notesRepository: NotesRepository, + + @Inject(DI.driveFilesRepository) + private driveFilesRepository: DriveFilesRepository, + + private driveService: DriveService, + private emailService: EmailService, + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('delete-account'); + } + + public async process(job: Bull.Job<DbUserDeleteJobData>): Promise<string | void> { + this.#logger.info(`Deleting account of ${job.data.user.id} ...`); + + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + return; + } + + { // Delete notes + let cursor: Note['id'] | null = null; + + while (true) { + const notes = await this.notesRepository.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}), + }, + take: 100, + order: { + id: 1, + }, + }) as Note[]; + + if (notes.length === 0) { + break; + } + + cursor = notes[notes.length - 1].id; + + await this.notesRepository.delete(notes.map(note => note.id)); + } + + this.#logger.succ('All of notes deleted'); + } + + { // Delete files + let cursor: DriveFile['id'] | null = null; + + while (true) { + const files = await this.driveFilesRepository.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}), + }, + take: 10, + order: { + id: 1, + }, + }) as DriveFile[]; + + if (files.length === 0) { + break; + } + + cursor = files[files.length - 1].id; + + for (const file of files) { + await this.driveService.deleteFileSync(file); + } + } + + this.#logger.succ('All of files deleted'); + } + + { // Send email notification + const profile = await this.userProfilesRepository.findOneByOrFail({ userId: user.id }); + if (profile.email && profile.emailVerified) { + this.emailService.sendEmail(profile.email, 'Account deleted', + 'Your account has been deleted.', + 'Your account has been deleted.'); + } + } + + // soft指定されている場合は物理削除しない + if (job.data.soft) { + // nop + } else { + await this.usersRepository.delete(job.data.user.id); + } + + return 'Account deleted'; + } +} diff --git a/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts b/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts new file mode 100644 index 0000000000..80814bb5a2 --- /dev/null +++ b/packages/backend/src/queue/processors/DeleteDriveFilesProcessorService.ts @@ -0,0 +1,78 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { MoreThan } from 'typeorm'; +import { DI } from '@/di-symbols.js'; +import { UsersRepository, DriveFilesRepository } from '@/models/index.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import { DriveService } from '@/core/DriveService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; +import type { DbUserJobData } from '../types.js'; + +@Injectable() +export class DeleteDriveFilesProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + + @Inject(DI.driveFilesRepository) + private driveFilesRepository: DriveFilesRepository, + + private driveService: DriveService, + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('delete-drive-files'); + } + + public async process(job: Bull.Job<DbUserJobData>, done: () => void): Promise<void> { + this.#logger.info(`Deleting drive files of ${job.data.user.id} ...`); + + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + done(); + return; + } + + let deletedCount = 0; + let cursor: any = null; + + while (true) { + const files = await this.driveFilesRepository.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}), + }, + take: 100, + order: { + id: 1, + }, + }); + + if (files.length === 0) { + job.progress(100); + break; + } + + cursor = files[files.length - 1].id; + + for (const file of files) { + await this.driveService.deleteFileSync(file); + deletedCount++; + } + + const total = await this.driveFilesRepository.countBy({ + userId: user.id, + }); + + job.progress(deletedCount / total); + } + + this.#logger.succ(`All drive files (${deletedCount}) of ${user.id} has been deleted.`); + done(); + } +} diff --git a/packages/backend/src/queue/processors/DeleteFileProcessorService.ts b/packages/backend/src/queue/processors/DeleteFileProcessorService.ts new file mode 100644 index 0000000000..55424f6444 --- /dev/null +++ b/packages/backend/src/queue/processors/DeleteFileProcessorService.ts @@ -0,0 +1,31 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { DI } from '@/di-symbols.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import { DriveService } from '@/core/DriveService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; +import type { ObjectStorageFileJobData } from '../types.js'; + +@Injectable() +export class DeleteFileProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + private driveService: DriveService, + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('delete-file'); + } + + public async process(job: Bull.Job<ObjectStorageFileJobData>): Promise<string> { + const key: string = job.data.key; + + await this.driveService.deleteObjectStorageFile(key); + + return 'Success'; + } +} diff --git a/packages/backend/src/queue/processors/DeliverProcessorService.ts b/packages/backend/src/queue/processors/DeliverProcessorService.ts new file mode 100644 index 0000000000..3403ec83a9 --- /dev/null +++ b/packages/backend/src/queue/processors/DeliverProcessorService.ts @@ -0,0 +1,130 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { MoreThan } from 'typeorm'; +import { DI } from '@/di-symbols.js'; +import { DriveFilesRepository, InstancesRepository } from '@/models/index.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import { MetaService } from '@/core/MetaService.js'; +import { ApRequestService } from '@/core/remote/activitypub/ApRequestService.js'; +import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; +import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js'; +import { Cache } from '@/misc/cache.js'; +import type { Instance } from '@/models/entities/Instance.js'; +import InstanceChart from '@/core/chart/charts/instance.js'; +import ApRequestChart from '@/core/chart/charts/ap-request.js'; +import FederationChart from '@/core/chart/charts/federation.js'; +import { StatusError } from '@/misc/status-error.js'; +import { UtilityService } from '@/core/UtilityService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; +import type { DeliverJobData } from '../types.js'; + +@Injectable() +export class DeliverProcessorService { + #logger: Logger; + #suspendedHostsCache: Cache<Instance[]>; + #latest: string | null; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.instancesRepository) + private instancesRepository: InstancesRepository, + + @Inject(DI.driveFilesRepository) + private driveFilesRepository: DriveFilesRepository, + + private metaService: MetaService, + private utilityService: UtilityService, + private federatedInstanceService: FederatedInstanceService, + private fetchInstanceMetadataService: FetchInstanceMetadataService, + private apRequestService: ApRequestService, + private instanceChart: InstanceChart, + private apRequestChart: ApRequestChart, + private federationChart: FederationChart, + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('deliver'); + this.#suspendedHostsCache = new Cache<Instance[]>(1000 * 60 * 60); + this.#latest = null; + } + + public async process(job: Bull.Job<DeliverJobData>): Promise<string> { + const { host } = new URL(job.data.to); + + // ブロックしてたら中断 + const meta = await this.metaService.fetch(); + if (meta.blockedHosts.includes(this.utilityService.toPuny(host))) { + return 'skip (blocked)'; + } + + // isSuspendedなら中断 + let suspendedHosts = this.#suspendedHostsCache.get(null); + if (suspendedHosts == null) { + suspendedHosts = await this.instancesRepository.find({ + where: { + isSuspended: true, + }, + }); + this.#suspendedHostsCache.set(null, suspendedHosts); + } + if (suspendedHosts.map(x => x.host).includes(this.utilityService.toPuny(host))) { + return 'skip (suspended)'; + } + + try { + if (this.#latest !== (this.#latest = JSON.stringify(job.data.content, null, 2))) { + this.#logger.debug(`delivering ${this.#latest}`); + } + + await this.apRequestService.signedPost(job.data.user, job.data.to, job.data.content); + + // Update stats + this.federatedInstanceService.registerOrFetchInstanceDoc(host).then(i => { + this.instancesRepository.update(i.id, { + latestRequestSentAt: new Date(), + latestStatus: 200, + lastCommunicatedAt: new Date(), + isNotResponding: false, + }); + + this.fetchInstanceMetadataService.fetchInstanceMetadata(i); + + this.instanceChart.requestSent(i.host, true); + this.apRequestChart.deliverSucc(); + this.federationChart.deliverd(i.host, true); + }); + + return 'Success'; + } catch (res) { + // Update stats + this.federatedInstanceService.registerOrFetchInstanceDoc(host).then(i => { + this.instancesRepository.update(i.id, { + latestRequestSentAt: new Date(), + latestStatus: res instanceof StatusError ? res.statusCode : null, + isNotResponding: true, + }); + + this.instanceChart.requestSent(i.host, false); + this.apRequestChart.deliverFail(); + this.federationChart.deliverd(i.host, false); + }); + + if (res instanceof StatusError) { + // 4xx + if (res.isClientError) { + // HTTPステータスコード4xxはクライアントエラーであり、それはつまり + // 何回再送しても成功することはないということなのでエラーにはしないでおく + return `${res.statusCode} ${res.statusMessage}`; + } + + // 5xx etc. + throw `${res.statusCode} ${res.statusMessage}`; + } else { + // DNS error, socket error, timeout ... + throw res; + } + } + } +} diff --git a/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts b/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts new file mode 100644 index 0000000000..b90c7be629 --- /dev/null +++ b/packages/backend/src/queue/processors/EndedPollNotificationProcessorService.ts @@ -0,0 +1,56 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { MoreThan } from 'typeorm'; +import { DI } from '@/di-symbols.js'; +import { PollVotesRepository, NotesRepository } from '@/models/index.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import { CreateNotificationService } from '@/core/CreateNotificationService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; +import type { EndedPollNotificationJobData } from '../types.js'; + +@Injectable() +export class EndedPollNotificationProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.notesRepository) + private notesRepository: NotesRepository, + + @Inject(DI.pollVotesRepository) + private pollVotesRepository: PollVotesRepository, + + private createNotificationService: CreateNotificationService, + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('ended-poll-notification'); + } + + public async process(job: Bull.Job<EndedPollNotificationJobData>, done: () => void): Promise<void> { + const note = await this.notesRepository.findOneBy({ id: job.data.noteId }); + if (note == null || !note.hasPoll) { + done(); + return; + } + + const votes = await this.pollVotesRepository.createQueryBuilder('vote') + .select('vote.userId') + .where('vote.noteId = :noteId', { noteId: note.id }) + .innerJoinAndSelect('vote.user', 'user') + .andWhere('user.host IS NULL') + .getMany(); + + const userIds = [...new Set([note.userId, ...votes.map(v => v.userId)])]; + + for (const userId of userIds) { + this.createNotificationService.createNotification(userId, 'pollEnded', { + noteId: note.id, + }); + } + + done(); + } +} diff --git a/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts new file mode 100644 index 0000000000..9b520be06e --- /dev/null +++ b/packages/backend/src/queue/processors/ExportBlockingProcessorService.ts @@ -0,0 +1,117 @@ +import * as fs from 'node:fs'; +import { Inject, Injectable } from '@nestjs/common'; +import { MoreThan } from 'typeorm'; +import { format as dateFormat } from 'date-fns'; +import { DI } from '@/di-symbols.js'; +import { UsersRepository, BlockingsRepository } from '@/models/index.js'; +import type { DriveFilesRepository, UserProfilesRepository, NotesRepository } from '@/models/index.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import { DriveService } from '@/core/DriveService.js'; +import { createTemp } from '@/misc/create-temp.js'; +import { UtilityService } from '@/core/UtilityService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; +import type { DbUserJobData } from '../types.js'; + +@Injectable() +export class ExportBlockingProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + + @Inject(DI.blockingsRepository) + private blockingsRepository: BlockingsRepository, + + private utilityService: UtilityService, + private driveService: DriveService, + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('export-blocking'); + } + + public async process(job: Bull.Job<DbUserJobData>, done: () => void): Promise<void> { + this.#logger.info(`Exporting blocking of ${job.data.user.id} ...`); + + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + done(); + return; + } + + // Create temp file + const [path, cleanup] = await createTemp(); + + this.#logger.info(`Temp file is ${path}`); + + try { + const stream = fs.createWriteStream(path, { flags: 'a' }); + + let exportedCount = 0; + let cursor: any = null; + + while (true) { + const blockings = await this.blockingsRepository.find({ + where: { + blockerId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}), + }, + take: 100, + order: { + id: 1, + }, + }); + + if (blockings.length === 0) { + job.progress(100); + break; + } + + cursor = blockings[blockings.length - 1].id; + + for (const block of blockings) { + const u = await this.usersRepository.findOneBy({ id: block.blockeeId }); + if (u == null) { + exportedCount++; continue; + } + + const content = this.utilityService.getFullApAccount(u.username, u.host); + await new Promise<void>((res, rej) => { + stream.write(content + '\n', err => { + if (err) { + this.#logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + exportedCount++; + } + + const total = await this.blockingsRepository.countBy({ + blockerId: user.id, + }); + + job.progress(exportedCount / total); + } + + stream.end(); + this.#logger.succ(`Exported to: ${path}`); + + const fileName = 'blocking-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv'; + const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true }); + + this.#logger.succ(`Exported to: ${driveFile.id}`); + } finally { + cleanup(); + } + + done(); + } +} diff --git a/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts new file mode 100644 index 0000000000..93341c2c63 --- /dev/null +++ b/packages/backend/src/queue/processors/ExportCustomEmojisProcessorService.ts @@ -0,0 +1,135 @@ +import * as fs from 'node:fs'; +import { Inject, Injectable } from '@nestjs/common'; +import { IsNull, MoreThan } from 'typeorm'; +import { format as dateFormat } from 'date-fns'; +import { ulid } from 'ulid'; +import mime from 'mime-types'; +import archiver from 'archiver'; +import { DI } from '@/di-symbols.js'; +import { EmojisRepository, UsersRepository } from '@/models/index.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import { DriveService } from '@/core/DriveService.js'; +import { createTemp, createTempDir } from '@/misc/create-temp.js'; +import { DownloadService } from '@/core/DownloadService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; + +@Injectable() +export class ExportCustomEmojisProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + + @Inject(DI.emojisRepository) + private emojisRepository: EmojisRepository, + + private driveService: DriveService, + private downloadService: DownloadService, + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('export-custom-emojis'); + } + + public async process(job: Bull.Job, done: () => void): Promise<void> { + this.#logger.info('Exporting custom emojis ...'); + + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + done(); + return; + } + + const [path, cleanup] = await createTempDir(); + + this.#logger.info(`Temp dir is ${path}`); + + const metaPath = path + '/meta.json'; + + fs.writeFileSync(metaPath, '', 'utf-8'); + + const metaStream = fs.createWriteStream(metaPath, { flags: 'a' }); + + const writeMeta = (text: string): Promise<void> => { + return new Promise<void>((res, rej) => { + metaStream.write(text, err => { + if (err) { + this.#logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + }; + + await writeMeta(`{"metaVersion":2,"host":"${this.config.host}","exportedAt":"${new Date().toString()}","emojis":[`); + + const customEmojis = await this.emojisRepository.find({ + where: { + host: IsNull(), + }, + order: { + id: 'ASC', + }, + }); + + for (const emoji of customEmojis) { + const ext = mime.extension(emoji.type); + const fileName = emoji.name + (ext ? '.' + ext : ''); + const emojiPath = path + '/' + fileName; + fs.writeFileSync(emojiPath, '', 'binary'); + let downloaded = false; + + try { + await this.downloadService.downloadUrl(emoji.originalUrl, emojiPath); + downloaded = true; + } catch (e) { // TODO: 何度か再試行 + this.#logger.error(e instanceof Error ? e : new Error(e as string)); + } + + if (!downloaded) { + fs.unlinkSync(emojiPath); + } + + const content = JSON.stringify({ + fileName: fileName, + downloaded: downloaded, + emoji: emoji, + }); + const isFirst = customEmojis.indexOf(emoji) === 0; + + await writeMeta(isFirst ? content : ',\n' + content); + } + + await writeMeta(']}'); + + metaStream.end(); + + // Create archive + const [archivePath, archiveCleanup] = await createTemp(); + const archiveStream = fs.createWriteStream(archivePath); + const archive = archiver('zip', { + zlib: { level: 0 }, + }); + archiveStream.on('close', async () => { + this.#logger.succ(`Exported to: ${archivePath}`); + + const fileName = 'custom-emojis-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.zip'; + const driveFile = await this.driveService.addFile({ user, path: archivePath, name: fileName, force: true }); + + this.#logger.succ(`Exported to: ${driveFile.id}`); + cleanup(); + archiveCleanup(); + done(); + }); + archive.pipe(archiveStream); + archive.directory(path, false); + archive.finalize(); + } +} diff --git a/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts new file mode 100644 index 0000000000..9946015ff7 --- /dev/null +++ b/packages/backend/src/queue/processors/ExportFollowingProcessorService.ts @@ -0,0 +1,120 @@ +import * as fs from 'node:fs'; +import { Inject, Injectable } from '@nestjs/common'; +import { In, MoreThan, Not } from 'typeorm'; +import { format as dateFormat } from 'date-fns'; +import { DI } from '@/di-symbols.js'; +import { FollowingsRepository, MutingsRepository } from '@/models/index.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import { DriveService } from '@/core/DriveService.js'; +import { createTemp } from '@/misc/create-temp.js'; +import type { Following } from '@/models/entities/Following.js'; +import { UtilityService } from '@/core/UtilityService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; +import type { DbUserJobData } from '../types.js'; + +@Injectable() +export class ExportFollowingProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + + @Inject(DI.followingsRepository) + private followingsRepository: FollowingsRepository, + + @Inject(DI.mutingsRepository) + private mutingsRepository: MutingsRepository, + + private utilityService: UtilityService, + private driveService: DriveService, + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('export-following'); + } + + public async process(job: Bull.Job<DbUserJobData>, done: () => void): Promise<void> { + this.#logger.info(`Exporting following of ${job.data.user.id} ...`); + + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + done(); + return; + } + + // Create temp file + const [path, cleanup] = await createTemp(); + + this.#logger.info(`Temp file is ${path}`); + + try { + const stream = fs.createWriteStream(path, { flags: 'a' }); + + let cursor: Following['id'] | null = null; + + const mutings = job.data.excludeMuting ? await this.mutingsRepository.findBy({ + muterId: user.id, + }) : []; + + while (true) { + const followings = await this.followingsRepository.find({ + where: { + followerId: user.id, + ...(mutings.length > 0 ? { followeeId: Not(In(mutings.map(x => x.muteeId))) } : {}), + ...(cursor ? { id: MoreThan(cursor) } : {}), + }, + take: 100, + order: { + id: 1, + }, + }) as Following[]; + + if (followings.length === 0) { + break; + } + + cursor = followings[followings.length - 1].id; + + for (const following of followings) { + const u = await this.usersRepository.findOneBy({ id: following.followeeId }); + if (u == null) { + continue; + } + + if (job.data.excludeInactive && u.updatedAt && (Date.now() - u.updatedAt.getTime() > 1000 * 60 * 60 * 24 * 90)) { + continue; + } + + const content = this.utilityService.getFullApAccount(u.username, u.host); + await new Promise<void>((res, rej) => { + stream.write(content + '\n', err => { + if (err) { + this.#logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + } + } + + stream.end(); + this.#logger.succ(`Exported to: ${path}`); + + const fileName = 'following-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv'; + const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true }); + + this.#logger.succ(`Exported to: ${driveFile.id}`); + } finally { + cleanup(); + } + + done(); + } +} diff --git a/packages/backend/src/queue/processors/ExportMutingProcessorService.ts b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts new file mode 100644 index 0000000000..a34cea0f41 --- /dev/null +++ b/packages/backend/src/queue/processors/ExportMutingProcessorService.ts @@ -0,0 +1,120 @@ +import * as fs from 'node:fs'; +import { Inject, Injectable } from '@nestjs/common'; +import { IsNull, MoreThan } from 'typeorm'; +import { format as dateFormat } from 'date-fns'; +import { DI } from '@/di-symbols.js'; +import { MutingsRepository, UsersRepository, BlockingsRepository } from '@/models/index.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import { DriveService } from '@/core/DriveService.js'; +import { createTemp } from '@/misc/create-temp.js'; +import { UtilityService } from '@/core/UtilityService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; +import type { DbUserJobData } from '../types.js'; + +@Injectable() +export class ExportMutingProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + + @Inject(DI.blockingsRepository) + private blockingsRepository: BlockingsRepository, + + @Inject(DI.mutingsRepository) + private mutingsRepository: MutingsRepository, + + private utilityService: UtilityService, + private driveService: DriveService, + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('export-muting'); + } + + public async process(job: Bull.Job<DbUserJobData>, done: () => void): Promise<void> { + this.#logger.info(`Exporting muting of ${job.data.user.id} ...`); + + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + done(); + return; + } + + // Create temp file + const [path, cleanup] = await createTemp(); + + this.#logger.info(`Temp file is ${path}`); + + try { + const stream = fs.createWriteStream(path, { flags: 'a' }); + + let exportedCount = 0; + let cursor: any = null; + + while (true) { + const mutes = await this.mutingsRepository.find({ + where: { + muterId: user.id, + expiresAt: IsNull(), + ...(cursor ? { id: MoreThan(cursor) } : {}), + }, + take: 100, + order: { + id: 1, + }, + }); + + if (mutes.length === 0) { + job.progress(100); + break; + } + + cursor = mutes[mutes.length - 1].id; + + for (const mute of mutes) { + const u = await this.usersRepository.findOneBy({ id: mute.muteeId }); + if (u == null) { + exportedCount++; continue; + } + + const content = this.utilityService.getFullApAccount(u.username, u.host); + await new Promise<void>((res, rej) => { + stream.write(content + '\n', err => { + if (err) { + this.#logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + exportedCount++; + } + + const total = await this.mutingsRepository.countBy({ + muterId: user.id, + }); + + job.progress(exportedCount / total); + } + + stream.end(); + this.#logger.succ(`Exported to: ${path}`); + + const fileName = 'mute-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv'; + const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true }); + + this.#logger.succ(`Exported to: ${driveFile.id}`); + } finally { + cleanup(); + } + + done(); + } +} diff --git a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts new file mode 100644 index 0000000000..24fcc1a8ad --- /dev/null +++ b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts @@ -0,0 +1,143 @@ +import * as fs from 'node:fs'; +import { Inject, Injectable } from '@nestjs/common'; +import { IsNull, MoreThan } from 'typeorm'; +import { format as dateFormat } from 'date-fns'; +import { DI } from '@/di-symbols.js'; +import { NotesRepository, PollsRepository, UsersRepository } from '@/models/index.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import { DriveService } from '@/core/DriveService.js'; +import { createTemp } from '@/misc/create-temp.js'; +import type { Poll } from '@/models/entities/Poll.js'; +import type { Note } from '@/models/entities/Note.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; +import type { DbUserJobData } from '../types.js'; + +@Injectable() +export class ExportNotesProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + + @Inject(DI.pollsRepository) + private pollsRepository: PollsRepository, + + @Inject(DI.notesRepository) + private notesRepository: NotesRepository, + + private driveService: DriveService, + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('export-notes'); + } + + public async process(job: Bull.Job<DbUserJobData>, done: () => void): Promise<void> { + this.#logger.info(`Exporting notes of ${job.data.user.id} ...`); + + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + done(); + return; + } + + // Create temp file + const [path, cleanup] = await createTemp(); + + this.#logger.info(`Temp file is ${path}`); + + try { + const stream = fs.createWriteStream(path, { flags: 'a' }); + + const write = (text: string): Promise<void> => { + return new Promise<void>((res, rej) => { + stream.write(text, err => { + if (err) { + this.#logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + }; + + await write('['); + + let exportedNotesCount = 0; + let cursor: Note['id'] | null = null; + + while (true) { + const notes = await this.notesRepository.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}), + }, + take: 100, + order: { + id: 1, + }, + }) as Note[]; + + if (notes.length === 0) { + job.progress(100); + break; + } + + cursor = notes[notes.length - 1].id; + + for (const note of notes) { + let poll: Poll | undefined; + if (note.hasPoll) { + poll = await this.pollsRepository.findOneByOrFail({ noteId: note.id }); + } + const content = JSON.stringify(serialize(note, poll)); + const isFirst = exportedNotesCount === 0; + await write(isFirst ? content : ',\n' + content); + exportedNotesCount++; + } + + const total = await this.notesRepository.countBy({ + userId: user.id, + }); + + job.progress(exportedNotesCount / total); + } + + await write(']'); + + stream.end(); + this.#logger.succ(`Exported to: ${path}`); + + const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json'; + const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true }); + + this.#logger.succ(`Exported to: ${driveFile.id}`); + } finally { + cleanup(); + } + + done(); + } +} + +function serialize(note: Note, poll: Poll | null = null): Record<string, unknown> { + return { + id: note.id, + text: note.text, + createdAt: note.createdAt, + fileIds: note.fileIds, + replyId: note.replyId, + renoteId: note.renoteId, + poll: poll, + cw: note.cw, + visibility: note.visibility, + visibleUserIds: note.visibleUserIds, + localOnly: note.localOnly, + }; +} diff --git a/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts new file mode 100644 index 0000000000..a02e9bdee4 --- /dev/null +++ b/packages/backend/src/queue/processors/ExportUserListsProcessorService.ts @@ -0,0 +1,96 @@ +import * as fs from 'node:fs'; +import { Inject, Injectable } from '@nestjs/common'; +import { In, IsNull, MoreThan } from 'typeorm'; +import { format as dateFormat } from 'date-fns'; +import { DI } from '@/di-symbols.js'; +import { UserListJoiningsRepository, UserListsRepository, UsersRepository } from '@/models/index.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import { DriveService } from '@/core/DriveService.js'; +import { createTemp } from '@/misc/create-temp.js'; +import { UtilityService } from '@/core/UtilityService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; +import type { DbUserJobData } from '../types.js'; + +@Injectable() +export class ExportUserListsProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + + @Inject(DI.userListsRepository) + private userListsRepository: UserListsRepository, + + @Inject(DI.userListJoiningsRepository) + private userListJoiningsRepository: UserListJoiningsRepository, + + private utilityService: UtilityService, + private driveService: DriveService, + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('export-user-lists'); + } + + public async process(job: Bull.Job<DbUserJobData>, done: () => void): Promise<void> { + this.#logger.info(`Exporting user lists of ${job.data.user.id} ...`); + + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + done(); + return; + } + + const lists = await this.userListsRepository.findBy({ + userId: user.id, + }); + + // Create temp file + const [path, cleanup] = await createTemp(); + + this.#logger.info(`Temp file is ${path}`); + + try { + const stream = fs.createWriteStream(path, { flags: 'a' }); + + for (const list of lists) { + const joinings = await this.userListJoiningsRepository.findBy({ userListId: list.id }); + const users = await this.usersRepository.findBy({ + id: In(joinings.map(j => j.userId)), + }); + + for (const u of users) { + const acct = this.utilityService.getFullApAccount(u.username, u.host); + const content = `${list.name},${acct}`; + await new Promise<void>((res, rej) => { + stream.write(content + '\n', err => { + if (err) { + this.#logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + } + } + + stream.end(); + this.#logger.succ(`Exported to: ${path}`); + + const fileName = 'user-lists-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv'; + const driveFile = await this.driveService.addFile({ user, path, name: fileName, force: true }); + + this.#logger.succ(`Exported to: ${driveFile.id}`); + } finally { + cleanup(); + } + + done(); + } +} diff --git a/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts b/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts new file mode 100644 index 0000000000..abae196299 --- /dev/null +++ b/packages/backend/src/queue/processors/ImportBlockingProcessorService.ts @@ -0,0 +1,102 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { IsNull, MoreThan } from 'typeorm'; +import { DI } from '@/di-symbols.js'; +import { BlockingsRepository, DriveFilesRepository } from '@/models/index.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import * as Acct from '@/misc/acct.js'; +import { ResolveUserService } from '@/core/remote/ResolveUserService.js'; +import { UserBlockingService } from '@/core/UserBlockingService.js'; +import { DownloadService } from '@/core/DownloadService.js'; +import { UtilityService } from '@/core/UtilityService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; +import type { DbUserImportJobData } from '../types.js'; + +@Injectable() +export class ImportBlockingProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + + @Inject(DI.blockingsRepository) + private blockingsRepository: BlockingsRepository, + + @Inject(DI.driveFilesRepository) + private driveFilesRepository: DriveFilesRepository, + + private utilityService: UtilityService, + private userBlockingService: UserBlockingService, + private resolveUserService: ResolveUserService, + private downloadService: DownloadService, + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('import-blocking'); + } + + public async process(job: Bull.Job<DbUserImportJobData>, done: () => void): Promise<void> { + this.#logger.info(`Importing blocking of ${job.data.user.id} ...`); + + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + done(); + return; + } + + const file = await this.driveFilesRepository.findOneBy({ + id: job.data.fileId, + }); + if (file == null) { + done(); + return; + } + + const csv = await this.downloadService.downloadTextFile(file.url); + + let linenum = 0; + + for (const line of csv.trim().split('\n')) { + linenum++; + + try { + const acct = line.split(',')[0].trim(); + const { username, host } = Acct.parse(acct); + + let target = this.utilityService.isSelfHost(host!) ? await this.usersRepository.findOneBy({ + host: IsNull(), + usernameLower: username.toLowerCase(), + }) : await this.usersRepository.findOneBy({ + host: this.utilityService.toPuny(host!), + usernameLower: username.toLowerCase(), + }); + + if (host == null && target == null) continue; + + if (target == null) { + target = await this.resolveUserService.resolveUser(username, host); + } + + if (target == null) { + throw `cannot resolve user: @${username}@${host}`; + } + + // skip myself + if (target.id === job.data.user.id) continue; + + this.#logger.info(`Block[${linenum}] ${target.id} ...`); + + await this.userBlockingService.block(user, target); + } catch (e) { + this.#logger.warn(`Error in line:${linenum} ${e}`); + } + } + + this.#logger.succ('Imported'); + done(); + } +} diff --git a/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts new file mode 100644 index 0000000000..6f86589aec --- /dev/null +++ b/packages/backend/src/queue/processors/ImportCustomEmojisProcessorService.ts @@ -0,0 +1,110 @@ +import * as fs from 'node:fs'; +import { Inject, Injectable } from '@nestjs/common'; +import { IsNull, MoreThan, DataSource } from 'typeorm'; +import unzipper from 'unzipper'; +import { DI } from '@/di-symbols.js'; +import { EmojisRepository, DriveFilesRepository, UsersRepository } from '@/models/index.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import { CustomEmojiService } from '@/core/CustomEmojiService.js'; +import { createTempDir } from '@/misc/create-temp.js'; +import { DriveService } from '@/core/DriveService.js'; +import { DownloadService } from '@/core/DownloadService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; +import type { DbUserImportJobData } from '../types.js'; + +// TODO: 名前衝突時の動作を選べるようにする +@Injectable() +export class ImportCustomEmojisProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.db) + private db: DataSource, + + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + + @Inject(DI.driveFilesRepository) + private driveFilesRepository: DriveFilesRepository, + + @Inject(DI.emojisRepository) + private emojisRepository: EmojisRepository, + + private customEmojiService: CustomEmojiService, + private driveService: DriveService, + private downloadService: DownloadService, + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('import-custom-emojis'); + } + + public async process(job: Bull.Job<DbUserImportJobData>, done: () => void): Promise<void> { + this.#logger.info('Importing custom emojis ...'); + + const file = await this.driveFilesRepository.findOneBy({ + id: job.data.fileId, + }); + if (file == null) { + done(); + return; + } + + const [path, cleanup] = await createTempDir(); + + this.#logger.info(`Temp dir is ${path}`); + + const destPath = path + '/emojis.zip'; + + try { + fs.writeFileSync(destPath, '', 'binary'); + await this.downloadService.downloadUrl(file.url, destPath); + } catch (e) { // TODO: 何度か再試行 + if (e instanceof Error || typeof e === 'string') { + this.#logger.error(e); + } + throw e; + } + + const outputPath = path + '/emojis'; + const unzipStream = fs.createReadStream(destPath); + const extractor = unzipper.Extract({ path: outputPath }); + extractor.on('close', async () => { + const metaRaw = fs.readFileSync(outputPath + '/meta.json', 'utf-8'); + const meta = JSON.parse(metaRaw); + + for (const record of meta.emojis) { + if (!record.downloaded) continue; + const emojiInfo = record.emoji; + const emojiPath = outputPath + '/' + record.fileName; + await this.emojisRepository.delete({ + name: emojiInfo.name, + }); + const driveFile = await this.driveService.addFile({ + user: null, + path: emojiPath, + name: record.fileName, + force: true, + }); + await this.customEmojiService.add({ + name: emojiInfo.name, + category: emojiInfo.category, + host: null, + aliases: emojiInfo.aliases, + driveFile, + }); + } + + cleanup(); + + this.#logger.succ('Imported'); + done(); + }); + unzipStream.pipe(extractor); + this.#logger.succ(`Unzipping to ${outputPath}`); + } +} diff --git a/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts b/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts new file mode 100644 index 0000000000..087e0baf96 --- /dev/null +++ b/packages/backend/src/queue/processors/ImportFollowingProcessorService.ts @@ -0,0 +1,99 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { IsNull, MoreThan } from 'typeorm'; +import { DI } from '@/di-symbols.js'; +import { DriveFilesRepository } from '@/models/index.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import * as Acct from '@/misc/acct.js'; +import { ResolveUserService } from '@/core/remote/ResolveUserService.js'; +import { DownloadService } from '@/core/DownloadService.js'; +import { UserFollowingService } from '@/core/UserFollowingService.js'; +import { UtilityService } from '@/core/UtilityService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; +import type { DbUserImportJobData } from '../types.js'; + +@Injectable() +export class ImportFollowingProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + + @Inject(DI.driveFilesRepository) + private driveFilesRepository: DriveFilesRepository, + + private utilityService: UtilityService, + private userFollowingService: UserFollowingService, + private resolveUserService: ResolveUserService, + private downloadService: DownloadService, + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('import-following'); + } + + public async process(job: Bull.Job<DbUserImportJobData>, done: () => void): Promise<void> { + this.#logger.info(`Importing following of ${job.data.user.id} ...`); + + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + done(); + return; + } + + const file = await this.driveFilesRepository.findOneBy({ + id: job.data.fileId, + }); + if (file == null) { + done(); + return; + } + + const csv = await this.downloadService.downloadTextFile(file.url); + + let linenum = 0; + + for (const line of csv.trim().split('\n')) { + linenum++; + + try { + const acct = line.split(',')[0].trim(); + const { username, host } = Acct.parse(acct); + + let target = this.utilityService.isSelfHost(host!) ? await this.usersRepository.findOneBy({ + host: IsNull(), + usernameLower: username.toLowerCase(), + }) : await this.usersRepository.findOneBy({ + host: this.utilityService.toPuny(host!), + usernameLower: username.toLowerCase(), + }); + + if (host == null && target == null) continue; + + if (target == null) { + target = await this.resolveUserService.resolveUser(username, host); + } + + if (target == null) { + throw `cannot resolve user: @${username}@${host}`; + } + + // skip myself + if (target.id === job.data.user.id) continue; + + this.#logger.info(`Follow[${linenum}] ${target.id} ...`); + + this.userFollowingService.follow(user, target); + } catch (e) { + this.#logger.warn(`Error in line:${linenum} ${e}`); + } + } + + this.#logger.succ('Imported'); + done(); + } +} diff --git a/packages/backend/src/queue/processors/ImportMutingProcessorService.ts b/packages/backend/src/queue/processors/ImportMutingProcessorService.ts new file mode 100644 index 0000000000..404091e8ca --- /dev/null +++ b/packages/backend/src/queue/processors/ImportMutingProcessorService.ts @@ -0,0 +1,100 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { IsNull, MoreThan } from 'typeorm'; +import { DI } from '@/di-symbols.js'; +import { DriveFilesRepository } from '@/models/index.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import * as Acct from '@/misc/acct.js'; +import { ResolveUserService } from '@/core/remote/ResolveUserService.js'; +import { DownloadService } from '@/core/DownloadService.js'; +import type { UserFollowingService } from '@/core/UserFollowingService.js'; +import { UserMutingService } from '@/core/UserMutingService.js'; +import { UtilityService } from '@/core/UtilityService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; +import type { DbUserImportJobData } from '../types.js'; + +@Injectable() +export class ImportMutingProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + + @Inject(DI.driveFilesRepository) + private driveFilesRepository: DriveFilesRepository, + + private utilityService: UtilityService, + private userMutingService: UserMutingService, + private resolveUserService: ResolveUserService, + private downloadService: DownloadService, + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('import-muting'); + } + + public async process(job: Bull.Job<DbUserImportJobData>, done: () => void): Promise<void> { + this.#logger.info(`Importing muting of ${job.data.user.id} ...`); + + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + done(); + return; + } + + const file = await this.driveFilesRepository.findOneBy({ + id: job.data.fileId, + }); + if (file == null) { + done(); + return; + } + + const csv = await this.downloadService.downloadTextFile(file.url); + + let linenum = 0; + + for (const line of csv.trim().split('\n')) { + linenum++; + + try { + const acct = line.split(',')[0].trim(); + const { username, host } = Acct.parse(acct); + + let target = this.utilityService.isSelfHost(host!) ? await this.usersRepository.findOneBy({ + host: IsNull(), + usernameLower: username.toLowerCase(), + }) : await this.usersRepository.findOneBy({ + host: this.utilityService.toPuny(host!), + usernameLower: username.toLowerCase(), + }); + + if (host == null && target == null) continue; + + if (target == null) { + target = await this.resolveUserService.resolveUser(username, host); + } + + if (target == null) { + throw `cannot resolve user: @${username}@${host}`; + } + + // skip myself + if (target.id === job.data.user.id) continue; + + this.#logger.info(`Mute[${linenum}] ${target.id} ...`); + + await this.userMutingService.mute(user, target); + } catch (e) { + this.#logger.warn(`Error in line:${linenum} ${e}`); + } + } + + this.#logger.succ('Imported'); + done(); + } +} diff --git a/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts b/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts new file mode 100644 index 0000000000..aed1a4cde5 --- /dev/null +++ b/packages/backend/src/queue/processors/ImportUserListsProcessorService.ts @@ -0,0 +1,112 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { IsNull, MoreThan } from 'typeorm'; +import { DI } from '@/di-symbols.js'; +import { DriveFilesRepository, UserListJoiningsRepository, UserListsRepository } from '@/models/index.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import * as Acct from '@/misc/acct.js'; +import { ResolveUserService } from '@/core/remote/ResolveUserService.js'; +import { DownloadService } from '@/core/DownloadService.js'; +import { UserListService } from '@/core/UserListService.js'; +import { IdService } from '@/core/IdService.js'; +import { UtilityService } from '@/core/UtilityService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; +import type { DbUserImportJobData } from '../types.js'; + +@Injectable() +export class ImportUserListsProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.usersRepository) + private usersRepository: UsersRepository, + + @Inject(DI.driveFilesRepository) + private driveFilesRepository: DriveFilesRepository, + + @Inject(DI.userListsRepository) + private userListsRepository: UserListsRepository, + + @Inject(DI.userListJoiningsRepository) + private userListJoiningsRepository: UserListJoiningsRepository, + + private utilityService: UtilityService, + private idService: IdService, + private userListService: UserListService, + private resolveUserService: ResolveUserService, + private downloadService: DownloadService, + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('import-user-lists'); + } + + public async process(job: Bull.Job<DbUserImportJobData>, done: () => void): Promise<void> { + this.#logger.info(`Importing user lists of ${job.data.user.id} ...`); + + const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); + if (user == null) { + done(); + return; + } + + const file = await this.driveFilesRepository.findOneBy({ + id: job.data.fileId, + }); + if (file == null) { + done(); + return; + } + + const csv = await this.downloadService.downloadTextFile(file.url); + + let linenum = 0; + + for (const line of csv.trim().split('\n')) { + linenum++; + + try { + const listName = line.split(',')[0].trim(); + const { username, host } = Acct.parse(line.split(',')[1].trim()); + + let list = await this.userListsRepository.findOneBy({ + userId: user.id, + name: listName, + }); + + if (list == null) { + list = await this.userListsRepository.insert({ + id: this.idService.genId(), + createdAt: new Date(), + userId: user.id, + name: listName, + }).then(x => this.userListsRepository.findOneByOrFail(x.identifiers[0])); + } + + let target = this.utilityService.isSelfHost(host!) ? await this.usersRepository.findOneBy({ + host: IsNull(), + usernameLower: username.toLowerCase(), + }) : await this.usersRepository.findOneBy({ + host: this.utilityService.toPuny(host!), + usernameLower: username.toLowerCase(), + }); + + if (target == null) { + target = await this.resolveUserService.resolveUser(username, host); + } + + if (await this.userListJoiningsRepository.findOneBy({ userListId: list!.id, userId: target.id }) != null) continue; + + this.userListService.push(target, list!); + } catch (e) { + this.#logger.warn(`Error in line:${linenum} ${e}`); + } + } + + this.#logger.succ('Imported'); + done(); + } +} diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts new file mode 100644 index 0000000000..5733f5d0a9 --- /dev/null +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -0,0 +1,195 @@ +import { URL } from 'node:url'; +import { Inject, Injectable } from '@nestjs/common'; +import { MoreThan } from 'typeorm'; +import httpSignature from '@peertube/http-signature'; +import { DI } from '@/di-symbols.js'; +import { DriveFilesRepository } from '@/models/index.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import { MetaService } from '@/core/MetaService.js'; +import { ApRequestService } from '@/core/remote/activitypub/ApRequestService.js'; +import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; +import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js'; +import { Cache } from '@/misc/cache.js'; +import type { Instance } from '@/models/entities/Instance.js'; +import InstanceChart from '@/core/chart/charts/instance.js'; +import ApRequestChart from '@/core/chart/charts/ap-request.js'; +import FederationChart from '@/core/chart/charts/federation.js'; +import { getApId } from '@/core/remote/activitypub/type.js'; +import type { CacheableRemoteUser } from '@/models/entities/User.js'; +import type { UserPublickey } from '@/models/entities/UserPublickey.js'; +import { ApDbResolverService } from '@/core/remote/activitypub/ApDbResolverService.js'; +import { StatusError } from '@/misc/status-error.js'; +import { UtilityService } from '@/core/UtilityService.js'; +import { ApPersonService } from '@/core/remote/activitypub/models/ApPersonService.js'; +import { LdSignatureService } from '@/core/remote/activitypub/LdSignatureService.js'; +import { ApInboxService } from '@/core/remote/activitypub/ApInboxService.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; +import type { DeliverJobData, InboxJobData } from '../types.js'; + +// ユーザーのinboxにアクティビティが届いた時の処理 +@Injectable() +export class InboxProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.instancesRepository) + private instancesRepository: InstancesRepository, + + @Inject(DI.driveFilesRepository) + private driveFilesRepository: DriveFilesRepository, + + private utilityService: UtilityService, + private metaService: MetaService, + private apInboxService: ApInboxService, + private federatedInstanceService: FederatedInstanceService, + private fetchInstanceMetadataService: FetchInstanceMetadataService, + private ldSignatureService: LdSignatureService, + private apRequestService: ApRequestService, + private apPersonService: ApPersonService, + private apDbResolverService: ApDbResolverService, + private instanceChart: InstanceChart, + private apRequestChart: ApRequestChart, + private federationChart: FederationChart, + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('inbox'); + } + + public async process(job: Bull.Job<InboxJobData>): Promise<string> { + const signature = job.data.signature; // HTTP-signature + const activity = job.data.activity; + + //#region Log + const info = Object.assign({}, activity) as any; + delete info['@context']; + this.#logger.debug(JSON.stringify(info, null, 2)); + //#endregion + + const host = this.utilityService.toPuny(new URL(signature.keyId).hostname); + + // ブロックしてたら中断 + const meta = await this.metaService.fetch(); + if (meta.blockedHosts.includes(host)) { + return `Blocked request: ${host}`; + } + + const keyIdLower = signature.keyId.toLowerCase(); + if (keyIdLower.startsWith('acct:')) { + return `Old keyId is no longer supported. ${keyIdLower}`; + } + + // HTTP-Signature keyIdを元にDBから取得 + let authUser: { + user: CacheableRemoteUser; + key: UserPublickey | null; + } | null = await this.apDbResolverService.getAuthUserFromKeyId(signature.keyId); + + // keyIdでわからなければ、activity.actorを元にDBから取得 || activity.actorを元にリモートから取得 + if (authUser == null) { + try { + authUser = await this.apDbResolverService.getAuthUserFromApId(getApId(activity.actor)); + } catch (err) { + // 対象が4xxならスキップ + if (err instanceof StatusError) { + if (err.isClientError) { + return `skip: Ignored deleted actors on both ends ${activity.actor} - ${err.statusCode}`; + } + throw `Error in actor ${activity.actor} - ${err.statusCode ?? err}`; + } + } + } + + // それでもわからなければ終了 + if (authUser == null) { + return 'skip: failed to resolve user'; + } + + // publicKey がなくても終了 + if (authUser.key == null) { + return 'skip: failed to resolve user publicKey'; + } + + // HTTP-Signatureの検証 + const httpSignatureValidated = httpSignature.verifySignature(signature, authUser.key.keyPem); + + // また、signatureのsignerは、activity.actorと一致する必要がある + if (!httpSignatureValidated || authUser.user.uri !== activity.actor) { + // 一致しなくても、でもLD-Signatureがありそうならそっちも見る + if (activity.signature) { + if (activity.signature.type !== 'RsaSignature2017') { + return `skip: unsupported LD-signature type ${activity.signature.type}`; + } + + // activity.signature.creator: https://example.oom/users/user#main-key + // みたいになっててUserを引っ張れば公開キーも入ることを期待する + if (activity.signature.creator) { + const candicate = activity.signature.creator.replace(/#.*/, ''); + await this.apPersonService.resolvePerson(candicate).catch(() => null); + } + + // keyIdからLD-Signatureのユーザーを取得 + authUser = await this.apDbResolverService.getAuthUserFromKeyId(activity.signature.creator); + if (authUser == null) { + return 'skip: LD-Signatureのユーザーが取得できませんでした'; + } + + if (authUser.key == null) { + return 'skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした'; + } + + // LD-Signature検証 + const ldSignature = this.ldSignatureService.use(); + const verified = await ldSignature.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false); + if (!verified) { + return 'skip: LD-Signatureの検証に失敗しました'; + } + + // もう一度actorチェック + if (authUser.user.uri !== activity.actor) { + return `skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`; + } + + // ブロックしてたら中断 + const ldHost = this.utilityService.extractDbHost(authUser.user.uri); + if (meta.blockedHosts.includes(ldHost)) { + return `Blocked request: ${ldHost}`; + } + } else { + return `skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`; + } + } + + // activity.idがあればホストが署名者のホストであることを確認する + if (typeof activity.id === 'string') { + const signerHost = this.utilityService.extractDbHost(authUser.user.uri!); + const activityIdHost = this.utilityService.extractDbHost(activity.id); + if (signerHost !== activityIdHost) { + return `skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`; + } + } + + // Update stats + this.federatedInstanceService.registerOrFetchInstanceDoc(authUser.user.host).then(i => { + this.instancesRepository.update(i.id, { + latestRequestReceivedAt: new Date(), + lastCommunicatedAt: new Date(), + isNotResponding: false, + }); + + this.fetchInstanceMetadataService.fetchInstanceMetadata(i); + + this.instanceChart.requestReceived(i.host); + this.apRequestChart.inbox(); + this.federationChart.inbox(i.host); + }); + + // アクティビティを処理 + await this.apInboxService.performActivity(authUser.user, activity); + return 'ok'; + } +} diff --git a/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts new file mode 100644 index 0000000000..f976232a24 --- /dev/null +++ b/packages/backend/src/queue/processors/ResyncChartsProcessorService.ts @@ -0,0 +1,61 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { In, MoreThan } from 'typeorm'; +import { DI } from '@/di-symbols.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import FederationChart from '@/core/chart/charts/federation.js'; +import NotesChart from '@/core/chart/charts/notes.js'; +import UsersChart from '@/core/chart/charts/users.js'; +import ActiveUsersChart from '@/core/chart/charts/active-users.js'; +import InstanceChart from '@/core/chart/charts/instance.js'; +import PerUserNotesChart from '@/core/chart/charts/per-user-notes.js'; +import DriveChart from '@/core/chart/charts/drive.js'; +import PerUserReactionsChart from '@/core/chart/charts/per-user-reactions.js'; +import HashtagChart from '@/core/chart/charts/hashtag.js'; +import PerUserFollowingChart from '@/core/chart/charts/per-user-following.js'; +import PerUserDriveChart from '@/core/chart/charts/per-user-drive.js'; +import ApRequestChart from '@/core/chart/charts/ap-request.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; + +@Injectable() +export class ResyncChartsProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + private federationChart: FederationChart, + private notesChart: NotesChart, + private usersChart: UsersChart, + private activeUsersChart: ActiveUsersChart, + private instanceChart: InstanceChart, + private perUserNotesChart: PerUserNotesChart, + private driveChart: DriveChart, + private perUserReactionsChart: PerUserReactionsChart, + private hashtagChart: HashtagChart, + private perUserFollowingChart: PerUserFollowingChart, + private perUserDriveChart: PerUserDriveChart, + private apRequestChart: ApRequestChart, + + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('resync-charts'); + } + + public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> { + this.#logger.info('Resync charts...'); + + // TODO: ユーザーごとのチャートも更新する + // TODO: インスタンスごとのチャートも更新する + await Promise.all([ + this.driveChart.resync(), + this.notesChart.resync(), + this.usersChart.resync(), + ]); + + this.#logger.succ('All charts successfully resynced.'); + done(); + } +} diff --git a/packages/backend/src/queue/processors/TickChartsProcessorService.ts b/packages/backend/src/queue/processors/TickChartsProcessorService.ts new file mode 100644 index 0000000000..d1ca3c4576 --- /dev/null +++ b/packages/backend/src/queue/processors/TickChartsProcessorService.ts @@ -0,0 +1,68 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { In, MoreThan } from 'typeorm'; +import { DI } from '@/di-symbols.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import FederationChart from '@/core/chart/charts/federation.js'; +import NotesChart from '@/core/chart/charts/notes.js'; +import UsersChart from '@/core/chart/charts/users.js'; +import ActiveUsersChart from '@/core/chart/charts/active-users.js'; +import InstanceChart from '@/core/chart/charts/instance.js'; +import PerUserNotesChart from '@/core/chart/charts/per-user-notes.js'; +import DriveChart from '@/core/chart/charts/drive.js'; +import PerUserReactionsChart from '@/core/chart/charts/per-user-reactions.js'; +import HashtagChart from '@/core/chart/charts/hashtag.js'; +import PerUserFollowingChart from '@/core/chart/charts/per-user-following.js'; +import PerUserDriveChart from '@/core/chart/charts/per-user-drive.js'; +import ApRequestChart from '@/core/chart/charts/ap-request.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; + +@Injectable() +export class TickChartsProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + private federationChart: FederationChart, + private notesChart: NotesChart, + private usersChart: UsersChart, + private activeUsersChart: ActiveUsersChart, + private instanceChart: InstanceChart, + private perUserNotesChart: PerUserNotesChart, + private driveChart: DriveChart, + private perUserReactionsChart: PerUserReactionsChart, + private hashtagChart: HashtagChart, + private perUserFollowingChart: PerUserFollowingChart, + private perUserDriveChart: PerUserDriveChart, + private apRequestChart: ApRequestChart, + + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('tick-charts'); + } + + public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> { + this.#logger.info('Tick charts...'); + + await Promise.all([ + this.federationChart.tick(false), + this.notesChart.tick(false), + this.usersChart.tick(false), + this.activeUsersChart.tick(false), + this.instanceChart.tick(false), + this.perUserNotesChart.tick(false), + this.driveChart.tick(false), + this.perUserReactionsChart.tick(false), + this.hashtagChart.tick(false), + this.perUserFollowingChart.tick(false), + this.perUserDriveChart.tick(false), + this.apRequestChart.tick(false), + ]); + + this.#logger.succ('All charts successfully ticked.'); + done(); + } +} diff --git a/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts b/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts new file mode 100644 index 0000000000..5723fe2eeb --- /dev/null +++ b/packages/backend/src/queue/processors/WebhookDeliverProcessorService.ts @@ -0,0 +1,79 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { IsNull, MoreThan } from 'typeorm'; +import { DI } from '@/di-symbols.js'; +import { WebhooksRepository } from '@/models/index.js'; +import { Config } from '@/config.js'; +import type Logger from '@/logger.js'; +import { HttpRequestService } from '@/core/HttpRequestService.js'; +import { StatusError } from '@/misc/status-error.js'; +import { QueueLoggerService } from '../QueueLoggerService.js'; +import type Bull from 'bull'; +import type { WebhookDeliverJobData } from '../types.js'; + +@Injectable() +export class WebhookDeliverProcessorService { + #logger: Logger; + + constructor( + @Inject(DI.config) + private config: Config, + + @Inject(DI.webhooksRepository) + private webhooksRepository: WebhooksRepository, + + private httpRequestService: HttpRequestService, + private queueLoggerService: QueueLoggerService, + ) { + this.#logger = this.queueLoggerService.logger.createSubLogger('webhook'); + } + + public async process(job: Bull.Job<WebhookDeliverJobData>): Promise<string> { + try { + this.#logger.debug(`delivering ${job.data.webhookId}`); + + const res = await this.httpRequestService.getResponse({ + url: job.data.to, + method: 'POST', + headers: { + 'User-Agent': 'Misskey-Hooks', + 'X-Misskey-Host': this.config.host, + 'X-Misskey-Hook-Id': job.data.webhookId, + 'X-Misskey-Hook-Secret': job.data.secret, + }, + body: JSON.stringify({ + hookId: job.data.webhookId, + userId: job.data.userId, + eventId: job.data.eventId, + createdAt: job.data.createdAt, + type: job.data.type, + body: job.data.content, + }), + }); + + this.webhooksRepository.update({ id: job.data.webhookId }, { + latestSentAt: new Date(), + latestStatus: res.status, + }); + + return 'Success'; + } catch (res) { + this.webhooksRepository.update({ id: job.data.webhookId }, { + latestSentAt: new Date(), + latestStatus: res instanceof StatusError ? res.statusCode : 1, + }); + + if (res instanceof StatusError) { + // 4xx + if (res.isClientError) { + return `${res.statusCode} ${res.statusMessage}`; + } + + // 5xx etc. + throw `${res.statusCode} ${res.statusMessage}`; + } else { + // DNS error, socket error, timeout ... + throw res; + } + } + } +} diff --git a/packages/backend/src/queue/processors/db/delete-account.ts b/packages/backend/src/queue/processors/db/delete-account.ts deleted file mode 100644 index c1657b4be6..0000000000 --- a/packages/backend/src/queue/processors/db/delete-account.ts +++ /dev/null @@ -1,94 +0,0 @@ -import Bull from 'bull'; -import { queueLogger } from '../../logger.js'; -import { DriveFiles, Notes, UserProfiles, Users } from '@/models/index.js'; -import { DbUserDeleteJobData } from '@/queue/types.js'; -import { Note } from '@/models/entities/note.js'; -import { DriveFile } from '@/models/entities/drive-file.js'; -import { MoreThan } from 'typeorm'; -import { deleteFileSync } from '@/services/drive/delete-file.js'; -import { sendEmail } from '@/services/send-email.js'; - -const logger = queueLogger.createSubLogger('delete-account'); - -export async function deleteAccount(job: Bull.Job<DbUserDeleteJobData>): Promise<string | void> { - logger.info(`Deleting account of ${job.data.user.id} ...`); - - const user = await Users.findOneBy({ id: job.data.user.id }); - if (user == null) { - return; - } - - { // Delete notes - let cursor: Note['id'] | null = null; - - while (true) { - const notes = await Notes.find({ - where: { - userId: user.id, - ...(cursor ? { id: MoreThan(cursor) } : {}), - }, - take: 100, - order: { - id: 1, - }, - }) as Note[]; - - if (notes.length === 0) { - break; - } - - cursor = notes[notes.length - 1].id; - - await Notes.delete(notes.map(note => note.id)); - } - - logger.succ(`All of notes deleted`); - } - - { // Delete files - let cursor: DriveFile['id'] | null = null; - - while (true) { - const files = await DriveFiles.find({ - where: { - userId: user.id, - ...(cursor ? { id: MoreThan(cursor) } : {}), - }, - take: 10, - order: { - id: 1, - }, - }) as DriveFile[]; - - if (files.length === 0) { - break; - } - - cursor = files[files.length - 1].id; - - for (const file of files) { - await deleteFileSync(file); - } - } - - logger.succ(`All of files deleted`); - } - - { // Send email notification - const profile = await UserProfiles.findOneByOrFail({ userId: user.id }); - if (profile.email && profile.emailVerified) { - sendEmail(profile.email, 'Account deleted', - `Your account has been deleted.`, - `Your account has been deleted.`); - } - } - - // soft指定されている場合は物理削除しない - if (job.data.soft) { - // nop - } else { - await Users.delete(job.data.user.id); - } - - return 'Account deleted'; -} diff --git a/packages/backend/src/queue/processors/db/delete-drive-files.ts b/packages/backend/src/queue/processors/db/delete-drive-files.ts deleted file mode 100644 index b3832d9f04..0000000000 --- a/packages/backend/src/queue/processors/db/delete-drive-files.ts +++ /dev/null @@ -1,56 +0,0 @@ -import Bull from 'bull'; - -import { queueLogger } from '../../logger.js'; -import { deleteFileSync } from '@/services/drive/delete-file.js'; -import { Users, DriveFiles } from '@/models/index.js'; -import { MoreThan } from 'typeorm'; -import { DbUserJobData } from '@/queue/types.js'; - -const logger = queueLogger.createSubLogger('delete-drive-files'); - -export async function deleteDriveFiles(job: Bull.Job<DbUserJobData>, done: any): Promise<void> { - logger.info(`Deleting drive files of ${job.data.user.id} ...`); - - const user = await Users.findOneBy({ id: job.data.user.id }); - if (user == null) { - done(); - return; - } - - let deletedCount = 0; - let cursor: any = null; - - while (true) { - const files = await DriveFiles.find({ - where: { - userId: user.id, - ...(cursor ? { id: MoreThan(cursor) } : {}), - }, - take: 100, - order: { - id: 1, - }, - }); - - if (files.length === 0) { - job.progress(100); - break; - } - - cursor = files[files.length - 1].id; - - for (const file of files) { - await deleteFileSync(file); - deletedCount++; - } - - const total = await DriveFiles.countBy({ - userId: user.id, - }); - - job.progress(deletedCount / total); - } - - logger.succ(`All drive files (${deletedCount}) of ${user.id} has been deleted.`); - done(); -} diff --git a/packages/backend/src/queue/processors/db/export-blocking.ts b/packages/backend/src/queue/processors/db/export-blocking.ts deleted file mode 100644 index f5e0424a79..0000000000 --- a/packages/backend/src/queue/processors/db/export-blocking.ts +++ /dev/null @@ -1,93 +0,0 @@ -import Bull from 'bull'; -import * as fs from 'node:fs'; - -import { queueLogger } from '../../logger.js'; -import { addFile } from '@/services/drive/add-file.js'; -import { format as dateFormat } from 'date-fns'; -import { getFullApAccount } from '@/misc/convert-host.js'; -import { createTemp } from '@/misc/create-temp.js'; -import { Users, Blockings } from '@/models/index.js'; -import { MoreThan } from 'typeorm'; -import { DbUserJobData } from '@/queue/types.js'; - -const logger = queueLogger.createSubLogger('export-blocking'); - -export async function exportBlocking(job: Bull.Job<DbUserJobData>, done: any): Promise<void> { - logger.info(`Exporting blocking of ${job.data.user.id} ...`); - - const user = await Users.findOneBy({ id: job.data.user.id }); - if (user == null) { - done(); - return; - } - - // Create temp file - const [path, cleanup] = await createTemp(); - - logger.info(`Temp file is ${path}`); - - try { - const stream = fs.createWriteStream(path, { flags: 'a' }); - - let exportedCount = 0; - let cursor: any = null; - - while (true) { - const blockings = await Blockings.find({ - where: { - blockerId: user.id, - ...(cursor ? { id: MoreThan(cursor) } : {}), - }, - take: 100, - order: { - id: 1, - }, - }); - - if (blockings.length === 0) { - job.progress(100); - break; - } - - cursor = blockings[blockings.length - 1].id; - - for (const block of blockings) { - const u = await Users.findOneBy({ id: block.blockeeId }); - if (u == null) { - exportedCount++; continue; - } - - const content = getFullApAccount(u.username, u.host); - await new Promise<void>((res, rej) => { - stream.write(content + '\n', err => { - if (err) { - logger.error(err); - rej(err); - } else { - res(); - } - }); - }); - exportedCount++; - } - - const total = await Blockings.countBy({ - blockerId: user.id, - }); - - job.progress(exportedCount / total); - } - - stream.end(); - logger.succ(`Exported to: ${path}`); - - const fileName = 'blocking-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv'; - const driveFile = await addFile({ user, path, name: fileName, force: true }); - - logger.succ(`Exported to: ${driveFile.id}`); - } finally { - cleanup(); - } - - done(); -} diff --git a/packages/backend/src/queue/processors/db/export-custom-emojis.ts b/packages/backend/src/queue/processors/db/export-custom-emojis.ts deleted file mode 100644 index 3da887cda2..0000000000 --- a/packages/backend/src/queue/processors/db/export-custom-emojis.ts +++ /dev/null @@ -1,114 +0,0 @@ -import Bull from 'bull'; -import * as fs from 'node:fs'; - -import { ulid } from 'ulid'; -import mime from 'mime-types'; -import archiver from 'archiver'; -import { queueLogger } from '../../logger.js'; -import { addFile } from '@/services/drive/add-file.js'; -import { format as dateFormat } from 'date-fns'; -import { Users, Emojis } from '@/models/index.js'; -import { } from '@/queue/types.js'; -import { createTemp, createTempDir } from '@/misc/create-temp.js'; -import { downloadUrl } from '@/misc/download-url.js'; -import config from '@/config/index.js'; -import { IsNull } from 'typeorm'; - -const logger = queueLogger.createSubLogger('export-custom-emojis'); - -export async function exportCustomEmojis(job: Bull.Job, done: () => void): Promise<void> { - logger.info(`Exporting custom emojis ...`); - - const user = await Users.findOneBy({ id: job.data.user.id }); - if (user == null) { - done(); - return; - } - - const [path, cleanup] = await createTempDir(); - - logger.info(`Temp dir is ${path}`); - - const metaPath = path + '/meta.json'; - - fs.writeFileSync(metaPath, '', 'utf-8'); - - const metaStream = fs.createWriteStream(metaPath, { flags: 'a' }); - - const writeMeta = (text: string): Promise<void> => { - return new Promise<void>((res, rej) => { - metaStream.write(text, err => { - if (err) { - logger.error(err); - rej(err); - } else { - res(); - } - }); - }); - }; - - await writeMeta(`{"metaVersion":2,"host":"${config.host}","exportedAt":"${new Date().toString()}","emojis":[`); - - const customEmojis = await Emojis.find({ - where: { - host: IsNull(), - }, - order: { - id: 'ASC', - }, - }); - - for (const emoji of customEmojis) { - const ext = mime.extension(emoji.type); - const fileName = emoji.name + (ext ? '.' + ext : ''); - const emojiPath = path + '/' + fileName; - fs.writeFileSync(emojiPath, '', 'binary'); - let downloaded = false; - - try { - await downloadUrl(emoji.originalUrl, emojiPath); - downloaded = true; - } catch (e) { // TODO: 何度か再試行 - logger.error(e instanceof Error ? e : new Error(e as string)); - } - - if (!downloaded) { - fs.unlinkSync(emojiPath); - } - - const content = JSON.stringify({ - fileName: fileName, - downloaded: downloaded, - emoji: emoji, - }); - const isFirst = customEmojis.indexOf(emoji) === 0; - - await writeMeta(isFirst ? content : ',\n' + content); - } - - await writeMeta(']}'); - - metaStream.end(); - - // Create archive - const [archivePath, archiveCleanup] = await createTemp(); - const archiveStream = fs.createWriteStream(archivePath); - const archive = archiver('zip', { - zlib: { level: 0 }, - }); - archiveStream.on('close', async () => { - logger.succ(`Exported to: ${archivePath}`); - - const fileName = 'custom-emojis-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.zip'; - const driveFile = await addFile({ user, path: archivePath, name: fileName, force: true }); - - logger.succ(`Exported to: ${driveFile.id}`); - cleanup(); - archiveCleanup(); - done(); - }); - archive.pipe(archiveStream); - archive.directory(path, false); - archive.finalize(); -} diff --git a/packages/backend/src/queue/processors/db/export-following.ts b/packages/backend/src/queue/processors/db/export-following.ts deleted file mode 100644 index 4ac165567b..0000000000 --- a/packages/backend/src/queue/processors/db/export-following.ts +++ /dev/null @@ -1,94 +0,0 @@ -import Bull from 'bull'; -import * as fs from 'node:fs'; - -import { queueLogger } from '../../logger.js'; -import { addFile } from '@/services/drive/add-file.js'; -import { format as dateFormat } from 'date-fns'; -import { getFullApAccount } from '@/misc/convert-host.js'; -import { createTemp } from '@/misc/create-temp.js'; -import { Users, Followings, Mutings } from '@/models/index.js'; -import { In, MoreThan, Not } from 'typeorm'; -import { DbUserJobData } from '@/queue/types.js'; -import { Following } from '@/models/entities/following.js'; - -const logger = queueLogger.createSubLogger('export-following'); - -export async function exportFollowing(job: Bull.Job<DbUserJobData>, done: () => void): Promise<void> { - logger.info(`Exporting following of ${job.data.user.id} ...`); - - const user = await Users.findOneBy({ id: job.data.user.id }); - if (user == null) { - done(); - return; - } - - // Create temp file - const [path, cleanup] = await createTemp(); - - logger.info(`Temp file is ${path}`); - - try { - const stream = fs.createWriteStream(path, { flags: 'a' }); - - let cursor: Following['id'] | null = null; - - const mutings = job.data.excludeMuting ? await Mutings.findBy({ - muterId: user.id, - }) : []; - - while (true) { - const followings = await Followings.find({ - where: { - followerId: user.id, - ...(mutings.length > 0 ? { followeeId: Not(In(mutings.map(x => x.muteeId))) } : {}), - ...(cursor ? { id: MoreThan(cursor) } : {}), - }, - take: 100, - order: { - id: 1, - }, - }) as Following[]; - - if (followings.length === 0) { - break; - } - - cursor = followings[followings.length - 1].id; - - for (const following of followings) { - const u = await Users.findOneBy({ id: following.followeeId }); - if (u == null) { - continue; - } - - if (job.data.excludeInactive && u.updatedAt && (Date.now() - u.updatedAt.getTime() > 1000 * 60 * 60 * 24 * 90)) { - continue; - } - - const content = getFullApAccount(u.username, u.host); - await new Promise<void>((res, rej) => { - stream.write(content + '\n', err => { - if (err) { - logger.error(err); - rej(err); - } else { - res(); - } - }); - }); - } - } - - stream.end(); - logger.succ(`Exported to: ${path}`); - - const fileName = 'following-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv'; - const driveFile = await addFile({ user, path, name: fileName, force: true }); - - logger.succ(`Exported to: ${driveFile.id}`); - } finally { - cleanup(); - } - - done(); -} diff --git a/packages/backend/src/queue/processors/db/export-mute.ts b/packages/backend/src/queue/processors/db/export-mute.ts deleted file mode 100644 index 6a36cfa072..0000000000 --- a/packages/backend/src/queue/processors/db/export-mute.ts +++ /dev/null @@ -1,94 +0,0 @@ -import Bull from 'bull'; -import * as fs from 'node:fs'; - -import { queueLogger } from '../../logger.js'; -import { addFile } from '@/services/drive/add-file.js'; -import { format as dateFormat } from 'date-fns'; -import { getFullApAccount } from '@/misc/convert-host.js'; -import { createTemp } from '@/misc/create-temp.js'; -import { Users, Mutings } from '@/models/index.js'; -import { IsNull, MoreThan } from 'typeorm'; -import { DbUserJobData } from '@/queue/types.js'; - -const logger = queueLogger.createSubLogger('export-mute'); - -export async function exportMute(job: Bull.Job<DbUserJobData>, done: any): Promise<void> { - logger.info(`Exporting mute of ${job.data.user.id} ...`); - - const user = await Users.findOneBy({ id: job.data.user.id }); - if (user == null) { - done(); - return; - } - - // Create temp file - const [path, cleanup] = await createTemp(); - - logger.info(`Temp file is ${path}`); - - try { - const stream = fs.createWriteStream(path, { flags: 'a' }); - - let exportedCount = 0; - let cursor: any = null; - - while (true) { - const mutes = await Mutings.find({ - where: { - muterId: user.id, - expiresAt: IsNull(), - ...(cursor ? { id: MoreThan(cursor) } : {}), - }, - take: 100, - order: { - id: 1, - }, - }); - - if (mutes.length === 0) { - job.progress(100); - break; - } - - cursor = mutes[mutes.length - 1].id; - - for (const mute of mutes) { - const u = await Users.findOneBy({ id: mute.muteeId }); - if (u == null) { - exportedCount++; continue; - } - - const content = getFullApAccount(u.username, u.host); - await new Promise<void>((res, rej) => { - stream.write(content + '\n', err => { - if (err) { - logger.error(err); - rej(err); - } else { - res(); - } - }); - }); - exportedCount++; - } - - const total = await Mutings.countBy({ - muterId: user.id, - }); - - job.progress(exportedCount / total); - } - - stream.end(); - logger.succ(`Exported to: ${path}`); - - const fileName = 'mute-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv'; - const driveFile = await addFile({ user, path, name: fileName, force: true }); - - logger.succ(`Exported to: ${driveFile.id}`); - } finally { - cleanup(); - } - - done(); -} diff --git a/packages/backend/src/queue/processors/db/export-notes.ts b/packages/backend/src/queue/processors/db/export-notes.ts deleted file mode 100644 index 051fcdf385..0000000000 --- a/packages/backend/src/queue/processors/db/export-notes.ts +++ /dev/null @@ -1,118 +0,0 @@ -import Bull from 'bull'; -import * as fs from 'node:fs'; - -import { queueLogger } from '../../logger.js'; -import { addFile } from '@/services/drive/add-file.js'; -import { format as dateFormat } from 'date-fns'; -import { Users, Notes, Polls } from '@/models/index.js'; -import { MoreThan } from 'typeorm'; -import { Note } from '@/models/entities/note.js'; -import { Poll } from '@/models/entities/poll.js'; -import { DbUserJobData } from '@/queue/types.js'; -import { createTemp } from '@/misc/create-temp.js'; - -const logger = queueLogger.createSubLogger('export-notes'); - -export async function exportNotes(job: Bull.Job<DbUserJobData>, done: any): Promise<void> { - logger.info(`Exporting notes of ${job.data.user.id} ...`); - - const user = await Users.findOneBy({ id: job.data.user.id }); - if (user == null) { - done(); - return; - } - - // Create temp file - const [path, cleanup] = await createTemp(); - - logger.info(`Temp file is ${path}`); - - try { - const stream = fs.createWriteStream(path, { flags: 'a' }); - - const write = (text: string): Promise<void> => { - return new Promise<void>((res, rej) => { - stream.write(text, err => { - if (err) { - logger.error(err); - rej(err); - } else { - res(); - } - }); - }); - }; - - await write('['); - - let exportedNotesCount = 0; - let cursor: Note['id'] | null = null; - - while (true) { - const notes = await Notes.find({ - where: { - userId: user.id, - ...(cursor ? { id: MoreThan(cursor) } : {}), - }, - take: 100, - order: { - id: 1, - }, - }) as Note[]; - - if (notes.length === 0) { - job.progress(100); - break; - } - - cursor = notes[notes.length - 1].id; - - for (const note of notes) { - let poll: Poll | undefined; - if (note.hasPoll) { - poll = await Polls.findOneByOrFail({ noteId: note.id }); - } - const content = JSON.stringify(serialize(note, poll)); - const isFirst = exportedNotesCount === 0; - await write(isFirst ? content : ',\n' + content); - exportedNotesCount++; - } - - const total = await Notes.countBy({ - userId: user.id, - }); - - job.progress(exportedNotesCount / total); - } - - await write(']'); - - stream.end(); - logger.succ(`Exported to: ${path}`); - - const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json'; - const driveFile = await addFile({ user, path, name: fileName, force: true }); - - logger.succ(`Exported to: ${driveFile.id}`); - } finally { - cleanup(); - } - - done(); -} - -function serialize(note: Note, poll: Poll | null = null): Record<string, unknown> { - return { - id: note.id, - text: note.text, - createdAt: note.createdAt, - fileIds: note.fileIds, - replyId: note.replyId, - renoteId: note.renoteId, - poll: poll, - cw: note.cw, - visibility: note.visibility, - visibleUserIds: note.visibleUserIds, - localOnly: note.localOnly, - }; -} diff --git a/packages/backend/src/queue/processors/db/export-user-lists.ts b/packages/backend/src/queue/processors/db/export-user-lists.ts deleted file mode 100644 index 71dd72df27..0000000000 --- a/packages/backend/src/queue/processors/db/export-user-lists.ts +++ /dev/null @@ -1,70 +0,0 @@ -import Bull from 'bull'; -import * as fs from 'node:fs'; - -import { queueLogger } from '../../logger.js'; -import { addFile } from '@/services/drive/add-file.js'; -import { format as dateFormat } from 'date-fns'; -import { getFullApAccount } from '@/misc/convert-host.js'; -import { createTemp } from '@/misc/create-temp.js'; -import { Users, UserLists, UserListJoinings } from '@/models/index.js'; -import { In } from 'typeorm'; -import { DbUserJobData } from '@/queue/types.js'; - -const logger = queueLogger.createSubLogger('export-user-lists'); - -export async function exportUserLists(job: Bull.Job<DbUserJobData>, done: any): Promise<void> { - logger.info(`Exporting user lists of ${job.data.user.id} ...`); - - const user = await Users.findOneBy({ id: job.data.user.id }); - if (user == null) { - done(); - return; - } - - const lists = await UserLists.findBy({ - userId: user.id, - }); - - // Create temp file - const [path, cleanup] = await createTemp(); - - logger.info(`Temp file is ${path}`); - - try { - const stream = fs.createWriteStream(path, { flags: 'a' }); - - for (const list of lists) { - const joinings = await UserListJoinings.findBy({ userListId: list.id }); - const users = await Users.findBy({ - id: In(joinings.map(j => j.userId)), - }); - - for (const u of users) { - const acct = getFullApAccount(u.username, u.host); - const content = `${list.name},${acct}`; - await new Promise<void>((res, rej) => { - stream.write(content + '\n', err => { - if (err) { - logger.error(err); - rej(err); - } else { - res(); - } - }); - }); - } - } - - stream.end(); - logger.succ(`Exported to: ${path}`); - - const fileName = 'user-lists-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.csv'; - const driveFile = await addFile({ user, path, name: fileName, force: true }); - - logger.succ(`Exported to: ${driveFile.id}`); - } finally { - cleanup(); - } - - done(); -} diff --git a/packages/backend/src/queue/processors/db/import-blocking.ts b/packages/backend/src/queue/processors/db/import-blocking.ts deleted file mode 100644 index 8bddf34bc2..0000000000 --- a/packages/backend/src/queue/processors/db/import-blocking.ts +++ /dev/null @@ -1,75 +0,0 @@ -import Bull from 'bull'; - -import { queueLogger } from '../../logger.js'; -import * as Acct from '@/misc/acct.js'; -import { resolveUser } from '@/remote/resolve-user.js'; -import { downloadTextFile } from '@/misc/download-text-file.js'; -import { isSelfHost, toPuny } from '@/misc/convert-host.js'; -import { Users, DriveFiles, Blockings } from '@/models/index.js'; -import { DbUserImportJobData } from '@/queue/types.js'; -import block from '@/services/blocking/create.js'; -import { IsNull } from 'typeorm'; - -const logger = queueLogger.createSubLogger('import-blocking'); - -export async function importBlocking(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> { - logger.info(`Importing blocking of ${job.data.user.id} ...`); - - const user = await Users.findOneBy({ id: job.data.user.id }); - if (user == null) { - done(); - return; - } - - const file = await DriveFiles.findOneBy({ - id: job.data.fileId, - }); - if (file == null) { - done(); - return; - } - - const csv = await downloadTextFile(file.url); - - let linenum = 0; - - for (const line of csv.trim().split('\n')) { - linenum++; - - try { - const acct = line.split(',')[0].trim(); - const { username, host } = Acct.parse(acct); - - let target = isSelfHost(host!) ? await Users.findOneBy({ - host: IsNull(), - usernameLower: username.toLowerCase(), - }) : await Users.findOneBy({ - host: toPuny(host!), - usernameLower: username.toLowerCase(), - }); - - if (host == null && target == null) continue; - - if (target == null) { - target = await resolveUser(username, host); - } - - if (target == null) { - throw `cannot resolve user: @${username}@${host}`; - } - - // skip myself - if (target.id === job.data.user.id) continue; - - logger.info(`Block[${linenum}] ${target.id} ...`); - - await block(user, target); - } catch (e) { - logger.warn(`Error in line:${linenum} ${e}`); - } - } - - logger.succ('Imported'); - done(); -} - diff --git a/packages/backend/src/queue/processors/db/import-custom-emojis.ts b/packages/backend/src/queue/processors/db/import-custom-emojis.ts deleted file mode 100644 index 64dfe85374..0000000000 --- a/packages/backend/src/queue/processors/db/import-custom-emojis.ts +++ /dev/null @@ -1,81 +0,0 @@ -import Bull from 'bull'; -import * as fs from 'node:fs'; -import unzipper from 'unzipper'; - -import { queueLogger } from '../../logger.js'; -import { createTempDir } from '@/misc/create-temp.js'; -import { downloadUrl } from '@/misc/download-url.js'; -import { DriveFiles, Emojis } from '@/models/index.js'; -import { DbUserImportJobData } from '@/queue/types.js'; -import { addFile } from '@/services/drive/add-file.js'; -import { genId } from '@/misc/gen-id.js'; -import { db } from '@/db/postgre.js'; - -const logger = queueLogger.createSubLogger('import-custom-emojis'); - -// TODO: 名前衝突時の動作を選べるようにする -export async function importCustomEmojis(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> { - logger.info(`Importing custom emojis ...`); - - const file = await DriveFiles.findOneBy({ - id: job.data.fileId, - }); - if (file == null) { - done(); - return; - } - - const [path, cleanup] = await createTempDir(); - - logger.info(`Temp dir is ${path}`); - - const destPath = path + '/emojis.zip'; - - try { - fs.writeFileSync(destPath, '', 'binary'); - await downloadUrl(file.url, destPath); - } catch (e) { // TODO: 何度か再試行 - if (e instanceof Error || typeof e === 'string') { - logger.error(e); - } - throw e; - } - - const outputPath = path + '/emojis'; - const unzipStream = fs.createReadStream(destPath); - const extractor = unzipper.Extract({ path: outputPath }); - extractor.on('close', async () => { - const metaRaw = fs.readFileSync(outputPath + '/meta.json', 'utf-8'); - const meta = JSON.parse(metaRaw); - - for (const record of meta.emojis) { - if (!record.downloaded) continue; - const emojiInfo = record.emoji; - const emojiPath = outputPath + '/' + record.fileName; - await Emojis.delete({ - name: emojiInfo.name, - }); - const driveFile = await addFile({ user: null, path: emojiPath, name: record.fileName, force: true }); - const emoji = await Emojis.insert({ - id: genId(), - updatedAt: new Date(), - name: emojiInfo.name, - category: emojiInfo.category, - host: null, - aliases: emojiInfo.aliases, - originalUrl: driveFile.url, - publicUrl: driveFile.webpublicUrl ?? driveFile.url, - type: driveFile.webpublicType ?? driveFile.type, - }).then(x => Emojis.findOneByOrFail(x.identifiers[0])); - } - - await db.queryResultCache!.remove(['meta_emojis']); - - cleanup(); - - logger.succ('Imported'); - done(); - }); - unzipStream.pipe(extractor); - logger.succ(`Unzipping to ${outputPath}`); -} diff --git a/packages/backend/src/queue/processors/db/import-following.ts b/packages/backend/src/queue/processors/db/import-following.ts deleted file mode 100644 index 8ce2c367d6..0000000000 --- a/packages/backend/src/queue/processors/db/import-following.ts +++ /dev/null @@ -1,74 +0,0 @@ -import Bull from 'bull'; - -import { queueLogger } from '../../logger.js'; -import follow from '@/services/following/create.js'; -import * as Acct from '@/misc/acct.js'; -import { resolveUser } from '@/remote/resolve-user.js'; -import { downloadTextFile } from '@/misc/download-text-file.js'; -import { isSelfHost, toPuny } from '@/misc/convert-host.js'; -import { Users, DriveFiles } from '@/models/index.js'; -import { DbUserImportJobData } from '@/queue/types.js'; -import { IsNull } from 'typeorm'; - -const logger = queueLogger.createSubLogger('import-following'); - -export async function importFollowing(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> { - logger.info(`Importing following of ${job.data.user.id} ...`); - - const user = await Users.findOneBy({ id: job.data.user.id }); - if (user == null) { - done(); - return; - } - - const file = await DriveFiles.findOneBy({ - id: job.data.fileId, - }); - if (file == null) { - done(); - return; - } - - const csv = await downloadTextFile(file.url); - - let linenum = 0; - - for (const line of csv.trim().split('\n')) { - linenum++; - - try { - const acct = line.split(',')[0].trim(); - const { username, host } = Acct.parse(acct); - - let target = isSelfHost(host!) ? await Users.findOneBy({ - host: IsNull(), - usernameLower: username.toLowerCase(), - }) : await Users.findOneBy({ - host: toPuny(host!), - usernameLower: username.toLowerCase(), - }); - - if (host == null && target == null) continue; - - if (target == null) { - target = await resolveUser(username, host); - } - - if (target == null) { - throw `cannot resolve user: @${username}@${host}`; - } - - // skip myself - if (target.id === job.data.user.id) continue; - - logger.info(`Follow[${linenum}] ${target.id} ...`); - - follow(user, target); - } catch (e) { - logger.warn(`Error in line:${linenum} ${e}`); - } - } - - logger.succ('Imported'); - done(); -} diff --git a/packages/backend/src/queue/processors/db/import-muting.ts b/packages/backend/src/queue/processors/db/import-muting.ts deleted file mode 100644 index 8552b797be..0000000000 --- a/packages/backend/src/queue/processors/db/import-muting.ts +++ /dev/null @@ -1,84 +0,0 @@ -import Bull from 'bull'; - -import { queueLogger } from '../../logger.js'; -import * as Acct from '@/misc/acct.js'; -import { resolveUser } from '@/remote/resolve-user.js'; -import { downloadTextFile } from '@/misc/download-text-file.js'; -import { isSelfHost, toPuny } from '@/misc/convert-host.js'; -import { Users, DriveFiles, Mutings } from '@/models/index.js'; -import { DbUserImportJobData } from '@/queue/types.js'; -import { User } from '@/models/entities/user.js'; -import { genId } from '@/misc/gen-id.js'; -import { IsNull } from 'typeorm'; - -const logger = queueLogger.createSubLogger('import-muting'); - -export async function importMuting(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> { - logger.info(`Importing muting of ${job.data.user.id} ...`); - - const user = await Users.findOneBy({ id: job.data.user.id }); - if (user == null) { - done(); - return; - } - - const file = await DriveFiles.findOneBy({ - id: job.data.fileId, - }); - if (file == null) { - done(); - return; - } - - const csv = await downloadTextFile(file.url); - - let linenum = 0; - - for (const line of csv.trim().split('\n')) { - linenum++; - - try { - const acct = line.split(',')[0].trim(); - const { username, host } = Acct.parse(acct); - - let target = isSelfHost(host!) ? await Users.findOneBy({ - host: IsNull(), - usernameLower: username.toLowerCase(), - }) : await Users.findOneBy({ - host: toPuny(host!), - usernameLower: username.toLowerCase(), - }); - - if (host == null && target == null) continue; - - if (target == null) { - target = await resolveUser(username, host); - } - - if (target == null) { - throw `cannot resolve user: @${username}@${host}`; - } - - // skip myself - if (target.id === job.data.user.id) continue; - - logger.info(`Mute[${linenum}] ${target.id} ...`); - - await mute(user, target); - } catch (e) { - logger.warn(`Error in line:${linenum} ${e}`); - } - } - - logger.succ('Imported'); - done(); -} - -async function mute(user: User, target: User) { - await Mutings.insert({ - id: genId(), - createdAt: new Date(), - muterId: user.id, - muteeId: target.id, - }); -} diff --git a/packages/backend/src/queue/processors/db/import-user-lists.ts b/packages/backend/src/queue/processors/db/import-user-lists.ts deleted file mode 100644 index 9919b7c53c..0000000000 --- a/packages/backend/src/queue/processors/db/import-user-lists.ts +++ /dev/null @@ -1,80 +0,0 @@ -import Bull from 'bull'; - -import { queueLogger } from '../../logger.js'; -import * as Acct from '@/misc/acct.js'; -import { resolveUser } from '@/remote/resolve-user.js'; -import { pushUserToUserList } from '@/services/user-list/push.js'; -import { downloadTextFile } from '@/misc/download-text-file.js'; -import { isSelfHost, toPuny } from '@/misc/convert-host.js'; -import { DriveFiles, Users, UserLists, UserListJoinings } from '@/models/index.js'; -import { genId } from '@/misc/gen-id.js'; -import { DbUserImportJobData } from '@/queue/types.js'; -import { IsNull } from 'typeorm'; - -const logger = queueLogger.createSubLogger('import-user-lists'); - -export async function importUserLists(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> { - logger.info(`Importing user lists of ${job.data.user.id} ...`); - - const user = await Users.findOneBy({ id: job.data.user.id }); - if (user == null) { - done(); - return; - } - - const file = await DriveFiles.findOneBy({ - id: job.data.fileId, - }); - if (file == null) { - done(); - return; - } - - const csv = await downloadTextFile(file.url); - - let linenum = 0; - - for (const line of csv.trim().split('\n')) { - linenum++; - - try { - const listName = line.split(',')[0].trim(); - const { username, host } = Acct.parse(line.split(',')[1].trim()); - - let list = await UserLists.findOneBy({ - userId: user.id, - name: listName, - }); - - if (list == null) { - list = await UserLists.insert({ - id: genId(), - createdAt: new Date(), - userId: user.id, - name: listName, - }).then(x => UserLists.findOneByOrFail(x.identifiers[0])); - } - - let target = isSelfHost(host!) ? await Users.findOneBy({ - host: IsNull(), - usernameLower: username.toLowerCase(), - }) : await Users.findOneBy({ - host: toPuny(host!), - usernameLower: username.toLowerCase(), - }); - - if (target == null) { - target = await resolveUser(username, host); - } - - if (await UserListJoinings.findOneBy({ userListId: list!.id, userId: target.id }) != null) continue; - - pushUserToUserList(target, list!); - } catch (e) { - logger.warn(`Error in line:${linenum} ${e}`); - } - } - - logger.succ('Imported'); - done(); -} diff --git a/packages/backend/src/queue/processors/db/index.ts b/packages/backend/src/queue/processors/db/index.ts deleted file mode 100644 index e91d569779..0000000000 --- a/packages/backend/src/queue/processors/db/index.ts +++ /dev/null @@ -1,37 +0,0 @@ -import Bull from 'bull'; -import { DbJobData } from '@/queue/types.js'; -import { deleteDriveFiles } from './delete-drive-files.js'; -import { exportCustomEmojis } from './export-custom-emojis.js'; -import { exportNotes } from './export-notes.js'; -import { exportFollowing } from './export-following.js'; -import { exportMute } from './export-mute.js'; -import { exportBlocking } from './export-blocking.js'; -import { exportUserLists } from './export-user-lists.js'; -import { importFollowing } from './import-following.js'; -import { importUserLists } from './import-user-lists.js'; -import { deleteAccount } from './delete-account.js'; -import { importMuting } from './import-muting.js'; -import { importBlocking } from './import-blocking.js'; -import { importCustomEmojis } from './import-custom-emojis.js'; - -const jobs = { - deleteDriveFiles, - exportCustomEmojis, - exportNotes, - exportFollowing, - exportMute, - exportBlocking, - exportUserLists, - importFollowing, - importMuting, - importBlocking, - importUserLists, - importCustomEmojis, - deleteAccount, -} as Record<string, Bull.ProcessCallbackFunction<DbJobData> | Bull.ProcessPromiseFunction<DbJobData>>; - -export default function(dbQueue: Bull.Queue<DbJobData>) { - for (const [k, v] of Object.entries(jobs)) { - dbQueue.process(k, v); - } -} diff --git a/packages/backend/src/queue/processors/deliver.ts b/packages/backend/src/queue/processors/deliver.ts deleted file mode 100644 index 291c05766e..0000000000 --- a/packages/backend/src/queue/processors/deliver.ts +++ /dev/null @@ -1,98 +0,0 @@ -import { URL } from 'node:url'; -import Bull from 'bull'; -import request from '@/remote/activitypub/request.js'; -import { registerOrFetchInstanceDoc } from '@/services/register-or-fetch-instance-doc.js'; -import Logger from '@/services/logger.js'; -import { Instances } from '@/models/index.js'; -import { apRequestChart, federationChart, instanceChart } from '@/services/chart/index.js'; -import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata.js'; -import { fetchMeta } from '@/misc/fetch-meta.js'; -import { toPuny } from '@/misc/convert-host.js'; -import { Cache } from '@/misc/cache.js'; -import { Instance } from '@/models/entities/instance.js'; -import { DeliverJobData } from '../types.js'; -import { StatusError } from '@/misc/fetch.js'; - -const logger = new Logger('deliver'); - -let latest: string | null = null; - -const suspendedHostsCache = new Cache<Instance[]>(1000 * 60 * 60); - -export default async (job: Bull.Job<DeliverJobData>) => { - const { host } = new URL(job.data.to); - - // ブロックしてたら中断 - const meta = await fetchMeta(); - if (meta.blockedHosts.includes(toPuny(host))) { - return 'skip (blocked)'; - } - - // isSuspendedなら中断 - let suspendedHosts = suspendedHostsCache.get(null); - if (suspendedHosts == null) { - suspendedHosts = await Instances.find({ - where: { - isSuspended: true, - }, - }); - suspendedHostsCache.set(null, suspendedHosts); - } - if (suspendedHosts.map(x => x.host).includes(toPuny(host))) { - return 'skip (suspended)'; - } - - try { - if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) { - logger.debug(`delivering ${latest}`); - } - - await request(job.data.user, job.data.to, job.data.content); - - // Update stats - registerOrFetchInstanceDoc(host).then(i => { - Instances.update(i.id, { - latestRequestSentAt: new Date(), - latestStatus: 200, - lastCommunicatedAt: new Date(), - isNotResponding: false, - }); - - fetchInstanceMetadata(i); - - instanceChart.requestSent(i.host, true); - apRequestChart.deliverSucc(); - federationChart.deliverd(i.host, true); - }); - - return 'Success'; - } catch (res) { - // Update stats - registerOrFetchInstanceDoc(host).then(i => { - Instances.update(i.id, { - latestRequestSentAt: new Date(), - latestStatus: res instanceof StatusError ? res.statusCode : null, - isNotResponding: true, - }); - - instanceChart.requestSent(i.host, false); - apRequestChart.deliverFail(); - federationChart.deliverd(i.host, false); - }); - - if (res instanceof StatusError) { - // 4xx - if (res.isClientError) { - // HTTPステータスコード4xxはクライアントエラーであり、それはつまり - // 何回再送しても成功することはないということなのでエラーにはしないでおく - return `${res.statusCode} ${res.statusMessage}`; - } - - // 5xx etc. - throw `${res.statusCode} ${res.statusMessage}`; - } else { - // DNS error, socket error, timeout ... - throw res; - } - } -}; diff --git a/packages/backend/src/queue/processors/ended-poll-notification.ts b/packages/backend/src/queue/processors/ended-poll-notification.ts deleted file mode 100644 index 6151c96ad6..0000000000 --- a/packages/backend/src/queue/processors/ended-poll-notification.ts +++ /dev/null @@ -1,33 +0,0 @@ -import Bull from 'bull'; -import { In } from 'typeorm'; -import { Notes, Polls, PollVotes } from '@/models/index.js'; -import { queueLogger } from '../logger.js'; -import { EndedPollNotificationJobData } from '@/queue/types.js'; -import { createNotification } from '@/services/create-notification.js'; - -const logger = queueLogger.createSubLogger('ended-poll-notification'); - -export async function endedPollNotification(job: Bull.Job<EndedPollNotificationJobData>, done: any): Promise<void> { - const note = await Notes.findOneBy({ id: job.data.noteId }); - if (note == null || !note.hasPoll) { - done(); - return; - } - - const votes = await PollVotes.createQueryBuilder('vote') - .select('vote.userId') - .where('vote.noteId = :noteId', { noteId: note.id }) - .innerJoinAndSelect('vote.user', 'user') - .andWhere('user.host IS NULL') - .getMany(); - - const userIds = [...new Set([note.userId, ...votes.map(v => v.userId)])]; - - for (const userId of userIds) { - createNotification(userId, 'pollEnded', { - noteId: note.id, - }); - } - - done(); -} diff --git a/packages/backend/src/queue/processors/inbox.ts b/packages/backend/src/queue/processors/inbox.ts deleted file mode 100644 index 198dde6050..0000000000 --- a/packages/backend/src/queue/processors/inbox.ts +++ /dev/null @@ -1,157 +0,0 @@ -import { URL } from 'node:url'; -import Bull from 'bull'; -import httpSignature from '@peertube/http-signature'; -import perform from '@/remote/activitypub/perform.js'; -import Logger from '@/services/logger.js'; -import { registerOrFetchInstanceDoc } from '@/services/register-or-fetch-instance-doc.js'; -import { Instances } from '@/models/index.js'; -import { apRequestChart, federationChart, instanceChart } from '@/services/chart/index.js'; -import { fetchMeta } from '@/misc/fetch-meta.js'; -import { toPuny, extractDbHost } from '@/misc/convert-host.js'; -import { getApId } from '@/remote/activitypub/type.js'; -import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata.js'; -import { InboxJobData } from '../types.js'; -import DbResolver from '@/remote/activitypub/db-resolver.js'; -import { resolvePerson } from '@/remote/activitypub/models/person.js'; -import { LdSignature } from '@/remote/activitypub/misc/ld-signature.js'; -import { StatusError } from '@/misc/fetch.js'; -import { CacheableRemoteUser } from '@/models/entities/user.js'; -import { UserPublickey } from '@/models/entities/user-publickey.js'; - -const logger = new Logger('inbox'); - -// ユーザーのinboxにアクティビティが届いた時の処理 -export default async (job: Bull.Job<InboxJobData>): Promise<string> => { - const signature = job.data.signature; // HTTP-signature - const activity = job.data.activity; - - //#region Log - const info = Object.assign({}, activity) as any; - delete info['@context']; - logger.debug(JSON.stringify(info, null, 2)); - //#endregion - - const host = toPuny(new URL(signature.keyId).hostname); - - // ブロックしてたら中断 - const meta = await fetchMeta(); - if (meta.blockedHosts.includes(host)) { - return `Blocked request: ${host}`; - } - - const keyIdLower = signature.keyId.toLowerCase(); - if (keyIdLower.startsWith('acct:')) { - return `Old keyId is no longer supported. ${keyIdLower}`; - } - - const dbResolver = new DbResolver(); - - // HTTP-Signature keyIdを元にDBから取得 - let authUser: { - user: CacheableRemoteUser; - key: UserPublickey | null; - } | null = await dbResolver.getAuthUserFromKeyId(signature.keyId); - - // keyIdでわからなければ、activity.actorを元にDBから取得 || activity.actorを元にリモートから取得 - if (authUser == null) { - try { - authUser = await dbResolver.getAuthUserFromApId(getApId(activity.actor)); - } catch (e) { - // 対象が4xxならスキップ - if (e instanceof StatusError) { - if (e.isClientError) { - return `skip: Ignored deleted actors on both ends ${activity.actor} - ${e.statusCode}`; - } - throw `Error in actor ${activity.actor} - ${e.statusCode || e}`; - } - } - } - - // それでもわからなければ終了 - if (authUser == null) { - return `skip: failed to resolve user`; - } - - // publicKey がなくても終了 - if (authUser.key == null) { - return `skip: failed to resolve user publicKey`; - } - - // HTTP-Signatureの検証 - const httpSignatureValidated = httpSignature.verifySignature(signature, authUser.key.keyPem); - - // また、signatureのsignerは、activity.actorと一致する必要がある - if (!httpSignatureValidated || authUser.user.uri !== activity.actor) { - // 一致しなくても、でもLD-Signatureがありそうならそっちも見る - if (activity.signature) { - if (activity.signature.type !== 'RsaSignature2017') { - return `skip: unsupported LD-signature type ${activity.signature.type}`; - } - - // activity.signature.creator: https://example.oom/users/user#main-key - // みたいになっててUserを引っ張れば公開キーも入ることを期待する - if (activity.signature.creator) { - const candicate = activity.signature.creator.replace(/#.*/, ''); - await resolvePerson(candicate).catch(() => null); - } - - // keyIdからLD-Signatureのユーザーを取得 - authUser = await dbResolver.getAuthUserFromKeyId(activity.signature.creator); - if (authUser == null) { - return `skip: LD-Signatureのユーザーが取得できませんでした`; - } - - if (authUser.key == null) { - return `skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした`; - } - - // LD-Signature検証 - const ldSignature = new LdSignature(); - const verified = await ldSignature.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false); - if (!verified) { - return `skip: LD-Signatureの検証に失敗しました`; - } - - // もう一度actorチェック - if (authUser.user.uri !== activity.actor) { - return `skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`; - } - - // ブロックしてたら中断 - const ldHost = extractDbHost(authUser.user.uri); - if (meta.blockedHosts.includes(ldHost)) { - return `Blocked request: ${ldHost}`; - } - } else { - return `skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`; - } - } - - // activity.idがあればホストが署名者のホストであることを確認する - if (typeof activity.id === 'string') { - const signerHost = extractDbHost(authUser.user.uri!); - const activityIdHost = extractDbHost(activity.id); - if (signerHost !== activityIdHost) { - return `skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`; - } - } - - // Update stats - registerOrFetchInstanceDoc(authUser.user.host).then(i => { - Instances.update(i.id, { - latestRequestReceivedAt: new Date(), - lastCommunicatedAt: new Date(), - isNotResponding: false, - }); - - fetchInstanceMetadata(i); - - instanceChart.requestReceived(i.host); - apRequestChart.inbox(); - federationChart.inbox(i.host); - }); - - // アクティビティを処理 - await perform(authUser.user, activity); - return `ok`; -}; diff --git a/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts b/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts deleted file mode 100644 index 77da162f6e..0000000000 --- a/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts +++ /dev/null @@ -1,50 +0,0 @@ -import Bull from 'bull'; - -import { queueLogger } from '../../logger.js'; -import { deleteFileSync } from '@/services/drive/delete-file.js'; -import { DriveFiles } from '@/models/index.js'; -import { MoreThan, Not, IsNull } from 'typeorm'; - -const logger = queueLogger.createSubLogger('clean-remote-files'); - -export default async function cleanRemoteFiles(job: Bull.Job<Record<string, unknown>>, done: any): Promise<void> { - logger.info(`Deleting cached remote files...`); - - let deletedCount = 0; - let cursor: any = null; - - while (true) { - const files = await DriveFiles.find({ - where: { - userHost: Not(IsNull()), - isLink: false, - ...(cursor ? { id: MoreThan(cursor) } : {}), - }, - take: 8, - order: { - id: 1, - }, - }); - - if (files.length === 0) { - job.progress(100); - break; - } - - cursor = files[files.length - 1].id; - - await Promise.all(files.map(file => deleteFileSync(file, true))); - - deletedCount += 8; - - const total = await DriveFiles.countBy({ - userHost: Not(IsNull()), - isLink: false, - }); - - job.progress(deletedCount / total); - } - - logger.succ(`All cahced remote files has been deleted.`); - done(); -} diff --git a/packages/backend/src/queue/processors/object-storage/delete-file.ts b/packages/backend/src/queue/processors/object-storage/delete-file.ts deleted file mode 100644 index c271e3ddd4..0000000000 --- a/packages/backend/src/queue/processors/object-storage/delete-file.ts +++ /dev/null @@ -1,11 +0,0 @@ -import { ObjectStorageFileJobData } from '@/queue/types.js'; -import Bull from 'bull'; -import { deleteObjectStorageFile } from '@/services/drive/delete-file.js'; - -export default async (job: Bull.Job<ObjectStorageFileJobData>) => { - const key: string = job.data.key; - - await deleteObjectStorageFile(key); - - return 'Success'; -}; diff --git a/packages/backend/src/queue/processors/object-storage/index.ts b/packages/backend/src/queue/processors/object-storage/index.ts deleted file mode 100644 index ae6c481fea..0000000000 --- a/packages/backend/src/queue/processors/object-storage/index.ts +++ /dev/null @@ -1,15 +0,0 @@ -import Bull from 'bull'; -import { ObjectStorageJobData } from '@/queue/types.js'; -import deleteFile from './delete-file.js'; -import cleanRemoteFiles from './clean-remote-files.js'; - -const jobs = { - deleteFile, - cleanRemoteFiles, -} as Record<string, Bull.ProcessCallbackFunction<ObjectStorageJobData> | Bull.ProcessPromiseFunction<ObjectStorageJobData>>; - -export default function(q: Bull.Queue) { - for (const [k, v] of Object.entries(jobs)) { - q.process(k, 16, v); - } -} diff --git a/packages/backend/src/queue/processors/system/check-expired-mutings.ts b/packages/backend/src/queue/processors/system/check-expired-mutings.ts deleted file mode 100644 index 621269e7e1..0000000000 --- a/packages/backend/src/queue/processors/system/check-expired-mutings.ts +++ /dev/null @@ -1,30 +0,0 @@ -import Bull from 'bull'; -import { In } from 'typeorm'; -import { Mutings } from '@/models/index.js'; -import { queueLogger } from '../../logger.js'; -import { publishUserEvent } from '@/services/stream.js'; - -const logger = queueLogger.createSubLogger('check-expired-mutings'); - -export async function checkExpiredMutings(job: Bull.Job<Record<string, unknown>>, done: any): Promise<void> { - logger.info(`Checking expired mutings...`); - - const expired = await Mutings.createQueryBuilder('muting') - .where('muting.expiresAt IS NOT NULL') - .andWhere('muting.expiresAt < :now', { now: new Date() }) - .innerJoinAndSelect('muting.mutee', 'mutee') - .getMany(); - - if (expired.length > 0) { - await Mutings.delete({ - id: In(expired.map(m => m.id)), - }); - - for (const m of expired) { - publishUserEvent(m.muterId, 'unmute', m.mutee!); - } - } - - logger.succ(`All expired mutings checked.`); - done(); -} diff --git a/packages/backend/src/queue/processors/system/clean-charts.ts b/packages/backend/src/queue/processors/system/clean-charts.ts deleted file mode 100644 index c9169d5acf..0000000000 --- a/packages/backend/src/queue/processors/system/clean-charts.ts +++ /dev/null @@ -1,28 +0,0 @@ -import Bull from 'bull'; - -import { queueLogger } from '../../logger.js'; -import { activeUsersChart, driveChart, federationChart, hashtagChart, instanceChart, notesChart, perUserDriveChart, perUserFollowingChart, perUserNotesChart, perUserReactionsChart, usersChart, apRequestChart } from '@/services/chart/index.js'; - -const logger = queueLogger.createSubLogger('clean-charts'); - -export async function cleanCharts(job: Bull.Job<Record<string, unknown>>, done: any): Promise<void> { - logger.info(`Clean charts...`); - - await Promise.all([ - federationChart.clean(), - notesChart.clean(), - usersChart.clean(), - activeUsersChart.clean(), - instanceChart.clean(), - perUserNotesChart.clean(), - driveChart.clean(), - perUserReactionsChart.clean(), - hashtagChart.clean(), - perUserFollowingChart.clean(), - perUserDriveChart.clean(), - apRequestChart.clean(), - ]); - - logger.succ(`All charts successfully cleaned.`); - done(); -} diff --git a/packages/backend/src/queue/processors/system/clean.ts b/packages/backend/src/queue/processors/system/clean.ts deleted file mode 100644 index c4f978d7c9..0000000000 --- a/packages/backend/src/queue/processors/system/clean.ts +++ /dev/null @@ -1,18 +0,0 @@ -import Bull from 'bull'; -import { LessThan } from 'typeorm'; -import { UserIps } from '@/models/index.js'; - -import { queueLogger } from '../../logger.js'; - -const logger = queueLogger.createSubLogger('clean'); - -export async function clean(job: Bull.Job<Record<string, unknown>>, done: any): Promise<void> { - logger.info('Cleaning...'); - - UserIps.delete({ - createdAt: LessThan(new Date(Date.now() - (1000 * 60 * 60 * 24 * 90))), - }); - - logger.succ('Cleaned.'); - done(); -} diff --git a/packages/backend/src/queue/processors/system/index.ts b/packages/backend/src/queue/processors/system/index.ts deleted file mode 100644 index 9527d40b0f..0000000000 --- a/packages/backend/src/queue/processors/system/index.ts +++ /dev/null @@ -1,20 +0,0 @@ -import Bull from 'bull'; -import { tickCharts } from './tick-charts.js'; -import { resyncCharts } from './resync-charts.js'; -import { cleanCharts } from './clean-charts.js'; -import { checkExpiredMutings } from './check-expired-mutings.js'; -import { clean } from './clean.js'; - -const jobs = { - tickCharts, - resyncCharts, - cleanCharts, - checkExpiredMutings, - clean, -} as Record<string, Bull.ProcessCallbackFunction<Record<string, unknown>> | Bull.ProcessPromiseFunction<Record<string, unknown>>>; - -export default function(dbQueue: Bull.Queue<Record<string, unknown>>) { - for (const [k, v] of Object.entries(jobs)) { - dbQueue.process(k, v); - } -} diff --git a/packages/backend/src/queue/processors/system/resync-charts.ts b/packages/backend/src/queue/processors/system/resync-charts.ts deleted file mode 100644 index 20012513af..0000000000 --- a/packages/backend/src/queue/processors/system/resync-charts.ts +++ /dev/null @@ -1,21 +0,0 @@ -import Bull from 'bull'; - -import { queueLogger } from '../../logger.js'; -import { driveChart, notesChart, usersChart } from '@/services/chart/index.js'; - -const logger = queueLogger.createSubLogger('resync-charts'); - -export async function resyncCharts(job: Bull.Job<Record<string, unknown>>, done: any): Promise<void> { - logger.info(`Resync charts...`); - - // TODO: ユーザーごとのチャートも更新する - // TODO: インスタンスごとのチャートも更新する - await Promise.all([ - driveChart.resync(), - notesChart.resync(), - usersChart.resync(), - ]); - - logger.succ(`All charts successfully resynced.`); - done(); -} diff --git a/packages/backend/src/queue/processors/system/tick-charts.ts b/packages/backend/src/queue/processors/system/tick-charts.ts deleted file mode 100644 index 13403f8f73..0000000000 --- a/packages/backend/src/queue/processors/system/tick-charts.ts +++ /dev/null @@ -1,28 +0,0 @@ -import Bull from 'bull'; - -import { queueLogger } from '../../logger.js'; -import { activeUsersChart, driveChart, federationChart, hashtagChart, instanceChart, notesChart, perUserDriveChart, perUserFollowingChart, perUserNotesChart, perUserReactionsChart, usersChart, apRequestChart } from '@/services/chart/index.js'; - -const logger = queueLogger.createSubLogger('tick-charts'); - -export async function tickCharts(job: Bull.Job<Record<string, unknown>>, done: any): Promise<void> { - logger.info(`Tick charts...`); - - await Promise.all([ - federationChart.tick(false), - notesChart.tick(false), - usersChart.tick(false), - activeUsersChart.tick(false), - instanceChart.tick(false), - perUserNotesChart.tick(false), - driveChart.tick(false), - perUserReactionsChart.tick(false), - hashtagChart.tick(false), - perUserFollowingChart.tick(false), - perUserDriveChart.tick(false), - apRequestChart.tick(false), - ]); - - logger.succ(`All charts successfully ticked.`); - done(); -} diff --git a/packages/backend/src/queue/processors/webhook-deliver.ts b/packages/backend/src/queue/processors/webhook-deliver.ts deleted file mode 100644 index d49206f68f..0000000000 --- a/packages/backend/src/queue/processors/webhook-deliver.ts +++ /dev/null @@ -1,59 +0,0 @@ -import { URL } from 'node:url'; -import Bull from 'bull'; -import Logger from '@/services/logger.js'; -import { WebhookDeliverJobData } from '../types.js'; -import { getResponse, StatusError } from '@/misc/fetch.js'; -import { Webhooks } from '@/models/index.js'; -import config from '@/config/index.js'; - -const logger = new Logger('webhook'); - -export default async (job: Bull.Job<WebhookDeliverJobData>) => { - try { - logger.debug(`delivering ${job.data.webhookId}`); - - const res = await getResponse({ - url: job.data.to, - method: 'POST', - headers: { - 'User-Agent': 'Misskey-Hooks', - 'X-Misskey-Host': config.host, - 'X-Misskey-Hook-Id': job.data.webhookId, - 'X-Misskey-Hook-Secret': job.data.secret, - }, - body: JSON.stringify({ - hookId: job.data.webhookId, - userId: job.data.userId, - eventId: job.data.eventId, - createdAt: job.data.createdAt, - type: job.data.type, - body: job.data.content, - }), - }); - - Webhooks.update({ id: job.data.webhookId }, { - latestSentAt: new Date(), - latestStatus: res.status, - }); - - return 'Success'; - } catch (res) { - Webhooks.update({ id: job.data.webhookId }, { - latestSentAt: new Date(), - latestStatus: res instanceof StatusError ? res.statusCode : 1, - }); - - if (res instanceof StatusError) { - // 4xx - if (res.isClientError) { - return `${res.statusCode} ${res.statusMessage}`; - } - - // 5xx etc. - throw `${res.statusCode} ${res.statusMessage}`; - } else { - // DNS error, socket error, timeout ... - throw res; - } - } -}; |