summaryrefslogtreecommitdiff
path: root/src/queue
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/queue.ts10
-rw-r--r--src/queue/index.ts37
-rw-r--r--src/queue/processors/db/delete-post-dependents.ts (renamed from src/processor/db/delete-post-dependents.ts)0
-rw-r--r--src/queue/processors/db/index.ts (renamed from src/processor/db/index.ts)0
-rw-r--r--src/queue/processors/http/deliver.ts17
-rw-r--r--src/queue/processors/http/follow.ts (renamed from src/processor/http/follow.ts)0
-rw-r--r--src/queue/processors/http/index.ts (renamed from src/processor/http/index.ts)0
-rw-r--r--src/queue/processors/http/perform-activitypub.ts (renamed from src/processor/http/perform-activitypub.ts)0
-rw-r--r--src/queue/processors/http/process-inbox.ts55
-rw-r--r--src/queue/processors/http/report-github-failure.ts (renamed from src/processor/http/report-github-failure.ts)0
-rw-r--r--src/queue/processors/http/unfollow.ts (renamed from src/processor/http/unfollow.ts)0
-rw-r--r--src/queue/processors/index.ts (renamed from src/processor/index.ts)0
12 files changed, 109 insertions, 10 deletions
diff --git a/src/queue.ts b/src/queue.ts
deleted file mode 100644
index 08ea13c2a3..0000000000
--- a/src/queue.ts
+++ /dev/null
@@ -1,10 +0,0 @@
-import { createQueue } from 'kue';
-import config from './config';
-
-export default createQueue({
- redis: {
- port: config.redis.port,
- host: config.redis.host,
- auth: config.redis.pass
- }
-});
diff --git a/src/queue/index.ts b/src/queue/index.ts
new file mode 100644
index 0000000000..c8c436b18c
--- /dev/null
+++ b/src/queue/index.ts
@@ -0,0 +1,37 @@
+import { createQueue } from 'kue';
+import config from '../config';
+import db from './processors/db';
+import http from './processors/http';
+
+const queue = createQueue({
+ redis: {
+ port: config.redis.port,
+ host: config.redis.host,
+ auth: config.redis.pass
+ }
+});
+
+export function createHttp(data) {
+ return queue
+ .create('http', data)
+ .attempts(16)
+ .backoff({ delay: 16384, type: 'exponential' });
+}
+
+export function createDb(data) {
+ return queue.create('db', data);
+}
+
+export function process() {
+ queue.process('db', db);
+
+ /*
+ 256 is the default concurrency limit of Mozilla Firefox and Google
+ Chromium.
+ a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google
+ https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff
+ Network.http.max-connections - MozillaZine Knowledge Base
+ http://kb.mozillazine.org/Network.http.max-connections
+ */
+ queue.process('http', 256, http);
+}
diff --git a/src/processor/db/delete-post-dependents.ts b/src/queue/processors/db/delete-post-dependents.ts
index 879c41ec9c..879c41ec9c 100644
--- a/src/processor/db/delete-post-dependents.ts
+++ b/src/queue/processors/db/delete-post-dependents.ts
diff --git a/src/processor/db/index.ts b/src/queue/processors/db/index.ts
index 75838c099b..75838c099b 100644
--- a/src/processor/db/index.ts
+++ b/src/queue/processors/db/index.ts
diff --git a/src/queue/processors/http/deliver.ts b/src/queue/processors/http/deliver.ts
new file mode 100644
index 0000000000..8cd9eb624e
--- /dev/null
+++ b/src/queue/processors/http/deliver.ts
@@ -0,0 +1,17 @@
+import * as kue from 'kue';
+
+import Channel from '../../models/channel';
+import Following from '../../models/following';
+import ChannelWatching from '../../models/channel-watching';
+import Post, { pack } from '../../models/post';
+import User, { isLocalUser } from '../../models/user';
+import stream, { publishChannelStream } from '../../publishers/stream';
+import context from '../../remote/activitypub/renderer/context';
+import renderCreate from '../../remote/activitypub/renderer/create';
+import renderNote from '../../remote/activitypub/renderer/note';
+import request from '../../remote/request';
+
+export default async (job: kue.Job, done): Promise<void> => {
+
+ request(user, following.follower[0].account.inbox, create);
+}
diff --git a/src/processor/http/follow.ts b/src/queue/processors/http/follow.ts
index 8bf890efbc..8bf890efbc 100644
--- a/src/processor/http/follow.ts
+++ b/src/queue/processors/http/follow.ts
diff --git a/src/processor/http/index.ts b/src/queue/processors/http/index.ts
index 8f9aa717c3..8f9aa717c3 100644
--- a/src/processor/http/index.ts
+++ b/src/queue/processors/http/index.ts
diff --git a/src/processor/http/perform-activitypub.ts b/src/queue/processors/http/perform-activitypub.ts
index 963e532fe5..963e532fe5 100644
--- a/src/processor/http/perform-activitypub.ts
+++ b/src/queue/processors/http/perform-activitypub.ts
diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts
new file mode 100644
index 0000000000..fff1fbf663
--- /dev/null
+++ b/src/queue/processors/http/process-inbox.ts
@@ -0,0 +1,55 @@
+import * as kue from 'kue';
+
+import { verifySignature } from 'http-signature';
+import parseAcct from '../../acct/parse';
+import User, { IRemoteUser } from '../../models/user';
+import act from '../../remote/activitypub/act';
+import resolvePerson from '../../remote/activitypub/resolve-person';
+import Resolver from '../../remote/activitypub/resolver';
+
+// ユーザーのinboxにアクティビティが届いた時の処理
+export default async (job: kue.Job, done): Promise<void> => {
+ const signature = job.data.signature;
+ const activity = job.data.activity;
+
+ const keyIdLower = signature.keyId.toLowerCase();
+ let user;
+
+ if (keyIdLower.startsWith('acct:')) {
+ const { username, host } = parseAcct(keyIdLower.slice('acct:'.length));
+ if (host === null) {
+ console.warn(`request was made by local user: @${username}`);
+ done();
+ }
+
+ user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser;
+ } else {
+ user = await User.findOne({
+ host: { $ne: null },
+ 'account.publicKey.id': signature.keyId
+ }) as IRemoteUser;
+
+ // アクティビティを送信してきたユーザーがまだMisskeyサーバーに登録されていなかったら登録する
+ if (user === null) {
+ user = await resolvePerson(signature.keyId);
+ }
+ }
+
+ if (user === null) {
+ done(new Error('failed to resolve user'));
+ return;
+ }
+
+ if (!verifySignature(signature, user.account.publicKey.publicKeyPem)) {
+ done(new Error('signature verification failed'));
+ return;
+ }
+
+ // アクティビティを処理
+ try {
+ await act(new Resolver(), user, activity);
+ done();
+ } catch (e) {
+ done(e);
+ }
+};
diff --git a/src/processor/http/report-github-failure.ts b/src/queue/processors/http/report-github-failure.ts
index 4f6f5ccee5..4f6f5ccee5 100644
--- a/src/processor/http/report-github-failure.ts
+++ b/src/queue/processors/http/report-github-failure.ts
diff --git a/src/processor/http/unfollow.ts b/src/queue/processors/http/unfollow.ts
index d3d5f2246f..d3d5f2246f 100644
--- a/src/processor/http/unfollow.ts
+++ b/src/queue/processors/http/unfollow.ts
diff --git a/src/processor/index.ts b/src/queue/processors/index.ts
index 172048ddae..172048ddae 100644
--- a/src/processor/index.ts
+++ b/src/queue/processors/index.ts