diff options
| author | Akihiko Odaki <nekomanma@pixiv.co.jp> | 2018-04-02 20:16:13 +0900 |
|---|---|---|
| committer | Akihiko Odaki <nekomanma@pixiv.co.jp> | 2018-04-03 02:10:43 +0900 |
| commit | 32c008d0087eae2a2b32f050fd2ae126a3e2c732 (patch) | |
| tree | 492ee92b3c35af6a243c38e5a4fccfccdad63c7b /src/post | |
| parent | Fix job processor interfaces (diff) | |
| download | misskey-32c008d0087eae2a2b32f050fd2ae126a3e2c732.tar.gz misskey-32c008d0087eae2a2b32f050fd2ae126a3e2c732.tar.bz2 misskey-32c008d0087eae2a2b32f050fd2ae126a3e2c732.zip | |
Deliver posts to remote followers
Diffstat (limited to 'src/post')
| -rw-r--r-- | src/post/distribute.ts | 138 |
1 files changed, 57 insertions, 81 deletions
diff --git a/src/post/distribute.ts b/src/post/distribute.ts index 4def2c27fe..49c6eb22df 100644 --- a/src/post/distribute.ts +++ b/src/post/distribute.ts @@ -1,16 +1,15 @@ -import Channel from '../models/channel'; import Mute from '../models/mute'; -import Following from '../models/following'; -import Post from '../models/post'; +import Post, { pack } from '../models/post'; import Watching from '../models/post-watching'; -import ChannelWatching from '../models/channel-watching'; import User from '../models/user'; -import stream, { publishChannelStream } from '../publishers/stream'; +import stream from '../publishers/stream'; import notify from '../publishers/notify'; import pushSw from '../publishers/push-sw'; +import queue from '../queue'; import watch from './watch'; export default async (user, mentions, post) => { + const promisedPostObj = pack(post); const promises = [ User.update({ _id: user._id }, { // Increment my posts count @@ -22,66 +21,33 @@ export default async (user, mentions, post) => { latestPost: post._id } }), + new Promise((resolve, reject) => queue.create('http', { + type: 'deliverPost', + id: post._id, + }).save(error => error ? reject(error) : resolve())), ] as Array<Promise<any>>; - function addMention(mentionee, reason) { + function addMention(promisedMentionee, reason) { // Publish event - if (!user._id.equals(mentionee)) { - promises.push(Mute.find({ - muterId: mentionee, - deletedAt: { $exists: false } - }).then(mentioneeMutes => { + promises.push(promisedMentionee.then(mentionee => { + if (user._id.equals(mentionee)) { + return Promise.resolve(); + } + + return Promise.all([ + promisedPostObj, + Mute.find({ + muterId: mentionee, + deletedAt: { $exists: false } + }) + ]).then(([postObj, mentioneeMutes]) => { const mentioneesMutedUserIds = mentioneeMutes.map(m => m.muteeId.toString()); if (mentioneesMutedUserIds.indexOf(user._id.toString()) == -1) { - stream(mentionee, reason, post); - pushSw(mentionee, reason, post); + stream(mentionee, reason, postObj); + pushSw(mentionee, reason, postObj); } - })); - } - } - - // タイムラインへの投稿 - if (!post.channelId) { - // Publish event to myself's stream - stream(user._id, 'post', post); - - // Fetch all followers - const followers = await Following - .find({ - followeeId: user._id - }, { - followerId: true, - _id: false }); - - // Publish event to followers stream - followers.forEach(following => - stream(following.followerId, 'post', post)); - } - - // チャンネルへの投稿 - if (post.channelId) { - // Increment channel index(posts count) - promises.push(Channel.update({ _id: post.channelId }, { - $inc: { - index: 1 - } })); - - // Publish event to channel - publishChannelStream(post.channelId, 'post', post); - - // Get channel watchers - const watches = await ChannelWatching.find({ - channelId: post.channelId, - // 削除されたドキュメントは除く - deletedAt: { $exists: false } - }); - - // チャンネルの視聴者(のタイムライン)に配信 - watches.forEach(w => { - stream(w.userId, 'post', post); - }); } // If has in reply to post @@ -95,8 +61,10 @@ export default async (user, mentions, post) => { }), // 自分自身へのリプライでない限りは通知を作成 - notify(post.reply.userId, user._id, 'reply', { - postId: post._id + promisedPostObj.then(({ reply }) => { + return notify(reply.userId, user._id, 'reply', { + postId: post._id + }); }), // Fetch watchers @@ -121,11 +89,13 @@ export default async (user, mentions, post) => { ); // Add mention - addMention(post.reply.userId, 'reply'); + addMention(promisedPostObj.then(({ reply }) => reply.userId), 'reply'); // この投稿をWatchする if (user.account.settings.autoWatch !== false) { - promises.push(watch(user._id, post.reply)); + promises.push(promisedPostObj.then(({ reply }) => { + return watch(user._id, reply); + })); } } @@ -134,10 +104,17 @@ export default async (user, mentions, post) => { const type = post.text ? 'quote' : 'repost'; promises.push( - // Notify - notify(post.repost.userId, user._id, type, { - postId: post._id - }), + promisedPostObj.then(({ repost }) => Promise.all([ + // Notify + notify(repost.userId, user._id, type, { + postId: post._id + }), + + // この投稿をWatchする + // TODO: ユーザーが「Repostしたときに自動でWatchする」設定を + // オフにしていた場合はしない + watch(user._id, repost) + ])), // Fetch watchers Watching @@ -157,23 +134,20 @@ export default async (user, mentions, post) => { postId: post._id }); }); - }), - - // この投稿をWatchする - // TODO: ユーザーが「Repostしたときに自動でWatchする」設定を - // オフにしていた場合はしない - watch(user._id, post.repost) + }) ); // If it is quote repost if (post.text) { // Add mention - addMention(post.repost.userId, 'quote'); + addMention(promisedPostObj.then(({ repost }) => repost.userId), 'quote'); } else { - // Publish event - if (!user._id.equals(post.repost.userId)) { - stream(post.repost.userId, 'repost', post); - } + promises.push(promisedPostObj.then(postObj => { + // Publish event + if (!user._id.equals(postObj.repost.userId)) { + stream(postObj.repost.userId, 'repost', postObj); + } + })); } // 今までで同じ投稿をRepostしているか @@ -196,10 +170,10 @@ export default async (user, mentions, post) => { } // Resolve all mentions - await Promise.all(mentions.map(async mention => { + await promisedPostObj.then(({ reply, repost }) => Promise.all(mentions.map(async mention => { // 既に言及されたユーザーに対する返信や引用repostの場合も無視 - if (post.reply && post.reply.userId.equals(mention)) return; - if (post.repost && post.repost.userId.equals(mention)) return; + if (reply && reply.userId.equals(mention)) return; + if (repost && repost.userId.equals(mention)) return; // Add mention addMention(mention, 'mention'); @@ -208,7 +182,9 @@ export default async (user, mentions, post) => { await notify(mention, user._id, 'mention', { postId: post._id }); - })); + }))); + + await Promise.all(promises); - return Promise.all(promises); + return promisedPostObj; }; |