diff options
Diffstat (limited to 'src/queue/processors')
| -rw-r--r-- | src/queue/processors/db/delete-post-dependents.ts | 22 | ||||
| -rw-r--r-- | src/queue/processors/db/index.ts | 7 | ||||
| -rw-r--r-- | src/queue/processors/http/deliver-post.ts | 27 | ||||
| -rw-r--r-- | src/queue/processors/http/deliver.ts | 19 | ||||
| -rw-r--r-- | src/queue/processors/http/follow.ts | 66 | ||||
| -rw-r--r-- | src/queue/processors/http/index.ts | 25 | ||||
| -rw-r--r-- | src/queue/processors/http/perform-activitypub.ts | 8 | ||||
| -rw-r--r-- | src/queue/processors/http/process-inbox.ts | 74 | ||||
| -rw-r--r-- | src/queue/processors/http/report-github-failure.ts | 41 | ||||
| -rw-r--r-- | src/queue/processors/http/unfollow.ts | 71 |
10 files changed, 98 insertions, 262 deletions
diff --git a/src/queue/processors/db/delete-post-dependents.ts b/src/queue/processors/db/delete-post-dependents.ts deleted file mode 100644 index fb6617e952..0000000000 --- a/src/queue/processors/db/delete-post-dependents.ts +++ /dev/null @@ -1,22 +0,0 @@ -import Favorite from '../../../models/favorite'; -import Notification from '../../../models/notification'; -import PollVote from '../../../models/poll-vote'; -import PostReaction from '../../../models/post-reaction'; -import PostWatching from '../../../models/post-watching'; -import Post from '../../../models/post'; - -export default ({ data }, done) => Promise.all([ - Favorite.remove({ postId: data._id }), - Notification.remove({ postId: data._id }), - PollVote.remove({ postId: data._id }), - PostReaction.remove({ postId: data._id }), - PostWatching.remove({ postId: data._id }), - Post.find({ repostId: data._id }).then(reposts => Promise.all([ - Notification.remove({ - postId: { - $in: reposts.map(({ _id }) => _id) - } - }), - Post.remove({ repostId: data._id }) - ])) -]).then(() => done(), done); diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts deleted file mode 100644 index 468ec442ac..0000000000 --- a/src/queue/processors/db/index.ts +++ /dev/null @@ -1,7 +0,0 @@ -import deletePostDependents from './delete-post-dependents'; - -const handlers = { - deletePostDependents -}; - -export default (job, done) => handlers[job.data.type](job, done); diff --git a/src/queue/processors/http/deliver-post.ts b/src/queue/processors/http/deliver-post.ts deleted file mode 100644 index 8107c8bf74..0000000000 --- a/src/queue/processors/http/deliver-post.ts +++ /dev/null @@ -1,27 +0,0 @@ -import Post from '../../../models/post'; -import User, { IRemoteUser } from '../../../models/user'; -import context from '../../../remote/activitypub/renderer/context'; -import renderCreate from '../../../remote/activitypub/renderer/create'; -import renderNote from '../../../remote/activitypub/renderer/note'; -import request from '../../../remote/request'; - -export default async ({ data }, done) => { - try { - const promisedTo = User.findOne({ _id: data.toId }) as Promise<IRemoteUser>; - const [from, post] = await Promise.all([ - User.findOne({ _id: data.fromId }), - Post.findOne({ _id: data.postId }) - ]); - const note = await renderNote(from, post); - const to = await promisedTo; - const create = renderCreate(note); - - create['@context'] = context; - - await request(from, to.account.inbox, create); - } catch (error) { - done(error); - } - - done(); -}; diff --git a/src/queue/processors/http/deliver.ts b/src/queue/processors/http/deliver.ts new file mode 100644 index 0000000000..422e355b5f --- /dev/null +++ b/src/queue/processors/http/deliver.ts @@ -0,0 +1,19 @@ +import * as kue from 'kue'; + +import request from '../../../remote/request'; + +export default async (job: kue.Job, done): Promise<void> => { + try { + await request(job.data.user, job.data.to, job.data.content); + done(); + } catch (res) { + if (res.statusCode >= 400 && res.statusCode < 500) { + // HTTPステータスコード4xxはクライアントエラーであり、それはつまり + // 何回再送しても成功することはないということなのでエラーにはしないでおく + done(); + } else { + console.warn(`deliver failed: ${res.statusMessage}`); + done(new Error(res.statusMessage)); + } + } +}; diff --git a/src/queue/processors/http/follow.ts b/src/queue/processors/http/follow.ts deleted file mode 100644 index ba1cc31186..0000000000 --- a/src/queue/processors/http/follow.ts +++ /dev/null @@ -1,66 +0,0 @@ -import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user'; -import Following from '../../../models/following'; -import FollowingLog from '../../../models/following-log'; -import FollowedLog from '../../../models/followed-log'; -import event from '../../../publishers/stream'; -import notify from '../../../publishers/notify'; -import context from '../../../remote/activitypub/renderer/context'; -import render from '../../../remote/activitypub/renderer/follow'; -import request from '../../../remote/request'; - -export default ({ data }, done) => Following.findOne({ _id: data.following }).then(async ({ followerId, followeeId }) => { - const [follower, followee] = await Promise.all([ - User.findOne({ _id: followerId }), - User.findOne({ _id: followeeId }) - ]); - - if (isLocalUser(follower) && isRemoteUser(followee)) { - const rendered = render(follower, followee); - rendered['@context'] = context; - - await request(follower, followee.account.inbox, rendered); - } - - return [follower, followee]; -}).then(([follower, followee]) => Promise.all([ - // Increment following count - User.update(follower._id, { - $inc: { - followingCount: 1 - } - }), - - FollowingLog.insert({ - createdAt: data.following.createdAt, - userId: follower._id, - count: follower.followingCount + 1 - }), - - // Increment followers count - User.update({ _id: followee._id }, { - $inc: { - followersCount: 1 - } - }), - - FollowedLog.insert({ - createdAt: data.following.createdAt, - userId: follower._id, - count: followee.followersCount + 1 - }), - - // Publish follow event - isLocalUser(follower) && packUser(followee, follower) - .then(packed => event(follower._id, 'follow', packed)), - - isLocalUser(followee) && Promise.all([ - packUser(follower, followee) - .then(packed => event(followee._id, 'followed', packed)), - - // Notify - isLocalUser(followee) && notify(followee._id, follower._id, 'follow') - ]) -]).then(() => done(), error => { - done(); - throw error; -}), done); diff --git a/src/queue/processors/http/index.ts b/src/queue/processors/http/index.ts index 0ea79305c6..3dc2595374 100644 --- a/src/queue/processors/http/index.ts +++ b/src/queue/processors/http/index.ts @@ -1,17 +1,20 @@ -import deliverPost from './deliver-post'; -import follow from './follow'; -import performActivityPub from './perform-activitypub'; +import deliver from './deliver'; import processInbox from './process-inbox'; import reportGitHubFailure from './report-github-failure'; -import unfollow from './unfollow'; const handlers = { - deliverPost, - follow, - performActivityPub, - processInbox, - reportGitHubFailure, - unfollow + deliver, + processInbox, + reportGitHubFailure }; -export default (job, done) => handlers[job.data.type](job, done); +export default (job, done) => { + const handler = handlers[job.data.type]; + + if (handler) { + handler(job, done); + } else { + console.error(`Unknown job: ${job.data.type}`); + done(); + } +}; diff --git a/src/queue/processors/http/perform-activitypub.ts b/src/queue/processors/http/perform-activitypub.ts deleted file mode 100644 index ae70c0f0be..0000000000 --- a/src/queue/processors/http/perform-activitypub.ts +++ /dev/null @@ -1,8 +0,0 @@ -import User from '../../../models/user'; -import act from '../../../remote/activitypub/act'; -import Resolver from '../../../remote/activitypub/resolver'; - -export default ({ data }, done) => User.findOne({ _id: data.actor }) - .then(actor => act(new Resolver(), actor, data.outbox)) - .then(Promise.all) - .then(() => done(), done); diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts index 7eeaa19f8a..eb4b62d37f 100644 --- a/src/queue/processors/http/process-inbox.ts +++ b/src/queue/processors/http/process-inbox.ts @@ -1,44 +1,66 @@ +import * as kue from 'kue'; +import * as debug from 'debug'; + import { verifySignature } from 'http-signature'; import parseAcct from '../../../acct/parse'; import User, { IRemoteUser } from '../../../models/user'; import act from '../../../remote/activitypub/act'; import resolvePerson from '../../../remote/activitypub/resolve-person'; -import Resolver from '../../../remote/activitypub/resolver'; -export default async ({ data }, done) => { - try { - const keyIdLower = data.signature.keyId.toLowerCase(); - let user; +const log = debug('misskey:queue:inbox'); - if (keyIdLower.startsWith('acct:')) { - const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); - if (host === null) { - done(); - return; - } +// ユーザーのinboxにアクティビティが届いた時の処理 +export default async (job: kue.Job, done): Promise<void> => { + const signature = job.data.signature; + const activity = job.data.activity; - user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser; - } else { - user = await User.findOne({ - host: { $ne: null }, - 'account.publicKey.id': data.signature.keyId - }) as IRemoteUser; + //#region Log + const info = Object.assign({}, activity); + delete info['@context']; + delete info['signature']; + log(info); + //#endregion - if (user === null) { - user = await resolvePerson(new Resolver(), data.signature.keyId); - } - } + const keyIdLower = signature.keyId.toLowerCase(); + let user; - if (user === null || !verifySignature(data.signature, user.account.publicKey.publicKeyPem)) { + if (keyIdLower.startsWith('acct:')) { + const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); + if (host === null) { + console.warn(`request was made by local user: @${username}`); done(); return; } - await Promise.all(await act(new Resolver(), user, data.inbox, true)); - } catch (error) { - done(error); + user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser; + } else { + user = await User.findOne({ + host: { $ne: null }, + 'account.publicKey.id': signature.keyId + }) as IRemoteUser; + + // アクティビティを送信してきたユーザーがまだMisskeyサーバーに登録されていなかったら登録する + if (user === null) { + user = await resolvePerson(signature.keyId); + } + } + + if (user === null) { + done(new Error('failed to resolve user')); + return; + } + + if (!verifySignature(signature, user.account.publicKey.publicKeyPem)) { + console.warn('signature verification failed'); + done(); return; } - done(); + // アクティビティを処理 + try { + await act(user, activity); + done(); + } catch (e) { + done(e); + } }; diff --git a/src/queue/processors/http/report-github-failure.ts b/src/queue/processors/http/report-github-failure.ts index af9659bdac..1e0b51f89f 100644 --- a/src/queue/processors/http/report-github-failure.ts +++ b/src/queue/processors/http/report-github-failure.ts @@ -1,31 +1,24 @@ import * as request from 'request-promise-native'; import User from '../../../models/user'; -const createPost = require('../../../server/api/endpoints/posts/create'); +import createPost from '../../../services/post/create'; -export default async ({ data }, done) => { - try { - const asyncBot = User.findOne({ _id: data.userId }); +export default async ({ data }) => { + const asyncBot = User.findOne({ _id: data.userId }); - // Fetch parent status - const parentStatuses = await request({ - url: `${data.parentUrl}/statuses`, - headers: { - 'User-Agent': 'misskey' - }, - json: true - }); + // Fetch parent status + const parentStatuses = await request({ + url: `${data.parentUrl}/statuses`, + headers: { + 'User-Agent': 'misskey' + }, + json: true + }); - const parentState = parentStatuses[0].state; - const stillFailed = parentState == 'failure' || parentState == 'error'; - const text = stillFailed ? - `**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` : - `**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`; + const parentState = parentStatuses[0].state; + const stillFailed = parentState == 'failure' || parentState == 'error'; + const text = stillFailed ? + `**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` : + `**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`; - createPost({ text }, await asyncBot); - } catch (error) { - done(error); - return; - } - - done(); + createPost(await asyncBot, { text }); }; diff --git a/src/queue/processors/http/unfollow.ts b/src/queue/processors/http/unfollow.ts deleted file mode 100644 index d62eb280dc..0000000000 --- a/src/queue/processors/http/unfollow.ts +++ /dev/null @@ -1,71 +0,0 @@ -import FollowedLog from '../../../models/followed-log'; -import Following from '../../../models/following'; -import FollowingLog from '../../../models/following-log'; -import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user'; -import stream from '../../../publishers/stream'; -import renderFollow from '../../../remote/activitypub/renderer/follow'; -import renderUndo from '../../../remote/activitypub/renderer/undo'; -import context from '../../../remote/activitypub/renderer/context'; -import request from '../../../remote/request'; - -export default async ({ data }, done) => { - const following = await Following.findOne({ _id: data.id }); - if (following === null) { - done(); - return; - } - - let follower; - let followee; - - try { - [follower, followee] = await Promise.all([ - User.findOne({ _id: following.followerId }), - User.findOne({ _id: following.followeeId }) - ]); - - if (isLocalUser(follower) && isRemoteUser(followee)) { - const undo = renderUndo(renderFollow(follower, followee)); - undo['@context'] = context; - - await request(follower, followee.account.inbox, undo); - } - } catch (error) { - done(error); - return; - } - - try { - await Promise.all([ - // Delete following - Following.findOneAndDelete({ _id: data.id }), - - // Decrement following count - User.update({ _id: follower._id }, { $inc: { followingCount: -1 } }), - FollowingLog.insert({ - createdAt: new Date(), - userId: follower._id, - count: follower.followingCount - 1 - }), - - // Decrement followers count - User.update({ _id: followee._id }, { $inc: { followersCount: -1 } }), - FollowedLog.insert({ - createdAt: new Date(), - userId: followee._id, - count: followee.followersCount - 1 - }) - ]); - - if (isLocalUser(follower)) { - return; - } - - const promisedPackedUser = packUser(followee, follower); - - // Publish follow event - stream(follower._id, 'unfollow', promisedPackedUser); - } finally { - done(); - } -}; |