diff options
Diffstat (limited to 'src/services')
| -rw-r--r-- | src/services/drive/add-file.ts | 314 | ||||
| -rw-r--r-- | src/services/drive/upload-from-url.ts | 52 | ||||
| -rw-r--r-- | src/services/following/create.ts | 72 | ||||
| -rw-r--r-- | src/services/following/delete.ts | 64 | ||||
| -rw-r--r-- | src/services/post/create.ts | 344 | ||||
| -rw-r--r-- | src/services/post/reaction/create.ts | 0 | ||||
| -rw-r--r-- | src/services/post/watch.ts | 26 |
7 files changed, 872 insertions, 0 deletions
diff --git a/src/services/drive/add-file.ts b/src/services/drive/add-file.ts new file mode 100644 index 0000000000..64a2f18340 --- /dev/null +++ b/src/services/drive/add-file.ts @@ -0,0 +1,314 @@ +import { Buffer } from 'buffer'; +import * as fs from 'fs'; +import * as tmp from 'tmp'; +import * as stream from 'stream'; + +import * as mongodb from 'mongodb'; +import * as crypto from 'crypto'; +import * as _gm from 'gm'; +import * as debug from 'debug'; +import fileType = require('file-type'); +import prominence = require('prominence'); + +import DriveFile, { IMetadata, getGridFSBucket } from '../../models/drive-file'; +import DriveFolder from '../../models/drive-folder'; +import { pack } from '../../models/drive-file'; +import event, { publishDriveStream } from '../../publishers/stream'; +import getAcct from '../../acct/render'; +import config from '../../config'; + +const gm = _gm.subClass({ + imageMagick: true +}); + +const log = debug('misskey:drive:add-file'); + +const tmpFile = (): Promise<string> => new Promise((resolve, reject) => { + tmp.file((e, path) => { + if (e) return reject(e); + resolve(path); + }); +}); + +const addToGridFS = (name: string, readable: stream.Readable, type: string, metadata: any): Promise<any> => + getGridFSBucket() + .then(bucket => new Promise((resolve, reject) => { + const writeStream = bucket.openUploadStream(name, { contentType: type, metadata }); + writeStream.once('finish', (doc) => { resolve(doc); }); + writeStream.on('error', reject); + readable.pipe(writeStream); + })); + +const addFile = async ( + user: any, + path: string, + name: string = null, + comment: string = null, + folderId: mongodb.ObjectID = null, + force: boolean = false, + uri: string = null +) => { + log(`registering ${name} (user: ${getAcct(user)}, path: ${path})`); + + // Calculate hash, get content type and get file size + const [hash, [mime, ext], size] = await Promise.all([ + // hash + ((): Promise<string> => new Promise((res, rej) => { + const readable = fs.createReadStream(path); + const hash = crypto.createHash('md5'); + const chunks = []; + readable + .on('error', rej) + .pipe(hash) + .on('error', rej) + .on('data', (chunk) => chunks.push(chunk)) + .on('end', () => { + const buffer = Buffer.concat(chunks); + res(buffer.toString('hex')); + }); + }))(), + // mime + ((): Promise<[string, string | null]> => new Promise((res, rej) => { + const readable = fs.createReadStream(path); + readable + .on('error', rej) + .once('data', (buffer: Buffer) => { + readable.destroy(); + const type = fileType(buffer); + if (type) { + return res([type.mime, type.ext]); + } else { + // 種類が同定できなかったら application/octet-stream にする + return res(['application/octet-stream', null]); + } + }); + }))(), + // size + ((): Promise<number> => new Promise((res, rej) => { + fs.stat(path, (err, stats) => { + if (err) return rej(err); + res(stats.size); + }); + }))() + ]); + + log(`hash: ${hash}, mime: ${mime}, ext: ${ext}, size: ${size}`); + + // detect name + const detectedName: string = name || (ext ? `untitled.${ext}` : 'untitled'); + + if (!force) { + // Check if there is a file with the same hash + const much = await DriveFile.findOne({ + md5: hash, + 'metadata.userId': user._id + }); + + if (much !== null) { + log('file with same hash is found'); + return much; + } else { + log('file with same hash is not found'); + } + } + + const [wh, averageColor, folder] = await Promise.all([ + // Width and height (when image) + (async () => { + // 画像かどうか + if (!/^image\/.*$/.test(mime)) { + return null; + } + + const imageType = mime.split('/')[1]; + + // 画像でもPNGかJPEGかGIFでないならスキップ + if (imageType != 'png' && imageType != 'jpeg' && imageType != 'gif') { + return null; + } + + log('calculate image width and height...'); + + // Calculate width and height + const g = gm(fs.createReadStream(path), name); + const size = await prominence(g).size(); + + log(`image width and height is calculated: ${size.width}, ${size.height}`); + + return [size.width, size.height]; + })(), + // average color (when image) + (async () => { + // 画像かどうか + if (!/^image\/.*$/.test(mime)) { + return null; + } + + const imageType = mime.split('/')[1]; + + // 画像でもPNGかJPEGでないならスキップ + if (imageType != 'png' && imageType != 'jpeg') { + return null; + } + + log('calculate average color...'); + + const buffer = await prominence(gm(fs.createReadStream(path), name) + .setFormat('ppm') + .resize(1, 1)) // 1pxのサイズに縮小して平均色を取得するというハック + .toBuffer(); + + const r = buffer.readUInt8(buffer.length - 3); + const g = buffer.readUInt8(buffer.length - 2); + const b = buffer.readUInt8(buffer.length - 1); + + log(`average color is calculated: ${r}, ${g}, ${b}`); + + return [r, g, b]; + })(), + // folder + (async () => { + if (!folderId) { + return null; + } + const driveFolder = await DriveFolder.findOne({ + _id: folderId, + userId: user._id + }); + if (!driveFolder) { + throw 'folder-not-found'; + } + return driveFolder; + })(), + // usage checker + (async () => { + // Calculate drive usage + const usage = await DriveFile + .aggregate([{ + $match: { 'metadata.userId': user._id } + }, { + $project: { + length: true + } + }, { + $group: { + _id: null, + usage: { $sum: '$length' } + } + }]) + .then((aggregates: any[]) => { + if (aggregates.length > 0) { + return aggregates[0].usage; + } + return 0; + }); + + log(`drive usage is ${usage}`); + + // If usage limit exceeded + if (usage + size > user.driveCapacity) { + throw 'no-free-space'; + } + })() + ]); + + const readable = fs.createReadStream(path); + + const properties = {}; + + if (wh) { + properties['width'] = wh[0]; + properties['height'] = wh[1]; + } + + if (averageColor) { + properties['avgColor'] = averageColor; + } + + const metadata = { + userId: user._id, + folderId: folder !== null ? folder._id : null, + comment: comment, + properties: properties + } as IMetadata; + + if (uri !== null) { + metadata.uri = uri; + } + + return addToGridFS(detectedName, readable, mime, metadata); +}; + +/** + * Add file to drive + * + * @param user User who wish to add file + * @param file File path or readableStream + * @param comment Comment + * @param type File type + * @param folderId Folder ID + * @param force If set to true, forcibly upload the file even if there is a file with the same hash. + * @return Object that represents added file + */ +export default (user: any, file: string | stream.Readable, ...args) => new Promise<any>((resolve, reject) => { + // Get file path + new Promise((res: (v: [string, boolean]) => void, rej) => { + if (typeof file === 'string') { + res([file, false]); + return; + } + if (typeof file === 'object' && typeof file.read === 'function') { + tmpFile() + .then(path => { + const readable: stream.Readable = file; + const writable = fs.createWriteStream(path); + readable + .on('error', rej) + .on('end', () => { + res([path, true]); + }) + .pipe(writable) + .on('error', rej); + }) + .catch(rej); + } + rej(new Error('un-compatible file.')); + }) + .then(([path, shouldCleanup]): Promise<any> => new Promise((res, rej) => { + addFile(user, path, ...args) + .then(file => { + res(file); + if (shouldCleanup) { + fs.unlink(path, (e) => { + if (e) log(e.stack); + }); + } + }) + .catch(rej); + })) + .then(file => { + log(`drive file has been created ${file._id}`); + resolve(file); + + pack(file).then(serializedFile => { + // Publish drive_file_created event + event(user._id, 'drive_file_created', serializedFile); + publishDriveStream(user._id, 'file_created', serializedFile); + + // Register to search database + if (config.elasticsearch.enable) { + const es = require('../db/elasticsearch'); + es.index({ + index: 'misskey', + type: 'drive_file', + id: file._id.toString(), + body: { + name: file.name, + userId: user._id.toString() + } + }); + } + }); + }) + .catch(reject); +}); diff --git a/src/services/drive/upload-from-url.ts b/src/services/drive/upload-from-url.ts new file mode 100644 index 0000000000..676586cd15 --- /dev/null +++ b/src/services/drive/upload-from-url.ts @@ -0,0 +1,52 @@ +import * as URL from 'url'; +import { IDriveFile, validateFileName } from '../../models/drive-file'; +import create from './add-file'; +import * as debug from 'debug'; +import * as tmp from 'tmp'; +import * as fs from 'fs'; +import * as request from 'request'; + +const log = debug('misskey:drive:upload-from-url'); + +export default async (url, user, folderId = null, uri = null): Promise<IDriveFile> => { + log(`REQUESTED: ${url}`); + + let name = URL.parse(url).pathname.split('/').pop(); + if (!validateFileName(name)) { + name = null; + } + + log(`name: ${name}`); + + // Create temp file + const path = await new Promise((res: (string) => void, rej) => { + tmp.file((e, path) => { + if (e) return rej(e); + res(path); + }); + }); + + // write content at URL to temp file + await new Promise((res, rej) => { + const writable = fs.createWriteStream(path); + request(url) + .on('error', rej) + .on('end', () => { + writable.close(); + res(path); + }) + .pipe(writable) + .on('error', rej); + }); + + const driveFile = await create(user, path, name, null, folderId, false, uri); + + log(`created: ${driveFile._id}`); + + // clean-up + fs.unlink(path, (e) => { + if (e) log(e.stack); + }); + + return driveFile; +}; diff --git a/src/services/following/create.ts b/src/services/following/create.ts new file mode 100644 index 0000000000..d919f4487f --- /dev/null +++ b/src/services/following/create.ts @@ -0,0 +1,72 @@ +import User, { isLocalUser, isRemoteUser, pack as packUser, IUser } from '../../models/user'; +import Following from '../../models/following'; +import FollowingLog from '../../models/following-log'; +import FollowedLog from '../../models/followed-log'; +import event from '../../publishers/stream'; +import notify from '../../publishers/notify'; +import context from '../../remote/activitypub/renderer/context'; +import renderFollow from '../../remote/activitypub/renderer/follow'; +import renderAccept from '../../remote/activitypub/renderer/accept'; +import { deliver } from '../../queue'; + +export default async function(follower: IUser, followee: IUser, activity?) { + const following = await Following.insert({ + createdAt: new Date(), + followerId: follower._id, + followeeId: followee._id + }); + + //#region Increment following count + User.update({ _id: follower._id }, { + $inc: { + followingCount: 1 + } + }); + + FollowingLog.insert({ + createdAt: following.createdAt, + userId: follower._id, + count: follower.followingCount + 1 + }); + //#endregion + + //#region Increment followers count + User.update({ _id: followee._id }, { + $inc: { + followersCount: 1 + } + }); + FollowedLog.insert({ + createdAt: following.createdAt, + userId: followee._id, + count: followee.followersCount + 1 + }); + //#endregion + + // Publish follow event + if (isLocalUser(follower)) { + packUser(followee, follower).then(packed => event(follower._id, 'follow', packed)); + } + + // Publish followed event + if (isLocalUser(followee)) { + packUser(follower, followee).then(packed => event(followee._id, 'followed', packed)), + + // 通知を作成 + notify(followee._id, follower._id, 'follow'); + } + + if (isLocalUser(follower) && isRemoteUser(followee)) { + const content = renderFollow(follower, followee); + content['@context'] = context; + + deliver(follower, content, followee.account.inbox).save(); + } + + if (isRemoteUser(follower) && isLocalUser(followee)) { + const content = renderAccept(activity); + content['@context'] = context; + + deliver(followee, content, follower.account.inbox).save(); + } +} diff --git a/src/services/following/delete.ts b/src/services/following/delete.ts new file mode 100644 index 0000000000..364a4803b9 --- /dev/null +++ b/src/services/following/delete.ts @@ -0,0 +1,64 @@ +import User, { isLocalUser, isRemoteUser, pack as packUser, IUser } from '../../models/user'; +import Following from '../../models/following'; +import FollowingLog from '../../models/following-log'; +import FollowedLog from '../../models/followed-log'; +import event from '../../publishers/stream'; +import context from '../../remote/activitypub/renderer/context'; +import renderFollow from '../../remote/activitypub/renderer/follow'; +import renderUndo from '../../remote/activitypub/renderer/undo'; +import { deliver } from '../../queue'; + +export default async function(follower: IUser, followee: IUser, activity?) { + const following = await Following.findOne({ + followerId: follower._id, + followeeId: followee._id + }); + + if (following == null) { + console.warn('フォロー解除がリクエストされましたがフォローしていませんでした'); + return; + } + + Following.remove({ + _id: following._id + }); + + //#region Decrement following count + User.update({ _id: follower._id }, { + $inc: { + followingCount: -1 + } + }); + + FollowingLog.insert({ + createdAt: following.createdAt, + userId: follower._id, + count: follower.followingCount - 1 + }); + //#endregion + + //#region Decrement followers count + User.update({ _id: followee._id }, { + $inc: { + followersCount: -1 + } + }); + FollowedLog.insert({ + createdAt: following.createdAt, + userId: followee._id, + count: followee.followersCount - 1 + }); + //#endregion + + // Publish unfollow event + if (isLocalUser(follower)) { + packUser(followee, follower).then(packed => event(follower._id, 'unfollow', packed)); + } + + if (isLocalUser(follower) && isRemoteUser(followee)) { + const content = renderUndo(renderFollow(follower, followee)); + content['@context'] = context; + + deliver(follower, content, followee.account.inbox).save(); + } +} diff --git a/src/services/post/create.ts b/src/services/post/create.ts new file mode 100644 index 0000000000..9723dbe452 --- /dev/null +++ b/src/services/post/create.ts @@ -0,0 +1,344 @@ +import Post, { pack, IPost } from '../../models/post'; +import User, { isLocalUser, IUser } from '../../models/user'; +import stream from '../../publishers/stream'; +import Following from '../../models/following'; +import { deliver } from '../../queue'; +import renderNote from '../../remote/activitypub/renderer/note'; +import renderCreate from '../../remote/activitypub/renderer/create'; +import context from '../../remote/activitypub/renderer/context'; +import { IDriveFile } from '../../models/drive-file'; +import notify from '../../publishers/notify'; +import PostWatching from '../../models/post-watching'; +import watch from './watch'; +import Mute from '../../models/mute'; +import pushSw from '../../publishers/push-sw'; +import event from '../../publishers/stream'; +import parse from '../../text/parse'; +import html from '../../text/html'; +import { IApp } from '../../models/app'; + +export default async (user: IUser, content: { + createdAt?: Date; + text?: string; + reply?: IPost; + repost?: IPost; + media?: IDriveFile[]; + geo?: any; + poll?: any; + viaMobile?: boolean; + tags?: string[]; + cw?: string; + visibility?: string; + uri?: string; + app?: IApp; +}, silent = false) => new Promise<IPost>(async (res, rej) => { + if (content.createdAt == null) content.createdAt = new Date(); + if (content.visibility == null) content.visibility = 'public'; + + const tags = content.tags || []; + + let tokens = null; + + if (content.text) { + // Analyze + tokens = parse(content.text); + + // Extract hashtags + const hashtags = tokens + .filter(t => t.type == 'hashtag') + .map(t => t.hashtag); + + hashtags.forEach(tag => { + if (tags.indexOf(tag) == -1) { + tags.push(tag); + } + }); + } + + const data: any = { + createdAt: content.createdAt, + mediaIds: content.media ? content.media.map(file => file._id) : [], + replyId: content.reply ? content.reply._id : null, + repostId: content.repost ? content.repost._id : null, + text: content.text, + textHtml: tokens === null ? null : html(tokens), + poll: content.poll, + cw: content.cw, + tags, + userId: user._id, + viaMobile: content.viaMobile, + geo: content.geo || null, + appId: content.app ? content.app._id : null, + visibility: content.visibility, + + // 以下非正規化データ + _reply: content.reply ? { userId: content.reply.userId } : null, + _repost: content.repost ? { userId: content.repost.userId } : null, + }; + + if (content.uri != null) data.uri = content.uri; + + // 投稿を作成 + const post = await Post.insert(data); + + res(post); + + User.update({ _id: user._id }, { + // Increment posts count + $inc: { + postsCount: 1 + }, + // Update latest post + $set: { + latestPost: post + } + }); + + // Serialize + const postObj = await pack(post); + + // タイムラインへの投稿 + if (!post.channelId) { + // Publish event to myself's stream + if (isLocalUser(user)) { + stream(post.userId, 'post', postObj); + } + + // Fetch all followers + const followers = await Following.aggregate([{ + $lookup: { + from: 'users', + localField: 'followerId', + foreignField: '_id', + as: 'follower' + } + }, { + $match: { + followeeId: post.userId + } + }], { + _id: false + }); + + if (!silent) { + const note = await renderNote(user, post); + const content = renderCreate(note); + content['@context'] = context; + + Promise.all(followers.map(({ follower }) => { + if (isLocalUser(follower)) { + // Publish event to followers stream + stream(follower._id, 'post', postObj); + } else { + // フォロワーがリモートユーザーかつ投稿者がローカルユーザーなら投稿を配信 + if (isLocalUser(user)) { + deliver(user, content, follower.account.inbox).save(); + } + } + })); + } + } + + // チャンネルへの投稿 + /* TODO + if (post.channelId) { + promises.push( + // Increment channel index(posts count) + Channel.update({ _id: post.channelId }, { + $inc: { + index: 1 + } + }), + + // Publish event to channel + promisedPostObj.then(postObj => { + publishChannelStream(post.channelId, 'post', postObj); + }), + + Promise.all([ + promisedPostObj, + + // Get channel watchers + ChannelWatching.find({ + channelId: post.channelId, + // 削除されたドキュメントは除く + deletedAt: { $exists: false } + }) + ]).then(([postObj, watches]) => { + // チャンネルの視聴者(のタイムライン)に配信 + watches.forEach(w => { + stream(w.userId, 'post', postObj); + }); + }) + ); + }*/ + + const mentions = []; + + async function addMention(mentionee, reason) { + // Reject if already added + if (mentions.some(x => x.equals(mentionee))) return; + + // Add mention + mentions.push(mentionee); + + // Publish event + if (!user._id.equals(mentionee)) { + const mentioneeMutes = await Mute.find({ + muter_id: mentionee, + deleted_at: { $exists: false } + }); + const mentioneesMutedUserIds = mentioneeMutes.map(m => m.muteeId.toString()); + if (mentioneesMutedUserIds.indexOf(user._id.toString()) == -1) { + event(mentionee, reason, postObj); + pushSw(mentionee, reason, postObj); + } + } + } + + // If has in reply to post + if (content.reply) { + // Increment replies count + Post.update({ _id: content.reply._id }, { + $inc: { + repliesCount: 1 + } + }); + + // (自分自身へのリプライでない限りは)通知を作成 + notify(content.reply.userId, user._id, 'reply', { + postId: post._id + }); + + // Fetch watchers + PostWatching.find({ + postId: content.reply._id, + userId: { $ne: user._id }, + // 削除されたドキュメントは除く + deletedAt: { $exists: false } + }, { + fields: { + userId: true + } + }).then(watchers => { + watchers.forEach(watcher => { + notify(watcher.userId, user._id, 'reply', { + postId: post._id + }); + }); + }); + + // この投稿をWatchする + if (isLocalUser(user) && user.account.settings.autoWatch !== false) { + watch(user._id, content.reply); + } + + // Add mention + addMention(content.reply.userId, 'reply'); + } + + // If it is repost + if (content.repost) { + // Notify + const type = content.text ? 'quote' : 'repost'; + notify(content.repost.userId, user._id, type, { + post_id: post._id + }); + + // Fetch watchers + PostWatching.find({ + postId: content.repost._id, + userId: { $ne: user._id }, + // 削除されたドキュメントは除く + deletedAt: { $exists: false } + }, { + fields: { + userId: true + } + }).then(watchers => { + watchers.forEach(watcher => { + notify(watcher.userId, user._id, type, { + postId: post._id + }); + }); + }); + + // この投稿をWatchする + if (isLocalUser(user) && user.account.settings.autoWatch !== false) { + watch(user._id, content.repost); + } + + // If it is quote repost + if (content.text) { + // Add mention + addMention(content.repost.userId, 'quote'); + } else { + // Publish event + if (!user._id.equals(content.repost.userId)) { + event(content.repost.userId, 'repost', postObj); + } + } + + // 今までで同じ投稿をRepostしているか + const existRepost = await Post.findOne({ + userId: user._id, + repostId: content.repost._id, + _id: { + $ne: post._id + } + }); + + if (!existRepost) { + // Update repostee status + Post.update({ _id: content.repost._id }, { + $inc: { + repostCount: 1 + } + }); + } + } + + // If has text content + if (content.text) { + // Extract an '@' mentions + const atMentions = tokens + .filter(t => t.type == 'mention') + .map(m => m.username) + // Drop dupulicates + .filter((v, i, s) => s.indexOf(v) == i); + + // Resolve all mentions + await Promise.all(atMentions.map(async mention => { + // Fetch mentioned user + // SELECT _id + const mentionee = await User + .findOne({ + usernameLower: mention.toLowerCase() + }, { _id: true }); + + // When mentioned user not found + if (mentionee == null) return; + + // 既に言及されたユーザーに対する返信や引用repostの場合も無視 + if (content.reply && content.reply.userId.equals(mentionee._id)) return; + if (content.repost && content.repost.userId.equals(mentionee._id)) return; + + // Add mention + addMention(mentionee._id, 'mention'); + + // Create notification + notify(mentionee._id, user._id, 'mention', { + post_id: post._id + }); + })); + } + + // Append mentions data + if (mentions.length > 0) { + Post.update({ _id: post._id }, { + $set: { + mentions + } + }); + } +}); diff --git a/src/services/post/reaction/create.ts b/src/services/post/reaction/create.ts new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/src/services/post/reaction/create.ts diff --git a/src/services/post/watch.ts b/src/services/post/watch.ts new file mode 100644 index 0000000000..bbd9976f40 --- /dev/null +++ b/src/services/post/watch.ts @@ -0,0 +1,26 @@ +import * as mongodb from 'mongodb'; +import Watching from '../../models/post-watching'; + +export default async (me: mongodb.ObjectID, post: object) => { + // 自分の投稿はwatchできない + if (me.equals((post as any).userId)) { + return; + } + + // if watching now + const exist = await Watching.findOne({ + postId: (post as any)._id, + userId: me, + deletedAt: { $exists: false } + }); + + if (exist !== null) { + return; + } + + await Watching.insert({ + createdAt: new Date(), + postId: (post as any)._id, + userId: me + }); +}; |