diff options
Diffstat (limited to '')
| -rw-r--r-- | src/queue.ts | 10 | ||||
| -rw-r--r-- | src/queue/index.ts | 37 | ||||
| -rw-r--r-- | src/queue/processors/db/delete-post-dependents.ts (renamed from src/processor/db/delete-post-dependents.ts) | 0 | ||||
| -rw-r--r-- | src/queue/processors/db/index.ts (renamed from src/processor/db/index.ts) | 0 | ||||
| -rw-r--r-- | src/queue/processors/http/deliver.ts | 17 | ||||
| -rw-r--r-- | src/queue/processors/http/follow.ts (renamed from src/processor/http/follow.ts) | 0 | ||||
| -rw-r--r-- | src/queue/processors/http/index.ts (renamed from src/processor/http/index.ts) | 0 | ||||
| -rw-r--r-- | src/queue/processors/http/perform-activitypub.ts (renamed from src/processor/http/perform-activitypub.ts) | 0 | ||||
| -rw-r--r-- | src/queue/processors/http/process-inbox.ts | 55 | ||||
| -rw-r--r-- | src/queue/processors/http/report-github-failure.ts (renamed from src/processor/http/report-github-failure.ts) | 0 | ||||
| -rw-r--r-- | src/queue/processors/http/unfollow.ts (renamed from src/processor/http/unfollow.ts) | 0 | ||||
| -rw-r--r-- | src/queue/processors/index.ts (renamed from src/processor/index.ts) | 0 |
12 files changed, 109 insertions, 10 deletions
diff --git a/src/queue.ts b/src/queue.ts deleted file mode 100644 index 08ea13c2a3..0000000000 --- a/src/queue.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { createQueue } from 'kue'; -import config from './config'; - -export default createQueue({ - redis: { - port: config.redis.port, - host: config.redis.host, - auth: config.redis.pass - } -}); diff --git a/src/queue/index.ts b/src/queue/index.ts new file mode 100644 index 0000000000..c8c436b18c --- /dev/null +++ b/src/queue/index.ts @@ -0,0 +1,37 @@ +import { createQueue } from 'kue'; +import config from '../config'; +import db from './processors/db'; +import http from './processors/http'; + +const queue = createQueue({ + redis: { + port: config.redis.port, + host: config.redis.host, + auth: config.redis.pass + } +}); + +export function createHttp(data) { + return queue + .create('http', data) + .attempts(16) + .backoff({ delay: 16384, type: 'exponential' }); +} + +export function createDb(data) { + return queue.create('db', data); +} + +export function process() { + queue.process('db', db); + + /* + 256 is the default concurrency limit of Mozilla Firefox and Google + Chromium. + a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google + https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff + Network.http.max-connections - MozillaZine Knowledge Base + http://kb.mozillazine.org/Network.http.max-connections + */ + queue.process('http', 256, http); +} diff --git a/src/processor/db/delete-post-dependents.ts b/src/queue/processors/db/delete-post-dependents.ts index 879c41ec9c..879c41ec9c 100644 --- a/src/processor/db/delete-post-dependents.ts +++ b/src/queue/processors/db/delete-post-dependents.ts diff --git a/src/processor/db/index.ts b/src/queue/processors/db/index.ts index 75838c099b..75838c099b 100644 --- a/src/processor/db/index.ts +++ b/src/queue/processors/db/index.ts diff --git a/src/queue/processors/http/deliver.ts b/src/queue/processors/http/deliver.ts new file mode 100644 index 0000000000..8cd9eb624e --- /dev/null +++ b/src/queue/processors/http/deliver.ts @@ -0,0 +1,17 @@ +import * as kue from 'kue'; + +import Channel from '../../models/channel'; +import Following from '../../models/following'; +import ChannelWatching from '../../models/channel-watching'; +import Post, { pack } from '../../models/post'; +import User, { isLocalUser } from '../../models/user'; +import stream, { publishChannelStream } from '../../publishers/stream'; +import context from '../../remote/activitypub/renderer/context'; +import renderCreate from '../../remote/activitypub/renderer/create'; +import renderNote from '../../remote/activitypub/renderer/note'; +import request from '../../remote/request'; + +export default async (job: kue.Job, done): Promise<void> => { + + request(user, following.follower[0].account.inbox, create); +} diff --git a/src/processor/http/follow.ts b/src/queue/processors/http/follow.ts index 8bf890efbc..8bf890efbc 100644 --- a/src/processor/http/follow.ts +++ b/src/queue/processors/http/follow.ts diff --git a/src/processor/http/index.ts b/src/queue/processors/http/index.ts index 8f9aa717c3..8f9aa717c3 100644 --- a/src/processor/http/index.ts +++ b/src/queue/processors/http/index.ts diff --git a/src/processor/http/perform-activitypub.ts b/src/queue/processors/http/perform-activitypub.ts index 963e532fe5..963e532fe5 100644 --- a/src/processor/http/perform-activitypub.ts +++ b/src/queue/processors/http/perform-activitypub.ts diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts new file mode 100644 index 0000000000..fff1fbf663 --- /dev/null +++ b/src/queue/processors/http/process-inbox.ts @@ -0,0 +1,55 @@ +import * as kue from 'kue'; + +import { verifySignature } from 'http-signature'; +import parseAcct from '../../acct/parse'; +import User, { IRemoteUser } from '../../models/user'; +import act from '../../remote/activitypub/act'; +import resolvePerson from '../../remote/activitypub/resolve-person'; +import Resolver from '../../remote/activitypub/resolver'; + +// ユーザーのinboxにアクティビティが届いた時の処理 +export default async (job: kue.Job, done): Promise<void> => { + const signature = job.data.signature; + const activity = job.data.activity; + + const keyIdLower = signature.keyId.toLowerCase(); + let user; + + if (keyIdLower.startsWith('acct:')) { + const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); + if (host === null) { + console.warn(`request was made by local user: @${username}`); + done(); + } + + user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser; + } else { + user = await User.findOne({ + host: { $ne: null }, + 'account.publicKey.id': signature.keyId + }) as IRemoteUser; + + // アクティビティを送信してきたユーザーがまだMisskeyサーバーに登録されていなかったら登録する + if (user === null) { + user = await resolvePerson(signature.keyId); + } + } + + if (user === null) { + done(new Error('failed to resolve user')); + return; + } + + if (!verifySignature(signature, user.account.publicKey.publicKeyPem)) { + done(new Error('signature verification failed')); + return; + } + + // アクティビティを処理 + try { + await act(new Resolver(), user, activity); + done(); + } catch (e) { + done(e); + } +}; diff --git a/src/processor/http/report-github-failure.ts b/src/queue/processors/http/report-github-failure.ts index 4f6f5ccee5..4f6f5ccee5 100644 --- a/src/processor/http/report-github-failure.ts +++ b/src/queue/processors/http/report-github-failure.ts diff --git a/src/processor/http/unfollow.ts b/src/queue/processors/http/unfollow.ts index d3d5f2246f..d3d5f2246f 100644 --- a/src/processor/http/unfollow.ts +++ b/src/queue/processors/http/unfollow.ts diff --git a/src/processor/index.ts b/src/queue/processors/index.ts index 172048ddae..172048ddae 100644 --- a/src/processor/index.ts +++ b/src/queue/processors/index.ts |