diff options
| author | syuilo <syuilotan@yahoo.co.jp> | 2019-03-07 23:07:21 +0900 |
|---|---|---|
| committer | syuilo <syuilotan@yahoo.co.jp> | 2019-03-07 23:07:21 +0900 |
| commit | c934987b14dc2b0c362f2d13e0664ab275aca522 (patch) | |
| tree | 4a83b3a22745c9b5ad991a5f7c61d7acee0eba77 /src/queue/index.ts | |
| parent | Update issue templates (diff) | |
| download | sharkey-c934987b14dc2b0c362f2d13e0664ab275aca522.tar.gz sharkey-c934987b14dc2b0c362f2d13e0664ab275aca522.tar.bz2 sharkey-c934987b14dc2b0c362f2d13e0664ab275aca522.zip | |
Resolve #4444
Diffstat (limited to 'src/queue/index.ts')
| -rw-r--r-- | src/queue/index.ts | 149 |
1 files changed, 67 insertions, 82 deletions
diff --git a/src/queue/index.ts b/src/queue/index.ts index 351a035ada..bb3b66908d 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -1,73 +1,64 @@ -import * as Queue from 'bee-queue'; +import * as Queue from 'bull'; import * as httpSignature from 'http-signature'; import config from '../config'; import { ILocalUser } from '../models/user'; import { program } from '../argv'; -import handler from './processors'; -import { queueLogger } from './logger'; -const enableQueue = !program.disableQueue; -const enableQueueProcessing = !program.onlyServer && enableQueue; -const queueAvailable = config.redis != null; +import processDeliver from './processors/deliver'; +import processInbox from './processors/process-inbox'; +import processDb from './processors/db'; -const queue = initializeQueue(); - -function initializeQueue() { - if (queueAvailable && enableQueue) { - return new Queue('misskey-queue', { - redis: { - port: config.redis.port, - host: config.redis.host, - password: config.redis.pass - }, - - removeOnSuccess: true, - removeOnFailure: true, - getEvents: false, - sendEvents: false, - storeJobs: false - }); - } else { - return null; - } +function initializeQueue(name: string) { + return new Queue(name, config.redis != null ? { + redis: { + port: config.redis.port, + host: config.redis.host, + password: config.redis.pass, + db: 1 + } + } : null); } +const deliverQueue = initializeQueue('deliver'); +const inboxQueue = initializeQueue('inbox'); +const dbQueue = initializeQueue('db'); + export function deliver(user: ILocalUser, content: any, to: any) { - if (content == null) return; + if (content == null) return null; const data = { - type: 'deliver', user, content, to }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data) - .retries(8) - .backoff('exponential', 1000) - .save(); - } else { - return handler({ data }, () => {}); - } + return deliverQueue.add(data, { + attempts: 4, + backoff: { + type: 'exponential', + delay: 1000 + }, + removeOnComplete: true, + removeOnFail: true + }); } -export function processInbox(activity: any, signature: httpSignature.IParsedSignature) { +export function inbox(activity: any, signature: httpSignature.IParsedSignature) { const data = { - type: 'processInbox', activity: activity, signature }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data) - .retries(3) - .backoff('exponential', 500) - .save(); - } else { - return handler({ data }, () => {}); - } + return inboxQueue.add(data, { + attempts: 4, + backoff: { + type: 'exponential', + delay: 1000 + }, + removeOnComplete: true, + removeOnFail: true + }); } export function createDeleteNotesJob(user: ILocalUser) { @@ -76,11 +67,10 @@ export function createDeleteNotesJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export function createDeleteDriveFilesJob(user: ILocalUser) { @@ -89,11 +79,10 @@ export function createDeleteDriveFilesJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export function createExportNotesJob(user: ILocalUser) { @@ -102,11 +91,10 @@ export function createExportNotesJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export function createExportFollowingJob(user: ILocalUser) { @@ -115,11 +103,10 @@ export function createExportFollowingJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export function createExportMuteJob(user: ILocalUser) { @@ -128,11 +115,10 @@ export function createExportMuteJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export function createExportBlockingJob(user: ILocalUser) { @@ -141,24 +127,23 @@ export function createExportBlockingJob(user: ILocalUser) { user: user }; - if (queueAvailable && enableQueueProcessing) { - return queue.createJob(data).save(); - } else { - return handler({ data }, () => {}); - } + return dbQueue.add(data, { + removeOnComplete: true, + removeOnFail: true + }); } export default function() { - if (queueAvailable && enableQueueProcessing) { - queue.process(128, handler); - queueLogger.succ('Processing started'); + if (!program.onlyServer) { + deliverQueue.process(processDeliver); + inboxQueue.process(processInbox); + dbQueue.process(processDb); } - - return queue; } export function destroy() { + /* queue.destroy().then(n => { queueLogger.succ(`All job removed (${n} jobs)`); - }); + });*/ } |