diff options
Diffstat (limited to 'src/queue/processors')
| -rw-r--r-- | src/queue/processors/db/delete-drive-files.ts | 34 | ||||
| -rw-r--r-- | src/queue/processors/db/delete-notes.ts | 55 | ||||
| -rw-r--r-- | src/queue/processors/db/export-blocking.ts | 36 | ||||
| -rw-r--r-- | src/queue/processors/db/export-following.ts | 36 | ||||
| -rw-r--r-- | src/queue/processors/db/export-mute.ts | 36 | ||||
| -rw-r--r-- | src/queue/processors/db/export-notes.ts | 48 | ||||
| -rw-r--r-- | src/queue/processors/db/export-user-lists.ts | 29 | ||||
| -rw-r--r-- | src/queue/processors/db/import-following.ts | 27 | ||||
| -rw-r--r-- | src/queue/processors/db/import-user-lists.ts | 41 | ||||
| -rw-r--r-- | src/queue/processors/db/index.ts | 2 | ||||
| -rw-r--r-- | src/queue/processors/deliver.ts | 26 | ||||
| -rw-r--r-- | src/queue/processors/inbox.ts | 52 |
12 files changed, 180 insertions, 242 deletions
diff --git a/src/queue/processors/db/delete-drive-files.ts b/src/queue/processors/db/delete-drive-files.ts index 3de960a25e..5f347fb588 100644 --- a/src/queue/processors/db/delete-drive-files.ts +++ b/src/queue/processors/db/delete-drive-files.ts @@ -1,18 +1,17 @@ import * as Bull from 'bull'; -import * as mongo from 'mongodb'; import { queueLogger } from '../../logger'; -import User from '../../../models/user'; -import DriveFile from '../../../models/drive-file'; import deleteFile from '../../../services/drive/delete-file'; +import { Users, DriveFiles } from '../../../models'; +import { MoreThan } from 'typeorm'; const logger = queueLogger.createSubLogger('delete-drive-files'); export async function deleteDriveFiles(job: Bull.Job, done: any): Promise<void> { - logger.info(`Deleting drive files of ${job.data.user._id} ...`); + logger.info(`Deleting drive files of ${job.data.user.id} ...`); - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) + const user = await Users.findOne({ + id: job.data.user.id }); let deletedCount = 0; @@ -20,13 +19,14 @@ export async function deleteDriveFiles(job: Bull.Job, done: any): Promise<void> let cursor: any = null; while (!ended) { - const files = await DriveFile.find({ - userId: user._id, - ...(cursor ? { _id: { $gt: cursor } } : {}) - }, { - limit: 100, - sort: { - _id: 1 + const files = await DriveFiles.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 } }); @@ -36,20 +36,20 @@ export async function deleteDriveFiles(job: Bull.Job, done: any): Promise<void> break; } - cursor = files[files.length - 1]._id; + cursor = files[files.length - 1].id; for (const file of files) { await deleteFile(file); deletedCount++; } - const total = await DriveFile.count({ - userId: user._id, + const total = await DriveFiles.count({ + userId: user.id, }); job.progress(deletedCount / total); } - logger.succ(`All drive files (${deletedCount}) of ${user._id} has been deleted.`); + logger.succ(`All drive files (${deletedCount}) of ${user.id} has been deleted.`); done(); } diff --git a/src/queue/processors/db/delete-notes.ts b/src/queue/processors/db/delete-notes.ts deleted file mode 100644 index 021db8062e..0000000000 --- a/src/queue/processors/db/delete-notes.ts +++ /dev/null @@ -1,55 +0,0 @@ -import * as Bull from 'bull'; -import * as mongo from 'mongodb'; - -import { queueLogger } from '../../logger'; -import Note from '../../../models/note'; -import deleteNote from '../../../services/note/delete'; -import User from '../../../models/user'; - -const logger = queueLogger.createSubLogger('delete-notes'); - -export async function deleteNotes(job: Bull.Job, done: any): Promise<void> { - logger.info(`Deleting notes of ${job.data.user._id} ...`); - - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) - }); - - let deletedCount = 0; - let ended = false; - let cursor: any = null; - - while (!ended) { - const notes = await Note.find({ - userId: user._id, - ...(cursor ? { _id: { $gt: cursor } } : {}) - }, { - limit: 100, - sort: { - _id: 1 - } - }); - - if (notes.length === 0) { - ended = true; - job.progress(100); - break; - } - - cursor = notes[notes.length - 1]._id; - - for (const note of notes) { - await deleteNote(user, note, true); - deletedCount++; - } - - const total = await Note.count({ - userId: user._id, - }); - - job.progress(deletedCount / total); - } - - logger.succ(`All notes (${deletedCount}) of ${user._id} has been deleted.`); - done(); -} diff --git a/src/queue/processors/db/export-blocking.ts b/src/queue/processors/db/export-blocking.ts index 7f32c06472..c12aa4fca3 100644 --- a/src/queue/processors/db/export-blocking.ts +++ b/src/queue/processors/db/export-blocking.ts @@ -1,22 +1,21 @@ import * as Bull from 'bull'; import * as tmp from 'tmp'; import * as fs from 'fs'; -import * as mongo from 'mongodb'; import { queueLogger } from '../../logger'; import addFile from '../../../services/drive/add-file'; -import User from '../../../models/user'; import dateFormat = require('dateformat'); -import Blocking from '../../../models/blocking'; import { getFullApAccount } from '../../../misc/convert-host'; +import { Users, Blockings } from '../../../models'; +import { MoreThan } from 'typeorm'; const logger = queueLogger.createSubLogger('export-blocking'); export async function exportBlocking(job: Bull.Job, done: any): Promise<void> { - logger.info(`Exporting blocking of ${job.data.user._id} ...`); + logger.info(`Exporting blocking of ${job.data.user.id} ...`); - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) + const user = await Users.findOne({ + id: job.data.user.id }); // Create temp file @@ -36,13 +35,14 @@ export async function exportBlocking(job: Bull.Job, done: any): Promise<void> { let cursor: any = null; while (!ended) { - const blockings = await Blocking.find({ - blockerId: user._id, - ...(cursor ? { _id: { $gt: cursor } } : {}) - }, { - limit: 100, - sort: { - _id: 1 + const blockings = await Blockings.find({ + where: { + blockerId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 } }); @@ -52,10 +52,10 @@ export async function exportBlocking(job: Bull.Job, done: any): Promise<void> { break; } - cursor = blockings[blockings.length - 1]._id; + cursor = blockings[blockings.length - 1].id; for (const block of blockings) { - const u = await User.findOne({ _id: block.blockeeId }, { fields: { username: true, host: true } }); + const u = await Users.findOne({ id: block.blockeeId }); const content = getFullApAccount(u.username, u.host); await new Promise((res, rej) => { stream.write(content + '\n', err => { @@ -70,8 +70,8 @@ export async function exportBlocking(job: Bull.Job, done: any): Promise<void> { exportedCount++; } - const total = await Blocking.count({ - blockerId: user._id, + const total = await Blockings.count({ + blockerId: user.id, }); job.progress(exportedCount / total); @@ -83,7 +83,7 @@ export async function exportBlocking(job: Bull.Job, done: any): Promise<void> { const fileName = 'blocking-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; const driveFile = await addFile(user, path, fileName); - logger.succ(`Exported to: ${driveFile._id}`); + logger.succ(`Exported to: ${driveFile.id}`); cleanup(); done(); } diff --git a/src/queue/processors/db/export-following.ts b/src/queue/processors/db/export-following.ts index 019414072a..fb30df79fe 100644 --- a/src/queue/processors/db/export-following.ts +++ b/src/queue/processors/db/export-following.ts @@ -1,22 +1,21 @@ import * as Bull from 'bull'; import * as tmp from 'tmp'; import * as fs from 'fs'; -import * as mongo from 'mongodb'; import { queueLogger } from '../../logger'; import addFile from '../../../services/drive/add-file'; -import User from '../../../models/user'; import dateFormat = require('dateformat'); -import Following from '../../../models/following'; import { getFullApAccount } from '../../../misc/convert-host'; +import { Users, Followings } from '../../../models'; +import { MoreThan } from 'typeorm'; const logger = queueLogger.createSubLogger('export-following'); export async function exportFollowing(job: Bull.Job, done: any): Promise<void> { - logger.info(`Exporting following of ${job.data.user._id} ...`); + logger.info(`Exporting following of ${job.data.user.id} ...`); - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) + const user = await Users.findOne({ + id: job.data.user.id }); // Create temp file @@ -36,13 +35,14 @@ export async function exportFollowing(job: Bull.Job, done: any): Promise<void> { let cursor: any = null; while (!ended) { - const followings = await Following.find({ - followerId: user._id, - ...(cursor ? { _id: { $gt: cursor } } : {}) - }, { - limit: 100, - sort: { - _id: 1 + const followings = await Followings.find({ + where: { + followerId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 } }); @@ -52,10 +52,10 @@ export async function exportFollowing(job: Bull.Job, done: any): Promise<void> { break; } - cursor = followings[followings.length - 1]._id; + cursor = followings[followings.length - 1].id; for (const following of followings) { - const u = await User.findOne({ _id: following.followeeId }, { fields: { username: true, host: true } }); + const u = await Users.findOne({ id: following.followeeId }); const content = getFullApAccount(u.username, u.host); await new Promise((res, rej) => { stream.write(content + '\n', err => { @@ -70,8 +70,8 @@ export async function exportFollowing(job: Bull.Job, done: any): Promise<void> { exportedCount++; } - const total = await Following.count({ - followerId: user._id, + const total = await Followings.count({ + followerId: user.id, }); job.progress(exportedCount / total); @@ -83,7 +83,7 @@ export async function exportFollowing(job: Bull.Job, done: any): Promise<void> { const fileName = 'following-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; const driveFile = await addFile(user, path, fileName); - logger.succ(`Exported to: ${driveFile._id}`); + logger.succ(`Exported to: ${driveFile.id}`); cleanup(); done(); } diff --git a/src/queue/processors/db/export-mute.ts b/src/queue/processors/db/export-mute.ts index 5ded7cf651..3aed526dc5 100644 --- a/src/queue/processors/db/export-mute.ts +++ b/src/queue/processors/db/export-mute.ts @@ -1,22 +1,21 @@ import * as Bull from 'bull'; import * as tmp from 'tmp'; import * as fs from 'fs'; -import * as mongo from 'mongodb'; import { queueLogger } from '../../logger'; import addFile from '../../../services/drive/add-file'; -import User from '../../../models/user'; import dateFormat = require('dateformat'); -import Mute from '../../../models/mute'; import { getFullApAccount } from '../../../misc/convert-host'; +import { Users, Mutings } from '../../../models'; +import { MoreThan } from 'typeorm'; const logger = queueLogger.createSubLogger('export-mute'); export async function exportMute(job: Bull.Job, done: any): Promise<void> { - logger.info(`Exporting mute of ${job.data.user._id} ...`); + logger.info(`Exporting mute of ${job.data.user.id} ...`); - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) + const user = await Users.findOne({ + id: job.data.user.id }); // Create temp file @@ -36,13 +35,14 @@ export async function exportMute(job: Bull.Job, done: any): Promise<void> { let cursor: any = null; while (!ended) { - const mutes = await Mute.find({ - muterId: user._id, - ...(cursor ? { _id: { $gt: cursor } } : {}) - }, { - limit: 100, - sort: { - _id: 1 + const mutes = await Mutings.find({ + where: { + muterId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 } }); @@ -52,10 +52,10 @@ export async function exportMute(job: Bull.Job, done: any): Promise<void> { break; } - cursor = mutes[mutes.length - 1]._id; + cursor = mutes[mutes.length - 1].id; for (const mute of mutes) { - const u = await User.findOne({ _id: mute.muteeId }, { fields: { username: true, host: true } }); + const u = await Users.findOne({ id: mute.muteeId }); const content = getFullApAccount(u.username, u.host); await new Promise((res, rej) => { stream.write(content + '\n', err => { @@ -70,8 +70,8 @@ export async function exportMute(job: Bull.Job, done: any): Promise<void> { exportedCount++; } - const total = await Mute.count({ - muterId: user._id, + const total = await Mutings.count({ + muterId: user.id, }); job.progress(exportedCount / total); @@ -83,7 +83,7 @@ export async function exportMute(job: Bull.Job, done: any): Promise<void> { const fileName = 'mute-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; const driveFile = await addFile(user, path, fileName); - logger.succ(`Exported to: ${driveFile._id}`); + logger.succ(`Exported to: ${driveFile.id}`); cleanup(); done(); } diff --git a/src/queue/processors/db/export-notes.ts b/src/queue/processors/db/export-notes.ts index 8f3cdc5b99..92867ad82e 100644 --- a/src/queue/processors/db/export-notes.ts +++ b/src/queue/processors/db/export-notes.ts @@ -1,21 +1,22 @@ import * as Bull from 'bull'; import * as tmp from 'tmp'; import * as fs from 'fs'; -import * as mongo from 'mongodb'; import { queueLogger } from '../../logger'; -import Note, { INote } from '../../../models/note'; import addFile from '../../../services/drive/add-file'; -import User from '../../../models/user'; import dateFormat = require('dateformat'); +import { Users, Notes, Polls } from '../../../models'; +import { MoreThan } from 'typeorm'; +import { Note } from '../../../models/entities/note'; +import { Poll } from '../../../models/entities/poll'; const logger = queueLogger.createSubLogger('export-notes'); export async function exportNotes(job: Bull.Job, done: any): Promise<void> { - logger.info(`Exporting notes of ${job.data.user._id} ...`); + logger.info(`Exporting notes of ${job.data.user.id} ...`); - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) + const user = await Users.findOne({ + id: job.data.user.id }); // Create temp file @@ -46,13 +47,14 @@ export async function exportNotes(job: Bull.Job, done: any): Promise<void> { let cursor: any = null; while (!ended) { - const notes = await Note.find({ - userId: user._id, - ...(cursor ? { _id: { $gt: cursor } } : {}) - }, { - limit: 100, - sort: { - _id: 1 + const notes = await Notes.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 } }); @@ -62,10 +64,14 @@ export async function exportNotes(job: Bull.Job, done: any): Promise<void> { break; } - cursor = notes[notes.length - 1]._id; + cursor = notes[notes.length - 1].id; for (const note of notes) { - const content = JSON.stringify(serialize(note)); + let poll: Poll; + if (note.hasPoll) { + poll = await Polls.findOne({ noteId: note.id }); + } + const content = JSON.stringify(serialize(note, poll)); await new Promise((res, rej) => { stream.write(exportedNotesCount === 0 ? content : ',\n' + content, err => { if (err) { @@ -79,8 +85,8 @@ export async function exportNotes(job: Bull.Job, done: any): Promise<void> { exportedNotesCount++; } - const total = await Note.count({ - userId: user._id, + const total = await Notes.count({ + userId: user.id, }); job.progress(exportedNotesCount / total); @@ -103,20 +109,20 @@ export async function exportNotes(job: Bull.Job, done: any): Promise<void> { const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.json'; const driveFile = await addFile(user, path, fileName); - logger.succ(`Exported to: ${driveFile._id}`); + logger.succ(`Exported to: ${driveFile.id}`); cleanup(); done(); } -function serialize(note: INote): any { +function serialize(note: Note, poll: Poll): any { return { - id: note._id, + id: note.id, text: note.text, createdAt: note.createdAt, fileIds: note.fileIds, replyId: note.replyId, renoteId: note.renoteId, - poll: note.poll, + poll: poll, cw: note.cw, viaMobile: note.viaMobile, visibility: note.visibility, diff --git a/src/queue/processors/db/export-user-lists.ts b/src/queue/processors/db/export-user-lists.ts index dfbf152ec0..f3987cb0d2 100644 --- a/src/queue/processors/db/export-user-lists.ts +++ b/src/queue/processors/db/export-user-lists.ts @@ -1,26 +1,25 @@ import * as Bull from 'bull'; import * as tmp from 'tmp'; import * as fs from 'fs'; -import * as mongo from 'mongodb'; import { queueLogger } from '../../logger'; import addFile from '../../../services/drive/add-file'; -import User from '../../../models/user'; import dateFormat = require('dateformat'); -import UserList from '../../../models/user-list'; import { getFullApAccount } from '../../../misc/convert-host'; +import { Users, UserLists, UserListJoinings } from '../../../models'; +import { In } from 'typeorm'; const logger = queueLogger.createSubLogger('export-user-lists'); export async function exportUserLists(job: Bull.Job, done: any): Promise<void> { - logger.info(`Exporting user lists of ${job.data.user._id} ...`); + logger.info(`Exporting user lists of ${job.data.user.id} ...`); - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) + const user = await Users.findOne({ + id: job.data.user.id }); - const lists = await UserList.find({ - userId: user._id + const lists = await UserLists.find({ + userId: user.id }); // Create temp file @@ -36,18 +35,14 @@ export async function exportUserLists(job: Bull.Job, done: any): Promise<void> { const stream = fs.createWriteStream(path, { flags: 'a' }); for (const list of lists) { - const users = await User.find({ - _id: { $in: list.userIds } - }, { - fields: { - username: true, - host: true - } + const joinings = await UserListJoinings.find({ userListId: list.id }); + const users = await Users.find({ + id: In(joinings.map(j => j.userId)) }); for (const u of users) { const acct = getFullApAccount(u.username, u.host); - const content = `${list.title},${acct}`; + const content = `${list.name},${acct}`; await new Promise((res, rej) => { stream.write(content + '\n', err => { if (err) { @@ -67,7 +62,7 @@ export async function exportUserLists(job: Bull.Job, done: any): Promise<void> { const fileName = 'user-lists-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; const driveFile = await addFile(user, path, fileName); - logger.succ(`Exported to: ${driveFile._id}`); + logger.succ(`Exported to: ${driveFile.id}`); cleanup(); done(); } diff --git a/src/queue/processors/db/import-following.ts b/src/queue/processors/db/import-following.ts index 069afa74c4..2e646c1869 100644 --- a/src/queue/processors/db/import-following.ts +++ b/src/queue/processors/db/import-following.ts @@ -1,32 +1,27 @@ import * as Bull from 'bull'; -import * as mongo from 'mongodb'; import { queueLogger } from '../../logger'; -import User from '../../../models/user'; import follow from '../../../services/following/create'; -import DriveFile from '../../../models/drive-file'; -import { getOriginalUrl } from '../../../misc/get-drive-file-url'; import parseAcct from '../../../misc/acct/parse'; import resolveUser from '../../../remote/resolve-user'; import { downloadTextFile } from '../../../misc/download-text-file'; import { isSelfHost, toDbHost } from '../../../misc/convert-host'; +import { Users, DriveFiles } from '../../../models'; const logger = queueLogger.createSubLogger('import-following'); export async function importFollowing(job: Bull.Job, done: any): Promise<void> { - logger.info(`Importing following of ${job.data.user._id} ...`); + logger.info(`Importing following of ${job.data.user.id} ...`); - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) + const user = await Users.findOne({ + id: job.data.user.id }); - const file = await DriveFile.findOne({ - _id: new mongo.ObjectID(job.data.fileId.toString()) + const file = await DriveFiles.findOne({ + id: job.data.fileId }); - const url = getOriginalUrl(file); - - const csv = await downloadTextFile(url); + const csv = await downloadTextFile(file.url); let linenum = 0; @@ -36,10 +31,10 @@ export async function importFollowing(job: Bull.Job, done: any): Promise<void> { try { const { username, host } = parseAcct(line.trim()); - let target = isSelfHost(host) ? await User.findOne({ + let target = isSelfHost(host) ? await Users.findOne({ host: null, usernameLower: username.toLowerCase() - }) : await User.findOne({ + }) : await Users.findOne({ host: toDbHost(host), usernameLower: username.toLowerCase() }); @@ -55,9 +50,9 @@ export async function importFollowing(job: Bull.Job, done: any): Promise<void> { } // skip myself - if (target._id.equals(job.data.user._id)) continue; + if (target.id === job.data.user.id) continue; - logger.info(`Follow[${linenum}] ${target._id} ...`); + logger.info(`Follow[${linenum}] ${target.id} ...`); follow(user, target); } catch (e) { diff --git a/src/queue/processors/db/import-user-lists.ts b/src/queue/processors/db/import-user-lists.ts index 50d3c649d4..8be5785896 100644 --- a/src/queue/processors/db/import-user-lists.ts +++ b/src/queue/processors/db/import-user-lists.ts @@ -1,62 +1,59 @@ import * as Bull from 'bull'; -import * as mongo from 'mongodb'; import { queueLogger } from '../../logger'; -import User from '../../../models/user'; -import UserList from '../../../models/user-list'; -import DriveFile from '../../../models/drive-file'; -import { getOriginalUrl } from '../../../misc/get-drive-file-url'; import parseAcct from '../../../misc/acct/parse'; import resolveUser from '../../../remote/resolve-user'; import { pushUserToUserList } from '../../../services/user-list/push'; import { downloadTextFile } from '../../../misc/download-text-file'; import { isSelfHost, toDbHost } from '../../../misc/convert-host'; +import { DriveFiles, Users, UserLists, UserListJoinings } from '../../../models'; +import { genId } from '../../../misc/gen-id'; const logger = queueLogger.createSubLogger('import-user-lists'); export async function importUserLists(job: Bull.Job, done: any): Promise<void> { - logger.info(`Importing user lists of ${job.data.user._id} ...`); + logger.info(`Importing user lists of ${job.data.user.id} ...`); - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) + const user = await Users.findOne({ + id: job.data.user.id }); - const file = await DriveFile.findOne({ - _id: new mongo.ObjectID(job.data.fileId.toString()) + const file = await DriveFiles.findOne({ + id: job.data.fileId }); - const url = getOriginalUrl(file); - - const csv = await downloadTextFile(url); + const csv = await downloadTextFile(file.url); for (const line of csv.trim().split('\n')) { const listName = line.split(',')[0].trim(); const { username, host } = parseAcct(line.split(',')[1].trim()); - let list = await UserList.findOne({ - userId: user._id, - title: listName + let list = await UserLists.findOne({ + userId: user.id, + name: listName }); if (list == null) { - list = await UserList.insert({ + list = await UserLists.save({ + id: genId(), createdAt: new Date(), - userId: user._id, - title: listName, + userId: user.id, + name: listName, userIds: [] }); } - let target = isSelfHost(host) ? await User.findOne({ + let target = isSelfHost(host) ? await Users.findOne({ host: null, usernameLower: username.toLowerCase() - }) : await User.findOne({ + }) : await Users.findOne({ host: toDbHost(host), usernameLower: username.toLowerCase() }); if (host == null && target == null) continue; - if (list.userIds.some(id => id.equals(target._id))) continue; + + if (await UserListJoinings.findOne({ userListId: list.id, userId: target.id }) != null) continue; if (target == null) { target = await resolveUser(username, host); diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts index 1bc9a9af7c..921cdf7ab1 100644 --- a/src/queue/processors/db/index.ts +++ b/src/queue/processors/db/index.ts @@ -1,5 +1,4 @@ import * as Bull from 'bull'; -import { deleteNotes } from './delete-notes'; import { deleteDriveFiles } from './delete-drive-files'; import { exportNotes } from './export-notes'; import { exportFollowing } from './export-following'; @@ -10,7 +9,6 @@ import { importFollowing } from './import-following'; import { importUserLists } from './import-user-lists'; const jobs = { - deleteNotes, deleteDriveFiles, exportNotes, exportFollowing, diff --git a/src/queue/processors/deliver.ts b/src/queue/processors/deliver.ts index 28d3a17f6b..b9701c0c65 100644 --- a/src/queue/processors/deliver.ts +++ b/src/queue/processors/deliver.ts @@ -1,9 +1,9 @@ import * as Bull from 'bull'; import request from '../../remote/activitypub/request'; import { registerOrFetchInstanceDoc } from '../../services/register-or-fetch-instance-doc'; -import Instance from '../../models/instance'; -import instanceChart from '../../services/chart/instance'; import Logger from '../../services/logger'; +import { Instances } from '../../models'; +import { instanceChart } from '../../services/chart'; const logger = new Logger('deliver'); @@ -21,13 +21,11 @@ export default async (job: Bull.Job) => { // Update stats registerOrFetchInstanceDoc(host).then(i => { - Instance.update({ _id: i._id }, { - $set: { - latestRequestSentAt: new Date(), - latestStatus: 200, - lastCommunicatedAt: new Date(), - isNotResponding: false - } + Instances.update(i.id, { + latestRequestSentAt: new Date(), + latestStatus: 200, + lastCommunicatedAt: new Date(), + isNotResponding: false }); instanceChart.requestSent(i.host, true); @@ -37,12 +35,10 @@ export default async (job: Bull.Job) => { } catch (res) { // Update stats registerOrFetchInstanceDoc(host).then(i => { - Instance.update({ _id: i._id }, { - $set: { - latestRequestSentAt: new Date(), - latestStatus: res != null && res.hasOwnProperty('statusCode') ? res.statusCode : null, - isNotResponding: true - } + Instances.update(i.id, { + latestRequestSentAt: new Date(), + latestStatus: res != null && res.hasOwnProperty('statusCode') ? res.statusCode : null, + isNotResponding: true }); instanceChart.requestSent(i.host, false); diff --git a/src/queue/processors/inbox.ts b/src/queue/processors/inbox.ts index 436f3335c8..16badabcf7 100644 --- a/src/queue/processors/inbox.ts +++ b/src/queue/processors/inbox.ts @@ -1,7 +1,7 @@ import * as Bull from 'bull'; import * as httpSignature from 'http-signature'; import parseAcct from '../../misc/acct/parse'; -import User, { IRemoteUser } from '../../models/user'; +import { IRemoteUser } from '../../models/entities/user'; import perform from '../../remote/activitypub/perform'; import { resolvePerson, updatePerson } from '../../remote/activitypub/models/person'; import { toUnicode } from 'punycode'; @@ -9,8 +9,10 @@ import { URL } from 'url'; import { publishApLogStream } from '../../services/stream'; import Logger from '../../services/logger'; import { registerOrFetchInstanceDoc } from '../../services/register-or-fetch-instance-doc'; -import Instance from '../../models/instance'; -import instanceChart from '../../services/chart/instance'; +import { Instances, Users, UserPublickeys } from '../../models'; +import { instanceChart } from '../../services/chart'; +import { UserPublickey } from '../../models/entities/user-publickey'; +import fetchMeta from '../../misc/fetch-meta'; const logger = new Logger('inbox'); @@ -28,6 +30,7 @@ export default async (job: Bull.Job): Promise<void> => { const keyIdLower = signature.keyId.toLowerCase(); let user: IRemoteUser; + let key: UserPublickey; if (keyIdLower.startsWith('acct:')) { const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); @@ -46,13 +49,17 @@ export default async (job: Bull.Job): Promise<void> => { // ブロックしてたら中断 // TODO: いちいちデータベースにアクセスするのはコスト高そうなのでどっかにキャッシュしておく - const instance = await Instance.findOne({ host: host.toLowerCase() }); - if (instance && instance.isBlocked) { + const meta = await fetchMeta(); + if (meta.blockedHosts.includes(host.toLowerCase())) { logger.info(`Blocked request: ${host}`); return; } - user = await User.findOne({ usernameLower: username, host: host.toLowerCase() }) as IRemoteUser; + user = await Users.findOne({ usernameLower: username, host: host.toLowerCase() }) as IRemoteUser; + + key = await UserPublickeys.findOne({ + userId: user.id + }); } else { // アクティビティ内のホストの検証 const host = toUnicode(new URL(signature.keyId).hostname.toLowerCase()); @@ -65,16 +72,17 @@ export default async (job: Bull.Job): Promise<void> => { // ブロックしてたら中断 // TODO: いちいちデータベースにアクセスするのはコスト高そうなのでどっかにキャッシュしておく - const instance = await Instance.findOne({ host: host.toLowerCase() }); - if (instance && instance.isBlocked) { - logger.warn(`Blocked request: ${host}`); + const meta = await fetchMeta(); + if (meta.blockedHosts.includes(host.toLowerCase())) { + logger.info(`Blocked request: ${host}`); return; } - user = await User.findOne({ - host: { $ne: null }, - 'publicKey.id': signature.keyId - }) as IRemoteUser; + key = await UserPublickeys.findOne({ + keyId: signature.keyId + }); + + user = await Users.findOne(key.userId) as IRemoteUser; } // Update Person activityの場合は、ここで署名検証/更新処理まで実施して終了 @@ -82,7 +90,7 @@ export default async (job: Bull.Job): Promise<void> => { if (activity.object && activity.object.type === 'Person') { if (user == null) { logger.warn('Update activity received, but user not registed.'); - } else if (!httpSignature.verifySignature(signature, user.publicKey.publicKeyPem)) { + } else if (!httpSignature.verifySignature(signature, key.keyPem)) { logger.warn('Update activity received, but signature verification failed.'); } else { updatePerson(activity.actor, null, activity.object); @@ -92,15 +100,15 @@ export default async (job: Bull.Job): Promise<void> => { } // アクティビティを送信してきたユーザーがまだMisskeyサーバーに登録されていなかったら登録する - if (user === null) { + if (user == null) { user = await resolvePerson(activity.actor) as IRemoteUser; } - if (user === null) { + if (user == null) { throw new Error('failed to resolve user'); } - if (!httpSignature.verifySignature(signature, user.publicKey.publicKeyPem)) { + if (!httpSignature.verifySignature(signature, key.keyPem)) { logger.error('signature verification failed'); return; } @@ -116,12 +124,10 @@ export default async (job: Bull.Job): Promise<void> => { // Update stats registerOrFetchInstanceDoc(user.host).then(i => { - Instance.update({ _id: i._id }, { - $set: { - latestRequestReceivedAt: new Date(), - lastCommunicatedAt: new Date(), - isNotResponding: false - } + Instances.update(i.id, { + latestRequestReceivedAt: new Date(), + lastCommunicatedAt: new Date(), + isNotResponding: false }); instanceChart.requestReceived(i.host); |