From 32c008d0087eae2a2b32f050fd2ae126a3e2c732 Mon Sep 17 00:00:00 2001 From: Akihiko Odaki Date: Mon, 2 Apr 2018 20:16:13 +0900 Subject: Deliver posts to remote followers --- src/processor/http/deliver-post.ts | 94 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 src/processor/http/deliver-post.ts (limited to 'src/processor/http/deliver-post.ts') diff --git a/src/processor/http/deliver-post.ts b/src/processor/http/deliver-post.ts new file mode 100644 index 0000000000..83ac8281f4 --- /dev/null +++ b/src/processor/http/deliver-post.ts @@ -0,0 +1,94 @@ +import Channel from '../../models/channel'; +import Following from '../../models/following'; +import ChannelWatching from '../../models/channel-watching'; +import Post, { pack } from '../../models/post'; +import User, { isLocalUser } from '../../models/user'; +import stream, { publishChannelStream } from '../../publishers/stream'; +import context from '../../remote/activitypub/renderer/context'; +import renderNote from '../../remote/activitypub/renderer/note'; +import request from '../../remote/request'; + +export default ({ data }) => Post.findOne({ _id: data.id }).then(post => { + const promisedPostObj = pack(post); + const promises = []; + + // タイムラインへの投稿 + if (!post.channelId) { + promises.push( + // Publish event to myself's stream + promisedPostObj.then(postObj => { + stream(post.userId, 'post', postObj); + }), + + Promise.all([ + User.findOne({ _id: post.userId }), + + // Fetch all followers + Following.aggregate([ + { + $lookup: { + from: 'users', + localField: 'followerId', + foreignField: '_id', + as: 'follower' + } + }, + { + $match: { + followeeId: post.userId + } + } + ], { + _id: false + }) + ]).then(([user, followers]) => Promise.all(followers.map(following => { + if (isLocalUser(following.follower)) { + // Publish event to followers stream + return promisedPostObj.then(postObj => { + stream(following.followerId, 'post', postObj); + }); + } + + return renderNote(user, post).then(rendered => { + rendered['@context'] = context; + return request(user, following.follower[0].account.inbox, rendered); + }); + }))) + ); + } + + // チャンネルへの投稿 + if (post.channelId) { + promises.push( + // Increment channel index(posts count) + Channel.update({ _id: post.channelId }, { + $inc: { + index: 1 + } + }), + + // Publish event to channel + promisedPostObj.then(postObj => { + publishChannelStream(post.channelId, 'post', postObj); + }), + + Promise.all([ + promisedPostObj, + + // Get channel watchers + ChannelWatching.find({ + channelId: post.channelId, + // 削除されたドキュメントは除く + deletedAt: { $exists: false } + }) + ]).then(([postObj, watches]) => { + // チャンネルの視聴者(のタイムライン)に配信 + watches.forEach(w => { + stream(w.userId, 'post', postObj); + }); + }) + ); + } + + return Promise.all(promises); +}); -- cgit v1.2.3-freya