summaryrefslogtreecommitdiff
path: root/src/queue
diff options
context:
space:
mode:
authorsyuilo <Syuilotan@yahoo.co.jp>2021-10-23 01:08:45 +0900
committersyuilo <Syuilotan@yahoo.co.jp>2021-10-23 01:08:45 +0900
commitd0d5068f728e13f3ebe1dc227ddaacf380817ec4 (patch)
tree7bb95207e01bff1bee9877829c0556d3ecf62176 /src/queue
parentMerge branch 'develop' (diff)
parent12.93.0 (diff)
downloadmisskey-d0d5068f728e13f3ebe1dc227ddaacf380817ec4.tar.gz
misskey-d0d5068f728e13f3ebe1dc227ddaacf380817ec4.tar.bz2
misskey-d0d5068f728e13f3ebe1dc227ddaacf380817ec4.zip
Merge branch 'develop'
Diffstat (limited to 'src/queue')
-rw-r--r--src/queue/index.ts48
-rw-r--r--src/queue/processors/db/import-blocking.ts74
-rw-r--r--src/queue/processors/db/import-muting.ts83
-rw-r--r--src/queue/processors/db/index.ts4
-rw-r--r--src/queue/processors/system/index.ts12
-rw-r--r--src/queue/processors/system/resync-charts.ts21
-rw-r--r--src/queue/queues.ts1
7 files changed, 236 insertions, 7 deletions
diff --git a/src/queue/index.ts b/src/queue/index.ts
index 1e1d5da5a2..37eb809604 100644
--- a/src/queue/index.ts
+++ b/src/queue/index.ts
@@ -10,7 +10,7 @@ import procesObjectStorage from './processors/object-storage/index';
import { queueLogger } from './logger';
import { DriveFile } from '@/models/entities/drive-file';
import { getJobInfo } from './get-job-info';
-import { dbQueue, deliverQueue, inboxQueue, objectStorageQueue } from './queues';
+import { systemQueue, dbQueue, deliverQueue, inboxQueue, objectStorageQueue } from './queues';
import { ThinUser } from './types';
import { IActivity } from '@/remote/activitypub/type';
@@ -22,11 +22,20 @@ function renderError(e: Error): any {
};
}
+const systemLogger = queueLogger.createSubLogger('system');
const deliverLogger = queueLogger.createSubLogger('deliver');
const inboxLogger = queueLogger.createSubLogger('inbox');
const dbLogger = queueLogger.createSubLogger('db');
const objectStorageLogger = queueLogger.createSubLogger('objectStorage');
+systemQueue
+ .on('waiting', (jobId) => systemLogger.debug(`waiting id=${jobId}`))
+ .on('active', (job) => systemLogger.debug(`active id=${job.id}`))
+ .on('completed', (job, result) => systemLogger.debug(`completed(${result}) id=${job.id}`))
+ .on('failed', (job, err) => systemLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
+ .on('error', (job: any, err: Error) => systemLogger.error(`error ${err}`, { job, e: renderError(err) }))
+ .on('stalled', (job) => systemLogger.warn(`stalled id=${job.id}`));
+
deliverQueue
.on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`))
.on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
@@ -163,6 +172,26 @@ export function createImportFollowingJob(user: ThinUser, fileId: DriveFile['id']
});
}
+export function createImportMutingJob(user: ThinUser, fileId: DriveFile['id']) {
+ return dbQueue.add('importMuting', {
+ user: user,
+ fileId: fileId
+ }, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
+}
+
+export function createImportBlockingJob(user: ThinUser, fileId: DriveFile['id']) {
+ return dbQueue.add('importBlocking', {
+ user: user,
+ fileId: fileId
+ }, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
+}
+
export function createImportUserListsJob(user: ThinUser, fileId: DriveFile['id']) {
return dbQueue.add('importUserLists', {
user: user,
@@ -200,12 +229,17 @@ export function createCleanRemoteFilesJob() {
}
export default function() {
- if (!envOption.onlyServer) {
- deliverQueue.process(config.deliverJobConcurrency || 128, processDeliver);
- inboxQueue.process(config.inboxJobConcurrency || 16, processInbox);
- processDb(dbQueue);
- procesObjectStorage(objectStorageQueue);
- }
+ if (envOption.onlyServer) return;
+
+ deliverQueue.process(config.deliverJobConcurrency || 128, processDeliver);
+ inboxQueue.process(config.inboxJobConcurrency || 16, processInbox);
+ processDb(dbQueue);
+ procesObjectStorage(objectStorageQueue);
+
+ systemQueue.add('resyncCharts', {
+ }, {
+ repeat: { cron: '0 0 * * *' }
+ });
}
export function destroy() {
diff --git a/src/queue/processors/db/import-blocking.ts b/src/queue/processors/db/import-blocking.ts
new file mode 100644
index 0000000000..9951da669d
--- /dev/null
+++ b/src/queue/processors/db/import-blocking.ts
@@ -0,0 +1,74 @@
+import * as Bull from 'bull';
+
+import { queueLogger } from '../../logger';
+import { parseAcct } from '@/misc/acct';
+import { resolveUser } from '@/remote/resolve-user';
+import { downloadTextFile } from '@/misc/download-text-file';
+import { isSelfHost, toPuny } from '@/misc/convert-host';
+import { Users, DriveFiles, Blockings } from '@/models/index';
+import { DbUserImportJobData } from '@/queue/types';
+import block from '@/services/blocking/create';
+
+const logger = queueLogger.createSubLogger('import-blocking');
+
+export async function importBlocking(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> {
+ logger.info(`Importing blocking of ${job.data.user.id} ...`);
+
+ const user = await Users.findOne(job.data.user.id);
+ if (user == null) {
+ done();
+ return;
+ }
+
+ const file = await DriveFiles.findOne({
+ id: job.data.fileId
+ });
+ if (file == null) {
+ done();
+ return;
+ }
+
+ const csv = await downloadTextFile(file.url);
+
+ let linenum = 0;
+
+ for (const line of csv.trim().split('\n')) {
+ linenum++;
+
+ try {
+ const acct = line.split(',')[0].trim();
+ const { username, host } = parseAcct(acct);
+
+ let target = isSelfHost(host!) ? await Users.findOne({
+ host: null,
+ usernameLower: username.toLowerCase()
+ }) : await Users.findOne({
+ host: toPuny(host!),
+ usernameLower: username.toLowerCase()
+ });
+
+ if (host == null && target == null) continue;
+
+ if (target == null) {
+ target = await resolveUser(username, host);
+ }
+
+ if (target == null) {
+ throw `cannot resolve user: @${username}@${host}`;
+ }
+
+ // skip myself
+ if (target.id === job.data.user.id) continue;
+
+ logger.info(`Block[${linenum}] ${target.id} ...`);
+
+ await block(user, target);
+ } catch (e) {
+ logger.warn(`Error in line:${linenum} ${e}`);
+ }
+ }
+
+ logger.succ('Imported');
+ done();
+}
+
diff --git a/src/queue/processors/db/import-muting.ts b/src/queue/processors/db/import-muting.ts
new file mode 100644
index 0000000000..798f03a627
--- /dev/null
+++ b/src/queue/processors/db/import-muting.ts
@@ -0,0 +1,83 @@
+import * as Bull from 'bull';
+
+import { queueLogger } from '../../logger';
+import { parseAcct } from '@/misc/acct';
+import { resolveUser } from '@/remote/resolve-user';
+import { downloadTextFile } from '@/misc/download-text-file';
+import { isSelfHost, toPuny } from '@/misc/convert-host';
+import { Users, DriveFiles, Mutings } from '@/models/index';
+import { DbUserImportJobData } from '@/queue/types';
+import { User } from '@/models/entities/user';
+import { genId } from '@/misc/gen-id';
+
+const logger = queueLogger.createSubLogger('import-muting');
+
+export async function importMuting(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> {
+ logger.info(`Importing muting of ${job.data.user.id} ...`);
+
+ const user = await Users.findOne(job.data.user.id);
+ if (user == null) {
+ done();
+ return;
+ }
+
+ const file = await DriveFiles.findOne({
+ id: job.data.fileId
+ });
+ if (file == null) {
+ done();
+ return;
+ }
+
+ const csv = await downloadTextFile(file.url);
+
+ let linenum = 0;
+
+ for (const line of csv.trim().split('\n')) {
+ linenum++;
+
+ try {
+ const acct = line.split(',')[0].trim();
+ const { username, host } = parseAcct(acct);
+
+ let target = isSelfHost(host!) ? await Users.findOne({
+ host: null,
+ usernameLower: username.toLowerCase()
+ }) : await Users.findOne({
+ host: toPuny(host!),
+ usernameLower: username.toLowerCase()
+ });
+
+ if (host == null && target == null) continue;
+
+ if (target == null) {
+ target = await resolveUser(username, host);
+ }
+
+ if (target == null) {
+ throw `cannot resolve user: @${username}@${host}`;
+ }
+
+ // skip myself
+ if (target.id === job.data.user.id) continue;
+
+ logger.info(`Mute[${linenum}] ${target.id} ...`);
+
+ await mute(user, target);
+ } catch (e) {
+ logger.warn(`Error in line:${linenum} ${e}`);
+ }
+ }
+
+ logger.succ('Imported');
+ done();
+}
+
+async function mute(user: User, target: User) {
+ await Mutings.insert({
+ id: genId(),
+ createdAt: new Date(),
+ muterId: user.id,
+ muteeId: target.id,
+ });
+}
diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts
index b051a28e0b..97087642b7 100644
--- a/src/queue/processors/db/index.ts
+++ b/src/queue/processors/db/index.ts
@@ -9,6 +9,8 @@ import { exportUserLists } from './export-user-lists';
import { importFollowing } from './import-following';
import { importUserLists } from './import-user-lists';
import { deleteAccount } from './delete-account';
+import { importMuting } from './import-muting';
+import { importBlocking } from './import-blocking';
const jobs = {
deleteDriveFiles,
@@ -18,6 +20,8 @@ const jobs = {
exportBlocking,
exportUserLists,
importFollowing,
+ importMuting,
+ importBlocking,
importUserLists,
deleteAccount,
} as Record<string, Bull.ProcessCallbackFunction<DbJobData> | Bull.ProcessPromiseFunction<DbJobData>>;
diff --git a/src/queue/processors/system/index.ts b/src/queue/processors/system/index.ts
new file mode 100644
index 0000000000..52b7868105
--- /dev/null
+++ b/src/queue/processors/system/index.ts
@@ -0,0 +1,12 @@
+import * as Bull from 'bull';
+import { resyncCharts } from './resync-charts';
+
+const jobs = {
+ resyncCharts,
+} as Record<string, Bull.ProcessCallbackFunction<{}> | Bull.ProcessPromiseFunction<{}>>;
+
+export default function(dbQueue: Bull.Queue<{}>) {
+ for (const [k, v] of Object.entries(jobs)) {
+ dbQueue.process(k, v);
+ }
+}
diff --git a/src/queue/processors/system/resync-charts.ts b/src/queue/processors/system/resync-charts.ts
new file mode 100644
index 0000000000..b36b024cfb
--- /dev/null
+++ b/src/queue/processors/system/resync-charts.ts
@@ -0,0 +1,21 @@
+import * as Bull from 'bull';
+
+import { queueLogger } from '../../logger';
+import { driveChart, notesChart, usersChart } from '@/services/chart/index';
+
+const logger = queueLogger.createSubLogger('resync-charts');
+
+export default async function resyncCharts(job: Bull.Job<{}>, done: any): Promise<void> {
+ logger.info(`Resync charts...`);
+
+ // TODO: ユーザーごとのチャートも更新する
+ // TODO: インスタンスごとのチャートも更新する
+ await Promise.all([
+ driveChart.resync(),
+ notesChart.resync(),
+ usersChart.resync(),
+ ]);
+
+ logger.succ(`All charts successfully resynced.`);
+ done();
+}
diff --git a/src/queue/queues.ts b/src/queue/queues.ts
index d8c09ef86e..a66a7ca451 100644
--- a/src/queue/queues.ts
+++ b/src/queue/queues.ts
@@ -2,6 +2,7 @@ import config from '@/config/index';
import { initialize as initializeQueue } from './initialize';
import { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData } from './types';
+export const systemQueue = initializeQueue<{}>('system');
export const deliverQueue = initializeQueue<DeliverJobData>('deliver', config.deliverJobPerSec || 128);
export const inboxQueue = initializeQueue<InboxJobData>('inbox', config.inboxJobPerSec || 16);
export const dbQueue = initializeQueue<DbJobData>('db');