summaryrefslogtreecommitdiff
path: root/src/queue
diff options
context:
space:
mode:
authorha-dai <contact@haradai.net>2018-05-04 02:49:46 +0900
committerha-dai <contact@haradai.net>2018-05-04 02:49:46 +0900
commitf850283147072c681df1b39c57f8bd0b14f18016 (patch)
tree63ff533c91097da2d8ca2070fc67a28f67ee33da /src/queue
parentMerge branch 'master' of github.com:syuilo/misskey (diff)
parent1.7.0 (diff)
downloadmisskey-f850283147072c681df1b39c57f8bd0b14f18016.tar.gz
misskey-f850283147072c681df1b39c57f8bd0b14f18016.tar.bz2
misskey-f850283147072c681df1b39c57f8bd0b14f18016.zip
Merge branch 'master' of github.com:syuilo/misskey
Diffstat (limited to 'src/queue')
-rw-r--r--src/queue/index.ts44
-rw-r--r--src/queue/processors/http/deliver.ts19
-rw-r--r--src/queue/processors/http/index.ts18
-rw-r--r--src/queue/processors/http/process-inbox.ts71
4 files changed, 152 insertions, 0 deletions
diff --git a/src/queue/index.ts b/src/queue/index.ts
new file mode 100644
index 0000000000..32fd043f79
--- /dev/null
+++ b/src/queue/index.ts
@@ -0,0 +1,44 @@
+import { createQueue } from 'kue';
+
+import config from '../config';
+import http from './processors/http';
+
+const queue = createQueue({
+ redis: {
+ port: config.redis.port,
+ host: config.redis.host,
+ auth: config.redis.pass
+ }
+});
+
+export function createHttp(data) {
+ return queue
+ .create('http', data)
+ .removeOnComplete(true)
+ .events(false)
+ .attempts(8)
+ .backoff({ delay: 16384, type: 'exponential' });
+}
+
+export function deliver(user, content, to) {
+ createHttp({
+ title: 'deliver',
+ type: 'deliver',
+ user,
+ content,
+ to
+ }).save();
+}
+
+export default function() {
+ /*
+ 256 is the default concurrency limit of Mozilla Firefox and Google
+ Chromium.
+ a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google
+ https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff
+ Network.http.max-connections - MozillaZine Knowledge Base
+ http://kb.mozillazine.org/Network.http.max-connections
+ */
+ //queue.process('http', 256, http);
+ queue.process('http', 128, http);
+}
diff --git a/src/queue/processors/http/deliver.ts b/src/queue/processors/http/deliver.ts
new file mode 100644
index 0000000000..cf843fad07
--- /dev/null
+++ b/src/queue/processors/http/deliver.ts
@@ -0,0 +1,19 @@
+import * as kue from 'kue';
+
+import request from '../../../remote/activitypub/request';
+
+export default async (job: kue.Job, done): Promise<void> => {
+ try {
+ await request(job.data.user, job.data.to, job.data.content);
+ done();
+ } catch (res) {
+ if (res.statusCode >= 400 && res.statusCode < 500) {
+ // HTTPステータスコード4xxはクライアントエラーであり、それはつまり
+ // 何回再送しても成功することはないということなのでエラーにはしないでおく
+ done();
+ } else {
+ console.warn(`deliver failed: ${res.statusMessage}`);
+ done(new Error(res.statusMessage));
+ }
+ }
+};
diff --git a/src/queue/processors/http/index.ts b/src/queue/processors/http/index.ts
new file mode 100644
index 0000000000..6f8d1dbc2b
--- /dev/null
+++ b/src/queue/processors/http/index.ts
@@ -0,0 +1,18 @@
+import deliver from './deliver';
+import processInbox from './process-inbox';
+
+const handlers = {
+ deliver,
+ processInbox,
+};
+
+export default (job, done) => {
+ const handler = handlers[job.data.type];
+
+ if (handler) {
+ handler(job, done);
+ } else {
+ console.error(`Unknown job: ${job.data.type}`);
+ done();
+ }
+};
diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts
new file mode 100644
index 0000000000..dfafe64a78
--- /dev/null
+++ b/src/queue/processors/http/process-inbox.ts
@@ -0,0 +1,71 @@
+import * as kue from 'kue';
+import * as debug from 'debug';
+
+const httpSignature = require('http-signature');
+import parseAcct from '../../../acct/parse';
+import User, { IRemoteUser } from '../../../models/user';
+import perform from '../../../remote/activitypub/perform';
+import { resolvePerson } from '../../../remote/activitypub/models/person';
+
+const log = debug('misskey:queue:inbox');
+
+// ユーザーのinboxにアクティビティが届いた時の処理
+export default async (job: kue.Job, done): Promise<void> => {
+ const signature = job.data.signature;
+ const activity = job.data.activity;
+
+ //#region Log
+ const info = Object.assign({}, activity);
+ delete info['@context'];
+ delete info['signature'];
+ log(info);
+ //#endregion
+
+ const keyIdLower = signature.keyId.toLowerCase();
+ let user;
+
+ if (keyIdLower.startsWith('acct:')) {
+ const { username, host } = parseAcct(keyIdLower.slice('acct:'.length));
+ if (host === null) {
+ console.warn(`request was made by local user: @${username}`);
+ done();
+ return;
+ }
+
+ user = await User.findOne({ usernameLower: username, host: host.toLowerCase() }) as IRemoteUser;
+
+ // アクティビティを送信してきたユーザーがまだMisskeyサーバーに登録されていなかったら登録する
+ if (user === null) {
+ user = await resolvePerson(activity.actor);
+ }
+ } else {
+ user = await User.findOne({
+ host: { $ne: null },
+ 'publicKey.id': signature.keyId
+ }) as IRemoteUser;
+
+ // アクティビティを送信してきたユーザーがまだMisskeyサーバーに登録されていなかったら登録する
+ if (user === null) {
+ user = await resolvePerson(signature.keyId);
+ }
+ }
+
+ if (user === null) {
+ done(new Error('failed to resolve user'));
+ return;
+ }
+
+ if (!httpSignature.verifySignature(signature, user.publicKey.publicKeyPem)) {
+ console.warn('signature verification failed');
+ done();
+ return;
+ }
+
+ // アクティビティを処理
+ try {
+ await perform(user, activity);
+ done();
+ } catch (e) {
+ done(e);
+ }
+};