From b6aeacdeb942beb7b5b2f6ac8cf4a89163e59153 Mon Sep 17 00:00:00 2001 From: syuilo Date: Fri, 6 Apr 2018 03:10:25 +0900 Subject: RENAME: api --> services --- src/services/post/create.ts | 344 +++++++++++++++++++++++++++++++++++ src/services/post/reaction/create.ts | 0 src/services/post/watch.ts | 26 +++ 3 files changed, 370 insertions(+) create mode 100644 src/services/post/create.ts create mode 100644 src/services/post/reaction/create.ts create mode 100644 src/services/post/watch.ts (limited to 'src/services/post') diff --git a/src/services/post/create.ts b/src/services/post/create.ts new file mode 100644 index 0000000000..9723dbe452 --- /dev/null +++ b/src/services/post/create.ts @@ -0,0 +1,344 @@ +import Post, { pack, IPost } from '../../models/post'; +import User, { isLocalUser, IUser } from '../../models/user'; +import stream from '../../publishers/stream'; +import Following from '../../models/following'; +import { deliver } from '../../queue'; +import renderNote from '../../remote/activitypub/renderer/note'; +import renderCreate from '../../remote/activitypub/renderer/create'; +import context from '../../remote/activitypub/renderer/context'; +import { IDriveFile } from '../../models/drive-file'; +import notify from '../../publishers/notify'; +import PostWatching from '../../models/post-watching'; +import watch from './watch'; +import Mute from '../../models/mute'; +import pushSw from '../../publishers/push-sw'; +import event from '../../publishers/stream'; +import parse from '../../text/parse'; +import html from '../../text/html'; +import { IApp } from '../../models/app'; + +export default async (user: IUser, content: { + createdAt?: Date; + text?: string; + reply?: IPost; + repost?: IPost; + media?: IDriveFile[]; + geo?: any; + poll?: any; + viaMobile?: boolean; + tags?: string[]; + cw?: string; + visibility?: string; + uri?: string; + app?: IApp; +}, silent = false) => new Promise(async (res, rej) => { + if (content.createdAt == null) content.createdAt = new Date(); + if (content.visibility == null) content.visibility = 'public'; + + const tags = content.tags || []; + + let tokens = null; + + if (content.text) { + // Analyze + tokens = parse(content.text); + + // Extract hashtags + const hashtags = tokens + .filter(t => t.type == 'hashtag') + .map(t => t.hashtag); + + hashtags.forEach(tag => { + if (tags.indexOf(tag) == -1) { + tags.push(tag); + } + }); + } + + const data: any = { + createdAt: content.createdAt, + mediaIds: content.media ? content.media.map(file => file._id) : [], + replyId: content.reply ? content.reply._id : null, + repostId: content.repost ? content.repost._id : null, + text: content.text, + textHtml: tokens === null ? null : html(tokens), + poll: content.poll, + cw: content.cw, + tags, + userId: user._id, + viaMobile: content.viaMobile, + geo: content.geo || null, + appId: content.app ? content.app._id : null, + visibility: content.visibility, + + // 以下非正規化データ + _reply: content.reply ? { userId: content.reply.userId } : null, + _repost: content.repost ? { userId: content.repost.userId } : null, + }; + + if (content.uri != null) data.uri = content.uri; + + // 投稿を作成 + const post = await Post.insert(data); + + res(post); + + User.update({ _id: user._id }, { + // Increment posts count + $inc: { + postsCount: 1 + }, + // Update latest post + $set: { + latestPost: post + } + }); + + // Serialize + const postObj = await pack(post); + + // タイムラインへの投稿 + if (!post.channelId) { + // Publish event to myself's stream + if (isLocalUser(user)) { + stream(post.userId, 'post', postObj); + } + + // Fetch all followers + const followers = await Following.aggregate([{ + $lookup: { + from: 'users', + localField: 'followerId', + foreignField: '_id', + as: 'follower' + } + }, { + $match: { + followeeId: post.userId + } + }], { + _id: false + }); + + if (!silent) { + const note = await renderNote(user, post); + const content = renderCreate(note); + content['@context'] = context; + + Promise.all(followers.map(({ follower }) => { + if (isLocalUser(follower)) { + // Publish event to followers stream + stream(follower._id, 'post', postObj); + } else { + // フォロワーがリモートユーザーかつ投稿者がローカルユーザーなら投稿を配信 + if (isLocalUser(user)) { + deliver(user, content, follower.account.inbox).save(); + } + } + })); + } + } + + // チャンネルへの投稿 + /* TODO + if (post.channelId) { + promises.push( + // Increment channel index(posts count) + Channel.update({ _id: post.channelId }, { + $inc: { + index: 1 + } + }), + + // Publish event to channel + promisedPostObj.then(postObj => { + publishChannelStream(post.channelId, 'post', postObj); + }), + + Promise.all([ + promisedPostObj, + + // Get channel watchers + ChannelWatching.find({ + channelId: post.channelId, + // 削除されたドキュメントは除く + deletedAt: { $exists: false } + }) + ]).then(([postObj, watches]) => { + // チャンネルの視聴者(のタイムライン)に配信 + watches.forEach(w => { + stream(w.userId, 'post', postObj); + }); + }) + ); + }*/ + + const mentions = []; + + async function addMention(mentionee, reason) { + // Reject if already added + if (mentions.some(x => x.equals(mentionee))) return; + + // Add mention + mentions.push(mentionee); + + // Publish event + if (!user._id.equals(mentionee)) { + const mentioneeMutes = await Mute.find({ + muter_id: mentionee, + deleted_at: { $exists: false } + }); + const mentioneesMutedUserIds = mentioneeMutes.map(m => m.muteeId.toString()); + if (mentioneesMutedUserIds.indexOf(user._id.toString()) == -1) { + event(mentionee, reason, postObj); + pushSw(mentionee, reason, postObj); + } + } + } + + // If has in reply to post + if (content.reply) { + // Increment replies count + Post.update({ _id: content.reply._id }, { + $inc: { + repliesCount: 1 + } + }); + + // (自分自身へのリプライでない限りは)通知を作成 + notify(content.reply.userId, user._id, 'reply', { + postId: post._id + }); + + // Fetch watchers + PostWatching.find({ + postId: content.reply._id, + userId: { $ne: user._id }, + // 削除されたドキュメントは除く + deletedAt: { $exists: false } + }, { + fields: { + userId: true + } + }).then(watchers => { + watchers.forEach(watcher => { + notify(watcher.userId, user._id, 'reply', { + postId: post._id + }); + }); + }); + + // この投稿をWatchする + if (isLocalUser(user) && user.account.settings.autoWatch !== false) { + watch(user._id, content.reply); + } + + // Add mention + addMention(content.reply.userId, 'reply'); + } + + // If it is repost + if (content.repost) { + // Notify + const type = content.text ? 'quote' : 'repost'; + notify(content.repost.userId, user._id, type, { + post_id: post._id + }); + + // Fetch watchers + PostWatching.find({ + postId: content.repost._id, + userId: { $ne: user._id }, + // 削除されたドキュメントは除く + deletedAt: { $exists: false } + }, { + fields: { + userId: true + } + }).then(watchers => { + watchers.forEach(watcher => { + notify(watcher.userId, user._id, type, { + postId: post._id + }); + }); + }); + + // この投稿をWatchする + if (isLocalUser(user) && user.account.settings.autoWatch !== false) { + watch(user._id, content.repost); + } + + // If it is quote repost + if (content.text) { + // Add mention + addMention(content.repost.userId, 'quote'); + } else { + // Publish event + if (!user._id.equals(content.repost.userId)) { + event(content.repost.userId, 'repost', postObj); + } + } + + // 今までで同じ投稿をRepostしているか + const existRepost = await Post.findOne({ + userId: user._id, + repostId: content.repost._id, + _id: { + $ne: post._id + } + }); + + if (!existRepost) { + // Update repostee status + Post.update({ _id: content.repost._id }, { + $inc: { + repostCount: 1 + } + }); + } + } + + // If has text content + if (content.text) { + // Extract an '@' mentions + const atMentions = tokens + .filter(t => t.type == 'mention') + .map(m => m.username) + // Drop dupulicates + .filter((v, i, s) => s.indexOf(v) == i); + + // Resolve all mentions + await Promise.all(atMentions.map(async mention => { + // Fetch mentioned user + // SELECT _id + const mentionee = await User + .findOne({ + usernameLower: mention.toLowerCase() + }, { _id: true }); + + // When mentioned user not found + if (mentionee == null) return; + + // 既に言及されたユーザーに対する返信や引用repostの場合も無視 + if (content.reply && content.reply.userId.equals(mentionee._id)) return; + if (content.repost && content.repost.userId.equals(mentionee._id)) return; + + // Add mention + addMention(mentionee._id, 'mention'); + + // Create notification + notify(mentionee._id, user._id, 'mention', { + post_id: post._id + }); + })); + } + + // Append mentions data + if (mentions.length > 0) { + Post.update({ _id: post._id }, { + $set: { + mentions + } + }); + } +}); diff --git a/src/services/post/reaction/create.ts b/src/services/post/reaction/create.ts new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/services/post/watch.ts b/src/services/post/watch.ts new file mode 100644 index 0000000000..bbd9976f40 --- /dev/null +++ b/src/services/post/watch.ts @@ -0,0 +1,26 @@ +import * as mongodb from 'mongodb'; +import Watching from '../../models/post-watching'; + +export default async (me: mongodb.ObjectID, post: object) => { + // 自分の投稿はwatchできない + if (me.equals((post as any).userId)) { + return; + } + + // if watching now + const exist = await Watching.findOne({ + postId: (post as any)._id, + userId: me, + deletedAt: { $exists: false } + }); + + if (exist !== null) { + return; + } + + await Watching.insert({ + createdAt: new Date(), + postId: (post as any)._id, + userId: me + }); +}; -- cgit v1.2.3-freya From 0154e44e1d02829e8f35fa131005448f694e745e Mon Sep 17 00:00:00 2001 From: syuilo Date: Fri, 6 Apr 2018 03:42:55 +0900 Subject: Fix bugs --- src/queue/processors/http/index.ts | 3 ++- src/services/post/create.ts | 8 +++++--- 2 files changed, 7 insertions(+), 4 deletions(-) (limited to 'src/services/post') diff --git a/src/queue/processors/http/index.ts b/src/queue/processors/http/index.ts index 3d7d941b1a..61d7f9ac94 100644 --- a/src/queue/processors/http/index.ts +++ b/src/queue/processors/http/index.ts @@ -12,8 +12,9 @@ export default (job, done) => { const handler = handlers[job.data.type]; if (handler) { - handler(job).then(() => done(), done); + handler(job, done); } else { console.warn(`Unknown job: ${job.data.type}`); + done(); } }; diff --git a/src/services/post/create.ts b/src/services/post/create.ts index 9723dbe452..405e4a2f7b 100644 --- a/src/services/post/create.ts +++ b/src/services/post/create.ts @@ -98,7 +98,7 @@ export default async (user: IUser, content: { const postObj = await pack(post); // タイムラインへの投稿 - if (!post.channelId) { + if (post.channelId == null) { // Publish event to myself's stream if (isLocalUser(user)) { stream(post.userId, 'post', postObj); @@ -110,7 +110,7 @@ export default async (user: IUser, content: { from: 'users', localField: 'followerId', foreignField: '_id', - as: 'follower' + as: 'user' } }, { $match: { @@ -125,7 +125,9 @@ export default async (user: IUser, content: { const content = renderCreate(note); content['@context'] = context; - Promise.all(followers.map(({ follower }) => { + Promise.all(followers.map(follower => { + follower = follower.user[0]; + if (isLocalUser(follower)) { // Publish event to followers stream stream(follower._id, 'post', postObj); -- cgit v1.2.3-freya From a6fb4f2e33b5bcbda5c9461d3c55e3d1c956b2c9 Mon Sep 17 00:00:00 2001 From: syuilo Date: Fri, 6 Apr 2018 04:04:50 +0900 Subject: wip --- src/models/post.ts | 14 +++++++ src/services/post/create.ts | 98 +++++++++++++++++++++++++-------------------- 2 files changed, 69 insertions(+), 43 deletions(-) (limited to 'src/services/post') diff --git a/src/models/post.ts b/src/models/post.ts index 2f2b51b946..68a638fa2f 100644 --- a/src/models/post.ts +++ b/src/models/post.ts @@ -52,6 +52,20 @@ export type IPost = { speed: number; }; uri: string; + + _reply?: { + userId: mongo.ObjectID; + }; + _repost?: { + userId: mongo.ObjectID; + }; + _user: { + host: string; + hostLower: string; + account: { + inbox?: string; + }; + }; }; /** diff --git a/src/services/post/create.ts b/src/services/post/create.ts index 405e4a2f7b..745683b518 100644 --- a/src/services/post/create.ts +++ b/src/services/post/create.ts @@ -1,5 +1,5 @@ import Post, { pack, IPost } from '../../models/post'; -import User, { isLocalUser, IUser } from '../../models/user'; +import User, { isLocalUser, IUser, isRemoteUser } from '../../models/user'; import stream from '../../publishers/stream'; import Following from '../../models/following'; import { deliver } from '../../queue'; @@ -17,7 +17,7 @@ import parse from '../../text/parse'; import html from '../../text/html'; import { IApp } from '../../models/app'; -export default async (user: IUser, content: { +export default async (user: IUser, data: { createdAt?: Date; text?: string; reply?: IPost; @@ -32,16 +32,16 @@ export default async (user: IUser, content: { uri?: string; app?: IApp; }, silent = false) => new Promise(async (res, rej) => { - if (content.createdAt == null) content.createdAt = new Date(); - if (content.visibility == null) content.visibility = 'public'; + if (data.createdAt == null) data.createdAt = new Date(); + if (data.visibility == null) data.visibility = 'public'; - const tags = content.tags || []; + const tags = data.tags || []; let tokens = null; - if (content.text) { + if (data.text) { // Analyze - tokens = parse(content.text); + tokens = parse(data.text); // Extract hashtags const hashtags = tokens @@ -55,31 +55,38 @@ export default async (user: IUser, content: { }); } - const data: any = { - createdAt: content.createdAt, - mediaIds: content.media ? content.media.map(file => file._id) : [], - replyId: content.reply ? content.reply._id : null, - repostId: content.repost ? content.repost._id : null, - text: content.text, + const insert: any = { + createdAt: data.createdAt, + mediaIds: data.media ? data.media.map(file => file._id) : [], + replyId: data.reply ? data.reply._id : null, + repostId: data.repost ? data.repost._id : null, + text: data.text, textHtml: tokens === null ? null : html(tokens), - poll: content.poll, - cw: content.cw, + poll: data.poll, + cw: data.cw, tags, userId: user._id, - viaMobile: content.viaMobile, - geo: content.geo || null, - appId: content.app ? content.app._id : null, - visibility: content.visibility, + viaMobile: data.viaMobile, + geo: data.geo || null, + appId: data.app ? data.app._id : null, + visibility: data.visibility, // 以下非正規化データ - _reply: content.reply ? { userId: content.reply.userId } : null, - _repost: content.repost ? { userId: content.repost.userId } : null, + _reply: data.reply ? { userId: data.reply.userId } : null, + _repost: data.repost ? { userId: data.repost.userId } : null, + _user: { + host: user.host, + hostLower: user.hostLower, + account: isLocalUser(user) ? {} : { + inbox: user.account.inbox + } + } }; - if (content.uri != null) data.uri = content.uri; + if (data.uri != null) insert.uri = data.uri; // 投稿を作成 - const post = await Post.insert(data); + const post = await Post.insert(insert); res(post); @@ -125,6 +132,11 @@ export default async (user: IUser, content: { const content = renderCreate(note); content['@context'] = context; + // 投稿がリプライかつ投稿者がローカルユーザーかつリプライ先の投稿の投稿者がリモートユーザーなら配送 + if (data.reply && isLocalUser(user) && isRemoteUser(data.reply._user)) { + deliver(user, content, data.reply._user.account.inbox).save(); + } + Promise.all(followers.map(follower => { follower = follower.user[0]; @@ -199,22 +211,22 @@ export default async (user: IUser, content: { } // If has in reply to post - if (content.reply) { + if (data.reply) { // Increment replies count - Post.update({ _id: content.reply._id }, { + Post.update({ _id: data.reply._id }, { $inc: { repliesCount: 1 } }); // (自分自身へのリプライでない限りは)通知を作成 - notify(content.reply.userId, user._id, 'reply', { + notify(data.reply.userId, user._id, 'reply', { postId: post._id }); // Fetch watchers PostWatching.find({ - postId: content.reply._id, + postId: data.reply._id, userId: { $ne: user._id }, // 削除されたドキュメントは除く deletedAt: { $exists: false } @@ -232,24 +244,24 @@ export default async (user: IUser, content: { // この投稿をWatchする if (isLocalUser(user) && user.account.settings.autoWatch !== false) { - watch(user._id, content.reply); + watch(user._id, data.reply); } // Add mention - addMention(content.reply.userId, 'reply'); + addMention(data.reply.userId, 'reply'); } // If it is repost - if (content.repost) { + if (data.repost) { // Notify - const type = content.text ? 'quote' : 'repost'; - notify(content.repost.userId, user._id, type, { + const type = data.text ? 'quote' : 'repost'; + notify(data.repost.userId, user._id, type, { post_id: post._id }); // Fetch watchers PostWatching.find({ - postId: content.repost._id, + postId: data.repost._id, userId: { $ne: user._id }, // 削除されたドキュメントは除く deletedAt: { $exists: false } @@ -267,24 +279,24 @@ export default async (user: IUser, content: { // この投稿をWatchする if (isLocalUser(user) && user.account.settings.autoWatch !== false) { - watch(user._id, content.repost); + watch(user._id, data.repost); } // If it is quote repost - if (content.text) { + if (data.text) { // Add mention - addMention(content.repost.userId, 'quote'); + addMention(data.repost.userId, 'quote'); } else { // Publish event - if (!user._id.equals(content.repost.userId)) { - event(content.repost.userId, 'repost', postObj); + if (!user._id.equals(data.repost.userId)) { + event(data.repost.userId, 'repost', postObj); } } // 今までで同じ投稿をRepostしているか const existRepost = await Post.findOne({ userId: user._id, - repostId: content.repost._id, + repostId: data.repost._id, _id: { $ne: post._id } @@ -292,7 +304,7 @@ export default async (user: IUser, content: { if (!existRepost) { // Update repostee status - Post.update({ _id: content.repost._id }, { + Post.update({ _id: data.repost._id }, { $inc: { repostCount: 1 } @@ -301,7 +313,7 @@ export default async (user: IUser, content: { } // If has text content - if (content.text) { + if (data.text) { // Extract an '@' mentions const atMentions = tokens .filter(t => t.type == 'mention') @@ -322,8 +334,8 @@ export default async (user: IUser, content: { if (mentionee == null) return; // 既に言及されたユーザーに対する返信や引用repostの場合も無視 - if (content.reply && content.reply.userId.equals(mentionee._id)) return; - if (content.repost && content.repost.userId.equals(mentionee._id)) return; + if (data.reply && data.reply.userId.equals(mentionee._id)) return; + if (data.repost && data.repost.userId.equals(mentionee._id)) return; // Add mention addMention(mentionee._id, 'mention'); -- cgit v1.2.3-freya From 9c15c94f801de7019f014446aa3b8ea42980a1da Mon Sep 17 00:00:00 2001 From: syuilo Date: Fri, 6 Apr 2018 19:18:38 +0900 Subject: Remove silent flag --- src/remote/activitypub/act/create.ts | 6 +++--- src/services/post/create.ts | 7 +++++-- 2 files changed, 8 insertions(+), 5 deletions(-) (limited to 'src/services/post') diff --git a/src/remote/activitypub/act/create.ts b/src/remote/activitypub/act/create.ts index 10083995d1..1b9bad8ff5 100644 --- a/src/remote/activitypub/act/create.ts +++ b/src/remote/activitypub/act/create.ts @@ -58,7 +58,7 @@ async function createImage(resolver: Resolver, actor: IRemoteUser, image) { return await uploadFromUrl(image.url, actor); } -async function createNote(resolver: Resolver, actor: IRemoteUser, note, silent = false) { +async function createNote(resolver: Resolver, actor: IRemoteUser, note) { if ( ('attributedTo' in note && actor.account.uri !== note.attributedTo) || typeof note.id !== 'string' @@ -86,7 +86,7 @@ async function createNote(resolver: Resolver, actor: IRemoteUser, note, silent = const inReplyTo = await resolver.resolve(note.inReplyTo) as any; const actor = await resolvePerson(inReplyTo.attributedTo); if (isRemoteUser(actor)) { - reply = await createNote(resolver, actor, inReplyTo, true); + reply = await createNote(resolver, actor, inReplyTo); } } } @@ -102,5 +102,5 @@ async function createNote(resolver: Resolver, actor: IRemoteUser, note, silent = viaMobile: false, geo: undefined, uri: note.id - }, silent); + }); } diff --git a/src/services/post/create.ts b/src/services/post/create.ts index 745683b518..0bede2772d 100644 --- a/src/services/post/create.ts +++ b/src/services/post/create.ts @@ -31,7 +31,7 @@ export default async (user: IUser, data: { visibility?: string; uri?: string; app?: IApp; -}, silent = false) => new Promise(async (res, rej) => { +}) => new Promise(async (res, rej) => { if (data.createdAt == null) data.createdAt = new Date(); if (data.visibility == null) data.visibility = 'public'; @@ -127,7 +127,10 @@ export default async (user: IUser, data: { _id: false }); - if (!silent) { + // この投稿が3分以内に作成されたものであるならストリームに配信 + const shouldDistribute = new Date().getTime() - post.createdAt.getTime() < 1000 * 60 * 3; + + if (shouldDistribute) { const note = await renderNote(user, post); const content = renderCreate(note); content['@context'] = context; -- cgit v1.2.3-freya From a01251477ee5ab4766810453dd540170e65c02b2 Mon Sep 17 00:00:00 2001 From: syuilo Date: Fri, 6 Apr 2018 20:45:33 +0900 Subject: Revert "Remove silent flag" This reverts commit 9c15c94f801de7019f014446aa3b8ea42980a1da. --- src/remote/activitypub/act/create/note.ts | 2 +- src/services/post/create.ts | 7 ++----- 2 files changed, 3 insertions(+), 6 deletions(-) (limited to 'src/services/post') diff --git a/src/remote/activitypub/act/create/note.ts b/src/remote/activitypub/act/create/note.ts index 2ccd503aeb..d50042e163 100644 --- a/src/remote/activitypub/act/create/note.ts +++ b/src/remote/activitypub/act/create/note.ts @@ -10,7 +10,7 @@ import createImage from './image'; const log = debug('misskey:activitypub'); -export default async function createNote(resolver: Resolver, actor: IRemoteUser, note): Promise { +export default async function createNote(resolver: Resolver, actor: IRemoteUser, note, silent = false): Promise { if ( ('attributedTo' in note && actor.account.uri !== note.attributedTo) || typeof note.id !== 'string' diff --git a/src/services/post/create.ts b/src/services/post/create.ts index 0bede2772d..745683b518 100644 --- a/src/services/post/create.ts +++ b/src/services/post/create.ts @@ -31,7 +31,7 @@ export default async (user: IUser, data: { visibility?: string; uri?: string; app?: IApp; -}) => new Promise(async (res, rej) => { +}, silent = false) => new Promise(async (res, rej) => { if (data.createdAt == null) data.createdAt = new Date(); if (data.visibility == null) data.visibility = 'public'; @@ -127,10 +127,7 @@ export default async (user: IUser, data: { _id: false }); - // この投稿が3分以内に作成されたものであるならストリームに配信 - const shouldDistribute = new Date().getTime() - post.createdAt.getTime() < 1000 * 60 * 3; - - if (shouldDistribute) { + if (!silent) { const note = await renderNote(user, post); const content = renderCreate(note); content['@context'] = context; -- cgit v1.2.3-freya