summaryrefslogtreecommitdiff
path: root/src/queue
diff options
context:
space:
mode:
authorsyuilo <syuilotan@yahoo.co.jp>2018-04-04 23:12:35 +0900
committersyuilo <syuilotan@yahoo.co.jp>2018-04-04 23:12:35 +0900
commite8b42d7e1668679e6a6ee0a7aea1e2ff7f37005b (patch)
tree6973284192eb419bd7bfed2361a594e668b81f9a /src/queue
parentMerge pull request #1393 from akihikodaki/duplicate (diff)
downloadsharkey-e8b42d7e1668679e6a6ee0a7aea1e2ff7f37005b.tar.gz
sharkey-e8b42d7e1668679e6a6ee0a7aea1e2ff7f37005b.tar.bz2
sharkey-e8b42d7e1668679e6a6ee0a7aea1e2ff7f37005b.zip
wip
Diffstat (limited to 'src/queue')
-rw-r--r--src/queue/index.ts37
-rw-r--r--src/queue/processors/db/delete-post-dependents.ts22
-rw-r--r--src/queue/processors/db/index.ts7
-rw-r--r--src/queue/processors/http/deliver.ts17
-rw-r--r--src/queue/processors/http/follow.ts69
-rw-r--r--src/queue/processors/http/index.ts17
-rw-r--r--src/queue/processors/http/perform-activitypub.ts7
-rw-r--r--src/queue/processors/http/process-inbox.ts55
-rw-r--r--src/queue/processors/http/report-github-failure.ts24
-rw-r--r--src/queue/processors/http/unfollow.ts56
-rw-r--r--src/queue/processors/index.ts18
11 files changed, 329 insertions, 0 deletions
diff --git a/src/queue/index.ts b/src/queue/index.ts
new file mode 100644
index 0000000000..c8c436b18c
--- /dev/null
+++ b/src/queue/index.ts
@@ -0,0 +1,37 @@
+import { createQueue } from 'kue';
+import config from '../config';
+import db from './processors/db';
+import http from './processors/http';
+
+const queue = createQueue({
+ redis: {
+ port: config.redis.port,
+ host: config.redis.host,
+ auth: config.redis.pass
+ }
+});
+
+export function createHttp(data) {
+ return queue
+ .create('http', data)
+ .attempts(16)
+ .backoff({ delay: 16384, type: 'exponential' });
+}
+
+export function createDb(data) {
+ return queue.create('db', data);
+}
+
+export function process() {
+ queue.process('db', db);
+
+ /*
+ 256 is the default concurrency limit of Mozilla Firefox and Google
+ Chromium.
+ a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google
+ https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff
+ Network.http.max-connections - MozillaZine Knowledge Base
+ http://kb.mozillazine.org/Network.http.max-connections
+ */
+ queue.process('http', 256, http);
+}
diff --git a/src/queue/processors/db/delete-post-dependents.ts b/src/queue/processors/db/delete-post-dependents.ts
new file mode 100644
index 0000000000..879c41ec9c
--- /dev/null
+++ b/src/queue/processors/db/delete-post-dependents.ts
@@ -0,0 +1,22 @@
+import Favorite from '../../models/favorite';
+import Notification from '../../models/notification';
+import PollVote from '../../models/poll-vote';
+import PostReaction from '../../models/post-reaction';
+import PostWatching from '../../models/post-watching';
+import Post from '../../models/post';
+
+export default async ({ data }) => Promise.all([
+ Favorite.remove({ postId: data._id }),
+ Notification.remove({ postId: data._id }),
+ PollVote.remove({ postId: data._id }),
+ PostReaction.remove({ postId: data._id }),
+ PostWatching.remove({ postId: data._id }),
+ Post.find({ repostId: data._id }).then(reposts => Promise.all([
+ Notification.remove({
+ postId: {
+ $in: reposts.map(({ _id }) => _id)
+ }
+ }),
+ Post.remove({ repostId: data._id })
+ ]))
+]);
diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts
new file mode 100644
index 0000000000..75838c099b
--- /dev/null
+++ b/src/queue/processors/db/index.ts
@@ -0,0 +1,7 @@
+import deletePostDependents from './delete-post-dependents';
+
+const handlers = {
+ deletePostDependents
+};
+
+export default (job, done) => handlers[job.data.type](job).then(() => done(), done);
diff --git a/src/queue/processors/http/deliver.ts b/src/queue/processors/http/deliver.ts
new file mode 100644
index 0000000000..8cd9eb624e
--- /dev/null
+++ b/src/queue/processors/http/deliver.ts
@@ -0,0 +1,17 @@
+import * as kue from 'kue';
+
+import Channel from '../../models/channel';
+import Following from '../../models/following';
+import ChannelWatching from '../../models/channel-watching';
+import Post, { pack } from '../../models/post';
+import User, { isLocalUser } from '../../models/user';
+import stream, { publishChannelStream } from '../../publishers/stream';
+import context from '../../remote/activitypub/renderer/context';
+import renderCreate from '../../remote/activitypub/renderer/create';
+import renderNote from '../../remote/activitypub/renderer/note';
+import request from '../../remote/request';
+
+export default async (job: kue.Job, done): Promise<void> => {
+
+ request(user, following.follower[0].account.inbox, create);
+}
diff --git a/src/queue/processors/http/follow.ts b/src/queue/processors/http/follow.ts
new file mode 100644
index 0000000000..8bf890efbc
--- /dev/null
+++ b/src/queue/processors/http/follow.ts
@@ -0,0 +1,69 @@
+import User, { isLocalUser, pack as packUser } from '../../models/user';
+import Following from '../../models/following';
+import FollowingLog from '../../models/following-log';
+import FollowedLog from '../../models/followed-log';
+import event from '../../publishers/stream';
+import notify from '../../publishers/notify';
+import context from '../../remote/activitypub/renderer/context';
+import render from '../../remote/activitypub/renderer/follow';
+import request from '../../remote/request';
+
+export default ({ data }) => Following.findOne({ _id: data.following }).then(({ followerId, followeeId }) => {
+ const promisedFollower = User.findOne({ _id: followerId });
+ const promisedFollowee = User.findOne({ _id: followeeId });
+
+ return Promise.all([
+ // Increment following count
+ User.update(followerId, {
+ $inc: {
+ followingCount: 1
+ }
+ }),
+
+ promisedFollower.then(({ followingCount }) => FollowingLog.insert({
+ createdAt: data.following.createdAt,
+ userId: followerId,
+ count: followingCount + 1
+ })),
+
+ // Increment followers count
+ User.update({ _id: followeeId }, {
+ $inc: {
+ followersCount: 1
+ }
+ }),
+
+ promisedFollowee.then(({ followersCount }) => FollowedLog.insert({
+ createdAt: data.following.createdAt,
+ userId: followerId,
+ count: followersCount + 1
+ })),
+
+ // Notify
+ promisedFollowee.then(followee => followee.host === null ?
+ notify(followeeId, followerId, 'follow') : null),
+
+ // Publish follow event
+ Promise.all([promisedFollower, promisedFollowee]).then(([follower, followee]) => {
+ let followerEvent;
+ let followeeEvent;
+
+ if (isLocalUser(follower)) {
+ followerEvent = packUser(followee, follower)
+ .then(packed => event(follower._id, 'follow', packed));
+ }
+
+ if (isLocalUser(followee)) {
+ followeeEvent = packUser(follower, followee)
+ .then(packed => event(followee._id, 'followed', packed));
+ } else if (isLocalUser(follower)) {
+ const rendered = render(follower, followee);
+ rendered['@context'] = context;
+
+ followeeEvent = request(follower, followee.account.inbox, rendered);
+ }
+
+ return Promise.all([followerEvent, followeeEvent]);
+ })
+ ]);
+});
diff --git a/src/queue/processors/http/index.ts b/src/queue/processors/http/index.ts
new file mode 100644
index 0000000000..8f9aa717c3
--- /dev/null
+++ b/src/queue/processors/http/index.ts
@@ -0,0 +1,17 @@
+import deliverPost from './deliver-post';
+import follow from './follow';
+import performActivityPub from './perform-activitypub';
+import processInbox from './process-inbox';
+import reportGitHubFailure from './report-github-failure';
+import unfollow from './unfollow';
+
+const handlers = {
+ deliverPost,
+ follow,
+ performActivityPub,
+ processInbox,
+ reportGitHubFailure,
+ unfollow
+};
+
+export default (job, done) => handlers[job.data.type](job).then(() => done(), done);
diff --git a/src/queue/processors/http/perform-activitypub.ts b/src/queue/processors/http/perform-activitypub.ts
new file mode 100644
index 0000000000..963e532fe5
--- /dev/null
+++ b/src/queue/processors/http/perform-activitypub.ts
@@ -0,0 +1,7 @@
+import User from '../../models/user';
+import act from '../../remote/activitypub/act';
+import Resolver from '../../remote/activitypub/resolver';
+
+export default ({ data }) => User.findOne({ _id: data.actor })
+ .then(actor => act(new Resolver(), actor, data.outbox))
+ .then(Promise.all);
diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts
new file mode 100644
index 0000000000..fff1fbf663
--- /dev/null
+++ b/src/queue/processors/http/process-inbox.ts
@@ -0,0 +1,55 @@
+import * as kue from 'kue';
+
+import { verifySignature } from 'http-signature';
+import parseAcct from '../../acct/parse';
+import User, { IRemoteUser } from '../../models/user';
+import act from '../../remote/activitypub/act';
+import resolvePerson from '../../remote/activitypub/resolve-person';
+import Resolver from '../../remote/activitypub/resolver';
+
+// ユーザーのinboxにアクティビティが届いた時の処理
+export default async (job: kue.Job, done): Promise<void> => {
+ const signature = job.data.signature;
+ const activity = job.data.activity;
+
+ const keyIdLower = signature.keyId.toLowerCase();
+ let user;
+
+ if (keyIdLower.startsWith('acct:')) {
+ const { username, host } = parseAcct(keyIdLower.slice('acct:'.length));
+ if (host === null) {
+ console.warn(`request was made by local user: @${username}`);
+ done();
+ }
+
+ user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser;
+ } else {
+ user = await User.findOne({
+ host: { $ne: null },
+ 'account.publicKey.id': signature.keyId
+ }) as IRemoteUser;
+
+ // アクティビティを送信してきたユーザーがまだMisskeyサーバーに登録されていなかったら登録する
+ if (user === null) {
+ user = await resolvePerson(signature.keyId);
+ }
+ }
+
+ if (user === null) {
+ done(new Error('failed to resolve user'));
+ return;
+ }
+
+ if (!verifySignature(signature, user.account.publicKey.publicKeyPem)) {
+ done(new Error('signature verification failed'));
+ return;
+ }
+
+ // アクティビティを処理
+ try {
+ await act(new Resolver(), user, activity);
+ done();
+ } catch (e) {
+ done(e);
+ }
+};
diff --git a/src/queue/processors/http/report-github-failure.ts b/src/queue/processors/http/report-github-failure.ts
new file mode 100644
index 0000000000..4f6f5ccee5
--- /dev/null
+++ b/src/queue/processors/http/report-github-failure.ts
@@ -0,0 +1,24 @@
+import * as request from 'request-promise-native';
+import User from '../../models/user';
+const createPost = require('../../server/api/endpoints/posts/create');
+
+export default async ({ data }) => {
+ const asyncBot = User.findOne({ _id: data.userId });
+
+ // Fetch parent status
+ const parentStatuses = await request({
+ url: `${data.parentUrl}/statuses`,
+ headers: {
+ 'User-Agent': 'misskey'
+ },
+ json: true
+ });
+
+ const parentState = parentStatuses[0].state;
+ const stillFailed = parentState == 'failure' || parentState == 'error';
+ const text = stillFailed ?
+ `**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` :
+ `**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`;
+
+ createPost({ text }, await asyncBot);
+};
diff --git a/src/queue/processors/http/unfollow.ts b/src/queue/processors/http/unfollow.ts
new file mode 100644
index 0000000000..d3d5f2246f
--- /dev/null
+++ b/src/queue/processors/http/unfollow.ts
@@ -0,0 +1,56 @@
+import FollowedLog from '../../models/followed-log';
+import Following from '../../models/following';
+import FollowingLog from '../../models/following-log';
+import User, { isRemoteUser, pack as packUser } from '../../models/user';
+import stream from '../../publishers/stream';
+import renderFollow from '../../remote/activitypub/renderer/follow';
+import renderUndo from '../../remote/activitypub/renderer/undo';
+import context from '../../remote/activitypub/renderer/context';
+import request from '../../remote/request';
+
+export default async ({ data }) => {
+ // Delete following
+ const following = await Following.findOneAndDelete({ _id: data.id });
+ if (following === null) {
+ return;
+ }
+
+ const promisedFollower = User.findOne({ _id: following.followerId });
+ const promisedFollowee = User.findOne({ _id: following.followeeId });
+
+ await Promise.all([
+ // Decrement following count
+ User.update({ _id: following.followerId }, { $inc: { followingCount: -1 } }),
+ promisedFollower.then(({ followingCount }) => FollowingLog.insert({
+ createdAt: new Date(),
+ userId: following.followerId,
+ count: followingCount - 1
+ })),
+
+ // Decrement followers count
+ User.update({ _id: following.followeeId }, { $inc: { followersCount: -1 } }),
+ promisedFollowee.then(({ followersCount }) => FollowedLog.insert({
+ createdAt: new Date(),
+ userId: following.followeeId,
+ count: followersCount - 1
+ })),
+
+ // Publish follow event
+ Promise.all([promisedFollower, promisedFollowee]).then(async ([follower, followee]) => {
+ if (isRemoteUser(follower)) {
+ return;
+ }
+
+ const promisedPackedUser = packUser(followee, follower);
+
+ if (isRemoteUser(followee)) {
+ const undo = renderUndo(renderFollow(follower, followee));
+ undo['@context'] = context;
+
+ await request(follower, followee.account.inbox, undo);
+ }
+
+ stream(follower._id, 'unfollow', promisedPackedUser);
+ })
+ ]);
+};
diff --git a/src/queue/processors/index.ts b/src/queue/processors/index.ts
new file mode 100644
index 0000000000..172048ddae
--- /dev/null
+++ b/src/queue/processors/index.ts
@@ -0,0 +1,18 @@
+import queue from '../queue';
+import db from './db';
+import http from './http';
+
+export default () => {
+ queue.process('db', db);
+
+ /*
+ 256 is the default concurrency limit of Mozilla Firefox and Google
+ Chromium.
+
+ a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google
+ https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff
+ Network.http.max-connections - MozillaZine Knowledge Base
+ http://kb.mozillazine.org/Network.http.max-connections
+ */
+ queue.process('http', 256, http);
+};