diff options
| author | Akihiko Odaki <nekomanma@pixiv.co.jp> | 2018-04-02 20:16:13 +0900 |
|---|---|---|
| committer | Akihiko Odaki <nekomanma@pixiv.co.jp> | 2018-04-03 02:10:43 +0900 |
| commit | 32c008d0087eae2a2b32f050fd2ae126a3e2c732 (patch) | |
| tree | 492ee92b3c35af6a243c38e5a4fccfccdad63c7b /src/processor/http/deliver-post.ts | |
| parent | Fix job processor interfaces (diff) | |
| download | sharkey-32c008d0087eae2a2b32f050fd2ae126a3e2c732.tar.gz sharkey-32c008d0087eae2a2b32f050fd2ae126a3e2c732.tar.bz2 sharkey-32c008d0087eae2a2b32f050fd2ae126a3e2c732.zip | |
Deliver posts to remote followers
Diffstat (limited to 'src/processor/http/deliver-post.ts')
| -rw-r--r-- | src/processor/http/deliver-post.ts | 94 |
1 files changed, 94 insertions, 0 deletions
diff --git a/src/processor/http/deliver-post.ts b/src/processor/http/deliver-post.ts new file mode 100644 index 0000000000..83ac8281f4 --- /dev/null +++ b/src/processor/http/deliver-post.ts @@ -0,0 +1,94 @@ +import Channel from '../../models/channel'; +import Following from '../../models/following'; +import ChannelWatching from '../../models/channel-watching'; +import Post, { pack } from '../../models/post'; +import User, { isLocalUser } from '../../models/user'; +import stream, { publishChannelStream } from '../../publishers/stream'; +import context from '../../remote/activitypub/renderer/context'; +import renderNote from '../../remote/activitypub/renderer/note'; +import request from '../../remote/request'; + +export default ({ data }) => Post.findOne({ _id: data.id }).then(post => { + const promisedPostObj = pack(post); + const promises = []; + + // タイムラインへの投稿 + if (!post.channelId) { + promises.push( + // Publish event to myself's stream + promisedPostObj.then(postObj => { + stream(post.userId, 'post', postObj); + }), + + Promise.all([ + User.findOne({ _id: post.userId }), + + // Fetch all followers + Following.aggregate([ + { + $lookup: { + from: 'users', + localField: 'followerId', + foreignField: '_id', + as: 'follower' + } + }, + { + $match: { + followeeId: post.userId + } + } + ], { + _id: false + }) + ]).then(([user, followers]) => Promise.all(followers.map(following => { + if (isLocalUser(following.follower)) { + // Publish event to followers stream + return promisedPostObj.then(postObj => { + stream(following.followerId, 'post', postObj); + }); + } + + return renderNote(user, post).then(rendered => { + rendered['@context'] = context; + return request(user, following.follower[0].account.inbox, rendered); + }); + }))) + ); + } + + // チャンネルへの投稿 + if (post.channelId) { + promises.push( + // Increment channel index(posts count) + Channel.update({ _id: post.channelId }, { + $inc: { + index: 1 + } + }), + + // Publish event to channel + promisedPostObj.then(postObj => { + publishChannelStream(post.channelId, 'post', postObj); + }), + + Promise.all([ + promisedPostObj, + + // Get channel watchers + ChannelWatching.find({ + channelId: post.channelId, + // 削除されたドキュメントは除く + deletedAt: { $exists: false } + }) + ]).then(([postObj, watches]) => { + // チャンネルの視聴者(のタイムライン)に配信 + watches.forEach(w => { + stream(w.userId, 'post', postObj); + }); + }) + ); + } + + return Promise.all(promises); +}); |