1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
import * as Queue from 'bee-queue';
import * as httpSignature from 'http-signature';
import config from '../config';
import { ILocalUser } from '../models/user';
import { program } from '../argv';
import handler from './processors';
import { queueLogger } from './logger';
// The queue is used unless `program.disableQueue` is set.
const enableQueue = !program.disableQueue;
// Jobs go through the queue only when it is enabled and `program.onlyServer` is not set.
const enableQueueProcessing = !program.onlyServer && enableQueue;
// The queue needs a `redis` section in the config.
const queueAvailable = config.redis != null;
// Queue instance, or null when initializeQueue() declines to create one.
// Must stay below the flags above: initializeQueue() reads them when called here.
const queue = initializeQueue();
/**
 * Creates the 'misskey' queue from the Redis settings in the config.
 *
 * @returns the new queue, or `null` when Redis is not configured or the
 *          queue has been disabled.
 */
function initializeQueue() {
	// Without Redis settings, or with the queue switched off, there is nothing to build.
	if (!queueAvailable || !enableQueue) {
		return null;
	}

	const { port, host, pass } = config.redis;

	return new Queue('misskey', {
		redis: { port, host, password: pass },

		removeOnSuccess: true,
		removeOnFailure: true,
		getEvents: false,
		sendEvents: false,
		storeJobs: false
	});
}
/**
 * Schedules (or directly runs) a 'deliver' job.
 *
 * @param user    local user on whose behalf the content is delivered
 * @param content payload to deliver; a null/undefined payload is ignored
 * @param to      delivery target, passed through to the job unchanged
 * @returns the result of saving the queued job, or of calling the handler
 *          directly when the queue is not in use; `undefined` when there is
 *          no content
 */
export function deliver(user: ILocalUser, content: any, to: any) {
	if (content === null || content === undefined) return;

	const data = { type: 'deliver', user, content, to };

	// No usable queue: hand the job straight to the processor with a no-op callback.
	if (!(queueAvailable && enableQueueProcessing)) {
		return handler({ data }, () => {});
	}

	const job = queue.createJob(data)
		.retries(8)
		.backoff('exponential', 1000);
	return job.save();
}
/**
 * Schedules (or directly runs) a 'processInbox' job for an incoming activity.
 *
 * @param activity  the received activity, passed through to the job unchanged
 * @param signature the parsed HTTP signature that accompanied the activity
 * @returns the result of saving the queued job, or of calling the handler
 *          directly when the queue is not in use
 */
export function processInbox(activity: any, signature: httpSignature.IParsedSignature) {
	const data = { type: 'processInbox', activity, signature };

	// No usable queue: hand the job straight to the processor with a no-op callback.
	if (!(queueAvailable && enableQueueProcessing)) {
		return handler({ data }, () => {});
	}

	const job = queue.createJob(data)
		.retries(3)
		.backoff('exponential', 500);
	return job.save();
}
/**
 * Schedules (or directly runs) an 'exportNotes' job for the given user.
 *
 * @param user local user the job is created for
 * @returns the result of saving the queued job, or of calling the handler
 *          directly when the queue is not in use
 */
export function createExportNotesJob(user: ILocalUser) {
	const data = { type: 'exportNotes', user };
	const useQueue = queueAvailable && enableQueueProcessing;

	return useQueue
		? queue.createJob(data).save()
		: handler({ data }, () => {});
}
/**
 * Schedules (or directly runs) an 'exportFollowing' job for the given user.
 *
 * @param user local user the job is created for
 * @returns the result of saving the queued job, or of calling the handler
 *          directly when the queue is not in use
 */
export function createExportFollowingJob(user: ILocalUser) {
	const data = { type: 'exportFollowing', user };
	const useQueue = queueAvailable && enableQueueProcessing;

	return useQueue
		? queue.createJob(data).save()
		: handler({ data }, () => {});
}
/**
 * Schedules (or directly runs) an 'exportMute' job for the given user.
 *
 * @param user local user the job is created for
 * @returns the result of saving the queued job, or of calling the handler
 *          directly when the queue is not in use
 */
export function createExportMuteJob(user: ILocalUser) {
	const data = { type: 'exportMute', user };
	const useQueue = queueAvailable && enableQueueProcessing;

	return useQueue
		? queue.createJob(data).save()
		: handler({ data }, () => {});
}
/**
 * Schedules (or directly runs) an 'exportBlocking' job for the given user.
 *
 * @param user local user the job is created for
 * @returns the result of saving the queued job, or of calling the handler
 *          directly when the queue is not in use
 */
export function createExportBlockingJob(user: ILocalUser) {
	const data = { type: 'exportBlocking', user };
	const useQueue = queueAvailable && enableQueueProcessing;

	return useQueue
		? queue.createJob(data).save()
		: handler({ data }, () => {});
}
/**
 * Starts job processing when the queue is available and processing is
 * enabled, then hands back the module's queue.
 *
 * @returns the module-level queue (`null` if none was created)
 */
export default function() {
	const shouldProcess = queueAvailable && enableQueueProcessing;

	if (shouldProcess) {
		// 128 is passed as the first argument to `process` (the concurrency in bee-queue's API).
		queue.process(128, handler);
		queueLogger.succ('Processing started');
	}

	return queue;
}
/**
 * Destroys the queue and logs the count its `destroy()` promise resolves with.
 *
 * Does nothing when no queue was created.
 */
export function destroy() {
	// `queue` is null when initializeQueue() declined to build one (Redis not
	// configured or queue disabled); calling `.destroy()` on it would throw.
	if (queue == null) return;

	queue.destroy().then(n => {
		queueLogger.succ(`All job removed (${n} jobs)`);
	});
}
|