From e8b42d7e1668679e6a6ee0a7aea1e2ff7f37005b Mon Sep 17 00:00:00 2001 From: syuilo Date: Wed, 4 Apr 2018 23:12:35 +0900 Subject: wip --- src/post/create.ts | 107 +++++++++++++- src/processor/db/delete-post-dependents.ts | 22 --- src/processor/db/index.ts | 7 - src/processor/http/deliver-post.ts | 93 ------------ src/processor/http/follow.ts | 69 --------- src/processor/http/index.ts | 17 --- src/processor/http/perform-activitypub.ts | 7 - src/processor/http/process-inbox.ts | 39 ----- src/processor/http/report-github-failure.ts | 24 ---- src/processor/http/unfollow.ts | 56 -------- src/processor/index.ts | 18 --- src/queue.ts | 10 -- src/queue/index.ts | 37 +++++ src/queue/processors/db/delete-post-dependents.ts | 22 +++ src/queue/processors/db/index.ts | 7 + src/queue/processors/http/deliver.ts | 17 +++ src/queue/processors/http/follow.ts | 69 +++++++++ src/queue/processors/http/index.ts | 17 +++ src/queue/processors/http/perform-activitypub.ts | 7 + src/queue/processors/http/process-inbox.ts | 55 +++++++ src/queue/processors/http/report-github-failure.ts | 24 ++++ src/queue/processors/http/unfollow.ts | 56 ++++++++ src/queue/processors/index.ts | 18 +++ src/remote/activitypub/act/create.ts | 90 +++++++++++- src/remote/activitypub/act/index.ts | 54 ++++--- src/remote/activitypub/create.ts | 158 --------------------- src/remote/activitypub/resolver.ts | 75 +++++----- src/server/activitypub/inbox.ts | 2 +- 28 files changed, 577 insertions(+), 600 deletions(-) delete mode 100644 src/processor/db/delete-post-dependents.ts delete mode 100644 src/processor/db/index.ts delete mode 100644 src/processor/http/deliver-post.ts delete mode 100644 src/processor/http/follow.ts delete mode 100644 src/processor/http/index.ts delete mode 100644 src/processor/http/perform-activitypub.ts delete mode 100644 src/processor/http/process-inbox.ts delete mode 100644 src/processor/http/report-github-failure.ts delete mode 100644 src/processor/http/unfollow.ts delete mode 100644 src/processor/index.ts delete mode 100644 src/queue.ts create mode 100644 src/queue/index.ts create mode 100644 src/queue/processors/db/delete-post-dependents.ts create mode 100644 src/queue/processors/db/index.ts create mode 100644 src/queue/processors/http/deliver.ts create mode 100644 src/queue/processors/http/follow.ts create mode 100644 src/queue/processors/http/index.ts create mode 100644 src/queue/processors/http/perform-activitypub.ts create mode 100644 src/queue/processors/http/process-inbox.ts create mode 100644 src/queue/processors/http/report-github-failure.ts create mode 100644 src/queue/processors/http/unfollow.ts create mode 100644 src/queue/processors/index.ts delete mode 100644 src/remote/activitypub/create.ts diff --git a/src/post/create.ts b/src/post/create.ts index ecea37382d..f78bbe7521 100644 --- a/src/post/create.ts +++ b/src/post/create.ts @@ -1,8 +1,14 @@ import parseAcct from '../acct/parse'; -import Post from '../models/post'; -import User from '../models/user'; +import Post, { pack } from '../models/post'; +import User, { isLocalUser, isRemoteUser, IUser } from '../models/user'; +import stream from '../publishers/stream'; +import Following from '../models/following'; +import { createHttp } from '../queue'; +import renderNote from '../remote/activitypub/renderer/note'; +import renderCreate from '../remote/activitypub/renderer/create'; +import context from '../remote/activitypub/renderer/context'; -export default async (post, reply, repost, atMentions) => { +export default async (user: IUser, post, reply, repost, atMentions) => { post.mentions = []; function addMention(mentionee) { @@ -46,5 +52,98 @@ export default async (post, reply, repost, atMentions) => { addMention(_id); })); - return Post.insert(post); + const inserted = await Post.insert(post); + + User.update({ _id: user._id }, { + // Increment my posts count + $inc: { + postsCount: 1 + }, + + $set: { + latestPost: post._id + } + }); + + const postObj = await pack(inserted); + + // タイムラインへの投稿 + if (!post.channelId) { + // Publish event to myself's stream + stream(post.userId, 'post', postObj); + + // Fetch all followers + const followers = await Following.aggregate([{ + $lookup: { + from: 'users', + localField: 'followerId', + foreignField: '_id', + as: 'follower' + } + }, { + $match: { + followeeId: post.userId + } + }], { + _id: false + }); + + const note = await renderNote(user, post); + const content = renderCreate(note); + content['@context'] = context; + + Promise.all(followers.map(({ follower }) => { + if (isLocalUser(follower)) { + // Publish event to followers stream + stream(follower._id, 'post', postObj); + } else { + // フォロワーがリモートユーザーかつ投稿者がローカルユーザーなら投稿を配信 + if (isLocalUser(user)) { + createHttp({ + type: 'deliver', + user, + content, + to: follower.account.inbox + }).save(); + } + } + })); + } + + // チャンネルへの投稿 + /* TODO + if (post.channelId) { + promises.push( + // Increment channel index(posts count) + Channel.update({ _id: post.channelId }, { + $inc: { + index: 1 + } + }), + + // Publish event to channel + promisedPostObj.then(postObj => { + publishChannelStream(post.channelId, 'post', postObj); + }), + + Promise.all([ + promisedPostObj, + + // Get channel watchers + ChannelWatching.find({ + channelId: post.channelId, + // 削除されたドキュメントは除く + deletedAt: { $exists: false } + }) + ]).then(([postObj, watches]) => { + // チャンネルの視聴者(のタイムライン)に配信 + watches.forEach(w => { + stream(w.userId, 'post', postObj); + }); + }) + ); + }*/ + + return Promise.all(promises); + }; diff --git a/src/processor/db/delete-post-dependents.ts b/src/processor/db/delete-post-dependents.ts deleted file mode 100644 index 879c41ec9c..0000000000 --- a/src/processor/db/delete-post-dependents.ts +++ /dev/null @@ -1,22 +0,0 @@ -import Favorite from '../../models/favorite'; -import Notification from '../../models/notification'; -import PollVote from '../../models/poll-vote'; -import PostReaction from '../../models/post-reaction'; -import PostWatching from '../../models/post-watching'; -import Post from '../../models/post'; - -export default async ({ data }) => Promise.all([ - Favorite.remove({ postId: data._id }), - Notification.remove({ postId: data._id }), - PollVote.remove({ postId: data._id }), - PostReaction.remove({ postId: data._id }), - PostWatching.remove({ postId: data._id }), - Post.find({ repostId: data._id }).then(reposts => Promise.all([ - Notification.remove({ - postId: { - $in: reposts.map(({ _id }) => _id) - } - }), - Post.remove({ repostId: data._id }) - ])) -]); diff --git a/src/processor/db/index.ts b/src/processor/db/index.ts deleted file mode 100644 index 75838c099b..0000000000 --- a/src/processor/db/index.ts +++ /dev/null @@ -1,7 +0,0 @@ -import deletePostDependents from './delete-post-dependents'; - -const handlers = { - deletePostDependents -}; - -export default (job, done) => handlers[job.data.type](job).then(() => done(), done); diff --git a/src/processor/http/deliver-post.ts b/src/processor/http/deliver-post.ts deleted file mode 100644 index c00ab912c9..0000000000 --- a/src/processor/http/deliver-post.ts +++ /dev/null @@ -1,93 +0,0 @@ -import Channel from '../../models/channel'; -import Following from '../../models/following'; -import ChannelWatching from '../../models/channel-watching'; -import Post, { pack } from '../../models/post'; -import User, { isLocalUser } from '../../models/user'; -import stream, { publishChannelStream } from '../../publishers/stream'; -import context from '../../remote/activitypub/renderer/context'; -import renderCreate from '../../remote/activitypub/renderer/create'; -import renderNote from '../../remote/activitypub/renderer/note'; -import request from '../../remote/request'; - -export default ({ data }) => Post.findOne({ _id: data.id }).then(post => { - const promisedPostObj = pack(post); - const promises = []; - - // タイムラインへの投稿 - if (!post.channelId) { - promises.push( - // Publish event to myself's stream - promisedPostObj.then(postObj => { - stream(post.userId, 'post', postObj); - }), - - Promise.all([ - User.findOne({ _id: post.userId }), - - // Fetch all followers - Following.aggregate([{ - $lookup: { - from: 'users', - localField: 'followerId', - foreignField: '_id', - as: 'follower' - } - }, { - $match: { - followeeId: post.userId - } - }], { - _id: false - }) - ]).then(([user, followers]) => Promise.all(followers.map(following => { - if (isLocalUser(following.follower)) { - // Publish event to followers stream - return promisedPostObj.then(postObj => { - stream(following.followerId, 'post', postObj); - }); - } - - return renderNote(user, post).then(note => { - const create = renderCreate(note); - create['@context'] = context; - return request(user, following.follower[0].account.inbox, create); - }); - }))) - ); - } - - // チャンネルへの投稿 - if (post.channelId) { - promises.push( - // Increment channel index(posts count) - Channel.update({ _id: post.channelId }, { - $inc: { - index: 1 - } - }), - - // Publish event to channel - promisedPostObj.then(postObj => { - publishChannelStream(post.channelId, 'post', postObj); - }), - - Promise.all([ - promisedPostObj, - - // Get channel watchers - ChannelWatching.find({ - channelId: post.channelId, - // 削除されたドキュメントは除く - deletedAt: { $exists: false } - }) - ]).then(([postObj, watches]) => { - // チャンネルの視聴者(のタイムライン)に配信 - watches.forEach(w => { - stream(w.userId, 'post', postObj); - }); - }) - ); - } - - return Promise.all(promises); -}); diff --git a/src/processor/http/follow.ts b/src/processor/http/follow.ts deleted file mode 100644 index 8bf890efbc..0000000000 --- a/src/processor/http/follow.ts +++ /dev/null @@ -1,69 +0,0 @@ -import User, { isLocalUser, pack as packUser } from '../../models/user'; -import Following from '../../models/following'; -import FollowingLog from '../../models/following-log'; -import FollowedLog from '../../models/followed-log'; -import event from '../../publishers/stream'; -import notify from '../../publishers/notify'; -import context from '../../remote/activitypub/renderer/context'; -import render from '../../remote/activitypub/renderer/follow'; -import request from '../../remote/request'; - -export default ({ data }) => Following.findOne({ _id: data.following }).then(({ followerId, followeeId }) => { - const promisedFollower = User.findOne({ _id: followerId }); - const promisedFollowee = User.findOne({ _id: followeeId }); - - return Promise.all([ - // Increment following count - User.update(followerId, { - $inc: { - followingCount: 1 - } - }), - - promisedFollower.then(({ followingCount }) => FollowingLog.insert({ - createdAt: data.following.createdAt, - userId: followerId, - count: followingCount + 1 - })), - - // Increment followers count - User.update({ _id: followeeId }, { - $inc: { - followersCount: 1 - } - }), - - promisedFollowee.then(({ followersCount }) => FollowedLog.insert({ - createdAt: data.following.createdAt, - userId: followerId, - count: followersCount + 1 - })), - - // Notify - promisedFollowee.then(followee => followee.host === null ? - notify(followeeId, followerId, 'follow') : null), - - // Publish follow event - Promise.all([promisedFollower, promisedFollowee]).then(([follower, followee]) => { - let followerEvent; - let followeeEvent; - - if (isLocalUser(follower)) { - followerEvent = packUser(followee, follower) - .then(packed => event(follower._id, 'follow', packed)); - } - - if (isLocalUser(followee)) { - followeeEvent = packUser(follower, followee) - .then(packed => event(followee._id, 'followed', packed)); - } else if (isLocalUser(follower)) { - const rendered = render(follower, followee); - rendered['@context'] = context; - - followeeEvent = request(follower, followee.account.inbox, rendered); - } - - return Promise.all([followerEvent, followeeEvent]); - }) - ]); -}); diff --git a/src/processor/http/index.ts b/src/processor/http/index.ts deleted file mode 100644 index 8f9aa717c3..0000000000 --- a/src/processor/http/index.ts +++ /dev/null @@ -1,17 +0,0 @@ -import deliverPost from './deliver-post'; -import follow from './follow'; -import performActivityPub from './perform-activitypub'; -import processInbox from './process-inbox'; -import reportGitHubFailure from './report-github-failure'; -import unfollow from './unfollow'; - -const handlers = { - deliverPost, - follow, - performActivityPub, - processInbox, - reportGitHubFailure, - unfollow -}; - -export default (job, done) => handlers[job.data.type](job).then(() => done(), done); diff --git a/src/processor/http/perform-activitypub.ts b/src/processor/http/perform-activitypub.ts deleted file mode 100644 index 963e532fe5..0000000000 --- a/src/processor/http/perform-activitypub.ts +++ /dev/null @@ -1,7 +0,0 @@ -import User from '../../models/user'; -import act from '../../remote/activitypub/act'; -import Resolver from '../../remote/activitypub/resolver'; - -export default ({ data }) => User.findOne({ _id: data.actor }) - .then(actor => act(new Resolver(), actor, data.outbox)) - .then(Promise.all); diff --git a/src/processor/http/process-inbox.ts b/src/processor/http/process-inbox.ts deleted file mode 100644 index f102f8d6b4..0000000000 --- a/src/processor/http/process-inbox.ts +++ /dev/null @@ -1,39 +0,0 @@ -import { verifySignature } from 'http-signature'; -import parseAcct from '../../acct/parse'; -import User, { IRemoteUser } from '../../models/user'; -import act from '../../remote/activitypub/act'; -import resolvePerson from '../../remote/activitypub/resolve-person'; -import Resolver from '../../remote/activitypub/resolver'; - -export default async ({ data }): Promise => { - const keyIdLower = data.signature.keyId.toLowerCase(); - let user; - - if (keyIdLower.startsWith('acct:')) { - const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); - if (host === null) { - throw 'request was made by local user'; - } - - user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser; - } else { - user = await User.findOne({ - host: { $ne: null }, - 'account.publicKey.id': data.signature.keyId - }) as IRemoteUser; - - if (user === null) { - user = await resolvePerson(data.signature.keyId); - } - } - - if (user === null) { - throw 'failed to resolve user'; - } - - if (!verifySignature(data.signature, user.account.publicKey.publicKeyPem)) { - throw 'signature verification failed'; - } - - await Promise.all(await act(new Resolver(), user, data.inbox, true)); -}; diff --git a/src/processor/http/report-github-failure.ts b/src/processor/http/report-github-failure.ts deleted file mode 100644 index 4f6f5ccee5..0000000000 --- a/src/processor/http/report-github-failure.ts +++ /dev/null @@ -1,24 +0,0 @@ -import * as request from 'request-promise-native'; -import User from '../../models/user'; -const createPost = require('../../server/api/endpoints/posts/create'); - -export default async ({ data }) => { - const asyncBot = User.findOne({ _id: data.userId }); - - // Fetch parent status - const parentStatuses = await request({ - url: `${data.parentUrl}/statuses`, - headers: { - 'User-Agent': 'misskey' - }, - json: true - }); - - const parentState = parentStatuses[0].state; - const stillFailed = parentState == 'failure' || parentState == 'error'; - const text = stillFailed ? - `**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` : - `**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`; - - createPost({ text }, await asyncBot); -}; diff --git a/src/processor/http/unfollow.ts b/src/processor/http/unfollow.ts deleted file mode 100644 index d3d5f2246f..0000000000 --- a/src/processor/http/unfollow.ts +++ /dev/null @@ -1,56 +0,0 @@ -import FollowedLog from '../../models/followed-log'; -import Following from '../../models/following'; -import FollowingLog from '../../models/following-log'; -import User, { isRemoteUser, pack as packUser } from '../../models/user'; -import stream from '../../publishers/stream'; -import renderFollow from '../../remote/activitypub/renderer/follow'; -import renderUndo from '../../remote/activitypub/renderer/undo'; -import context from '../../remote/activitypub/renderer/context'; -import request from '../../remote/request'; - -export default async ({ data }) => { - // Delete following - const following = await Following.findOneAndDelete({ _id: data.id }); - if (following === null) { - return; - } - - const promisedFollower = User.findOne({ _id: following.followerId }); - const promisedFollowee = User.findOne({ _id: following.followeeId }); - - await Promise.all([ - // Decrement following count - User.update({ _id: following.followerId }, { $inc: { followingCount: -1 } }), - promisedFollower.then(({ followingCount }) => FollowingLog.insert({ - createdAt: new Date(), - userId: following.followerId, - count: followingCount - 1 - })), - - // Decrement followers count - User.update({ _id: following.followeeId }, { $inc: { followersCount: -1 } }), - promisedFollowee.then(({ followersCount }) => FollowedLog.insert({ - createdAt: new Date(), - userId: following.followeeId, - count: followersCount - 1 - })), - - // Publish follow event - Promise.all([promisedFollower, promisedFollowee]).then(async ([follower, followee]) => { - if (isRemoteUser(follower)) { - return; - } - - const promisedPackedUser = packUser(followee, follower); - - if (isRemoteUser(followee)) { - const undo = renderUndo(renderFollow(follower, followee)); - undo['@context'] = context; - - await request(follower, followee.account.inbox, undo); - } - - stream(follower._id, 'unfollow', promisedPackedUser); - }) - ]); -}; diff --git a/src/processor/index.ts b/src/processor/index.ts deleted file mode 100644 index 172048ddae..0000000000 --- a/src/processor/index.ts +++ /dev/null @@ -1,18 +0,0 @@ -import queue from '../queue'; -import db from './db'; -import http from './http'; - -export default () => { - queue.process('db', db); - - /* - 256 is the default concurrency limit of Mozilla Firefox and Google - Chromium. - - a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google - https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff - Network.http.max-connections - MozillaZine Knowledge Base - http://kb.mozillazine.org/Network.http.max-connections - */ - queue.process('http', 256, http); -}; diff --git a/src/queue.ts b/src/queue.ts deleted file mode 100644 index 08ea13c2a3..0000000000 --- a/src/queue.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { createQueue } from 'kue'; -import config from './config'; - -export default createQueue({ - redis: { - port: config.redis.port, - host: config.redis.host, - auth: config.redis.pass - } -}); diff --git a/src/queue/index.ts b/src/queue/index.ts new file mode 100644 index 0000000000..c8c436b18c --- /dev/null +++ b/src/queue/index.ts @@ -0,0 +1,37 @@ +import { createQueue } from 'kue'; +import config from '../config'; +import db from './processors/db'; +import http from './processors/http'; + +const queue = createQueue({ + redis: { + port: config.redis.port, + host: config.redis.host, + auth: config.redis.pass + } +}); + +export function createHttp(data) { + return queue + .create('http', data) + .attempts(16) + .backoff({ delay: 16384, type: 'exponential' }); +} + +export function createDb(data) { + return queue.create('db', data); +} + +export function process() { + queue.process('db', db); + + /* + 256 is the default concurrency limit of Mozilla Firefox and Google + Chromium. + a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google + https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff + Network.http.max-connections - MozillaZine Knowledge Base + http://kb.mozillazine.org/Network.http.max-connections + */ + queue.process('http', 256, http); +} diff --git a/src/queue/processors/db/delete-post-dependents.ts b/src/queue/processors/db/delete-post-dependents.ts new file mode 100644 index 0000000000..879c41ec9c --- /dev/null +++ b/src/queue/processors/db/delete-post-dependents.ts @@ -0,0 +1,22 @@ +import Favorite from '../../models/favorite'; +import Notification from '../../models/notification'; +import PollVote from '../../models/poll-vote'; +import PostReaction from '../../models/post-reaction'; +import PostWatching from '../../models/post-watching'; +import Post from '../../models/post'; + +export default async ({ data }) => Promise.all([ + Favorite.remove({ postId: data._id }), + Notification.remove({ postId: data._id }), + PollVote.remove({ postId: data._id }), + PostReaction.remove({ postId: data._id }), + PostWatching.remove({ postId: data._id }), + Post.find({ repostId: data._id }).then(reposts => Promise.all([ + Notification.remove({ + postId: { + $in: reposts.map(({ _id }) => _id) + } + }), + Post.remove({ repostId: data._id }) + ])) +]); diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts new file mode 100644 index 0000000000..75838c099b --- /dev/null +++ b/src/queue/processors/db/index.ts @@ -0,0 +1,7 @@ +import deletePostDependents from './delete-post-dependents'; + +const handlers = { + deletePostDependents +}; + +export default (job, done) => handlers[job.data.type](job).then(() => done(), done); diff --git a/src/queue/processors/http/deliver.ts b/src/queue/processors/http/deliver.ts new file mode 100644 index 0000000000..8cd9eb624e --- /dev/null +++ b/src/queue/processors/http/deliver.ts @@ -0,0 +1,17 @@ +import * as kue from 'kue'; + +import Channel from '../../models/channel'; +import Following from '../../models/following'; +import ChannelWatching from '../../models/channel-watching'; +import Post, { pack } from '../../models/post'; +import User, { isLocalUser } from '../../models/user'; +import stream, { publishChannelStream } from '../../publishers/stream'; +import context from '../../remote/activitypub/renderer/context'; +import renderCreate from '../../remote/activitypub/renderer/create'; +import renderNote from '../../remote/activitypub/renderer/note'; +import request from '../../remote/request'; + +export default async (job: kue.Job, done): Promise => { + + request(user, following.follower[0].account.inbox, create); +} diff --git a/src/queue/processors/http/follow.ts b/src/queue/processors/http/follow.ts new file mode 100644 index 0000000000..8bf890efbc --- /dev/null +++ b/src/queue/processors/http/follow.ts @@ -0,0 +1,69 @@ +import User, { isLocalUser, pack as packUser } from '../../models/user'; +import Following from '../../models/following'; +import FollowingLog from '../../models/following-log'; +import FollowedLog from '../../models/followed-log'; +import event from '../../publishers/stream'; +import notify from '../../publishers/notify'; +import context from '../../remote/activitypub/renderer/context'; +import render from '../../remote/activitypub/renderer/follow'; +import request from '../../remote/request'; + +export default ({ data }) => Following.findOne({ _id: data.following }).then(({ followerId, followeeId }) => { + const promisedFollower = User.findOne({ _id: followerId }); + const promisedFollowee = User.findOne({ _id: followeeId }); + + return Promise.all([ + // Increment following count + User.update(followerId, { + $inc: { + followingCount: 1 + } + }), + + promisedFollower.then(({ followingCount }) => FollowingLog.insert({ + createdAt: data.following.createdAt, + userId: followerId, + count: followingCount + 1 + })), + + // Increment followers count + User.update({ _id: followeeId }, { + $inc: { + followersCount: 1 + } + }), + + promisedFollowee.then(({ followersCount }) => FollowedLog.insert({ + createdAt: data.following.createdAt, + userId: followerId, + count: followersCount + 1 + })), + + // Notify + promisedFollowee.then(followee => followee.host === null ? + notify(followeeId, followerId, 'follow') : null), + + // Publish follow event + Promise.all([promisedFollower, promisedFollowee]).then(([follower, followee]) => { + let followerEvent; + let followeeEvent; + + if (isLocalUser(follower)) { + followerEvent = packUser(followee, follower) + .then(packed => event(follower._id, 'follow', packed)); + } + + if (isLocalUser(followee)) { + followeeEvent = packUser(follower, followee) + .then(packed => event(followee._id, 'followed', packed)); + } else if (isLocalUser(follower)) { + const rendered = render(follower, followee); + rendered['@context'] = context; + + followeeEvent = request(follower, followee.account.inbox, rendered); + } + + return Promise.all([followerEvent, followeeEvent]); + }) + ]); +}); diff --git a/src/queue/processors/http/index.ts b/src/queue/processors/http/index.ts new file mode 100644 index 0000000000..8f9aa717c3 --- /dev/null +++ b/src/queue/processors/http/index.ts @@ -0,0 +1,17 @@ +import deliverPost from './deliver-post'; +import follow from './follow'; +import performActivityPub from './perform-activitypub'; +import processInbox from './process-inbox'; +import reportGitHubFailure from './report-github-failure'; +import unfollow from './unfollow'; + +const handlers = { + deliverPost, + follow, + performActivityPub, + processInbox, + reportGitHubFailure, + unfollow +}; + +export default (job, done) => handlers[job.data.type](job).then(() => done(), done); diff --git a/src/queue/processors/http/perform-activitypub.ts b/src/queue/processors/http/perform-activitypub.ts new file mode 100644 index 0000000000..963e532fe5 --- /dev/null +++ b/src/queue/processors/http/perform-activitypub.ts @@ -0,0 +1,7 @@ +import User from '../../models/user'; +import act from '../../remote/activitypub/act'; +import Resolver from '../../remote/activitypub/resolver'; + +export default ({ data }) => User.findOne({ _id: data.actor }) + .then(actor => act(new Resolver(), actor, data.outbox)) + .then(Promise.all); diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts new file mode 100644 index 0000000000..fff1fbf663 --- /dev/null +++ b/src/queue/processors/http/process-inbox.ts @@ -0,0 +1,55 @@ +import * as kue from 'kue'; + +import { verifySignature } from 'http-signature'; +import parseAcct from '../../acct/parse'; +import User, { IRemoteUser } from '../../models/user'; +import act from '../../remote/activitypub/act'; +import resolvePerson from '../../remote/activitypub/resolve-person'; +import Resolver from '../../remote/activitypub/resolver'; + +// ユーザーのinboxにアクティビティが届いた時の処理 +export default async (job: kue.Job, done): Promise => { + const signature = job.data.signature; + const activity = job.data.activity; + + const keyIdLower = signature.keyId.toLowerCase(); + let user; + + if (keyIdLower.startsWith('acct:')) { + const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); + if (host === null) { + console.warn(`request was made by local user: @${username}`); + done(); + } + + user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser; + } else { + user = await User.findOne({ + host: { $ne: null }, + 'account.publicKey.id': signature.keyId + }) as IRemoteUser; + + // アクティビティを送信してきたユーザーがまだMisskeyサーバーに登録されていなかったら登録する + if (user === null) { + user = await resolvePerson(signature.keyId); + } + } + + if (user === null) { + done(new Error('failed to resolve user')); + return; + } + + if (!verifySignature(signature, user.account.publicKey.publicKeyPem)) { + done(new Error('signature verification failed')); + return; + } + + // アクティビティを処理 + try { + await act(new Resolver(), user, activity); + done(); + } catch (e) { + done(e); + } +}; diff --git a/src/queue/processors/http/report-github-failure.ts b/src/queue/processors/http/report-github-failure.ts new file mode 100644 index 0000000000..4f6f5ccee5 --- /dev/null +++ b/src/queue/processors/http/report-github-failure.ts @@ -0,0 +1,24 @@ +import * as request from 'request-promise-native'; +import User from '../../models/user'; +const createPost = require('../../server/api/endpoints/posts/create'); + +export default async ({ data }) => { + const asyncBot = User.findOne({ _id: data.userId }); + + // Fetch parent status + const parentStatuses = await request({ + url: `${data.parentUrl}/statuses`, + headers: { + 'User-Agent': 'misskey' + }, + json: true + }); + + const parentState = parentStatuses[0].state; + const stillFailed = parentState == 'failure' || parentState == 'error'; + const text = stillFailed ? + `**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` : + `**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`; + + createPost({ text }, await asyncBot); +}; diff --git a/src/queue/processors/http/unfollow.ts b/src/queue/processors/http/unfollow.ts new file mode 100644 index 0000000000..d3d5f2246f --- /dev/null +++ b/src/queue/processors/http/unfollow.ts @@ -0,0 +1,56 @@ +import FollowedLog from '../../models/followed-log'; +import Following from '../../models/following'; +import FollowingLog from '../../models/following-log'; +import User, { isRemoteUser, pack as packUser } from '../../models/user'; +import stream from '../../publishers/stream'; +import renderFollow from '../../remote/activitypub/renderer/follow'; +import renderUndo from '../../remote/activitypub/renderer/undo'; +import context from '../../remote/activitypub/renderer/context'; +import request from '../../remote/request'; + +export default async ({ data }) => { + // Delete following + const following = await Following.findOneAndDelete({ _id: data.id }); + if (following === null) { + return; + } + + const promisedFollower = User.findOne({ _id: following.followerId }); + const promisedFollowee = User.findOne({ _id: following.followeeId }); + + await Promise.all([ + // Decrement following count + User.update({ _id: following.followerId }, { $inc: { followingCount: -1 } }), + promisedFollower.then(({ followingCount }) => FollowingLog.insert({ + createdAt: new Date(), + userId: following.followerId, + count: followingCount - 1 + })), + + // Decrement followers count + User.update({ _id: following.followeeId }, { $inc: { followersCount: -1 } }), + promisedFollowee.then(({ followersCount }) => FollowedLog.insert({ + createdAt: new Date(), + userId: following.followeeId, + count: followersCount - 1 + })), + + // Publish follow event + Promise.all([promisedFollower, promisedFollowee]).then(async ([follower, followee]) => { + if (isRemoteUser(follower)) { + return; + } + + const promisedPackedUser = packUser(followee, follower); + + if (isRemoteUser(followee)) { + const undo = renderUndo(renderFollow(follower, followee)); + undo['@context'] = context; + + await request(follower, followee.account.inbox, undo); + } + + stream(follower._id, 'unfollow', promisedPackedUser); + }) + ]); +}; diff --git a/src/queue/processors/index.ts b/src/queue/processors/index.ts new file mode 100644 index 0000000000..172048ddae --- /dev/null +++ b/src/queue/processors/index.ts @@ -0,0 +1,18 @@ +import queue from '../queue'; +import db from './db'; +import http from './http'; + +export default () => { + queue.process('db', db); + + /* + 256 is the default concurrency limit of Mozilla Firefox and Google + Chromium. + + a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google + https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff + Network.http.max-connections - MozillaZine Knowledge Base + http://kb.mozillazine.org/Network.http.max-connections + */ + queue.process('http', 256, http); +}; diff --git a/src/remote/activitypub/act/create.ts b/src/remote/activitypub/act/create.ts index fa681982cf..c1a30ce7d0 100644 --- a/src/remote/activitypub/act/create.ts +++ b/src/remote/activitypub/act/create.ts @@ -1,10 +1,92 @@ -import create from '../create'; +import { JSDOM } from 'jsdom'; +const createDOMPurify = require('dompurify'); + import Resolver from '../resolver'; +import DriveFile from '../../../models/drive-file'; +import Post from '../../../models/post'; +import uploadFromUrl from '../../../drive/upload-from-url'; +import createPost from '../../../post/create'; -export default (resolver: Resolver, actor, activity, distribute) => { +export default async (resolver: Resolver, actor, activity): Promise => { if ('actor' in activity && actor.account.uri !== activity.actor) { - throw new Error(); + throw new Error('invalid actor'); } - return create(resolver, actor, activity.object, distribute); + const uri = activity.id || activity; + + try { + await Promise.all([ + DriveFile.findOne({ 'metadata.uri': uri }).then(file => { + if (file !== null) { + throw new Error(); + } + }, () => {}), + Post.findOne({ uri }).then(post => { + if (post !== null) { + throw new Error(); + } + }, () => {}) + ]); + } catch (object) { + throw new Error(`already registered: ${uri}`); + } + + const object = await resolver.resolve(activity); + + switch (object.type) { + case 'Image': + createImage(resolver, object); + break; + + case 'Note': + createNote(resolver, object); + break; + } + + /// + + async function createImage(resolver: Resolver, image) { + if ('attributedTo' in image && actor.account.uri !== image.attributedTo) { + throw new Error('invalid image'); + } + + return await uploadFromUrl(image.url, actor); + } + + async function createNote(resolver: Resolver, note) { + if ( + ('attributedTo' in note && actor.account.uri !== note.attributedTo) || + typeof note.id !== 'string' + ) { + throw new Error('invalid note'); + } + + const mediaIds = []; + + if ('attachment' in note) { + note.attachment.forEach(async media => { + const created = await createImage(resolver, media); + mediaIds.push(created._id); + }); + } + + const { window } = new JSDOM(note.content); + + await createPost(actor, { + channelId: undefined, + index: undefined, + createdAt: new Date(note.published), + mediaIds, + replyId: undefined, + repostId: undefined, + poll: undefined, + text: window.document.body.textContent, + textHtml: note.content && createDOMPurify(window).sanitize(note.content), + userId: actor._id, + appId: null, + viaMobile: false, + geo: undefined, + uri: note.id + }, null, null, []); + } }; diff --git a/src/remote/activitypub/act/index.ts b/src/remote/activitypub/act/index.ts index d282e12885..d78335f16e 100644 --- a/src/remote/activitypub/act/index.ts +++ b/src/remote/activitypub/act/index.ts @@ -2,35 +2,29 @@ import create from './create'; import performDeleteActivity from './delete'; import follow from './follow'; import undo from './undo'; -import createObject from '../create'; import Resolver from '../resolver'; - -export default async (parentResolver: Resolver, actor, value, distribute?: boolean) => { - const collection = await parentResolver.resolveCollection(value); - - return collection.object.map(async element => { - const { resolver, object } = await collection.resolver.resolveOne(element); - const created = await (await createObject(resolver, actor, [object], distribute))[0]; - - if (created !== null) { - return created; - } - - switch (object.type) { - case 'Create': - return create(resolver, actor, object, distribute); - - case 'Delete': - return performDeleteActivity(resolver, actor, object); - - case 'Follow': - return follow(resolver, actor, object, distribute); - - case 'Undo': - return undo(resolver, actor, object); - - default: - return null; - } - }); +import { IObject } from '../type'; + +export default async (parentResolver: Resolver, actor, activity: IObject): Promise => { + switch (activity.type) { + case 'Create': + await create(parentResolver, actor, activity); + break; + + case 'Delete': + await performDeleteActivity(parentResolver, actor, activity); + break; + + case 'Follow': + await follow(parentResolver, actor, activity); + break; + + case 'Undo': + await undo(parentResolver, actor, activity); + break; + + default: + console.warn(`unknown activity type: ${activity.type}`); + return null; + } }; diff --git a/src/remote/activitypub/create.ts b/src/remote/activitypub/create.ts deleted file mode 100644 index 97c72860fd..0000000000 --- a/src/remote/activitypub/create.ts +++ /dev/null @@ -1,158 +0,0 @@ -import { JSDOM } from 'jsdom'; -import { ObjectID } from 'mongodb'; -import config from '../../config'; -import DriveFile from '../../models/drive-file'; -import Post from '../../models/post'; -import { IRemoteUser } from '../../models/user'; -import uploadFromUrl from '../../drive/upload-from-url'; -import createPost from '../../post/create'; -import distributePost from '../../post/distribute'; -import Resolver from './resolver'; -const createDOMPurify = require('dompurify'); - -type IResult = { - resolver: Resolver; - object: { - $ref: string; - $id: ObjectID; - }; -}; - -class Creator { - private actor: IRemoteUser; - private distribute: boolean; - - constructor(actor, distribute) { - this.actor = actor; - this.distribute = distribute; - } - - private async createImage(resolver: Resolver, image) { - if ('attributedTo' in image && this.actor.account.uri !== image.attributedTo) { - throw new Error(); - } - - const { _id } = await uploadFromUrl(image.url, this.actor, image.id || null); - return { - resolver, - object: { $ref: 'driveFiles.files', $id: _id } - }; - } - - private async createNote(resolver: Resolver, note) { - if ( - ('attributedTo' in note && this.actor.account.uri !== note.attributedTo) || - typeof note.id !== 'string' - ) { - throw new Error(); - } - - const mediaIds = 'attachment' in note && - (await Promise.all(await this.create(resolver, note.attachment))) - .filter(media => media !== null && media.object.$ref === 'driveFiles.files') - .map(({ object }) => object.$id); - - const { window } = new JSDOM(note.content); - - const inserted = await createPost({ - channelId: undefined, - index: undefined, - createdAt: new Date(note.published), - mediaIds, - replyId: undefined, - repostId: undefined, - poll: undefined, - text: window.document.body.textContent, - textHtml: note.content && createDOMPurify(window).sanitize(note.content), - userId: this.actor._id, - appId: null, - viaMobile: false, - geo: undefined, - uri: note.id - }, null, null, []); - - const promises = []; - - if (this.distribute) { - promises.push(distributePost(this.actor, inserted.mentions, inserted)); - } - - // Register to search database - if (note.content && config.elasticsearch.enable) { - const es = require('../../db/elasticsearch'); - - promises.push(new Promise((resolve, reject) => { - es.index({ - index: 'misskey', - type: 'post', - id: inserted._id.toString(), - body: { - text: window.document.body.textContent - } - }, resolve); - })); - } - - await Promise.all(promises); - - return { - resolver, - object: { $ref: 'posts', id: inserted._id } - }; - } - - public async create(parentResolver: Resolver, value): Promise>> { - const collection = await parentResolver.resolveCollection(value); - - return collection.object.map(async element => { - const uri = element.id || element; - - try { - await Promise.all([ - DriveFile.findOne({ 'metadata.uri': uri }).then(file => { - if (file === null) { - return; - } - - throw { - $ref: 'driveFile.files', - $id: file._id - }; - }, () => {}), - Post.findOne({ uri }).then(post => { - if (post === null) { - return; - } - - throw { - $ref: 'posts', - $id: post._id - }; - }, () => {}) - ]); - } catch (object) { - return { - resolver: collection.resolver, - object - }; - } - - const { resolver, object } = await collection.resolver.resolveOne(element); - - switch (object.type) { - case 'Image': - return this.createImage(resolver, object); - - case 'Note': - return this.createNote(resolver, object); - } - - return null; - }); - } -} - -export default (resolver: Resolver, actor, value, distribute?: boolean) => { - const creator = new Creator(actor, distribute); - return creator.create(resolver, value); -}; diff --git a/src/remote/activitypub/resolver.ts b/src/remote/activitypub/resolver.ts index 371ccdcc30..de0bba2687 100644 --- a/src/remote/activitypub/resolver.ts +++ b/src/remote/activitypub/resolver.ts @@ -1,20 +1,45 @@ +import { IObject } from "./type"; + const request = require('request-promise-native'); export default class Resolver { - private requesting: Set; + private history: Set; - constructor(iterable?: Iterable) { - this.requesting = new Set(iterable); + constructor() { + this.history = new Set(); } - private async resolveUnrequestedOne(value) { + public async resolveCollection(value) { + const collection = typeof value === 'string' + ? await this.resolve(value) + : value; + + switch (collection.type) { + case 'Collection': + collection.objects = collection.object.items; + break; + + case 'OrderedCollection': + collection.objects = collection.object.orderedItems; + break; + + default: + throw new Error(`unknown collection type: ${collection.type}`); + } + + return collection; + } + + public async resolve(value): Promise { if (typeof value !== 'string') { - return { resolver: this, object: value }; + return value; } - const resolver = new Resolver(this.requesting); + if (this.history.has(value)) { + throw new Error('cannot resolve already resolved one'); + } - resolver.requesting.add(value); + this.history.add(value); const object = await request({ url: value, @@ -29,41 +54,9 @@ export default class Resolver { !object['@context'].includes('https://www.w3.org/ns/activitystreams') : object['@context'] !== 'https://www.w3.org/ns/activitystreams' )) { - throw new Error(); - } - - return { resolver, object }; - } - - public async resolveCollection(value) { - const resolved = typeof value === 'string' ? - await this.resolveUnrequestedOne(value) : - { resolver: this, object: value }; - - switch (resolved.object.type) { - case 'Collection': - resolved.object = resolved.object.items; - break; - - case 'OrderedCollection': - resolved.object = resolved.object.orderedItems; - break; - - default: - if (!Array.isArray(value)) { - resolved.object = [resolved.object]; - } - break; - } - - return resolved; - } - - public resolveOne(value) { - if (this.requesting.has(value)) { - throw new Error(); + throw new Error('invalid response'); } - return this.resolveUnrequestedOne(value); + return object; } } diff --git a/src/server/activitypub/inbox.ts b/src/server/activitypub/inbox.ts index 5de8433850..847dc19af6 100644 --- a/src/server/activitypub/inbox.ts +++ b/src/server/activitypub/inbox.ts @@ -24,7 +24,7 @@ app.post('/@:user/inbox', bodyParser.json({ queue.create('http', { type: 'processInbox', - inbox: req.body, + activity: req.body, signature, }).save(); -- cgit v1.2.3-freya