From c934987b14dc2b0c362f2d13e0664ab275aca522 Mon Sep 17 00:00:00 2001 From: syuilo Date: Thu, 7 Mar 2019 23:07:21 +0900 Subject: Resolve #4444 --- src/queue/index.ts | 153 ++++++++++------------ src/queue/processors/db/delete-drive-files.ts | 55 ++++++++ src/queue/processors/db/delete-notes.ts | 55 ++++++++ src/queue/processors/db/export-blocking.ts | 89 +++++++++++++ src/queue/processors/db/export-following.ts | 89 +++++++++++++ src/queue/processors/db/export-mute.ts | 89 +++++++++++++ src/queue/processors/db/export-notes.ts | 128 ++++++++++++++++++ src/queue/processors/db/index.ts | 20 +++ src/queue/processors/delete-drive-files.ts | 55 -------- src/queue/processors/delete-notes.ts | 55 -------- src/queue/processors/deliver.ts | 63 +++++++++ src/queue/processors/export-blocking.ts | 89 ------------- src/queue/processors/export-following.ts | 89 ------------- src/queue/processors/export-mute.ts | 89 ------------- src/queue/processors/export-notes.ts | 128 ------------------ src/queue/processors/http/deliver.ts | 64 --------- src/queue/processors/http/process-inbox.ts | 182 -------------------------- src/queue/processors/index.ts | 31 ----- src/queue/processors/process-inbox.ts | 182 ++++++++++++++++++++++++++ 19 files changed, 839 insertions(+), 866 deletions(-) create mode 100644 src/queue/processors/db/delete-drive-files.ts create mode 100644 src/queue/processors/db/delete-notes.ts create mode 100644 src/queue/processors/db/export-blocking.ts create mode 100644 src/queue/processors/db/export-following.ts create mode 100644 src/queue/processors/db/export-mute.ts create mode 100644 src/queue/processors/db/export-notes.ts create mode 100644 src/queue/processors/db/index.ts delete mode 100644 src/queue/processors/delete-drive-files.ts delete mode 100644 src/queue/processors/delete-notes.ts create mode 100644 src/queue/processors/deliver.ts delete mode 100644 src/queue/processors/export-blocking.ts delete mode 100644 src/queue/processors/export-following.ts delete mode 100644 src/queue/processors/export-mute.ts delete mode 100644 src/queue/processors/export-notes.ts delete mode 100644 src/queue/processors/http/deliver.ts delete mode 100644 src/queue/processors/http/process-inbox.ts delete mode 100644 src/queue/processors/index.ts create mode 100644 src/queue/processors/process-inbox.ts (limited to 'src/queue') diff --git a/src/queue/index.ts b/src/queue/index.ts index 351a035ada..bb3b66908d 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -1,73 +1,64 @@ -import * as Queue from 'bee-queue'; +import * as Queue from 'bull'; import * as httpSignature from 'http-signature'; import config from '../config'; import { ILocalUser } from '../models/user'; import { program } from '../argv'; -import handler from './processors'; -import { queueLogger } from './logger'; - -const enableQueue = !program.disableQueue; -const enableQueueProcessing = !program.onlyServer && enableQueue; -const queueAvailable = config.redis != null; - -const queue = initializeQueue(); - -function initializeQueue() { - if (queueAvailable && enableQueue) { - return new Queue('misskey-queue', { - redis: { - port: config.redis.port, - host: config.redis.host, - password: config.redis.pass - }, - - removeOnSuccess: true, - removeOnFailure: true, - getEvents: false, - sendEvents: false, - storeJobs: false - }); - } else { - return null; - } + +import processDeliver from './processors/deliver'; +import processInbox from './processors/process-inbox'; +import processDb from './processors/db'; + +function initializeQueue(name: string) { + return new Queue(name, config.redis != null ? { + redis: { + port: config.redis.port, + host: config.redis.host, + password: config.redis.pass, + db: 1 + } + } : null); } +const deliverQueue = initializeQueue('deliver'); +const inboxQueue = initializeQueue('inbox'); +const dbQueue = initializeQueue('db'); + export function deliver(user: ILocalUser, content: any, to: any) { - if (content == null) return; + if (content == null) return null; const data = { - type: 'deliver', user, content, to }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data) - .retries(8) - .backoff('exponential', 1000) - .save(); - } else { - return handler({ data }, () => {}); - } + return deliverQueue.add(data, { + attempts: 4, + backoff: { + type: 'exponential', + delay: 1000 + }, + removeOnComplete: true, + removeOnFail: true + }); } -export function processInbox(activity: any, signature: httpSignature.IParsedSignature) { +export function inbox(activity: any, signature: httpSignature.IParsedSignature) { const data = { - type: 'processInbox', activity: activity, signature }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data) - .retries(3) - .backoff('exponential', 500) - .save(); - } else { - return handler({ data }, () => {}); - } + return inboxQueue.add(data, { + attempts: 4, + backoff: { + type: 'exponential', + delay: 1000 + }, + removeOnComplete: true, + removeOnFail: true + }); } export function createDeleteNotesJob(user: ILocalUser) { @@ -76,11 +67,10 @@ export function createDeleteNotesJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export function createDeleteDriveFilesJob(user: ILocalUser) { @@ -89,11 +79,10 @@ export function createDeleteDriveFilesJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export function createExportNotesJob(user: ILocalUser) { @@ -102,11 +91,10 @@ export function createExportNotesJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export function createExportFollowingJob(user: ILocalUser) { @@ -115,11 +103,10 @@ export function createExportFollowingJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export function createExportMuteJob(user: ILocalUser) { @@ -128,11 +115,10 @@ export function createExportMuteJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export function createExportBlockingJob(user: ILocalUser) { @@ -141,24 +127,23 @@ export function createExportBlockingJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export default function() { - if (queueAvailable && enableQueueProcessing) { - queue.process(128, handler); - queueLogger.succ('Processing started'); + if (!program.onlyServer) { + deliverQueue.process(processDeliver); + inboxQueue.process(processInbox); + dbQueue.process(processDb); } - - return queue; } export function destroy() { + /* queue.destroy().then(n => { queueLogger.succ(`All job removed (${n} jobs)`); - }); + });*/ } diff --git a/src/queue/processors/db/delete-drive-files.ts b/src/queue/processors/db/delete-drive-files.ts new file mode 100644 index 0000000000..3de960a25e --- /dev/null +++ b/src/queue/processors/db/delete-drive-files.ts @@ -0,0 +1,55 @@ +import * as Bull from 'bull'; +import * as mongo from 'mongodb'; + +import { queueLogger } from '../../logger'; +import User from '../../../models/user'; +import DriveFile from '../../../models/drive-file'; +import deleteFile from '../../../services/drive/delete-file'; + +const logger = queueLogger.createSubLogger('delete-drive-files'); + +export async function deleteDriveFiles(job: Bull.Job, done: any): Promise { + logger.info(`Deleting drive files of ${job.data.user._id} ...`); + + const user = await User.findOne({ + _id: new mongo.ObjectID(job.data.user._id.toString()) + }); + + let deletedCount = 0; + let ended = false; + let cursor: any = null; + + while (!ended) { + const files = await DriveFile.find({ + userId: user._id, + ...(cursor ? { _id: { $gt: cursor } } : {}) + }, { + limit: 100, + sort: { + _id: 1 + } + }); + + if (files.length === 0) { + ended = true; + job.progress(100); + break; + } + + cursor = files[files.length - 1]._id; + + for (const file of files) { + await deleteFile(file); + deletedCount++; + } + + const total = await DriveFile.count({ + userId: user._id, + }); + + job.progress(deletedCount / total); + } + + logger.succ(`All drive files (${deletedCount}) of ${user._id} has been deleted.`); + done(); +} diff --git a/src/queue/processors/db/delete-notes.ts b/src/queue/processors/db/delete-notes.ts new file mode 100644 index 0000000000..021db8062e --- /dev/null +++ b/src/queue/processors/db/delete-notes.ts @@ -0,0 +1,55 @@ +import * as Bull from 'bull'; +import * as mongo from 'mongodb'; + +import { queueLogger } from '../../logger'; +import Note from '../../../models/note'; +import deleteNote from '../../../services/note/delete'; +import User from '../../../models/user'; + +const logger = queueLogger.createSubLogger('delete-notes'); + +export async function deleteNotes(job: Bull.Job, done: any): Promise { + logger.info(`Deleting notes of ${job.data.user._id} ...`); + + const user = await User.findOne({ + _id: new mongo.ObjectID(job.data.user._id.toString()) + }); + + let deletedCount = 0; + let ended = false; + let cursor: any = null; + + while (!ended) { + const notes = await Note.find({ + userId: user._id, + ...(cursor ? { _id: { $gt: cursor } } : {}) + }, { + limit: 100, + sort: { + _id: 1 + } + }); + + if (notes.length === 0) { + ended = true; + job.progress(100); + break; + } + + cursor = notes[notes.length - 1]._id; + + for (const note of notes) { + await deleteNote(user, note, true); + deletedCount++; + } + + const total = await Note.count({ + userId: user._id, + }); + + job.progress(deletedCount / total); + } + + logger.succ(`All notes (${deletedCount}) of ${user._id} has been deleted.`); + done(); +} diff --git a/src/queue/processors/db/export-blocking.ts b/src/queue/processors/db/export-blocking.ts new file mode 100644 index 0000000000..e56aec94ac --- /dev/null +++ b/src/queue/processors/db/export-blocking.ts @@ -0,0 +1,89 @@ +import * as Bull from 'bull'; +import * as tmp from 'tmp'; +import * as fs from 'fs'; +import * as mongo from 'mongodb'; + +import { queueLogger } from '../../logger'; +import addFile from '../../../services/drive/add-file'; +import User from '../../../models/user'; +import dateFormat = require('dateformat'); +import Blocking from '../../../models/blocking'; +import config from '../../../config'; + +const logger = queueLogger.createSubLogger('export-blocking'); + +export async function exportBlocking(job: Bull.Job, done: any): Promise { + logger.info(`Exporting blocking of ${job.data.user._id} ...`); + + const user = await User.findOne({ + _id: new mongo.ObjectID(job.data.user._id.toString()) + }); + + // Create temp file + const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { + tmp.file((e, path, fd, cleanup) => { + if (e) return rej(e); + res([path, cleanup]); + }); + }); + + logger.info(`Temp file is ${path}`); + + const stream = fs.createWriteStream(path, { flags: 'a' }); + + let exportedCount = 0; + let ended = false; + let cursor: any = null; + + while (!ended) { + const blockings = await Blocking.find({ + blockerId: user._id, + ...(cursor ? { _id: { $gt: cursor } } : {}) + }, { + limit: 100, + sort: { + _id: 1 + } + }); + + if (blockings.length === 0) { + ended = true; + job.progress(100); + break; + } + + cursor = blockings[blockings.length - 1]._id; + + for (const block of blockings) { + const u = await User.findOne({ _id: block.blockeeId }, { fields: { username: true, host: true } }); + const content = u.host ? `${u.username}@${u.host}` : `${u.username}@${config.host}`; + await new Promise((res, rej) => { + stream.write(content + '\n', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + exportedCount++; + } + + const total = await Blocking.count({ + blockerId: user._id, + }); + + job.progress(exportedCount / total); + } + + stream.end(); + logger.succ(`Exported to: ${path}`); + + const fileName = 'blocking-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; + const driveFile = await addFile(user, path, fileName); + + logger.succ(`Exported to: ${driveFile._id}`); + cleanup(); + done(); +} diff --git a/src/queue/processors/db/export-following.ts b/src/queue/processors/db/export-following.ts new file mode 100644 index 0000000000..1d8a501b78 --- /dev/null +++ b/src/queue/processors/db/export-following.ts @@ -0,0 +1,89 @@ +import * as Bull from 'bull'; +import * as tmp from 'tmp'; +import * as fs from 'fs'; +import * as mongo from 'mongodb'; + +import { queueLogger } from '../../logger'; +import addFile from '../../../services/drive/add-file'; +import User from '../../../models/user'; +import dateFormat = require('dateformat'); +import Following from '../../../models/following'; +import config from '../../../config'; + +const logger = queueLogger.createSubLogger('export-following'); + +export async function exportFollowing(job: Bull.Job, done: any): Promise { + logger.info(`Exporting following of ${job.data.user._id} ...`); + + const user = await User.findOne({ + _id: new mongo.ObjectID(job.data.user._id.toString()) + }); + + // Create temp file + const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { + tmp.file((e, path, fd, cleanup) => { + if (e) return rej(e); + res([path, cleanup]); + }); + }); + + logger.info(`Temp file is ${path}`); + + const stream = fs.createWriteStream(path, { flags: 'a' }); + + let exportedCount = 0; + let ended = false; + let cursor: any = null; + + while (!ended) { + const followings = await Following.find({ + followerId: user._id, + ...(cursor ? { _id: { $gt: cursor } } : {}) + }, { + limit: 100, + sort: { + _id: 1 + } + }); + + if (followings.length === 0) { + ended = true; + job.progress(100); + break; + } + + cursor = followings[followings.length - 1]._id; + + for (const following of followings) { + const u = await User.findOne({ _id: following.followeeId }, { fields: { username: true, host: true } }); + const content = u.host ? `${u.username}@${u.host}` : `${u.username}@${config.host}`; + await new Promise((res, rej) => { + stream.write(content + '\n', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + exportedCount++; + } + + const total = await Following.count({ + followerId: user._id, + }); + + job.progress(exportedCount / total); + } + + stream.end(); + logger.succ(`Exported to: ${path}`); + + const fileName = 'following-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; + const driveFile = await addFile(user, path, fileName); + + logger.succ(`Exported to: ${driveFile._id}`); + cleanup(); + done(); +} diff --git a/src/queue/processors/db/export-mute.ts b/src/queue/processors/db/export-mute.ts new file mode 100644 index 0000000000..6f2dd6df13 --- /dev/null +++ b/src/queue/processors/db/export-mute.ts @@ -0,0 +1,89 @@ +import * as Bull from 'bull'; +import * as tmp from 'tmp'; +import * as fs from 'fs'; +import * as mongo from 'mongodb'; + +import { queueLogger } from '../../logger'; +import addFile from '../../../services/drive/add-file'; +import User from '../../../models/user'; +import dateFormat = require('dateformat'); +import Mute from '../../../models/mute'; +import config from '../../../config'; + +const logger = queueLogger.createSubLogger('export-mute'); + +export async function exportMute(job: Bull.Job, done: any): Promise { + logger.info(`Exporting mute of ${job.data.user._id} ...`); + + const user = await User.findOne({ + _id: new mongo.ObjectID(job.data.user._id.toString()) + }); + + // Create temp file + const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { + tmp.file((e, path, fd, cleanup) => { + if (e) return rej(e); + res([path, cleanup]); + }); + }); + + logger.info(`Temp file is ${path}`); + + const stream = fs.createWriteStream(path, { flags: 'a' }); + + let exportedCount = 0; + let ended = false; + let cursor: any = null; + + while (!ended) { + const mutes = await Mute.find({ + muterId: user._id, + ...(cursor ? { _id: { $gt: cursor } } : {}) + }, { + limit: 100, + sort: { + _id: 1 + } + }); + + if (mutes.length === 0) { + ended = true; + job.progress(100); + break; + } + + cursor = mutes[mutes.length - 1]._id; + + for (const mute of mutes) { + const u = await User.findOne({ _id: mute.muteeId }, { fields: { username: true, host: true } }); + const content = u.host ? `${u.username}@${u.host}` : `${u.username}@${config.host}`; + await new Promise((res, rej) => { + stream.write(content + '\n', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + exportedCount++; + } + + const total = await Mute.count({ + muterId: user._id, + }); + + job.progress(exportedCount / total); + } + + stream.end(); + logger.succ(`Exported to: ${path}`); + + const fileName = 'mute-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; + const driveFile = await addFile(user, path, fileName); + + logger.succ(`Exported to: ${driveFile._id}`); + cleanup(); + done(); +} diff --git a/src/queue/processors/db/export-notes.ts b/src/queue/processors/db/export-notes.ts new file mode 100644 index 0000000000..8f3cdc5b99 --- /dev/null +++ b/src/queue/processors/db/export-notes.ts @@ -0,0 +1,128 @@ +import * as Bull from 'bull'; +import * as tmp from 'tmp'; +import * as fs from 'fs'; +import * as mongo from 'mongodb'; + +import { queueLogger } from '../../logger'; +import Note, { INote } from '../../../models/note'; +import addFile from '../../../services/drive/add-file'; +import User from '../../../models/user'; +import dateFormat = require('dateformat'); + +const logger = queueLogger.createSubLogger('export-notes'); + +export async function exportNotes(job: Bull.Job, done: any): Promise { + logger.info(`Exporting notes of ${job.data.user._id} ...`); + + const user = await User.findOne({ + _id: new mongo.ObjectID(job.data.user._id.toString()) + }); + + // Create temp file + const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { + tmp.file((e, path, fd, cleanup) => { + if (e) return rej(e); + res([path, cleanup]); + }); + }); + + logger.info(`Temp file is ${path}`); + + const stream = fs.createWriteStream(path, { flags: 'a' }); + + await new Promise((res, rej) => { + stream.write('[', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + + let exportedNotesCount = 0; + let ended = false; + let cursor: any = null; + + while (!ended) { + const notes = await Note.find({ + userId: user._id, + ...(cursor ? { _id: { $gt: cursor } } : {}) + }, { + limit: 100, + sort: { + _id: 1 + } + }); + + if (notes.length === 0) { + ended = true; + job.progress(100); + break; + } + + cursor = notes[notes.length - 1]._id; + + for (const note of notes) { + const content = JSON.stringify(serialize(note)); + await new Promise((res, rej) => { + stream.write(exportedNotesCount === 0 ? content : ',\n' + content, err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + exportedNotesCount++; + } + + const total = await Note.count({ + userId: user._id, + }); + + job.progress(exportedNotesCount / total); + } + + await new Promise((res, rej) => { + stream.write(']', err => { + if (err) { + logger.error(err); + rej(err); + } else { + res(); + } + }); + }); + + stream.end(); + logger.succ(`Exported to: ${path}`); + + const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.json'; + const driveFile = await addFile(user, path, fileName); + + logger.succ(`Exported to: ${driveFile._id}`); + cleanup(); + done(); +} + +function serialize(note: INote): any { + return { + id: note._id, + text: note.text, + createdAt: note.createdAt, + fileIds: note.fileIds, + replyId: note.replyId, + renoteId: note.renoteId, + poll: note.poll, + cw: note.cw, + viaMobile: note.viaMobile, + visibility: note.visibility, + visibleUserIds: note.visibleUserIds, + appId: note.appId, + geo: note.geo, + localOnly: note.localOnly + }; +} diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts new file mode 100644 index 0000000000..91d7f06a4b --- /dev/null +++ b/src/queue/processors/db/index.ts @@ -0,0 +1,20 @@ +import * as Bull from 'bull'; +import { deleteNotes } from './delete-notes'; +import { deleteDriveFiles } from './delete-drive-files'; +import { exportNotes } from './export-notes'; +import { exportFollowing } from './export-following'; +import { exportMute } from './export-mute'; +import { exportBlocking } from './export-blocking'; + +const jobs = { + deleteNotes, + deleteDriveFiles, + exportNotes, + exportFollowing, + exportMute, + exportBlocking, +} as any; + +export default function(job: Bull.Job, done: any) { + jobs[job.data.type](job, done); +} diff --git a/src/queue/processors/delete-drive-files.ts b/src/queue/processors/delete-drive-files.ts deleted file mode 100644 index 7e76aa73e6..0000000000 --- a/src/queue/processors/delete-drive-files.ts +++ /dev/null @@ -1,55 +0,0 @@ -import * as bq from 'bee-queue'; -import * as mongo from 'mongodb'; - -import { queueLogger } from '../logger'; -import User from '../../models/user'; -import DriveFile from '../../models/drive-file'; -import deleteFile from '../../services/drive/delete-file'; - -const logger = queueLogger.createSubLogger('delete-drive-files'); - -export async function deleteDriveFiles(job: bq.Job, done: any): Promise { - logger.info(`Deleting drive files of ${job.data.user._id} ...`); - - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) - }); - - let deletedCount = 0; - let ended = false; - let cursor: any = null; - - while (!ended) { - const files = await DriveFile.find({ - userId: user._id, - ...(cursor ? { _id: { $gt: cursor } } : {}) - }, { - limit: 100, - sort: { - _id: 1 - } - }); - - if (files.length === 0) { - ended = true; - if (job.reportProgress) job.reportProgress(100); - break; - } - - cursor = files[files.length - 1]._id; - - for (const file of files) { - await deleteFile(file); - deletedCount++; - } - - const total = await DriveFile.count({ - userId: user._id, - }); - - if (job.reportProgress) job.reportProgress(deletedCount / total); - } - - logger.succ(`All drive files (${deletedCount}) of ${user._id} has been deleted.`); - done(); -} diff --git a/src/queue/processors/delete-notes.ts b/src/queue/processors/delete-notes.ts deleted file mode 100644 index 13c6042b16..0000000000 --- a/src/queue/processors/delete-notes.ts +++ /dev/null @@ -1,55 +0,0 @@ -import * as bq from 'bee-queue'; -import * as mongo from 'mongodb'; - -import { queueLogger } from '../logger'; -import Note from '../../models/note'; -import deleteNote from '../../services/note/delete'; -import User from '../../models/user'; - -const logger = queueLogger.createSubLogger('delete-notes'); - -export async function deleteNotes(job: bq.Job, done: any): Promise { - logger.info(`Deleting notes of ${job.data.user._id} ...`); - - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) - }); - - let deletedCount = 0; - let ended = false; - let cursor: any = null; - - while (!ended) { - const notes = await Note.find({ - userId: user._id, - ...(cursor ? { _id: { $gt: cursor } } : {}) - }, { - limit: 100, - sort: { - _id: 1 - } - }); - - if (notes.length === 0) { - ended = true; - if (job.reportProgress) job.reportProgress(100); - break; - } - - cursor = notes[notes.length - 1]._id; - - for (const note of notes) { - await deleteNote(user, note, true); - deletedCount++; - } - - const total = await Note.count({ - userId: user._id, - }); - - if (job.reportProgress) job.reportProgress(deletedCount / total); - } - - logger.succ(`All notes (${deletedCount}) of ${user._id} has been deleted.`); - done(); -} diff --git a/src/queue/processors/deliver.ts b/src/queue/processors/deliver.ts new file mode 100644 index 0000000000..2f730c31cd --- /dev/null +++ b/src/queue/processors/deliver.ts @@ -0,0 +1,63 @@ +import * as Bull from 'bull'; +import request from '../../remote/activitypub/request'; +import { queueLogger } from '../logger'; +import { registerOrFetchInstanceDoc } from '../../services/register-or-fetch-instance-doc'; +import Instance from '../../models/instance'; +import instanceChart from '../../services/chart/instance'; + +let latest: string = null; + +export default async (job: Bull.Job, done: any): Promise => { + const { host } = new URL(job.data.to); + + try { + if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) + queueLogger.debug(`delivering ${latest}`); + + await request(job.data.user, job.data.to, job.data.content); + + // Update stats + registerOrFetchInstanceDoc(host).then(i => { + Instance.update({ _id: i._id }, { + $set: { + latestRequestSentAt: new Date(), + latestStatus: 200, + lastCommunicatedAt: new Date(), + isNotResponding: false + } + }); + + instanceChart.requestSent(i.host, true); + }); + + done(); + } catch (res) { + // Update stats + registerOrFetchInstanceDoc(host).then(i => { + Instance.update({ _id: i._id }, { + $set: { + latestRequestSentAt: new Date(), + latestStatus: res != null && res.hasOwnProperty('statusCode') ? res.statusCode : null, + isNotResponding: true + } + }); + + instanceChart.requestSent(i.host, false); + }); + + if (res != null && res.hasOwnProperty('statusCode')) { + queueLogger.warn(`deliver failed: ${res.statusCode} ${res.statusMessage} to=${job.data.to}`); + + if (res.statusCode >= 400 && res.statusCode < 500) { + // HTTPステータスコード4xxはクライアントエラーであり、それはつまり + // 何回再送しても成功することはないということなのでエラーにはしないでおく + done(); + } else { + done(res.statusMessage); + } + } else { + queueLogger.warn(`deliver failed: ${res} to=${job.data.to}`); + done(); + } + } +}; diff --git a/src/queue/processors/export-blocking.ts b/src/queue/processors/export-blocking.ts deleted file mode 100644 index b30d8e3bc8..0000000000 --- a/src/queue/processors/export-blocking.ts +++ /dev/null @@ -1,89 +0,0 @@ -import * as bq from 'bee-queue'; -import * as tmp from 'tmp'; -import * as fs from 'fs'; -import * as mongo from 'mongodb'; - -import { queueLogger } from '../logger'; -import addFile from '../../services/drive/add-file'; -import User from '../../models/user'; -import dateFormat = require('dateformat'); -import Blocking from '../../models/blocking'; -import config from '../../config'; - -const logger = queueLogger.createSubLogger('export-blocking'); - -export async function exportBlocking(job: bq.Job, done: any): Promise { - logger.info(`Exporting blocking of ${job.data.user._id} ...`); - - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) - }); - - // Create temp file - const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { - tmp.file((e, path, fd, cleanup) => { - if (e) return rej(e); - res([path, cleanup]); - }); - }); - - logger.info(`Temp file is ${path}`); - - const stream = fs.createWriteStream(path, { flags: 'a' }); - - let exportedCount = 0; - let ended = false; - let cursor: any = null; - - while (!ended) { - const blockings = await Blocking.find({ - blockerId: user._id, - ...(cursor ? { _id: { $gt: cursor } } : {}) - }, { - limit: 100, - sort: { - _id: 1 - } - }); - - if (blockings.length === 0) { - ended = true; - if (job.reportProgress) job.reportProgress(100); - break; - } - - cursor = blockings[blockings.length - 1]._id; - - for (const block of blockings) { - const u = await User.findOne({ _id: block.blockeeId }, { fields: { username: true, host: true } }); - const content = u.host ? `${u.username}@${u.host}` : `${u.username}@${config.host}`; - await new Promise((res, rej) => { - stream.write(content + '\n', err => { - if (err) { - logger.error(err); - rej(err); - } else { - res(); - } - }); - }); - exportedCount++; - } - - const total = await Blocking.count({ - blockerId: user._id, - }); - - if (job.reportProgress) job.reportProgress(exportedCount / total); - } - - stream.end(); - logger.succ(`Exported to: ${path}`); - - const fileName = 'blocking-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; - const driveFile = await addFile(user, path, fileName); - - logger.succ(`Exported to: ${driveFile._id}`); - cleanup(); - done(); -} diff --git a/src/queue/processors/export-following.ts b/src/queue/processors/export-following.ts deleted file mode 100644 index e6521d0652..0000000000 --- a/src/queue/processors/export-following.ts +++ /dev/null @@ -1,89 +0,0 @@ -import * as bq from 'bee-queue'; -import * as tmp from 'tmp'; -import * as fs from 'fs'; -import * as mongo from 'mongodb'; - -import { queueLogger } from '../logger'; -import addFile from '../../services/drive/add-file'; -import User from '../../models/user'; -import dateFormat = require('dateformat'); -import Following from '../../models/following'; -import config from '../../config'; - -const logger = queueLogger.createSubLogger('export-following'); - -export async function exportFollowing(job: bq.Job, done: any): Promise { - logger.info(`Exporting following of ${job.data.user._id} ...`); - - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) - }); - - // Create temp file - const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { - tmp.file((e, path, fd, cleanup) => { - if (e) return rej(e); - res([path, cleanup]); - }); - }); - - logger.info(`Temp file is ${path}`); - - const stream = fs.createWriteStream(path, { flags: 'a' }); - - let exportedCount = 0; - let ended = false; - let cursor: any = null; - - while (!ended) { - const followings = await Following.find({ - followerId: user._id, - ...(cursor ? { _id: { $gt: cursor } } : {}) - }, { - limit: 100, - sort: { - _id: 1 - } - }); - - if (followings.length === 0) { - ended = true; - if (job.reportProgress) job.reportProgress(100); - break; - } - - cursor = followings[followings.length - 1]._id; - - for (const following of followings) { - const u = await User.findOne({ _id: following.followeeId }, { fields: { username: true, host: true } }); - const content = u.host ? `${u.username}@${u.host}` : `${u.username}@${config.host}`; - await new Promise((res, rej) => { - stream.write(content + '\n', err => { - if (err) { - logger.error(err); - rej(err); - } else { - res(); - } - }); - }); - exportedCount++; - } - - const total = await Following.count({ - followerId: user._id, - }); - - if (job.reportProgress) job.reportProgress(exportedCount / total); - } - - stream.end(); - logger.succ(`Exported to: ${path}`); - - const fileName = 'following-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; - const driveFile = await addFile(user, path, fileName); - - logger.succ(`Exported to: ${driveFile._id}`); - cleanup(); - done(); -} diff --git a/src/queue/processors/export-mute.ts b/src/queue/processors/export-mute.ts deleted file mode 100644 index 74456c1da2..0000000000 --- a/src/queue/processors/export-mute.ts +++ /dev/null @@ -1,89 +0,0 @@ -import * as bq from 'bee-queue'; -import * as tmp from 'tmp'; -import * as fs from 'fs'; -import * as mongo from 'mongodb'; - -import { queueLogger } from '../logger'; -import addFile from '../../services/drive/add-file'; -import User from '../../models/user'; -import dateFormat = require('dateformat'); -import Mute from '../../models/mute'; -import config from '../../config'; - -const logger = queueLogger.createSubLogger('export-mute'); - -export async function exportMute(job: bq.Job, done: any): Promise { - logger.info(`Exporting mute of ${job.data.user._id} ...`); - - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) - }); - - // Create temp file - const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { - tmp.file((e, path, fd, cleanup) => { - if (e) return rej(e); - res([path, cleanup]); - }); - }); - - logger.info(`Temp file is ${path}`); - - const stream = fs.createWriteStream(path, { flags: 'a' }); - - let exportedCount = 0; - let ended = false; - let cursor: any = null; - - while (!ended) { - const mutes = await Mute.find({ - muterId: user._id, - ...(cursor ? { _id: { $gt: cursor } } : {}) - }, { - limit: 100, - sort: { - _id: 1 - } - }); - - if (mutes.length === 0) { - ended = true; - if (job.reportProgress) job.reportProgress(100); - break; - } - - cursor = mutes[mutes.length - 1]._id; - - for (const mute of mutes) { - const u = await User.findOne({ _id: mute.muteeId }, { fields: { username: true, host: true } }); - const content = u.host ? `${u.username}@${u.host}` : `${u.username}@${config.host}`; - await new Promise((res, rej) => { - stream.write(content + '\n', err => { - if (err) { - logger.error(err); - rej(err); - } else { - res(); - } - }); - }); - exportedCount++; - } - - const total = await Mute.count({ - muterId: user._id, - }); - - if (job.reportProgress) job.reportProgress(exportedCount / total); - } - - stream.end(); - logger.succ(`Exported to: ${path}`); - - const fileName = 'mute-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; - const driveFile = await addFile(user, path, fileName); - - logger.succ(`Exported to: ${driveFile._id}`); - cleanup(); - done(); -} diff --git a/src/queue/processors/export-notes.ts b/src/queue/processors/export-notes.ts deleted file mode 100644 index 32e4cd1d6c..0000000000 --- a/src/queue/processors/export-notes.ts +++ /dev/null @@ -1,128 +0,0 @@ -import * as bq from 'bee-queue'; -import * as tmp from 'tmp'; -import * as fs from 'fs'; -import * as mongo from 'mongodb'; - -import { queueLogger } from '../logger'; -import Note, { INote } from '../../models/note'; -import addFile from '../../services/drive/add-file'; -import User from '../../models/user'; -import dateFormat = require('dateformat'); - -const logger = queueLogger.createSubLogger('export-notes'); - -export async function exportNotes(job: bq.Job, done: any): Promise { - logger.info(`Exporting notes of ${job.data.user._id} ...`); - - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) - }); - - // Create temp file - const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { - tmp.file((e, path, fd, cleanup) => { - if (e) return rej(e); - res([path, cleanup]); - }); - }); - - logger.info(`Temp file is ${path}`); - - const stream = fs.createWriteStream(path, { flags: 'a' }); - - await new Promise((res, rej) => { - stream.write('[', err => { - if (err) { - logger.error(err); - rej(err); - } else { - res(); - } - }); - }); - - let exportedNotesCount = 0; - let ended = false; - let cursor: any = null; - - while (!ended) { - const notes = await Note.find({ - userId: user._id, - ...(cursor ? { _id: { $gt: cursor } } : {}) - }, { - limit: 100, - sort: { - _id: 1 - } - }); - - if (notes.length === 0) { - ended = true; - if (job.reportProgress) job.reportProgress(100); - break; - } - - cursor = notes[notes.length - 1]._id; - - for (const note of notes) { - const content = JSON.stringify(serialize(note)); - await new Promise((res, rej) => { - stream.write(exportedNotesCount === 0 ? content : ',\n' + content, err => { - if (err) { - logger.error(err); - rej(err); - } else { - res(); - } - }); - }); - exportedNotesCount++; - } - - const total = await Note.count({ - userId: user._id, - }); - - if (job.reportProgress) job.reportProgress(exportedNotesCount / total); - } - - await new Promise((res, rej) => { - stream.write(']', err => { - if (err) { - logger.error(err); - rej(err); - } else { - res(); - } - }); - }); - - stream.end(); - logger.succ(`Exported to: ${path}`); - - const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.json'; - const driveFile = await addFile(user, path, fileName); - - logger.succ(`Exported to: ${driveFile._id}`); - cleanup(); - done(); -} - -function serialize(note: INote): any { - return { - id: note._id, - text: note.text, - createdAt: note.createdAt, - fileIds: note.fileIds, - replyId: note.replyId, - renoteId: note.renoteId, - poll: note.poll, - cw: note.cw, - viaMobile: note.viaMobile, - visibility: note.visibility, - visibleUserIds: note.visibleUserIds, - appId: note.appId, - geo: note.geo, - localOnly: note.localOnly - }; -} diff --git a/src/queue/processors/http/deliver.ts b/src/queue/processors/http/deliver.ts deleted file mode 100644 index 96f6cc07ce..0000000000 --- a/src/queue/processors/http/deliver.ts +++ /dev/null @@ -1,64 +0,0 @@ -import * as bq from 'bee-queue'; - -import request from '../../../remote/activitypub/request'; -import { queueLogger } from '../../logger'; -import { registerOrFetchInstanceDoc } from '../../../services/register-or-fetch-instance-doc'; -import Instance from '../../../models/instance'; -import instanceChart from '../../../services/chart/instance'; - -let latest: string = null; - -export default async (job: bq.Job, done: any): Promise => { - const { host } = new URL(job.data.to); - - try { - if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) - queueLogger.debug(`delivering ${latest}`); - - await request(job.data.user, job.data.to, job.data.content); - - // Update stats - registerOrFetchInstanceDoc(host).then(i => { - Instance.update({ _id: i._id }, { - $set: { - latestRequestSentAt: new Date(), - latestStatus: 200, - lastCommunicatedAt: new Date(), - isNotResponding: false - } - }); - - instanceChart.requestSent(i.host, true); - }); - - done(); - } catch (res) { - // Update stats - registerOrFetchInstanceDoc(host).then(i => { - Instance.update({ _id: i._id }, { - $set: { - latestRequestSentAt: new Date(), - latestStatus: res != null && res.hasOwnProperty('statusCode') ? res.statusCode : null, - isNotResponding: true - } - }); - - instanceChart.requestSent(i.host, false); - }); - - if (res != null && res.hasOwnProperty('statusCode')) { - queueLogger.warn(`deliver failed: ${res.statusCode} ${res.statusMessage} to=${job.data.to}`); - - if (res.statusCode >= 400 && res.statusCode < 500) { - // HTTPステータスコード4xxはクライアントエラーであり、それはつまり - // 何回再送しても成功することはないということなのでエラーにはしないでおく - done(); - } else { - done(res.statusMessage); - } - } else { - queueLogger.warn(`deliver failed: ${res} to=${job.data.to}`); - done(); - } - } -}; diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts deleted file mode 100644 index cc4e711d00..0000000000 --- a/src/queue/processors/http/process-inbox.ts +++ /dev/null @@ -1,182 +0,0 @@ -import * as bq from 'bee-queue'; -import * as httpSignature from 'http-signature'; -import parseAcct from '../../../misc/acct/parse'; -import User, { IRemoteUser } from '../../../models/user'; -import perform from '../../../remote/activitypub/perform'; -import { resolvePerson, updatePerson } from '../../../remote/activitypub/models/person'; -import { toUnicode } from 'punycode'; -import { URL } from 'url'; -import { publishApLogStream } from '../../../services/stream'; -import Logger from '../../../services/logger'; -import { registerOrFetchInstanceDoc } from '../../../services/register-or-fetch-instance-doc'; -import Instance from '../../../models/instance'; -import instanceChart from '../../../services/chart/instance'; - -const logger = new Logger('inbox'); - -// ユーザーのinboxにアクティビティが届いた時の処理 -export default async (job: bq.Job, done: any): Promise => { - const signature = job.data.signature; - const activity = job.data.activity; - - //#region Log - const info = Object.assign({}, activity); - delete info['@context']; - delete info['signature']; - logger.debug(JSON.stringify(info, null, 2)); - //#endregion - - const keyIdLower = signature.keyId.toLowerCase(); - let user: IRemoteUser; - - if (keyIdLower.startsWith('acct:')) { - const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); - if (host === null) { - logger.warn(`request was made by local user: @${username}`); - done(); - return; - } - - // アクティビティ内のホストの検証 - try { - ValidateActivity(activity, host); - } catch (e) { - logger.warn(e.message); - done(); - return; - } - - // ブロックしてたら中断 - // TODO: いちいちデータベースにアクセスするのはコスト高そうなのでどっかにキャッシュしておく - const instance = await Instance.findOne({ host: host.toLowerCase() }); - if (instance && instance.isBlocked) { - logger.warn(`Blocked request: ${host}`); - done(); - return; - } - - user = await User.findOne({ usernameLower: username, host: host.toLowerCase() }) as IRemoteUser; - } else { - // アクティビティ内のホストの検証 - const host = toUnicode(new URL(signature.keyId).hostname.toLowerCase()); - try { - ValidateActivity(activity, host); - } catch (e) { - logger.warn(e.message); - done(); - return; - } - - // ブロックしてたら中断 - // TODO: いちいちデータベースにアクセスするのはコスト高そうなのでどっかにキャッシュしておく - const instance = await Instance.findOne({ host: host.toLowerCase() }); - if (instance && instance.isBlocked) { - logger.warn(`Blocked request: ${host}`); - done(); - return; - } - - user = await User.findOne({ - host: { $ne: null }, - 'publicKey.id': signature.keyId - }) as IRemoteUser; - } - - // Update Person activityの場合は、ここで署名検証/更新処理まで実施して終了 - if (activity.type === 'Update') { - if (activity.object && activity.object.type === 'Person') { - if (user == null) { - logger.warn('Update activity received, but user not registed.'); - } else if (!httpSignature.verifySignature(signature, user.publicKey.publicKeyPem)) { - logger.warn('Update activity received, but signature verification failed.'); - } else { - updatePerson(activity.actor, null, activity.object); - } - done(); - return; - } - } - - // アクティビティを送信してきたユーザーがまだMisskeyサーバーに登録されていなかったら登録する - if (user === null) { - user = await resolvePerson(activity.actor) as IRemoteUser; - } - - if (user === null) { - done(new Error('failed to resolve user')); - return; - } - - if (!httpSignature.verifySignature(signature, user.publicKey.publicKeyPem)) { - logger.error('signature verification failed'); - done(); - return; - } - - //#region Log - publishApLogStream({ - direction: 'in', - activity: activity.type, - host: user.host, - actor: user.username - }); - //#endregion - - // Update stats - registerOrFetchInstanceDoc(user.host).then(i => { - Instance.update({ _id: i._id }, { - $set: { - latestRequestReceivedAt: new Date(), - lastCommunicatedAt: new Date(), - isNotResponding: false - } - }); - - instanceChart.requestReceived(i.host); - }); - - // アクティビティを処理 - try { - await perform(user, activity); - done(); - } catch (e) { - done(e); - } -}; - -/** - * Validate host in activity - * @param activity Activity - * @param host Expect host - */ -function ValidateActivity(activity: any, host: string) { - // id (if exists) - if (typeof activity.id === 'string') { - const uriHost = toUnicode(new URL(activity.id).hostname.toLowerCase()); - if (host !== uriHost) { - const diag = activity.signature ? '. Has LD-Signature. Forwarded?' : ''; - throw new Error(`activity.id(${activity.id}) has different host(${host})${diag}`); - } - } - - // actor (if exists) - if (typeof activity.actor === 'string') { - const uriHost = toUnicode(new URL(activity.actor).hostname.toLowerCase()); - if (host !== uriHost) throw new Error('activity.actor has different host'); - } - - // For Create activity - if (activity.type === 'Create' && activity.object) { - // object.id (if exists) - if (typeof activity.object.id === 'string') { - const uriHost = toUnicode(new URL(activity.object.id).hostname.toLowerCase()); - if (host !== uriHost) throw new Error('activity.object.id has different host'); - } - - // object.attributedTo (if exists) - if (typeof activity.object.attributedTo === 'string') { - const uriHost = toUnicode(new URL(activity.object.attributedTo).hostname.toLowerCase()); - if (host !== uriHost) throw new Error('activity.object.attributedTo has different host'); - } - } -} diff --git a/src/queue/processors/index.ts b/src/queue/processors/index.ts deleted file mode 100644 index 31e87c3f67..0000000000 --- a/src/queue/processors/index.ts +++ /dev/null @@ -1,31 +0,0 @@ -import deliver from './http/deliver'; -import processInbox from './http/process-inbox'; -import { deleteNotes } from './delete-notes'; -import { deleteDriveFiles } from './delete-drive-files'; -import { exportNotes } from './export-notes'; -import { exportFollowing } from './export-following'; -import { exportMute } from './export-mute'; -import { exportBlocking } from './export-blocking'; -import { queueLogger } from '../logger'; - -const handlers: any = { - deliver, - processInbox, - deleteNotes, - deleteDriveFiles, - exportNotes, - exportFollowing, - exportMute, - exportBlocking, -}; - -export default (job: any, done: any) => { - const handler = handlers[job.data.type]; - - if (handler) { - handler(job, done); - } else { - queueLogger.error(`Unknown job: ${job.data.type}`); - done(); - } -}; diff --git a/src/queue/processors/process-inbox.ts b/src/queue/processors/process-inbox.ts new file mode 100644 index 0000000000..21292921f0 --- /dev/null +++ b/src/queue/processors/process-inbox.ts @@ -0,0 +1,182 @@ +import * as Bull from 'bull'; +import * as httpSignature from 'http-signature'; +import parseAcct from '../../misc/acct/parse'; +import User, { IRemoteUser } from '../../models/user'; +import perform from '../../remote/activitypub/perform'; +import { resolvePerson, updatePerson } from '../../remote/activitypub/models/person'; +import { toUnicode } from 'punycode'; +import { URL } from 'url'; +import { publishApLogStream } from '../../services/stream'; +import Logger from '../../services/logger'; +import { registerOrFetchInstanceDoc } from '../../services/register-or-fetch-instance-doc'; +import Instance from '../../models/instance'; +import instanceChart from '../../services/chart/instance'; + +const logger = new Logger('inbox'); + +// ユーザーのinboxにアクティビティが届いた時の処理 +export default async (job: Bull.Job, done: any): Promise => { + const signature = job.data.signature; + const activity = job.data.activity; + + //#region Log + const info = Object.assign({}, activity); + delete info['@context']; + delete info['signature']; + logger.debug(JSON.stringify(info, null, 2)); + //#endregion + + const keyIdLower = signature.keyId.toLowerCase(); + let user: IRemoteUser; + + if (keyIdLower.startsWith('acct:')) { + const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); + if (host === null) { + logger.warn(`request was made by local user: @${username}`); + done(); + return; + } + + // アクティビティ内のホストの検証 + try { + ValidateActivity(activity, host); + } catch (e) { + logger.warn(e.message); + done(); + return; + } + + // ブロックしてたら中断 + // TODO: いちいちデータベースにアクセスするのはコスト高そうなのでどっかにキャッシュしておく + const instance = await Instance.findOne({ host: host.toLowerCase() }); + if (instance && instance.isBlocked) { + logger.warn(`Blocked request: ${host}`); + done(); + return; + } + + user = await User.findOne({ usernameLower: username, host: host.toLowerCase() }) as IRemoteUser; + } else { + // アクティビティ内のホストの検証 + const host = toUnicode(new URL(signature.keyId).hostname.toLowerCase()); + try { + ValidateActivity(activity, host); + } catch (e) { + logger.warn(e.message); + done(); + return; + } + + // ブロックしてたら中断 + // TODO: いちいちデータベースにアクセスするのはコスト高そうなのでどっかにキャッシュしておく + const instance = await Instance.findOne({ host: host.toLowerCase() }); + if (instance && instance.isBlocked) { + logger.warn(`Blocked request: ${host}`); + done(); + return; + } + + user = await User.findOne({ + host: { $ne: null }, + 'publicKey.id': signature.keyId + }) as IRemoteUser; + } + + // Update Person activityの場合は、ここで署名検証/更新処理まで実施して終了 + if (activity.type === 'Update') { + if (activity.object && activity.object.type === 'Person') { + if (user == null) { + logger.warn('Update activity received, but user not registed.'); + } else if (!httpSignature.verifySignature(signature, user.publicKey.publicKeyPem)) { + logger.warn('Update activity received, but signature verification failed.'); + } else { + updatePerson(activity.actor, null, activity.object); + } + done(); + return; + } + } + + // アクティビティを送信してきたユーザーがまだMisskeyサーバーに登録されていなかったら登録する + if (user === null) { + user = await resolvePerson(activity.actor) as IRemoteUser; + } + + if (user === null) { + done(new Error('failed to resolve user')); + return; + } + + if (!httpSignature.verifySignature(signature, user.publicKey.publicKeyPem)) { + logger.error('signature verification failed'); + done(); + return; + } + + //#region Log + publishApLogStream({ + direction: 'in', + activity: activity.type, + host: user.host, + actor: user.username + }); + //#endregion + + // Update stats + registerOrFetchInstanceDoc(user.host).then(i => { + Instance.update({ _id: i._id }, { + $set: { + latestRequestReceivedAt: new Date(), + lastCommunicatedAt: new Date(), + isNotResponding: false + } + }); + + instanceChart.requestReceived(i.host); + }); + + // アクティビティを処理 + try { + await perform(user, activity); + done(); + } catch (e) { + done(e); + } +}; + +/** + * Validate host in activity + * @param activity Activity + * @param host Expect host + */ +function ValidateActivity(activity: any, host: string) { + // id (if exists) + if (typeof activity.id === 'string') { + const uriHost = toUnicode(new URL(activity.id).hostname.toLowerCase()); + if (host !== uriHost) { + const diag = activity.signature ? '. Has LD-Signature. Forwarded?' : ''; + throw new Error(`activity.id(${activity.id}) has different host(${host})${diag}`); + } + } + + // actor (if exists) + if (typeof activity.actor === 'string') { + const uriHost = toUnicode(new URL(activity.actor).hostname.toLowerCase()); + if (host !== uriHost) throw new Error('activity.actor has different host'); + } + + // For Create activity + if (activity.type === 'Create' && activity.object) { + // object.id (if exists) + if (typeof activity.object.id === 'string') { + const uriHost = toUnicode(new URL(activity.object.id).hostname.toLowerCase()); + if (host !== uriHost) throw new Error('activity.object.id has different host'); + } + + // object.attributedTo (if exists) + if (typeof activity.object.attributedTo === 'string') { + const uriHost = toUnicode(new URL(activity.object.attributedTo).hostname.toLowerCase()); + if (host !== uriHost) throw new Error('activity.object.attributedTo has different host'); + } + } +} -- cgit v1.2.3-freya