summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/argv.ts6
-rw-r--r--src/index.ts80
-rw-r--r--src/queue/index.ts15
3 files changed, 71 insertions, 30 deletions
diff --git a/src/argv.ts b/src/argv.ts
index 2a9d324a58..31325d138d 100644
--- a/src/argv.ts
+++ b/src/argv.ts
@@ -5,11 +5,15 @@ program
.version(pkg.version)
.option('--no-daemons', 'Disable daemon processes (for debbuging)')
.option('--disable-clustering', 'Disable clustering')
- .option('--disable-queue', 'Disable job queue')
+ .option('--disable-queue', 'Disable job queue processing')
+ .option('--only-queue', 'Pocessing job queue only')
.option('--quiet', 'Suppress all logs')
.option('--verbose', 'Enable all logs')
.option('--slow', 'Delay all requests (for debbuging)')
.option('--color', 'This option is a dummy for some external program\'s (e.g. forever) issue.')
.parse(process.argv);
+if (process.env.MK_DISABLE_QUEUE) program.disableQueue = true;
+if (process.env.MK_ONLY_QUEUE) program.onlyQueue = true;
+
export { program };
diff --git a/src/index.ts b/src/index.ts
index c17f5ee70d..a55251d3be 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -35,6 +35,11 @@ const ev = new Xev();
function main() {
process.title = `Misskey (${cluster.isMaster ? 'master' : 'worker'})`;
+ if (program.onlyQueue) {
+ queueMain();
+ return;
+ }
+
if (cluster.isMaster || program.disableClustering) {
masterMain();
@@ -53,12 +58,7 @@ function main() {
}
}
-/**
- * Init master process
- */
-async function masterMain() {
- let config: Config;
-
+function greet() {
if (!program.quiet) {
//#region Misskey logo
const v = `v${pkg.version}`;
@@ -75,10 +75,34 @@ async function masterMain() {
bootLogger.info('Welcome to Misskey!');
bootLogger.info(`Misskey v${pkg.version}`, true);
bootLogger.info('Misskey is maintained by @syuilo, @AyaMorisawa, @mei23, and @acid-chicken.');
+}
+
+/**
+ * Init master process
+ */
+async function masterMain() {
+ greet();
+
+ let config: Config;
try {
// initialize app
config = await init();
+
+ if (config.port == null) {
+ bootLogger.error('The port is not configured. Please configure port.', true);
+ process.exit(1);
+ }
+
+ if (process.platform === 'linux' && isWellKnownPort(config.port) && !isRoot()) {
+ bootLogger.error('You need root privileges to listen on well-known port on Linux', true);
+ process.exit(1);
+ }
+
+ if (!await isPortAvailable(config.port)) {
+ bootLogger.error(`Port ${config.port} is already in use`, true);
+ process.exit(1);
+ }
} catch (e) {
bootLogger.error('Fatal error occurred during initialization', true);
process.exit(1);
@@ -90,6 +114,9 @@ async function masterMain() {
await spawnWorkers(config.clusterLimit);
}
+ // start queue
+ require('./queue').default();
+
bootLogger.succ(`Now listening on port ${config.port} on ${config.url}`, true);
}
@@ -100,15 +127,35 @@ async function workerMain() {
// start server
await require('./server').default();
- // start processor
- require('./queue').default();
-
if (cluster.isWorker) {
// Send a 'ready' message to parent process
process.send('ready');
}
}
+async function queueMain() {
+ greet();
+
+ try {
+ // initialize app
+ await init();
+ } catch (e) {
+ bootLogger.error('Fatal error occurred during initialization', true);
+ process.exit(1);
+ }
+
+ bootLogger.succ('Misskey initialized');
+
+ // start processor
+ const queue = require('./queue').default();
+
+ if (queue) {
+ bootLogger.succ('Queue started', true);
+ } else {
+ bootLogger.error('Queue not available');
+ }
+}
+
const runningNodejsVersion = process.version.slice(1).split('.').map(x => parseInt(x, 10));
const requiredNodejsVersion = [10, 0, 0];
const satisfyNodejsVersion = !lessThan(runningNodejsVersion, requiredNodejsVersion);
@@ -170,21 +217,6 @@ async function init(): Promise<Config> {
configLogger.succ('Loaded');
- if (config.port == null) {
- bootLogger.error('The port is not configured. Please configure port.', true);
- process.exit(1);
- }
-
- if (process.platform === 'linux' && isWellKnownPort(config.port) && !isRoot()) {
- bootLogger.error('You need root privileges to listen on well-known port on Linux', true);
- process.exit(1);
- }
-
- if (!await isPortAvailable(config.port)) {
- bootLogger.error(`Port ${config.port} is already in use`, true);
- process.exit(1);
- }
-
// Try to connect to MongoDB
try {
await checkMongoDB(config, bootLogger);
diff --git a/src/queue/index.ts b/src/queue/index.ts
index cf8af17a48..161b8f9b24 100644
--- a/src/queue/index.ts
+++ b/src/queue/index.ts
@@ -4,13 +4,15 @@ import config from '../config';
import { ILocalUser } from '../models/user';
import { program } from '../argv';
import handler from './processors';
+import { queueLogger } from './logger';
-const enableQueue = config.redis != null && !program.disableQueue;
+const enableQueue = !program.disableQueue;
+const queueAvailable = config.redis != null;
const queue = initializeQueue();
function initializeQueue() {
- if (enableQueue) {
+ if (queueAvailable) {
return new Queue('misskey', {
redis: {
port: config.redis.port,
@@ -30,7 +32,7 @@ function initializeQueue() {
}
export function createHttpJob(data: any) {
- if (enableQueue) {
+ if (queueAvailable) {
return queue.createJob(data)
.retries(4)
.backoff('exponential', 16384) // 16s
@@ -52,7 +54,7 @@ export function deliver(user: ILocalUser, content: any, to: any) {
}
export function createExportNotesJob(user: ILocalUser) {
- if (!enableQueue) throw 'queue disabled';
+ if (!queueAvailable) throw 'queue unavailable';
return queue.createJob({
type: 'exportNotes',
@@ -62,7 +64,10 @@ export function createExportNotesJob(user: ILocalUser) {
}
export default function() {
- if (enableQueue) {
+ if (queueAvailable && enableQueue) {
queue.process(128, handler);
+ queueLogger.succ('Processing started');
}
+
+ return queue;
}