diff options
| author | ha-dai <contact@haradai.net> | 2018-05-04 02:49:46 +0900 |
|---|---|---|
| committer | ha-dai <contact@haradai.net> | 2018-05-04 02:49:46 +0900 |
| commit | f850283147072c681df1b39c57f8bd0b14f18016 (patch) | |
| tree | 63ff533c91097da2d8ca2070fc67a28f67ee33da /src/queue | |
| parent | Merge branch 'master' of github.com:syuilo/misskey (diff) | |
| parent | 1.7.0 (diff) | |
| download | misskey-f850283147072c681df1b39c57f8bd0b14f18016.tar.gz misskey-f850283147072c681df1b39c57f8bd0b14f18016.tar.bz2 misskey-f850283147072c681df1b39c57f8bd0b14f18016.zip | |
Merge branch 'master' of github.com:syuilo/misskey
Diffstat (limited to 'src/queue')
| -rw-r--r-- | src/queue/index.ts | 44 | ||||
| -rw-r--r-- | src/queue/processors/http/deliver.ts | 19 | ||||
| -rw-r--r-- | src/queue/processors/http/index.ts | 18 | ||||
| -rw-r--r-- | src/queue/processors/http/process-inbox.ts | 71 |
4 files changed, 152 insertions, 0 deletions
diff --git a/src/queue/index.ts b/src/queue/index.ts new file mode 100644 index 0000000000..32fd043f79 --- /dev/null +++ b/src/queue/index.ts @@ -0,0 +1,44 @@ +import { createQueue } from 'kue'; + +import config from '../config'; +import http from './processors/http'; + +const queue = createQueue({ + redis: { + port: config.redis.port, + host: config.redis.host, + auth: config.redis.pass + } +}); + +export function createHttp(data) { + return queue + .create('http', data) + .removeOnComplete(true) + .events(false) + .attempts(8) + .backoff({ delay: 16384, type: 'exponential' }); +} + +export function deliver(user, content, to) { + createHttp({ + title: 'deliver', + type: 'deliver', + user, + content, + to + }).save(); +} + +export default function() { + /* + 256 is the default concurrency limit of Mozilla Firefox and Google + Chromium. + a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google + https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff + Network.http.max-connections - MozillaZine Knowledge Base + http://kb.mozillazine.org/Network.http.max-connections + */ + //queue.process('http', 256, http); + queue.process('http', 128, http); +} diff --git a/src/queue/processors/http/deliver.ts b/src/queue/processors/http/deliver.ts new file mode 100644 index 0000000000..cf843fad07 --- /dev/null +++ b/src/queue/processors/http/deliver.ts @@ -0,0 +1,19 @@ +import * as kue from 'kue'; + +import request from '../../../remote/activitypub/request'; + +export default async (job: kue.Job, done): Promise<void> => { + try { + await request(job.data.user, job.data.to, job.data.content); + done(); + } catch (res) { + if (res.statusCode >= 400 && res.statusCode < 500) { + // HTTPステータスコード4xxはクライアントエラーであり、それはつまり + // 何回再送しても成功することはないということなのでエラーにはしないでおく + done(); + } else { + console.warn(`deliver failed: ${res.statusMessage}`); + done(new Error(res.statusMessage)); + } + } +}; diff --git a/src/queue/processors/http/index.ts b/src/queue/processors/http/index.ts new file mode 100644 index 0000000000..6f8d1dbc2b --- /dev/null +++ b/src/queue/processors/http/index.ts @@ -0,0 +1,18 @@ +import deliver from './deliver'; +import processInbox from './process-inbox'; + +const handlers = { + deliver, + processInbox, +}; + +export default (job, done) => { + const handler = handlers[job.data.type]; + + if (handler) { + handler(job, done); + } else { + console.error(`Unknown job: ${job.data.type}`); + done(); + } +}; diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts new file mode 100644 index 0000000000..dfafe64a78 --- /dev/null +++ b/src/queue/processors/http/process-inbox.ts @@ -0,0 +1,71 @@ +import * as kue from 'kue'; +import * as debug from 'debug'; + +const httpSignature = require('http-signature'); +import parseAcct from '../../../acct/parse'; +import User, { IRemoteUser } from '../../../models/user'; +import perform from '../../../remote/activitypub/perform'; +import { resolvePerson } from '../../../remote/activitypub/models/person'; + +const log = debug('misskey:queue:inbox'); + +// ユーザーのinboxにアクティビティが届いた時の処理 +export default async (job: kue.Job, done): Promise<void> => { + const signature = job.data.signature; + const activity = job.data.activity; + + //#region Log + const info = Object.assign({}, activity); + delete info['@context']; + delete info['signature']; + log(info); + //#endregion + + const keyIdLower = signature.keyId.toLowerCase(); + let user; + + if (keyIdLower.startsWith('acct:')) { + const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); + if (host === null) { + console.warn(`request was made by local user: @${username}`); + done(); + return; + } + + user = await User.findOne({ usernameLower: username, host: host.toLowerCase() }) as IRemoteUser; + + // アクティビティを送信してきたユーザーがまだMisskeyサーバーに登録されていなかったら登録する + if (user === null) { + user = await resolvePerson(activity.actor); + } + } else { + user = await User.findOne({ + host: { $ne: null }, + 'publicKey.id': signature.keyId + }) as IRemoteUser; + + // アクティビティを送信してきたユーザーがまだMisskeyサーバーに登録されていなかったら登録する + if (user === null) { + user = await resolvePerson(signature.keyId); + } + } + + if (user === null) { + done(new Error('failed to resolve user')); + return; + } + + if (!httpSignature.verifySignature(signature, user.publicKey.publicKeyPem)) { + console.warn('signature verification failed'); + done(); + return; + } + + // アクティビティを処理 + try { + await perform(user, activity); + done(); + } catch (e) { + done(e); + } +}; |