summaryrefslogtreecommitdiff
path: root/src/queue/processors
diff options
context:
space:
mode:
authorsyuilo <Syuilotan@yahoo.co.jp>2021-11-12 02:02:25 +0900
committersyuilo <Syuilotan@yahoo.co.jp>2021-11-12 02:02:25 +0900
commit0e4a111f81cceed275d9bec2695f6e401fb654d8 (patch)
tree40874799472fa07416f17b50a398ac33b7771905 /src/queue/processors
parentupdate deps (diff)
downloadsharkey-0e4a111f81cceed275d9bec2695f6e401fb654d8.tar.gz
sharkey-0e4a111f81cceed275d9bec2695f6e401fb654d8.tar.bz2
sharkey-0e4a111f81cceed275d9bec2695f6e401fb654d8.zip
refactoring
Resolve #7779
Diffstat (limited to 'src/queue/processors')
-rw-r--r--src/queue/processors/db/delete-account.ts94
-rw-r--r--src/queue/processors/db/delete-drive-files.ts56
-rw-r--r--src/queue/processors/db/export-blocking.ts94
-rw-r--r--src/queue/processors/db/export-following.ts94
-rw-r--r--src/queue/processors/db/export-mute.ts94
-rw-r--r--src/queue/processors/db/export-notes.ts133
-rw-r--r--src/queue/processors/db/export-user-lists.ts71
-rw-r--r--src/queue/processors/db/import-blocking.ts74
-rw-r--r--src/queue/processors/db/import-following.ts73
-rw-r--r--src/queue/processors/db/import-muting.ts83
-rw-r--r--src/queue/processors/db/import-user-lists.ts80
-rw-r--r--src/queue/processors/db/index.ts33
-rw-r--r--src/queue/processors/deliver.ts94
-rw-r--r--src/queue/processors/inbox.ts149
-rw-r--r--src/queue/processors/object-storage/clean-remote-files.ts50
-rw-r--r--src/queue/processors/object-storage/delete-file.ts11
-rw-r--r--src/queue/processors/object-storage/index.ts15
-rw-r--r--src/queue/processors/system/index.ts12
-rw-r--r--src/queue/processors/system/resync-charts.ts21
19 files changed, 0 insertions, 1331 deletions
diff --git a/src/queue/processors/db/delete-account.ts b/src/queue/processors/db/delete-account.ts
deleted file mode 100644
index e54f38e35e..0000000000
--- a/src/queue/processors/db/delete-account.ts
+++ /dev/null
@@ -1,94 +0,0 @@
-import * as Bull from 'bull';
-import { queueLogger } from '../../logger';
-import { DriveFiles, Notes, UserProfiles, Users } from '@/models/index';
-import { DbUserDeleteJobData } from '@/queue/types';
-import { Note } from '@/models/entities/note';
-import { DriveFile } from '@/models/entities/drive-file';
-import { MoreThan } from 'typeorm';
-import { deleteFileSync } from '@/services/drive/delete-file';
-import { sendEmail } from '@/services/send-email';
-
-const logger = queueLogger.createSubLogger('delete-account');
-
-export async function deleteAccount(job: Bull.Job<DbUserDeleteJobData>): Promise<string | void> {
- logger.info(`Deleting account of ${job.data.user.id} ...`);
-
- const user = await Users.findOne(job.data.user.id);
- if (user == null) {
- return;
- }
-
- { // Delete notes
- let cursor: Note['id'] | null = null;
-
- while (true) {
- const notes = await Notes.find({
- where: {
- userId: user.id,
- ...(cursor ? { id: MoreThan(cursor) } : {})
- },
- take: 100,
- order: {
- id: 1
- }
- });
-
- if (notes.length === 0) {
- break;
- }
-
- cursor = notes[notes.length - 1].id;
-
- await Notes.delete(notes.map(note => note.id));
- }
-
- logger.succ(`All of notes deleted`);
- }
-
- { // Delete files
- let cursor: DriveFile['id'] | null = null;
-
- while (true) {
- const files = await DriveFiles.find({
- where: {
- userId: user.id,
- ...(cursor ? { id: MoreThan(cursor) } : {})
- },
- take: 10,
- order: {
- id: 1
- }
- });
-
- if (files.length === 0) {
- break;
- }
-
- cursor = files[files.length - 1].id;
-
- for (const file of files) {
- await deleteFileSync(file);
- }
- }
-
- logger.succ(`All of files deleted`);
- }
-
- { // Send email notification
- const profile = await UserProfiles.findOneOrFail(user.id);
- if (profile.email && profile.emailVerified) {
- sendEmail(profile.email, 'Account deleted',
- `Your account has been deleted.`,
- `Your account has been deleted.`);
- }
- }
-
- // soft指定されている場合は物理削除しない
- if (job.data.soft) {
- // nop
- } else {
- await Users.delete(job.data.user.id);
- }
-
- return 'Account deleted';
-}
diff --git a/src/queue/processors/db/delete-drive-files.ts b/src/queue/processors/db/delete-drive-files.ts
deleted file mode 100644
index 8a28468b0d..0000000000
--- a/src/queue/processors/db/delete-drive-files.ts
+++ /dev/null
@@ -1,56 +0,0 @@
-import * as Bull from 'bull';
-
-import { queueLogger } from '../../logger';
-import { deleteFileSync } from '@/services/drive/delete-file';
-import { Users, DriveFiles } from '@/models/index';
-import { MoreThan } from 'typeorm';
-import { DbUserJobData } from '@/queue/types';
-
-const logger = queueLogger.createSubLogger('delete-drive-files');
-
-export async function deleteDriveFiles(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
- logger.info(`Deleting drive files of ${job.data.user.id} ...`);
-
- const user = await Users.findOne(job.data.user.id);
- if (user == null) {
- done();
- return;
- }
-
- let deletedCount = 0;
- let cursor: any = null;
-
- while (true) {
- const files = await DriveFiles.find({
- where: {
- userId: user.id,
- ...(cursor ? { id: MoreThan(cursor) } : {})
- },
- take: 100,
- order: {
- id: 1
- }
- });
-
- if (files.length === 0) {
- job.progress(100);
- break;
- }
-
- cursor = files[files.length - 1].id;
-
- for (const file of files) {
- await deleteFileSync(file);
- deletedCount++;
- }
-
- const total = await DriveFiles.count({
- userId: user.id,
- });
-
- job.progress(deletedCount / total);
- }
-
- logger.succ(`All drive files (${deletedCount}) of ${user.id} has been deleted.`);
- done();
-}
diff --git a/src/queue/processors/db/export-blocking.ts b/src/queue/processors/db/export-blocking.ts
deleted file mode 100644
index 8b8aa259d4..0000000000
--- a/src/queue/processors/db/export-blocking.ts
+++ /dev/null
@@ -1,94 +0,0 @@
-import * as Bull from 'bull';
-import * as tmp from 'tmp';
-import * as fs from 'fs';
-
-import { queueLogger } from '../../logger';
-import addFile from '@/services/drive/add-file';
-import * as dateFormat from 'dateformat';
-import { getFullApAccount } from '@/misc/convert-host';
-import { Users, Blockings } from '@/models/index';
-import { MoreThan } from 'typeorm';
-import { DbUserJobData } from '@/queue/types';
-
-const logger = queueLogger.createSubLogger('export-blocking');
-
-export async function exportBlocking(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
- logger.info(`Exporting blocking of ${job.data.user.id} ...`);
-
- const user = await Users.findOne(job.data.user.id);
- if (user == null) {
- done();
- return;
- }
-
- // Create temp file
- const [path, cleanup] = await new Promise<[string, any]>((res, rej) => {
- tmp.file((e, path, fd, cleanup) => {
- if (e) return rej(e);
- res([path, cleanup]);
- });
- });
-
- logger.info(`Temp file is ${path}`);
-
- const stream = fs.createWriteStream(path, { flags: 'a' });
-
- let exportedCount = 0;
- let cursor: any = null;
-
- while (true) {
- const blockings = await Blockings.find({
- where: {
- blockerId: user.id,
- ...(cursor ? { id: MoreThan(cursor) } : {})
- },
- take: 100,
- order: {
- id: 1
- }
- });
-
- if (blockings.length === 0) {
- job.progress(100);
- break;
- }
-
- cursor = blockings[blockings.length - 1].id;
-
- for (const block of blockings) {
- const u = await Users.findOne({ id: block.blockeeId });
- if (u == null) {
- exportedCount++; continue;
- }
-
- const content = getFullApAccount(u.username, u.host);
- await new Promise<void>((res, rej) => {
- stream.write(content + '\n', err => {
- if (err) {
- logger.error(err);
- rej(err);
- } else {
- res();
- }
- });
- });
- exportedCount++;
- }
-
- const total = await Blockings.count({
- blockerId: user.id,
- });
-
- job.progress(exportedCount / total);
- }
-
- stream.end();
- logger.succ(`Exported to: ${path}`);
-
- const fileName = 'blocking-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv';
- const driveFile = await addFile(user, path, fileName, null, null, true);
-
- logger.succ(`Exported to: ${driveFile.id}`);
- cleanup();
- done();
-}
diff --git a/src/queue/processors/db/export-following.ts b/src/queue/processors/db/export-following.ts
deleted file mode 100644
index a0ecf5f560..0000000000
--- a/src/queue/processors/db/export-following.ts
+++ /dev/null
@@ -1,94 +0,0 @@
-import * as Bull from 'bull';
-import * as tmp from 'tmp';
-import * as fs from 'fs';
-
-import { queueLogger } from '../../logger';
-import addFile from '@/services/drive/add-file';
-import * as dateFormat from 'dateformat';
-import { getFullApAccount } from '@/misc/convert-host';
-import { Users, Followings } from '@/models/index';
-import { MoreThan } from 'typeorm';
-import { DbUserJobData } from '@/queue/types';
-
-const logger = queueLogger.createSubLogger('export-following');
-
-export async function exportFollowing(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
- logger.info(`Exporting following of ${job.data.user.id} ...`);
-
- const user = await Users.findOne(job.data.user.id);
- if (user == null) {
- done();
- return;
- }
-
- // Create temp file
- const [path, cleanup] = await new Promise<[string, any]>((res, rej) => {
- tmp.file((e, path, fd, cleanup) => {
- if (e) return rej(e);
- res([path, cleanup]);
- });
- });
-
- logger.info(`Temp file is ${path}`);
-
- const stream = fs.createWriteStream(path, { flags: 'a' });
-
- let exportedCount = 0;
- let cursor: any = null;
-
- while (true) {
- const followings = await Followings.find({
- where: {
- followerId: user.id,
- ...(cursor ? { id: MoreThan(cursor) } : {})
- },
- take: 100,
- order: {
- id: 1
- }
- });
-
- if (followings.length === 0) {
- job.progress(100);
- break;
- }
-
- cursor = followings[followings.length - 1].id;
-
- for (const following of followings) {
- const u = await Users.findOne({ id: following.followeeId });
- if (u == null) {
- exportedCount++; continue;
- }
-
- const content = getFullApAccount(u.username, u.host);
- await new Promise<void>((res, rej) => {
- stream.write(content + '\n', err => {
- if (err) {
- logger.error(err);
- rej(err);
- } else {
- res();
- }
- });
- });
- exportedCount++;
- }
-
- const total = await Followings.count({
- followerId: user.id,
- });
-
- job.progress(exportedCount / total);
- }
-
- stream.end();
- logger.succ(`Exported to: ${path}`);
-
- const fileName = 'following-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv';
- const driveFile = await addFile(user, path, fileName, null, null, true);
-
- logger.succ(`Exported to: ${driveFile.id}`);
- cleanup();
- done();
-}
diff --git a/src/queue/processors/db/export-mute.ts b/src/queue/processors/db/export-mute.ts
deleted file mode 100644
index d5976f7d56..0000000000
--- a/src/queue/processors/db/export-mute.ts
+++ /dev/null
@@ -1,94 +0,0 @@
-import * as Bull from 'bull';
-import * as tmp from 'tmp';
-import * as fs from 'fs';
-
-import { queueLogger } from '../../logger';
-import addFile from '@/services/drive/add-file';
-import * as dateFormat from 'dateformat';
-import { getFullApAccount } from '@/misc/convert-host';
-import { Users, Mutings } from '@/models/index';
-import { MoreThan } from 'typeorm';
-import { DbUserJobData } from '@/queue/types';
-
-const logger = queueLogger.createSubLogger('export-mute');
-
-export async function exportMute(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
- logger.info(`Exporting mute of ${job.data.user.id} ...`);
-
- const user = await Users.findOne(job.data.user.id);
- if (user == null) {
- done();
- return;
- }
-
- // Create temp file
- const [path, cleanup] = await new Promise<[string, any]>((res, rej) => {
- tmp.file((e, path, fd, cleanup) => {
- if (e) return rej(e);
- res([path, cleanup]);
- });
- });
-
- logger.info(`Temp file is ${path}`);
-
- const stream = fs.createWriteStream(path, { flags: 'a' });
-
- let exportedCount = 0;
- let cursor: any = null;
-
- while (true) {
- const mutes = await Mutings.find({
- where: {
- muterId: user.id,
- ...(cursor ? { id: MoreThan(cursor) } : {})
- },
- take: 100,
- order: {
- id: 1
- }
- });
-
- if (mutes.length === 0) {
- job.progress(100);
- break;
- }
-
- cursor = mutes[mutes.length - 1].id;
-
- for (const mute of mutes) {
- const u = await Users.findOne({ id: mute.muteeId });
- if (u == null) {
- exportedCount++; continue;
- }
-
- const content = getFullApAccount(u.username, u.host);
- await new Promise<void>((res, rej) => {
- stream.write(content + '\n', err => {
- if (err) {
- logger.error(err);
- rej(err);
- } else {
- res();
- }
- });
- });
- exportedCount++;
- }
-
- const total = await Mutings.count({
- muterId: user.id,
- });
-
- job.progress(exportedCount / total);
- }
-
- stream.end();
- logger.succ(`Exported to: ${path}`);
-
- const fileName = 'mute-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv';
- const driveFile = await addFile(user, path, fileName, null, null, true);
-
- logger.succ(`Exported to: ${driveFile.id}`);
- cleanup();
- done();
-}
diff --git a/src/queue/processors/db/export-notes.ts b/src/queue/processors/db/export-notes.ts
deleted file mode 100644
index 49850aa706..0000000000
--- a/src/queue/processors/db/export-notes.ts
+++ /dev/null
@@ -1,133 +0,0 @@
-import * as Bull from 'bull';
-import * as tmp from 'tmp';
-import * as fs from 'fs';
-
-import { queueLogger } from '../../logger';
-import addFile from '@/services/drive/add-file';
-import * as dateFormat from 'dateformat';
-import { Users, Notes, Polls } from '@/models/index';
-import { MoreThan } from 'typeorm';
-import { Note } from '@/models/entities/note';
-import { Poll } from '@/models/entities/poll';
-import { DbUserJobData } from '@/queue/types';
-
-const logger = queueLogger.createSubLogger('export-notes');
-
-export async function exportNotes(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
- logger.info(`Exporting notes of ${job.data.user.id} ...`);
-
- const user = await Users.findOne(job.data.user.id);
- if (user == null) {
- done();
- return;
- }
-
- // Create temp file
- const [path, cleanup] = await new Promise<[string, any]>((res, rej) => {
- tmp.file((e, path, fd, cleanup) => {
- if (e) return rej(e);
- res([path, cleanup]);
- });
- });
-
- logger.info(`Temp file is ${path}`);
-
- const stream = fs.createWriteStream(path, { flags: 'a' });
-
- await new Promise<void>((res, rej) => {
- stream.write('[', err => {
- if (err) {
- logger.error(err);
- rej(err);
- } else {
- res();
- }
- });
- });
-
- let exportedNotesCount = 0;
- let cursor: any = null;
-
- while (true) {
- const notes = await Notes.find({
- where: {
- userId: user.id,
- ...(cursor ? { id: MoreThan(cursor) } : {})
- },
- take: 100,
- order: {
- id: 1
- }
- });
-
- if (notes.length === 0) {
- job.progress(100);
- break;
- }
-
- cursor = notes[notes.length - 1].id;
-
- for (const note of notes) {
- let poll: Poll | undefined;
- if (note.hasPoll) {
- poll = await Polls.findOneOrFail({ noteId: note.id });
- }
- const content = JSON.stringify(serialize(note, poll));
- await new Promise<void>((res, rej) => {
- stream.write(exportedNotesCount === 0 ? content : ',\n' + content, err => {
- if (err) {
- logger.error(err);
- rej(err);
- } else {
- res();
- }
- });
- });
- exportedNotesCount++;
- }
-
- const total = await Notes.count({
- userId: user.id,
- });
-
- job.progress(exportedNotesCount / total);
- }
-
- await new Promise<void>((res, rej) => {
- stream.write(']', err => {
- if (err) {
- logger.error(err);
- rej(err);
- } else {
- res();
- }
- });
- });
-
- stream.end();
- logger.succ(`Exported to: ${path}`);
-
- const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.json';
- const driveFile = await addFile(user, path, fileName, null, null, true);
-
- logger.succ(`Exported to: ${driveFile.id}`);
- cleanup();
- done();
-}
-
-function serialize(note: Note, poll: Poll | null = null): any {
- return {
- id: note.id,
- text: note.text,
- createdAt: note.createdAt,
- fileIds: note.fileIds,
- replyId: note.replyId,
- renoteId: note.renoteId,
- poll: poll,
- cw: note.cw,
- viaMobile: note.viaMobile,
- visibility: note.visibility,
- visibleUserIds: note.visibleUserIds,
- localOnly: note.localOnly
- };
-}
diff --git a/src/queue/processors/db/export-user-lists.ts b/src/queue/processors/db/export-user-lists.ts
deleted file mode 100644
index 8a86c4df5d..0000000000
--- a/src/queue/processors/db/export-user-lists.ts
+++ /dev/null
@@ -1,71 +0,0 @@
-import * as Bull from 'bull';
-import * as tmp from 'tmp';
-import * as fs from 'fs';
-
-import { queueLogger } from '../../logger';
-import addFile from '@/services/drive/add-file';
-import * as dateFormat from 'dateformat';
-import { getFullApAccount } from '@/misc/convert-host';
-import { Users, UserLists, UserListJoinings } from '@/models/index';
-import { In } from 'typeorm';
-import { DbUserJobData } from '@/queue/types';
-
-const logger = queueLogger.createSubLogger('export-user-lists');
-
-export async function exportUserLists(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
- logger.info(`Exporting user lists of ${job.data.user.id} ...`);
-
- const user = await Users.findOne(job.data.user.id);
- if (user == null) {
- done();
- return;
- }
-
- const lists = await UserLists.find({
- userId: user.id
- });
-
- // Create temp file
- const [path, cleanup] = await new Promise<[string, any]>((res, rej) => {
- tmp.file((e, path, fd, cleanup) => {
- if (e) return rej(e);
- res([path, cleanup]);
- });
- });
-
- logger.info(`Temp file is ${path}`);
-
- const stream = fs.createWriteStream(path, { flags: 'a' });
-
- for (const list of lists) {
- const joinings = await UserListJoinings.find({ userListId: list.id });
- const users = await Users.find({
- id: In(joinings.map(j => j.userId))
- });
-
- for (const u of users) {
- const acct = getFullApAccount(u.username, u.host);
- const content = `${list.name},${acct}`;
- await new Promise<void>((res, rej) => {
- stream.write(content + '\n', err => {
- if (err) {
- logger.error(err);
- rej(err);
- } else {
- res();
- }
- });
- });
- }
- }
-
- stream.end();
- logger.succ(`Exported to: ${path}`);
-
- const fileName = 'user-lists-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv';
- const driveFile = await addFile(user, path, fileName, null, null, true);
-
- logger.succ(`Exported to: ${driveFile.id}`);
- cleanup();
- done();
-}
diff --git a/src/queue/processors/db/import-blocking.ts b/src/queue/processors/db/import-blocking.ts
deleted file mode 100644
index 9951da669d..0000000000
--- a/src/queue/processors/db/import-blocking.ts
+++ /dev/null
@@ -1,74 +0,0 @@
-import * as Bull from 'bull';
-
-import { queueLogger } from '../../logger';
-import { parseAcct } from '@/misc/acct';
-import { resolveUser } from '@/remote/resolve-user';
-import { downloadTextFile } from '@/misc/download-text-file';
-import { isSelfHost, toPuny } from '@/misc/convert-host';
-import { Users, DriveFiles, Blockings } from '@/models/index';
-import { DbUserImportJobData } from '@/queue/types';
-import block from '@/services/blocking/create';
-
-const logger = queueLogger.createSubLogger('import-blocking');
-
-export async function importBlocking(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> {
- logger.info(`Importing blocking of ${job.data.user.id} ...`);
-
- const user = await Users.findOne(job.data.user.id);
- if (user == null) {
- done();
- return;
- }
-
- const file = await DriveFiles.findOne({
- id: job.data.fileId
- });
- if (file == null) {
- done();
- return;
- }
-
- const csv = await downloadTextFile(file.url);
-
- let linenum = 0;
-
- for (const line of csv.trim().split('\n')) {
- linenum++;
-
- try {
- const acct = line.split(',')[0].trim();
- const { username, host } = parseAcct(acct);
-
- let target = isSelfHost(host!) ? await Users.findOne({
- host: null,
- usernameLower: username.toLowerCase()
- }) : await Users.findOne({
- host: toPuny(host!),
- usernameLower: username.toLowerCase()
- });
-
- if (host == null && target == null) continue;
-
- if (target == null) {
- target = await resolveUser(username, host);
- }
-
- if (target == null) {
- throw `cannot resolve user: @${username}@${host}`;
- }
-
- // skip myself
- if (target.id === job.data.user.id) continue;
-
- logger.info(`Block[${linenum}] ${target.id} ...`);
-
- await block(user, target);
- } catch (e) {
- logger.warn(`Error in line:${linenum} ${e}`);
- }
- }
-
- logger.succ('Imported');
- done();
-}
-
diff --git a/src/queue/processors/db/import-following.ts b/src/queue/processors/db/import-following.ts
deleted file mode 100644
index 3d7b7ea404..0000000000
--- a/src/queue/processors/db/import-following.ts
+++ /dev/null
@@ -1,73 +0,0 @@
-import * as Bull from 'bull';
-
-import { queueLogger } from '../../logger';
-import follow from '@/services/following/create';
-import { parseAcct } from '@/misc/acct';
-import { resolveUser } from '@/remote/resolve-user';
-import { downloadTextFile } from '@/misc/download-text-file';
-import { isSelfHost, toPuny } from '@/misc/convert-host';
-import { Users, DriveFiles } from '@/models/index';
-import { DbUserImportJobData } from '@/queue/types';
-
-const logger = queueLogger.createSubLogger('import-following');
-
-export async function importFollowing(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> {
- logger.info(`Importing following of ${job.data.user.id} ...`);
-
- const user = await Users.findOne(job.data.user.id);
- if (user == null) {
- done();
- return;
- }
-
- const file = await DriveFiles.findOne({
- id: job.data.fileId
- });
- if (file == null) {
- done();
- return;
- }
-
- const csv = await downloadTextFile(file.url);
-
- let linenum = 0;
-
- for (const line of csv.trim().split('\n')) {
- linenum++;
-
- try {
- const acct = line.split(',')[0].trim();
- const { username, host } = parseAcct(acct);
-
- let target = isSelfHost(host!) ? await Users.findOne({
- host: null,
- usernameLower: username.toLowerCase()
- }) : await Users.findOne({
- host: toPuny(host!),
- usernameLower: username.toLowerCase()
- });
-
- if (host == null && target == null) continue;
-
- if (target == null) {
- target = await resolveUser(username, host);
- }
-
- if (target == null) {
- throw `cannot resolve user: @${username}@${host}`;
- }
-
- // skip myself
- if (target.id === job.data.user.id) continue;
-
- logger.info(`Follow[${linenum}] ${target.id} ...`);
-
- follow(user, target);
- } catch (e) {
- logger.warn(`Error in line:${linenum} ${e}`);
- }
- }
-
- logger.succ('Imported');
- done();
-}
diff --git a/src/queue/processors/db/import-muting.ts b/src/queue/processors/db/import-muting.ts
deleted file mode 100644
index 798f03a627..0000000000
--- a/src/queue/processors/db/import-muting.ts
+++ /dev/null
@@ -1,83 +0,0 @@
-import * as Bull from 'bull';
-
-import { queueLogger } from '../../logger';
-import { parseAcct } from '@/misc/acct';
-import { resolveUser } from '@/remote/resolve-user';
-import { downloadTextFile } from '@/misc/download-text-file';
-import { isSelfHost, toPuny } from '@/misc/convert-host';
-import { Users, DriveFiles, Mutings } from '@/models/index';
-import { DbUserImportJobData } from '@/queue/types';
-import { User } from '@/models/entities/user';
-import { genId } from '@/misc/gen-id';
-
-const logger = queueLogger.createSubLogger('import-muting');
-
-export async function importMuting(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> {
- logger.info(`Importing muting of ${job.data.user.id} ...`);
-
- const user = await Users.findOne(job.data.user.id);
- if (user == null) {
- done();
- return;
- }
-
- const file = await DriveFiles.findOne({
- id: job.data.fileId
- });
- if (file == null) {
- done();
- return;
- }
-
- const csv = await downloadTextFile(file.url);
-
- let linenum = 0;
-
- for (const line of csv.trim().split('\n')) {
- linenum++;
-
- try {
- const acct = line.split(',')[0].trim();
- const { username, host } = parseAcct(acct);
-
- let target = isSelfHost(host!) ? await Users.findOne({
- host: null,
- usernameLower: username.toLowerCase()
- }) : await Users.findOne({
- host: toPuny(host!),
- usernameLower: username.toLowerCase()
- });
-
- if (host == null && target == null) continue;
-
- if (target == null) {
- target = await resolveUser(username, host);
- }
-
- if (target == null) {
- throw `cannot resolve user: @${username}@${host}`;
- }
-
- // skip myself
- if (target.id === job.data.user.id) continue;
-
- logger.info(`Mute[${linenum}] ${target.id} ...`);
-
- await mute(user, target);
- } catch (e) {
- logger.warn(`Error in line:${linenum} ${e}`);
- }
- }
-
- logger.succ('Imported');
- done();
-}
-
-async function mute(user: User, target: User) {
- await Mutings.insert({
- id: genId(),
- createdAt: new Date(),
- muterId: user.id,
- muteeId: target.id,
- });
-}
diff --git a/src/queue/processors/db/import-user-lists.ts b/src/queue/processors/db/import-user-lists.ts
deleted file mode 100644
index 3b8c13262a..0000000000
--- a/src/queue/processors/db/import-user-lists.ts
+++ /dev/null
@@ -1,80 +0,0 @@
-import * as Bull from 'bull';
-
-import { queueLogger } from '../../logger';
-import { parseAcct } from '@/misc/acct';
-import { resolveUser } from '@/remote/resolve-user';
-import { pushUserToUserList } from '@/services/user-list/push';
-import { downloadTextFile } from '@/misc/download-text-file';
-import { isSelfHost, toPuny } from '@/misc/convert-host';
-import { DriveFiles, Users, UserLists, UserListJoinings } from '@/models/index';
-import { genId } from '@/misc/gen-id';
-import { DbUserImportJobData } from '@/queue/types';
-
-const logger = queueLogger.createSubLogger('import-user-lists');
-
-export async function importUserLists(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> {
- logger.info(`Importing user lists of ${job.data.user.id} ...`);
-
- const user = await Users.findOne(job.data.user.id);
- if (user == null) {
- done();
- return;
- }
-
- const file = await DriveFiles.findOne({
- id: job.data.fileId
- });
- if (file == null) {
- done();
- return;
- }
-
- const csv = await downloadTextFile(file.url);
-
- let linenum = 0;
-
- for (const line of csv.trim().split('\n')) {
- linenum++;
-
- try {
- const listName = line.split(',')[0].trim();
- const { username, host } = parseAcct(line.split(',')[1].trim());
-
- let list = await UserLists.findOne({
- userId: user.id,
- name: listName
- });
-
- if (list == null) {
- list = await UserLists.save({
- id: genId(),
- createdAt: new Date(),
- userId: user.id,
- name: listName,
- userIds: []
- });
- }
-
- let target = isSelfHost(host!) ? await Users.findOne({
- host: null,
- usernameLower: username.toLowerCase()
- }) : await Users.findOne({
- host: toPuny(host!),
- usernameLower: username.toLowerCase()
- });
-
- if (target == null) {
- target = await resolveUser(username, host);
- }
-
- if (await UserListJoinings.findOne({ userListId: list.id, userId: target.id }) != null) continue;
-
- pushUserToUserList(target, list);
- } catch (e) {
- logger.warn(`Error in line:${linenum} ${e}`);
- }
- }
-
- logger.succ('Imported');
- done();
-}
diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts
deleted file mode 100644
index 97087642b7..0000000000
--- a/src/queue/processors/db/index.ts
+++ /dev/null
@@ -1,33 +0,0 @@
-import * as Bull from 'bull';
-import { DbJobData } from '@/queue/types';
-import { deleteDriveFiles } from './delete-drive-files';
-import { exportNotes } from './export-notes';
-import { exportFollowing } from './export-following';
-import { exportMute } from './export-mute';
-import { exportBlocking } from './export-blocking';
-import { exportUserLists } from './export-user-lists';
-import { importFollowing } from './import-following';
-import { importUserLists } from './import-user-lists';
-import { deleteAccount } from './delete-account';
-import { importMuting } from './import-muting';
-import { importBlocking } from './import-blocking';
-
-const jobs = {
- deleteDriveFiles,
- exportNotes,
- exportFollowing,
- exportMute,
- exportBlocking,
- exportUserLists,
- importFollowing,
- importMuting,
- importBlocking,
- importUserLists,
- deleteAccount,
-} as Record<string, Bull.ProcessCallbackFunction<DbJobData> | Bull.ProcessPromiseFunction<DbJobData>>;
-
-export default function(dbQueue: Bull.Queue<DbJobData>) {
- for (const [k, v] of Object.entries(jobs)) {
- dbQueue.process(k, v);
- }
-}
diff --git a/src/queue/processors/deliver.ts b/src/queue/processors/deliver.ts
deleted file mode 100644
index 3c61896a2f..0000000000
--- a/src/queue/processors/deliver.ts
+++ /dev/null
@@ -1,94 +0,0 @@
-import { URL } from 'url';
-import * as Bull from 'bull';
-import request from '@/remote/activitypub/request';
-import { registerOrFetchInstanceDoc } from '@/services/register-or-fetch-instance-doc';
-import Logger from '@/services/logger';
-import { Instances } from '@/models/index';
-import { instanceChart } from '@/services/chart/index';
-import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata';
-import { fetchMeta } from '@/misc/fetch-meta';
-import { toPuny } from '@/misc/convert-host';
-import { Cache } from '@/misc/cache';
-import { Instance } from '@/models/entities/instance';
-import { DeliverJobData } from '../types';
-import { StatusError } from '@/misc/fetch';
-
-const logger = new Logger('deliver');
-
-let latest: string | null = null;
-
-const suspendedHostsCache = new Cache<Instance[]>(1000 * 60 * 60);
-
-export default async (job: Bull.Job<DeliverJobData>) => {
- const { host } = new URL(job.data.to);
-
- // ブロックしてたら中断
- const meta = await fetchMeta();
- if (meta.blockedHosts.includes(toPuny(host))) {
- return 'skip (blocked)';
- }
-
- // isSuspendedなら中断
- let suspendedHosts = suspendedHostsCache.get(null);
- if (suspendedHosts == null) {
- suspendedHosts = await Instances.find({
- where: {
- isSuspended: true
- },
- });
- suspendedHostsCache.set(null, suspendedHosts);
- }
- if (suspendedHosts.map(x => x.host).includes(toPuny(host))) {
- return 'skip (suspended)';
- }
-
- try {
- if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) {
- logger.debug(`delivering ${latest}`);
- }
-
- await request(job.data.user, job.data.to, job.data.content);
-
- // Update stats
- registerOrFetchInstanceDoc(host).then(i => {
- Instances.update(i.id, {
- latestRequestSentAt: new Date(),
- latestStatus: 200,
- lastCommunicatedAt: new Date(),
- isNotResponding: false
- });
-
- fetchInstanceMetadata(i);
-
- instanceChart.requestSent(i.host, true);
- });
-
- return 'Success';
- } catch (res) {
- // Update stats
- registerOrFetchInstanceDoc(host).then(i => {
- Instances.update(i.id, {
- latestRequestSentAt: new Date(),
- latestStatus: res instanceof StatusError ? res.statusCode : null,
- isNotResponding: true
- });
-
- instanceChart.requestSent(i.host, false);
- });
-
- if (res instanceof StatusError) {
- // 4xx
- if (res.isClientError) {
- // HTTPステータスコード4xxはクライアントエラーであり、それはつまり
- // 何回再送しても成功することはないということなのでエラーにはしないでおく
- return `${res.statusCode} ${res.statusMessage}`;
- }
-
- // 5xx etc.
- throw `${res.statusCode} ${res.statusMessage}`;
- } else {
- // DNS error, socket error, timeout ...
- throw res;
- }
- }
-};
diff --git a/src/queue/processors/inbox.ts b/src/queue/processors/inbox.ts
deleted file mode 100644
index 4032ce8653..0000000000
--- a/src/queue/processors/inbox.ts
+++ /dev/null
@@ -1,149 +0,0 @@
-import { URL } from 'url';
-import * as Bull from 'bull';
-import * as httpSignature from 'http-signature';
-import perform from '@/remote/activitypub/perform';
-import Logger from '@/services/logger';
-import { registerOrFetchInstanceDoc } from '@/services/register-or-fetch-instance-doc';
-import { Instances } from '@/models/index';
-import { instanceChart } from '@/services/chart/index';
-import { fetchMeta } from '@/misc/fetch-meta';
-import { toPuny, extractDbHost } from '@/misc/convert-host';
-import { getApId } from '@/remote/activitypub/type';
-import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata';
-import { InboxJobData } from '../types';
-import DbResolver from '@/remote/activitypub/db-resolver';
-import { resolvePerson } from '@/remote/activitypub/models/person';
-import { LdSignature } from '@/remote/activitypub/misc/ld-signature';
-import { StatusError } from '@/misc/fetch';
-
-const logger = new Logger('inbox');
-
-// ユーザーのinboxにアクティビティが届いた時の処理
-export default async (job: Bull.Job<InboxJobData>): Promise<string> => {
- const signature = job.data.signature; // HTTP-signature
- const activity = job.data.activity;
-
- //#region Log
- const info = Object.assign({}, activity) as any;
- delete info['@context'];
- logger.debug(JSON.stringify(info, null, 2));
- //#endregion
-
- const host = toPuny(new URL(signature.keyId).hostname);
-
- // ブロックしてたら中断
- const meta = await fetchMeta();
- if (meta.blockedHosts.includes(host)) {
- return `Blocked request: ${host}`;
- }
-
- const keyIdLower = signature.keyId.toLowerCase();
- if (keyIdLower.startsWith('acct:')) {
- return `Old keyId is no longer supported. ${keyIdLower}`;
- }
-
- // TDOO: キャッシュ
- const dbResolver = new DbResolver();
-
- // HTTP-Signature keyIdを元にDBから取得
- let authUser = await dbResolver.getAuthUserFromKeyId(signature.keyId);
-
- // keyIdでわからなければ、activity.actorを元にDBから取得 || activity.actorを元にリモートから取得
- if (authUser == null) {
- try {
- authUser = await dbResolver.getAuthUserFromApId(getApId(activity.actor));
- } catch (e) {
- // 対象が4xxならスキップ
- if (e instanceof StatusError && e.isClientError) {
- return `skip: Ignored deleted actors on both ends ${activity.actor} - ${e.statusCode}`;
- }
- throw `Error in actor ${activity.actor} - ${e.statusCode || e}`;
- }
- }
-
- // それでもわからなければ終了
- if (authUser == null) {
- return `skip: failed to resolve user`;
- }
-
- // publicKey がなくても終了
- if (authUser.key == null) {
- return `skip: failed to resolve user publicKey`;
- }
-
- // HTTP-Signatureの検証
- const httpSignatureValidated = httpSignature.verifySignature(signature, authUser.key.keyPem);
-
- // また、signatureのsignerは、activity.actorと一致する必要がある
- if (!httpSignatureValidated || authUser.user.uri !== activity.actor) {
- // 一致しなくても、でもLD-Signatureがありそうならそっちも見る
- if (activity.signature) {
- if (activity.signature.type !== 'RsaSignature2017') {
- return `skip: unsupported LD-signature type ${activity.signature.type}`;
- }
-
- // activity.signature.creator: https://example.oom/users/user#main-key
- // みたいになっててUserを引っ張れば公開キーも入ることを期待する
- if (activity.signature.creator) {
- const candicate = activity.signature.creator.replace(/#.*/, '');
- await resolvePerson(candicate).catch(() => null);
- }
-
- // keyIdからLD-Signatureのユーザーを取得
- authUser = await dbResolver.getAuthUserFromKeyId(activity.signature.creator);
- if (authUser == null) {
- return `skip: LD-Signatureのユーザーが取得できませんでした`;
- }
-
- if (authUser.key == null) {
- return `skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした`;
- }
-
- // LD-Signature検証
- const ldSignature = new LdSignature();
- const verified = await ldSignature.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false);
- if (!verified) {
- return `skip: LD-Signatureの検証に失敗しました`;
- }
-
- // もう一度actorチェック
- if (authUser.user.uri !== activity.actor) {
- return `skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`;
- }
-
- // ブロックしてたら中断
- const ldHost = extractDbHost(authUser.user.uri);
- if (meta.blockedHosts.includes(ldHost)) {
- return `Blocked request: ${ldHost}`;
- }
- } else {
- return `skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`;
- }
- }
-
- // activity.idがあればホストが署名者のホストであることを確認する
- if (typeof activity.id === 'string') {
- const signerHost = extractDbHost(authUser.user.uri!);
- const activityIdHost = extractDbHost(activity.id);
- if (signerHost !== activityIdHost) {
- return `skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`;
- }
- }
-
- // Update stats
- registerOrFetchInstanceDoc(authUser.user.host).then(i => {
- Instances.update(i.id, {
- latestRequestReceivedAt: new Date(),
- lastCommunicatedAt: new Date(),
- isNotResponding: false
- });
-
- fetchInstanceMetadata(i);
-
- instanceChart.requestReceived(i.host);
- });
-
- // アクティビティを処理
- await perform(authUser.user, activity);
- return `ok`;
-};
diff --git a/src/queue/processors/object-storage/clean-remote-files.ts b/src/queue/processors/object-storage/clean-remote-files.ts
deleted file mode 100644
index 3b2e4ea939..0000000000
--- a/src/queue/processors/object-storage/clean-remote-files.ts
+++ /dev/null
@@ -1,50 +0,0 @@
-import * as Bull from 'bull';
-
-import { queueLogger } from '../../logger';
-import { deleteFileSync } from '@/services/drive/delete-file';
-import { DriveFiles } from '@/models/index';
-import { MoreThan, Not, IsNull } from 'typeorm';
-
-const logger = queueLogger.createSubLogger('clean-remote-files');
-
-export default async function cleanRemoteFiles(job: Bull.Job<{}>, done: any): Promise<void> {
- logger.info(`Deleting cached remote files...`);
-
- let deletedCount = 0;
- let cursor: any = null;
-
- while (true) {
- const files = await DriveFiles.find({
- where: {
- userHost: Not(IsNull()),
- isLink: false,
- ...(cursor ? { id: MoreThan(cursor) } : {})
- },
- take: 8,
- order: {
- id: 1
- }
- });
-
- if (files.length === 0) {
- job.progress(100);
- break;
- }
-
- cursor = files[files.length - 1].id;
-
- await Promise.all(files.map(file => deleteFileSync(file, true)));
-
- deletedCount += 8;
-
- const total = await DriveFiles.count({
- userHost: Not(IsNull()),
- isLink: false,
- });
-
- job.progress(deletedCount / total);
- }
-
- logger.succ(`All cahced remote files has been deleted.`);
- done();
-}
diff --git a/src/queue/processors/object-storage/delete-file.ts b/src/queue/processors/object-storage/delete-file.ts
deleted file mode 100644
index ed22968a27..0000000000
--- a/src/queue/processors/object-storage/delete-file.ts
+++ /dev/null
@@ -1,11 +0,0 @@
-import { ObjectStorageFileJobData } from '@/queue/types';
-import * as Bull from 'bull';
-import { deleteObjectStorageFile } from '@/services/drive/delete-file';
-
-export default async (job: Bull.Job<ObjectStorageFileJobData>) => {
- const key: string = job.data.key;
-
- await deleteObjectStorageFile(key);
-
- return 'Success';
-};
diff --git a/src/queue/processors/object-storage/index.ts b/src/queue/processors/object-storage/index.ts
deleted file mode 100644
index 0d9570e179..0000000000
--- a/src/queue/processors/object-storage/index.ts
+++ /dev/null
@@ -1,15 +0,0 @@
-import * as Bull from 'bull';
-import { ObjectStorageJobData } from '@/queue/types';
-import deleteFile from './delete-file';
-import cleanRemoteFiles from './clean-remote-files';
-
-const jobs = {
- deleteFile,
- cleanRemoteFiles,
-} as Record<string, Bull.ProcessCallbackFunction<ObjectStorageJobData> | Bull.ProcessPromiseFunction<ObjectStorageJobData>>;
-
-export default function(q: Bull.Queue) {
- for (const [k, v] of Object.entries(jobs)) {
- q.process(k, 16, v);
- }
-}
diff --git a/src/queue/processors/system/index.ts b/src/queue/processors/system/index.ts
deleted file mode 100644
index 52b7868105..0000000000
--- a/src/queue/processors/system/index.ts
+++ /dev/null
@@ -1,12 +0,0 @@
-import * as Bull from 'bull';
-import { resyncCharts } from './resync-charts';
-
-const jobs = {
- resyncCharts,
-} as Record<string, Bull.ProcessCallbackFunction<{}> | Bull.ProcessPromiseFunction<{}>>;
-
-export default function(dbQueue: Bull.Queue<{}>) {
- for (const [k, v] of Object.entries(jobs)) {
- dbQueue.process(k, v);
- }
-}
diff --git a/src/queue/processors/system/resync-charts.ts b/src/queue/processors/system/resync-charts.ts
deleted file mode 100644
index b36b024cfb..0000000000
--- a/src/queue/processors/system/resync-charts.ts
+++ /dev/null
@@ -1,21 +0,0 @@
-import * as Bull from 'bull';
-
-import { queueLogger } from '../../logger';
-import { driveChart, notesChart, usersChart } from '@/services/chart/index';
-
-const logger = queueLogger.createSubLogger('resync-charts');
-
-export default async function resyncCharts(job: Bull.Job<{}>, done: any): Promise<void> {
- logger.info(`Resync charts...`);
-
- // TODO: ユーザーごとのチャートも更新する
- // TODO: インスタンスごとのチャートも更新する
- await Promise.all([
- driveChart.resync(),
- notesChart.resync(),
- usersChart.resync(),
- ]);
-
- logger.succ(`All charts successfully resynced.`);
- done();
-}