From 0e4a111f81cceed275d9bec2695f6e401fb654d8 Mon Sep 17 00:00:00 2001 From: syuilo Date: Fri, 12 Nov 2021 02:02:25 +0900 Subject: refactoring Resolve #7779 --- packages/backend/src/queue/get-job-info.ts | 15 ++ packages/backend/src/queue/index.ts | 255 +++++++++++++++++++++ packages/backend/src/queue/initialize.ts | 33 +++ packages/backend/src/queue/logger.ts | 3 + .../src/queue/processors/db/delete-account.ts | 94 ++++++++ .../src/queue/processors/db/delete-drive-files.ts | 56 +++++ .../src/queue/processors/db/export-blocking.ts | 94 ++++++++ .../src/queue/processors/db/export-following.ts | 94 ++++++++ .../backend/src/queue/processors/db/export-mute.ts | 94 ++++++++ .../src/queue/processors/db/export-notes.ts | 133 +++++++++++ .../src/queue/processors/db/export-user-lists.ts | 71 ++++++ .../src/queue/processors/db/import-blocking.ts | 74 ++++++ .../src/queue/processors/db/import-following.ts | 73 ++++++ .../src/queue/processors/db/import-muting.ts | 83 +++++++ .../src/queue/processors/db/import-user-lists.ts | 80 +++++++ packages/backend/src/queue/processors/db/index.ts | 33 +++ packages/backend/src/queue/processors/deliver.ts | 94 ++++++++ packages/backend/src/queue/processors/inbox.ts | 149 ++++++++++++ .../object-storage/clean-remote-files.ts | 50 ++++ .../queue/processors/object-storage/delete-file.ts | 11 + .../src/queue/processors/object-storage/index.ts | 15 ++ .../backend/src/queue/processors/system/index.ts | 12 + .../src/queue/processors/system/resync-charts.ts | 21 ++ packages/backend/src/queue/queues.ts | 9 + packages/backend/src/queue/types.ts | 44 ++++ 25 files changed, 1690 insertions(+) create mode 100644 packages/backend/src/queue/get-job-info.ts create mode 100644 packages/backend/src/queue/index.ts create mode 100644 packages/backend/src/queue/initialize.ts create mode 100644 packages/backend/src/queue/logger.ts create mode 100644 packages/backend/src/queue/processors/db/delete-account.ts create mode 100644 packages/backend/src/queue/processors/db/delete-drive-files.ts create mode 100644 packages/backend/src/queue/processors/db/export-blocking.ts create mode 100644 packages/backend/src/queue/processors/db/export-following.ts create mode 100644 packages/backend/src/queue/processors/db/export-mute.ts create mode 100644 packages/backend/src/queue/processors/db/export-notes.ts create mode 100644 packages/backend/src/queue/processors/db/export-user-lists.ts create mode 100644 packages/backend/src/queue/processors/db/import-blocking.ts create mode 100644 packages/backend/src/queue/processors/db/import-following.ts create mode 100644 packages/backend/src/queue/processors/db/import-muting.ts create mode 100644 packages/backend/src/queue/processors/db/import-user-lists.ts create mode 100644 packages/backend/src/queue/processors/db/index.ts create mode 100644 packages/backend/src/queue/processors/deliver.ts create mode 100644 packages/backend/src/queue/processors/inbox.ts create mode 100644 packages/backend/src/queue/processors/object-storage/clean-remote-files.ts create mode 100644 packages/backend/src/queue/processors/object-storage/delete-file.ts create mode 100644 packages/backend/src/queue/processors/object-storage/index.ts create mode 100644 packages/backend/src/queue/processors/system/index.ts create mode 100644 packages/backend/src/queue/processors/system/resync-charts.ts create mode 100644 packages/backend/src/queue/queues.ts create mode 100644 packages/backend/src/queue/types.ts (limited to 'packages/backend/src/queue') diff --git a/packages/backend/src/queue/get-job-info.ts b/packages/backend/src/queue/get-job-info.ts new file mode 100644 index 0000000000..f601ae62d0 --- /dev/null +++ b/packages/backend/src/queue/get-job-info.ts @@ -0,0 +1,15 @@ +import * as Bull from 'bull'; + +export function getJobInfo(job: Bull.Job, increment = false) { + const age = Date.now() - job.timestamp; + + const formated = age > 60000 ? `${Math.floor(age / 1000 / 60)}m` + : age > 10000 ? `${Math.floor(age / 1000)}s` + : `${age}ms`; + + // onActiveとかonCompletedのattemptsMadeがなぜか0始まりなのでインクリメントする + const currentAttempts = job.attemptsMade + (increment ? 1 : 0); + const maxAttempts = job.opts ? job.opts.attempts : 0; + + return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`; +} diff --git a/packages/backend/src/queue/index.ts b/packages/backend/src/queue/index.ts new file mode 100644 index 0000000000..37eb809604 --- /dev/null +++ b/packages/backend/src/queue/index.ts @@ -0,0 +1,255 @@ +import * as httpSignature from 'http-signature'; + +import config from '@/config/index'; +import { envOption } from '../env'; + +import processDeliver from './processors/deliver'; +import processInbox from './processors/inbox'; +import processDb from './processors/db/index'; +import procesObjectStorage from './processors/object-storage/index'; +import { queueLogger } from './logger'; +import { DriveFile } from '@/models/entities/drive-file'; +import { getJobInfo } from './get-job-info'; +import { systemQueue, dbQueue, deliverQueue, inboxQueue, objectStorageQueue } from './queues'; +import { ThinUser } from './types'; +import { IActivity } from '@/remote/activitypub/type'; + +function renderError(e: Error): any { + return { + stack: e?.stack, + message: e?.message, + name: e?.name + }; +} + +const systemLogger = queueLogger.createSubLogger('system'); +const deliverLogger = queueLogger.createSubLogger('deliver'); +const inboxLogger = queueLogger.createSubLogger('inbox'); +const dbLogger = queueLogger.createSubLogger('db'); +const objectStorageLogger = queueLogger.createSubLogger('objectStorage'); + +systemQueue + .on('waiting', (jobId) => systemLogger.debug(`waiting id=${jobId}`)) + .on('active', (job) => systemLogger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => systemLogger.debug(`completed(${result}) id=${job.id}`)) + .on('failed', (job, err) => systemLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) })) + .on('error', (job: any, err: Error) => systemLogger.error(`error ${err}`, { job, e: renderError(err) })) + .on('stalled', (job) => systemLogger.warn(`stalled id=${job.id}`)); + +deliverQueue + .on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`)) + .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`)) + .on('error', (job: any, err: Error) => deliverLogger.error(`error ${err}`, { job, e: renderError(err) })) + .on('stalled', (job) => deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`)); + +inboxQueue + .on('waiting', (jobId) => inboxLogger.debug(`waiting id=${jobId}`)) + .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`)) + .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`)) + .on('failed', (job, err) => inboxLogger.warn(`failed(${err}) ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`, { job, e: renderError(err) })) + .on('error', (job: any, err: Error) => inboxLogger.error(`error ${err}`, { job, e: renderError(err) })) + .on('stalled', (job) => inboxLogger.warn(`stalled ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`)); + +dbQueue + .on('waiting', (jobId) => dbLogger.debug(`waiting id=${jobId}`)) + .on('active', (job) => dbLogger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => dbLogger.debug(`completed(${result}) id=${job.id}`)) + .on('failed', (job, err) => dbLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) })) + .on('error', (job: any, err: Error) => dbLogger.error(`error ${err}`, { job, e: renderError(err) })) + .on('stalled', (job) => dbLogger.warn(`stalled id=${job.id}`)); + +objectStorageQueue + .on('waiting', (jobId) => objectStorageLogger.debug(`waiting id=${jobId}`)) + .on('active', (job) => objectStorageLogger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => objectStorageLogger.debug(`completed(${result}) id=${job.id}`)) + .on('failed', (job, err) => objectStorageLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) })) + .on('error', (job: any, err: Error) => objectStorageLogger.error(`error ${err}`, { job, e: renderError(err) })) + .on('stalled', (job) => objectStorageLogger.warn(`stalled id=${job.id}`)); + +export function deliver(user: ThinUser, content: unknown, to: string | null) { + if (content == null) return null; + if (to == null) return null; + + const data = { + user: { + id: user.id + }, + content, + to + }; + + return deliverQueue.add(data, { + attempts: config.deliverJobMaxAttempts || 12, + timeout: 1 * 60 * 1000, // 1min + backoff: { + type: 'apBackoff' + }, + removeOnComplete: true, + removeOnFail: true + }); +} + +export function inbox(activity: IActivity, signature: httpSignature.IParsedSignature) { + const data = { + activity: activity, + signature + }; + + return inboxQueue.add(data, { + attempts: config.inboxJobMaxAttempts || 8, + timeout: 5 * 60 * 1000, // 5min + backoff: { + type: 'apBackoff' + }, + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createDeleteDriveFilesJob(user: ThinUser) { + return dbQueue.add('deleteDriveFiles', { + user: user + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createExportNotesJob(user: ThinUser) { + return dbQueue.add('exportNotes', { + user: user + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createExportFollowingJob(user: ThinUser) { + return dbQueue.add('exportFollowing', { + user: user + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createExportMuteJob(user: ThinUser) { + return dbQueue.add('exportMute', { + user: user + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createExportBlockingJob(user: ThinUser) { + return dbQueue.add('exportBlocking', { + user: user + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createExportUserListsJob(user: ThinUser) { + return dbQueue.add('exportUserLists', { + user: user + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createImportFollowingJob(user: ThinUser, fileId: DriveFile['id']) { + return dbQueue.add('importFollowing', { + user: user, + fileId: fileId + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createImportMutingJob(user: ThinUser, fileId: DriveFile['id']) { + return dbQueue.add('importMuting', { + user: user, + fileId: fileId + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createImportBlockingJob(user: ThinUser, fileId: DriveFile['id']) { + return dbQueue.add('importBlocking', { + user: user, + fileId: fileId + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createImportUserListsJob(user: ThinUser, fileId: DriveFile['id']) { + return dbQueue.add('importUserLists', { + user: user, + fileId: fileId + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createDeleteAccountJob(user: ThinUser, opts: { soft?: boolean; } = {}) { + return dbQueue.add('deleteAccount', { + user: user, + soft: opts.soft + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createDeleteObjectStorageFileJob(key: string) { + return objectStorageQueue.add('deleteFile', { + key: key + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createCleanRemoteFilesJob() { + return objectStorageQueue.add('cleanRemoteFiles', {}, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export default function() { + if (envOption.onlyServer) return; + + deliverQueue.process(config.deliverJobConcurrency || 128, processDeliver); + inboxQueue.process(config.inboxJobConcurrency || 16, processInbox); + processDb(dbQueue); + procesObjectStorage(objectStorageQueue); + + systemQueue.add('resyncCharts', { + }, { + repeat: { cron: '0 0 * * *' } + }); +} + +export function destroy() { + deliverQueue.once('cleaned', (jobs, status) => { + deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); + }); + deliverQueue.clean(0, 'delayed'); + + inboxQueue.once('cleaned', (jobs, status) => { + inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); + }); + inboxQueue.clean(0, 'delayed'); +} diff --git a/packages/backend/src/queue/initialize.ts b/packages/backend/src/queue/initialize.ts new file mode 100644 index 0000000000..31102a3ed2 --- /dev/null +++ b/packages/backend/src/queue/initialize.ts @@ -0,0 +1,33 @@ +import * as Bull from 'bull'; +import config from '@/config/index'; + +export function initialize(name: string, limitPerSec = -1) { + return new Bull(name, { + redis: { + port: config.redis.port, + host: config.redis.host, + password: config.redis.pass, + db: config.redis.db || 0, + }, + prefix: config.redis.prefix ? `${config.redis.prefix}:queue` : 'queue', + limiter: limitPerSec > 0 ? { + max: limitPerSec, + duration: 1000 + } : undefined, + settings: { + backoffStrategies: { + apBackoff + } + } + }); +} + +// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019 +function apBackoff(attemptsMade: number, err: Error) { + const baseDelay = 60 * 1000; // 1min + const maxBackoff = 8 * 60 * 60 * 1000; // 8hours + let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay; + backoff = Math.min(backoff, maxBackoff); + backoff += Math.round(backoff * Math.random() * 0.2); + return backoff; +} diff --git a/packages/backend/src/queue/logger.ts b/packages/backend/src/queue/logger.ts new file mode 100644 index 0000000000..f789b9d079 --- /dev/null +++ b/packages/backend/src/queue/logger.ts @@ -0,0 +1,3 @@ +import Logger from '@/services/logger'; + +export const queueLogger = new Logger('queue', 'orange'); diff --git a/packages/backend/src/queue/processors/db/delete-account.ts b/packages/backend/src/queue/processors/db/delete-account.ts new file mode 100644 index 0000000000..e54f38e35e --- /dev/null +++ b/packages/backend/src/queue/processors/db/delete-account.ts @@ -0,0 +1,94 @@ +import * as Bull from 'bull'; +import { queueLogger } from '../../logger'; +import { DriveFiles, Notes, UserProfiles, Users } from '@/models/index'; +import { DbUserDeleteJobData } from '@/queue/types'; +import { Note } from '@/models/entities/note'; +import { DriveFile } from '@/models/entities/drive-file'; +import { MoreThan } from 'typeorm'; +import { deleteFileSync } from '@/services/drive/delete-file'; +import { sendEmail } from '@/services/send-email'; + +const logger = queueLogger.createSubLogger('delete-account'); + +export async function deleteAccount(job: Bull.Job): Promise { + logger.info(`Deleting account of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + return; + } + + { // Delete notes + let cursor: Note['id'] | null = null; + + while (true) { + const notes = await Notes.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 + } + }); + + if (notes.length === 0) { + break; + } + + cursor = notes[notes.length - 1].id; + + await Notes.delete(notes.map(note => note.id)); + } + + logger.succ(`All of notes deleted`); + } + + { // Delete files + let cursor: DriveFile['id'] | null = null; + + while (true) { + const files = await DriveFiles.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 10, + order: { + id: 1 + } + }); + + if (files.length === 0) { + break; + } + + cursor = files[files.length - 1].id; + + for (const file of files) { + await deleteFileSync(file); + } + } + + logger.succ(`All of files deleted`); + } + + { // Send email notification + const profile = await UserProfiles.findOneOrFail(user.id); + if (profile.email && profile.emailVerified) { + sendEmail(profile.email, 'Account deleted', + `Your account has been deleted.`, + `Your account has been deleted.`); + } + } + + // soft指定されている場合は物理削除しない + if (job.data.soft) { + // nop + } else { + await Users.delete(job.data.user.id); + } + + return 'Account deleted'; +} diff --git a/packages/backend/src/queue/processors/db/delete-drive-files.ts b/packages/backend/src/queue/processors/db/delete-drive-files.ts new file mode 100644 index 0000000000..8a28468b0d --- /dev/null +++ b/packages/backend/src/queue/processors/db/delete-drive-files.ts @@ -0,0 +1,56 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import { deleteFileSync } from '@/services/drive/delete-file'; +import { Users, DriveFiles } from '@/models/index'; +import { MoreThan } from 'typeorm'; +import { DbUserJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('delete-drive-files'); + +export async function deleteDriveFiles(job: Bull.Job, done: any): Promise { + logger.info(`Deleting drive files of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + let deletedCount = 0; + let cursor: any = null; + + while (true) { + const files = await DriveFiles.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 + } + }); + + if (files.length === 0) { + job.progress(100); + break; + } + + cursor = files[files.length - 1].id; + + for (const file of files) { + await deleteFileSync(file); + deletedCount++; + } + + const total = await DriveFiles.count({ + userId: user.id, + }); + + job.progress(deletedCount / total); + } + + logger.succ(`All drive files (${deletedCount}) of ${user.id} has been deleted.`); + done(); +} diff --git a/packages/backend/src/queue/processors/db/export-blocking.ts b/packages/backend/src/queue/processors/db/export-blocking.ts new file mode 100644 index 0000000000..8b8aa259d4 --- /dev/null +++ b/packages/backend/src/queue/processors/db/export-blocking.ts @@ -0,0 +1,94 @@ +import * as Bull from 'bull'; +import * as tmp from 'tmp'; +import * as fs from 'fs'; + +import { queueLogger } from '../../logger'; +import addFile from '@/services/drive/add-file'; +import * as dateFormat from 'dateformat'; +import { getFullApAccount } from '@/misc/convert-host'; +import { Users, Blockings } from '@/models/index'; +import { MoreThan } from 'typeorm'; +import { DbUserJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('export-blocking'); + +export async function exportBlocking(job: Bull.Job, done: any): Promise { + logger.info(`Exporting blocking of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + // Create temp file + const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { + tmp.file((e, path, fd, cleanup) => { + if (e) return rej(e); + res([path, cleanup]); + }); + }); + + logger.info(`Temp file is ${path}`); + + const stream = fs.createWriteStream(path, { flags: 'a' }); + + let exportedCount = 0; + let cursor: any = null; + + while (true) { + const blockings = await Blockings.find({ + where: { + blockerId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 + } + }); + + if (blockings.length === 0) { + job.progress(100); + break; + } + + cursor = blockings[blockings.length - 1].id; + + for (const block of blockings) { + const u = await Users.findOne({ id: block.blockeeId }); + if (u == null) { + exportedCount++; continue; + } + + const content = getFullApAccount(u.username, u.host); + await new Promise((res, rej) => { + stream.write(content + '\n', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + exportedCount++; + } + + const total = await Blockings.count({ + blockerId: user.id, + }); + + job.progress(exportedCount / total); + } + + stream.end(); + logger.succ(`Exported to: ${path}`); + + const fileName = 'blocking-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; + const driveFile = await addFile(user, path, fileName, null, null, true); + + logger.succ(`Exported to: ${driveFile.id}`); + cleanup(); + done(); +} diff --git a/packages/backend/src/queue/processors/db/export-following.ts b/packages/backend/src/queue/processors/db/export-following.ts new file mode 100644 index 0000000000..a0ecf5f560 --- /dev/null +++ b/packages/backend/src/queue/processors/db/export-following.ts @@ -0,0 +1,94 @@ +import * as Bull from 'bull'; +import * as tmp from 'tmp'; +import * as fs from 'fs'; + +import { queueLogger } from '../../logger'; +import addFile from '@/services/drive/add-file'; +import * as dateFormat from 'dateformat'; +import { getFullApAccount } from '@/misc/convert-host'; +import { Users, Followings } from '@/models/index'; +import { MoreThan } from 'typeorm'; +import { DbUserJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('export-following'); + +export async function exportFollowing(job: Bull.Job, done: any): Promise { + logger.info(`Exporting following of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + // Create temp file + const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { + tmp.file((e, path, fd, cleanup) => { + if (e) return rej(e); + res([path, cleanup]); + }); + }); + + logger.info(`Temp file is ${path}`); + + const stream = fs.createWriteStream(path, { flags: 'a' }); + + let exportedCount = 0; + let cursor: any = null; + + while (true) { + const followings = await Followings.find({ + where: { + followerId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 + } + }); + + if (followings.length === 0) { + job.progress(100); + break; + } + + cursor = followings[followings.length - 1].id; + + for (const following of followings) { + const u = await Users.findOne({ id: following.followeeId }); + if (u == null) { + exportedCount++; continue; + } + + const content = getFullApAccount(u.username, u.host); + await new Promise((res, rej) => { + stream.write(content + '\n', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + exportedCount++; + } + + const total = await Followings.count({ + followerId: user.id, + }); + + job.progress(exportedCount / total); + } + + stream.end(); + logger.succ(`Exported to: ${path}`); + + const fileName = 'following-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; + const driveFile = await addFile(user, path, fileName, null, null, true); + + logger.succ(`Exported to: ${driveFile.id}`); + cleanup(); + done(); +} diff --git a/packages/backend/src/queue/processors/db/export-mute.ts b/packages/backend/src/queue/processors/db/export-mute.ts new file mode 100644 index 0000000000..d5976f7d56 --- /dev/null +++ b/packages/backend/src/queue/processors/db/export-mute.ts @@ -0,0 +1,94 @@ +import * as Bull from 'bull'; +import * as tmp from 'tmp'; +import * as fs from 'fs'; + +import { queueLogger } from '../../logger'; +import addFile from '@/services/drive/add-file'; +import * as dateFormat from 'dateformat'; +import { getFullApAccount } from '@/misc/convert-host'; +import { Users, Mutings } from '@/models/index'; +import { MoreThan } from 'typeorm'; +import { DbUserJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('export-mute'); + +export async function exportMute(job: Bull.Job, done: any): Promise { + logger.info(`Exporting mute of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + // Create temp file + const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { + tmp.file((e, path, fd, cleanup) => { + if (e) return rej(e); + res([path, cleanup]); + }); + }); + + logger.info(`Temp file is ${path}`); + + const stream = fs.createWriteStream(path, { flags: 'a' }); + + let exportedCount = 0; + let cursor: any = null; + + while (true) { + const mutes = await Mutings.find({ + where: { + muterId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 + } + }); + + if (mutes.length === 0) { + job.progress(100); + break; + } + + cursor = mutes[mutes.length - 1].id; + + for (const mute of mutes) { + const u = await Users.findOne({ id: mute.muteeId }); + if (u == null) { + exportedCount++; continue; + } + + const content = getFullApAccount(u.username, u.host); + await new Promise((res, rej) => { + stream.write(content + '\n', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + exportedCount++; + } + + const total = await Mutings.count({ + muterId: user.id, + }); + + job.progress(exportedCount / total); + } + + stream.end(); + logger.succ(`Exported to: ${path}`); + + const fileName = 'mute-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; + const driveFile = await addFile(user, path, fileName, null, null, true); + + logger.succ(`Exported to: ${driveFile.id}`); + cleanup(); + done(); +} diff --git a/packages/backend/src/queue/processors/db/export-notes.ts b/packages/backend/src/queue/processors/db/export-notes.ts new file mode 100644 index 0000000000..49850aa706 --- /dev/null +++ b/packages/backend/src/queue/processors/db/export-notes.ts @@ -0,0 +1,133 @@ +import * as Bull from 'bull'; +import * as tmp from 'tmp'; +import * as fs from 'fs'; + +import { queueLogger } from '../../logger'; +import addFile from '@/services/drive/add-file'; +import * as dateFormat from 'dateformat'; +import { Users, Notes, Polls } from '@/models/index'; +import { MoreThan } from 'typeorm'; +import { Note } from '@/models/entities/note'; +import { Poll } from '@/models/entities/poll'; +import { DbUserJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('export-notes'); + +export async function exportNotes(job: Bull.Job, done: any): Promise { + logger.info(`Exporting notes of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + // Create temp file + const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { + tmp.file((e, path, fd, cleanup) => { + if (e) return rej(e); + res([path, cleanup]); + }); + }); + + logger.info(`Temp file is ${path}`); + + const stream = fs.createWriteStream(path, { flags: 'a' }); + + await new Promise((res, rej) => { + stream.write('[', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + + let exportedNotesCount = 0; + let cursor: any = null; + + while (true) { + const notes = await Notes.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 + } + }); + + if (notes.length === 0) { + job.progress(100); + break; + } + + cursor = notes[notes.length - 1].id; + + for (const note of notes) { + let poll: Poll | undefined; + if (note.hasPoll) { + poll = await Polls.findOneOrFail({ noteId: note.id }); + } + const content = JSON.stringify(serialize(note, poll)); + await new Promise((res, rej) => { + stream.write(exportedNotesCount === 0 ? content : ',\n' + content, err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + exportedNotesCount++; + } + + const total = await Notes.count({ + userId: user.id, + }); + + job.progress(exportedNotesCount / total); + } + + await new Promise((res, rej) => { + stream.write(']', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + + stream.end(); + logger.succ(`Exported to: ${path}`); + + const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.json'; + const driveFile = await addFile(user, path, fileName, null, null, true); + + logger.succ(`Exported to: ${driveFile.id}`); + cleanup(); + done(); +} + +function serialize(note: Note, poll: Poll | null = null): any { + return { + id: note.id, + text: note.text, + createdAt: note.createdAt, + fileIds: note.fileIds, + replyId: note.replyId, + renoteId: note.renoteId, + poll: poll, + cw: note.cw, + viaMobile: note.viaMobile, + visibility: note.visibility, + visibleUserIds: note.visibleUserIds, + localOnly: note.localOnly + }; +} diff --git a/packages/backend/src/queue/processors/db/export-user-lists.ts b/packages/backend/src/queue/processors/db/export-user-lists.ts new file mode 100644 index 0000000000..8a86c4df5d --- /dev/null +++ b/packages/backend/src/queue/processors/db/export-user-lists.ts @@ -0,0 +1,71 @@ +import * as Bull from 'bull'; +import * as tmp from 'tmp'; +import * as fs from 'fs'; + +import { queueLogger } from '../../logger'; +import addFile from '@/services/drive/add-file'; +import * as dateFormat from 'dateformat'; +import { getFullApAccount } from '@/misc/convert-host'; +import { Users, UserLists, UserListJoinings } from '@/models/index'; +import { In } from 'typeorm'; +import { DbUserJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('export-user-lists'); + +export async function exportUserLists(job: Bull.Job, done: any): Promise { + logger.info(`Exporting user lists of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + const lists = await UserLists.find({ + userId: user.id + }); + + // Create temp file + const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { + tmp.file((e, path, fd, cleanup) => { + if (e) return rej(e); + res([path, cleanup]); + }); + }); + + logger.info(`Temp file is ${path}`); + + const stream = fs.createWriteStream(path, { flags: 'a' }); + + for (const list of lists) { + const joinings = await UserListJoinings.find({ userListId: list.id }); + const users = await Users.find({ + id: In(joinings.map(j => j.userId)) + }); + + for (const u of users) { + const acct = getFullApAccount(u.username, u.host); + const content = `${list.name},${acct}`; + await new Promise((res, rej) => { + stream.write(content + '\n', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + } + } + + stream.end(); + logger.succ(`Exported to: ${path}`); + + const fileName = 'user-lists-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; + const driveFile = await addFile(user, path, fileName, null, null, true); + + logger.succ(`Exported to: ${driveFile.id}`); + cleanup(); + done(); +} diff --git a/packages/backend/src/queue/processors/db/import-blocking.ts b/packages/backend/src/queue/processors/db/import-blocking.ts new file mode 100644 index 0000000000..2e77107034 --- /dev/null +++ b/packages/backend/src/queue/processors/db/import-blocking.ts @@ -0,0 +1,74 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import * as Acct from 'misskey-js/built/acct'; +import { resolveUser } from '@/remote/resolve-user'; +import { downloadTextFile } from '@/misc/download-text-file'; +import { isSelfHost, toPuny } from '@/misc/convert-host'; +import { Users, DriveFiles, Blockings } from '@/models/index'; +import { DbUserImportJobData } from '@/queue/types'; +import block from '@/services/blocking/create'; + +const logger = queueLogger.createSubLogger('import-blocking'); + +export async function importBlocking(job: Bull.Job, done: any): Promise { + logger.info(`Importing blocking of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + const file = await DriveFiles.findOne({ + id: job.data.fileId + }); + if (file == null) { + done(); + return; + } + + const csv = await downloadTextFile(file.url); + + let linenum = 0; + + for (const line of csv.trim().split('\n')) { + linenum++; + + try { + const acct = line.split(',')[0].trim(); + const { username, host } = Acct.parse(acct); + + let target = isSelfHost(host!) ? await Users.findOne({ + host: null, + usernameLower: username.toLowerCase() + }) : await Users.findOne({ + host: toPuny(host!), + usernameLower: username.toLowerCase() + }); + + if (host == null && target == null) continue; + + if (target == null) { + target = await resolveUser(username, host); + } + + if (target == null) { + throw `cannot resolve user: @${username}@${host}`; + } + + // skip myself + if (target.id === job.data.user.id) continue; + + logger.info(`Block[${linenum}] ${target.id} ...`); + + await block(user, target); + } catch (e) { + logger.warn(`Error in line:${linenum} ${e}`); + } + } + + logger.succ('Imported'); + done(); +} + diff --git a/packages/backend/src/queue/processors/db/import-following.ts b/packages/backend/src/queue/processors/db/import-following.ts new file mode 100644 index 0000000000..2bd079e4bc --- /dev/null +++ b/packages/backend/src/queue/processors/db/import-following.ts @@ -0,0 +1,73 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import follow from '@/services/following/create'; +import * as Acct from 'misskey-js/built/acct'; +import { resolveUser } from '@/remote/resolve-user'; +import { downloadTextFile } from '@/misc/download-text-file'; +import { isSelfHost, toPuny } from '@/misc/convert-host'; +import { Users, DriveFiles } from '@/models/index'; +import { DbUserImportJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('import-following'); + +export async function importFollowing(job: Bull.Job, done: any): Promise { + logger.info(`Importing following of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + const file = await DriveFiles.findOne({ + id: job.data.fileId + }); + if (file == null) { + done(); + return; + } + + const csv = await downloadTextFile(file.url); + + let linenum = 0; + + for (const line of csv.trim().split('\n')) { + linenum++; + + try { + const acct = line.split(',')[0].trim(); + const { username, host } = Acct.parse(acct); + + let target = isSelfHost(host!) ? await Users.findOne({ + host: null, + usernameLower: username.toLowerCase() + }) : await Users.findOne({ + host: toPuny(host!), + usernameLower: username.toLowerCase() + }); + + if (host == null && target == null) continue; + + if (target == null) { + target = await resolveUser(username, host); + } + + if (target == null) { + throw `cannot resolve user: @${username}@${host}`; + } + + // skip myself + if (target.id === job.data.user.id) continue; + + logger.info(`Follow[${linenum}] ${target.id} ...`); + + follow(user, target); + } catch (e) { + logger.warn(`Error in line:${linenum} ${e}`); + } + } + + logger.succ('Imported'); + done(); +} diff --git a/packages/backend/src/queue/processors/db/import-muting.ts b/packages/backend/src/queue/processors/db/import-muting.ts new file mode 100644 index 0000000000..8060980625 --- /dev/null +++ b/packages/backend/src/queue/processors/db/import-muting.ts @@ -0,0 +1,83 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import * as Acct from 'misskey-js/built/acct'; +import { resolveUser } from '@/remote/resolve-user'; +import { downloadTextFile } from '@/misc/download-text-file'; +import { isSelfHost, toPuny } from '@/misc/convert-host'; +import { Users, DriveFiles, Mutings } from '@/models/index'; +import { DbUserImportJobData } from '@/queue/types'; +import { User } from '@/models/entities/user'; +import { genId } from '@/misc/gen-id'; + +const logger = queueLogger.createSubLogger('import-muting'); + +export async function importMuting(job: Bull.Job, done: any): Promise { + logger.info(`Importing muting of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + const file = await DriveFiles.findOne({ + id: job.data.fileId + }); + if (file == null) { + done(); + return; + } + + const csv = await downloadTextFile(file.url); + + let linenum = 0; + + for (const line of csv.trim().split('\n')) { + linenum++; + + try { + const acct = line.split(',')[0].trim(); + const { username, host } = Acct.parse(acct); + + let target = isSelfHost(host!) ? await Users.findOne({ + host: null, + usernameLower: username.toLowerCase() + }) : await Users.findOne({ + host: toPuny(host!), + usernameLower: username.toLowerCase() + }); + + if (host == null && target == null) continue; + + if (target == null) { + target = await resolveUser(username, host); + } + + if (target == null) { + throw `cannot resolve user: @${username}@${host}`; + } + + // skip myself + if (target.id === job.data.user.id) continue; + + logger.info(`Mute[${linenum}] ${target.id} ...`); + + await mute(user, target); + } catch (e) { + logger.warn(`Error in line:${linenum} ${e}`); + } + } + + logger.succ('Imported'); + done(); +} + +async function mute(user: User, target: User) { + await Mutings.insert({ + id: genId(), + createdAt: new Date(), + muterId: user.id, + muteeId: target.id, + }); +} diff --git a/packages/backend/src/queue/processors/db/import-user-lists.ts b/packages/backend/src/queue/processors/db/import-user-lists.ts new file mode 100644 index 0000000000..46b728b387 --- /dev/null +++ b/packages/backend/src/queue/processors/db/import-user-lists.ts @@ -0,0 +1,80 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import * as Acct from 'misskey-js/built/acct'; +import { resolveUser } from '@/remote/resolve-user'; +import { pushUserToUserList } from '@/services/user-list/push'; +import { downloadTextFile } from '@/misc/download-text-file'; +import { isSelfHost, toPuny } from '@/misc/convert-host'; +import { DriveFiles, Users, UserLists, UserListJoinings } from '@/models/index'; +import { genId } from '@/misc/gen-id'; +import { DbUserImportJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('import-user-lists'); + +export async function importUserLists(job: Bull.Job, done: any): Promise { + logger.info(`Importing user lists of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + const file = await DriveFiles.findOne({ + id: job.data.fileId + }); + if (file == null) { + done(); + return; + } + + const csv = await downloadTextFile(file.url); + + let linenum = 0; + + for (const line of csv.trim().split('\n')) { + linenum++; + + try { + const listName = line.split(',')[0].trim(); + const { username, host } = Acct.parse(line.split(',')[1].trim()); + + let list = await UserLists.findOne({ + userId: user.id, + name: listName + }); + + if (list == null) { + list = await UserLists.save({ + id: genId(), + createdAt: new Date(), + userId: user.id, + name: listName, + userIds: [] + }); + } + + let target = isSelfHost(host!) ? await Users.findOne({ + host: null, + usernameLower: username.toLowerCase() + }) : await Users.findOne({ + host: toPuny(host!), + usernameLower: username.toLowerCase() + }); + + if (target == null) { + target = await resolveUser(username, host); + } + + if (await UserListJoinings.findOne({ userListId: list.id, userId: target.id }) != null) continue; + + pushUserToUserList(target, list); + } catch (e) { + logger.warn(`Error in line:${linenum} ${e}`); + } + } + + logger.succ('Imported'); + done(); +} diff --git a/packages/backend/src/queue/processors/db/index.ts b/packages/backend/src/queue/processors/db/index.ts new file mode 100644 index 0000000000..97087642b7 --- /dev/null +++ b/packages/backend/src/queue/processors/db/index.ts @@ -0,0 +1,33 @@ +import * as Bull from 'bull'; +import { DbJobData } from '@/queue/types'; +import { deleteDriveFiles } from './delete-drive-files'; +import { exportNotes } from './export-notes'; +import { exportFollowing } from './export-following'; +import { exportMute } from './export-mute'; +import { exportBlocking } from './export-blocking'; +import { exportUserLists } from './export-user-lists'; +import { importFollowing } from './import-following'; +import { importUserLists } from './import-user-lists'; +import { deleteAccount } from './delete-account'; +import { importMuting } from './import-muting'; +import { importBlocking } from './import-blocking'; + +const jobs = { + deleteDriveFiles, + exportNotes, + exportFollowing, + exportMute, + exportBlocking, + exportUserLists, + importFollowing, + importMuting, + importBlocking, + importUserLists, + deleteAccount, +} as Record | Bull.ProcessPromiseFunction>; + +export default function(dbQueue: Bull.Queue) { + for (const [k, v] of Object.entries(jobs)) { + dbQueue.process(k, v); + } +} diff --git a/packages/backend/src/queue/processors/deliver.ts b/packages/backend/src/queue/processors/deliver.ts new file mode 100644 index 0000000000..3c61896a2f --- /dev/null +++ b/packages/backend/src/queue/processors/deliver.ts @@ -0,0 +1,94 @@ +import { URL } from 'url'; +import * as Bull from 'bull'; +import request from '@/remote/activitypub/request'; +import { registerOrFetchInstanceDoc } from '@/services/register-or-fetch-instance-doc'; +import Logger from '@/services/logger'; +import { Instances } from '@/models/index'; +import { instanceChart } from '@/services/chart/index'; +import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata'; +import { fetchMeta } from '@/misc/fetch-meta'; +import { toPuny } from '@/misc/convert-host'; +import { Cache } from '@/misc/cache'; +import { Instance } from '@/models/entities/instance'; +import { DeliverJobData } from '../types'; +import { StatusError } from '@/misc/fetch'; + +const logger = new Logger('deliver'); + +let latest: string | null = null; + +const suspendedHostsCache = new Cache(1000 * 60 * 60); + +export default async (job: Bull.Job) => { + const { host } = new URL(job.data.to); + + // ブロックしてたら中断 + const meta = await fetchMeta(); + if (meta.blockedHosts.includes(toPuny(host))) { + return 'skip (blocked)'; + } + + // isSuspendedなら中断 + let suspendedHosts = suspendedHostsCache.get(null); + if (suspendedHosts == null) { + suspendedHosts = await Instances.find({ + where: { + isSuspended: true + }, + }); + suspendedHostsCache.set(null, suspendedHosts); + } + if (suspendedHosts.map(x => x.host).includes(toPuny(host))) { + return 'skip (suspended)'; + } + + try { + if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) { + logger.debug(`delivering ${latest}`); + } + + await request(job.data.user, job.data.to, job.data.content); + + // Update stats + registerOrFetchInstanceDoc(host).then(i => { + Instances.update(i.id, { + latestRequestSentAt: new Date(), + latestStatus: 200, + lastCommunicatedAt: new Date(), + isNotResponding: false + }); + + fetchInstanceMetadata(i); + + instanceChart.requestSent(i.host, true); + }); + + return 'Success'; + } catch (res) { + // Update stats + registerOrFetchInstanceDoc(host).then(i => { + Instances.update(i.id, { + latestRequestSentAt: new Date(), + latestStatus: res instanceof StatusError ? res.statusCode : null, + isNotResponding: true + }); + + instanceChart.requestSent(i.host, false); + }); + + if (res instanceof StatusError) { + // 4xx + if (res.isClientError) { + // HTTPステータスコード4xxはクライアントエラーであり、それはつまり + // 何回再送しても成功することはないということなのでエラーにはしないでおく + return `${res.statusCode} ${res.statusMessage}`; + } + + // 5xx etc. + throw `${res.statusCode} ${res.statusMessage}`; + } else { + // DNS error, socket error, timeout ... + throw res; + } + } +}; diff --git a/packages/backend/src/queue/processors/inbox.ts b/packages/backend/src/queue/processors/inbox.ts new file mode 100644 index 0000000000..4032ce8653 --- /dev/null +++ b/packages/backend/src/queue/processors/inbox.ts @@ -0,0 +1,149 @@ +import { URL } from 'url'; +import * as Bull from 'bull'; +import * as httpSignature from 'http-signature'; +import perform from '@/remote/activitypub/perform'; +import Logger from '@/services/logger'; +import { registerOrFetchInstanceDoc } from '@/services/register-or-fetch-instance-doc'; +import { Instances } from '@/models/index'; +import { instanceChart } from '@/services/chart/index'; +import { fetchMeta } from '@/misc/fetch-meta'; +import { toPuny, extractDbHost } from '@/misc/convert-host'; +import { getApId } from '@/remote/activitypub/type'; +import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata'; +import { InboxJobData } from '../types'; +import DbResolver from '@/remote/activitypub/db-resolver'; +import { resolvePerson } from '@/remote/activitypub/models/person'; +import { LdSignature } from '@/remote/activitypub/misc/ld-signature'; +import { StatusError } from '@/misc/fetch'; + +const logger = new Logger('inbox'); + +// ユーザーのinboxにアクティビティが届いた時の処理 +export default async (job: Bull.Job): Promise => { + const signature = job.data.signature; // HTTP-signature + const activity = job.data.activity; + + //#region Log + const info = Object.assign({}, activity) as any; + delete info['@context']; + logger.debug(JSON.stringify(info, null, 2)); + //#endregion + + const host = toPuny(new URL(signature.keyId).hostname); + + // ブロックしてたら中断 + const meta = await fetchMeta(); + if (meta.blockedHosts.includes(host)) { + return `Blocked request: ${host}`; + } + + const keyIdLower = signature.keyId.toLowerCase(); + if (keyIdLower.startsWith('acct:')) { + return `Old keyId is no longer supported. ${keyIdLower}`; + } + + // TDOO: キャッシュ + const dbResolver = new DbResolver(); + + // HTTP-Signature keyIdを元にDBから取得 + let authUser = await dbResolver.getAuthUserFromKeyId(signature.keyId); + + // keyIdでわからなければ、activity.actorを元にDBから取得 || activity.actorを元にリモートから取得 + if (authUser == null) { + try { + authUser = await dbResolver.getAuthUserFromApId(getApId(activity.actor)); + } catch (e) { + // 対象が4xxならスキップ + if (e instanceof StatusError && e.isClientError) { + return `skip: Ignored deleted actors on both ends ${activity.actor} - ${e.statusCode}`; + } + throw `Error in actor ${activity.actor} - ${e.statusCode || e}`; + } + } + + // それでもわからなければ終了 + if (authUser == null) { + return `skip: failed to resolve user`; + } + + // publicKey がなくても終了 + if (authUser.key == null) { + return `skip: failed to resolve user publicKey`; + } + + // HTTP-Signatureの検証 + const httpSignatureValidated = httpSignature.verifySignature(signature, authUser.key.keyPem); + + // また、signatureのsignerは、activity.actorと一致する必要がある + if (!httpSignatureValidated || authUser.user.uri !== activity.actor) { + // 一致しなくても、でもLD-Signatureがありそうならそっちも見る + if (activity.signature) { + if (activity.signature.type !== 'RsaSignature2017') { + return `skip: unsupported LD-signature type ${activity.signature.type}`; + } + + // activity.signature.creator: https://example.oom/users/user#main-key + // みたいになっててUserを引っ張れば公開キーも入ることを期待する + if (activity.signature.creator) { + const candicate = activity.signature.creator.replace(/#.*/, ''); + await resolvePerson(candicate).catch(() => null); + } + + // keyIdからLD-Signatureのユーザーを取得 + authUser = await dbResolver.getAuthUserFromKeyId(activity.signature.creator); + if (authUser == null) { + return `skip: LD-Signatureのユーザーが取得できませんでした`; + } + + if (authUser.key == null) { + return `skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした`; + } + + // LD-Signature検証 + const ldSignature = new LdSignature(); + const verified = await ldSignature.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false); + if (!verified) { + return `skip: LD-Signatureの検証に失敗しました`; + } + + // もう一度actorチェック + if (authUser.user.uri !== activity.actor) { + return `skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`; + } + + // ブロックしてたら中断 + const ldHost = extractDbHost(authUser.user.uri); + if (meta.blockedHosts.includes(ldHost)) { + return `Blocked request: ${ldHost}`; + } + } else { + return `skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`; + } + } + + // activity.idがあればホストが署名者のホストであることを確認する + if (typeof activity.id === 'string') { + const signerHost = extractDbHost(authUser.user.uri!); + const activityIdHost = extractDbHost(activity.id); + if (signerHost !== activityIdHost) { + return `skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`; + } + } + + // Update stats + registerOrFetchInstanceDoc(authUser.user.host).then(i => { + Instances.update(i.id, { + latestRequestReceivedAt: new Date(), + lastCommunicatedAt: new Date(), + isNotResponding: false + }); + + fetchInstanceMetadata(i); + + instanceChart.requestReceived(i.host); + }); + + // アクティビティを処理 + await perform(authUser.user, activity); + return `ok`; +}; diff --git a/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts b/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts new file mode 100644 index 0000000000..3b2e4ea939 --- /dev/null +++ b/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts @@ -0,0 +1,50 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import { deleteFileSync } from '@/services/drive/delete-file'; +import { DriveFiles } from '@/models/index'; +import { MoreThan, Not, IsNull } from 'typeorm'; + +const logger = queueLogger.createSubLogger('clean-remote-files'); + +export default async function cleanRemoteFiles(job: Bull.Job<{}>, done: any): Promise { + logger.info(`Deleting cached remote files...`); + + let deletedCount = 0; + let cursor: any = null; + + while (true) { + const files = await DriveFiles.find({ + where: { + userHost: Not(IsNull()), + isLink: false, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 8, + order: { + id: 1 + } + }); + + if (files.length === 0) { + job.progress(100); + break; + } + + cursor = files[files.length - 1].id; + + await Promise.all(files.map(file => deleteFileSync(file, true))); + + deletedCount += 8; + + const total = await DriveFiles.count({ + userHost: Not(IsNull()), + isLink: false, + }); + + job.progress(deletedCount / total); + } + + logger.succ(`All cahced remote files has been deleted.`); + done(); +} diff --git a/packages/backend/src/queue/processors/object-storage/delete-file.ts b/packages/backend/src/queue/processors/object-storage/delete-file.ts new file mode 100644 index 0000000000..ed22968a27 --- /dev/null +++ b/packages/backend/src/queue/processors/object-storage/delete-file.ts @@ -0,0 +1,11 @@ +import { ObjectStorageFileJobData } from '@/queue/types'; +import * as Bull from 'bull'; +import { deleteObjectStorageFile } from '@/services/drive/delete-file'; + +export default async (job: Bull.Job) => { + const key: string = job.data.key; + + await deleteObjectStorageFile(key); + + return 'Success'; +}; diff --git a/packages/backend/src/queue/processors/object-storage/index.ts b/packages/backend/src/queue/processors/object-storage/index.ts new file mode 100644 index 0000000000..0d9570e179 --- /dev/null +++ b/packages/backend/src/queue/processors/object-storage/index.ts @@ -0,0 +1,15 @@ +import * as Bull from 'bull'; +import { ObjectStorageJobData } from '@/queue/types'; +import deleteFile from './delete-file'; +import cleanRemoteFiles from './clean-remote-files'; + +const jobs = { + deleteFile, + cleanRemoteFiles, +} as Record | Bull.ProcessPromiseFunction>; + +export default function(q: Bull.Queue) { + for (const [k, v] of Object.entries(jobs)) { + q.process(k, 16, v); + } +} diff --git a/packages/backend/src/queue/processors/system/index.ts b/packages/backend/src/queue/processors/system/index.ts new file mode 100644 index 0000000000..52b7868105 --- /dev/null +++ b/packages/backend/src/queue/processors/system/index.ts @@ -0,0 +1,12 @@ +import * as Bull from 'bull'; +import { resyncCharts } from './resync-charts'; + +const jobs = { + resyncCharts, +} as Record | Bull.ProcessPromiseFunction<{}>>; + +export default function(dbQueue: Bull.Queue<{}>) { + for (const [k, v] of Object.entries(jobs)) { + dbQueue.process(k, v); + } +} diff --git a/packages/backend/src/queue/processors/system/resync-charts.ts b/packages/backend/src/queue/processors/system/resync-charts.ts new file mode 100644 index 0000000000..b36b024cfb --- /dev/null +++ b/packages/backend/src/queue/processors/system/resync-charts.ts @@ -0,0 +1,21 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import { driveChart, notesChart, usersChart } from '@/services/chart/index'; + +const logger = queueLogger.createSubLogger('resync-charts'); + +export default async function resyncCharts(job: Bull.Job<{}>, done: any): Promise { + logger.info(`Resync charts...`); + + // TODO: ユーザーごとのチャートも更新する + // TODO: インスタンスごとのチャートも更新する + await Promise.all([ + driveChart.resync(), + notesChart.resync(), + usersChart.resync(), + ]); + + logger.succ(`All charts successfully resynced.`); + done(); +} diff --git a/packages/backend/src/queue/queues.ts b/packages/backend/src/queue/queues.ts new file mode 100644 index 0000000000..a66a7ca451 --- /dev/null +++ b/packages/backend/src/queue/queues.ts @@ -0,0 +1,9 @@ +import config from '@/config/index'; +import { initialize as initializeQueue } from './initialize'; +import { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData } from './types'; + +export const systemQueue = initializeQueue<{}>('system'); +export const deliverQueue = initializeQueue('deliver', config.deliverJobPerSec || 128); +export const inboxQueue = initializeQueue('inbox', config.inboxJobPerSec || 16); +export const dbQueue = initializeQueue('db'); +export const objectStorageQueue = initializeQueue('objectStorage'); diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts new file mode 100644 index 0000000000..39cab29966 --- /dev/null +++ b/packages/backend/src/queue/types.ts @@ -0,0 +1,44 @@ +import { DriveFile } from '@/models/entities/drive-file'; +import { User } from '@/models/entities/user'; +import { IActivity } from '@/remote/activitypub/type'; +import * as httpSignature from 'http-signature'; + +export type DeliverJobData = { + /** Actor */ + user: ThinUser; + /** Activity */ + content: unknown; + /** inbox URL to deliver */ + to: string; +}; + +export type InboxJobData = { + activity: IActivity; + signature: httpSignature.IParsedSignature; +}; + +export type DbJobData = DbUserJobData | DbUserImportJobData | DbUserDeleteJobData; + +export type DbUserJobData = { + user: ThinUser; +}; + +export type DbUserDeleteJobData = { + user: ThinUser; + soft?: boolean; +}; + +export type DbUserImportJobData = { + user: ThinUser; + fileId: DriveFile['id']; +}; + +export type ObjectStorageJobData = ObjectStorageFileJobData | {}; + +export type ObjectStorageFileJobData = { + key: string; +}; + +export type ThinUser = { + id: User['id']; +}; -- cgit v1.2.3-freya