summaryrefslogtreecommitdiff
path: root/src/queue/index.ts
diff options
context:
space:
mode:
authorsyuilo <syuilotan@yahoo.co.jp>2019-03-07 23:07:21 +0900
committersyuilo <syuilotan@yahoo.co.jp>2019-03-07 23:07:21 +0900
commitc934987b14dc2b0c362f2d13e0664ab275aca522 (patch)
tree4a83b3a22745c9b5ad991a5f7c61d7acee0eba77 /src/queue/index.ts
parentUpdate issue templates (diff)
downloadsharkey-c934987b14dc2b0c362f2d13e0664ab275aca522.tar.gz
sharkey-c934987b14dc2b0c362f2d13e0664ab275aca522.tar.bz2
sharkey-c934987b14dc2b0c362f2d13e0664ab275aca522.zip
Resolve #4444
Diffstat (limited to 'src/queue/index.ts')
-rw-r--r--src/queue/index.ts149
1 files changed, 67 insertions, 82 deletions
diff --git a/src/queue/index.ts b/src/queue/index.ts
index 351a035ada..bb3b66908d 100644
--- a/src/queue/index.ts
+++ b/src/queue/index.ts
@@ -1,73 +1,64 @@
-import * as Queue from 'bee-queue';
+import * as Queue from 'bull';
import * as httpSignature from 'http-signature';
import config from '../config';
import { ILocalUser } from '../models/user';
import { program } from '../argv';
-import handler from './processors';
-import { queueLogger } from './logger';
-const enableQueue = !program.disableQueue;
-const enableQueueProcessing = !program.onlyServer && enableQueue;
-const queueAvailable = config.redis != null;
+import processDeliver from './processors/deliver';
+import processInbox from './processors/process-inbox';
+import processDb from './processors/db';
-const queue = initializeQueue();
-
-function initializeQueue() {
- if (queueAvailable && enableQueue) {
- return new Queue('misskey-queue', {
- redis: {
- port: config.redis.port,
- host: config.redis.host,
- password: config.redis.pass
- },
-
- removeOnSuccess: true,
- removeOnFailure: true,
- getEvents: false,
- sendEvents: false,
- storeJobs: false
- });
- } else {
- return null;
- }
+function initializeQueue(name: string) {
+ return new Queue(name, config.redis != null ? {
+ redis: {
+ port: config.redis.port,
+ host: config.redis.host,
+ password: config.redis.pass,
+ db: 1
+ }
+ } : null);
}
+const deliverQueue = initializeQueue('deliver');
+const inboxQueue = initializeQueue('inbox');
+const dbQueue = initializeQueue('db');
+
export function deliver(user: ILocalUser, content: any, to: any) {
- if (content == null) return;
+ if (content == null) return null;
const data = {
- type: 'deliver',
user,
content,
to
};
- if (queueAvailable && enableQueueProcessing) {
- return queue.createJob(data)
- .retries(8)
- .backoff('exponential', 1000)
- .save();
- } else {
- return handler({ data }, () => {});
- }
+ return deliverQueue.add(data, {
+ attempts: 4,
+ backoff: {
+ type: 'exponential',
+ delay: 1000
+ },
+ removeOnComplete: true,
+ removeOnFail: true
+ });
}
-export function processInbox(activity: any, signature: httpSignature.IParsedSignature) {
+export function inbox(activity: any, signature: httpSignature.IParsedSignature) {
const data = {
- type: 'processInbox',
activity: activity,
signature
};
- if (queueAvailable && enableQueueProcessing) {
- return queue.createJob(data)
- .retries(3)
- .backoff('exponential', 500)
- .save();
- } else {
- return handler({ data }, () => {});
- }
+ return inboxQueue.add(data, {
+ attempts: 4,
+ backoff: {
+ type: 'exponential',
+ delay: 1000
+ },
+ removeOnComplete: true,
+ removeOnFail: true
+ });
}
export function createDeleteNotesJob(user: ILocalUser) {
@@ -76,11 +67,10 @@ export function createDeleteNotesJob(user: ILocalUser) {
user: user
};
- if (queueAvailable && enableQueueProcessing) {
- return queue.createJob(data).save();
- } else {
- return handler({ data }, () => {});
- }
+ return dbQueue.add(data, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
}
export function createDeleteDriveFilesJob(user: ILocalUser) {
@@ -89,11 +79,10 @@ export function createDeleteDriveFilesJob(user: ILocalUser) {
user: user
};
- if (queueAvailable && enableQueueProcessing) {
- return queue.createJob(data).save();
- } else {
- return handler({ data }, () => {});
- }
+ return dbQueue.add(data, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
}
export function createExportNotesJob(user: ILocalUser) {
@@ -102,11 +91,10 @@ export function createExportNotesJob(user: ILocalUser) {
user: user
};
- if (queueAvailable && enableQueueProcessing) {
- return queue.createJob(data).save();
- } else {
- return handler({ data }, () => {});
- }
+ return dbQueue.add(data, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
}
export function createExportFollowingJob(user: ILocalUser) {
@@ -115,11 +103,10 @@ export function createExportFollowingJob(user: ILocalUser) {
user: user
};
- if (queueAvailable && enableQueueProcessing) {
- return queue.createJob(data).save();
- } else {
- return handler({ data }, () => {});
- }
+ return dbQueue.add(data, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
}
export function createExportMuteJob(user: ILocalUser) {
@@ -128,11 +115,10 @@ export function createExportMuteJob(user: ILocalUser) {
user: user
};
- if (queueAvailable && enableQueueProcessing) {
- return queue.createJob(data).save();
- } else {
- return handler({ data }, () => {});
- }
+ return dbQueue.add(data, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
}
export function createExportBlockingJob(user: ILocalUser) {
@@ -141,24 +127,23 @@ export function createExportBlockingJob(user: ILocalUser) {
user: user
};
- if (queueAvailable && enableQueueProcessing) {
- return queue.createJob(data).save();
- } else {
- return handler({ data }, () => {});
- }
+ return dbQueue.add(data, {
+ removeOnComplete: true,
+ removeOnFail: true
+ });
}
export default function() {
- if (queueAvailable && enableQueueProcessing) {
- queue.process(128, handler);
- queueLogger.succ('Processing started');
+ if (!program.onlyServer) {
+ deliverQueue.process(processDeliver);
+ inboxQueue.process(processInbox);
+ dbQueue.process(processDb);
}
-
- return queue;
}
export function destroy() {
+ /*
queue.destroy().then(n => {
queueLogger.succ(`All job removed (${n} jobs)`);
- });
+ });*/
}