summaryrefslogtreecommitdiff
path: root/src/queue/processors
diff options
context:
space:
mode:
authorsyuilo <Syuilotan@yahoo.co.jp>2018-04-07 17:19:44 +0900
committerGitHub <noreply@github.com>2018-04-07 17:19:44 +0900
commitb3cb4c7d94c7b9a2900db7042dd2971362f8de88 (patch)
treefa3c3588cb376b2464fd8f4691e89238581d1192 /src/queue/processors
parentfix(package): update webpack-cli to version 2.0.14 (diff)
parentMerge pull request #1401 from syuilo/greenkeeper/webpack-4.5.0 (diff)
downloadsharkey-b3cb4c7d94c7b9a2900db7042dd2971362f8de88.tar.gz
sharkey-b3cb4c7d94c7b9a2900db7042dd2971362f8de88.tar.bz2
sharkey-b3cb4c7d94c7b9a2900db7042dd2971362f8de88.zip
Merge branch 'master' into greenkeeper/webpack-cli-2.0.14
Diffstat (limited to 'src/queue/processors')
-rw-r--r--src/queue/processors/db/delete-post-dependents.ts22
-rw-r--r--src/queue/processors/db/index.ts7
-rw-r--r--src/queue/processors/http/deliver-post.ts27
-rw-r--r--src/queue/processors/http/deliver.ts19
-rw-r--r--src/queue/processors/http/follow.ts66
-rw-r--r--src/queue/processors/http/index.ts25
-rw-r--r--src/queue/processors/http/perform-activitypub.ts8
-rw-r--r--src/queue/processors/http/process-inbox.ts74
-rw-r--r--src/queue/processors/http/report-github-failure.ts41
-rw-r--r--src/queue/processors/http/unfollow.ts71
10 files changed, 98 insertions, 262 deletions
diff --git a/src/queue/processors/db/delete-post-dependents.ts b/src/queue/processors/db/delete-post-dependents.ts
deleted file mode 100644
index fb6617e952..0000000000
--- a/src/queue/processors/db/delete-post-dependents.ts
+++ /dev/null
@@ -1,22 +0,0 @@
-import Favorite from '../../../models/favorite';
-import Notification from '../../../models/notification';
-import PollVote from '../../../models/poll-vote';
-import PostReaction from '../../../models/post-reaction';
-import PostWatching from '../../../models/post-watching';
-import Post from '../../../models/post';
-
-export default ({ data }, done) => Promise.all([
- Favorite.remove({ postId: data._id }),
- Notification.remove({ postId: data._id }),
- PollVote.remove({ postId: data._id }),
- PostReaction.remove({ postId: data._id }),
- PostWatching.remove({ postId: data._id }),
- Post.find({ repostId: data._id }).then(reposts => Promise.all([
- Notification.remove({
- postId: {
- $in: reposts.map(({ _id }) => _id)
- }
- }),
- Post.remove({ repostId: data._id })
- ]))
-]).then(() => done(), done);
diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts
deleted file mode 100644
index 468ec442ac..0000000000
--- a/src/queue/processors/db/index.ts
+++ /dev/null
@@ -1,7 +0,0 @@
-import deletePostDependents from './delete-post-dependents';
-
-const handlers = {
- deletePostDependents
-};
-
-export default (job, done) => handlers[job.data.type](job, done);
diff --git a/src/queue/processors/http/deliver-post.ts b/src/queue/processors/http/deliver-post.ts
deleted file mode 100644
index 8107c8bf74..0000000000
--- a/src/queue/processors/http/deliver-post.ts
+++ /dev/null
@@ -1,27 +0,0 @@
-import Post from '../../../models/post';
-import User, { IRemoteUser } from '../../../models/user';
-import context from '../../../remote/activitypub/renderer/context';
-import renderCreate from '../../../remote/activitypub/renderer/create';
-import renderNote from '../../../remote/activitypub/renderer/note';
-import request from '../../../remote/request';
-
-export default async ({ data }, done) => {
- try {
- const promisedTo = User.findOne({ _id: data.toId }) as Promise<IRemoteUser>;
- const [from, post] = await Promise.all([
- User.findOne({ _id: data.fromId }),
- Post.findOne({ _id: data.postId })
- ]);
- const note = await renderNote(from, post);
- const to = await promisedTo;
- const create = renderCreate(note);
-
- create['@context'] = context;
-
- await request(from, to.account.inbox, create);
- } catch (error) {
- done(error);
- }
-
- done();
-};
diff --git a/src/queue/processors/http/deliver.ts b/src/queue/processors/http/deliver.ts
new file mode 100644
index 0000000000..422e355b5f
--- /dev/null
+++ b/src/queue/processors/http/deliver.ts
@@ -0,0 +1,19 @@
+import * as kue from 'kue';
+
+import request from '../../../remote/request';
+
+export default async (job: kue.Job, done): Promise<void> => {
+ try {
+ await request(job.data.user, job.data.to, job.data.content);
+ done();
+ } catch (res) {
+ if (res.statusCode >= 400 && res.statusCode < 500) {
+ // HTTPステータスコード4xxはクライアントエラーであり、それはつまり
+ // 何回再送しても成功することはないということなのでエラーにはしないでおく
+ done();
+ } else {
+ console.warn(`deliver failed: ${res.statusMessage}`);
+ done(new Error(res.statusMessage));
+ }
+ }
+};
diff --git a/src/queue/processors/http/follow.ts b/src/queue/processors/http/follow.ts
deleted file mode 100644
index ba1cc31186..0000000000
--- a/src/queue/processors/http/follow.ts
+++ /dev/null
@@ -1,66 +0,0 @@
-import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user';
-import Following from '../../../models/following';
-import FollowingLog from '../../../models/following-log';
-import FollowedLog from '../../../models/followed-log';
-import event from '../../../publishers/stream';
-import notify from '../../../publishers/notify';
-import context from '../../../remote/activitypub/renderer/context';
-import render from '../../../remote/activitypub/renderer/follow';
-import request from '../../../remote/request';
-
-export default ({ data }, done) => Following.findOne({ _id: data.following }).then(async ({ followerId, followeeId }) => {
- const [follower, followee] = await Promise.all([
- User.findOne({ _id: followerId }),
- User.findOne({ _id: followeeId })
- ]);
-
- if (isLocalUser(follower) && isRemoteUser(followee)) {
- const rendered = render(follower, followee);
- rendered['@context'] = context;
-
- await request(follower, followee.account.inbox, rendered);
- }
-
- return [follower, followee];
-}).then(([follower, followee]) => Promise.all([
- // Increment following count
- User.update(follower._id, {
- $inc: {
- followingCount: 1
- }
- }),
-
- FollowingLog.insert({
- createdAt: data.following.createdAt,
- userId: follower._id,
- count: follower.followingCount + 1
- }),
-
- // Increment followers count
- User.update({ _id: followee._id }, {
- $inc: {
- followersCount: 1
- }
- }),
-
- FollowedLog.insert({
- createdAt: data.following.createdAt,
- userId: follower._id,
- count: followee.followersCount + 1
- }),
-
- // Publish follow event
- isLocalUser(follower) && packUser(followee, follower)
- .then(packed => event(follower._id, 'follow', packed)),
-
- isLocalUser(followee) && Promise.all([
- packUser(follower, followee)
- .then(packed => event(followee._id, 'followed', packed)),
-
- // Notify
- isLocalUser(followee) && notify(followee._id, follower._id, 'follow')
- ])
-]).then(() => done(), error => {
- done();
- throw error;
-}), done);
diff --git a/src/queue/processors/http/index.ts b/src/queue/processors/http/index.ts
index 0ea79305c6..3dc2595374 100644
--- a/src/queue/processors/http/index.ts
+++ b/src/queue/processors/http/index.ts
@@ -1,17 +1,20 @@
-import deliverPost from './deliver-post';
-import follow from './follow';
-import performActivityPub from './perform-activitypub';
+import deliver from './deliver';
import processInbox from './process-inbox';
import reportGitHubFailure from './report-github-failure';
-import unfollow from './unfollow';
const handlers = {
- deliverPost,
- follow,
- performActivityPub,
- processInbox,
- reportGitHubFailure,
- unfollow
+ deliver,
+ processInbox,
+ reportGitHubFailure
};
-export default (job, done) => handlers[job.data.type](job, done);
+export default (job, done) => {
+ const handler = handlers[job.data.type];
+
+ if (handler) {
+ handler(job, done);
+ } else {
+ console.error(`Unknown job: ${job.data.type}`);
+ done();
+ }
+};
diff --git a/src/queue/processors/http/perform-activitypub.ts b/src/queue/processors/http/perform-activitypub.ts
deleted file mode 100644
index ae70c0f0be..0000000000
--- a/src/queue/processors/http/perform-activitypub.ts
+++ /dev/null
@@ -1,8 +0,0 @@
-import User from '../../../models/user';
-import act from '../../../remote/activitypub/act';
-import Resolver from '../../../remote/activitypub/resolver';
-
-export default ({ data }, done) => User.findOne({ _id: data.actor })
- .then(actor => act(new Resolver(), actor, data.outbox))
- .then(Promise.all)
- .then(() => done(), done);
diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts
index 7eeaa19f8a..eb4b62d37f 100644
--- a/src/queue/processors/http/process-inbox.ts
+++ b/src/queue/processors/http/process-inbox.ts
@@ -1,44 +1,66 @@
+import * as kue from 'kue';
+import * as debug from 'debug';
+
import { verifySignature } from 'http-signature';
import parseAcct from '../../../acct/parse';
import User, { IRemoteUser } from '../../../models/user';
import act from '../../../remote/activitypub/act';
import resolvePerson from '../../../remote/activitypub/resolve-person';
-import Resolver from '../../../remote/activitypub/resolver';
-export default async ({ data }, done) => {
- try {
- const keyIdLower = data.signature.keyId.toLowerCase();
- let user;
+const log = debug('misskey:queue:inbox');
- if (keyIdLower.startsWith('acct:')) {
- const { username, host } = parseAcct(keyIdLower.slice('acct:'.length));
- if (host === null) {
- done();
- return;
- }
+// ユーザーのinboxにアクティビティが届いた時の処理
+export default async (job: kue.Job, done): Promise<void> => {
+ const signature = job.data.signature;
+ const activity = job.data.activity;
- user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser;
- } else {
- user = await User.findOne({
- host: { $ne: null },
- 'account.publicKey.id': data.signature.keyId
- }) as IRemoteUser;
+ //#region Log
+ const info = Object.assign({}, activity);
+ delete info['@context'];
+ delete info['signature'];
+ log(info);
+ //#endregion
- if (user === null) {
- user = await resolvePerson(new Resolver(), data.signature.keyId);
- }
- }
+ const keyIdLower = signature.keyId.toLowerCase();
+ let user;
- if (user === null || !verifySignature(data.signature, user.account.publicKey.publicKeyPem)) {
+ if (keyIdLower.startsWith('acct:')) {
+ const { username, host } = parseAcct(keyIdLower.slice('acct:'.length));
+ if (host === null) {
+ console.warn(`request was made by local user: @${username}`);
done();
return;
}
- await Promise.all(await act(new Resolver(), user, data.inbox, true));
- } catch (error) {
- done(error);
+ user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser;
+ } else {
+ user = await User.findOne({
+ host: { $ne: null },
+ 'account.publicKey.id': signature.keyId
+ }) as IRemoteUser;
+
+ // アクティビティを送信してきたユーザーがまだMisskeyサーバーに登録されていなかったら登録する
+ if (user === null) {
+ user = await resolvePerson(signature.keyId);
+ }
+ }
+
+ if (user === null) {
+ done(new Error('failed to resolve user'));
+ return;
+ }
+
+ if (!verifySignature(signature, user.account.publicKey.publicKeyPem)) {
+ console.warn('signature verification failed');
+ done();
return;
}
- done();
+ // アクティビティを処理
+ try {
+ await act(user, activity);
+ done();
+ } catch (e) {
+ done(e);
+ }
};
diff --git a/src/queue/processors/http/report-github-failure.ts b/src/queue/processors/http/report-github-failure.ts
index af9659bdac..1e0b51f89f 100644
--- a/src/queue/processors/http/report-github-failure.ts
+++ b/src/queue/processors/http/report-github-failure.ts
@@ -1,31 +1,24 @@
import * as request from 'request-promise-native';
import User from '../../../models/user';
-const createPost = require('../../../server/api/endpoints/posts/create');
+import createPost from '../../../services/post/create';
-export default async ({ data }, done) => {
- try {
- const asyncBot = User.findOne({ _id: data.userId });
+export default async ({ data }) => {
+ const asyncBot = User.findOne({ _id: data.userId });
- // Fetch parent status
- const parentStatuses = await request({
- url: `${data.parentUrl}/statuses`,
- headers: {
- 'User-Agent': 'misskey'
- },
- json: true
- });
+ // Fetch parent status
+ const parentStatuses = await request({
+ url: `${data.parentUrl}/statuses`,
+ headers: {
+ 'User-Agent': 'misskey'
+ },
+ json: true
+ });
- const parentState = parentStatuses[0].state;
- const stillFailed = parentState == 'failure' || parentState == 'error';
- const text = stillFailed ?
- `**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` :
- `**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`;
+ const parentState = parentStatuses[0].state;
+ const stillFailed = parentState == 'failure' || parentState == 'error';
+ const text = stillFailed ?
+ `**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` :
+ `**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`;
- createPost({ text }, await asyncBot);
- } catch (error) {
- done(error);
- return;
- }
-
- done();
+ createPost(await asyncBot, { text });
};
diff --git a/src/queue/processors/http/unfollow.ts b/src/queue/processors/http/unfollow.ts
deleted file mode 100644
index d62eb280dc..0000000000
--- a/src/queue/processors/http/unfollow.ts
+++ /dev/null
@@ -1,71 +0,0 @@
-import FollowedLog from '../../../models/followed-log';
-import Following from '../../../models/following';
-import FollowingLog from '../../../models/following-log';
-import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user';
-import stream from '../../../publishers/stream';
-import renderFollow from '../../../remote/activitypub/renderer/follow';
-import renderUndo from '../../../remote/activitypub/renderer/undo';
-import context from '../../../remote/activitypub/renderer/context';
-import request from '../../../remote/request';
-
-export default async ({ data }, done) => {
- const following = await Following.findOne({ _id: data.id });
- if (following === null) {
- done();
- return;
- }
-
- let follower;
- let followee;
-
- try {
- [follower, followee] = await Promise.all([
- User.findOne({ _id: following.followerId }),
- User.findOne({ _id: following.followeeId })
- ]);
-
- if (isLocalUser(follower) && isRemoteUser(followee)) {
- const undo = renderUndo(renderFollow(follower, followee));
- undo['@context'] = context;
-
- await request(follower, followee.account.inbox, undo);
- }
- } catch (error) {
- done(error);
- return;
- }
-
- try {
- await Promise.all([
- // Delete following
- Following.findOneAndDelete({ _id: data.id }),
-
- // Decrement following count
- User.update({ _id: follower._id }, { $inc: { followingCount: -1 } }),
- FollowingLog.insert({
- createdAt: new Date(),
- userId: follower._id,
- count: follower.followingCount - 1
- }),
-
- // Decrement followers count
- User.update({ _id: followee._id }, { $inc: { followersCount: -1 } }),
- FollowedLog.insert({
- createdAt: new Date(),
- userId: followee._id,
- count: followee.followersCount - 1
- })
- ]);
-
- if (isLocalUser(follower)) {
- return;
- }
-
- const promisedPackedUser = packUser(followee, follower);
-
- // Publish follow event
- stream(follower._id, 'unfollow', promisedPackedUser);
- } finally {
- done();
- }
-};