diff options
Diffstat (limited to 'src/queue/processors')
| -rw-r--r-- | src/queue/processors/http/deliver.ts | 19 | ||||
| -rw-r--r-- | src/queue/processors/http/index.ts | 18 | ||||
| -rw-r--r-- | src/queue/processors/http/process-inbox.ts | 66 |
3 files changed, 103 insertions, 0 deletions
diff --git a/src/queue/processors/http/deliver.ts b/src/queue/processors/http/deliver.ts new file mode 100644 index 0000000000..cf843fad07 --- /dev/null +++ b/src/queue/processors/http/deliver.ts @@ -0,0 +1,19 @@ +import * as kue from 'kue'; + +import request from '../../../remote/activitypub/request'; + +export default async (job: kue.Job, done): Promise<void> => { + try { + await request(job.data.user, job.data.to, job.data.content); + done(); + } catch (res) { + if (res.statusCode >= 400 && res.statusCode < 500) { + // HTTPステータスコード4xxはクライアントエラーであり、それはつまり + // 何回再送しても成功することはないということなのでエラーにはしないでおく + done(); + } else { + console.warn(`deliver failed: ${res.statusMessage}`); + done(new Error(res.statusMessage)); + } + } +}; diff --git a/src/queue/processors/http/index.ts b/src/queue/processors/http/index.ts new file mode 100644 index 0000000000..6f8d1dbc2b --- /dev/null +++ b/src/queue/processors/http/index.ts @@ -0,0 +1,18 @@ +import deliver from './deliver'; +import processInbox from './process-inbox'; + +const handlers = { + deliver, + processInbox, +}; + +export default (job, done) => { + const handler = handlers[job.data.type]; + + if (handler) { + handler(job, done); + } else { + console.error(`Unknown job: ${job.data.type}`); + done(); + } +}; diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts new file mode 100644 index 0000000000..ce5b7d5a89 --- /dev/null +++ b/src/queue/processors/http/process-inbox.ts @@ -0,0 +1,66 @@ +import * as kue from 'kue'; +import * as debug from 'debug'; + +import { verifySignature } from 'http-signature'; +import parseAcct from '../../../acct/parse'; +import User, { IRemoteUser } from '../../../models/user'; +import perform from '../../../remote/activitypub/perform'; +import { resolvePerson } from '../../../remote/activitypub/objects/person'; + +const log = debug('misskey:queue:inbox'); + +// ユーザーのinboxにアクティビティが届いた時の処理 +export default async (job: kue.Job, done): Promise<void> => { + const signature = job.data.signature; + const activity = job.data.activity; + + //#region Log + const info = Object.assign({}, activity); + delete info['@context']; + delete info['signature']; + log(info); + //#endregion + + const keyIdLower = signature.keyId.toLowerCase(); + let user; + + if (keyIdLower.startsWith('acct:')) { + const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); + if (host === null) { + console.warn(`request was made by local user: @${username}`); + done(); + return; + } + + user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser; + } else { + user = await User.findOne({ + host: { $ne: null }, + 'publicKey.id': signature.keyId + }) as IRemoteUser; + + // アクティビティを送信してきたユーザーがまだMisskeyサーバーに登録されていなかったら登録する + if (user === null) { + user = await resolvePerson(signature.keyId); + } + } + + if (user === null) { + done(new Error('failed to resolve user')); + return; + } + + if (!verifySignature(signature, user.publicKey.publicKeyPem)) { + console.warn('signature verification failed'); + done(); + return; + } + + // アクティビティを処理 + try { + await perform(user, activity); + done(); + } catch (e) { + done(e); + } +}; |