summaryrefslogtreecommitdiff
path: root/src/queue/index.ts
blob: 28768bf38f3ff7e5f6d3d99231dbe2a95fba1d03 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import * as Queue from 'bee-queue';
import * as httpSignature from 'http-signature';

import config from '../config';
import { ILocalUser } from '../models/user';
import { program } from '../argv';
import handler from './processors';
import { queueLogger } from './logger';

const enableQueue = !program.disableQueue;
const queueAvailable = config.redis.isJust();

const queue = initializeQueue();

function initializeQueue() {
	return config.redis.map(({ port, host, pass }) => {
		return new Queue('misskey', {
			redis: {
				port: port,
				host: host,
				password: pass.getOrElse(null)
			},

			removeOnSuccess: true,
			removeOnFailure: true,
			getEvents: false,
			sendEvents: false,
			storeJobs: false
		});
	}).getOrElse(null);
}

export function deliver(user: ILocalUser, content: any, to: any) {
	if (content == null) return;

	const data = {
		type: 'deliver',
		user,
		content,
		to
	};

	if (queueAvailable && !program.disableApQueue) {
		return queue.createJob(data)
			.retries(8)
			.backoff('exponential', 1000)
			.save();
	} else {
		return handler({ data }, () => {});
	}
}

export function processInbox(activity: any, signature: httpSignature.IParsedSignature) {
	const data = {
		type: 'processInbox',
		activity: activity,
		signature
	};

	if (queueAvailable && !program.disableApQueue) {
		return queue.createJob(data)
			.retries(3)
			.backoff('exponential', 500)
			.save();
	} else {
		return handler({ data }, () => {});
	}
}

export function createExportNotesJob(user: ILocalUser) {
	if (!queueAvailable) throw 'queue unavailable';

	return queue.createJob({
		type: 'exportNotes',
		user: user
	})
		.save();
}

export function createExportFollowingJob(user: ILocalUser) {
	if (!queueAvailable) throw 'queue unavailable';

	return queue.createJob({
		type: 'exportFollowing',
		user: user
	})
		.save();
}

export function createExportMuteJob(user: ILocalUser) {
	if (!queueAvailable) throw 'queue unavailable';

	return queue.createJob({
		type: 'exportMute',
		user: user
	})
		.save();
}

export function createExportBlockingJob(user: ILocalUser) {
	if (!queueAvailable) throw 'queue unavailable';

	return queue.createJob({
		type: 'exportBlocking',
		user: user
	})
		.save();
}

export default function() {
	if (queueAvailable && enableQueue) {
		queue.process(128, handler);
		queueLogger.succ('Processing started');
	}

	return queue;
}

export function destroy() {
	queue.destroy().then(n => {
		queueLogger.succ(`All job removed (${n} jobs)`);
	});
}