From 0e4a111f81cceed275d9bec2695f6e401fb654d8 Mon Sep 17 00:00:00 2001 From: syuilo Date: Fri, 12 Nov 2021 02:02:25 +0900 Subject: refactoring Resolve #7779 --- src/queue/processors/db/delete-account.ts | 94 ------------- src/queue/processors/db/delete-drive-files.ts | 56 -------- src/queue/processors/db/export-blocking.ts | 94 ------------- src/queue/processors/db/export-following.ts | 94 ------------- src/queue/processors/db/export-mute.ts | 94 ------------- src/queue/processors/db/export-notes.ts | 133 ------------------ src/queue/processors/db/export-user-lists.ts | 71 ---------- src/queue/processors/db/import-blocking.ts | 74 ---------- src/queue/processors/db/import-following.ts | 73 ---------- src/queue/processors/db/import-muting.ts | 83 ------------ src/queue/processors/db/import-user-lists.ts | 80 ----------- src/queue/processors/db/index.ts | 33 ----- src/queue/processors/deliver.ts | 94 ------------- src/queue/processors/inbox.ts | 149 --------------------- .../object-storage/clean-remote-files.ts | 50 ------- src/queue/processors/object-storage/delete-file.ts | 11 -- src/queue/processors/object-storage/index.ts | 15 --- src/queue/processors/system/index.ts | 12 -- src/queue/processors/system/resync-charts.ts | 21 --- 19 files changed, 1331 deletions(-) delete mode 100644 src/queue/processors/db/delete-account.ts delete mode 100644 src/queue/processors/db/delete-drive-files.ts delete mode 100644 src/queue/processors/db/export-blocking.ts delete mode 100644 src/queue/processors/db/export-following.ts delete mode 100644 src/queue/processors/db/export-mute.ts delete mode 100644 src/queue/processors/db/export-notes.ts delete mode 100644 src/queue/processors/db/export-user-lists.ts delete mode 100644 src/queue/processors/db/import-blocking.ts delete mode 100644 src/queue/processors/db/import-following.ts delete mode 100644 src/queue/processors/db/import-muting.ts delete mode 100644 src/queue/processors/db/import-user-lists.ts delete mode 100644 src/queue/processors/db/index.ts delete mode 100644 src/queue/processors/deliver.ts delete mode 100644 src/queue/processors/inbox.ts delete mode 100644 src/queue/processors/object-storage/clean-remote-files.ts delete mode 100644 src/queue/processors/object-storage/delete-file.ts delete mode 100644 src/queue/processors/object-storage/index.ts delete mode 100644 src/queue/processors/system/index.ts delete mode 100644 src/queue/processors/system/resync-charts.ts (limited to 'src/queue/processors') diff --git a/src/queue/processors/db/delete-account.ts b/src/queue/processors/db/delete-account.ts deleted file mode 100644 index e54f38e35e..0000000000 --- a/src/queue/processors/db/delete-account.ts +++ /dev/null @@ -1,94 +0,0 @@ -import * as Bull from 'bull'; -import { queueLogger } from '../../logger'; -import { DriveFiles, Notes, UserProfiles, Users } from '@/models/index'; -import { DbUserDeleteJobData } from '@/queue/types'; -import { Note } from '@/models/entities/note'; -import { DriveFile } from '@/models/entities/drive-file'; -import { MoreThan } from 'typeorm'; -import { deleteFileSync } from '@/services/drive/delete-file'; -import { sendEmail } from '@/services/send-email'; - -const logger = queueLogger.createSubLogger('delete-account'); - -export async function deleteAccount(job: Bull.Job): Promise { - logger.info(`Deleting account of ${job.data.user.id} ...`); - - const user = await Users.findOne(job.data.user.id); - if (user == null) { - return; - } - - { // Delete notes - let cursor: Note['id'] | null = null; - - while (true) { - const notes = await Notes.find({ - where: { - userId: user.id, - ...(cursor ? { id: MoreThan(cursor) } : {}) - }, - take: 100, - order: { - id: 1 - } - }); - - if (notes.length === 0) { - break; - } - - cursor = notes[notes.length - 1].id; - - await Notes.delete(notes.map(note => note.id)); - } - - logger.succ(`All of notes deleted`); - } - - { // Delete files - let cursor: DriveFile['id'] | null = null; - - while (true) { - const files = await DriveFiles.find({ - where: { - userId: user.id, - ...(cursor ? { id: MoreThan(cursor) } : {}) - }, - take: 10, - order: { - id: 1 - } - }); - - if (files.length === 0) { - break; - } - - cursor = files[files.length - 1].id; - - for (const file of files) { - await deleteFileSync(file); - } - } - - logger.succ(`All of files deleted`); - } - - { // Send email notification - const profile = await UserProfiles.findOneOrFail(user.id); - if (profile.email && profile.emailVerified) { - sendEmail(profile.email, 'Account deleted', - `Your account has been deleted.`, - `Your account has been deleted.`); - } - } - - // soft指定されている場合は物理削除しない - if (job.data.soft) { - // nop - } else { - await Users.delete(job.data.user.id); - } - - return 'Account deleted'; -} diff --git a/src/queue/processors/db/delete-drive-files.ts b/src/queue/processors/db/delete-drive-files.ts deleted file mode 100644 index 8a28468b0d..0000000000 --- a/src/queue/processors/db/delete-drive-files.ts +++ /dev/null @@ -1,56 +0,0 @@ -import * as Bull from 'bull'; - -import { queueLogger } from '../../logger'; -import { deleteFileSync } from '@/services/drive/delete-file'; -import { Users, DriveFiles } from '@/models/index'; -import { MoreThan } from 'typeorm'; -import { DbUserJobData } from '@/queue/types'; - -const logger = queueLogger.createSubLogger('delete-drive-files'); - -export async function deleteDriveFiles(job: Bull.Job, done: any): Promise { - logger.info(`Deleting drive files of ${job.data.user.id} ...`); - - const user = await Users.findOne(job.data.user.id); - if (user == null) { - done(); - return; - } - - let deletedCount = 0; - let cursor: any = null; - - while (true) { - const files = await DriveFiles.find({ - where: { - userId: user.id, - ...(cursor ? { id: MoreThan(cursor) } : {}) - }, - take: 100, - order: { - id: 1 - } - }); - - if (files.length === 0) { - job.progress(100); - break; - } - - cursor = files[files.length - 1].id; - - for (const file of files) { - await deleteFileSync(file); - deletedCount++; - } - - const total = await DriveFiles.count({ - userId: user.id, - }); - - job.progress(deletedCount / total); - } - - logger.succ(`All drive files (${deletedCount}) of ${user.id} has been deleted.`); - done(); -} diff --git a/src/queue/processors/db/export-blocking.ts b/src/queue/processors/db/export-blocking.ts deleted file mode 100644 index 8b8aa259d4..0000000000 --- a/src/queue/processors/db/export-blocking.ts +++ /dev/null @@ -1,94 +0,0 @@ -import * as Bull from 'bull'; -import * as tmp from 'tmp'; -import * as fs from 'fs'; - -import { queueLogger } from '../../logger'; -import addFile from '@/services/drive/add-file'; -import * as dateFormat from 'dateformat'; -import { getFullApAccount } from '@/misc/convert-host'; -import { Users, Blockings } from '@/models/index'; -import { MoreThan } from 'typeorm'; -import { DbUserJobData } from '@/queue/types'; - -const logger = queueLogger.createSubLogger('export-blocking'); - -export async function exportBlocking(job: Bull.Job, done: any): Promise { - logger.info(`Exporting blocking of ${job.data.user.id} ...`); - - const user = await Users.findOne(job.data.user.id); - if (user == null) { - done(); - return; - } - - // Create temp file - const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { - tmp.file((e, path, fd, cleanup) => { - if (e) return rej(e); - res([path, cleanup]); - }); - }); - - logger.info(`Temp file is ${path}`); - - const stream = fs.createWriteStream(path, { flags: 'a' }); - - let exportedCount = 0; - let cursor: any = null; - - while (true) { - const blockings = await Blockings.find({ - where: { - blockerId: user.id, - ...(cursor ? { id: MoreThan(cursor) } : {}) - }, - take: 100, - order: { - id: 1 - } - }); - - if (blockings.length === 0) { - job.progress(100); - break; - } - - cursor = blockings[blockings.length - 1].id; - - for (const block of blockings) { - const u = await Users.findOne({ id: block.blockeeId }); - if (u == null) { - exportedCount++; continue; - } - - const content = getFullApAccount(u.username, u.host); - await new Promise((res, rej) => { - stream.write(content + '\n', err => { - if (err) { - logger.error(err); - rej(err); - } else { - res(); - } - }); - }); - exportedCount++; - } - - const total = await Blockings.count({ - blockerId: user.id, - }); - - job.progress(exportedCount / total); - } - - stream.end(); - logger.succ(`Exported to: ${path}`); - - const fileName = 'blocking-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; - const driveFile = await addFile(user, path, fileName, null, null, true); - - logger.succ(`Exported to: ${driveFile.id}`); - cleanup(); - done(); -} diff --git a/src/queue/processors/db/export-following.ts b/src/queue/processors/db/export-following.ts deleted file mode 100644 index a0ecf5f560..0000000000 --- a/src/queue/processors/db/export-following.ts +++ /dev/null @@ -1,94 +0,0 @@ -import * as Bull from 'bull'; -import * as tmp from 'tmp'; -import * as fs from 'fs'; - -import { queueLogger } from '../../logger'; -import addFile from '@/services/drive/add-file'; -import * as dateFormat from 'dateformat'; -import { getFullApAccount } from '@/misc/convert-host'; -import { Users, Followings } from '@/models/index'; -import { MoreThan } from 'typeorm'; -import { DbUserJobData } from '@/queue/types'; - -const logger = queueLogger.createSubLogger('export-following'); - -export async function exportFollowing(job: Bull.Job, done: any): Promise { - logger.info(`Exporting following of ${job.data.user.id} ...`); - - const user = await Users.findOne(job.data.user.id); - if (user == null) { - done(); - return; - } - - // Create temp file - const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { - tmp.file((e, path, fd, cleanup) => { - if (e) return rej(e); - res([path, cleanup]); - }); - }); - - logger.info(`Temp file is ${path}`); - - const stream = fs.createWriteStream(path, { flags: 'a' }); - - let exportedCount = 0; - let cursor: any = null; - - while (true) { - const followings = await Followings.find({ - where: { - followerId: user.id, - ...(cursor ? { id: MoreThan(cursor) } : {}) - }, - take: 100, - order: { - id: 1 - } - }); - - if (followings.length === 0) { - job.progress(100); - break; - } - - cursor = followings[followings.length - 1].id; - - for (const following of followings) { - const u = await Users.findOne({ id: following.followeeId }); - if (u == null) { - exportedCount++; continue; - } - - const content = getFullApAccount(u.username, u.host); - await new Promise((res, rej) => { - stream.write(content + '\n', err => { - if (err) { - logger.error(err); - rej(err); - } else { - res(); - } - }); - }); - exportedCount++; - } - - const total = await Followings.count({ - followerId: user.id, - }); - - job.progress(exportedCount / total); - } - - stream.end(); - logger.succ(`Exported to: ${path}`); - - const fileName = 'following-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; - const driveFile = await addFile(user, path, fileName, null, null, true); - - logger.succ(`Exported to: ${driveFile.id}`); - cleanup(); - done(); -} diff --git a/src/queue/processors/db/export-mute.ts b/src/queue/processors/db/export-mute.ts deleted file mode 100644 index d5976f7d56..0000000000 --- a/src/queue/processors/db/export-mute.ts +++ /dev/null @@ -1,94 +0,0 @@ -import * as Bull from 'bull'; -import * as tmp from 'tmp'; -import * as fs from 'fs'; - -import { queueLogger } from '../../logger'; -import addFile from '@/services/drive/add-file'; -import * as dateFormat from 'dateformat'; -import { getFullApAccount } from '@/misc/convert-host'; -import { Users, Mutings } from '@/models/index'; -import { MoreThan } from 'typeorm'; -import { DbUserJobData } from '@/queue/types'; - -const logger = queueLogger.createSubLogger('export-mute'); - -export async function exportMute(job: Bull.Job, done: any): Promise { - logger.info(`Exporting mute of ${job.data.user.id} ...`); - - const user = await Users.findOne(job.data.user.id); - if (user == null) { - done(); - return; - } - - // Create temp file - const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { - tmp.file((e, path, fd, cleanup) => { - if (e) return rej(e); - res([path, cleanup]); - }); - }); - - logger.info(`Temp file is ${path}`); - - const stream = fs.createWriteStream(path, { flags: 'a' }); - - let exportedCount = 0; - let cursor: any = null; - - while (true) { - const mutes = await Mutings.find({ - where: { - muterId: user.id, - ...(cursor ? { id: MoreThan(cursor) } : {}) - }, - take: 100, - order: { - id: 1 - } - }); - - if (mutes.length === 0) { - job.progress(100); - break; - } - - cursor = mutes[mutes.length - 1].id; - - for (const mute of mutes) { - const u = await Users.findOne({ id: mute.muteeId }); - if (u == null) { - exportedCount++; continue; - } - - const content = getFullApAccount(u.username, u.host); - await new Promise((res, rej) => { - stream.write(content + '\n', err => { - if (err) { - logger.error(err); - rej(err); - } else { - res(); - } - }); - }); - exportedCount++; - } - - const total = await Mutings.count({ - muterId: user.id, - }); - - job.progress(exportedCount / total); - } - - stream.end(); - logger.succ(`Exported to: ${path}`); - - const fileName = 'mute-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; - const driveFile = await addFile(user, path, fileName, null, null, true); - - logger.succ(`Exported to: ${driveFile.id}`); - cleanup(); - done(); -} diff --git a/src/queue/processors/db/export-notes.ts b/src/queue/processors/db/export-notes.ts deleted file mode 100644 index 49850aa706..0000000000 --- a/src/queue/processors/db/export-notes.ts +++ /dev/null @@ -1,133 +0,0 @@ -import * as Bull from 'bull'; -import * as tmp from 'tmp'; -import * as fs from 'fs'; - -import { queueLogger } from '../../logger'; -import addFile from '@/services/drive/add-file'; -import * as dateFormat from 'dateformat'; -import { Users, Notes, Polls } from '@/models/index'; -import { MoreThan } from 'typeorm'; -import { Note } from '@/models/entities/note'; -import { Poll } from '@/models/entities/poll'; -import { DbUserJobData } from '@/queue/types'; - -const logger = queueLogger.createSubLogger('export-notes'); - -export async function exportNotes(job: Bull.Job, done: any): Promise { - logger.info(`Exporting notes of ${job.data.user.id} ...`); - - const user = await Users.findOne(job.data.user.id); - if (user == null) { - done(); - return; - } - - // Create temp file - const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { - tmp.file((e, path, fd, cleanup) => { - if (e) return rej(e); - res([path, cleanup]); - }); - }); - - logger.info(`Temp file is ${path}`); - - const stream = fs.createWriteStream(path, { flags: 'a' }); - - await new Promise((res, rej) => { - stream.write('[', err => { - if (err) { - logger.error(err); - rej(err); - } else { - res(); - } - }); - }); - - let exportedNotesCount = 0; - let cursor: any = null; - - while (true) { - const notes = await Notes.find({ - where: { - userId: user.id, - ...(cursor ? { id: MoreThan(cursor) } : {}) - }, - take: 100, - order: { - id: 1 - } - }); - - if (notes.length === 0) { - job.progress(100); - break; - } - - cursor = notes[notes.length - 1].id; - - for (const note of notes) { - let poll: Poll | undefined; - if (note.hasPoll) { - poll = await Polls.findOneOrFail({ noteId: note.id }); - } - const content = JSON.stringify(serialize(note, poll)); - await new Promise((res, rej) => { - stream.write(exportedNotesCount === 0 ? content : ',\n' + content, err => { - if (err) { - logger.error(err); - rej(err); - } else { - res(); - } - }); - }); - exportedNotesCount++; - } - - const total = await Notes.count({ - userId: user.id, - }); - - job.progress(exportedNotesCount / total); - } - - await new Promise((res, rej) => { - stream.write(']', err => { - if (err) { - logger.error(err); - rej(err); - } else { - res(); - } - }); - }); - - stream.end(); - logger.succ(`Exported to: ${path}`); - - const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.json'; - const driveFile = await addFile(user, path, fileName, null, null, true); - - logger.succ(`Exported to: ${driveFile.id}`); - cleanup(); - done(); -} - -function serialize(note: Note, poll: Poll | null = null): any { - return { - id: note.id, - text: note.text, - createdAt: note.createdAt, - fileIds: note.fileIds, - replyId: note.replyId, - renoteId: note.renoteId, - poll: poll, - cw: note.cw, - viaMobile: note.viaMobile, - visibility: note.visibility, - visibleUserIds: note.visibleUserIds, - localOnly: note.localOnly - }; -} diff --git a/src/queue/processors/db/export-user-lists.ts b/src/queue/processors/db/export-user-lists.ts deleted file mode 100644 index 8a86c4df5d..0000000000 --- a/src/queue/processors/db/export-user-lists.ts +++ /dev/null @@ -1,71 +0,0 @@ -import * as Bull from 'bull'; -import * as tmp from 'tmp'; -import * as fs from 'fs'; - -import { queueLogger } from '../../logger'; -import addFile from '@/services/drive/add-file'; -import * as dateFormat from 'dateformat'; -import { getFullApAccount } from '@/misc/convert-host'; -import { Users, UserLists, UserListJoinings } from '@/models/index'; -import { In } from 'typeorm'; -import { DbUserJobData } from '@/queue/types'; - -const logger = queueLogger.createSubLogger('export-user-lists'); - -export async function exportUserLists(job: Bull.Job, done: any): Promise { - logger.info(`Exporting user lists of ${job.data.user.id} ...`); - - const user = await Users.findOne(job.data.user.id); - if (user == null) { - done(); - return; - } - - const lists = await UserLists.find({ - userId: user.id - }); - - // Create temp file - const [path, cleanup] = await new Promise<[string, any]>((res, rej) => { - tmp.file((e, path, fd, cleanup) => { - if (e) return rej(e); - res([path, cleanup]); - }); - }); - - logger.info(`Temp file is ${path}`); - - const stream = fs.createWriteStream(path, { flags: 'a' }); - - for (const list of lists) { - const joinings = await UserListJoinings.find({ userListId: list.id }); - const users = await Users.find({ - id: In(joinings.map(j => j.userId)) - }); - - for (const u of users) { - const acct = getFullApAccount(u.username, u.host); - const content = `${list.name},${acct}`; - await new Promise((res, rej) => { - stream.write(content + '\n', err => { - if (err) { - logger.error(err); - rej(err); - } else { - res(); - } - }); - }); - } - } - - stream.end(); - logger.succ(`Exported to: ${path}`); - - const fileName = 'user-lists-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv'; - const driveFile = await addFile(user, path, fileName, null, null, true); - - logger.succ(`Exported to: ${driveFile.id}`); - cleanup(); - done(); -} diff --git a/src/queue/processors/db/import-blocking.ts b/src/queue/processors/db/import-blocking.ts deleted file mode 100644 index 9951da669d..0000000000 --- a/src/queue/processors/db/import-blocking.ts +++ /dev/null @@ -1,74 +0,0 @@ -import * as Bull from 'bull'; - -import { queueLogger } from '../../logger'; -import { parseAcct } from '@/misc/acct'; -import { resolveUser } from '@/remote/resolve-user'; -import { downloadTextFile } from '@/misc/download-text-file'; -import { isSelfHost, toPuny } from '@/misc/convert-host'; -import { Users, DriveFiles, Blockings } from '@/models/index'; -import { DbUserImportJobData } from '@/queue/types'; -import block from '@/services/blocking/create'; - -const logger = queueLogger.createSubLogger('import-blocking'); - -export async function importBlocking(job: Bull.Job, done: any): Promise { - logger.info(`Importing blocking of ${job.data.user.id} ...`); - - const user = await Users.findOne(job.data.user.id); - if (user == null) { - done(); - return; - } - - const file = await DriveFiles.findOne({ - id: job.data.fileId - }); - if (file == null) { - done(); - return; - } - - const csv = await downloadTextFile(file.url); - - let linenum = 0; - - for (const line of csv.trim().split('\n')) { - linenum++; - - try { - const acct = line.split(',')[0].trim(); - const { username, host } = parseAcct(acct); - - let target = isSelfHost(host!) ? await Users.findOne({ - host: null, - usernameLower: username.toLowerCase() - }) : await Users.findOne({ - host: toPuny(host!), - usernameLower: username.toLowerCase() - }); - - if (host == null && target == null) continue; - - if (target == null) { - target = await resolveUser(username, host); - } - - if (target == null) { - throw `cannot resolve user: @${username}@${host}`; - } - - // skip myself - if (target.id === job.data.user.id) continue; - - logger.info(`Block[${linenum}] ${target.id} ...`); - - await block(user, target); - } catch (e) { - logger.warn(`Error in line:${linenum} ${e}`); - } - } - - logger.succ('Imported'); - done(); -} - diff --git a/src/queue/processors/db/import-following.ts b/src/queue/processors/db/import-following.ts deleted file mode 100644 index 3d7b7ea404..0000000000 --- a/src/queue/processors/db/import-following.ts +++ /dev/null @@ -1,73 +0,0 @@ -import * as Bull from 'bull'; - -import { queueLogger } from '../../logger'; -import follow from '@/services/following/create'; -import { parseAcct } from '@/misc/acct'; -import { resolveUser } from '@/remote/resolve-user'; -import { downloadTextFile } from '@/misc/download-text-file'; -import { isSelfHost, toPuny } from '@/misc/convert-host'; -import { Users, DriveFiles } from '@/models/index'; -import { DbUserImportJobData } from '@/queue/types'; - -const logger = queueLogger.createSubLogger('import-following'); - -export async function importFollowing(job: Bull.Job, done: any): Promise { - logger.info(`Importing following of ${job.data.user.id} ...`); - - const user = await Users.findOne(job.data.user.id); - if (user == null) { - done(); - return; - } - - const file = await DriveFiles.findOne({ - id: job.data.fileId - }); - if (file == null) { - done(); - return; - } - - const csv = await downloadTextFile(file.url); - - let linenum = 0; - - for (const line of csv.trim().split('\n')) { - linenum++; - - try { - const acct = line.split(',')[0].trim(); - const { username, host } = parseAcct(acct); - - let target = isSelfHost(host!) ? await Users.findOne({ - host: null, - usernameLower: username.toLowerCase() - }) : await Users.findOne({ - host: toPuny(host!), - usernameLower: username.toLowerCase() - }); - - if (host == null && target == null) continue; - - if (target == null) { - target = await resolveUser(username, host); - } - - if (target == null) { - throw `cannot resolve user: @${username}@${host}`; - } - - // skip myself - if (target.id === job.data.user.id) continue; - - logger.info(`Follow[${linenum}] ${target.id} ...`); - - follow(user, target); - } catch (e) { - logger.warn(`Error in line:${linenum} ${e}`); - } - } - - logger.succ('Imported'); - done(); -} diff --git a/src/queue/processors/db/import-muting.ts b/src/queue/processors/db/import-muting.ts deleted file mode 100644 index 798f03a627..0000000000 --- a/src/queue/processors/db/import-muting.ts +++ /dev/null @@ -1,83 +0,0 @@ -import * as Bull from 'bull'; - -import { queueLogger } from '../../logger'; -import { parseAcct } from '@/misc/acct'; -import { resolveUser } from '@/remote/resolve-user'; -import { downloadTextFile } from '@/misc/download-text-file'; -import { isSelfHost, toPuny } from '@/misc/convert-host'; -import { Users, DriveFiles, Mutings } from '@/models/index'; -import { DbUserImportJobData } from '@/queue/types'; -import { User } from '@/models/entities/user'; -import { genId } from '@/misc/gen-id'; - -const logger = queueLogger.createSubLogger('import-muting'); - -export async function importMuting(job: Bull.Job, done: any): Promise { - logger.info(`Importing muting of ${job.data.user.id} ...`); - - const user = await Users.findOne(job.data.user.id); - if (user == null) { - done(); - return; - } - - const file = await DriveFiles.findOne({ - id: job.data.fileId - }); - if (file == null) { - done(); - return; - } - - const csv = await downloadTextFile(file.url); - - let linenum = 0; - - for (const line of csv.trim().split('\n')) { - linenum++; - - try { - const acct = line.split(',')[0].trim(); - const { username, host } = parseAcct(acct); - - let target = isSelfHost(host!) ? await Users.findOne({ - host: null, - usernameLower: username.toLowerCase() - }) : await Users.findOne({ - host: toPuny(host!), - usernameLower: username.toLowerCase() - }); - - if (host == null && target == null) continue; - - if (target == null) { - target = await resolveUser(username, host); - } - - if (target == null) { - throw `cannot resolve user: @${username}@${host}`; - } - - // skip myself - if (target.id === job.data.user.id) continue; - - logger.info(`Mute[${linenum}] ${target.id} ...`); - - await mute(user, target); - } catch (e) { - logger.warn(`Error in line:${linenum} ${e}`); - } - } - - logger.succ('Imported'); - done(); -} - -async function mute(user: User, target: User) { - await Mutings.insert({ - id: genId(), - createdAt: new Date(), - muterId: user.id, - muteeId: target.id, - }); -} diff --git a/src/queue/processors/db/import-user-lists.ts b/src/queue/processors/db/import-user-lists.ts deleted file mode 100644 index 3b8c13262a..0000000000 --- a/src/queue/processors/db/import-user-lists.ts +++ /dev/null @@ -1,80 +0,0 @@ -import * as Bull from 'bull'; - -import { queueLogger } from '../../logger'; -import { parseAcct } from '@/misc/acct'; -import { resolveUser } from '@/remote/resolve-user'; -import { pushUserToUserList } from '@/services/user-list/push'; -import { downloadTextFile } from '@/misc/download-text-file'; -import { isSelfHost, toPuny } from '@/misc/convert-host'; -import { DriveFiles, Users, UserLists, UserListJoinings } from '@/models/index'; -import { genId } from '@/misc/gen-id'; -import { DbUserImportJobData } from '@/queue/types'; - -const logger = queueLogger.createSubLogger('import-user-lists'); - -export async function importUserLists(job: Bull.Job, done: any): Promise { - logger.info(`Importing user lists of ${job.data.user.id} ...`); - - const user = await Users.findOne(job.data.user.id); - if (user == null) { - done(); - return; - } - - const file = await DriveFiles.findOne({ - id: job.data.fileId - }); - if (file == null) { - done(); - return; - } - - const csv = await downloadTextFile(file.url); - - let linenum = 0; - - for (const line of csv.trim().split('\n')) { - linenum++; - - try { - const listName = line.split(',')[0].trim(); - const { username, host } = parseAcct(line.split(',')[1].trim()); - - let list = await UserLists.findOne({ - userId: user.id, - name: listName - }); - - if (list == null) { - list = await UserLists.save({ - id: genId(), - createdAt: new Date(), - userId: user.id, - name: listName, - userIds: [] - }); - } - - let target = isSelfHost(host!) ? await Users.findOne({ - host: null, - usernameLower: username.toLowerCase() - }) : await Users.findOne({ - host: toPuny(host!), - usernameLower: username.toLowerCase() - }); - - if (target == null) { - target = await resolveUser(username, host); - } - - if (await UserListJoinings.findOne({ userListId: list.id, userId: target.id }) != null) continue; - - pushUserToUserList(target, list); - } catch (e) { - logger.warn(`Error in line:${linenum} ${e}`); - } - } - - logger.succ('Imported'); - done(); -} diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts deleted file mode 100644 index 97087642b7..0000000000 --- a/src/queue/processors/db/index.ts +++ /dev/null @@ -1,33 +0,0 @@ -import * as Bull from 'bull'; -import { DbJobData } from '@/queue/types'; -import { deleteDriveFiles } from './delete-drive-files'; -import { exportNotes } from './export-notes'; -import { exportFollowing } from './export-following'; -import { exportMute } from './export-mute'; -import { exportBlocking } from './export-blocking'; -import { exportUserLists } from './export-user-lists'; -import { importFollowing } from './import-following'; -import { importUserLists } from './import-user-lists'; -import { deleteAccount } from './delete-account'; -import { importMuting } from './import-muting'; -import { importBlocking } from './import-blocking'; - -const jobs = { - deleteDriveFiles, - exportNotes, - exportFollowing, - exportMute, - exportBlocking, - exportUserLists, - importFollowing, - importMuting, - importBlocking, - importUserLists, - deleteAccount, -} as Record | Bull.ProcessPromiseFunction>; - -export default function(dbQueue: Bull.Queue) { - for (const [k, v] of Object.entries(jobs)) { - dbQueue.process(k, v); - } -} diff --git a/src/queue/processors/deliver.ts b/src/queue/processors/deliver.ts deleted file mode 100644 index 3c61896a2f..0000000000 --- a/src/queue/processors/deliver.ts +++ /dev/null @@ -1,94 +0,0 @@ -import { URL } from 'url'; -import * as Bull from 'bull'; -import request from '@/remote/activitypub/request'; -import { registerOrFetchInstanceDoc } from '@/services/register-or-fetch-instance-doc'; -import Logger from '@/services/logger'; -import { Instances } from '@/models/index'; -import { instanceChart } from '@/services/chart/index'; -import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata'; -import { fetchMeta } from '@/misc/fetch-meta'; -import { toPuny } from '@/misc/convert-host'; -import { Cache } from '@/misc/cache'; -import { Instance } from '@/models/entities/instance'; -import { DeliverJobData } from '../types'; -import { StatusError } from '@/misc/fetch'; - -const logger = new Logger('deliver'); - -let latest: string | null = null; - -const suspendedHostsCache = new Cache(1000 * 60 * 60); - -export default async (job: Bull.Job) => { - const { host } = new URL(job.data.to); - - // ブロックしてたら中断 - const meta = await fetchMeta(); - if (meta.blockedHosts.includes(toPuny(host))) { - return 'skip (blocked)'; - } - - // isSuspendedなら中断 - let suspendedHosts = suspendedHostsCache.get(null); - if (suspendedHosts == null) { - suspendedHosts = await Instances.find({ - where: { - isSuspended: true - }, - }); - suspendedHostsCache.set(null, suspendedHosts); - } - if (suspendedHosts.map(x => x.host).includes(toPuny(host))) { - return 'skip (suspended)'; - } - - try { - if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) { - logger.debug(`delivering ${latest}`); - } - - await request(job.data.user, job.data.to, job.data.content); - - // Update stats - registerOrFetchInstanceDoc(host).then(i => { - Instances.update(i.id, { - latestRequestSentAt: new Date(), - latestStatus: 200, - lastCommunicatedAt: new Date(), - isNotResponding: false - }); - - fetchInstanceMetadata(i); - - instanceChart.requestSent(i.host, true); - }); - - return 'Success'; - } catch (res) { - // Update stats - registerOrFetchInstanceDoc(host).then(i => { - Instances.update(i.id, { - latestRequestSentAt: new Date(), - latestStatus: res instanceof StatusError ? res.statusCode : null, - isNotResponding: true - }); - - instanceChart.requestSent(i.host, false); - }); - - if (res instanceof StatusError) { - // 4xx - if (res.isClientError) { - // HTTPステータスコード4xxはクライアントエラーであり、それはつまり - // 何回再送しても成功することはないということなのでエラーにはしないでおく - return `${res.statusCode} ${res.statusMessage}`; - } - - // 5xx etc. - throw `${res.statusCode} ${res.statusMessage}`; - } else { - // DNS error, socket error, timeout ... - throw res; - } - } -}; diff --git a/src/queue/processors/inbox.ts b/src/queue/processors/inbox.ts deleted file mode 100644 index 4032ce8653..0000000000 --- a/src/queue/processors/inbox.ts +++ /dev/null @@ -1,149 +0,0 @@ -import { URL } from 'url'; -import * as Bull from 'bull'; -import * as httpSignature from 'http-signature'; -import perform from '@/remote/activitypub/perform'; -import Logger from '@/services/logger'; -import { registerOrFetchInstanceDoc } from '@/services/register-or-fetch-instance-doc'; -import { Instances } from '@/models/index'; -import { instanceChart } from '@/services/chart/index'; -import { fetchMeta } from '@/misc/fetch-meta'; -import { toPuny, extractDbHost } from '@/misc/convert-host'; -import { getApId } from '@/remote/activitypub/type'; -import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata'; -import { InboxJobData } from '../types'; -import DbResolver from '@/remote/activitypub/db-resolver'; -import { resolvePerson } from '@/remote/activitypub/models/person'; -import { LdSignature } from '@/remote/activitypub/misc/ld-signature'; -import { StatusError } from '@/misc/fetch'; - -const logger = new Logger('inbox'); - -// ユーザーのinboxにアクティビティが届いた時の処理 -export default async (job: Bull.Job): Promise => { - const signature = job.data.signature; // HTTP-signature - const activity = job.data.activity; - - //#region Log - const info = Object.assign({}, activity) as any; - delete info['@context']; - logger.debug(JSON.stringify(info, null, 2)); - //#endregion - - const host = toPuny(new URL(signature.keyId).hostname); - - // ブロックしてたら中断 - const meta = await fetchMeta(); - if (meta.blockedHosts.includes(host)) { - return `Blocked request: ${host}`; - } - - const keyIdLower = signature.keyId.toLowerCase(); - if (keyIdLower.startsWith('acct:')) { - return `Old keyId is no longer supported. ${keyIdLower}`; - } - - // TDOO: キャッシュ - const dbResolver = new DbResolver(); - - // HTTP-Signature keyIdを元にDBから取得 - let authUser = await dbResolver.getAuthUserFromKeyId(signature.keyId); - - // keyIdでわからなければ、activity.actorを元にDBから取得 || activity.actorを元にリモートから取得 - if (authUser == null) { - try { - authUser = await dbResolver.getAuthUserFromApId(getApId(activity.actor)); - } catch (e) { - // 対象が4xxならスキップ - if (e instanceof StatusError && e.isClientError) { - return `skip: Ignored deleted actors on both ends ${activity.actor} - ${e.statusCode}`; - } - throw `Error in actor ${activity.actor} - ${e.statusCode || e}`; - } - } - - // それでもわからなければ終了 - if (authUser == null) { - return `skip: failed to resolve user`; - } - - // publicKey がなくても終了 - if (authUser.key == null) { - return `skip: failed to resolve user publicKey`; - } - - // HTTP-Signatureの検証 - const httpSignatureValidated = httpSignature.verifySignature(signature, authUser.key.keyPem); - - // また、signatureのsignerは、activity.actorと一致する必要がある - if (!httpSignatureValidated || authUser.user.uri !== activity.actor) { - // 一致しなくても、でもLD-Signatureがありそうならそっちも見る - if (activity.signature) { - if (activity.signature.type !== 'RsaSignature2017') { - return `skip: unsupported LD-signature type ${activity.signature.type}`; - } - - // activity.signature.creator: https://example.oom/users/user#main-key - // みたいになっててUserを引っ張れば公開キーも入ることを期待する - if (activity.signature.creator) { - const candicate = activity.signature.creator.replace(/#.*/, ''); - await resolvePerson(candicate).catch(() => null); - } - - // keyIdからLD-Signatureのユーザーを取得 - authUser = await dbResolver.getAuthUserFromKeyId(activity.signature.creator); - if (authUser == null) { - return `skip: LD-Signatureのユーザーが取得できませんでした`; - } - - if (authUser.key == null) { - return `skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした`; - } - - // LD-Signature検証 - const ldSignature = new LdSignature(); - const verified = await ldSignature.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false); - if (!verified) { - return `skip: LD-Signatureの検証に失敗しました`; - } - - // もう一度actorチェック - if (authUser.user.uri !== activity.actor) { - return `skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`; - } - - // ブロックしてたら中断 - const ldHost = extractDbHost(authUser.user.uri); - if (meta.blockedHosts.includes(ldHost)) { - return `Blocked request: ${ldHost}`; - } - } else { - return `skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`; - } - } - - // activity.idがあればホストが署名者のホストであることを確認する - if (typeof activity.id === 'string') { - const signerHost = extractDbHost(authUser.user.uri!); - const activityIdHost = extractDbHost(activity.id); - if (signerHost !== activityIdHost) { - return `skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`; - } - } - - // Update stats - registerOrFetchInstanceDoc(authUser.user.host).then(i => { - Instances.update(i.id, { - latestRequestReceivedAt: new Date(), - lastCommunicatedAt: new Date(), - isNotResponding: false - }); - - fetchInstanceMetadata(i); - - instanceChart.requestReceived(i.host); - }); - - // アクティビティを処理 - await perform(authUser.user, activity); - return `ok`; -}; diff --git a/src/queue/processors/object-storage/clean-remote-files.ts b/src/queue/processors/object-storage/clean-remote-files.ts deleted file mode 100644 index 3b2e4ea939..0000000000 --- a/src/queue/processors/object-storage/clean-remote-files.ts +++ /dev/null @@ -1,50 +0,0 @@ -import * as Bull from 'bull'; - -import { queueLogger } from '../../logger'; -import { deleteFileSync } from '@/services/drive/delete-file'; -import { DriveFiles } from '@/models/index'; -import { MoreThan, Not, IsNull } from 'typeorm'; - -const logger = queueLogger.createSubLogger('clean-remote-files'); - -export default async function cleanRemoteFiles(job: Bull.Job<{}>, done: any): Promise { - logger.info(`Deleting cached remote files...`); - - let deletedCount = 0; - let cursor: any = null; - - while (true) { - const files = await DriveFiles.find({ - where: { - userHost: Not(IsNull()), - isLink: false, - ...(cursor ? { id: MoreThan(cursor) } : {}) - }, - take: 8, - order: { - id: 1 - } - }); - - if (files.length === 0) { - job.progress(100); - break; - } - - cursor = files[files.length - 1].id; - - await Promise.all(files.map(file => deleteFileSync(file, true))); - - deletedCount += 8; - - const total = await DriveFiles.count({ - userHost: Not(IsNull()), - isLink: false, - }); - - job.progress(deletedCount / total); - } - - logger.succ(`All cahced remote files has been deleted.`); - done(); -} diff --git a/src/queue/processors/object-storage/delete-file.ts b/src/queue/processors/object-storage/delete-file.ts deleted file mode 100644 index ed22968a27..0000000000 --- a/src/queue/processors/object-storage/delete-file.ts +++ /dev/null @@ -1,11 +0,0 @@ -import { ObjectStorageFileJobData } from '@/queue/types'; -import * as Bull from 'bull'; -import { deleteObjectStorageFile } from '@/services/drive/delete-file'; - -export default async (job: Bull.Job) => { - const key: string = job.data.key; - - await deleteObjectStorageFile(key); - - return 'Success'; -}; diff --git a/src/queue/processors/object-storage/index.ts b/src/queue/processors/object-storage/index.ts deleted file mode 100644 index 0d9570e179..0000000000 --- a/src/queue/processors/object-storage/index.ts +++ /dev/null @@ -1,15 +0,0 @@ -import * as Bull from 'bull'; -import { ObjectStorageJobData } from '@/queue/types'; -import deleteFile from './delete-file'; -import cleanRemoteFiles from './clean-remote-files'; - -const jobs = { - deleteFile, - cleanRemoteFiles, -} as Record | Bull.ProcessPromiseFunction>; - -export default function(q: Bull.Queue) { - for (const [k, v] of Object.entries(jobs)) { - q.process(k, 16, v); - } -} diff --git a/src/queue/processors/system/index.ts b/src/queue/processors/system/index.ts deleted file mode 100644 index 52b7868105..0000000000 --- a/src/queue/processors/system/index.ts +++ /dev/null @@ -1,12 +0,0 @@ -import * as Bull from 'bull'; -import { resyncCharts } from './resync-charts'; - -const jobs = { - resyncCharts, -} as Record | Bull.ProcessPromiseFunction<{}>>; - -export default function(dbQueue: Bull.Queue<{}>) { - for (const [k, v] of Object.entries(jobs)) { - dbQueue.process(k, v); - } -} diff --git a/src/queue/processors/system/resync-charts.ts b/src/queue/processors/system/resync-charts.ts deleted file mode 100644 index b36b024cfb..0000000000 --- a/src/queue/processors/system/resync-charts.ts +++ /dev/null @@ -1,21 +0,0 @@ -import * as Bull from 'bull'; - -import { queueLogger } from '../../logger'; -import { driveChart, notesChart, usersChart } from '@/services/chart/index'; - -const logger = queueLogger.createSubLogger('resync-charts'); - -export default async function resyncCharts(job: Bull.Job<{}>, done: any): Promise { - logger.info(`Resync charts...`); - - // TODO: ユーザーごとのチャートも更新する - // TODO: インスタンスごとのチャートも更新する - await Promise.all([ - driveChart.resync(), - notesChart.resync(), - usersChart.resync(), - ]); - - logger.succ(`All charts successfully resynced.`); - done(); -} -- cgit v1.2.3-freya