diff options
| author | syuilo <syuilotan@yahoo.co.jp> | 2019-04-14 20:38:55 +0900 |
|---|---|---|
| committer | syuilo <syuilotan@yahoo.co.jp> | 2019-04-14 20:38:55 +0900 |
| commit | d66e4b7ff97d512e2a2523815e2eef170456b37f (patch) | |
| tree | 59ae1a102d88b5c2c2236b734ea4a584b4f9ba46 /src/queue/processors | |
| parent | 10.100.0 (diff) | |
| parent | 11.0.0 (diff) | |
| download | misskey-d66e4b7ff97d512e2a2523815e2eef170456b37f.tar.gz misskey-d66e4b7ff97d512e2a2523815e2eef170456b37f.tar.bz2 misskey-d66e4b7ff97d512e2a2523815e2eef170456b37f.zip | |
Merge branch 'develop'
Diffstat (limited to 'src/queue/processors')
| -rw-r--r-- | src/queue/processors/db/delete-drive-files.ts | 42 | ||||
| -rw-r--r-- | src/queue/processors/db/delete-notes.ts | 55 | ||||
| -rw-r--r-- | src/queue/processors/db/export-blocking.ts | 48 | ||||
| -rw-r--r-- | src/queue/processors/db/export-following.ts | 48 | ||||
| -rw-r--r-- | src/queue/processors/db/export-mute.ts | 48 | ||||
| -rw-r--r-- | src/queue/processors/db/export-notes.ts | 57 | ||||
| -rw-r--r-- | src/queue/processors/db/export-user-lists.ts | 33 | ||||
| -rw-r--r-- | src/queue/processors/db/import-following.ts | 41 | ||||
| -rw-r--r-- | src/queue/processors/db/import-user-lists.ts | 57 | ||||
| -rw-r--r-- | src/queue/processors/db/index.ts | 2 | ||||
| -rw-r--r-- | src/queue/processors/deliver.ts | 28 | ||||
| -rw-r--r-- | src/queue/processors/inbox.ts | 74 |
12 files changed, 251 insertions, 282 deletions
diff --git a/src/queue/processors/db/delete-drive-files.ts b/src/queue/processors/db/delete-drive-files.ts index 3de960a25e..4e4eab86b7 100644 --- a/src/queue/processors/db/delete-drive-files.ts +++ b/src/queue/processors/db/delete-drive-files.ts @@ -1,55 +1,55 @@ import * as Bull from 'bull'; -import * as mongo from 'mongodb'; import { queueLogger } from '../../logger'; -import User from '../../../models/user'; -import DriveFile from '../../../models/drive-file'; import deleteFile from '../../../services/drive/delete-file'; +import { Users, DriveFiles } from '../../../models'; +import { MoreThan } from 'typeorm'; const logger = queueLogger.createSubLogger('delete-drive-files'); export async function deleteDriveFiles(job: Bull.Job, done: any): Promise<void> { - logger.info(`Deleting drive files of ${job.data.user._id} ...`); + logger.info(`Deleting drive files of ${job.data.user.id} ...`); - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) - }); + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } let deletedCount = 0; - let ended = false; let cursor: any = null; - while (!ended) { - const files = await DriveFile.find({ - userId: user._id, - ...(cursor ? { _id: { $gt: cursor } } : {}) - }, { - limit: 100, - sort: { - _id: 1 + while (true) { + const files = await DriveFiles.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 } }); if (files.length === 0) { - ended = true; job.progress(100); break; } - cursor = files[files.length - 1]._id; + cursor = files[files.length - 1].id; for (const file of files) { await deleteFile(file); deletedCount++; } - const total = await DriveFile.count({ - userId: user._id, + const total = await DriveFiles.count({ + userId: user.id, }); job.progress(deletedCount / total); } - logger.succ(`All drive files (${deletedCount}) of ${user._id} has been deleted.`); + logger.succ(`All drive files (${deletedCount}) of ${user.id} has been deleted.`); done(); } diff --git a/src/queue/processors/db/delete-notes.ts b/src/queue/processors/db/delete-notes.ts deleted file mode 100644 index 021db8062e..0000000000 --- a/src/queue/processors/db/delete-notes.ts +++ /dev/null @@ -1,55 +0,0 @@ -import * as Bull from 'bull'; -import * as mongo from 'mongodb'; - -import { queueLogger } from '../../logger'; -import Note from '../../../models/note'; -import deleteNote from '../../../services/note/delete'; -import User from '../../../models/user'; - -const logger = queueLogger.createSubLogger('delete-notes'); - -export async function deleteNotes(job: Bull.Job, done: any): Promise<void> { - logger.info(`Deleting notes of ${job.data.user._id} ...`); - - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) - }); - - let deletedCount = 0; - let ended = false; - let cursor: any = null; - - while (!ended) { - const notes = await Note.find({ - userId: user._id, - ...(cursor ? { _id: { $gt: cursor } } : {}) - }, { - limit: 100, - sort: { - _id: 1 - } - }); - - if (notes.length === 0) { - ended = true; - job.progress(100); - break; - } - - cursor = notes[notes.length - 1]._id; - - for (const note of notes) { - await deleteNote(user, note, true); - deletedCount++; - } - - const total = await Note.count({ - userId: user._id, - }); - - job.progress(deletedCount / total); - } - - logger.succ(`All notes (${deletedCount}) of ${user._id} has been deleted.`); - done(); -} diff --git a/src/queue/processors/db/export-blocking.ts b/src/queue/processors/db/export-blocking.ts index 7f32c06472..c4b8c9438d 100644 --- a/src/queue/processors/db/export-blocking.ts +++ b/src/queue/processors/db/export-blocking.ts @@ -1,23 +1,24 @@ import * as Bull from 'bull'; import * as tmp from 'tmp'; import * as fs from 'fs'; -import * as mongo from 'mongodb'; import { queueLogger } from '../../logger'; import addFile from '../../../services/drive/add-file'; -import User from '../../../models/user'; import dateFormat = require('dateformat'); -import Blocking from '../../../models/blocking'; import { getFullApAccount } from '../../../misc/convert-host'; +import { Users, Blockings } from '../../../models'; +import { MoreThan } from 'typeorm'; const logger = queueLogger.createSubLogger('export-blocking'); export async function exportBlocking(job: Bull.Job, done: any): Promise<void> { - logger.info(`Exporting blocking of ${job.data.user._id} ...`); + logger.info(`Exporting blocking of ${job.data.user.id} ...`); - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) - }); + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } // Create temp file const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { @@ -32,30 +33,33 @@ export async function exportBlocking(job: Bull.Job, done: any): Promise<void> { const stream = fs.createWriteStream(path, { flags: 'a' }); let exportedCount = 0; - let ended = false; let cursor: any = null; - while (!ended) { - const blockings = await Blocking.find({ - blockerId: user._id, - ...(cursor ? { _id: { $gt: cursor } } : {}) - }, { - limit: 100, - sort: { - _id: 1 + while (true) { + const blockings = await Blockings.find({ + where: { + blockerId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 } }); if (blockings.length === 0) { - ended = true; job.progress(100); break; } - cursor = blockings[blockings.length - 1]._id; + cursor = blockings[blockings.length - 1].id; for (const block of blockings) { - const u = await User.findOne({ _id: block.blockeeId }, { fields: { username: true, host: true } }); + const u = await Users.findOne({ id: block.blockeeId }); + if (u == null) { + exportedCount++; continue; + } + const content = getFullApAccount(u.username, u.host); await new Promise((res, rej) => { stream.write(content + '\n', err => { @@ -70,8 +74,8 @@ export async function exportBlocking(job: Bull.Job, done: any): Promise<void> { exportedCount++; } - const total = await Blocking.count({ - blockerId: user._id, + const total = await Blockings.count({ + blockerId: user.id, }); job.progress(exportedCount / total); @@ -83,7 +87,7 @@ export async function exportBlocking(job: Bull.Job, done: any): Promise<void> { const fileName = 'blocking-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; const driveFile = await addFile(user, path, fileName); - logger.succ(`Exported to: ${driveFile._id}`); + logger.succ(`Exported to: ${driveFile.id}`); cleanup(); done(); } diff --git a/src/queue/processors/db/export-following.ts b/src/queue/processors/db/export-following.ts index 019414072a..9fab5bb21a 100644 --- a/src/queue/processors/db/export-following.ts +++ b/src/queue/processors/db/export-following.ts @@ -1,23 +1,24 @@ import * as Bull from 'bull'; import * as tmp from 'tmp'; import * as fs from 'fs'; -import * as mongo from 'mongodb'; import { queueLogger } from '../../logger'; import addFile from '../../../services/drive/add-file'; -import User from '../../../models/user'; import dateFormat = require('dateformat'); -import Following from '../../../models/following'; import { getFullApAccount } from '../../../misc/convert-host'; +import { Users, Followings } from '../../../models'; +import { MoreThan } from 'typeorm'; const logger = queueLogger.createSubLogger('export-following'); export async function exportFollowing(job: Bull.Job, done: any): Promise<void> { - logger.info(`Exporting following of ${job.data.user._id} ...`); + logger.info(`Exporting following of ${job.data.user.id} ...`); - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) - }); + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } // Create temp file const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { @@ -32,30 +33,33 @@ export async function exportFollowing(job: Bull.Job, done: any): Promise<void> { const stream = fs.createWriteStream(path, { flags: 'a' }); let exportedCount = 0; - let ended = false; let cursor: any = null; - while (!ended) { - const followings = await Following.find({ - followerId: user._id, - ...(cursor ? { _id: { $gt: cursor } } : {}) - }, { - limit: 100, - sort: { - _id: 1 + while (true) { + const followings = await Followings.find({ + where: { + followerId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 } }); if (followings.length === 0) { - ended = true; job.progress(100); break; } - cursor = followings[followings.length - 1]._id; + cursor = followings[followings.length - 1].id; for (const following of followings) { - const u = await User.findOne({ _id: following.followeeId }, { fields: { username: true, host: true } }); + const u = await Users.findOne({ id: following.followeeId }); + if (u == null) { + exportedCount++; continue; + } + const content = getFullApAccount(u.username, u.host); await new Promise((res, rej) => { stream.write(content + '\n', err => { @@ -70,8 +74,8 @@ export async function exportFollowing(job: Bull.Job, done: any): Promise<void> { exportedCount++; } - const total = await Following.count({ - followerId: user._id, + const total = await Followings.count({ + followerId: user.id, }); job.progress(exportedCount / total); @@ -83,7 +87,7 @@ export async function exportFollowing(job: Bull.Job, done: any): Promise<void> { const fileName = 'following-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; const driveFile = await addFile(user, path, fileName); - logger.succ(`Exported to: ${driveFile._id}`); + logger.succ(`Exported to: ${driveFile.id}`); cleanup(); done(); } diff --git a/src/queue/processors/db/export-mute.ts b/src/queue/processors/db/export-mute.ts index 5ded7cf651..b957b48b20 100644 --- a/src/queue/processors/db/export-mute.ts +++ b/src/queue/processors/db/export-mute.ts @@ -1,23 +1,24 @@ import * as Bull from 'bull'; import * as tmp from 'tmp'; import * as fs from 'fs'; -import * as mongo from 'mongodb'; import { queueLogger } from '../../logger'; import addFile from '../../../services/drive/add-file'; -import User from '../../../models/user'; import dateFormat = require('dateformat'); -import Mute from '../../../models/mute'; import { getFullApAccount } from '../../../misc/convert-host'; +import { Users, Mutings } from '../../../models'; +import { MoreThan } from 'typeorm'; const logger = queueLogger.createSubLogger('export-mute'); export async function exportMute(job: Bull.Job, done: any): Promise<void> { - logger.info(`Exporting mute of ${job.data.user._id} ...`); + logger.info(`Exporting mute of ${job.data.user.id} ...`); - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) - }); + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } // Create temp file const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { @@ -32,30 +33,33 @@ export async function exportMute(job: Bull.Job, done: any): Promise<void> { const stream = fs.createWriteStream(path, { flags: 'a' }); let exportedCount = 0; - let ended = false; let cursor: any = null; - while (!ended) { - const mutes = await Mute.find({ - muterId: user._id, - ...(cursor ? { _id: { $gt: cursor } } : {}) - }, { - limit: 100, - sort: { - _id: 1 + while (true) { + const mutes = await Mutings.find({ + where: { + muterId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 } }); if (mutes.length === 0) { - ended = true; job.progress(100); break; } - cursor = mutes[mutes.length - 1]._id; + cursor = mutes[mutes.length - 1].id; for (const mute of mutes) { - const u = await User.findOne({ _id: mute.muteeId }, { fields: { username: true, host: true } }); + const u = await Users.findOne({ id: mute.muteeId }); + if (u == null) { + exportedCount++; continue; + } + const content = getFullApAccount(u.username, u.host); await new Promise((res, rej) => { stream.write(content + '\n', err => { @@ -70,8 +74,8 @@ export async function exportMute(job: Bull.Job, done: any): Promise<void> { exportedCount++; } - const total = await Mute.count({ - muterId: user._id, + const total = await Mutings.count({ + muterId: user.id, }); job.progress(exportedCount / total); @@ -83,7 +87,7 @@ export async function exportMute(job: Bull.Job, done: any): Promise<void> { const fileName = 'mute-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; const driveFile = await addFile(user, path, fileName); - logger.succ(`Exported to: ${driveFile._id}`); + logger.succ(`Exported to: ${driveFile.id}`); cleanup(); done(); } diff --git a/src/queue/processors/db/export-notes.ts b/src/queue/processors/db/export-notes.ts index 8f3cdc5b99..d03a216a59 100644 --- a/src/queue/processors/db/export-notes.ts +++ b/src/queue/processors/db/export-notes.ts @@ -1,22 +1,26 @@ import * as Bull from 'bull'; import * as tmp from 'tmp'; import * as fs from 'fs'; -import * as mongo from 'mongodb'; import { queueLogger } from '../../logger'; -import Note, { INote } from '../../../models/note'; import addFile from '../../../services/drive/add-file'; -import User from '../../../models/user'; import dateFormat = require('dateformat'); +import { Users, Notes, Polls } from '../../../models'; +import { MoreThan } from 'typeorm'; +import { Note } from '../../../models/entities/note'; +import { Poll } from '../../../models/entities/poll'; +import { ensure } from '../../../prelude/ensure'; const logger = queueLogger.createSubLogger('export-notes'); export async function exportNotes(job: Bull.Job, done: any): Promise<void> { - logger.info(`Exporting notes of ${job.data.user._id} ...`); + logger.info(`Exporting notes of ${job.data.user.id} ...`); - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) - }); + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } // Create temp file const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { @@ -42,30 +46,33 @@ export async function exportNotes(job: Bull.Job, done: any): Promise<void> { }); let exportedNotesCount = 0; - let ended = false; let cursor: any = null; - while (!ended) { - const notes = await Note.find({ - userId: user._id, - ...(cursor ? { _id: { $gt: cursor } } : {}) - }, { - limit: 100, - sort: { - _id: 1 + while (true) { + const notes = await Notes.find({ + where: { + userId: user.id, + ...(cursor ? { id: MoreThan(cursor) } : {}) + }, + take: 100, + order: { + id: 1 } }); if (notes.length === 0) { - ended = true; job.progress(100); break; } - cursor = notes[notes.length - 1]._id; + cursor = notes[notes.length - 1].id; for (const note of notes) { - const content = JSON.stringify(serialize(note)); + let poll: Poll | undefined; + if (note.hasPoll) { + poll = await Polls.findOne({ noteId: note.id }).then(ensure); + } + const content = JSON.stringify(serialize(note, poll)); await new Promise((res, rej) => { stream.write(exportedNotesCount === 0 ? content : ',\n' + content, err => { if (err) { @@ -79,8 +86,8 @@ export async function exportNotes(job: Bull.Job, done: any): Promise<void> { exportedNotesCount++; } - const total = await Note.count({ - userId: user._id, + const total = await Notes.count({ + userId: user.id, }); job.progress(exportedNotesCount / total); @@ -103,20 +110,20 @@ export async function exportNotes(job: Bull.Job, done: any): Promise<void> { const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.json'; const driveFile = await addFile(user, path, fileName); - logger.succ(`Exported to: ${driveFile._id}`); + logger.succ(`Exported to: ${driveFile.id}`); cleanup(); done(); } -function serialize(note: INote): any { +function serialize(note: Note, poll: Poll | null = null): any { return { - id: note._id, + id: note.id, text: note.text, createdAt: note.createdAt, fileIds: note.fileIds, replyId: note.replyId, renoteId: note.renoteId, - poll: note.poll, + poll: poll, cw: note.cw, viaMobile: note.viaMobile, visibility: note.visibility, diff --git a/src/queue/processors/db/export-user-lists.ts b/src/queue/processors/db/export-user-lists.ts index dfbf152ec0..5cd978c1aa 100644 --- a/src/queue/processors/db/export-user-lists.ts +++ b/src/queue/processors/db/export-user-lists.ts @@ -1,26 +1,27 @@ import * as Bull from 'bull'; import * as tmp from 'tmp'; import * as fs from 'fs'; -import * as mongo from 'mongodb'; import { queueLogger } from '../../logger'; import addFile from '../../../services/drive/add-file'; -import User from '../../../models/user'; import dateFormat = require('dateformat'); -import UserList from '../../../models/user-list'; import { getFullApAccount } from '../../../misc/convert-host'; +import { Users, UserLists, UserListJoinings } from '../../../models'; +import { In } from 'typeorm'; const logger = queueLogger.createSubLogger('export-user-lists'); export async function exportUserLists(job: Bull.Job, done: any): Promise<void> { - logger.info(`Exporting user lists of ${job.data.user._id} ...`); + logger.info(`Exporting user lists of ${job.data.user.id} ...`); - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) - }); + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } - const lists = await UserList.find({ - userId: user._id + const lists = await UserLists.find({ + userId: user.id }); // Create temp file @@ -36,18 +37,14 @@ export async function exportUserLists(job: Bull.Job, done: any): Promise<void> { const stream = fs.createWriteStream(path, { flags: 'a' }); for (const list of lists) { - const users = await User.find({ - _id: { $in: list.userIds } - }, { - fields: { - username: true, - host: true - } + const joinings = await UserListJoinings.find({ userListId: list.id }); + const users = await Users.find({ + id: In(joinings.map(j => j.userId)) }); for (const u of users) { const acct = getFullApAccount(u.username, u.host); - const content = `${list.title},${acct}`; + const content = `${list.name},${acct}`; await new Promise((res, rej) => { stream.write(content + '\n', err => { if (err) { @@ -67,7 +64,7 @@ export async function exportUserLists(job: Bull.Job, done: any): Promise<void> { const fileName = 'user-lists-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; const driveFile = await addFile(user, path, fileName); - logger.succ(`Exported to: ${driveFile._id}`); + logger.succ(`Exported to: ${driveFile.id}`); cleanup(); done(); } diff --git a/src/queue/processors/db/import-following.ts b/src/queue/processors/db/import-following.ts index 069afa74c4..8de3193e46 100644 --- a/src/queue/processors/db/import-following.ts +++ b/src/queue/processors/db/import-following.ts @@ -1,32 +1,33 @@ import * as Bull from 'bull'; -import * as mongo from 'mongodb'; import { queueLogger } from '../../logger'; -import User from '../../../models/user'; import follow from '../../../services/following/create'; -import DriveFile from '../../../models/drive-file'; -import { getOriginalUrl } from '../../../misc/get-drive-file-url'; import parseAcct from '../../../misc/acct/parse'; -import resolveUser from '../../../remote/resolve-user'; +import { resolveUser } from '../../../remote/resolve-user'; import { downloadTextFile } from '../../../misc/download-text-file'; -import { isSelfHost, toDbHost } from '../../../misc/convert-host'; +import { isSelfHost, toPuny } from '../../../misc/convert-host'; +import { Users, DriveFiles } from '../../../models'; const logger = queueLogger.createSubLogger('import-following'); export async function importFollowing(job: Bull.Job, done: any): Promise<void> { - logger.info(`Importing following of ${job.data.user._id} ...`); + logger.info(`Importing following of ${job.data.user.id} ...`); - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) - }); + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } - const file = await DriveFile.findOne({ - _id: new mongo.ObjectID(job.data.fileId.toString()) + const file = await DriveFiles.findOne({ + id: job.data.fileId }); + if (file == null) { + done(); + return; + } - const url = getOriginalUrl(file); - - const csv = await downloadTextFile(url); + const csv = await downloadTextFile(file.url); let linenum = 0; @@ -36,11 +37,11 @@ export async function importFollowing(job: Bull.Job, done: any): Promise<void> { try { const { username, host } = parseAcct(line.trim()); - let target = isSelfHost(host) ? await User.findOne({ + let target = isSelfHost(host!) ? await Users.findOne({ host: null, usernameLower: username.toLowerCase() - }) : await User.findOne({ - host: toDbHost(host), + }) : await Users.findOne({ + host: toPuny(host!), usernameLower: username.toLowerCase() }); @@ -55,9 +56,9 @@ export async function importFollowing(job: Bull.Job, done: any): Promise<void> { } // skip myself - if (target._id.equals(job.data.user._id)) continue; + if (target.id === job.data.user.id) continue; - logger.info(`Follow[${linenum}] ${target._id} ...`); + logger.info(`Follow[${linenum}] ${target.id} ...`); follow(user, target); } catch (e) { diff --git a/src/queue/processors/db/import-user-lists.ts b/src/queue/processors/db/import-user-lists.ts index 50d3c649d4..1e852be945 100644 --- a/src/queue/processors/db/import-user-lists.ts +++ b/src/queue/processors/db/import-user-lists.ts @@ -1,67 +1,68 @@ import * as Bull from 'bull'; -import * as mongo from 'mongodb'; import { queueLogger } from '../../logger'; -import User from '../../../models/user'; -import UserList from '../../../models/user-list'; -import DriveFile from '../../../models/drive-file'; -import { getOriginalUrl } from '../../../misc/get-drive-file-url'; import parseAcct from '../../../misc/acct/parse'; -import resolveUser from '../../../remote/resolve-user'; +import { resolveUser } from '../../../remote/resolve-user'; import { pushUserToUserList } from '../../../services/user-list/push'; import { downloadTextFile } from '../../../misc/download-text-file'; -import { isSelfHost, toDbHost } from '../../../misc/convert-host'; +import { isSelfHost, toPuny } from '../../../misc/convert-host'; +import { DriveFiles, Users, UserLists, UserListJoinings } from '../../../models'; +import { genId } from '../../../misc/gen-id'; const logger = queueLogger.createSubLogger('import-user-lists'); export async function importUserLists(job: Bull.Job, done: any): Promise<void> { - logger.info(`Importing user lists of ${job.data.user._id} ...`); + logger.info(`Importing user lists of ${job.data.user.id} ...`); - const user = await User.findOne({ - _id: new mongo.ObjectID(job.data.user._id.toString()) - }); + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } - const file = await DriveFile.findOne({ - _id: new mongo.ObjectID(job.data.fileId.toString()) + const file = await DriveFiles.findOne({ + id: job.data.fileId }); + if (file == null) { + done(); + return; + } - const url = getOriginalUrl(file); - - const csv = await downloadTextFile(url); + const csv = await downloadTextFile(file.url); for (const line of csv.trim().split('\n')) { const listName = line.split(',')[0].trim(); const { username, host } = parseAcct(line.split(',')[1].trim()); - let list = await UserList.findOne({ - userId: user._id, - title: listName + let list = await UserLists.findOne({ + userId: user.id, + name: listName }); if (list == null) { - list = await UserList.insert({ + list = await UserLists.save({ + id: genId(), createdAt: new Date(), - userId: user._id, - title: listName, + userId: user.id, + name: listName, userIds: [] }); } - let target = isSelfHost(host) ? await User.findOne({ + let target = isSelfHost(host!) ? await Users.findOne({ host: null, usernameLower: username.toLowerCase() - }) : await User.findOne({ - host: toDbHost(host), + }) : await Users.findOne({ + host: toPuny(host!), usernameLower: username.toLowerCase() }); - if (host == null && target == null) continue; - if (list.userIds.some(id => id.equals(target._id))) continue; - if (target == null) { target = await resolveUser(username, host); } + if (await UserListJoinings.findOne({ userListId: list.id, userId: target.id }) != null) continue; + pushUserToUserList(target, list); } diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts index 1bc9a9af7c..921cdf7ab1 100644 --- a/src/queue/processors/db/index.ts +++ b/src/queue/processors/db/index.ts @@ -1,5 +1,4 @@ import * as Bull from 'bull'; -import { deleteNotes } from './delete-notes'; import { deleteDriveFiles } from './delete-drive-files'; import { exportNotes } from './export-notes'; import { exportFollowing } from './export-following'; @@ -10,7 +9,6 @@ import { importFollowing } from './import-following'; import { importUserLists } from './import-user-lists'; const jobs = { - deleteNotes, deleteDriveFiles, exportNotes, exportFollowing, diff --git a/src/queue/processors/deliver.ts b/src/queue/processors/deliver.ts index 28d3a17f6b..8837c80d87 100644 --- a/src/queue/processors/deliver.ts +++ b/src/queue/processors/deliver.ts @@ -1,13 +1,13 @@ import * as Bull from 'bull'; import request from '../../remote/activitypub/request'; import { registerOrFetchInstanceDoc } from '../../services/register-or-fetch-instance-doc'; -import Instance from '../../models/instance'; -import instanceChart from '../../services/chart/instance'; import Logger from '../../services/logger'; +import { Instances } from '../../models'; +import { instanceChart } from '../../services/chart'; const logger = new Logger('deliver'); -let latest: string = null; +let latest: string | null = null; export default async (job: Bull.Job) => { const { host } = new URL(job.data.to); @@ -21,13 +21,11 @@ export default async (job: Bull.Job) => { // Update stats registerOrFetchInstanceDoc(host).then(i => { - Instance.update({ _id: i._id }, { - $set: { - latestRequestSentAt: new Date(), - latestStatus: 200, - lastCommunicatedAt: new Date(), - isNotResponding: false - } + Instances.update(i.id, { + latestRequestSentAt: new Date(), + latestStatus: 200, + lastCommunicatedAt: new Date(), + isNotResponding: false }); instanceChart.requestSent(i.host, true); @@ -37,12 +35,10 @@ export default async (job: Bull.Job) => { } catch (res) { // Update stats registerOrFetchInstanceDoc(host).then(i => { - Instance.update({ _id: i._id }, { - $set: { - latestRequestSentAt: new Date(), - latestStatus: res != null && res.hasOwnProperty('statusCode') ? res.statusCode : null, - isNotResponding: true - } + Instances.update(i.id, { + latestRequestSentAt: new Date(), + latestStatus: res != null && res.hasOwnProperty('statusCode') ? res.statusCode : null, + isNotResponding: true }); instanceChart.requestSent(i.host, false); diff --git a/src/queue/processors/inbox.ts b/src/queue/processors/inbox.ts index 436f3335c8..05fed0566d 100644 --- a/src/queue/processors/inbox.ts +++ b/src/queue/processors/inbox.ts @@ -1,16 +1,20 @@ import * as Bull from 'bull'; import * as httpSignature from 'http-signature'; import parseAcct from '../../misc/acct/parse'; -import User, { IRemoteUser } from '../../models/user'; +import { IRemoteUser } from '../../models/entities/user'; import perform from '../../remote/activitypub/perform'; import { resolvePerson, updatePerson } from '../../remote/activitypub/models/person'; -import { toUnicode } from 'punycode'; import { URL } from 'url'; import { publishApLogStream } from '../../services/stream'; import Logger from '../../services/logger'; import { registerOrFetchInstanceDoc } from '../../services/register-or-fetch-instance-doc'; -import Instance from '../../models/instance'; -import instanceChart from '../../services/chart/instance'; +import { Instances, Users, UserPublickeys } from '../../models'; +import { instanceChart } from '../../services/chart'; +import { UserPublickey } from '../../models/entities/user-publickey'; +import fetchMeta from '../../misc/fetch-meta'; +import { toPuny, toPunyNullable } from '../../misc/convert-host'; +import { validActor } from '../../remote/activitypub/type'; +import { ensure } from '../../prelude/ensure'; const logger = new Logger('inbox'); @@ -28,9 +32,13 @@ export default async (job: Bull.Job): Promise<void> => { const keyIdLower = signature.keyId.toLowerCase(); let user: IRemoteUser; + let key: UserPublickey; if (keyIdLower.startsWith('acct:')) { - const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); + const acct = parseAcct(keyIdLower.slice('acct:'.length)); + const host = toPunyNullable(acct.host); + const username = toPuny(acct.username); + if (host === null) { logger.warn(`request was made by local user: @${username}`); return; @@ -46,16 +54,21 @@ export default async (job: Bull.Job): Promise<void> => { // ブロックしてたら中断 // TODO: いちいちデータベースにアクセスするのはコスト高そうなのでどっかにキャッシュしておく - const instance = await Instance.findOne({ host: host.toLowerCase() }); - if (instance && instance.isBlocked) { + const meta = await fetchMeta(); + if (meta.blockedHosts.includes(host)) { logger.info(`Blocked request: ${host}`); return; } - user = await User.findOne({ usernameLower: username, host: host.toLowerCase() }) as IRemoteUser; + user = await Users.findOne({ + usernameLower: username.toLowerCase(), + host: host + }) as IRemoteUser; + + key = await UserPublickeys.findOne(user.id).then(ensure); } else { // アクティビティ内のホストの検証 - const host = toUnicode(new URL(signature.keyId).hostname.toLowerCase()); + const host = toPuny(new URL(signature.keyId).hostname); try { ValidateActivity(activity, host); } catch (e) { @@ -65,24 +78,25 @@ export default async (job: Bull.Job): Promise<void> => { // ブロックしてたら中断 // TODO: いちいちデータベースにアクセスするのはコスト高そうなのでどっかにキャッシュしておく - const instance = await Instance.findOne({ host: host.toLowerCase() }); - if (instance && instance.isBlocked) { - logger.warn(`Blocked request: ${host}`); + const meta = await fetchMeta(); + if (meta.blockedHosts.includes(host)) { + logger.info(`Blocked request: ${host}`); return; } - user = await User.findOne({ - host: { $ne: null }, - 'publicKey.id': signature.keyId - }) as IRemoteUser; + key = await UserPublickeys.findOne({ + keyId: signature.keyId + }).then(ensure); + + user = await Users.findOne(key.userId) as IRemoteUser; } // Update Person activityの場合は、ここで署名検証/更新処理まで実施して終了 if (activity.type === 'Update') { - if (activity.object && activity.object.type === 'Person') { + if (activity.object && validActor.includes(activity.object.type)) { if (user == null) { logger.warn('Update activity received, but user not registed.'); - } else if (!httpSignature.verifySignature(signature, user.publicKey.publicKeyPem)) { + } else if (!httpSignature.verifySignature(signature, key.keyPem)) { logger.warn('Update activity received, but signature verification failed.'); } else { updatePerson(activity.actor, null, activity.object); @@ -92,15 +106,15 @@ export default async (job: Bull.Job): Promise<void> => { } // アクティビティを送信してきたユーザーがまだMisskeyサーバーに登録されていなかったら登録する - if (user === null) { + if (user == null) { user = await resolvePerson(activity.actor) as IRemoteUser; } - if (user === null) { + if (user == null) { throw new Error('failed to resolve user'); } - if (!httpSignature.verifySignature(signature, user.publicKey.publicKeyPem)) { + if (!httpSignature.verifySignature(signature, key.keyPem)) { logger.error('signature verification failed'); return; } @@ -116,12 +130,10 @@ export default async (job: Bull.Job): Promise<void> => { // Update stats registerOrFetchInstanceDoc(user.host).then(i => { - Instance.update({ _id: i._id }, { - $set: { - latestRequestReceivedAt: new Date(), - lastCommunicatedAt: new Date(), - isNotResponding: false - } + Instances.update(i.id, { + latestRequestReceivedAt: new Date(), + lastCommunicatedAt: new Date(), + isNotResponding: false }); instanceChart.requestReceived(i.host); @@ -139,7 +151,7 @@ export default async (job: Bull.Job): Promise<void> => { function ValidateActivity(activity: any, host: string) { // id (if exists) if (typeof activity.id === 'string') { - const uriHost = toUnicode(new URL(activity.id).hostname.toLowerCase()); + const uriHost = toPuny(new URL(activity.id).hostname); if (host !== uriHost) { const diag = activity.signature ? '. Has LD-Signature. Forwarded?' : ''; throw new Error(`activity.id(${activity.id}) has different host(${host})${diag}`); @@ -148,7 +160,7 @@ function ValidateActivity(activity: any, host: string) { // actor (if exists) if (typeof activity.actor === 'string') { - const uriHost = toUnicode(new URL(activity.actor).hostname.toLowerCase()); + const uriHost = toPuny(new URL(activity.actor).hostname); if (host !== uriHost) throw new Error('activity.actor has different host'); } @@ -156,13 +168,13 @@ function ValidateActivity(activity: any, host: string) { if (activity.type === 'Create' && activity.object) { // object.id (if exists) if (typeof activity.object.id === 'string') { - const uriHost = toUnicode(new URL(activity.object.id).hostname.toLowerCase()); + const uriHost = toPuny(new URL(activity.object.id).hostname); if (host !== uriHost) throw new Error('activity.object.id has different host'); } // object.attributedTo (if exists) if (typeof activity.object.attributedTo === 'string') { - const uriHost = toUnicode(new URL(activity.object.attributedTo).hostname.toLowerCase()); + const uriHost = toPuny(new URL(activity.object.attributedTo).hostname); if (host !== uriHost) throw new Error('activity.object.attributedTo has different host'); } } |