From d7c13b975f55c85b695b72a3ded3d5de97227414 Mon Sep 17 00:00:00 2001 From: Akihiko Odaki Date: Wed, 4 Apr 2018 22:45:55 +0900 Subject: Retry HTTP requests --- src/queue/index.ts | 38 ++++++++++++ src/queue/processors/db/delete-post-dependents.ts | 22 +++++++ src/queue/processors/db/index.ts | 7 +++ src/queue/processors/http/deliver-post.ts | 21 +++++++ src/queue/processors/http/follow.ts | 69 ++++++++++++++++++++++ src/queue/processors/http/index.ts | 17 ++++++ src/queue/processors/http/perform-activitypub.ts | 7 +++ src/queue/processors/http/process-inbox.ts | 39 ++++++++++++ src/queue/processors/http/report-github-failure.ts | 24 ++++++++ src/queue/processors/http/unfollow.ts | 63 ++++++++++++++++++++ 10 files changed, 307 insertions(+) create mode 100644 src/queue/index.ts create mode 100644 src/queue/processors/db/delete-post-dependents.ts create mode 100644 src/queue/processors/db/index.ts create mode 100644 src/queue/processors/http/deliver-post.ts create mode 100644 src/queue/processors/http/follow.ts create mode 100644 src/queue/processors/http/index.ts create mode 100644 src/queue/processors/http/perform-activitypub.ts create mode 100644 src/queue/processors/http/process-inbox.ts create mode 100644 src/queue/processors/http/report-github-failure.ts create mode 100644 src/queue/processors/http/unfollow.ts (limited to 'src/queue') diff --git a/src/queue/index.ts b/src/queue/index.ts new file mode 100644 index 0000000000..f90754a561 --- /dev/null +++ b/src/queue/index.ts @@ -0,0 +1,38 @@ +import { createQueue } from 'kue'; +import config from '../config'; +import db from './processors/db'; +import http from './processors/http'; + +const queue = createQueue({ + redis: { + port: config.redis.port, + host: config.redis.host, + auth: config.redis.pass + } +}); + +export function createHttp(data) { + return queue + .create('http', data) + .attempts(16) + .backoff({ delay: 16384, type: 'exponential' }); +} + +export function createDb(data) { + return queue.create('db', data); +} + +export function process() { + queue.process('db', db); + + /* + 256 is the default concurrency limit of Mozilla Firefox and Google + Chromium. + + a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google + https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff + Network.http.max-connections - MozillaZine Knowledge Base + http://kb.mozillazine.org/Network.http.max-connections + */ + queue.process('http', 256, http); +} diff --git a/src/queue/processors/db/delete-post-dependents.ts b/src/queue/processors/db/delete-post-dependents.ts new file mode 100644 index 0000000000..6de21eb053 --- /dev/null +++ b/src/queue/processors/db/delete-post-dependents.ts @@ -0,0 +1,22 @@ +import Favorite from '../../../models/favorite'; +import Notification from '../../../models/notification'; +import PollVote from '../../../models/poll-vote'; +import PostReaction from '../../../models/post-reaction'; +import PostWatching from '../../../models/post-watching'; +import Post from '../../../models/post'; + +export default async ({ data }) => Promise.all([ + Favorite.remove({ postId: data._id }), + Notification.remove({ postId: data._id }), + PollVote.remove({ postId: data._id }), + PostReaction.remove({ postId: data._id }), + PostWatching.remove({ postId: data._id }), + Post.find({ repostId: data._id }).then(reposts => Promise.all([ + Notification.remove({ + postId: { + $in: reposts.map(({ _id }) => _id) + } + }), + Post.remove({ repostId: data._id }) + ])) +]); diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts new file mode 100644 index 0000000000..75838c099b --- /dev/null +++ b/src/queue/processors/db/index.ts @@ -0,0 +1,7 @@ +import deletePostDependents from './delete-post-dependents'; + +const handlers = { + deletePostDependents +}; + +export default (job, done) => handlers[job.data.type](job).then(() => done(), done); diff --git a/src/queue/processors/http/deliver-post.ts b/src/queue/processors/http/deliver-post.ts new file mode 100644 index 0000000000..e743fc5f68 --- /dev/null +++ b/src/queue/processors/http/deliver-post.ts @@ -0,0 +1,21 @@ +import Post from '../../../models/post'; +import User, { IRemoteUser } from '../../../models/user'; +import context from '../../../remote/activitypub/renderer/context'; +import renderCreate from '../../../remote/activitypub/renderer/create'; +import renderNote from '../../../remote/activitypub/renderer/note'; +import request from '../../../remote/request'; + +export default async ({ data }) => { + const promisedTo = User.findOne({ _id: data.toId }) as Promise; + const [from, post] = await Promise.all([ + User.findOne({ _id: data.fromId }), + Post.findOne({ _id: data.postId }) + ]); + const note = await renderNote(from, post); + const to = await promisedTo; + const create = renderCreate(note); + + create['@context'] = context; + + return request(from, to.account.inbox, create); +}; diff --git a/src/queue/processors/http/follow.ts b/src/queue/processors/http/follow.ts new file mode 100644 index 0000000000..4cb72828e7 --- /dev/null +++ b/src/queue/processors/http/follow.ts @@ -0,0 +1,69 @@ +import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user'; +import Following from '../../../models/following'; +import FollowingLog from '../../../models/following-log'; +import FollowedLog from '../../../models/followed-log'; +import event from '../../../publishers/stream'; +import notify from '../../../publishers/notify'; +import context from '../../../remote/activitypub/renderer/context'; +import render from '../../../remote/activitypub/renderer/follow'; +import request from '../../../remote/request'; +import Logger from '../../../utils/logger'; + +export default async ({ data }) => { + const { followerId, followeeId } = await Following.findOne({ _id: data.following }); + const [follower, followee] = await Promise.all([ + User.findOne({ _id: followerId }), + User.findOne({ _id: followeeId }) + ]); + + if (isLocalUser(follower) && isRemoteUser(followee)) { + const rendered = render(follower, followee); + rendered['@context'] = context; + + await request(follower, followee.account.inbox, rendered); + } + + try { + await Promise.all([ + // Increment following count + User.update(followerId, { + $inc: { + followingCount: 1 + } + }), + + FollowingLog.insert({ + createdAt: data.following.createdAt, + userId: followerId, + count: follower.followingCount + 1 + }), + + // Increment followers count + User.update({ _id: followeeId }, { + $inc: { + followersCount: 1 + } + }), + + FollowedLog.insert({ + createdAt: data.following.createdAt, + userId: followerId, + count: followee.followersCount + 1 + }), + + // Publish follow event + isLocalUser(follower) && packUser(followee, follower) + .then(packed => event(follower._id, 'follow', packed)), + + isLocalUser(followee) && Promise.all([ + packUser(follower, followee) + .then(packed => event(followee._id, 'followed', packed)), + + // Notify + isLocalUser(followee) && notify(followeeId, followerId, 'follow') + ]) + ]); + } catch (error) { + Logger.error(error.toString()); + } +}; diff --git a/src/queue/processors/http/index.ts b/src/queue/processors/http/index.ts new file mode 100644 index 0000000000..8f9aa717c3 --- /dev/null +++ b/src/queue/processors/http/index.ts @@ -0,0 +1,17 @@ +import deliverPost from './deliver-post'; +import follow from './follow'; +import performActivityPub from './perform-activitypub'; +import processInbox from './process-inbox'; +import reportGitHubFailure from './report-github-failure'; +import unfollow from './unfollow'; + +const handlers = { + deliverPost, + follow, + performActivityPub, + processInbox, + reportGitHubFailure, + unfollow +}; + +export default (job, done) => handlers[job.data.type](job).then(() => done(), done); diff --git a/src/queue/processors/http/perform-activitypub.ts b/src/queue/processors/http/perform-activitypub.ts new file mode 100644 index 0000000000..7b84400d5c --- /dev/null +++ b/src/queue/processors/http/perform-activitypub.ts @@ -0,0 +1,7 @@ +import User from '../../../models/user'; +import act from '../../../remote/activitypub/act'; +import Resolver from '../../../remote/activitypub/resolver'; + +export default ({ data }) => User.findOne({ _id: data.actor }) + .then(actor => act(new Resolver(), actor, data.outbox)) + .then(Promise.all); diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts new file mode 100644 index 0000000000..de1dbd2f98 --- /dev/null +++ b/src/queue/processors/http/process-inbox.ts @@ -0,0 +1,39 @@ +import { verifySignature } from 'http-signature'; +import parseAcct from '../../../acct/parse'; +import User, { IRemoteUser } from '../../../models/user'; +import act from '../../../remote/activitypub/act'; +import resolvePerson from '../../../remote/activitypub/resolve-person'; +import Resolver from '../../../remote/activitypub/resolver'; + +export default async ({ data }): Promise => { + const keyIdLower = data.signature.keyId.toLowerCase(); + let user; + + if (keyIdLower.startsWith('acct:')) { + const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); + if (host === null) { + throw 'request was made by local user'; + } + + user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser; + } else { + user = await User.findOne({ + host: { $ne: null }, + 'account.publicKey.id': data.signature.keyId + }) as IRemoteUser; + + if (user === null) { + user = await resolvePerson(data.signature.keyId); + } + } + + if (user === null) { + throw 'failed to resolve user'; + } + + if (!verifySignature(data.signature, user.account.publicKey.publicKeyPem)) { + throw 'signature verification failed'; + } + + await Promise.all(await act(new Resolver(), user, data.inbox, true)); +}; diff --git a/src/queue/processors/http/report-github-failure.ts b/src/queue/processors/http/report-github-failure.ts new file mode 100644 index 0000000000..21683ba3c2 --- /dev/null +++ b/src/queue/processors/http/report-github-failure.ts @@ -0,0 +1,24 @@ +import * as request from 'request-promise-native'; +import User from '../../../models/user'; +const createPost = require('../../../server/api/endpoints/posts/create'); + +export default async ({ data }) => { + const asyncBot = User.findOne({ _id: data.userId }); + + // Fetch parent status + const parentStatuses = await request({ + url: `${data.parentUrl}/statuses`, + headers: { + 'User-Agent': 'misskey' + }, + json: true + }); + + const parentState = parentStatuses[0].state; + const stillFailed = parentState == 'failure' || parentState == 'error'; + const text = stillFailed ? + `**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` : + `**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`; + + createPost({ text }, await asyncBot); +}; diff --git a/src/queue/processors/http/unfollow.ts b/src/queue/processors/http/unfollow.ts new file mode 100644 index 0000000000..801a3612a7 --- /dev/null +++ b/src/queue/processors/http/unfollow.ts @@ -0,0 +1,63 @@ +import FollowedLog from '../../../models/followed-log'; +import Following from '../../../models/following'; +import FollowingLog from '../../../models/following-log'; +import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user'; +import stream from '../../../publishers/stream'; +import renderFollow from '../../../remote/activitypub/renderer/follow'; +import renderUndo from '../../../remote/activitypub/renderer/undo'; +import context from '../../../remote/activitypub/renderer/context'; +import request from '../../../remote/request'; +import Logger from '../../../utils/logger'; + +export default async ({ data }) => { + const following = await Following.findOne({ _id: data.id }); + if (following === null) { + return; + } + + const [follower, followee] = await Promise.all([ + User.findOne({ _id: following.followerId }), + User.findOne({ _id: following.followeeId }) + ]); + + if (isLocalUser(follower) && isRemoteUser(followee)) { + const undo = renderUndo(renderFollow(follower, followee)); + undo['@context'] = context; + + await request(follower, followee.account.inbox, undo); + } + + try { + await Promise.all([ + // Delete following + Following.findOneAndDelete({ _id: data.id }), + + // Decrement following count + User.update({ _id: follower._id }, { $inc: { followingCount: -1 } }), + FollowingLog.insert({ + createdAt: new Date(), + userId: follower._id, + count: follower.followingCount - 1 + }), + + // Decrement followers count + User.update({ _id: followee._id }, { $inc: { followersCount: -1 } }), + FollowedLog.insert({ + createdAt: new Date(), + userId: followee._id, + count: followee.followersCount - 1 + }) + ]); + + if (isLocalUser(follower)) { + return; + } + + const promisedPackedUser = packUser(followee, follower); + + // Publish follow event + stream(follower._id, 'unfollow', promisedPackedUser); + } catch (error) { + Logger.error(error.toString()); + } +}; -- cgit v1.2.3-freya From e330ac1934516807757afe2d2760fa21b27006e6 Mon Sep 17 00:00:00 2001 From: Akihiko Odaki Date: Thu, 5 Apr 2018 01:04:44 +0900 Subject: Let unhandled rejection handler handle rejections in jobs --- src/queue/processors/db/delete-post-dependents.ts | 4 +- src/queue/processors/db/index.ts | 2 +- src/queue/processors/http/deliver-post.ts | 28 +++++--- src/queue/processors/http/follow.ts | 79 +++++++++++----------- src/queue/processors/http/index.ts | 2 +- src/queue/processors/http/perform-activitypub.ts | 5 +- src/queue/processors/http/process-inbox.ts | 51 +++++++------- src/queue/processors/http/report-github-failure.ts | 39 ++++++----- src/queue/processors/http/unfollow.ts | 31 +++++---- 9 files changed, 132 insertions(+), 109 deletions(-) (limited to 'src/queue') diff --git a/src/queue/processors/db/delete-post-dependents.ts b/src/queue/processors/db/delete-post-dependents.ts index 6de21eb053..fb6617e952 100644 --- a/src/queue/processors/db/delete-post-dependents.ts +++ b/src/queue/processors/db/delete-post-dependents.ts @@ -5,7 +5,7 @@ import PostReaction from '../../../models/post-reaction'; import PostWatching from '../../../models/post-watching'; import Post from '../../../models/post'; -export default async ({ data }) => Promise.all([ +export default ({ data }, done) => Promise.all([ Favorite.remove({ postId: data._id }), Notification.remove({ postId: data._id }), PollVote.remove({ postId: data._id }), @@ -19,4 +19,4 @@ export default async ({ data }) => Promise.all([ }), Post.remove({ repostId: data._id }) ])) -]); +]).then(() => done(), done); diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts index 75838c099b..468ec442ac 100644 --- a/src/queue/processors/db/index.ts +++ b/src/queue/processors/db/index.ts @@ -4,4 +4,4 @@ const handlers = { deletePostDependents }; -export default (job, done) => handlers[job.data.type](job).then(() => done(), done); +export default (job, done) => handlers[job.data.type](job, done); diff --git a/src/queue/processors/http/deliver-post.ts b/src/queue/processors/http/deliver-post.ts index e743fc5f68..8107c8bf74 100644 --- a/src/queue/processors/http/deliver-post.ts +++ b/src/queue/processors/http/deliver-post.ts @@ -5,17 +5,23 @@ import renderCreate from '../../../remote/activitypub/renderer/create'; import renderNote from '../../../remote/activitypub/renderer/note'; import request from '../../../remote/request'; -export default async ({ data }) => { - const promisedTo = User.findOne({ _id: data.toId }) as Promise; - const [from, post] = await Promise.all([ - User.findOne({ _id: data.fromId }), - Post.findOne({ _id: data.postId }) - ]); - const note = await renderNote(from, post); - const to = await promisedTo; - const create = renderCreate(note); +export default async ({ data }, done) => { + try { + const promisedTo = User.findOne({ _id: data.toId }) as Promise; + const [from, post] = await Promise.all([ + User.findOne({ _id: data.fromId }), + Post.findOne({ _id: data.postId }) + ]); + const note = await renderNote(from, post); + const to = await promisedTo; + const create = renderCreate(note); - create['@context'] = context; + create['@context'] = context; - return request(from, to.account.inbox, create); + await request(from, to.account.inbox, create); + } catch (error) { + done(error); + } + + done(); }; diff --git a/src/queue/processors/http/follow.ts b/src/queue/processors/http/follow.ts index 4cb72828e7..ba1cc31186 100644 --- a/src/queue/processors/http/follow.ts +++ b/src/queue/processors/http/follow.ts @@ -7,10 +7,8 @@ import notify from '../../../publishers/notify'; import context from '../../../remote/activitypub/renderer/context'; import render from '../../../remote/activitypub/renderer/follow'; import request from '../../../remote/request'; -import Logger from '../../../utils/logger'; -export default async ({ data }) => { - const { followerId, followeeId } = await Following.findOne({ _id: data.following }); +export default ({ data }, done) => Following.findOne({ _id: data.following }).then(async ({ followerId, followeeId }) => { const [follower, followee] = await Promise.all([ User.findOne({ _id: followerId }), User.findOne({ _id: followeeId }) @@ -23,47 +21,46 @@ export default async ({ data }) => { await request(follower, followee.account.inbox, rendered); } - try { - await Promise.all([ - // Increment following count - User.update(followerId, { - $inc: { - followingCount: 1 - } - }), + return [follower, followee]; +}).then(([follower, followee]) => Promise.all([ + // Increment following count + User.update(follower._id, { + $inc: { + followingCount: 1 + } + }), - FollowingLog.insert({ - createdAt: data.following.createdAt, - userId: followerId, - count: follower.followingCount + 1 - }), + FollowingLog.insert({ + createdAt: data.following.createdAt, + userId: follower._id, + count: follower.followingCount + 1 + }), - // Increment followers count - User.update({ _id: followeeId }, { - $inc: { - followersCount: 1 - } - }), + // Increment followers count + User.update({ _id: followee._id }, { + $inc: { + followersCount: 1 + } + }), - FollowedLog.insert({ - createdAt: data.following.createdAt, - userId: followerId, - count: followee.followersCount + 1 - }), + FollowedLog.insert({ + createdAt: data.following.createdAt, + userId: follower._id, + count: followee.followersCount + 1 + }), - // Publish follow event - isLocalUser(follower) && packUser(followee, follower) - .then(packed => event(follower._id, 'follow', packed)), + // Publish follow event + isLocalUser(follower) && packUser(followee, follower) + .then(packed => event(follower._id, 'follow', packed)), - isLocalUser(followee) && Promise.all([ - packUser(follower, followee) - .then(packed => event(followee._id, 'followed', packed)), + isLocalUser(followee) && Promise.all([ + packUser(follower, followee) + .then(packed => event(followee._id, 'followed', packed)), - // Notify - isLocalUser(followee) && notify(followeeId, followerId, 'follow') - ]) - ]); - } catch (error) { - Logger.error(error.toString()); - } -}; + // Notify + isLocalUser(followee) && notify(followee._id, follower._id, 'follow') + ]) +]).then(() => done(), error => { + done(); + throw error; +}), done); diff --git a/src/queue/processors/http/index.ts b/src/queue/processors/http/index.ts index 8f9aa717c3..0ea79305c6 100644 --- a/src/queue/processors/http/index.ts +++ b/src/queue/processors/http/index.ts @@ -14,4 +14,4 @@ const handlers = { unfollow }; -export default (job, done) => handlers[job.data.type](job).then(() => done(), done); +export default (job, done) => handlers[job.data.type](job, done); diff --git a/src/queue/processors/http/perform-activitypub.ts b/src/queue/processors/http/perform-activitypub.ts index 7b84400d5c..ae70c0f0be 100644 --- a/src/queue/processors/http/perform-activitypub.ts +++ b/src/queue/processors/http/perform-activitypub.ts @@ -2,6 +2,7 @@ import User from '../../../models/user'; import act from '../../../remote/activitypub/act'; import Resolver from '../../../remote/activitypub/resolver'; -export default ({ data }) => User.findOne({ _id: data.actor }) +export default ({ data }, done) => User.findOne({ _id: data.actor }) .then(actor => act(new Resolver(), actor, data.outbox)) - .then(Promise.all); + .then(Promise.all) + .then(() => done(), done); diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts index de1dbd2f98..88fbb97377 100644 --- a/src/queue/processors/http/process-inbox.ts +++ b/src/queue/processors/http/process-inbox.ts @@ -5,35 +5,40 @@ import act from '../../../remote/activitypub/act'; import resolvePerson from '../../../remote/activitypub/resolve-person'; import Resolver from '../../../remote/activitypub/resolver'; -export default async ({ data }): Promise => { - const keyIdLower = data.signature.keyId.toLowerCase(); - let user; +export default async ({ data }, done) => { + try { + const keyIdLower = data.signature.keyId.toLowerCase(); + let user; - if (keyIdLower.startsWith('acct:')) { - const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); - if (host === null) { - throw 'request was made by local user'; - } + if (keyIdLower.startsWith('acct:')) { + const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); + if (host === null) { + done(); + return; + } - user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser; - } else { - user = await User.findOne({ - host: { $ne: null }, - 'account.publicKey.id': data.signature.keyId - }) as IRemoteUser; + user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser; + } else { + user = await User.findOne({ + host: { $ne: null }, + 'account.publicKey.id': data.signature.keyId + }) as IRemoteUser; - if (user === null) { - user = await resolvePerson(data.signature.keyId); + if (user === null) { + user = await resolvePerson(data.signature.keyId); + } } - } - if (user === null) { - throw 'failed to resolve user'; - } + if (user === null || !verifySignature(data.signature, user.account.publicKey.publicKeyPem)) { + done(); + return; + } - if (!verifySignature(data.signature, user.account.publicKey.publicKeyPem)) { - throw 'signature verification failed'; + await Promise.all(await act(new Resolver(), user, data.inbox, true)); + } catch (error) { + done(error); + return; } - await Promise.all(await act(new Resolver(), user, data.inbox, true)); + done(); }; diff --git a/src/queue/processors/http/report-github-failure.ts b/src/queue/processors/http/report-github-failure.ts index 21683ba3c2..af9659bdac 100644 --- a/src/queue/processors/http/report-github-failure.ts +++ b/src/queue/processors/http/report-github-failure.ts @@ -2,23 +2,30 @@ import * as request from 'request-promise-native'; import User from '../../../models/user'; const createPost = require('../../../server/api/endpoints/posts/create'); -export default async ({ data }) => { - const asyncBot = User.findOne({ _id: data.userId }); +export default async ({ data }, done) => { + try { + const asyncBot = User.findOne({ _id: data.userId }); - // Fetch parent status - const parentStatuses = await request({ - url: `${data.parentUrl}/statuses`, - headers: { - 'User-Agent': 'misskey' - }, - json: true - }); + // Fetch parent status + const parentStatuses = await request({ + url: `${data.parentUrl}/statuses`, + headers: { + 'User-Agent': 'misskey' + }, + json: true + }); - const parentState = parentStatuses[0].state; - const stillFailed = parentState == 'failure' || parentState == 'error'; - const text = stillFailed ? - `**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` : - `**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`; + const parentState = parentStatuses[0].state; + const stillFailed = parentState == 'failure' || parentState == 'error'; + const text = stillFailed ? + `**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` : + `**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`; - createPost({ text }, await asyncBot); + createPost({ text }, await asyncBot); + } catch (error) { + done(error); + return; + } + + done(); }; diff --git a/src/queue/processors/http/unfollow.ts b/src/queue/processors/http/unfollow.ts index 801a3612a7..dc50e946c9 100644 --- a/src/queue/processors/http/unfollow.ts +++ b/src/queue/processors/http/unfollow.ts @@ -7,24 +7,31 @@ import renderFollow from '../../../remote/activitypub/renderer/follow'; import renderUndo from '../../../remote/activitypub/renderer/undo'; import context from '../../../remote/activitypub/renderer/context'; import request from '../../../remote/request'; -import Logger from '../../../utils/logger'; -export default async ({ data }) => { +export default async ({ data }, done) => { const following = await Following.findOne({ _id: data.id }); if (following === null) { + done(); return; } - const [follower, followee] = await Promise.all([ - User.findOne({ _id: following.followerId }), - User.findOne({ _id: following.followeeId }) - ]); + let follower, followee; - if (isLocalUser(follower) && isRemoteUser(followee)) { - const undo = renderUndo(renderFollow(follower, followee)); - undo['@context'] = context; + try { + [follower, followee] = await Promise.all([ + User.findOne({ _id: following.followerId }), + User.findOne({ _id: following.followeeId }) + ]); - await request(follower, followee.account.inbox, undo); + if (isLocalUser(follower) && isRemoteUser(followee)) { + const undo = renderUndo(renderFollow(follower, followee)); + undo['@context'] = context; + + await request(follower, followee.account.inbox, undo); + } + } catch (error) { + done(error); + return; } try { @@ -57,7 +64,7 @@ export default async ({ data }) => { // Publish follow event stream(follower._id, 'unfollow', promisedPackedUser); - } catch (error) { - Logger.error(error.toString()); + } finally { + done(); } }; -- cgit v1.2.3-freya From a5715ecc1b73d3e3a950c392fa3a466dee810248 Mon Sep 17 00:00:00 2001 From: Akihiko Odaki Date: Thu, 5 Apr 2018 01:07:55 +0900 Subject: Do not declare two variables in a statement --- src/queue/processors/http/unfollow.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/queue') diff --git a/src/queue/processors/http/unfollow.ts b/src/queue/processors/http/unfollow.ts index dc50e946c9..d62eb280dc 100644 --- a/src/queue/processors/http/unfollow.ts +++ b/src/queue/processors/http/unfollow.ts @@ -15,7 +15,8 @@ export default async ({ data }, done) => { return; } - let follower, followee; + let follower; + let followee; try { [follower, followee] = await Promise.all([ -- cgit v1.2.3-freya From 168b0730b46fd283b900b553dd2eede2aa4c7dac Mon Sep 17 00:00:00 2001 From: Akihiko Odaki Date: Thu, 5 Apr 2018 01:24:39 +0900 Subject: Implement Mention object --- src/post/create.ts | 14 ++------------ src/queue/processors/http/process-inbox.ts | 2 +- src/remote/activitypub/create.ts | 12 +++++++++++- src/remote/activitypub/resolve-person.ts | 5 ++--- src/remote/resolve-user.ts | 3 ++- src/server/api/endpoints/posts/create.ts | 10 +++++++--- 6 files changed, 25 insertions(+), 21 deletions(-) (limited to 'src/queue') diff --git a/src/post/create.ts b/src/post/create.ts index ecea37382d..4ad1503e0f 100644 --- a/src/post/create.ts +++ b/src/post/create.ts @@ -1,8 +1,6 @@ -import parseAcct from '../acct/parse'; import Post from '../models/post'; -import User from '../models/user'; -export default async (post, reply, repost, atMentions) => { +export default async (post, reply, repost, mentions) => { post.mentions = []; function addMention(mentionee) { @@ -36,15 +34,7 @@ export default async (post, reply, repost, atMentions) => { post._repost = null; } - await Promise.all(atMentions.map(async mention => { - // Fetch mentioned user - // SELECT _id - const { _id } = await User - .findOne(parseAcct(mention), { _id: true }); - - // Add mention - addMention(_id); - })); + await Promise.all(mentions.map(({ _id }) => addMention(_id))); return Post.insert(post); }; diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts index 88fbb97377..7eeaa19f8a 100644 --- a/src/queue/processors/http/process-inbox.ts +++ b/src/queue/processors/http/process-inbox.ts @@ -25,7 +25,7 @@ export default async ({ data }, done) => { }) as IRemoteUser; if (user === null) { - user = await resolvePerson(data.signature.keyId); + user = await resolvePerson(new Resolver(), data.signature.keyId); } } diff --git a/src/remote/activitypub/create.ts b/src/remote/activitypub/create.ts index 97c72860fd..710d56fd3d 100644 --- a/src/remote/activitypub/create.ts +++ b/src/remote/activitypub/create.ts @@ -7,6 +7,7 @@ import { IRemoteUser } from '../../models/user'; import uploadFromUrl from '../../drive/upload-from-url'; import createPost from '../../post/create'; import distributePost from '../../post/distribute'; +import resolvePerson from './resolve-person'; import Resolver from './resolver'; const createDOMPurify = require('dompurify'); @@ -53,6 +54,15 @@ class Creator { .map(({ object }) => object.$id); const { window } = new JSDOM(note.content); + const mentions = []; + + for (const { href, type } of note.tags) { + switch (type) { + case 'Mention': + mentions.push(resolvePerson(resolver, href)); + break; + } + } const inserted = await createPost({ channelId: undefined, @@ -69,7 +79,7 @@ class Creator { viaMobile: false, geo: undefined, uri: note.id - }, null, null, []); + }, null, null, await Promise.all(mentions)); const promises = []; diff --git a/src/remote/activitypub/resolve-person.ts b/src/remote/activitypub/resolve-person.ts index 2cf3ad32d8..7ed01e3222 100644 --- a/src/remote/activitypub/resolve-person.ts +++ b/src/remote/activitypub/resolve-person.ts @@ -4,14 +4,13 @@ import User, { validateUsername, isValidName, isValidDescription } from '../../m import { createHttp } from '../../queue'; import webFinger from '../webfinger'; import create from './create'; -import Resolver from './resolver'; async function isCollection(collection) { return ['Collection', 'OrderedCollection'].includes(collection.type); } -export default async (value, verifier?: string) => { - const { resolver, object } = await new Resolver().resolveOne(value); +export default async (parentResolver, value, verifier?: string) => { + const { resolver, object } = parentResolver.resolveOne(value); if ( object === null || diff --git a/src/remote/resolve-user.ts b/src/remote/resolve-user.ts index 48219e8cb3..097ed66738 100644 --- a/src/remote/resolve-user.ts +++ b/src/remote/resolve-user.ts @@ -1,6 +1,7 @@ import { toUnicode, toASCII } from 'punycode'; import User from '../models/user'; import resolvePerson from './activitypub/resolve-person'; +import Resolver from './activitypub/resolver'; import webFinger from './webfinger'; export default async (username, host, option) => { @@ -19,7 +20,7 @@ export default async (username, host, option) => { throw new Error(); } - user = await resolvePerson(self.href, acctLower); + user = await resolvePerson(new Resolver(), self.href, acctLower); } return user; diff --git a/src/server/api/endpoints/posts/create.ts b/src/server/api/endpoints/posts/create.ts index 03af7ee763..47897626f1 100644 --- a/src/server/api/endpoints/posts/create.ts +++ b/src/server/api/endpoints/posts/create.ts @@ -3,12 +3,13 @@ */ import $ from 'cafy'; import deepEqual = require('deep-equal'); +import parseAcct from '../../../../acct/parse'; import renderAcct from '../../../../acct/render'; import config from '../../../../config'; import html from '../../../../text/html'; import parse from '../../../../text/parse'; import Post, { IPost, isValidText, isValidCw } from '../../../../models/post'; -import { ILocalUser } from '../../../../models/user'; +import User, { ILocalUser } from '../../../../models/user'; import Channel, { IChannel } from '../../../../models/channel'; import DriveFile from '../../../../models/drive-file'; import create from '../../../../post/create'; @@ -267,7 +268,10 @@ module.exports = (params, user: ILocalUser, app) => new Promise(async (res, rej) .filter(t => t.type == 'mention') .map(renderAcct) // Drop dupulicates - .filter((v, i, s) => s.indexOf(v) == i); + .filter((v, i, s) => s.indexOf(v) == i) + // Fetch mentioned user + // SELECT _id + .map(mention => User.findOne(parseAcct(mention), { _id: true })); } // 投稿を作成 @@ -286,7 +290,7 @@ module.exports = (params, user: ILocalUser, app) => new Promise(async (res, rej) viaMobile: viaMobile, visibility, geo - }, reply, repost, atMentions); + }, reply, repost, await Promise.all(atMentions)); const postObj = await distribute(user, post.mentions, post); -- cgit v1.2.3-freya