summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAkihiko Odaki <nekomanma@pixiv.co.jp>2018-04-04 20:29:26 +0900
committerAkihiko Odaki <nekomanma@pixiv.co.jp>2018-04-04 21:04:58 +0900
commitdf38c2f485937d72c495d3195804830b09aa3e09 (patch)
tree61c62cabc7e56fbb6e13577bd4a74c0fbb3b108f /src
parentAllow to undo Create activity (diff)
downloadmisskey-df38c2f485937d72c495d3195804830b09aa3e09.tar.gz
misskey-df38c2f485937d72c495d3195804830b09aa3e09.tar.bz2
misskey-df38c2f485937d72c495d3195804830b09aa3e09.zip
Extract http request from post delivery job
Diffstat (limited to 'src')
-rw-r--r--src/post/distribute.ts96
-rw-r--r--src/processor/http/deliver-post.ts100
2 files changed, 104 insertions, 92 deletions
diff --git a/src/post/distribute.ts b/src/post/distribute.ts
index 49c6eb22df..ad699d6b84 100644
--- a/src/post/distribute.ts
+++ b/src/post/distribute.ts
@@ -1,8 +1,11 @@
+import Channel from '../models/channel';
+import ChannelWatching from '../models/channel-watching';
+import Following from '../models/following';
import Mute from '../models/mute';
import Post, { pack } from '../models/post';
import Watching from '../models/post-watching';
-import User from '../models/user';
-import stream from '../publishers/stream';
+import User, { isLocalUser } from '../models/user';
+import stream, { publishChannelStream } from '../publishers/stream';
import notify from '../publishers/notify';
import pushSw from '../publishers/push-sw';
import queue from '../queue';
@@ -21,10 +24,6 @@ export default async (user, mentions, post) => {
latestPost: post._id
}
}),
- new Promise((resolve, reject) => queue.create('http', {
- type: 'deliverPost',
- id: post._id,
- }).save(error => error ? reject(error) : resolve())),
] as Array<Promise<any>>;
function addMention(promisedMentionee, reason) {
@@ -50,6 +49,91 @@ export default async (user, mentions, post) => {
}));
}
+ // タイムラインへの投稿
+ if (!post.channelId) {
+ promises.push(
+ // Publish event to myself's stream
+ promisedPostObj.then(postObj => {
+ stream(post.userId, 'post', postObj);
+ }),
+
+ Promise.all([
+ User.findOne({ _id: post.userId }),
+
+ // Fetch all followers
+ Following.aggregate([{
+ $lookup: {
+ from: 'users',
+ localField: 'followerId',
+ foreignField: '_id',
+ as: 'follower'
+ }
+ }, {
+ $match: {
+ followeeId: post.userId
+ }
+ }], {
+ _id: false
+ })
+ ]).then(([user, followers]) => Promise.all(followers.map(following => {
+ if (isLocalUser(following.follower)) {
+ // Publish event to followers stream
+ return promisedPostObj.then(postObj => {
+ stream(following.followerId, 'post', postObj);
+ });
+ }
+
+ return new Promise((resolve, reject) => {
+ queue.create('http', {
+ type: 'deliverPost',
+ fromId: user._id,
+ toId: following.followerId,
+ postId: post._id
+ }).save(error => {
+ if (error) {
+ reject(error);
+ } else {
+ resolve();
+ }
+ });
+ });
+ })))
+ );
+ }
+
+ // チャンネルへの投稿
+ if (post.channelId) {
+ promises.push(
+ // Increment channel index(posts count)
+ Channel.update({ _id: post.channelId }, {
+ $inc: {
+ index: 1
+ }
+ }),
+
+ // Publish event to channel
+ promisedPostObj.then(postObj => {
+ publishChannelStream(post.channelId, 'post', postObj);
+ }),
+
+ Promise.all([
+ promisedPostObj,
+
+ // Get channel watchers
+ ChannelWatching.find({
+ channelId: post.channelId,
+ // 削除されたドキュメントは除く
+ deletedAt: { $exists: false }
+ })
+ ]).then(([postObj, watches]) => {
+ // チャンネルの視聴者(のタイムライン)に配信
+ watches.forEach(w => {
+ stream(w.userId, 'post', postObj);
+ });
+ })
+ );
+ }
+
// If has in reply to post
if (post.replyId) {
promises.push(
diff --git a/src/processor/http/deliver-post.ts b/src/processor/http/deliver-post.ts
index c00ab912c9..48ad4f95a1 100644
--- a/src/processor/http/deliver-post.ts
+++ b/src/processor/http/deliver-post.ts
@@ -1,93 +1,21 @@
-import Channel from '../../models/channel';
-import Following from '../../models/following';
-import ChannelWatching from '../../models/channel-watching';
-import Post, { pack } from '../../models/post';
-import User, { isLocalUser } from '../../models/user';
-import stream, { publishChannelStream } from '../../publishers/stream';
+import Post from '../../models/post';
+import User, { IRemoteUser } from '../../models/user';
import context from '../../remote/activitypub/renderer/context';
import renderCreate from '../../remote/activitypub/renderer/create';
import renderNote from '../../remote/activitypub/renderer/note';
import request from '../../remote/request';
-export default ({ data }) => Post.findOne({ _id: data.id }).then(post => {
- const promisedPostObj = pack(post);
- const promises = [];
+export default async ({ data }) => {
+ const promisedTo = User.findOne({ _id: data.toId }) as Promise<IRemoteUser>;
+ const [from, post] = await Promise.all([
+ User.findOne({ _id: data.fromId }),
+ Post.findOne({ _id: data.postId })
+ ]);
+ const note = await renderNote(from, post);
+ const to = await promisedTo;
+ const create = renderCreate(note);
- // タイムラインへの投稿
- if (!post.channelId) {
- promises.push(
- // Publish event to myself's stream
- promisedPostObj.then(postObj => {
- stream(post.userId, 'post', postObj);
- }),
+ create['@context'] = context;
- Promise.all([
- User.findOne({ _id: post.userId }),
-
- // Fetch all followers
- Following.aggregate([{
- $lookup: {
- from: 'users',
- localField: 'followerId',
- foreignField: '_id',
- as: 'follower'
- }
- }, {
- $match: {
- followeeId: post.userId
- }
- }], {
- _id: false
- })
- ]).then(([user, followers]) => Promise.all(followers.map(following => {
- if (isLocalUser(following.follower)) {
- // Publish event to followers stream
- return promisedPostObj.then(postObj => {
- stream(following.followerId, 'post', postObj);
- });
- }
-
- return renderNote(user, post).then(note => {
- const create = renderCreate(note);
- create['@context'] = context;
- return request(user, following.follower[0].account.inbox, create);
- });
- })))
- );
- }
-
- // チャンネルへの投稿
- if (post.channelId) {
- promises.push(
- // Increment channel index(posts count)
- Channel.update({ _id: post.channelId }, {
- $inc: {
- index: 1
- }
- }),
-
- // Publish event to channel
- promisedPostObj.then(postObj => {
- publishChannelStream(post.channelId, 'post', postObj);
- }),
-
- Promise.all([
- promisedPostObj,
-
- // Get channel watchers
- ChannelWatching.find({
- channelId: post.channelId,
- // 削除されたドキュメントは除く
- deletedAt: { $exists: false }
- })
- ]).then(([postObj, watches]) => {
- // チャンネルの視聴者(のタイムライン)に配信
- watches.forEach(w => {
- stream(w.userId, 'post', postObj);
- });
- })
- );
- }
-
- return Promise.all(promises);
-});
+ return request(from, to.account.inbox, create);
+};