summaryrefslogtreecommitdiff
path: root/src/queue
diff options
context:
space:
mode:
authorAkihiko Odaki <nekomanma@pixiv.co.jp>2018-04-05 01:04:44 +0900
committerAkihiko Odaki <nekomanma@pixiv.co.jp>2018-04-05 01:04:44 +0900
commite330ac1934516807757afe2d2760fa21b27006e6 (patch)
tree84171f44928b460ea3c23a32e4b53bb0f2879318 /src/queue
parentRetry HTTP requests (diff)
downloadsharkey-e330ac1934516807757afe2d2760fa21b27006e6.tar.gz
sharkey-e330ac1934516807757afe2d2760fa21b27006e6.tar.bz2
sharkey-e330ac1934516807757afe2d2760fa21b27006e6.zip
Let unhandled rejection handler handle rejections in jobs
Diffstat (limited to 'src/queue')
-rw-r--r--src/queue/processors/db/delete-post-dependents.ts4
-rw-r--r--src/queue/processors/db/index.ts2
-rw-r--r--src/queue/processors/http/deliver-post.ts28
-rw-r--r--src/queue/processors/http/follow.ts79
-rw-r--r--src/queue/processors/http/index.ts2
-rw-r--r--src/queue/processors/http/perform-activitypub.ts5
-rw-r--r--src/queue/processors/http/process-inbox.ts51
-rw-r--r--src/queue/processors/http/report-github-failure.ts39
-rw-r--r--src/queue/processors/http/unfollow.ts31
9 files changed, 132 insertions, 109 deletions
diff --git a/src/queue/processors/db/delete-post-dependents.ts b/src/queue/processors/db/delete-post-dependents.ts
index 6de21eb053..fb6617e952 100644
--- a/src/queue/processors/db/delete-post-dependents.ts
+++ b/src/queue/processors/db/delete-post-dependents.ts
@@ -5,7 +5,7 @@ import PostReaction from '../../../models/post-reaction';
import PostWatching from '../../../models/post-watching';
import Post from '../../../models/post';
-export default async ({ data }) => Promise.all([
+export default ({ data }, done) => Promise.all([
Favorite.remove({ postId: data._id }),
Notification.remove({ postId: data._id }),
PollVote.remove({ postId: data._id }),
@@ -19,4 +19,4 @@ export default async ({ data }) => Promise.all([
}),
Post.remove({ repostId: data._id })
]))
-]);
+]).then(() => done(), done);
diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts
index 75838c099b..468ec442ac 100644
--- a/src/queue/processors/db/index.ts
+++ b/src/queue/processors/db/index.ts
@@ -4,4 +4,4 @@ const handlers = {
deletePostDependents
};
-export default (job, done) => handlers[job.data.type](job).then(() => done(), done);
+export default (job, done) => handlers[job.data.type](job, done);
diff --git a/src/queue/processors/http/deliver-post.ts b/src/queue/processors/http/deliver-post.ts
index e743fc5f68..8107c8bf74 100644
--- a/src/queue/processors/http/deliver-post.ts
+++ b/src/queue/processors/http/deliver-post.ts
@@ -5,17 +5,23 @@ import renderCreate from '../../../remote/activitypub/renderer/create';
import renderNote from '../../../remote/activitypub/renderer/note';
import request from '../../../remote/request';
-export default async ({ data }) => {
- const promisedTo = User.findOne({ _id: data.toId }) as Promise<IRemoteUser>;
- const [from, post] = await Promise.all([
- User.findOne({ _id: data.fromId }),
- Post.findOne({ _id: data.postId })
- ]);
- const note = await renderNote(from, post);
- const to = await promisedTo;
- const create = renderCreate(note);
+export default async ({ data }, done) => {
+ try {
+ const promisedTo = User.findOne({ _id: data.toId }) as Promise<IRemoteUser>;
+ const [from, post] = await Promise.all([
+ User.findOne({ _id: data.fromId }),
+ Post.findOne({ _id: data.postId })
+ ]);
+ const note = await renderNote(from, post);
+ const to = await promisedTo;
+ const create = renderCreate(note);
- create['@context'] = context;
+ create['@context'] = context;
- return request(from, to.account.inbox, create);
+ await request(from, to.account.inbox, create);
+ } catch (error) {
+ done(error);
+ }
+
+ done();
};
diff --git a/src/queue/processors/http/follow.ts b/src/queue/processors/http/follow.ts
index 4cb72828e7..ba1cc31186 100644
--- a/src/queue/processors/http/follow.ts
+++ b/src/queue/processors/http/follow.ts
@@ -7,10 +7,8 @@ import notify from '../../../publishers/notify';
import context from '../../../remote/activitypub/renderer/context';
import render from '../../../remote/activitypub/renderer/follow';
import request from '../../../remote/request';
-import Logger from '../../../utils/logger';
-export default async ({ data }) => {
- const { followerId, followeeId } = await Following.findOne({ _id: data.following });
+export default ({ data }, done) => Following.findOne({ _id: data.following }).then(async ({ followerId, followeeId }) => {
const [follower, followee] = await Promise.all([
User.findOne({ _id: followerId }),
User.findOne({ _id: followeeId })
@@ -23,47 +21,46 @@ export default async ({ data }) => {
await request(follower, followee.account.inbox, rendered);
}
- try {
- await Promise.all([
- // Increment following count
- User.update(followerId, {
- $inc: {
- followingCount: 1
- }
- }),
+ return [follower, followee];
+}).then(([follower, followee]) => Promise.all([
+ // Increment following count
+ User.update(follower._id, {
+ $inc: {
+ followingCount: 1
+ }
+ }),
- FollowingLog.insert({
- createdAt: data.following.createdAt,
- userId: followerId,
- count: follower.followingCount + 1
- }),
+ FollowingLog.insert({
+ createdAt: data.following.createdAt,
+ userId: follower._id,
+ count: follower.followingCount + 1
+ }),
- // Increment followers count
- User.update({ _id: followeeId }, {
- $inc: {
- followersCount: 1
- }
- }),
+ // Increment followers count
+ User.update({ _id: followee._id }, {
+ $inc: {
+ followersCount: 1
+ }
+ }),
- FollowedLog.insert({
- createdAt: data.following.createdAt,
- userId: followerId,
- count: followee.followersCount + 1
- }),
+ FollowedLog.insert({
+ createdAt: data.following.createdAt,
+ userId: follower._id,
+ count: followee.followersCount + 1
+ }),
- // Publish follow event
- isLocalUser(follower) && packUser(followee, follower)
- .then(packed => event(follower._id, 'follow', packed)),
+ // Publish follow event
+ isLocalUser(follower) && packUser(followee, follower)
+ .then(packed => event(follower._id, 'follow', packed)),
- isLocalUser(followee) && Promise.all([
- packUser(follower, followee)
- .then(packed => event(followee._id, 'followed', packed)),
+ isLocalUser(followee) && Promise.all([
+ packUser(follower, followee)
+ .then(packed => event(followee._id, 'followed', packed)),
- // Notify
- isLocalUser(followee) && notify(followeeId, followerId, 'follow')
- ])
- ]);
- } catch (error) {
- Logger.error(error.toString());
- }
-};
+ // Notify
+ isLocalUser(followee) && notify(followee._id, follower._id, 'follow')
+ ])
+]).then(() => done(), error => {
+ done();
+ throw error;
+}), done);
diff --git a/src/queue/processors/http/index.ts b/src/queue/processors/http/index.ts
index 8f9aa717c3..0ea79305c6 100644
--- a/src/queue/processors/http/index.ts
+++ b/src/queue/processors/http/index.ts
@@ -14,4 +14,4 @@ const handlers = {
unfollow
};
-export default (job, done) => handlers[job.data.type](job).then(() => done(), done);
+export default (job, done) => handlers[job.data.type](job, done);
diff --git a/src/queue/processors/http/perform-activitypub.ts b/src/queue/processors/http/perform-activitypub.ts
index 7b84400d5c..ae70c0f0be 100644
--- a/src/queue/processors/http/perform-activitypub.ts
+++ b/src/queue/processors/http/perform-activitypub.ts
@@ -2,6 +2,7 @@ import User from '../../../models/user';
import act from '../../../remote/activitypub/act';
import Resolver from '../../../remote/activitypub/resolver';
-export default ({ data }) => User.findOne({ _id: data.actor })
+export default ({ data }, done) => User.findOne({ _id: data.actor })
.then(actor => act(new Resolver(), actor, data.outbox))
- .then(Promise.all);
+ .then(Promise.all)
+ .then(() => done(), done);
diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts
index de1dbd2f98..88fbb97377 100644
--- a/src/queue/processors/http/process-inbox.ts
+++ b/src/queue/processors/http/process-inbox.ts
@@ -5,35 +5,40 @@ import act from '../../../remote/activitypub/act';
import resolvePerson from '../../../remote/activitypub/resolve-person';
import Resolver from '../../../remote/activitypub/resolver';
-export default async ({ data }): Promise<void> => {
- const keyIdLower = data.signature.keyId.toLowerCase();
- let user;
+export default async ({ data }, done) => {
+ try {
+ const keyIdLower = data.signature.keyId.toLowerCase();
+ let user;
- if (keyIdLower.startsWith('acct:')) {
- const { username, host } = parseAcct(keyIdLower.slice('acct:'.length));
- if (host === null) {
- throw 'request was made by local user';
- }
+ if (keyIdLower.startsWith('acct:')) {
+ const { username, host } = parseAcct(keyIdLower.slice('acct:'.length));
+ if (host === null) {
+ done();
+ return;
+ }
- user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser;
- } else {
- user = await User.findOne({
- host: { $ne: null },
- 'account.publicKey.id': data.signature.keyId
- }) as IRemoteUser;
+ user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser;
+ } else {
+ user = await User.findOne({
+ host: { $ne: null },
+ 'account.publicKey.id': data.signature.keyId
+ }) as IRemoteUser;
- if (user === null) {
- user = await resolvePerson(data.signature.keyId);
+ if (user === null) {
+ user = await resolvePerson(data.signature.keyId);
+ }
}
- }
- if (user === null) {
- throw 'failed to resolve user';
- }
+ if (user === null || !verifySignature(data.signature, user.account.publicKey.publicKeyPem)) {
+ done();
+ return;
+ }
- if (!verifySignature(data.signature, user.account.publicKey.publicKeyPem)) {
- throw 'signature verification failed';
+ await Promise.all(await act(new Resolver(), user, data.inbox, true));
+ } catch (error) {
+ done(error);
+ return;
}
- await Promise.all(await act(new Resolver(), user, data.inbox, true));
+ done();
};
diff --git a/src/queue/processors/http/report-github-failure.ts b/src/queue/processors/http/report-github-failure.ts
index 21683ba3c2..af9659bdac 100644
--- a/src/queue/processors/http/report-github-failure.ts
+++ b/src/queue/processors/http/report-github-failure.ts
@@ -2,23 +2,30 @@ import * as request from 'request-promise-native';
import User from '../../../models/user';
const createPost = require('../../../server/api/endpoints/posts/create');
-export default async ({ data }) => {
- const asyncBot = User.findOne({ _id: data.userId });
+export default async ({ data }, done) => {
+ try {
+ const asyncBot = User.findOne({ _id: data.userId });
- // Fetch parent status
- const parentStatuses = await request({
- url: `${data.parentUrl}/statuses`,
- headers: {
- 'User-Agent': 'misskey'
- },
- json: true
- });
+ // Fetch parent status
+ const parentStatuses = await request({
+ url: `${data.parentUrl}/statuses`,
+ headers: {
+ 'User-Agent': 'misskey'
+ },
+ json: true
+ });
- const parentState = parentStatuses[0].state;
- const stillFailed = parentState == 'failure' || parentState == 'error';
- const text = stillFailed ?
- `**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` :
- `**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`;
+ const parentState = parentStatuses[0].state;
+ const stillFailed = parentState == 'failure' || parentState == 'error';
+ const text = stillFailed ?
+ `**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` :
+ `**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`;
- createPost({ text }, await asyncBot);
+ createPost({ text }, await asyncBot);
+ } catch (error) {
+ done(error);
+ return;
+ }
+
+ done();
};
diff --git a/src/queue/processors/http/unfollow.ts b/src/queue/processors/http/unfollow.ts
index 801a3612a7..dc50e946c9 100644
--- a/src/queue/processors/http/unfollow.ts
+++ b/src/queue/processors/http/unfollow.ts
@@ -7,24 +7,31 @@ import renderFollow from '../../../remote/activitypub/renderer/follow';
import renderUndo from '../../../remote/activitypub/renderer/undo';
import context from '../../../remote/activitypub/renderer/context';
import request from '../../../remote/request';
-import Logger from '../../../utils/logger';
-export default async ({ data }) => {
+export default async ({ data }, done) => {
const following = await Following.findOne({ _id: data.id });
if (following === null) {
+ done();
return;
}
- const [follower, followee] = await Promise.all([
- User.findOne({ _id: following.followerId }),
- User.findOne({ _id: following.followeeId })
- ]);
+ let follower, followee;
- if (isLocalUser(follower) && isRemoteUser(followee)) {
- const undo = renderUndo(renderFollow(follower, followee));
- undo['@context'] = context;
+ try {
+ [follower, followee] = await Promise.all([
+ User.findOne({ _id: following.followerId }),
+ User.findOne({ _id: following.followeeId })
+ ]);
- await request(follower, followee.account.inbox, undo);
+ if (isLocalUser(follower) && isRemoteUser(followee)) {
+ const undo = renderUndo(renderFollow(follower, followee));
+ undo['@context'] = context;
+
+ await request(follower, followee.account.inbox, undo);
+ }
+ } catch (error) {
+ done(error);
+ return;
}
try {
@@ -57,7 +64,7 @@ export default async ({ data }) => {
// Publish follow event
stream(follower._id, 'unfollow', promisedPackedUser);
- } catch (error) {
- Logger.error(error.toString());
+ } finally {
+ done();
}
};