summaryrefslogtreecommitdiff
path: root/packages/backend/src/queue
diff options
context:
space:
mode:
authorsyuilo <Syuilotan@yahoo.co.jp>2021-11-12 02:02:25 +0900
committersyuilo <Syuilotan@yahoo.co.jp>2021-11-12 02:02:25 +0900
commit0e4a111f81cceed275d9bec2695f6e401fb654d8 (patch)
tree40874799472fa07416f17b50a398ac33b7771905 /packages/backend/src/queue
parentupdate deps (diff)
downloadsharkey-0e4a111f81cceed275d9bec2695f6e401fb654d8.tar.gz
sharkey-0e4a111f81cceed275d9bec2695f6e401fb654d8.tar.bz2
sharkey-0e4a111f81cceed275d9bec2695f6e401fb654d8.zip
refactoring
Resolve #7779
Diffstat (limited to 'packages/backend/src/queue')
-rw-r--r--packages/backend/src/queue/get-job-info.ts15
-rw-r--r--packages/backend/src/queue/index.ts255
-rw-r--r--packages/backend/src/queue/initialize.ts33
-rw-r--r--packages/backend/src/queue/logger.ts3
-rw-r--r--packages/backend/src/queue/processors/db/delete-account.ts94
-rw-r--r--packages/backend/src/queue/processors/db/delete-drive-files.ts56
-rw-r--r--packages/backend/src/queue/processors/db/export-blocking.ts94
-rw-r--r--packages/backend/src/queue/processors/db/export-following.ts94
-rw-r--r--packages/backend/src/queue/processors/db/export-mute.ts94
-rw-r--r--packages/backend/src/queue/processors/db/export-notes.ts133
-rw-r--r--packages/backend/src/queue/processors/db/export-user-lists.ts71
-rw-r--r--packages/backend/src/queue/processors/db/import-blocking.ts74
-rw-r--r--packages/backend/src/queue/processors/db/import-following.ts73
-rw-r--r--packages/backend/src/queue/processors/db/import-muting.ts83
-rw-r--r--packages/backend/src/queue/processors/db/import-user-lists.ts80
-rw-r--r--packages/backend/src/queue/processors/db/index.ts33
-rw-r--r--packages/backend/src/queue/processors/deliver.ts94
-rw-r--r--packages/backend/src/queue/processors/inbox.ts149
-rw-r--r--packages/backend/src/queue/processors/object-storage/clean-remote-files.ts50
-rw-r--r--packages/backend/src/queue/processors/object-storage/delete-file.ts11
-rw-r--r--packages/backend/src/queue/processors/object-storage/index.ts15
-rw-r--r--packages/backend/src/queue/processors/system/index.ts12
-rw-r--r--packages/backend/src/queue/processors/system/resync-charts.ts21
-rw-r--r--packages/backend/src/queue/queues.ts9
-rw-r--r--packages/backend/src/queue/types.ts44
25 files changed, 1690 insertions, 0 deletions
diff --git a/packages/backend/src/queue/get-job-info.ts b/packages/backend/src/queue/get-job-info.ts
new file mode 100644
index 0000000000..f601ae62d0
--- /dev/null
+++ b/packages/backend/src/queue/get-job-info.ts
@@ -0,0 +1,15 @@
+import * as Bull from 'bull';
+
+export function getJobInfo(job: Bull.Job, increment = false) {
+ const age = Date.now() - job.timestamp;
+
+ const formated = age > 60000 ? `${Math.floor(age / 1000 / 60)}m`
+ : age > 10000 ? `${Math.floor(age / 1000)}s`
+ : `${age}ms`;
+
+ // onActiveとかonCompletedのattemptsMadeがなぜか0始まりなのでインクリメントする
+ const currentAttempts = job.attemptsMade + (increment ? 1 : 0);
+ const maxAttempts = job.opts ? job.opts.attempts : 0;
+
+ return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`;
+}
diff --git a/packages/backend/src/queue/index.ts b/packages/backend/src/queue/index.ts
new file mode 100644
index 0000000000..37eb809604
--- /dev/null
+++ b/packages/backend/src/queue/index.ts
@@ -0,0 +1,255 @@
+import * as httpSignature from 'http-signature';
+
+import config from '@/config/index';
+import { envOption } from '../env';
+
+import processDeliver from './processors/deliver';
+import processInbox from './processors/inbox';
+import processDb from './processors/db/index';
+import procesObjectStorage from './processors/object-storage/index';
+import { queueLogger } from './logger';
+import { DriveFile } from '@/models/entities/drive-file';
+import { getJobInfo } from './get-job-info';
+import { systemQueue, dbQueue, deliverQueue, inboxQueue, objectStorageQueue } from './queues';
+import { ThinUser } from './types';
+import { IActivity } from '@/remote/activitypub/type';
+
+function renderError(e: Error): any {
+ return {
+ stack: e?.stack,
+ message: e?.message,
+ name: e?.name
+ };
+}
+
+const systemLogger = queueLogger.createSubLogger('system');
+const deliverLogger = queueLogger.createSubLogger('deliver');
+const inboxLogger = queueLogger.createSubLogger('inbox');
+const dbLogger = queueLogger.createSubLogger('db');
+const objectStorageLogger = queueLogger.createSubLogger('objectStorage');
+
+systemQueue
+ .on('waiting', (jobId) => systemLogger.debug(`waiting id=${jobId}`))
+ .on('active', (job) => systemLogger.debug(`active id=${job.id}`))
+ .on('completed', (job, result) => systemLogger.debug(`completed(${result}) id=${job.id}`))
+ .on('failed', (job, err) => systemLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
+ .on('error', (job: any, err: Error) => systemLogger.error(`error ${err}`, { job, e: renderError(err) }))
+ .on('stalled', (job) => systemLogger.warn(`stalled id=${job.id}`));
+
+deliverQueue
+ .on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`))
+ .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
+ .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
+ .on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`))
+ .on('error', (job: any, err: Error) => deliverLogger.error(`error ${err}`, { job, e: renderError(err) }))
+ .on('stalled', (job) => deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
+
+inboxQueue
+ .on('waiting', (jobId) => inboxLogger.debug(`waiting id=${jobId}`))
+ .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`))
+ .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`))
+ .on('failed', (job, err) => inboxLogger.warn(`failed(${err}) ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`, { job, e: renderError(err) }))
+ .on('error', (job: any, err: Error) => inboxLogger.error(`error ${err}`, { job, e: renderError(err) }))
+ .on('stalled', (job) => inboxLogger.warn(`stalled ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`));
+
+dbQueue
+ .on('waiting', (jobId) => dbLogger.debug(`waiting id=${jobId}`))
+ .on('active', (job) => dbLogger.debug(`active id=${job.id}`))
+ .on('completed', (job, result) => dbLogger.debug(`completed(${result}) id=${job.id}`))
+ .on('failed', (job, err) => dbLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
+ .on('error', (job: any, err: Error) => dbLogger.error(`error ${err}`, { job, e: renderError(err) }))
+ .on('stalled', (job) => dbLogger.warn(`stalled id=${job.id}`));
+
+objectStorageQueue
+ .on('waiting', (jobId) => objectStorageLogger.debug(`waiting id=${jobId}`))
+ .on('active', (job) => objectStorageLogger.debug(`active id=${job.id}`))
+ .on('completed', (job, result) => objectStorageLogger.debug(`completed(${result}) id=${job.id}`))
+ .on('failed', (job, err) => objectStorageLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
+ .on('error', (job: any, err: Error) => objectStorageLogger.error(`error ${err}`, { job, e: renderError(err) }))
+ .on('stalled', (job) => objectStorageLogger.warn(`stalled id=${job.id}`));
+
+export function deliver(user: ThinUser, content: unknown, to: string | null) {
+ if (content == null) return null;
+ if (to == null) return null;
+
+ const data = {
+ user: {
+ id: user.id
+ },
+ content,
+ to
+ };
+
+ return deliverQueue.add(data, {
+ attempts: config.deliverJobMaxAttempts || 12,
+ timeout: 1 * 60 * 1000, // 1min
+ backoff: {
+ type: 'apBackoff'
+ },
+ removeOnComplete: true,
+ removeOnFail: true
+ });
+}
+
+export function inbox(activity: IActivity, signature: httpSignature.IParsedSignature) {
+ const data = {
+ activity: activity,
+ signature
+ };
+
+ return inboxQueue.add(data, {
+ attempts: config.inboxJobMaxAttempts || 8,
+ timeout: 5 * 60 * 1000, // 5min
+ backoff: {
+ type: 'apBackoff'
+ },
+ removeOnComplete: true,
+ removeOnFail: true
+ });
+}
+
+export function createDeleteDriveFilesJob(user: ThinUser) {
+ return dbQueue.add('deleteDriveFiles', {
+ user: user
+ }, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
+}
+
+export function createExportNotesJob(user: ThinUser) {
+ return dbQueue.add('exportNotes', {
+ user: user
+ }, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
+}
+
+export function createExportFollowingJob(user: ThinUser) {
+ return dbQueue.add('exportFollowing', {
+ user: user
+ }, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
+}
+
+export function createExportMuteJob(user: ThinUser) {
+ return dbQueue.add('exportMute', {
+ user: user
+ }, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
+}
+
+export function createExportBlockingJob(user: ThinUser) {
+ return dbQueue.add('exportBlocking', {
+ user: user
+ }, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
+}
+
+export function createExportUserListsJob(user: ThinUser) {
+ return dbQueue.add('exportUserLists', {
+ user: user
+ }, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
+}
+
+export function createImportFollowingJob(user: ThinUser, fileId: DriveFile['id']) {
+ return dbQueue.add('importFollowing', {
+ user: user,
+ fileId: fileId
+ }, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
+}
+
+export function createImportMutingJob(user: ThinUser, fileId: DriveFile['id']) {
+ return dbQueue.add('importMuting', {
+ user: user,
+ fileId: fileId
+ }, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
+}
+
+export function createImportBlockingJob(user: ThinUser, fileId: DriveFile['id']) {
+ return dbQueue.add('importBlocking', {
+ user: user,
+ fileId: fileId
+ }, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
+}
+
+export function createImportUserListsJob(user: ThinUser, fileId: DriveFile['id']) {
+ return dbQueue.add('importUserLists', {
+ user: user,
+ fileId: fileId
+ }, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
+}
+
+export function createDeleteAccountJob(user: ThinUser, opts: { soft?: boolean; } = {}) {
+ return dbQueue.add('deleteAccount', {
+ user: user,
+ soft: opts.soft
+ }, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
+}
+
+export function createDeleteObjectStorageFileJob(key: string) {
+ return objectStorageQueue.add('deleteFile', {
+ key: key
+ }, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
+}
+
+export function createCleanRemoteFilesJob() {
+ return objectStorageQueue.add('cleanRemoteFiles', {}, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
+}
+
+export default function() {
+ if (envOption.onlyServer) return;
+
+ deliverQueue.process(config.deliverJobConcurrency || 128, processDeliver);
+ inboxQueue.process(config.inboxJobConcurrency || 16, processInbox);
+ processDb(dbQueue);
+ procesObjectStorage(objectStorageQueue);
+
+ systemQueue.add('resyncCharts', {
+ }, {
+ repeat: { cron: '0 0 * * *' }
+ });
+}
+
+export function destroy() {
+ deliverQueue.once('cleaned', (jobs, status) => {
+ deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
+ });
+ deliverQueue.clean(0, 'delayed');
+
+ inboxQueue.once('cleaned', (jobs, status) => {
+ inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
+ });
+ inboxQueue.clean(0, 'delayed');
+}
diff --git a/packages/backend/src/queue/initialize.ts b/packages/backend/src/queue/initialize.ts
new file mode 100644
index 0000000000..31102a3ed2
--- /dev/null
+++ b/packages/backend/src/queue/initialize.ts
@@ -0,0 +1,33 @@
+import * as Bull from 'bull';
+import config from '@/config/index';
+
+export function initialize<T>(name: string, limitPerSec = -1) {
+ return new Bull<T>(name, {
+ redis: {
+ port: config.redis.port,
+ host: config.redis.host,
+ password: config.redis.pass,
+ db: config.redis.db || 0,
+ },
+ prefix: config.redis.prefix ? `${config.redis.prefix}:queue` : 'queue',
+ limiter: limitPerSec > 0 ? {
+ max: limitPerSec,
+ duration: 1000
+ } : undefined,
+ settings: {
+ backoffStrategies: {
+ apBackoff
+ }
+ }
+ });
+}
+
+// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019
+function apBackoff(attemptsMade: number, err: Error) {
+ const baseDelay = 60 * 1000; // 1min
+ const maxBackoff = 8 * 60 * 60 * 1000; // 8hours
+ let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay;
+ backoff = Math.min(backoff, maxBackoff);
+ backoff += Math.round(backoff * Math.random() * 0.2);
+ return backoff;
+}
diff --git a/packages/backend/src/queue/logger.ts b/packages/backend/src/queue/logger.ts
new file mode 100644
index 0000000000..f789b9d079
--- /dev/null
+++ b/packages/backend/src/queue/logger.ts
@@ -0,0 +1,3 @@
+import Logger from '@/services/logger';
+
+export const queueLogger = new Logger('queue', 'orange');
diff --git a/packages/backend/src/queue/processors/db/delete-account.ts b/packages/backend/src/queue/processors/db/delete-account.ts
new file mode 100644
index 0000000000..e54f38e35e
--- /dev/null
+++ b/packages/backend/src/queue/processors/db/delete-account.ts
@@ -0,0 +1,94 @@
+import * as Bull from 'bull';
+import { queueLogger } from '../../logger';
+import { DriveFiles, Notes, UserProfiles, Users } from '@/models/index';
+import { DbUserDeleteJobData } from '@/queue/types';
+import { Note } from '@/models/entities/note';
+import { DriveFile } from '@/models/entities/drive-file';
+import { MoreThan } from 'typeorm';
+import { deleteFileSync } from '@/services/drive/delete-file';
+import { sendEmail } from '@/services/send-email';
+
+const logger = queueLogger.createSubLogger('delete-account');
+
+export async function deleteAccount(job: Bull.Job<DbUserDeleteJobData>): Promise<string | void> {
+ logger.info(`Deleting account of ${job.data.user.id} ...`);
+
+ const user = await Users.findOne(job.data.user.id);
+ if (user == null) {
+ return;
+ }
+
+ { // Delete notes
+ let cursor: Note['id'] | null = null;
+
+ while (true) {
+ const notes = await Notes.find({
+ where: {
+ userId: user.id,
+ ...(cursor ? { id: MoreThan(cursor) } : {})
+ },
+ take: 100,
+ order: {
+ id: 1
+ }
+ });
+
+ if (notes.length === 0) {
+ break;
+ }
+
+ cursor = notes[notes.length - 1].id;
+
+ await Notes.delete(notes.map(note => note.id));
+ }
+
+ logger.succ(`All of notes deleted`);
+ }
+
+ { // Delete files
+ let cursor: DriveFile['id'] | null = null;
+
+ while (true) {
+ const files = await DriveFiles.find({
+ where: {
+ userId: user.id,
+ ...(cursor ? { id: MoreThan(cursor) } : {})
+ },
+ take: 10,
+ order: {
+ id: 1
+ }
+ });
+
+ if (files.length === 0) {
+ break;
+ }
+
+ cursor = files[files.length - 1].id;
+
+ for (const file of files) {
+ await deleteFileSync(file);
+ }
+ }
+
+ logger.succ(`All of files deleted`);
+ }
+
+ { // Send email notification
+ const profile = await UserProfiles.findOneOrFail(user.id);
+ if (profile.email && profile.emailVerified) {
+ sendEmail(profile.email, 'Account deleted',
+ `Your account has been deleted.`,
+ `Your account has been deleted.`);
+ }
+ }
+
+ // soft指定されている場合は物理削除しない
+ if (job.data.soft) {
+ // nop
+ } else {
+ await Users.delete(job.data.user.id);
+ }
+
+ return 'Account deleted';
+}
diff --git a/packages/backend/src/queue/processors/db/delete-drive-files.ts b/packages/backend/src/queue/processors/db/delete-drive-files.ts
new file mode 100644
index 0000000000..8a28468b0d
--- /dev/null
+++ b/packages/backend/src/queue/processors/db/delete-drive-files.ts
@@ -0,0 +1,56 @@
+import * as Bull from 'bull';
+
+import { queueLogger } from '../../logger';
+import { deleteFileSync } from '@/services/drive/delete-file';
+import { Users, DriveFiles } from '@/models/index';
+import { MoreThan } from 'typeorm';
+import { DbUserJobData } from '@/queue/types';
+
+const logger = queueLogger.createSubLogger('delete-drive-files');
+
+export async function deleteDriveFiles(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
+ logger.info(`Deleting drive files of ${job.data.user.id} ...`);
+
+ const user = await Users.findOne(job.data.user.id);
+ if (user == null) {
+ done();
+ return;
+ }
+
+ let deletedCount = 0;
+ let cursor: any = null;
+
+ while (true) {
+ const files = await DriveFiles.find({
+ where: {
+ userId: user.id,
+ ...(cursor ? { id: MoreThan(cursor) } : {})
+ },
+ take: 100,
+ order: {
+ id: 1
+ }
+ });
+
+ if (files.length === 0) {
+ job.progress(100);
+ break;
+ }
+
+ cursor = files[files.length - 1].id;
+
+ for (const file of files) {
+ await deleteFileSync(file);
+ deletedCount++;
+ }
+
+ const total = await DriveFiles.count({
+ userId: user.id,
+ });
+
+ job.progress(deletedCount / total);
+ }
+
+ logger.succ(`All drive files (${deletedCount}) of ${user.id} has been deleted.`);
+ done();
+}
diff --git a/packages/backend/src/queue/processors/db/export-blocking.ts b/packages/backend/src/queue/processors/db/export-blocking.ts
new file mode 100644
index 0000000000..8b8aa259d4
--- /dev/null
+++ b/packages/backend/src/queue/processors/db/export-blocking.ts
@@ -0,0 +1,94 @@
+import * as Bull from 'bull';
+import * as tmp from 'tmp';
+import * as fs from 'fs';
+
+import { queueLogger } from '../../logger';
+import addFile from '@/services/drive/add-file';
+import * as dateFormat from 'dateformat';
+import { getFullApAccount } from '@/misc/convert-host';
+import { Users, Blockings } from '@/models/index';
+import { MoreThan } from 'typeorm';
+import { DbUserJobData } from '@/queue/types';
+
+const logger = queueLogger.createSubLogger('export-blocking');
+
+export async function exportBlocking(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
+ logger.info(`Exporting blocking of ${job.data.user.id} ...`);
+
+ const user = await Users.findOne(job.data.user.id);
+ if (user == null) {
+ done();
+ return;
+ }
+
+ // Create temp file
+ const [path, cleanup] = await new Promise<[string, any]>((res, rej) => {
+ tmp.file((e, path, fd, cleanup) => {
+ if (e) return rej(e);
+ res([path, cleanup]);
+ });
+ });
+
+ logger.info(`Temp file is ${path}`);
+
+ const stream = fs.createWriteStream(path, { flags: 'a' });
+
+ let exportedCount = 0;
+ let cursor: any = null;
+
+ while (true) {
+ const blockings = await Blockings.find({
+ where: {
+ blockerId: user.id,
+ ...(cursor ? { id: MoreThan(cursor) } : {})
+ },
+ take: 100,
+ order: {
+ id: 1
+ }
+ });
+
+ if (blockings.length === 0) {
+ job.progress(100);
+ break;
+ }
+
+ cursor = blockings[blockings.length - 1].id;
+
+ for (const block of blockings) {
+ const u = await Users.findOne({ id: block.blockeeId });
+ if (u == null) {
+ exportedCount++; continue;
+ }
+
+ const content = getFullApAccount(u.username, u.host);
+ await new Promise<void>((res, rej) => {
+ stream.write(content + '\n', err => {
+ if (err) {
+ logger.error(err);
+ rej(err);
+ } else {
+ res();
+ }
+ });
+ });
+ exportedCount++;
+ }
+
+ const total = await Blockings.count({
+ blockerId: user.id,
+ });
+
+ job.progress(exportedCount / total);
+ }
+
+ stream.end();
+ logger.succ(`Exported to: ${path}`);
+
+ const fileName = 'blocking-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv';
+ const driveFile = await addFile(user, path, fileName, null, null, true);
+
+ logger.succ(`Exported to: ${driveFile.id}`);
+ cleanup();
+ done();
+}
diff --git a/packages/backend/src/queue/processors/db/export-following.ts b/packages/backend/src/queue/processors/db/export-following.ts
new file mode 100644
index 0000000000..a0ecf5f560
--- /dev/null
+++ b/packages/backend/src/queue/processors/db/export-following.ts
@@ -0,0 +1,94 @@
+import * as Bull from 'bull';
+import * as tmp from 'tmp';
+import * as fs from 'fs';
+
+import { queueLogger } from '../../logger';
+import addFile from '@/services/drive/add-file';
+import * as dateFormat from 'dateformat';
+import { getFullApAccount } from '@/misc/convert-host';
+import { Users, Followings } from '@/models/index';
+import { MoreThan } from 'typeorm';
+import { DbUserJobData } from '@/queue/types';
+
+const logger = queueLogger.createSubLogger('export-following');
+
+export async function exportFollowing(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
+ logger.info(`Exporting following of ${job.data.user.id} ...`);
+
+ const user = await Users.findOne(job.data.user.id);
+ if (user == null) {
+ done();
+ return;
+ }
+
+ // Create temp file
+ const [path, cleanup] = await new Promise<[string, any]>((res, rej) => {
+ tmp.file((e, path, fd, cleanup) => {
+ if (e) return rej(e);
+ res([path, cleanup]);
+ });
+ });
+
+ logger.info(`Temp file is ${path}`);
+
+ const stream = fs.createWriteStream(path, { flags: 'a' });
+
+ let exportedCount = 0;
+ let cursor: any = null;
+
+ while (true) {
+ const followings = await Followings.find({
+ where: {
+ followerId: user.id,
+ ...(cursor ? { id: MoreThan(cursor) } : {})
+ },
+ take: 100,
+ order: {
+ id: 1
+ }
+ });
+
+ if (followings.length === 0) {
+ job.progress(100);
+ break;
+ }
+
+ cursor = followings[followings.length - 1].id;
+
+ for (const following of followings) {
+ const u = await Users.findOne({ id: following.followeeId });
+ if (u == null) {
+ exportedCount++; continue;
+ }
+
+ const content = getFullApAccount(u.username, u.host);
+ await new Promise<void>((res, rej) => {
+ stream.write(content + '\n', err => {
+ if (err) {
+ logger.error(err);
+ rej(err);
+ } else {
+ res();
+ }
+ });
+ });
+ exportedCount++;
+ }
+
+ const total = await Followings.count({
+ followerId: user.id,
+ });
+
+ job.progress(exportedCount / total);
+ }
+
+ stream.end();
+ logger.succ(`Exported to: ${path}`);
+
+ const fileName = 'following-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv';
+ const driveFile = await addFile(user, path, fileName, null, null, true);
+
+ logger.succ(`Exported to: ${driveFile.id}`);
+ cleanup();
+ done();
+}
diff --git a/packages/backend/src/queue/processors/db/export-mute.ts b/packages/backend/src/queue/processors/db/export-mute.ts
new file mode 100644
index 0000000000..d5976f7d56
--- /dev/null
+++ b/packages/backend/src/queue/processors/db/export-mute.ts
@@ -0,0 +1,94 @@
+import * as Bull from 'bull';
+import * as tmp from 'tmp';
+import * as fs from 'fs';
+
+import { queueLogger } from '../../logger';
+import addFile from '@/services/drive/add-file';
+import * as dateFormat from 'dateformat';
+import { getFullApAccount } from '@/misc/convert-host';
+import { Users, Mutings } from '@/models/index';
+import { MoreThan } from 'typeorm';
+import { DbUserJobData } from '@/queue/types';
+
+const logger = queueLogger.createSubLogger('export-mute');
+
+export async function exportMute(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
+ logger.info(`Exporting mute of ${job.data.user.id} ...`);
+
+ const user = await Users.findOne(job.data.user.id);
+ if (user == null) {
+ done();
+ return;
+ }
+
+ // Create temp file
+ const [path, cleanup] = await new Promise<[string, any]>((res, rej) => {
+ tmp.file((e, path, fd, cleanup) => {
+ if (e) return rej(e);
+ res([path, cleanup]);
+ });
+ });
+
+ logger.info(`Temp file is ${path}`);
+
+ const stream = fs.createWriteStream(path, { flags: 'a' });
+
+ let exportedCount = 0;
+ let cursor: any = null;
+
+ while (true) {
+ const mutes = await Mutings.find({
+ where: {
+ muterId: user.id,
+ ...(cursor ? { id: MoreThan(cursor) } : {})
+ },
+ take: 100,
+ order: {
+ id: 1
+ }
+ });
+
+ if (mutes.length === 0) {
+ job.progress(100);
+ break;
+ }
+
+ cursor = mutes[mutes.length - 1].id;
+
+ for (const mute of mutes) {
+ const u = await Users.findOne({ id: mute.muteeId });
+ if (u == null) {
+ exportedCount++; continue;
+ }
+
+ const content = getFullApAccount(u.username, u.host);
+ await new Promise<void>((res, rej) => {
+ stream.write(content + '\n', err => {
+ if (err) {
+ logger.error(err);
+ rej(err);
+ } else {
+ res();
+ }
+ });
+ });
+ exportedCount++;
+ }
+
+ const total = await Mutings.count({
+ muterId: user.id,
+ });
+
+ job.progress(exportedCount / total);
+ }
+
+ stream.end();
+ logger.succ(`Exported to: ${path}`);
+
+ const fileName = 'mute-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv';
+ const driveFile = await addFile(user, path, fileName, null, null, true);
+
+ logger.succ(`Exported to: ${driveFile.id}`);
+ cleanup();
+ done();
+}
diff --git a/packages/backend/src/queue/processors/db/export-notes.ts b/packages/backend/src/queue/processors/db/export-notes.ts
new file mode 100644
index 0000000000..49850aa706
--- /dev/null
+++ b/packages/backend/src/queue/processors/db/export-notes.ts
@@ -0,0 +1,133 @@
+import * as Bull from 'bull';
+import * as tmp from 'tmp';
+import * as fs from 'fs';
+
+import { queueLogger } from '../../logger';
+import addFile from '@/services/drive/add-file';
+import * as dateFormat from 'dateformat';
+import { Users, Notes, Polls } from '@/models/index';
+import { MoreThan } from 'typeorm';
+import { Note } from '@/models/entities/note';
+import { Poll } from '@/models/entities/poll';
+import { DbUserJobData } from '@/queue/types';
+
+const logger = queueLogger.createSubLogger('export-notes');
+
+export async function exportNotes(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
+ logger.info(`Exporting notes of ${job.data.user.id} ...`);
+
+ const user = await Users.findOne(job.data.user.id);
+ if (user == null) {
+ done();
+ return;
+ }
+
+ // Create temp file
+ const [path, cleanup] = await new Promise<[string, any]>((res, rej) => {
+ tmp.file((e, path, fd, cleanup) => {
+ if (e) return rej(e);
+ res([path, cleanup]);
+ });
+ });
+
+ logger.info(`Temp file is ${path}`);
+
+ const stream = fs.createWriteStream(path, { flags: 'a' });
+
+ await new Promise<void>((res, rej) => {
+ stream.write('[', err => {
+ if (err) {
+ logger.error(err);
+ rej(err);
+ } else {
+ res();
+ }
+ });
+ });
+
+ let exportedNotesCount = 0;
+ let cursor: any = null;
+
+ while (true) {
+ const notes = await Notes.find({
+ where: {
+ userId: user.id,
+ ...(cursor ? { id: MoreThan(cursor) } : {})
+ },
+ take: 100,
+ order: {
+ id: 1
+ }
+ });
+
+ if (notes.length === 0) {
+ job.progress(100);
+ break;
+ }
+
+ cursor = notes[notes.length - 1].id;
+
+ for (const note of notes) {
+ let poll: Poll | undefined;
+ if (note.hasPoll) {
+ poll = await Polls.findOneOrFail({ noteId: note.id });
+ }
+ const content = JSON.stringify(serialize(note, poll));
+ await new Promise<void>((res, rej) => {
+ stream.write(exportedNotesCount === 0 ? content : ',\n' + content, err => {
+ if (err) {
+ logger.error(err);
+ rej(err);
+ } else {
+ res();
+ }
+ });
+ });
+ exportedNotesCount++;
+ }
+
+ const total = await Notes.count({
+ userId: user.id,
+ });
+
+ job.progress(exportedNotesCount / total);
+ }
+
+ await new Promise<void>((res, rej) => {
+ stream.write(']', err => {
+ if (err) {
+ logger.error(err);
+ rej(err);
+ } else {
+ res();
+ }
+ });
+ });
+
+ stream.end();
+ logger.succ(`Exported to: ${path}`);
+
+ const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.json';
+ const driveFile = await addFile(user, path, fileName, null, null, true);
+
+ logger.succ(`Exported to: ${driveFile.id}`);
+ cleanup();
+ done();
+}
+
+function serialize(note: Note, poll: Poll | null = null): any {
+ return {
+ id: note.id,
+ text: note.text,
+ createdAt: note.createdAt,
+ fileIds: note.fileIds,
+ replyId: note.replyId,
+ renoteId: note.renoteId,
+ poll: poll,
+ cw: note.cw,
+ viaMobile: note.viaMobile,
+ visibility: note.visibility,
+ visibleUserIds: note.visibleUserIds,
+ localOnly: note.localOnly
+ };
+}
diff --git a/packages/backend/src/queue/processors/db/export-user-lists.ts b/packages/backend/src/queue/processors/db/export-user-lists.ts
new file mode 100644
index 0000000000..8a86c4df5d
--- /dev/null
+++ b/packages/backend/src/queue/processors/db/export-user-lists.ts
@@ -0,0 +1,71 @@
+import * as Bull from 'bull';
+import * as tmp from 'tmp';
+import * as fs from 'fs';
+
+import { queueLogger } from '../../logger';
+import addFile from '@/services/drive/add-file';
+import * as dateFormat from 'dateformat';
+import { getFullApAccount } from '@/misc/convert-host';
+import { Users, UserLists, UserListJoinings } from '@/models/index';
+import { In } from 'typeorm';
+import { DbUserJobData } from '@/queue/types';
+
+const logger = queueLogger.createSubLogger('export-user-lists');
+
+export async function exportUserLists(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
+ logger.info(`Exporting user lists of ${job.data.user.id} ...`);
+
+ const user = await Users.findOne(job.data.user.id);
+ if (user == null) {
+ done();
+ return;
+ }
+
+ const lists = await UserLists.find({
+ userId: user.id
+ });
+
+ // Create temp file
+ const [path, cleanup] = await new Promise<[string, any]>((res, rej) => {
+ tmp.file((e, path, fd, cleanup) => {
+ if (e) return rej(e);
+ res([path, cleanup]);
+ });
+ });
+
+ logger.info(`Temp file is ${path}`);
+
+ const stream = fs.createWriteStream(path, { flags: 'a' });
+
+ for (const list of lists) {
+ const joinings = await UserListJoinings.find({ userListId: list.id });
+ const users = await Users.find({
+ id: In(joinings.map(j => j.userId))
+ });
+
+ for (const u of users) {
+ const acct = getFullApAccount(u.username, u.host);
+ const content = `${list.name},${acct}`;
+ await new Promise<void>((res, rej) => {
+ stream.write(content + '\n', err => {
+ if (err) {
+ logger.error(err);
+ rej(err);
+ } else {
+ res();
+ }
+ });
+ });
+ }
+ }
+
+ stream.end();
+ logger.succ(`Exported to: ${path}`);
+
+ const fileName = 'user-lists-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv';
+ const driveFile = await addFile(user, path, fileName, null, null, true);
+
+ logger.succ(`Exported to: ${driveFile.id}`);
+ cleanup();
+ done();
+}
diff --git a/packages/backend/src/queue/processors/db/import-blocking.ts b/packages/backend/src/queue/processors/db/import-blocking.ts
new file mode 100644
index 0000000000..2e77107034
--- /dev/null
+++ b/packages/backend/src/queue/processors/db/import-blocking.ts
@@ -0,0 +1,74 @@
+import * as Bull from 'bull';
+
+import { queueLogger } from '../../logger';
+import * as Acct from 'misskey-js/built/acct';
+import { resolveUser } from '@/remote/resolve-user';
+import { downloadTextFile } from '@/misc/download-text-file';
+import { isSelfHost, toPuny } from '@/misc/convert-host';
+import { Users, DriveFiles, Blockings } from '@/models/index';
+import { DbUserImportJobData } from '@/queue/types';
+import block from '@/services/blocking/create';
+
+const logger = queueLogger.createSubLogger('import-blocking');
+
+export async function importBlocking(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> {
+ logger.info(`Importing blocking of ${job.data.user.id} ...`);
+
+ const user = await Users.findOne(job.data.user.id);
+ if (user == null) {
+ done();
+ return;
+ }
+
+ const file = await DriveFiles.findOne({
+ id: job.data.fileId
+ });
+ if (file == null) {
+ done();
+ return;
+ }
+
+ const csv = await downloadTextFile(file.url);
+
+ let linenum = 0;
+
+ for (const line of csv.trim().split('\n')) {
+ linenum++;
+
+ try {
+ const acct = line.split(',')[0].trim();
+ const { username, host } = Acct.parse(acct);
+
+ let target = isSelfHost(host!) ? await Users.findOne({
+ host: null,
+ usernameLower: username.toLowerCase()
+ }) : await Users.findOne({
+ host: toPuny(host!),
+ usernameLower: username.toLowerCase()
+ });
+
+ if (host == null && target == null) continue;
+
+ if (target == null) {
+ target = await resolveUser(username, host);
+ }
+
+ if (target == null) {
+ throw `cannot resolve user: @${username}@${host}`;
+ }
+
+ // skip myself
+ if (target.id === job.data.user.id) continue;
+
+ logger.info(`Block[${linenum}] ${target.id} ...`);
+
+ await block(user, target);
+ } catch (e) {
+ logger.warn(`Error in line:${linenum} ${e}`);
+ }
+ }
+
+ logger.succ('Imported');
+ done();
+}
+
diff --git a/packages/backend/src/queue/processors/db/import-following.ts b/packages/backend/src/queue/processors/db/import-following.ts
new file mode 100644
index 0000000000..2bd079e4bc
--- /dev/null
+++ b/packages/backend/src/queue/processors/db/import-following.ts
@@ -0,0 +1,73 @@
+import * as Bull from 'bull';
+
+import { queueLogger } from '../../logger';
+import follow from '@/services/following/create';
+import * as Acct from 'misskey-js/built/acct';
+import { resolveUser } from '@/remote/resolve-user';
+import { downloadTextFile } from '@/misc/download-text-file';
+import { isSelfHost, toPuny } from '@/misc/convert-host';
+import { Users, DriveFiles } from '@/models/index';
+import { DbUserImportJobData } from '@/queue/types';
+
+const logger = queueLogger.createSubLogger('import-following');
+
+export async function importFollowing(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> {
+ logger.info(`Importing following of ${job.data.user.id} ...`);
+
+ const user = await Users.findOne(job.data.user.id);
+ if (user == null) {
+ done();
+ return;
+ }
+
+ const file = await DriveFiles.findOne({
+ id: job.data.fileId
+ });
+ if (file == null) {
+ done();
+ return;
+ }
+
+ const csv = await downloadTextFile(file.url);
+
+ let linenum = 0;
+
+ for (const line of csv.trim().split('\n')) {
+ linenum++;
+
+ try {
+ const acct = line.split(',')[0].trim();
+ const { username, host } = Acct.parse(acct);
+
+ let target = isSelfHost(host!) ? await Users.findOne({
+ host: null,
+ usernameLower: username.toLowerCase()
+ }) : await Users.findOne({
+ host: toPuny(host!),
+ usernameLower: username.toLowerCase()
+ });
+
+ if (host == null && target == null) continue;
+
+ if (target == null) {
+ target = await resolveUser(username, host);
+ }
+
+ if (target == null) {
+ throw `cannot resolve user: @${username}@${host}`;
+ }
+
+ // skip myself
+ if (target.id === job.data.user.id) continue;
+
+ logger.info(`Follow[${linenum}] ${target.id} ...`);
+
+ follow(user, target);
+ } catch (e) {
+ logger.warn(`Error in line:${linenum} ${e}`);
+ }
+ }
+
+ logger.succ('Imported');
+ done();
+}
diff --git a/packages/backend/src/queue/processors/db/import-muting.ts b/packages/backend/src/queue/processors/db/import-muting.ts
new file mode 100644
index 0000000000..8060980625
--- /dev/null
+++ b/packages/backend/src/queue/processors/db/import-muting.ts
@@ -0,0 +1,83 @@
+import * as Bull from 'bull';
+
+import { queueLogger } from '../../logger';
+import * as Acct from 'misskey-js/built/acct';
+import { resolveUser } from '@/remote/resolve-user';
+import { downloadTextFile } from '@/misc/download-text-file';
+import { isSelfHost, toPuny } from '@/misc/convert-host';
+import { Users, DriveFiles, Mutings } from '@/models/index';
+import { DbUserImportJobData } from '@/queue/types';
+import { User } from '@/models/entities/user';
+import { genId } from '@/misc/gen-id';
+
+const logger = queueLogger.createSubLogger('import-muting');
+
+export async function importMuting(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> {
+ logger.info(`Importing muting of ${job.data.user.id} ...`);
+
+ const user = await Users.findOne(job.data.user.id);
+ if (user == null) {
+ done();
+ return;
+ }
+
+ const file = await DriveFiles.findOne({
+ id: job.data.fileId
+ });
+ if (file == null) {
+ done();
+ return;
+ }
+
+ const csv = await downloadTextFile(file.url);
+
+ let linenum = 0;
+
+ for (const line of csv.trim().split('\n')) {
+ linenum++;
+
+ try {
+ const acct = line.split(',')[0].trim();
+ const { username, host } = Acct.parse(acct);
+
+ let target = isSelfHost(host!) ? await Users.findOne({
+ host: null,
+ usernameLower: username.toLowerCase()
+ }) : await Users.findOne({
+ host: toPuny(host!),
+ usernameLower: username.toLowerCase()
+ });
+
+ if (host == null && target == null) continue;
+
+ if (target == null) {
+ target = await resolveUser(username, host);
+ }
+
+ if (target == null) {
+ throw `cannot resolve user: @${username}@${host}`;
+ }
+
+ // skip myself
+ if (target.id === job.data.user.id) continue;
+
+ logger.info(`Mute[${linenum}] ${target.id} ...`);
+
+ await mute(user, target);
+ } catch (e) {
+ logger.warn(`Error in line:${linenum} ${e}`);
+ }
+ }
+
+ logger.succ('Imported');
+ done();
+}
+
+async function mute(user: User, target: User) {
+ await Mutings.insert({
+ id: genId(),
+ createdAt: new Date(),
+ muterId: user.id,
+ muteeId: target.id,
+ });
+}
diff --git a/packages/backend/src/queue/processors/db/import-user-lists.ts b/packages/backend/src/queue/processors/db/import-user-lists.ts
new file mode 100644
index 0000000000..46b728b387
--- /dev/null
+++ b/packages/backend/src/queue/processors/db/import-user-lists.ts
@@ -0,0 +1,80 @@
+import * as Bull from 'bull';
+
+import { queueLogger } from '../../logger';
+import * as Acct from 'misskey-js/built/acct';
+import { resolveUser } from '@/remote/resolve-user';
+import { pushUserToUserList } from '@/services/user-list/push';
+import { downloadTextFile } from '@/misc/download-text-file';
+import { isSelfHost, toPuny } from '@/misc/convert-host';
+import { DriveFiles, Users, UserLists, UserListJoinings } from '@/models/index';
+import { genId } from '@/misc/gen-id';
+import { DbUserImportJobData } from '@/queue/types';
+
+const logger = queueLogger.createSubLogger('import-user-lists');
+
+export async function importUserLists(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> {
+ logger.info(`Importing user lists of ${job.data.user.id} ...`);
+
+ const user = await Users.findOne(job.data.user.id);
+ if (user == null) {
+ done();
+ return;
+ }
+
+ const file = await DriveFiles.findOne({
+ id: job.data.fileId
+ });
+ if (file == null) {
+ done();
+ return;
+ }
+
+ const csv = await downloadTextFile(file.url);
+
+ let linenum = 0;
+
+ for (const line of csv.trim().split('\n')) {
+ linenum++;
+
+ try {
+ const listName = line.split(',')[0].trim();
+ const { username, host } = Acct.parse(line.split(',')[1].trim());
+
+ let list = await UserLists.findOne({
+ userId: user.id,
+ name: listName
+ });
+
+ if (list == null) {
+ list = await UserLists.save({
+ id: genId(),
+ createdAt: new Date(),
+ userId: user.id,
+ name: listName,
+ userIds: []
+ });
+ }
+
+ let target = isSelfHost(host!) ? await Users.findOne({
+ host: null,
+ usernameLower: username.toLowerCase()
+ }) : await Users.findOne({
+ host: toPuny(host!),
+ usernameLower: username.toLowerCase()
+ });
+
+ if (target == null) {
+ target = await resolveUser(username, host);
+ }
+
+ if (await UserListJoinings.findOne({ userListId: list.id, userId: target.id }) != null) continue;
+
+ pushUserToUserList(target, list);
+ } catch (e) {
+ logger.warn(`Error in line:${linenum} ${e}`);
+ }
+ }
+
+ logger.succ('Imported');
+ done();
+}
diff --git a/packages/backend/src/queue/processors/db/index.ts b/packages/backend/src/queue/processors/db/index.ts
new file mode 100644
index 0000000000..97087642b7
--- /dev/null
+++ b/packages/backend/src/queue/processors/db/index.ts
@@ -0,0 +1,33 @@
+import * as Bull from 'bull';
+import { DbJobData } from '@/queue/types';
+import { deleteDriveFiles } from './delete-drive-files';
+import { exportNotes } from './export-notes';
+import { exportFollowing } from './export-following';
+import { exportMute } from './export-mute';
+import { exportBlocking } from './export-blocking';
+import { exportUserLists } from './export-user-lists';
+import { importFollowing } from './import-following';
+import { importUserLists } from './import-user-lists';
+import { deleteAccount } from './delete-account';
+import { importMuting } from './import-muting';
+import { importBlocking } from './import-blocking';
+
+const jobs = {
+ deleteDriveFiles,
+ exportNotes,
+ exportFollowing,
+ exportMute,
+ exportBlocking,
+ exportUserLists,
+ importFollowing,
+ importMuting,
+ importBlocking,
+ importUserLists,
+ deleteAccount,
+} as Record<string, Bull.ProcessCallbackFunction<DbJobData> | Bull.ProcessPromiseFunction<DbJobData>>;
+
+export default function(dbQueue: Bull.Queue<DbJobData>) {
+ for (const [k, v] of Object.entries(jobs)) {
+ dbQueue.process(k, v);
+ }
+}
diff --git a/packages/backend/src/queue/processors/deliver.ts b/packages/backend/src/queue/processors/deliver.ts
new file mode 100644
index 0000000000..3c61896a2f
--- /dev/null
+++ b/packages/backend/src/queue/processors/deliver.ts
@@ -0,0 +1,94 @@
+import { URL } from 'url';
+import * as Bull from 'bull';
+import request from '@/remote/activitypub/request';
+import { registerOrFetchInstanceDoc } from '@/services/register-or-fetch-instance-doc';
+import Logger from '@/services/logger';
+import { Instances } from '@/models/index';
+import { instanceChart } from '@/services/chart/index';
+import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata';
+import { fetchMeta } from '@/misc/fetch-meta';
+import { toPuny } from '@/misc/convert-host';
+import { Cache } from '@/misc/cache';
+import { Instance } from '@/models/entities/instance';
+import { DeliverJobData } from '../types';
+import { StatusError } from '@/misc/fetch';
+
+const logger = new Logger('deliver');
+
+let latest: string | null = null;
+
+const suspendedHostsCache = new Cache<Instance[]>(1000 * 60 * 60);
+
+export default async (job: Bull.Job<DeliverJobData>) => {
+ const { host } = new URL(job.data.to);
+
+ // ブロックしてたら中断
+ const meta = await fetchMeta();
+ if (meta.blockedHosts.includes(toPuny(host))) {
+ return 'skip (blocked)';
+ }
+
+ // isSuspendedなら中断
+ let suspendedHosts = suspendedHostsCache.get(null);
+ if (suspendedHosts == null) {
+ suspendedHosts = await Instances.find({
+ where: {
+ isSuspended: true
+ },
+ });
+ suspendedHostsCache.set(null, suspendedHosts);
+ }
+ if (suspendedHosts.map(x => x.host).includes(toPuny(host))) {
+ return 'skip (suspended)';
+ }
+
+ try {
+ if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) {
+ logger.debug(`delivering ${latest}`);
+ }
+
+ await request(job.data.user, job.data.to, job.data.content);
+
+ // Update stats
+ registerOrFetchInstanceDoc(host).then(i => {
+ Instances.update(i.id, {
+ latestRequestSentAt: new Date(),
+ latestStatus: 200,
+ lastCommunicatedAt: new Date(),
+ isNotResponding: false
+ });
+
+ fetchInstanceMetadata(i);
+
+ instanceChart.requestSent(i.host, true);
+ });
+
+ return 'Success';
+ } catch (res) {
+ // Update stats
+ registerOrFetchInstanceDoc(host).then(i => {
+ Instances.update(i.id, {
+ latestRequestSentAt: new Date(),
+ latestStatus: res instanceof StatusError ? res.statusCode : null,
+ isNotResponding: true
+ });
+
+ instanceChart.requestSent(i.host, false);
+ });
+
+ if (res instanceof StatusError) {
+ // 4xx
+ if (res.isClientError) {
+ // HTTPステータスコード4xxはクライアントエラーであり、それはつまり
+ // 何回再送しても成功することはないということなのでエラーにはしないでおく
+ return `${res.statusCode} ${res.statusMessage}`;
+ }
+
+ // 5xx etc.
+ throw `${res.statusCode} ${res.statusMessage}`;
+ } else {
+ // DNS error, socket error, timeout ...
+ throw res;
+ }
+ }
+};
diff --git a/packages/backend/src/queue/processors/inbox.ts b/packages/backend/src/queue/processors/inbox.ts
new file mode 100644
index 0000000000..4032ce8653
--- /dev/null
+++ b/packages/backend/src/queue/processors/inbox.ts
@@ -0,0 +1,149 @@
+import { URL } from 'url';
+import * as Bull from 'bull';
+import * as httpSignature from 'http-signature';
+import perform from '@/remote/activitypub/perform';
+import Logger from '@/services/logger';
+import { registerOrFetchInstanceDoc } from '@/services/register-or-fetch-instance-doc';
+import { Instances } from '@/models/index';
+import { instanceChart } from '@/services/chart/index';
+import { fetchMeta } from '@/misc/fetch-meta';
+import { toPuny, extractDbHost } from '@/misc/convert-host';
+import { getApId } from '@/remote/activitypub/type';
+import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata';
+import { InboxJobData } from '../types';
+import DbResolver from '@/remote/activitypub/db-resolver';
+import { resolvePerson } from '@/remote/activitypub/models/person';
+import { LdSignature } from '@/remote/activitypub/misc/ld-signature';
+import { StatusError } from '@/misc/fetch';
+
+const logger = new Logger('inbox');
+
+// ユーザーのinboxにアクティビティが届いた時の処理
+export default async (job: Bull.Job<InboxJobData>): Promise<string> => {
+ const signature = job.data.signature; // HTTP-signature
+ const activity = job.data.activity;
+
+ //#region Log
+ const info = Object.assign({}, activity) as any;
+ delete info['@context'];
+ logger.debug(JSON.stringify(info, null, 2));
+ //#endregion
+
+ const host = toPuny(new URL(signature.keyId).hostname);
+
+ // ブロックしてたら中断
+ const meta = await fetchMeta();
+ if (meta.blockedHosts.includes(host)) {
+ return `Blocked request: ${host}`;
+ }
+
+ const keyIdLower = signature.keyId.toLowerCase();
+ if (keyIdLower.startsWith('acct:')) {
+ return `Old keyId is no longer supported. ${keyIdLower}`;
+ }
+
+ // TDOO: キャッシュ
+ const dbResolver = new DbResolver();
+
+ // HTTP-Signature keyIdを元にDBから取得
+ let authUser = await dbResolver.getAuthUserFromKeyId(signature.keyId);
+
+ // keyIdでわからなければ、activity.actorを元にDBから取得 || activity.actorを元にリモートから取得
+ if (authUser == null) {
+ try {
+ authUser = await dbResolver.getAuthUserFromApId(getApId(activity.actor));
+ } catch (e) {
+ // 対象が4xxならスキップ
+ if (e instanceof StatusError && e.isClientError) {
+ return `skip: Ignored deleted actors on both ends ${activity.actor} - ${e.statusCode}`;
+ }
+ throw `Error in actor ${activity.actor} - ${e.statusCode || e}`;
+ }
+ }
+
+ // それでもわからなければ終了
+ if (authUser == null) {
+ return `skip: failed to resolve user`;
+ }
+
+ // publicKey がなくても終了
+ if (authUser.key == null) {
+ return `skip: failed to resolve user publicKey`;
+ }
+
+ // HTTP-Signatureの検証
+ const httpSignatureValidated = httpSignature.verifySignature(signature, authUser.key.keyPem);
+
+ // また、signatureのsignerは、activity.actorと一致する必要がある
+ if (!httpSignatureValidated || authUser.user.uri !== activity.actor) {
+ // 一致しなくても、でもLD-Signatureがありそうならそっちも見る
+ if (activity.signature) {
+ if (activity.signature.type !== 'RsaSignature2017') {
+ return `skip: unsupported LD-signature type ${activity.signature.type}`;
+ }
+
+ // activity.signature.creator: https://example.oom/users/user#main-key
+ // みたいになっててUserを引っ張れば公開キーも入ることを期待する
+ if (activity.signature.creator) {
+ const candicate = activity.signature.creator.replace(/#.*/, '');
+ await resolvePerson(candicate).catch(() => null);
+ }
+
+ // keyIdからLD-Signatureのユーザーを取得
+ authUser = await dbResolver.getAuthUserFromKeyId(activity.signature.creator);
+ if (authUser == null) {
+ return `skip: LD-Signatureのユーザーが取得できませんでした`;
+ }
+
+ if (authUser.key == null) {
+ return `skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした`;
+ }
+
+ // LD-Signature検証
+ const ldSignature = new LdSignature();
+ const verified = await ldSignature.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false);
+ if (!verified) {
+ return `skip: LD-Signatureの検証に失敗しました`;
+ }
+
+ // もう一度actorチェック
+ if (authUser.user.uri !== activity.actor) {
+ return `skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`;
+ }
+
+ // ブロックしてたら中断
+ const ldHost = extractDbHost(authUser.user.uri);
+ if (meta.blockedHosts.includes(ldHost)) {
+ return `Blocked request: ${ldHost}`;
+ }
+ } else {
+ return `skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`;
+ }
+ }
+
+ // activity.idがあればホストが署名者のホストであることを確認する
+ if (typeof activity.id === 'string') {
+ const signerHost = extractDbHost(authUser.user.uri!);
+ const activityIdHost = extractDbHost(activity.id);
+ if (signerHost !== activityIdHost) {
+ return `skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`;
+ }
+ }
+
+ // Update stats
+ registerOrFetchInstanceDoc(authUser.user.host).then(i => {
+ Instances.update(i.id, {
+ latestRequestReceivedAt: new Date(),
+ lastCommunicatedAt: new Date(),
+ isNotResponding: false
+ });
+
+ fetchInstanceMetadata(i);
+
+ instanceChart.requestReceived(i.host);
+ });
+
+ // アクティビティを処理
+ await perform(authUser.user, activity);
+ return `ok`;
+};
diff --git a/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts b/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts
new file mode 100644
index 0000000000..3b2e4ea939
--- /dev/null
+++ b/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts
@@ -0,0 +1,50 @@
+import * as Bull from 'bull';
+
+import { queueLogger } from '../../logger';
+import { deleteFileSync } from '@/services/drive/delete-file';
+import { DriveFiles } from '@/models/index';
+import { MoreThan, Not, IsNull } from 'typeorm';
+
+const logger = queueLogger.createSubLogger('clean-remote-files');
+
+export default async function cleanRemoteFiles(job: Bull.Job<{}>, done: any): Promise<void> {
+ logger.info(`Deleting cached remote files...`);
+
+ let deletedCount = 0;
+ let cursor: any = null;
+
+ while (true) {
+ const files = await DriveFiles.find({
+ where: {
+ userHost: Not(IsNull()),
+ isLink: false,
+ ...(cursor ? { id: MoreThan(cursor) } : {})
+ },
+ take: 8,
+ order: {
+ id: 1
+ }
+ });
+
+ if (files.length === 0) {
+ job.progress(100);
+ break;
+ }
+
+ cursor = files[files.length - 1].id;
+
+ await Promise.all(files.map(file => deleteFileSync(file, true)));
+
+ deletedCount += 8;
+
+ const total = await DriveFiles.count({
+ userHost: Not(IsNull()),
+ isLink: false,
+ });
+
+ job.progress(deletedCount / total);
+ }
+
+ logger.succ(`All cahced remote files has been deleted.`);
+ done();
+}
diff --git a/packages/backend/src/queue/processors/object-storage/delete-file.ts b/packages/backend/src/queue/processors/object-storage/delete-file.ts
new file mode 100644
index 0000000000..ed22968a27
--- /dev/null
+++ b/packages/backend/src/queue/processors/object-storage/delete-file.ts
@@ -0,0 +1,11 @@
+import { ObjectStorageFileJobData } from '@/queue/types';
+import * as Bull from 'bull';
+import { deleteObjectStorageFile } from '@/services/drive/delete-file';
+
+export default async (job: Bull.Job<ObjectStorageFileJobData>) => {
+ const key: string = job.data.key;
+
+ await deleteObjectStorageFile(key);
+
+ return 'Success';
+};
diff --git a/packages/backend/src/queue/processors/object-storage/index.ts b/packages/backend/src/queue/processors/object-storage/index.ts
new file mode 100644
index 0000000000..0d9570e179
--- /dev/null
+++ b/packages/backend/src/queue/processors/object-storage/index.ts
@@ -0,0 +1,15 @@
+import * as Bull from 'bull';
+import { ObjectStorageJobData } from '@/queue/types';
+import deleteFile from './delete-file';
+import cleanRemoteFiles from './clean-remote-files';
+
+const jobs = {
+ deleteFile,
+ cleanRemoteFiles,
+} as Record<string, Bull.ProcessCallbackFunction<ObjectStorageJobData> | Bull.ProcessPromiseFunction<ObjectStorageJobData>>;
+
+export default function(q: Bull.Queue) {
+ for (const [k, v] of Object.entries(jobs)) {
+ q.process(k, 16, v);
+ }
+}
diff --git a/packages/backend/src/queue/processors/system/index.ts b/packages/backend/src/queue/processors/system/index.ts
new file mode 100644
index 0000000000..52b7868105
--- /dev/null
+++ b/packages/backend/src/queue/processors/system/index.ts
@@ -0,0 +1,12 @@
+import * as Bull from 'bull';
+import { resyncCharts } from './resync-charts';
+
+const jobs = {
+ resyncCharts,
+} as Record<string, Bull.ProcessCallbackFunction<{}> | Bull.ProcessPromiseFunction<{}>>;
+
+export default function(dbQueue: Bull.Queue<{}>) {
+ for (const [k, v] of Object.entries(jobs)) {
+ dbQueue.process(k, v);
+ }
+}
diff --git a/packages/backend/src/queue/processors/system/resync-charts.ts b/packages/backend/src/queue/processors/system/resync-charts.ts
new file mode 100644
index 0000000000..b36b024cfb
--- /dev/null
+++ b/packages/backend/src/queue/processors/system/resync-charts.ts
@@ -0,0 +1,21 @@
+import * as Bull from 'bull';
+
+import { queueLogger } from '../../logger';
+import { driveChart, notesChart, usersChart } from '@/services/chart/index';
+
+const logger = queueLogger.createSubLogger('resync-charts');
+
+export default async function resyncCharts(job: Bull.Job<{}>, done: any): Promise<void> {
+ logger.info(`Resync charts...`);
+
+ // TODO: ユーザーごとのチャートも更新する
+ // TODO: インスタンスごとのチャートも更新する
+ await Promise.all([
+ driveChart.resync(),
+ notesChart.resync(),
+ usersChart.resync(),
+ ]);
+
+ logger.succ(`All charts successfully resynced.`);
+ done();
+}
diff --git a/packages/backend/src/queue/queues.ts b/packages/backend/src/queue/queues.ts
new file mode 100644
index 0000000000..a66a7ca451
--- /dev/null
+++ b/packages/backend/src/queue/queues.ts
@@ -0,0 +1,9 @@
+import config from '@/config/index';
+import { initialize as initializeQueue } from './initialize';
+import { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData } from './types';
+
+export const systemQueue = initializeQueue<{}>('system');
+export const deliverQueue = initializeQueue<DeliverJobData>('deliver', config.deliverJobPerSec || 128);
+export const inboxQueue = initializeQueue<InboxJobData>('inbox', config.inboxJobPerSec || 16);
+export const dbQueue = initializeQueue<DbJobData>('db');
+export const objectStorageQueue = initializeQueue<ObjectStorageJobData>('objectStorage');
diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts
new file mode 100644
index 0000000000..39cab29966
--- /dev/null
+++ b/packages/backend/src/queue/types.ts
@@ -0,0 +1,44 @@
+import { DriveFile } from '@/models/entities/drive-file';
+import { User } from '@/models/entities/user';
+import { IActivity } from '@/remote/activitypub/type';
+import * as httpSignature from 'http-signature';
+
+export type DeliverJobData = {
+ /** Actor */
+ user: ThinUser;
+ /** Activity */
+ content: unknown;
+ /** inbox URL to deliver */
+ to: string;
+};
+
+export type InboxJobData = {
+ activity: IActivity;
+ signature: httpSignature.IParsedSignature;
+};
+
+export type DbJobData = DbUserJobData | DbUserImportJobData | DbUserDeleteJobData;
+
+export type DbUserJobData = {
+ user: ThinUser;
+};
+
+export type DbUserDeleteJobData = {
+ user: ThinUser;
+ soft?: boolean;
+};
+
+export type DbUserImportJobData = {
+ user: ThinUser;
+ fileId: DriveFile['id'];
+};
+
+export type ObjectStorageJobData = ObjectStorageFileJobData | {};
+
+export type ObjectStorageFileJobData = {
+ key: string;
+};
+
+export type ThinUser = {
+ id: User['id'];
+};