From 0e4a111f81cceed275d9bec2695f6e401fb654d8 Mon Sep 17 00:00:00 2001 From: syuilo Date: Fri, 12 Nov 2021 02:02:25 +0900 Subject: refactoring Resolve #7779 --- packages/backend/src/queue/get-job-info.ts | 15 ++ packages/backend/src/queue/index.ts | 255 +++++++++++++++++++++ packages/backend/src/queue/initialize.ts | 33 +++ packages/backend/src/queue/logger.ts | 3 + .../src/queue/processors/db/delete-account.ts | 94 ++++++++ .../src/queue/processors/db/delete-drive-files.ts | 56 +++++ .../src/queue/processors/db/export-blocking.ts | 94 ++++++++ .../src/queue/processors/db/export-following.ts | 94 ++++++++ .../backend/src/queue/processors/db/export-mute.ts | 94 ++++++++ .../src/queue/processors/db/export-notes.ts | 133 +++++++++++ .../src/queue/processors/db/export-user-lists.ts | 71 ++++++ .../src/queue/processors/db/import-blocking.ts | 74 ++++++ .../src/queue/processors/db/import-following.ts | 73 ++++++ .../src/queue/processors/db/import-muting.ts | 83 +++++++ .../src/queue/processors/db/import-user-lists.ts | 80 +++++++ packages/backend/src/queue/processors/db/index.ts | 33 +++ packages/backend/src/queue/processors/deliver.ts | 94 ++++++++ packages/backend/src/queue/processors/inbox.ts | 149 ++++++++++++ .../object-storage/clean-remote-files.ts | 50 ++++ .../queue/processors/object-storage/delete-file.ts | 11 + .../src/queue/processors/object-storage/index.ts | 15 ++ .../backend/src/queue/processors/system/index.ts | 12 + .../src/queue/processors/system/resync-charts.ts | 21 ++ packages/backend/src/queue/queues.ts | 9 + packages/backend/src/queue/types.ts | 44 ++++ 25 files changed, 1690 insertions(+) create mode 100644 packages/backend/src/queue/get-job-info.ts create mode 100644 packages/backend/src/queue/index.ts create mode 100644 packages/backend/src/queue/initialize.ts create mode 100644 packages/backend/src/queue/logger.ts create mode 100644 packages/backend/src/queue/processors/db/delete-account.ts create mode 100644 packages/backend/src/queue/processors/db/delete-drive-files.ts create mode 100644 packages/backend/src/queue/processors/db/export-blocking.ts create mode 100644 packages/backend/src/queue/processors/db/export-following.ts create mode 100644 packages/backend/src/queue/processors/db/export-mute.ts create mode 100644 packages/backend/src/queue/processors/db/export-notes.ts create mode 100644 packages/backend/src/queue/processors/db/export-user-lists.ts create mode 100644 packages/backend/src/queue/processors/db/import-blocking.ts create mode 100644 packages/backend/src/queue/processors/db/import-following.ts create mode 100644 packages/backend/src/queue/processors/db/import-muting.ts create mode 100644 packages/backend/src/queue/processors/db/import-user-lists.ts create mode 100644 packages/backend/src/queue/processors/db/index.ts create mode 100644 packages/backend/src/queue/processors/deliver.ts create mode 100644 packages/backend/src/queue/processors/inbox.ts create mode 100644 packages/backend/src/queue/processors/object-storage/clean-remote-files.ts create mode 100644 packages/backend/src/queue/processors/object-storage/delete-file.ts create mode 100644 packages/backend/src/queue/processors/object-storage/index.ts create mode 100644 packages/backend/src/queue/processors/system/index.ts create mode 100644 packages/backend/src/queue/processors/system/resync-charts.ts create mode 100644 packages/backend/src/queue/queues.ts create mode 100644 packages/backend/src/queue/types.ts (limited to 'packages/backend/src/queue') diff --git a/packages/backend/src/queue/get-job-info.ts b/packages/backend/src/queue/get-job-info.ts new file mode 100644 index 0000000000..f601ae62d0 --- /dev/null +++ b/packages/backend/src/queue/get-job-info.ts @@ -0,0 +1,15 @@ +import * as Bull from 'bull'; + +export function getJobInfo(job: Bull.Job, increment = false) { + const age = Date.now() - job.timestamp; + + const formated = age > 60000 ? `${Math.floor(age / 1000 / 60)}m` + : age > 10000 ? `${Math.floor(age / 1000)}s` + : `${age}ms`; + + // onActiveとかonCompletedのattemptsMadeがなぜか0始まりなのでインクリメントする + const currentAttempts = job.attemptsMade + (increment ? 1 : 0); + const maxAttempts = job.opts ? job.opts.attempts : 0; + + return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`; +} diff --git a/packages/backend/src/queue/index.ts b/packages/backend/src/queue/index.ts new file mode 100644 index 0000000000..37eb809604 --- /dev/null +++ b/packages/backend/src/queue/index.ts @@ -0,0 +1,255 @@ +import * as httpSignature from 'http-signature'; + +import config from '@/config/index'; +import { envOption } from '../env'; + +import processDeliver from './processors/deliver'; +import processInbox from './processors/inbox'; +import processDb from './processors/db/index'; +import procesObjectStorage from './processors/object-storage/index'; +import { queueLogger } from './logger'; +import { DriveFile } from '@/models/entities/drive-file'; +import { getJobInfo } from './get-job-info'; +import { systemQueue, dbQueue, deliverQueue, inboxQueue, objectStorageQueue } from './queues'; +import { ThinUser } from './types'; +import { IActivity } from '@/remote/activitypub/type'; + +function renderError(e: Error): any { + return { + stack: e?.stack, + message: e?.message, + name: e?.name + }; +} + +const systemLogger = queueLogger.createSubLogger('system'); +const deliverLogger = queueLogger.createSubLogger('deliver'); +const inboxLogger = queueLogger.createSubLogger('inbox'); +const dbLogger = queueLogger.createSubLogger('db'); +const objectStorageLogger = queueLogger.createSubLogger('objectStorage'); + +systemQueue + .on('waiting', (jobId) => systemLogger.debug(`waiting id=${jobId}`)) + .on('active', (job) => systemLogger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => systemLogger.debug(`completed(${result}) id=${job.id}`)) + .on('failed', (job, err) => systemLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) })) + .on('error', (job: any, err: Error) => systemLogger.error(`error ${err}`, { job, e: renderError(err) })) + .on('stalled', (job) => systemLogger.warn(`stalled id=${job.id}`)); + +deliverQueue + .on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`)) + .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`)) + .on('error', (job: any, err: Error) => deliverLogger.error(`error ${err}`, { job, e: renderError(err) })) + .on('stalled', (job) => deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`)); + +inboxQueue + .on('waiting', (jobId) => inboxLogger.debug(`waiting id=${jobId}`)) + .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`)) + .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`)) + .on('failed', (job, err) => inboxLogger.warn(`failed(${err}) ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`, { job, e: renderError(err) })) + .on('error', (job: any, err: Error) => inboxLogger.error(`error ${err}`, { job, e: renderError(err) })) + .on('stalled', (job) => inboxLogger.warn(`stalled ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`)); + +dbQueue + .on('waiting', (jobId) => dbLogger.debug(`waiting id=${jobId}`)) + .on('active', (job) => dbLogger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => dbLogger.debug(`completed(${result}) id=${job.id}`)) + .on('failed', (job, err) => dbLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) })) + .on('error', (job: any, err: Error) => dbLogger.error(`error ${err}`, { job, e: renderError(err) })) + .on('stalled', (job) => dbLogger.warn(`stalled id=${job.id}`)); + +objectStorageQueue + .on('waiting', (jobId) => objectStorageLogger.debug(`waiting id=${jobId}`)) + .on('active', (job) => objectStorageLogger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => objectStorageLogger.debug(`completed(${result}) id=${job.id}`)) + .on('failed', (job, err) => objectStorageLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) })) + .on('error', (job: any, err: Error) => objectStorageLogger.error(`error ${err}`, { job, e: renderError(err) })) + .on('stalled', (job) => objectStorageLogger.warn(`stalled id=${job.id}`)); + +export function deliver(user: ThinUser, content: unknown, to: string | null) { + if (content == null) return null; + if (to == null) return null; + + const data = { + user: { + id: user.id + }, + content, + to + }; + + return deliverQueue.add(data, { + attempts: config.deliverJobMaxAttempts || 12, + timeout: 1 * 60 * 1000, // 1min + backoff: { + type: 'apBackoff' + }, + removeOnComplete: true, + removeOnFail: true + }); +} + +export function inbox(activity: IActivity, signature: httpSignature.IParsedSignature) { + const data = { + activity: activity, + signature + }; + + return inboxQueue.add(data, { + attempts: config.inboxJobMaxAttempts || 8, + timeout: 5 * 60 * 1000, // 5min + backoff: { + type: 'apBackoff' + }, + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createDeleteDriveFilesJob(user: ThinUser) { + return dbQueue.add('deleteDriveFiles', { + user: user + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createExportNotesJob(user: ThinUser) { + return dbQueue.add('exportNotes', { + user: user + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createExportFollowingJob(user: ThinUser) { + return dbQueue.add('exportFollowing', { + user: user + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createExportMuteJob(user: ThinUser) { + return dbQueue.add('exportMute', { + user: user + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createExportBlockingJob(user: ThinUser) { + return dbQueue.add('exportBlocking', { + user: user + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createExportUserListsJob(user: ThinUser) { + return dbQueue.add('exportUserLists', { + user: user + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createImportFollowingJob(user: ThinUser, fileId: DriveFile['id']) { + return dbQueue.add('importFollowing', { + user: user, + fileId: fileId + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createImportMutingJob(user: ThinUser, fileId: DriveFile['id']) { + return dbQueue.add('importMuting', { + user: user, + fileId: fileId + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createImportBlockingJob(user: ThinUser, fileId: DriveFile['id']) { + return dbQueue.add('importBlocking', { + user: user, + fileId: fileId + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createImportUserListsJob(user: ThinUser, fileId: DriveFile['id']) { + return dbQueue.add('importUserLists', { + user: user, + fileId: fileId + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createDeleteAccountJob(user: ThinUser, opts: { soft?: boolean; } = {}) { + return dbQueue.add('deleteAccount', { + user: user, + soft: opts.soft + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createDeleteObjectStorageFileJob(key: string) { + return objectStorageQueue.add('deleteFile', { + key: key + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createCleanRemoteFilesJob() { + return objectStorageQueue.add('cleanRemoteFiles', {}, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export default function() { + if (envOption.onlyServer) return; + + deliverQueue.process(config.deliverJobConcurrency || 128, processDeliver); + inboxQueue.process(config.inboxJobConcurrency || 16, processInbox); + processDb(dbQueue); + procesObjectStorage(objectStorageQueue); + + systemQueue.add('resyncCharts', { + }, { + repeat: { cron: '0 0 * * *' } + }); +} + +export function destroy() { + deliverQueue.once('cleaned', (jobs, status) => { + deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); + }); + deliverQueue.clean(0, 'delayed'); + + inboxQueue.once('cleaned', (jobs, status) => { + inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); + }); + inboxQueue.clean(0, 'delayed'); +} diff --git a/packages/backend/src/queue/initialize.ts b/packages/backend/src/queue/initialize.ts new file mode 100644 index 0000000000..31102a3ed2 --- /dev/null +++ b/packages/backend/src/queue/initialize.ts @@ -0,0 +1,33 @@ +import * as Bull from 'bull'; +import config from '@/config/index'; + +export function initialize(name: string, limitPerSec = -1) { + return new Bull(name, { + redis: { + port: config.redis.port, + host: config.redis.host, + password: config.redis.pass, + db: config.redis.db || 0, + }, + prefix: config.redis.prefix ? `${config.redis.prefix}:queue` : 'queue', + limiter: limitPerSec > 0 ? { + max: limitPerSec, + duration: 1000 + } : undefined, + settings: { + backoffStrategies: { + apBackoff + } + } + }); +} + +// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019 +function apBackoff(attemptsMade: number, err: Error) { + const baseDelay = 60 * 1000; // 1min + const maxBackoff = 8 * 60 * 60 * 1000; // 8hours + let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay; + backoff = Math.min(backoff, maxBackoff); + backoff += Math.round(backoff * Math.random() * 0.2); + return backoff; +} diff --git a/packages/backend/src/queue/logger.ts b/packages/backend/src/queue/logger.ts new file mode 100644 index 0000000000..f789b9d079 --- /dev/null +++ b/packages/backend/src/queue/logger.ts @@ -0,0 +1,3 @@ +import Logger from '@/services/logger'; + +export const queueLogger = new Logger('queue', 'orange'); diff --git a/packages/backend/src/queue/processors/db/delete-account.ts b/packages/backend/src/queue/processors/db/delete-account.ts new file mode 100644 index 0000000000..e54f38e35e --- /dev/null +++ b/packages/backend/src/queue/processors/db/delete-account.ts @@ -0,0 +1,94 @@ +import * as Bull from 'bull'; +import { queueLogger } from '../../logger'; +import { DriveFiles, Notes, UserProfiles, Users } from '@/models/index'; +import { DbUserDeleteJobData } from '@/queue/types'; +import { Note } from '@/models/entities/note'; +import { DriveFile } from '@/models/entities/drive-file'; +import { MoreThan } from 'typeorm'; +import { deleteFileSync } from '@/services/drive/delete-file'; +import { sendEmail } from '@/services/send-email'; + +const logger = queueLogger.createSubLogger('delete-account'); + +export async function deleteAccount(job: Bull.Job): Promise { + logger.info(`Deleting account of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + return; + } + + { // Delete notes + let cursor: Note['id'] | null = null; + + while (true) { + const notes = await Notes.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 + } + }); + + if (notes.length === 0) { + break; + } + + cursor = notes[notes.length - 1].id; + + await Notes.delete(notes.map(note => note.id)); + } + + logger.succ(`All of notes deleted`); + } + + { // Delete files + let cursor: DriveFile['id'] | null = null; + + while (true) { + const files = await DriveFiles.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 10, + order: { + id: 1 + } + }); + + if (files.length === 0) { + break; + } + + cursor = files[files.length - 1].id; + + for (const file of files) { + await deleteFileSync(file); + } + } + + logger.succ(`All of files deleted`); + } + + { // Send email notification + const profile = await UserProfiles.findOneOrFail(user.id); + if (profile.email && profile.emailVerified) { + sendEmail(profile.email, 'Account deleted', + `Your account has been deleted.`, + `Your account has been deleted.`); + } + } + + // soft指定されている場合は物理削除しない + if (job.data.soft) { + // nop + } else { + await Users.delete(job.data.user.id); + } + + return 'Account deleted'; +} diff --git a/packages/backend/src/queue/processors/db/delete-drive-files.ts b/packages/backend/src/queue/processors/db/delete-drive-files.ts new file mode 100644 index 0000000000..8a28468b0d --- /dev/null +++ b/packages/backend/src/queue/processors/db/delete-drive-files.ts @@ -0,0 +1,56 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import { deleteFileSync } from '@/services/drive/delete-file'; +import { Users, DriveFiles } from '@/models/index'; +import { MoreThan } from 'typeorm'; +import { DbUserJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('delete-drive-files'); + +export async function deleteDriveFiles(job: Bull.Job, done: any): Promise { + logger.info(`Deleting drive files of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + let deletedCount = 0; + let cursor: any = null; + + while (true) { + const files = await DriveFiles.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 + } + }); + + if (files.length === 0) { + job.progress(100); + break; + } + + cursor = files[files.length - 1].id; + + for (const file of files) { + await deleteFileSync(file); + deletedCount++; + } + + const total = await DriveFiles.count({ + userId: user.id, + }); + + job.progress(deletedCount / total); + } + + logger.succ(`All drive files (${deletedCount}) of ${user.id} has been deleted.`); + done(); +} diff --git a/packages/backend/src/queue/processors/db/export-blocking.ts b/packages/backend/src/queue/processors/db/export-blocking.ts new file mode 100644 index 0000000000..8b8aa259d4 --- /dev/null +++ b/packages/backend/src/queue/processors/db/export-blocking.ts @@ -0,0 +1,94 @@ +import * as Bull from 'bull'; +import * as tmp from 'tmp'; +import * as fs from 'fs'; + +import { queueLogger } from '../../logger'; +import addFile from '@/services/drive/add-file'; +import * as dateFormat from 'dateformat'; +import { getFullApAccount } from '@/misc/convert-host'; +import { Users, Blockings } from '@/models/index'; +import { MoreThan } from 'typeorm'; +import { DbUserJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('export-blocking'); + +export async function exportBlocking(job: Bull.Job, done: any): Promise { + logger.info(`Exporting blocking of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + // Create temp file + const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { + tmp.file((e, path, fd, cleanup) => { + if (e) return rej(e); + res([path, cleanup]); + }); + }); + + logger.info(`Temp file is ${path}`); + + const stream = fs.createWriteStream(path, { flags: 'a' }); + + let exportedCount = 0; + let cursor: any = null; + + while (true) { + const blockings = await Blockings.find({ + where: { + blockerId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 + } + }); + + if (blockings.length === 0) { + job.progress(100); + break; + } + + cursor = blockings[blockings.length - 1].id; + + for (const block of blockings) { + const u = await Users.findOne({ id: block.blockeeId }); + if (u == null) { + exportedCount++; continue; + } + + const content = getFullApAccount(u.username, u.host); + await new Promise((res, rej) => { + stream.write(content + '\n', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + exportedCount++; + } + + const total = await Blockings.count({ + blockerId: user.id, + }); + + job.progress(exportedCount / total); + } + + stream.end(); + logger.succ(`Exported to: ${path}`); + + const fileName = 'blocking-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; + const driveFile = await addFile(user, path, fileName, null, null, true); + + logger.succ(`Exported to: ${driveFile.id}`); + cleanup(); + done(); +} diff --git a/packages/backend/src/queue/processors/db/export-following.ts b/packages/backend/src/queue/processors/db/export-following.ts new file mode 100644 index 0000000000..a0ecf5f560 --- /dev/null +++ b/packages/backend/src/queue/processors/db/export-following.ts @@ -0,0 +1,94 @@ +import * as Bull from 'bull'; +import * as tmp from 'tmp'; +import * as fs from 'fs'; + +import { queueLogger } from '../../logger'; +import addFile from '@/services/drive/add-file'; +import * as dateFormat from 'dateformat'; +import { getFullApAccount } from '@/misc/convert-host'; +import { Users, Followings } from '@/models/index'; +import { MoreThan } from 'typeorm'; +import { DbUserJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('export-following'); + +export async function exportFollowing(job: Bull.Job, done: any): Promise { + logger.info(`Exporting following of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + // Create temp file + const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { + tmp.file((e, path, fd, cleanup) => { + if (e) return rej(e); + res([path, cleanup]); + }); + }); + + logger.info(`Temp file is ${path}`); + + const stream = fs.createWriteStream(path, { flags: 'a' }); + + let exportedCount = 0; + let cursor: any = null; + + while (true) { + const followings = await Followings.find({ + where: { + followerId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 + } + }); + + if (followings.length === 0) { + job.progress(100); + break; + } + + cursor = followings[followings.length - 1].id; + + for (const following of followings) { + const u = await Users.findOne({ id: following.followeeId }); + if (u == null) { + exportedCount++; continue; + } + + const content = getFullApAccount(u.username, u.host); + await new Promise((res, rej) => { + stream.write(content + '\n', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + exportedCount++; + } + + const total = await Followings.count({ + followerId: user.id, + }); + + job.progress(exportedCount / total); + } + + stream.end(); + logger.succ(`Exported to: ${path}`); + + const fileName = 'following-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; + const driveFile = await addFile(user, path, fileName, null, null, true); + + logger.succ(`Exported to: ${driveFile.id}`); + cleanup(); + done(); +} diff --git a/packages/backend/src/queue/processors/db/export-mute.ts b/packages/backend/src/queue/processors/db/export-mute.ts new file mode 100644 index 0000000000..d5976f7d56 --- /dev/null +++ b/packages/backend/src/queue/processors/db/export-mute.ts @@ -0,0 +1,94 @@ +import * as Bull from 'bull'; +import * as tmp from 'tmp'; +import * as fs from 'fs'; + +import { queueLogger } from '../../logger'; +import addFile from '@/services/drive/add-file'; +import * as dateFormat from 'dateformat'; +import { getFullApAccount } from '@/misc/convert-host'; +import { Users, Mutings } from '@/models/index'; +import { MoreThan } from 'typeorm'; +import { DbUserJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('export-mute'); + +export async function exportMute(job: Bull.Job, done: any): Promise { + logger.info(`Exporting mute of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + // Create temp file + const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { + tmp.file((e, path, fd, cleanup) => { + if (e) return rej(e); + res([path, cleanup]); + }); + }); + + logger.info(`Temp file is ${path}`); + + const stream = fs.createWriteStream(path, { flags: 'a' }); + + let exportedCount = 0; + let cursor: any = null; + + while (true) { + const mutes = await Mutings.find({ + where: { + muterId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 + } + }); + + if (mutes.length === 0) { + job.progress(100); + break; + } + + cursor = mutes[mutes.length - 1].id; + + for (const mute of mutes) { + const u = await Users.findOne({ id: mute.muteeId }); + if (u == null) { + exportedCount++; continue; + } + + const content = getFullApAccount(u.username, u.host); + await new Promise((res, rej) => { + stream.write(content + '\n', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + exportedCount++; + } + + const total = await Mutings.count({ + muterId: user.id, + }); + + job.progress(exportedCount / total); + } + + stream.end(); + logger.succ(`Exported to: ${path}`); + + const fileName = 'mute-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; + const driveFile = await addFile(user, path, fileName, null, null, true); + + logger.succ(`Exported to: ${driveFile.id}`); + cleanup(); + done(); +} diff --git a/packages/backend/src/queue/processors/db/export-notes.ts b/packages/backend/src/queue/processors/db/export-notes.ts new file mode 100644 index 0000000000..49850aa706 --- /dev/null +++ b/packages/backend/src/queue/processors/db/export-notes.ts @@ -0,0 +1,133 @@ +import * as Bull from 'bull'; +import * as tmp from 'tmp'; +import * as fs from 'fs'; + +import { queueLogger } from '../../logger'; +import addFile from '@/services/drive/add-file'; +import * as dateFormat from 'dateformat'; +import { Users, Notes, Polls } from '@/models/index'; +import { MoreThan } from 'typeorm'; +import { Note } from '@/models/entities/note'; +import { Poll } from '@/models/entities/poll'; +import { DbUserJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('export-notes'); + +export async function exportNotes(job: Bull.Job, done: any): Promise { + logger.info(`Exporting notes of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + // Create temp file + const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { + tmp.file((e, path, fd, cleanup) => { + if (e) return rej(e); + res([path, cleanup]); + }); + }); + + logger.info(`Temp file is ${path}`); + + const stream = fs.createWriteStream(path, { flags: 'a' }); + + await new Promise((res, rej) => { + stream.write('[', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + + let exportedNotesCount = 0; + let cursor: any = null; + + while (true) { + const notes = await Notes.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 + } + }); + + if (notes.length === 0) { + job.progress(100); + break; + } + + cursor = notes[notes.length - 1].id; + + for (const note of notes) { + let poll: Poll | undefined; + if (note.hasPoll) { + poll = await Polls.findOneOrFail({ noteId: note.id }); + } + const content = JSON.stringify(serialize(note, poll)); + await new Promise((res, rej) => { + stream.write(exportedNotesCount === 0 ? content : ',\n' + content, err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + exportedNotesCount++; + } + + const total = await Notes.count({ + userId: user.id, + }); + + job.progress(exportedNotesCount / total); + } + + await new Promise((res, rej) => { + stream.write(']', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + + stream.end(); + logger.succ(`Exported to: ${path}`); + + const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.json'; + const driveFile = await addFile(user, path, fileName, null, null, true); + + logger.succ(`Exported to: ${driveFile.id}`); + cleanup(); + done(); +} + +function serialize(note: Note, poll: Poll | null = null): any { + return { + id: note.id, + text: note.text, + createdAt: note.createdAt, + fileIds: note.fileIds, + replyId: note.replyId, + renoteId: note.renoteId, + poll: poll, + cw: note.cw, + viaMobile: note.viaMobile, + visibility: note.visibility, + visibleUserIds: note.visibleUserIds, + localOnly: note.localOnly + }; +} diff --git a/packages/backend/src/queue/processors/db/export-user-lists.ts b/packages/backend/src/queue/processors/db/export-user-lists.ts new file mode 100644 index 0000000000..8a86c4df5d --- /dev/null +++ b/packages/backend/src/queue/processors/db/export-user-lists.ts @@ -0,0 +1,71 @@ +import * as Bull from 'bull'; +import * as tmp from 'tmp'; +import * as fs from 'fs'; + +import { queueLogger } from '../../logger'; +import addFile from '@/services/drive/add-file'; +import * as dateFormat from 'dateformat'; +import { getFullApAccount } from '@/misc/convert-host'; +import { Users, UserLists, UserListJoinings } from '@/models/index'; +import { In } from 'typeorm'; +import { DbUserJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('export-user-lists'); + +export async function exportUserLists(job: Bull.Job, done: any): Promise { + logger.info(`Exporting user lists of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + const lists = await UserLists.find({ + userId: user.id + }); + + // Create temp file + const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { + tmp.file((e, path, fd, cleanup) => { + if (e) return rej(e); + res([path, cleanup]); + }); + }); + + logger.info(`Temp file is ${path}`); + + const stream = fs.createWriteStream(path, { flags: 'a' }); + + for (const list of lists) { + const joinings = await UserListJoinings.find({ userListId: list.id }); + const users = await Users.find({ + id: In(joinings.map(j => j.userId)) + }); + + for (const u of users) { + const acct = getFullApAccount(u.username, u.host); + const content = `${list.name},${acct}`; + await new Promise((res, rej) => { + stream.write(content + '\n', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + } + } + + stream.end(); + logger.succ(`Exported to: ${path}`); + + const fileName = 'user-lists-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; + const driveFile = await addFile(user, path, fileName, null, null, true); + + logger.succ(`Exported to: ${driveFile.id}`); + cleanup(); + done(); +} diff --git a/packages/backend/src/queue/processors/db/import-blocking.ts b/packages/backend/src/queue/processors/db/import-blocking.ts new file mode 100644 index 0000000000..2e77107034 --- /dev/null +++ b/packages/backend/src/queue/processors/db/import-blocking.ts @@ -0,0 +1,74 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import * as Acct from 'misskey-js/built/acct'; +import { resolveUser } from '@/remote/resolve-user'; +import { downloadTextFile } from '@/misc/download-text-file'; +import { isSelfHost, toPuny } from '@/misc/convert-host'; +import { Users, DriveFiles, Blockings } from '@/models/index'; +import { DbUserImportJobData } from '@/queue/types'; +import block from '@/services/blocking/create'; + +const logger = queueLogger.createSubLogger('import-blocking'); + +export async function importBlocking(job: Bull.Job, done: any): Promise { + logger.info(`Importing blocking of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + const file = await DriveFiles.findOne({ + id: job.data.fileId + }); + if (file == null) { + done(); + return; + } + + const csv = await downloadTextFile(file.url); + + let linenum = 0; + + for (const line of csv.trim().split('\n')) { + linenum++; + + try { + const acct = line.split(',')[0].trim(); + const { username, host } = Acct.parse(acct); + + let target = isSelfHost(host!) ? await Users.findOne({ + host: null, + usernameLower: username.toLowerCase() + }) : await Users.findOne({ + host: toPuny(host!), + usernameLower: username.toLowerCase() + }); + + if (host == null && target == null) continue; + + if (target == null) { + target = await resolveUser(username, host); + } + + if (target == null) { + throw `cannot resolve user: @${username}@${host}`; + } + + // skip myself + if (target.id === job.data.user.id) continue; + + logger.info(`Block[${linenum}] ${target.id} ...`); + + await block(user, target); + } catch (e) { + logger.warn(`Error in line:${linenum} ${e}`); + } + } + + logger.succ('Imported'); + done(); +} + diff --git a/packages/backend/src/queue/processors/db/import-following.ts b/packages/backend/src/queue/processors/db/import-following.ts new file mode 100644 index 0000000000..2bd079e4bc --- /dev/null +++ b/packages/backend/src/queue/processors/db/import-following.ts @@ -0,0 +1,73 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import follow from '@/services/following/create'; +import * as Acct from 'misskey-js/built/acct'; +import { resolveUser } from '@/remote/resolve-user'; +import { downloadTextFile } from '@/misc/download-text-file'; +import { isSelfHost, toPuny } from '@/misc/convert-host'; +import { Users, DriveFiles } from '@/models/index'; +import { DbUserImportJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('import-following'); + +export async function importFollowing(job: Bull.Job, done: any): Promise { + logger.info(`Importing following of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + const file = await DriveFiles.findOne({ + id: job.data.fileId + }); + if (file == null) { + done(); + return; + } + + const csv = await downloadTextFile(file.url); + + let linenum = 0; + + for (const line of csv.trim().split('\n')) { + linenum++; + + try { + const acct = line.split(',')[0].trim(); + const { username, host } = Acct.parse(acct); + + let target = isSelfHost(host!) ? await Users.findOne({ + host: null, + usernameLower: username.toLowerCase() + }) : await Users.findOne({ + host: toPuny(host!), + usernameLower: username.toLowerCase() + }); + + if (host == null && target == null) continue; + + if (target == null) { + target = await resolveUser(username, host); + } + + if (target == null) { + throw `cannot resolve user: @${username}@${host}`; + } + + // skip myself + if (target.id === job.data.user.id) continue; + + logger.info(`Follow[${linenum}] ${target.id} ...`); + + follow(user, target); + } catch (e) { + logger.warn(`Error in line:${linenum} ${e}`); + } + } + + logger.succ('Imported'); + done(); +} diff --git a/packages/backend/src/queue/processors/db/import-muting.ts b/packages/backend/src/queue/processors/db/import-muting.ts new file mode 100644 index 0000000000..8060980625 --- /dev/null +++ b/packages/backend/src/queue/processors/db/import-muting.ts @@ -0,0 +1,83 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import * as Acct from 'misskey-js/built/acct'; +import { resolveUser } from '@/remote/resolve-user'; +import { downloadTextFile } from '@/misc/download-text-file'; +import { isSelfHost, toPuny } from '@/misc/convert-host'; +import { Users, DriveFiles, Mutings } from '@/models/index'; +import { DbUserImportJobData } from '@/queue/types'; +import { User } from '@/models/entities/user'; +import { genId } from '@/misc/gen-id'; + +const logger = queueLogger.createSubLogger('import-muting'); + +export async function importMuting(job: Bull.Job, done: any): Promise { + logger.info(`Importing muting of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + const file = await DriveFiles.findOne({ + id: job.data.fileId + }); + if (file == null) { + done(); + return; + } + + const csv = await downloadTextFile(file.url); + + let linenum = 0; + + for (const line of csv.trim().split('\n')) { + linenum++; + + try { + const acct = line.split(',')[0].trim(); + const { username, host } = Acct.parse(acct); + + let target = isSelfHost(host!) ? await Users.findOne({ + host: null, + usernameLower: username.toLowerCase() + }) : await Users.findOne({ + host: toPuny(host!), + usernameLower: username.toLowerCase() + }); + + if (host == null && target == null) continue; + + if (target == null) { + target = await resolveUser(username, host); + } + + if (target == null) { + throw `cannot resolve user: @${username}@${host}`; + } + + // skip myself + if (target.id === job.data.user.id) continue; + + logger.info(`Mute[${linenum}] ${target.id} ...`); + + await mute(user, target); + } catch (e) { + logger.warn(`Error in line:${linenum} ${e}`); + } + } + + logger.succ('Imported'); + done(); +} + +async function mute(user: User, target: User) { + await Mutings.insert({ + id: genId(), + createdAt: new Date(), + muterId: user.id, + muteeId: target.id, + }); +} diff --git a/packages/backend/src/queue/processors/db/import-user-lists.ts b/packages/backend/src/queue/processors/db/import-user-lists.ts new file mode 100644 index 0000000000..46b728b387 --- /dev/null +++ b/packages/backend/src/queue/processors/db/import-user-lists.ts @@ -0,0 +1,80 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import * as Acct from 'misskey-js/built/acct'; +import { resolveUser } from '@/remote/resolve-user'; +import { pushUserToUserList } from '@/services/user-list/push'; +import { downloadTextFile } from '@/misc/download-text-file'; +import { isSelfHost, toPuny } from '@/misc/convert-host'; +import { DriveFiles, Users, UserLists, UserListJoinings } from '@/models/index'; +import { genId } from '@/misc/gen-id'; +import { DbUserImportJobData } from '@/queue/types'; + +const logger = queueLogger.createSubLogger('import-user-lists'); + +export async function importUserLists(job: Bull.Job, done: any): Promise { + logger.info(`Importing user lists of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + const file = await DriveFiles.findOne({ + id: job.data.fileId + }); + if (file == null) { + done(); + return; + } + + const csv = await downloadTextFile(file.url); + + let linenum = 0; + + for (const line of csv.trim().split('\n')) { + linenum++; + + try { + const listName = line.split(',')[0].trim(); + const { username, host } = Acct.parse(line.split(',')[1].trim()); + + let list = await UserLists.findOne({ + userId: user.id, + name: listName + }); + + if (list == null) { + list = await UserLists.save({ + id: genId(), + createdAt: new Date(), + userId: user.id, + name: listName, + userIds: [] + }); + } + + let target = isSelfHost(host!) ? await Users.findOne({ + host: null, + usernameLower: username.toLowerCase() + }) : await Users.findOne({ + host: toPuny(host!), + usernameLower: username.toLowerCase() + }); + + if (target == null) { + target = await resolveUser(username, host); + } + + if (await UserListJoinings.findOne({ userListId: list.id, userId: target.id }) != null) continue; + + pushUserToUserList(target, list); + } catch (e) { + logger.warn(`Error in line:${linenum} ${e}`); + } + } + + logger.succ('Imported'); + done(); +} diff --git a/packages/backend/src/queue/processors/db/index.ts b/packages/backend/src/queue/processors/db/index.ts new file mode 100644 index 0000000000..97087642b7 --- /dev/null +++ b/packages/backend/src/queue/processors/db/index.ts @@ -0,0 +1,33 @@ +import * as Bull from 'bull'; +import { DbJobData } from '@/queue/types'; +import { deleteDriveFiles } from './delete-drive-files'; +import { exportNotes } from './export-notes'; +import { exportFollowing } from './export-following'; +import { exportMute } from './export-mute'; +import { exportBlocking } from './export-blocking'; +import { exportUserLists } from './export-user-lists'; +import { importFollowing } from './import-following'; +import { importUserLists } from './import-user-lists'; +import { deleteAccount } from './delete-account'; +import { importMuting } from './import-muting'; +import { importBlocking } from './import-blocking'; + +const jobs = { + deleteDriveFiles, + exportNotes, + exportFollowing, + exportMute, + exportBlocking, + exportUserLists, + importFollowing, + importMuting, + importBlocking, + importUserLists, + deleteAccount, +} as Record | Bull.ProcessPromiseFunction>; + +export default function(dbQueue: Bull.Queue) { + for (const [k, v] of Object.entries(jobs)) { + dbQueue.process(k, v); + } +} diff --git a/packages/backend/src/queue/processors/deliver.ts b/packages/backend/src/queue/processors/deliver.ts new file mode 100644 index 0000000000..3c61896a2f --- /dev/null +++ b/packages/backend/src/queue/processors/deliver.ts @@ -0,0 +1,94 @@ +import { URL } from 'url'; +import * as Bull from 'bull'; +import request from '@/remote/activitypub/request'; +import { registerOrFetchInstanceDoc } from '@/services/register-or-fetch-instance-doc'; +import Logger from '@/services/logger'; +import { Instances } from '@/models/index'; +import { instanceChart } from '@/services/chart/index'; +import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata'; +import { fetchMeta } from '@/misc/fetch-meta'; +import { toPuny } from '@/misc/convert-host'; +import { Cache } from '@/misc/cache'; +import { Instance } from '@/models/entities/instance'; +import { DeliverJobData } from '../types'; +import { StatusError } from '@/misc/fetch'; + +const logger = new Logger('deliver'); + +let latest: string | null = null; + +const suspendedHostsCache = new Cache(1000 * 60 * 60); + +export default async (job: Bull.Job) => { + const { host } = new URL(job.data.to); + + // ブロックしてたら中断 + const meta = await fetchMeta(); + if (meta.blockedHosts.includes(toPuny(host))) { + return 'skip (blocked)'; + } + + // isSuspendedなら中断 + let suspendedHosts = suspendedHostsCache.get(null); + if (suspendedHosts == null) { + suspendedHosts = await Instances.find({ + where: { + isSuspended: true + }, + }); + suspendedHostsCache.set(null, suspendedHosts); + } + if (suspendedHosts.map(x => x.host).includes(toPuny(host))) { + return 'skip (suspended)'; + } + + try { + if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) { + logger.debug(`delivering ${latest}`); + } + + await request(job.data.user, job.data.to, job.data.content); + + // Update stats + registerOrFetchInstanceDoc(host).then(i => { + Instances.update(i.id, { + latestRequestSentAt: new Date(), + latestStatus: 200, + lastCommunicatedAt: new Date(), + isNotResponding: false + }); + + fetchInstanceMetadata(i); + + instanceChart.requestSent(i.host, true); + }); + + return 'Success'; + } catch (res) { + // Update stats + registerOrFetchInstanceDoc(host).then(i => { + Instances.update(i.id, { + latestRequestSentAt: new Date(), + latestStatus: res instanceof StatusError ? res.statusCode : null, + isNotResponding: true + }); + + instanceChart.requestSent(i.host, false); + }); + + if (res instanceof StatusError) { + // 4xx + if (res.isClientError) { + // HTTPステータスコード4xxはクライアントエラーであり、それはつまり + // 何回再送しても成功することはないということなのでエラーにはしないでおく + return `${res.statusCode} ${res.statusMessage}`; + } + + // 5xx etc. + throw `${res.statusCode} ${res.statusMessage}`; + } else { + // DNS error, socket error, timeout ... + throw res; + } + } +}; diff --git a/packages/backend/src/queue/processors/inbox.ts b/packages/backend/src/queue/processors/inbox.ts new file mode 100644 index 0000000000..4032ce8653 --- /dev/null +++ b/packages/backend/src/queue/processors/inbox.ts @@ -0,0 +1,149 @@ +import { URL } from 'url'; +import * as Bull from 'bull'; +import * as httpSignature from 'http-signature'; +import perform from '@/remote/activitypub/perform'; +import Logger from '@/services/logger'; +import { registerOrFetchInstanceDoc } from '@/services/register-or-fetch-instance-doc'; +import { Instances } from '@/models/index'; +import { instanceChart } from '@/services/chart/index'; +import { fetchMeta } from '@/misc/fetch-meta'; +import { toPuny, extractDbHost } from '@/misc/convert-host'; +import { getApId } from '@/remote/activitypub/type'; +import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata'; +import { InboxJobData } from '../types'; +import DbResolver from '@/remote/activitypub/db-resolver'; +import { resolvePerson } from '@/remote/activitypub/models/person'; +import { LdSignature } from '@/remote/activitypub/misc/ld-signature'; +import { StatusError } from '@/misc/fetch'; + +const logger = new Logger('inbox'); + +// ユーザーのinboxにアクティビティが届いた時の処理 +export default async (job: Bull.Job): Promise => { + const signature = job.data.signature; // HTTP-signature + const activity = job.data.activity; + + //#region Log + const info = Object.assign({}, activity) as any; + delete info['@context']; + logger.debug(JSON.stringify(info, null, 2)); + //#endregion + + const host = toPuny(new URL(signature.keyId).hostname); + + // ブロックしてたら中断 + const meta = await fetchMeta(); + if (meta.blockedHosts.includes(host)) { + return `Blocked request: ${host}`; + } + + const keyIdLower = signature.keyId.toLowerCase(); + if (keyIdLower.startsWith('acct:')) { + return `Old keyId is no longer supported. ${keyIdLower}`; + } + + // TDOO: キャッシュ + const dbResolver = new DbResolver(); + + // HTTP-Signature keyIdを元にDBから取得 + let authUser = await dbResolver.getAuthUserFromKeyId(signature.keyId); + + // keyIdでわからなければ、activity.actorを元にDBから取得 || activity.actorを元にリモートから取得 + if (authUser == null) { + try { + authUser = await dbResolver.getAuthUserFromApId(getApId(activity.actor)); + } catch (e) { + // 対象が4xxならスキップ + if (e instanceof StatusError && e.isClientError) { + return `skip: Ignored deleted actors on both ends ${activity.actor} - ${e.statusCode}`; + } + throw `Error in actor ${activity.actor} - ${e.statusCode || e}`; + } + } + + // それでもわからなければ終了 + if (authUser == null) { + return `skip: failed to resolve user`; + } + + // publicKey がなくても終了 + if (authUser.key == null) { + return `skip: failed to resolve user publicKey`; + } + + // HTTP-Signatureの検証 + const httpSignatureValidated = httpSignature.verifySignature(signature, authUser.key.keyPem); + + // また、signatureのsignerは、activity.actorと一致する必要がある + if (!httpSignatureValidated || authUser.user.uri !== activity.actor) { + // 一致しなくても、でもLD-Signatureがありそうならそっちも見る + if (activity.signature) { + if (activity.signature.type !== 'RsaSignature2017') { + return `skip: unsupported LD-signature type ${activity.signature.type}`; + } + + // activity.signature.creator: https://example.oom/users/user#main-key + // みたいになっててUserを引っ張れば公開キーも入ることを期待する + if (activity.signature.creator) { + const candicate = activity.signature.creator.replace(/#.*/, ''); + await resolvePerson(candicate).catch(() => null); + } + + // keyIdからLD-Signatureのユーザーを取得 + authUser = await dbResolver.getAuthUserFromKeyId(activity.signature.creator); + if (authUser == null) { + return `skip: LD-Signatureのユーザーが取得できませんでした`; + } + + if (authUser.key == null) { + return `skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした`; + } + + // LD-Signature検証 + const ldSignature = new LdSignature(); + const verified = await ldSignature.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false); + if (!verified) { + return `skip: LD-Signatureの検証に失敗しました`; + } + + // もう一度actorチェック + if (authUser.user.uri !== activity.actor) { + return `skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`; + } + + // ブロックしてたら中断 + const ldHost = extractDbHost(authUser.user.uri); + if (meta.blockedHosts.includes(ldHost)) { + return `Blocked request: ${ldHost}`; + } + } else { + return `skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`; + } + } + + // activity.idがあればホストが署名者のホストであることを確認する + if (typeof activity.id === 'string') { + const signerHost = extractDbHost(authUser.user.uri!); + const activityIdHost = extractDbHost(activity.id); + if (signerHost !== activityIdHost) { + return `skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`; + } + } + + // Update stats + registerOrFetchInstanceDoc(authUser.user.host).then(i => { + Instances.update(i.id, { + latestRequestReceivedAt: new Date(), + lastCommunicatedAt: new Date(), + isNotResponding: false + }); + + fetchInstanceMetadata(i); + + instanceChart.requestReceived(i.host); + }); + + // アクティビティを処理 + await perform(authUser.user, activity); + return `ok`; +}; diff --git a/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts b/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts new file mode 100644 index 0000000000..3b2e4ea939 --- /dev/null +++ b/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts @@ -0,0 +1,50 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import { deleteFileSync } from '@/services/drive/delete-file'; +import { DriveFiles } from '@/models/index'; +import { MoreThan, Not, IsNull } from 'typeorm'; + +const logger = queueLogger.createSubLogger('clean-remote-files'); + +export default async function cleanRemoteFiles(job: Bull.Job<{}>, done: any): Promise { + logger.info(`Deleting cached remote files...`); + + let deletedCount = 0; + let cursor: any = null; + + while (true) { + const files = await DriveFiles.find({ + where: { + userHost: Not(IsNull()), + isLink: false, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 8, + order: { + id: 1 + } + }); + + if (files.length === 0) { + job.progress(100); + break; + } + + cursor = files[files.length - 1].id; + + await Promise.all(files.map(file => deleteFileSync(file, true))); + + deletedCount += 8; + + const total = await DriveFiles.count({ + userHost: Not(IsNull()), + isLink: false, + }); + + job.progress(deletedCount / total); + } + + logger.succ(`All cahced remote files has been deleted.`); + done(); +} diff --git a/packages/backend/src/queue/processors/object-storage/delete-file.ts b/packages/backend/src/queue/processors/object-storage/delete-file.ts new file mode 100644 index 0000000000..ed22968a27 --- /dev/null +++ b/packages/backend/src/queue/processors/object-storage/delete-file.ts @@ -0,0 +1,11 @@ +import { ObjectStorageFileJobData } from '@/queue/types'; +import * as Bull from 'bull'; +import { deleteObjectStorageFile } from '@/services/drive/delete-file'; + +export default async (job: Bull.Job) => { + const key: string = job.data.key; + + await deleteObjectStorageFile(key); + + return 'Success'; +}; diff --git a/packages/backend/src/queue/processors/object-storage/index.ts b/packages/backend/src/queue/processors/object-storage/index.ts new file mode 100644 index 0000000000..0d9570e179 --- /dev/null +++ b/packages/backend/src/queue/processors/object-storage/index.ts @@ -0,0 +1,15 @@ +import * as Bull from 'bull'; +import { ObjectStorageJobData } from '@/queue/types'; +import deleteFile from './delete-file'; +import cleanRemoteFiles from './clean-remote-files'; + +const jobs = { + deleteFile, + cleanRemoteFiles, +} as Record | Bull.ProcessPromiseFunction>; + +export default function(q: Bull.Queue) { + for (const [k, v] of Object.entries(jobs)) { + q.process(k, 16, v); + } +} diff --git a/packages/backend/src/queue/processors/system/index.ts b/packages/backend/src/queue/processors/system/index.ts new file mode 100644 index 0000000000..52b7868105 --- /dev/null +++ b/packages/backend/src/queue/processors/system/index.ts @@ -0,0 +1,12 @@ +import * as Bull from 'bull'; +import { resyncCharts } from './resync-charts'; + +const jobs = { + resyncCharts, +} as Record | Bull.ProcessPromiseFunction<{}>>; + +export default function(dbQueue: Bull.Queue<{}>) { + for (const [k, v] of Object.entries(jobs)) { + dbQueue.process(k, v); + } +} diff --git a/packages/backend/src/queue/processors/system/resync-charts.ts b/packages/backend/src/queue/processors/system/resync-charts.ts new file mode 100644 index 0000000000..b36b024cfb --- /dev/null +++ b/packages/backend/src/queue/processors/system/resync-charts.ts @@ -0,0 +1,21 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import { driveChart, notesChart, usersChart } from '@/services/chart/index'; + +const logger = queueLogger.createSubLogger('resync-charts'); + +export default async function resyncCharts(job: Bull.Job<{}>, done: any): Promise { + logger.info(`Resync charts...`); + + // TODO: ユーザーごとのチャートも更新する + // TODO: インスタンスごとのチャートも更新する + await Promise.all([ + driveChart.resync(), + notesChart.resync(), + usersChart.resync(), + ]); + + logger.succ(`All charts successfully resynced.`); + done(); +} diff --git a/packages/backend/src/queue/queues.ts b/packages/backend/src/queue/queues.ts new file mode 100644 index 0000000000..a66a7ca451 --- /dev/null +++ b/packages/backend/src/queue/queues.ts @@ -0,0 +1,9 @@ +import config from '@/config/index'; +import { initialize as initializeQueue } from './initialize'; +import { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData } from './types'; + +export const systemQueue = initializeQueue<{}>('system'); +export const deliverQueue = initializeQueue('deliver', config.deliverJobPerSec || 128); +export const inboxQueue = initializeQueue('inbox', config.inboxJobPerSec || 16); +export const dbQueue = initializeQueue('db'); +export const objectStorageQueue = initializeQueue('objectStorage'); diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts new file mode 100644 index 0000000000..39cab29966 --- /dev/null +++ b/packages/backend/src/queue/types.ts @@ -0,0 +1,44 @@ +import { DriveFile } from '@/models/entities/drive-file'; +import { User } from '@/models/entities/user'; +import { IActivity } from '@/remote/activitypub/type'; +import * as httpSignature from 'http-signature'; + +export type DeliverJobData = { + /** Actor */ + user: ThinUser; + /** Activity */ + content: unknown; + /** inbox URL to deliver */ + to: string; +}; + +export type InboxJobData = { + activity: IActivity; + signature: httpSignature.IParsedSignature; +}; + +export type DbJobData = DbUserJobData | DbUserImportJobData | DbUserDeleteJobData; + +export type DbUserJobData = { + user: ThinUser; +}; + +export type DbUserDeleteJobData = { + user: ThinUser; + soft?: boolean; +}; + +export type DbUserImportJobData = { + user: ThinUser; + fileId: DriveFile['id']; +}; + +export type ObjectStorageJobData = ObjectStorageFileJobData | {}; + +export type ObjectStorageFileJobData = { + key: string; +}; + +export type ThinUser = { + id: User['id']; +}; -- cgit v1.2.3-freya From b9eaf906e7b7202d06c9fea72b6d3c422a03f81e Mon Sep 17 00:00:00 2001 From: syuilo Date: Fri, 12 Nov 2021 10:52:10 +0900 Subject: fix lint errors --- packages/backend/.eslintrc.js | 3 ++ packages/backend/@types/jsrsasign.d.ts | 2 +- packages/backend/src/mfm/from-html.ts | 3 +- packages/backend/src/misc/cafy-id.ts | 1 + packages/backend/src/misc/gen-avatar.ts | 2 +- packages/backend/src/prelude/array.ts | 2 +- packages/backend/src/prelude/url.ts | 2 +- .../object-storage/clean-remote-files.ts | 2 +- .../backend/src/queue/processors/system/index.ts | 4 +-- .../src/queue/processors/system/resync-charts.ts | 2 +- packages/backend/src/queue/queues.ts | 2 +- packages/backend/src/queue/types.ts | 2 +- .../src/remote/activitypub/models/person.ts | 2 +- .../activitypub/renderer/ordered-collection.ts | 2 +- packages/backend/src/server/api/define.ts | 6 ++-- .../src/server/api/endpoints/channels/update.ts | 2 +- .../src/server/api/endpoints/i/2fa/key-done.ts | 2 +- packages/backend/src/server/api/limiter.ts | 8 ++--- .../server/api/stream/channels/games/reversi.ts | 3 +- packages/backend/src/server/api/stream/types.ts | 2 +- packages/backend/src/services/chart/core.ts | 16 +++++----- packages/backend/src/services/stream.ts | 34 +++++++++++----------- 22 files changed, 55 insertions(+), 49 deletions(-) (limited to 'packages/backend/src/queue') diff --git a/packages/backend/.eslintrc.js b/packages/backend/.eslintrc.js index 952a2ee9ef..bafd0c9b63 100644 --- a/packages/backend/.eslintrc.js +++ b/packages/backend/.eslintrc.js @@ -22,6 +22,7 @@ module.exports = { 'eol-last': ['error', 'always'], 'semi': ['error', 'always'], 'quotes': ['warn', 'single'], + 'comma-dangle': ['warn', 'always-multiline'], 'keyword-spacing': ['error', { 'before': true, 'after': true, @@ -44,6 +45,8 @@ module.exports = { 'no-multi-spaces': ['warn'], 'no-control-regex': ['warn'], 'no-empty': ['warn'], + 'no-inner-declarations': ['off'], + 'no-sparse-arrays': ['off'], '@typescript-eslint/no-var-requires': ['warn'], '@typescript-eslint/no-inferrable-types': ['warn'], '@typescript-eslint/no-empty-function': ['off'], diff --git a/packages/backend/@types/jsrsasign.d.ts b/packages/backend/@types/jsrsasign.d.ts index bc9d746f7e..bb52f8f64e 100644 --- a/packages/backend/@types/jsrsasign.d.ts +++ b/packages/backend/@types/jsrsasign.d.ts @@ -171,7 +171,7 @@ declare module 'jsrsasign' { public static getTLVbyList(h: ASN1S, currentIndex: Idx, nthList: Mutable, checkingTag?: string): ASN1TLV; - // tslint:disable-next-line:bool-param-default + // eslint:disable-next-line:bool-param-default public static getVbyList(h: ASN1S, currentIndex: Idx, nthList: Mutable, checkingTag?: string, removeUnusedbits?: boolean): ASN1V; public static hextooidstr(hex: ASN1OIDV): OID; diff --git a/packages/backend/src/mfm/from-html.ts b/packages/backend/src/mfm/from-html.ts index de6aa3d0cc..43e16d80c5 100644 --- a/packages/backend/src/mfm/from-html.ts +++ b/packages/backend/src/mfm/from-html.ts @@ -48,9 +48,10 @@ export function fromHtml(html: string, hashtagNames?: string[]): string | null { if (!treeAdapter.isElementNode(node)) return; switch (node.nodeName) { - case 'br': + case 'br': { text += '\n'; break; + } case 'a': { diff --git a/packages/backend/src/misc/cafy-id.ts b/packages/backend/src/misc/cafy-id.ts index 39886611e1..dd81c5c4cf 100644 --- a/packages/backend/src/misc/cafy-id.ts +++ b/packages/backend/src/misc/cafy-id.ts @@ -1,5 +1,6 @@ import { Context } from 'cafy'; +// eslint-disable-next-line @typescript-eslint/ban-types export class ID extends Context { public readonly name = 'ID'; diff --git a/packages/backend/src/misc/gen-avatar.ts b/packages/backend/src/misc/gen-avatar.ts index f03ca9f96d..8838ec8d15 100644 --- a/packages/backend/src/misc/gen-avatar.ts +++ b/packages/backend/src/misc/gen-avatar.ts @@ -56,7 +56,7 @@ export function genAvatar(seed: string, stream: WriteStream): Promise { // 1*n (filled by false) const center: boolean[] = new Array(n).fill(false); - // tslint:disable-next-line:prefer-for-of + // eslint:disable-next-line:prefer-for-of for (let x = 0; x < side.length; x++) { for (let y = 0; y < side[x].length; y++) { side[x][y] = rand(3) === 0; diff --git a/packages/backend/src/prelude/array.ts b/packages/backend/src/prelude/array.ts index d63f0475d0..1e9e62b895 100644 --- a/packages/backend/src/prelude/array.ts +++ b/packages/backend/src/prelude/array.ts @@ -87,7 +87,7 @@ export function groupOn(f: (x: T) => S, xs: T[]): T[][] { export function groupByX(collections: T[], keySelector: (x: T) => string) { return collections.reduce((obj: Record, item: T) => { const key = keySelector(item); - if (!obj.hasOwnProperty(key)) { + if (!Object.prototype.hasOwnProperty.call(obj, key)) { obj[key] = []; } diff --git a/packages/backend/src/prelude/url.ts b/packages/backend/src/prelude/url.ts index c7f2b7c1e7..a4f2f7f5a8 100644 --- a/packages/backend/src/prelude/url.ts +++ b/packages/backend/src/prelude/url.ts @@ -1,4 +1,4 @@ -export function query(obj: {}): string { +export function query(obj: Record): string { const params = Object.entries(obj) .filter(([, v]) => Array.isArray(v) ? v.length : v !== undefined) .reduce((a, [k, v]) => (a[k] = v, a), {} as Record); diff --git a/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts b/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts index 3b2e4ea939..a094c39d5d 100644 --- a/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts +++ b/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts @@ -7,7 +7,7 @@ import { MoreThan, Not, IsNull } from 'typeorm'; const logger = queueLogger.createSubLogger('clean-remote-files'); -export default async function cleanRemoteFiles(job: Bull.Job<{}>, done: any): Promise { +export default async function cleanRemoteFiles(job: Bull.Job>, done: any): Promise { logger.info(`Deleting cached remote files...`); let deletedCount = 0; diff --git a/packages/backend/src/queue/processors/system/index.ts b/packages/backend/src/queue/processors/system/index.ts index 52b7868105..8460ea0a9b 100644 --- a/packages/backend/src/queue/processors/system/index.ts +++ b/packages/backend/src/queue/processors/system/index.ts @@ -3,9 +3,9 @@ import { resyncCharts } from './resync-charts'; const jobs = { resyncCharts, -} as Record | Bull.ProcessPromiseFunction<{}>>; +} as Record> | Bull.ProcessPromiseFunction>>; -export default function(dbQueue: Bull.Queue<{}>) { +export default function(dbQueue: Bull.Queue>) { for (const [k, v] of Object.entries(jobs)) { dbQueue.process(k, v); } diff --git a/packages/backend/src/queue/processors/system/resync-charts.ts b/packages/backend/src/queue/processors/system/resync-charts.ts index b36b024cfb..78a70bb981 100644 --- a/packages/backend/src/queue/processors/system/resync-charts.ts +++ b/packages/backend/src/queue/processors/system/resync-charts.ts @@ -5,7 +5,7 @@ import { driveChart, notesChart, usersChart } from '@/services/chart/index'; const logger = queueLogger.createSubLogger('resync-charts'); -export default async function resyncCharts(job: Bull.Job<{}>, done: any): Promise { +export async function resyncCharts(job: Bull.Job>, done: any): Promise { logger.info(`Resync charts...`); // TODO: ユーザーごとのチャートも更新する diff --git a/packages/backend/src/queue/queues.ts b/packages/backend/src/queue/queues.ts index a66a7ca451..b1d790fcb1 100644 --- a/packages/backend/src/queue/queues.ts +++ b/packages/backend/src/queue/queues.ts @@ -2,7 +2,7 @@ import config from '@/config/index'; import { initialize as initializeQueue } from './initialize'; import { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData } from './types'; -export const systemQueue = initializeQueue<{}>('system'); +export const systemQueue = initializeQueue>('system'); export const deliverQueue = initializeQueue('deliver', config.deliverJobPerSec || 128); export const inboxQueue = initializeQueue('inbox', config.inboxJobPerSec || 16); export const dbQueue = initializeQueue('db'); diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index 39cab29966..c8c7147152 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -33,7 +33,7 @@ export type DbUserImportJobData = { fileId: DriveFile['id']; }; -export type ObjectStorageJobData = ObjectStorageFileJobData | {}; +export type ObjectStorageJobData = ObjectStorageFileJobData | Record; export type ObjectStorageFileJobData = { key: string; diff --git a/packages/backend/src/remote/activitypub/models/person.ts b/packages/backend/src/remote/activitypub/models/person.ts index eb8c00a10b..95db46bff2 100644 --- a/packages/backend/src/remote/activitypub/models/person.ts +++ b/packages/backend/src/remote/activitypub/models/person.ts @@ -274,7 +274,7 @@ export async function createPerson(uri: string, resolver?: Resolver): Promise { +export async function updatePerson(uri: string, resolver?: Resolver | null, hint?: Record): Promise { if (typeof uri !== 'string') throw new Error('uri is not string'); // URIがこのサーバーを指しているならスキップ diff --git a/packages/backend/src/remote/activitypub/renderer/ordered-collection.ts b/packages/backend/src/remote/activitypub/renderer/ordered-collection.ts index 68870a0ecd..c4b4337af8 100644 --- a/packages/backend/src/remote/activitypub/renderer/ordered-collection.ts +++ b/packages/backend/src/remote/activitypub/renderer/ordered-collection.ts @@ -6,7 +6,7 @@ * @param last URL of last page (optional) * @param orderedItems attached objects (optional) */ -export default function(id: string | null, totalItems: any, first?: string, last?: string, orderedItems?: object) { +export default function(id: string | null, totalItems: any, first?: string, last?: string, orderedItems?: Record) { const page: any = { id, type: 'OrderedCollection', diff --git a/packages/backend/src/server/api/define.ts b/packages/backend/src/server/api/define.ts index 4bd8f95e31..48253e78e0 100644 --- a/packages/backend/src/server/api/define.ts +++ b/packages/backend/src/server/api/define.ts @@ -20,7 +20,7 @@ type SimpleUserInfo = { }; type Params = { - [P in keyof T['params']]: NonNullable[P]['transform'] extends Function + [P in keyof T['params']]: NonNullable[P]['transform'] extends () => any ? ReturnType[P]['transform']> : NonNullable[P]['default'] extends null | number | string ? NonOptional[P]['validator']['get']>[0]> @@ -30,7 +30,7 @@ type Params = { export type Response = Record | void; type executor = - (params: Params, user: T['requireCredential'] extends true ? SimpleUserInfo : SimpleUserInfo | null, token: AccessToken | null, file?: any, cleanup?: Function) => + (params: Params, user: T['requireCredential'] extends true ? SimpleUserInfo : SimpleUserInfo | null, token: AccessToken | null, file?: any, cleanup?: () => any) => Promise>>; export default function (meta: T, cb: executor) @@ -74,7 +74,7 @@ function getParams(defs: T, params: any): [Params, A }); return true; } else { - if (v === undefined && def.hasOwnProperty('default')) { + if (v === undefined && Object.prototype.hasOwnProperty.call(def, 'default')) { x[k] = def.default; } else { x[k] = v; diff --git a/packages/backend/src/server/api/endpoints/channels/update.ts b/packages/backend/src/server/api/endpoints/channels/update.ts index 9b447bd04b..05f279d6ac 100644 --- a/packages/backend/src/server/api/endpoints/channels/update.ts +++ b/packages/backend/src/server/api/endpoints/channels/update.ts @@ -69,7 +69,7 @@ export default define(meta, async (ps, me) => { throw new ApiError(meta.errors.accessDenied); } - // tslint:disable-next-line:no-unnecessary-initializer + // eslint:disable-next-line:no-unnecessary-initializer let banner = undefined; if (ps.bannerId != null) { banner = await DriveFiles.findOne({ diff --git a/packages/backend/src/server/api/endpoints/i/2fa/key-done.ts b/packages/backend/src/server/api/endpoints/i/2fa/key-done.ts index b4d3af235a..e06d0a9f68 100644 --- a/packages/backend/src/server/api/endpoints/i/2fa/key-done.ts +++ b/packages/backend/src/server/api/endpoints/i/2fa/key-done.ts @@ -75,7 +75,7 @@ export default define(meta, async (ps, user) => { const flags = attestation.authData[32]; - // tslint:disable-next-line:no-bitwise + // eslint:disable-next-line:no-bitwise if (!(flags & 1)) { throw new Error('user not present'); } diff --git a/packages/backend/src/server/api/limiter.ts b/packages/backend/src/server/api/limiter.ts index 1e2fe5bcb3..82a8613c90 100644 --- a/packages/backend/src/server/api/limiter.ts +++ b/packages/backend/src/server/api/limiter.ts @@ -10,16 +10,16 @@ const logger = new Logger('limiter'); export default (endpoint: IEndpoint, user: User) => new Promise((ok, reject) => { const limitation = endpoint.meta.limit!; - const key = limitation.hasOwnProperty('key') + const key = Object.prototype.hasOwnProperty.call(limitation, 'key') ? limitation.key : endpoint.name; const hasShortTermLimit = - limitation.hasOwnProperty('minInterval'); + Object.prototype.hasOwnProperty.call(limitation, 'minInterval'); const hasLongTermLimit = - limitation.hasOwnProperty('duration') && - limitation.hasOwnProperty('max'); + Object.prototype.hasOwnProperty.call(limitation, 'duration') && + Object.prototype.hasOwnProperty.call(limitation, 'max'); if (hasShortTermLimit) { min(); diff --git a/packages/backend/src/server/api/stream/channels/games/reversi.ts b/packages/backend/src/server/api/stream/channels/games/reversi.ts index 3b89aac35c..399750c26a 100644 --- a/packages/backend/src/server/api/stream/channels/games/reversi.ts +++ b/packages/backend/src/server/api/stream/channels/games/reversi.ts @@ -19,7 +19,7 @@ export default class extends Channel { @autobind public async onMessage(type: string, body: any) { switch (type) { - case 'ping': + case 'ping': { if (body.id == null) return; const matching = await ReversiMatchings.findOne({ parentId: this.user!.id, @@ -28,6 +28,7 @@ export default class extends Channel { if (matching == null) return; publishMainStream(matching.childId, 'reversiInvited', await ReversiMatchings.pack(matching, { id: matching.childId })); break; + } } } } diff --git a/packages/backend/src/server/api/stream/types.ts b/packages/backend/src/server/api/stream/types.ts index 70eb5c5ce5..f4302f64a0 100644 --- a/packages/backend/src/server/api/stream/types.ts +++ b/packages/backend/src/server/api/stream/types.ts @@ -31,7 +31,7 @@ export interface BroadcastTypes { } export interface UserStreamTypes { - terminate: {}; + terminate: Record; followChannel: Channel; unfollowChannel: Channel; updateUserProfile: UserProfile; diff --git a/packages/backend/src/services/chart/core.ts b/packages/backend/src/services/chart/core.ts index c0d3280c2b..78b7dd1359 100644 --- a/packages/backend/src/services/chart/core.ts +++ b/packages/backend/src/services/chart/core.ts @@ -70,7 +70,7 @@ export default abstract class Chart> { @autobind private static convertSchemaToFlatColumnDefinitions(schema: SimpleSchema) { - const columns = {} as any; + const columns = {} as Record; const flatColumns = (x: Obj, path?: string) => { for (const [k, v] of Object.entries(x)) { const p = path ? `${path}${this.columnDot}${k}` : k; @@ -93,8 +93,8 @@ export default abstract class Chart> { } @autobind - private static convertFlattenColumnsToObject(x: Record): Record { - const obj = {} as any; + private static convertFlattenColumnsToObject(x: Record): Record { + const obj = {} as Record; for (const k of Object.keys(x).filter(k => k.startsWith(Chart.columnPrefix))) { // now k is ___x_y_z const path = k.substr(Chart.columnPrefix.length).split(Chart.columnDot).join('.'); @@ -104,7 +104,7 @@ export default abstract class Chart> { } @autobind - private static convertObjectToFlattenColumns(x: Record) { + private static convertObjectToFlattenColumns(x: Record) { const columns = {} as Record; const flatten = (x: Obj, path?: string) => { for (const [k, v] of Object.entries(x)) { @@ -121,9 +121,9 @@ export default abstract class Chart> { } @autobind - private static countUniqueFields(x: Record) { + private static countUniqueFields(x: Record) { const exec = (x: Obj) => { - const res = {} as Record; + const res = {} as Record; for (const [k, v] of Object.entries(x)) { if (typeof v === 'object' && !Array.isArray(v)) { res[k] = exec(v); @@ -140,7 +140,7 @@ export default abstract class Chart> { @autobind private static convertQuery(diff: Record) { - const query: Record = {}; + const query: Record string> = {}; for (const [k, v] of Object.entries(diff)) { if (typeof v === 'number') { @@ -337,7 +337,7 @@ export default abstract class Chart> { } @autobind - public async save() { + public async save(): Promise { if (this.buffer.length === 0) { logger.info(`${this.name}: Write skipped`); return; diff --git a/packages/backend/src/services/stream.ts b/packages/backend/src/services/stream.ts index 2c308a1b54..0901857c33 100644 --- a/packages/backend/src/services/stream.ts +++ b/packages/backend/src/services/stream.ts @@ -37,74 +37,74 @@ class Publisher { channel: channel, message: message })); - } + }; public publishInternalEvent = (type: K, value?: InternalStreamTypes[K]): void => { this.publish('internal', type, typeof value === 'undefined' ? null : value); - } + }; public publishUserEvent = (userId: User['id'], type: K, value?: UserStreamTypes[K]): void => { this.publish(`user:${userId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishBroadcastStream = (type: K, value?: BroadcastTypes[K]): void => { this.publish('broadcast', type, typeof value === 'undefined' ? null : value); - } + }; public publishMainStream = (userId: User['id'], type: K, value?: MainStreamTypes[K]): void => { this.publish(`mainStream:${userId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishDriveStream = (userId: User['id'], type: K, value?: DriveStreamTypes[K]): void => { this.publish(`driveStream:${userId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishNoteStream = (noteId: Note['id'], type: K, value?: NoteStreamTypes[K]): void => { this.publish(`noteStream:${noteId}`, type, { id: noteId, body: value }); - } + }; public publishChannelStream = (channelId: Channel['id'], type: K, value?: ChannelStreamTypes[K]): void => { this.publish(`channelStream:${channelId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishUserListStream = (listId: UserList['id'], type: K, value?: UserListStreamTypes[K]): void => { this.publish(`userListStream:${listId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishAntennaStream = (antennaId: Antenna['id'], type: K, value?: AntennaStreamTypes[K]): void => { this.publish(`antennaStream:${antennaId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishMessagingStream = (userId: User['id'], otherpartyId: User['id'], type: K, value?: MessagingStreamTypes[K]): void => { this.publish(`messagingStream:${userId}-${otherpartyId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishGroupMessagingStream = (groupId: UserGroup['id'], type: K, value?: GroupMessagingStreamTypes[K]): void => { this.publish(`messagingStream:${groupId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishMessagingIndexStream = (userId: User['id'], type: K, value?: MessagingIndexStreamTypes[K]): void => { this.publish(`messagingIndexStream:${userId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishReversiStream = (userId: User['id'], type: K, value?: ReversiStreamTypes[K]): void => { this.publish(`reversiStream:${userId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishReversiGameStream = (gameId: ReversiGame['id'], type: K, value?: ReversiGameStreamTypes[K]): void => { this.publish(`reversiGameStream:${gameId}`, type, typeof value === 'undefined' ? null : value); - } + }; public publishNotesStream = (note: Packed<'Note'>): void => { this.publish('notesStream', null, note); - } + }; public publishAdminStream = (userId: User['id'], type: K, value?: AdminStreamTypes[K]): void => { this.publish(`adminStream:${userId}`, type, typeof value === 'undefined' ? null : value); - } + }; } const publisher = new Publisher(); -- cgit v1.2.3-freya From b404ce463fe6386f35f0a66ede6871082f1540bf Mon Sep 17 00:00:00 2001 From: syuilo Date: Fri, 12 Nov 2021 15:08:36 +0900 Subject: refactor --- packages/backend/src/queue/processors/db/export-notes.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'packages/backend/src/queue') diff --git a/packages/backend/src/queue/processors/db/export-notes.ts b/packages/backend/src/queue/processors/db/export-notes.ts index 49850aa706..bd178556df 100644 --- a/packages/backend/src/queue/processors/db/export-notes.ts +++ b/packages/backend/src/queue/processors/db/export-notes.ts @@ -46,18 +46,18 @@ export async function exportNotes(job: Bull.Job, done: any): Prom }); let exportedNotesCount = 0; - let cursor: any = null; + let cursor: Note['id'] | null = null; while (true) { const notes = await Notes.find({ where: { userId: user.id, - ...(cursor ? { id: MoreThan(cursor) } : {}) + ...(cursor ? { id: MoreThan(cursor) } : {}), }, take: 100, order: { - id: 1 - } + id: 1, + }, }); if (notes.length === 0) { @@ -115,7 +115,7 @@ export async function exportNotes(job: Bull.Job, done: any): Prom done(); } -function serialize(note: Note, poll: Poll | null = null): any { +function serialize(note: Note, poll: Poll | null = null): Record { return { id: note.id, text: note.text, @@ -128,6 +128,6 @@ function serialize(note: Note, poll: Poll | null = null): any { viaMobile: note.viaMobile, visibility: note.visibility, visibleUserIds: note.visibleUserIds, - localOnly: note.localOnly + localOnly: note.localOnly, }; } -- cgit v1.2.3-freya From 6496835515f12c8221e6e852edba1e0ba4fdf663 Mon Sep 17 00:00:00 2001 From: syuilo Date: Fri, 12 Nov 2021 15:11:20 +0900 Subject: viaMobileフラグ廃止 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Close #7965 --- .../backend/migration/1636697408073-remove-via-mobile.js | 13 +++++++++++++ packages/backend/src/models/entities/note.ts | 5 ----- packages/backend/src/models/repositories/note.ts | 5 ----- packages/backend/src/queue/processors/db/export-notes.ts | 1 - packages/backend/src/remote/activitypub/models/note.ts | 1 - packages/backend/src/server/api/endpoints/notes/create.ts | 6 ------ packages/backend/src/services/note/create.ts | 3 --- packages/client/src/components/note-detailed.vue | 1 - packages/client/src/components/note-header.vue | 5 ----- packages/client/src/components/post-form.vue | 2 -- packages/client/src/ui/chat/note-header.vue | 5 ----- packages/client/src/ui/chat/post-form.vue | 2 -- 12 files changed, 13 insertions(+), 36 deletions(-) create mode 100644 packages/backend/migration/1636697408073-remove-via-mobile.js (limited to 'packages/backend/src/queue') diff --git a/packages/backend/migration/1636697408073-remove-via-mobile.js b/packages/backend/migration/1636697408073-remove-via-mobile.js new file mode 100644 index 0000000000..bb5157cf1d --- /dev/null +++ b/packages/backend/migration/1636697408073-remove-via-mobile.js @@ -0,0 +1,13 @@ +const { MigrationInterface, QueryRunner } = require("typeorm"); + +module.exports = class removeViaMobile1636697408073 { + name = 'removeViaMobile1636697408073' + + async up(queryRunner) { + await queryRunner.query(`ALTER TABLE "note" DROP COLUMN "viaMobile"`); + } + + async down(queryRunner) { + await queryRunner.query(`ALTER TABLE "note" ADD "viaMobile" boolean NOT NULL DEFAULT false`); + } +} diff --git a/packages/backend/src/models/entities/note.ts b/packages/backend/src/models/entities/note.ts index 4a5411f93d..9dee25ea2a 100644 --- a/packages/backend/src/models/entities/note.ts +++ b/packages/backend/src/models/entities/note.ts @@ -81,11 +81,6 @@ export class Note { @JoinColumn() public user: User | null; - @Column('boolean', { - default: false - }) - public viaMobile: boolean; - @Column('boolean', { default: false }) diff --git a/packages/backend/src/models/repositories/note.ts b/packages/backend/src/models/repositories/note.ts index 0f00c34c9c..c076cb31e8 100644 --- a/packages/backend/src/models/repositories/note.ts +++ b/packages/backend/src/models/repositories/note.ts @@ -230,7 +230,6 @@ export class NoteRepository extends Repository { visibility: note.visibility, localOnly: note.localOnly || undefined, visibleUserIds: note.visibility === 'specified' ? note.visibleUserIds : undefined, - viaMobile: note.viaMobile || undefined, renoteCount: note.renoteCount, repliesCount: note.repliesCount, reactions: convertLegacyReactions(note.reactions), @@ -377,10 +376,6 @@ export const packedNoteSchema = { optional: true as const, nullable: true as const, ref: 'Note' as const, }, - viaMobile: { - type: 'boolean' as const, - optional: true as const, nullable: false as const, - }, isHidden: { type: 'boolean' as const, optional: true as const, nullable: false as const, diff --git a/packages/backend/src/queue/processors/db/export-notes.ts b/packages/backend/src/queue/processors/db/export-notes.ts index bd178556df..761f4d827b 100644 --- a/packages/backend/src/queue/processors/db/export-notes.ts +++ b/packages/backend/src/queue/processors/db/export-notes.ts @@ -125,7 +125,6 @@ function serialize(note: Note, poll: Poll | null = null): Record { reply, renote, cw: ps.cw, - viaMobile: ps.viaMobile, localOnly: ps.localOnly, visibility: ps.visibility, visibleUsers, diff --git a/packages/backend/src/services/note/create.ts b/packages/backend/src/services/note/create.ts index 69d854ab1a..8f6c2fe3a5 100644 --- a/packages/backend/src/services/note/create.ts +++ b/packages/backend/src/services/note/create.ts @@ -98,7 +98,6 @@ type Option = { renote?: Note | null; files?: DriveFile[] | null; poll?: IPoll | null; - viaMobile?: boolean | null; localOnly?: boolean | null; cw?: string | null; visibility?: string; @@ -131,7 +130,6 @@ export default async (user: { id: User['id']; username: User['username']; host: if (data.createdAt == null) data.createdAt = new Date(); if (data.visibility == null) data.visibility = 'public'; - if (data.viaMobile == null) data.viaMobile = false; if (data.localOnly == null) data.localOnly = false; if (data.channel != null) data.visibility = 'public'; if (data.channel != null) data.visibleUsers = []; @@ -478,7 +476,6 @@ async function insertNote(user: { id: User['id']; host: User['host']; }, data: O tags: tags.map(tag => normalizeForSearch(tag)), emojis, userId: user.id, - viaMobile: data.viaMobile!, localOnly: data.localOnly!, visibility: data.visibility as any, visibleUserIds: data.visibility == 'specified' diff --git a/packages/client/src/components/note-detailed.vue b/packages/client/src/components/note-detailed.vue index 8b6905a0e4..7550153521 100644 --- a/packages/client/src/components/note-detailed.vue +++ b/packages/client/src/components/note-detailed.vue @@ -86,7 +86,6 @@