From c934987b14dc2b0c362f2d13e0664ab275aca522 Mon Sep 17 00:00:00 2001 From: syuilo Date: Thu, 7 Mar 2019 23:07:21 +0900 Subject: Resolve #4444 --- src/queue/index.ts | 153 ++++++++++++++++++++++++----------------------------- 1 file changed, 69 insertions(+), 84 deletions(-) (limited to 'src/queue/index.ts') diff --git a/src/queue/index.ts b/src/queue/index.ts index 351a035ada..bb3b66908d 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -1,73 +1,64 @@ -import * as Queue from 'bee-queue'; +import * as Queue from 'bull'; import * as httpSignature from 'http-signature'; import config from '../config'; import { ILocalUser } from '../models/user'; import { program } from '../argv'; -import handler from './processors'; -import { queueLogger } from './logger'; - -const enableQueue = !program.disableQueue; -const enableQueueProcessing = !program.onlyServer && enableQueue; -const queueAvailable = config.redis != null; - -const queue = initializeQueue(); - -function initializeQueue() { - if (queueAvailable && enableQueue) { - return new Queue('misskey-queue', { - redis: { - port: config.redis.port, - host: config.redis.host, - password: config.redis.pass - }, - - removeOnSuccess: true, - removeOnFailure: true, - getEvents: false, - sendEvents: false, - storeJobs: false - }); - } else { - return null; - } + +import processDeliver from './processors/deliver'; +import processInbox from './processors/process-inbox'; +import processDb from './processors/db'; + +function initializeQueue(name: string) { + return new Queue(name, config.redis != null ? { + redis: { + port: config.redis.port, + host: config.redis.host, + password: config.redis.pass, + db: 1 + } + } : null); } +const deliverQueue = initializeQueue('deliver'); +const inboxQueue = initializeQueue('inbox'); +const dbQueue = initializeQueue('db'); + export function deliver(user: ILocalUser, content: any, to: any) { - if (content == null) return; + if (content == null) return null; const data = { - type: 'deliver', user, content, to }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data) - .retries(8) - .backoff('exponential', 1000) - .save(); - } else { - return handler({ data }, () => {}); - } + return deliverQueue.add(data, { + attempts: 4, + backoff: { + type: 'exponential', + delay: 1000 + }, + removeOnComplete: true, + removeOnFail: true + }); } -export function processInbox(activity: any, signature: httpSignature.IParsedSignature) { +export function inbox(activity: any, signature: httpSignature.IParsedSignature) { const data = { - type: 'processInbox', activity: activity, signature }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data) - .retries(3) - .backoff('exponential', 500) - .save(); - } else { - return handler({ data }, () => {}); - } + return inboxQueue.add(data, { + attempts: 4, + backoff: { + type: 'exponential', + delay: 1000 + }, + removeOnComplete: true, + removeOnFail: true + }); } export function createDeleteNotesJob(user: ILocalUser) { @@ -76,11 +67,10 @@ export function createDeleteNotesJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export function createDeleteDriveFilesJob(user: ILocalUser) { @@ -89,11 +79,10 @@ export function createDeleteDriveFilesJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export function createExportNotesJob(user: ILocalUser) { @@ -102,11 +91,10 @@ export function createExportNotesJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export function createExportFollowingJob(user: ILocalUser) { @@ -115,11 +103,10 @@ export function createExportFollowingJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export function createExportMuteJob(user: ILocalUser) { @@ -128,11 +115,10 @@ export function createExportMuteJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export function createExportBlockingJob(user: ILocalUser) { @@ -141,24 +127,23 @@ export function createExportBlockingJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export default function() { - if (queueAvailable && enableQueueProcessing) { - queue.process(128, handler); - queueLogger.succ('Processing started'); + if (!program.onlyServer) { + deliverQueue.process(processDeliver); + inboxQueue.process(processInbox); + dbQueue.process(processDb); } - - return queue; } export function destroy() { + /* queue.destroy().then(n => { queueLogger.succ(`All job removed (${n} jobs)`); - }); + });*/ } -- cgit v1.2.3-freya