diff options
| author | syuilo <Syuilotan@yahoo.co.jp> | 2021-10-23 01:08:45 +0900 |
|---|---|---|
| committer | syuilo <Syuilotan@yahoo.co.jp> | 2021-10-23 01:08:45 +0900 |
| commit | d0d5068f728e13f3ebe1dc227ddaacf380817ec4 (patch) | |
| tree | 7bb95207e01bff1bee9877829c0556d3ecf62176 /src/queue | |
| parent | Merge branch 'develop' (diff) | |
| parent | 12.93.0 (diff) | |
| download | misskey-d0d5068f728e13f3ebe1dc227ddaacf380817ec4.tar.gz misskey-d0d5068f728e13f3ebe1dc227ddaacf380817ec4.tar.bz2 misskey-d0d5068f728e13f3ebe1dc227ddaacf380817ec4.zip | |
Merge branch 'develop'
Diffstat (limited to 'src/queue')
| -rw-r--r-- | src/queue/index.ts | 48 | ||||
| -rw-r--r-- | src/queue/processors/db/import-blocking.ts | 74 | ||||
| -rw-r--r-- | src/queue/processors/db/import-muting.ts | 83 | ||||
| -rw-r--r-- | src/queue/processors/db/index.ts | 4 | ||||
| -rw-r--r-- | src/queue/processors/system/index.ts | 12 | ||||
| -rw-r--r-- | src/queue/processors/system/resync-charts.ts | 21 | ||||
| -rw-r--r-- | src/queue/queues.ts | 1 |
7 files changed, 236 insertions, 7 deletions
diff --git a/src/queue/index.ts b/src/queue/index.ts index 1e1d5da5a2..37eb809604 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -10,7 +10,7 @@ import procesObjectStorage from './processors/object-storage/index'; import { queueLogger } from './logger'; import { DriveFile } from '@/models/entities/drive-file'; import { getJobInfo } from './get-job-info'; -import { dbQueue, deliverQueue, inboxQueue, objectStorageQueue } from './queues'; +import { systemQueue, dbQueue, deliverQueue, inboxQueue, objectStorageQueue } from './queues'; import { ThinUser } from './types'; import { IActivity } from '@/remote/activitypub/type'; @@ -22,11 +22,20 @@ function renderError(e: Error): any { }; } +const systemLogger = queueLogger.createSubLogger('system'); const deliverLogger = queueLogger.createSubLogger('deliver'); const inboxLogger = queueLogger.createSubLogger('inbox'); const dbLogger = queueLogger.createSubLogger('db'); const objectStorageLogger = queueLogger.createSubLogger('objectStorage'); +systemQueue + .on('waiting', (jobId) => systemLogger.debug(`waiting id=${jobId}`)) + .on('active', (job) => systemLogger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => systemLogger.debug(`completed(${result}) id=${job.id}`)) + .on('failed', (job, err) => systemLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) })) + .on('error', (job: any, err: Error) => systemLogger.error(`error ${err}`, { job, e: renderError(err) })) + .on('stalled', (job) => systemLogger.warn(`stalled id=${job.id}`)); + deliverQueue .on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`)) .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) @@ -163,6 +172,26 @@ export function createImportFollowingJob(user: ThinUser, fileId: DriveFile['id'] }); } +export function createImportMutingJob(user: ThinUser, fileId: DriveFile['id']) { + return dbQueue.add('importMuting', { + user: user, + fileId: fileId + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + +export function createImportBlockingJob(user: ThinUser, fileId: DriveFile['id']) { + return dbQueue.add('importBlocking', { + user: user, + fileId: fileId + }, { + removeOnComplete: true, + removeOnFail: true + }); +} + export function createImportUserListsJob(user: ThinUser, fileId: DriveFile['id']) { return dbQueue.add('importUserLists', { user: user, @@ -200,12 +229,17 @@ export function createCleanRemoteFilesJob() { } export default function() { - if (!envOption.onlyServer) { - deliverQueue.process(config.deliverJobConcurrency || 128, processDeliver); - inboxQueue.process(config.inboxJobConcurrency || 16, processInbox); - processDb(dbQueue); - procesObjectStorage(objectStorageQueue); - } + if (envOption.onlyServer) return; + + deliverQueue.process(config.deliverJobConcurrency || 128, processDeliver); + inboxQueue.process(config.inboxJobConcurrency || 16, processInbox); + processDb(dbQueue); + procesObjectStorage(objectStorageQueue); + + systemQueue.add('resyncCharts', { + }, { + repeat: { cron: '0 0 * * *' } + }); } export function destroy() { diff --git a/src/queue/processors/db/import-blocking.ts b/src/queue/processors/db/import-blocking.ts new file mode 100644 index 0000000000..9951da669d --- /dev/null +++ b/src/queue/processors/db/import-blocking.ts @@ -0,0 +1,74 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import { parseAcct } from '@/misc/acct'; +import { resolveUser } from '@/remote/resolve-user'; +import { downloadTextFile } from '@/misc/download-text-file'; +import { isSelfHost, toPuny } from '@/misc/convert-host'; +import { Users, DriveFiles, Blockings } from '@/models/index'; +import { DbUserImportJobData } from '@/queue/types'; +import block from '@/services/blocking/create'; + +const logger = queueLogger.createSubLogger('import-blocking'); + +export async function importBlocking(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> { + logger.info(`Importing blocking of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + const file = await DriveFiles.findOne({ + id: job.data.fileId + }); + if (file == null) { + done(); + return; + } + + const csv = await downloadTextFile(file.url); + + let linenum = 0; + + for (const line of csv.trim().split('\n')) { + linenum++; + + try { + const acct = line.split(',')[0].trim(); + const { username, host } = parseAcct(acct); + + let target = isSelfHost(host!) ? await Users.findOne({ + host: null, + usernameLower: username.toLowerCase() + }) : await Users.findOne({ + host: toPuny(host!), + usernameLower: username.toLowerCase() + }); + + if (host == null && target == null) continue; + + if (target == null) { + target = await resolveUser(username, host); + } + + if (target == null) { + throw `cannot resolve user: @${username}@${host}`; + } + + // skip myself + if (target.id === job.data.user.id) continue; + + logger.info(`Block[${linenum}] ${target.id} ...`); + + await block(user, target); + } catch (e) { + logger.warn(`Error in line:${linenum} ${e}`); + } + } + + logger.succ('Imported'); + done(); +} + diff --git a/src/queue/processors/db/import-muting.ts b/src/queue/processors/db/import-muting.ts new file mode 100644 index 0000000000..798f03a627 --- /dev/null +++ b/src/queue/processors/db/import-muting.ts @@ -0,0 +1,83 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import { parseAcct } from '@/misc/acct'; +import { resolveUser } from '@/remote/resolve-user'; +import { downloadTextFile } from '@/misc/download-text-file'; +import { isSelfHost, toPuny } from '@/misc/convert-host'; +import { Users, DriveFiles, Mutings } from '@/models/index'; +import { DbUserImportJobData } from '@/queue/types'; +import { User } from '@/models/entities/user'; +import { genId } from '@/misc/gen-id'; + +const logger = queueLogger.createSubLogger('import-muting'); + +export async function importMuting(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> { + logger.info(`Importing muting of ${job.data.user.id} ...`); + + const user = await Users.findOne(job.data.user.id); + if (user == null) { + done(); + return; + } + + const file = await DriveFiles.findOne({ + id: job.data.fileId + }); + if (file == null) { + done(); + return; + } + + const csv = await downloadTextFile(file.url); + + let linenum = 0; + + for (const line of csv.trim().split('\n')) { + linenum++; + + try { + const acct = line.split(',')[0].trim(); + const { username, host } = parseAcct(acct); + + let target = isSelfHost(host!) ? await Users.findOne({ + host: null, + usernameLower: username.toLowerCase() + }) : await Users.findOne({ + host: toPuny(host!), + usernameLower: username.toLowerCase() + }); + + if (host == null && target == null) continue; + + if (target == null) { + target = await resolveUser(username, host); + } + + if (target == null) { + throw `cannot resolve user: @${username}@${host}`; + } + + // skip myself + if (target.id === job.data.user.id) continue; + + logger.info(`Mute[${linenum}] ${target.id} ...`); + + await mute(user, target); + } catch (e) { + logger.warn(`Error in line:${linenum} ${e}`); + } + } + + logger.succ('Imported'); + done(); +} + +async function mute(user: User, target: User) { + await Mutings.insert({ + id: genId(), + createdAt: new Date(), + muterId: user.id, + muteeId: target.id, + }); +} diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts index b051a28e0b..97087642b7 100644 --- a/src/queue/processors/db/index.ts +++ b/src/queue/processors/db/index.ts @@ -9,6 +9,8 @@ import { exportUserLists } from './export-user-lists'; import { importFollowing } from './import-following'; import { importUserLists } from './import-user-lists'; import { deleteAccount } from './delete-account'; +import { importMuting } from './import-muting'; +import { importBlocking } from './import-blocking'; const jobs = { deleteDriveFiles, @@ -18,6 +20,8 @@ const jobs = { exportBlocking, exportUserLists, importFollowing, + importMuting, + importBlocking, importUserLists, deleteAccount, } as Record<string, Bull.ProcessCallbackFunction<DbJobData> | Bull.ProcessPromiseFunction<DbJobData>>; diff --git a/src/queue/processors/system/index.ts b/src/queue/processors/system/index.ts new file mode 100644 index 0000000000..52b7868105 --- /dev/null +++ b/src/queue/processors/system/index.ts @@ -0,0 +1,12 @@ +import * as Bull from 'bull'; +import { resyncCharts } from './resync-charts'; + +const jobs = { + resyncCharts, +} as Record<string, Bull.ProcessCallbackFunction<{}> | Bull.ProcessPromiseFunction<{}>>; + +export default function(dbQueue: Bull.Queue<{}>) { + for (const [k, v] of Object.entries(jobs)) { + dbQueue.process(k, v); + } +} diff --git a/src/queue/processors/system/resync-charts.ts b/src/queue/processors/system/resync-charts.ts new file mode 100644 index 0000000000..b36b024cfb --- /dev/null +++ b/src/queue/processors/system/resync-charts.ts @@ -0,0 +1,21 @@ +import * as Bull from 'bull'; + +import { queueLogger } from '../../logger'; +import { driveChart, notesChart, usersChart } from '@/services/chart/index'; + +const logger = queueLogger.createSubLogger('resync-charts'); + +export default async function resyncCharts(job: Bull.Job<{}>, done: any): Promise<void> { + logger.info(`Resync charts...`); + + // TODO: ユーザーごとのチャートも更新する + // TODO: インスタンスごとのチャートも更新する + await Promise.all([ + driveChart.resync(), + notesChart.resync(), + usersChart.resync(), + ]); + + logger.succ(`All charts successfully resynced.`); + done(); +} diff --git a/src/queue/queues.ts b/src/queue/queues.ts index d8c09ef86e..a66a7ca451 100644 --- a/src/queue/queues.ts +++ b/src/queue/queues.ts @@ -2,6 +2,7 @@ import config from '@/config/index'; import { initialize as initializeQueue } from './initialize'; import { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData } from './types'; +export const systemQueue = initializeQueue<{}>('system'); export const deliverQueue = initializeQueue<DeliverJobData>('deliver', config.deliverJobPerSec || 128); export const inboxQueue = initializeQueue<InboxJobData>('inbox', config.inboxJobPerSec || 16); export const dbQueue = initializeQueue<DbJobData>('db'); |