diff options
| author | rinsuki <428rinsuki+git@gmail.com> | 2018-05-17 07:52:24 +0900 |
|---|---|---|
| committer | rinsuki <428rinsuki+git@gmail.com> | 2018-05-17 07:52:24 +0900 |
| commit | 829b4012e6dc14eb64a3d8f60826fe9b6a41b40d (patch) | |
| tree | 42ac37f323db349dca9316e6fdb39fc33b860686 /src/services | |
| parent | add yarn.lock to gitignore (diff) | |
| parent | Update deliver.ts (diff) | |
| download | misskey-829b4012e6dc14eb64a3d8f60826fe9b6a41b40d.tar.gz misskey-829b4012e6dc14eb64a3d8f60826fe9b6a41b40d.tar.bz2 misskey-829b4012e6dc14eb64a3d8f60826fe9b6a41b40d.zip | |
Merge branch 'master' into fix/yarn-lock-ignore
Diffstat (limited to 'src/services')
| -rw-r--r-- | src/services/drive/add-file.ts | 97 | ||||
| -rw-r--r-- | src/services/drive/upload-from-url.ts | 2 | ||||
| -rw-r--r-- | src/services/following/create.ts | 13 | ||||
| -rw-r--r-- | src/services/note/create.ts | 367 | ||||
| -rw-r--r-- | src/services/note/reaction/create.ts | 19 |
5 files changed, 319 insertions, 179 deletions
diff --git a/src/services/drive/add-file.ts b/src/services/drive/add-file.ts index 30aae24ba6..efabe345d1 100644 --- a/src/services/drive/add-file.ts +++ b/src/services/drive/add-file.ts @@ -10,12 +10,14 @@ import * as debug from 'debug'; import fileType = require('file-type'); import prominence = require('prominence'); -import DriveFile, { IMetadata, getGridFSBucket, IDriveFile } from '../../models/drive-file'; +import DriveFile, { IMetadata, getDriveFileBucket, IDriveFile, DriveFileChunk } from '../../models/drive-file'; import DriveFolder from '../../models/drive-folder'; import { pack } from '../../models/drive-file'; import event, { publishDriveStream } from '../../publishers/stream'; import getAcct from '../../acct/render'; -import { IUser } from '../../models/user'; +import { IUser, isLocalUser, isRemoteUser } from '../../models/user'; +import DriveFileThumbnail, { getDriveFileThumbnailBucket, DriveFileThumbnailChunk } from '../../models/drive-file-thumbnail'; +import genThumbnail from '../../drive/gen-thumbnail'; const gm = _gm.subClass({ imageMagick: true @@ -30,8 +32,8 @@ const tmpFile = (): Promise<[string, any]> => new Promise((resolve, reject) => { }); }); -const addToGridFS = (name: string, readable: stream.Readable, type: string, metadata: any): Promise<any> => - getGridFSBucket() +const writeChunks = (name: string, readable: stream.Readable, type: string, metadata: any) => + getDriveFileBucket() .then(bucket => new Promise((resolve, reject) => { const writeStream = bucket.openUploadStream(name, { contentType: type, metadata }); writeStream.once('finish', resolve); @@ -39,6 +41,20 @@ const addToGridFS = (name: string, readable: stream.Readable, type: string, meta readable.pipe(writeStream); })); +const writeThumbnailChunks = (name: string, readable: stream.Readable, originalId) => + getDriveFileThumbnailBucket() + .then(bucket => new Promise((resolve, reject) => { + const writeStream = bucket.openUploadStream(name, { + contentType: 'image/jpeg', + metadata: { + originalId + } + }); + writeStream.once('finish', resolve); + writeStream.on('error', reject); + readable.pipe(writeStream); + })); + const addFile = async ( user: IUser, path: string, @@ -46,6 +62,7 @@ const addFile = async ( comment: string = null, folderId: mongodb.ObjectID = null, force: boolean = false, + url: string = null, uri: string = null ): Promise<IDriveFile> => { log(`registering ${name} (user: ${getAcct(user)}, path: ${path})`); @@ -101,7 +118,8 @@ const addFile = async ( // Check if there is a file with the same hash const much = await DriveFile.findOne({ md5: hash, - 'metadata.userId': user._id + 'metadata.userId': user._id, + 'metadata.deletedAt': { $exists: false } }); if (much !== null) { @@ -185,7 +203,10 @@ const addFile = async ( // Calculate drive usage const usage = await DriveFile .aggregate([{ - $match: { 'metadata.userId': user._id } + $match: { + 'metadata.userId': user._id, + 'metadata.deletedAt': { $exists: false } + } }, { $project: { length: true @@ -207,7 +228,49 @@ const addFile = async ( // If usage limit exceeded if (usage + size > user.driveCapacity) { - throw 'no-free-space'; + if (isLocalUser(user)) { + throw 'no-free-space'; + } else { + //#region (アバターまたはバナーを含まず)最も古いファイルを削除する + const oldFile = await DriveFile.findOne({ + _id: { + $nin: [user.avatarId, user.bannerId] + } + }, { + sort: { + _id: 1 + } + }); + + if (oldFile) { + // チャンクをすべて削除 + DriveFileChunk.remove({ + files_id: oldFile._id + }); + + DriveFile.update({ _id: oldFile._id }, { + $set: { + 'metadata.deletedAt': new Date(), + 'metadata.isExpired': true + } + }); + + //#region サムネイルもあれば削除 + const thumbnail = await DriveFileThumbnail.findOne({ + 'metadata.originalId': oldFile._id + }); + + if (thumbnail) { + DriveFileThumbnailChunk.remove({ + files_id: thumbnail._id + }); + + DriveFileThumbnail.remove({ _id: thumbnail._id }); + } + //#endregion + } + //#endregion + } } })() ]); @@ -227,16 +290,34 @@ const addFile = async ( const metadata = { userId: user._id, + _user: { + host: user.host + }, folderId: folder !== null ? folder._id : null, comment: comment, properties: properties } as IMetadata; + if (url !== null) { + metadata.url = url; + } + if (uri !== null) { metadata.uri = uri; } - return addToGridFS(detectedName, readable, mime, metadata); + const file = await (writeChunks(detectedName, readable, mime, metadata) as Promise<IDriveFile>); + + try { + const thumb = await genThumbnail(file); + if (thumb) { + await writeThumbnailChunks(detectedName, thumb, file._id); + } + } catch (e) { + // noop + } + + return file; }; /** diff --git a/src/services/drive/upload-from-url.ts b/src/services/drive/upload-from-url.ts index 08e0397706..ad2620c036 100644 --- a/src/services/drive/upload-from-url.ts +++ b/src/services/drive/upload-from-url.ts @@ -43,7 +43,7 @@ export default async (url, user, folderId = null, uri = null): Promise<IDriveFil let error; try { - driveFile = await create(user, path, name, null, folderId, false, uri); + driveFile = await create(user, path, name, null, folderId, false, url, uri); log(`created: ${driveFile._id}`); } catch (e) { error = e; diff --git a/src/services/following/create.ts b/src/services/following/create.ts index 375b028912..3424c55dae 100644 --- a/src/services/following/create.ts +++ b/src/services/following/create.ts @@ -13,7 +13,18 @@ export default async function(follower: IUser, followee: IUser, activity?) { const following = await Following.insert({ createdAt: new Date(), followerId: follower._id, - followeeId: followee._id + followeeId: followee._id, + stalk: true, + + // 非正規化 + _follower: { + host: follower.host, + inbox: isRemoteUser(follower) ? follower.inbox : undefined + }, + _followee: { + host: followee.host, + inbox: isRemoteUser(followee) ? followee.inbox : undefined + } }); //#region Increment following count diff --git a/src/services/note/create.ts b/src/services/note/create.ts index e35e5ecfbd..f049c34b65 100644 --- a/src/services/note/create.ts +++ b/src/services/note/create.ts @@ -1,6 +1,6 @@ import Note, { pack, INote } from '../../models/note'; -import User, { isLocalUser, IUser, isRemoteUser } from '../../models/user'; -import stream, { publishLocalTimelineStream, publishGlobalTimelineStream } from '../../publishers/stream'; +import User, { isLocalUser, IUser, isRemoteUser, IRemoteUser, ILocalUser } from '../../models/user'; +import stream, { publishLocalTimelineStream, publishGlobalTimelineStream, publishUserListStream } from '../../publishers/stream'; import Following from '../../models/following'; import { deliver } from '../../queue'; import renderNote from '../../remote/activitypub/renderer/note'; @@ -15,8 +15,64 @@ import Mute from '../../models/mute'; import pushSw from '../../publishers/push-sw'; import event from '../../publishers/stream'; import parse from '../../text/parse'; -import html from '../../text/html'; import { IApp } from '../../models/app'; +import UserList from '../../models/user-list'; +import resolveUser from '../../remote/resolve-user'; + +type Reason = 'reply' | 'quote' | 'mention'; + +/** + * ServiceWorkerへの通知を担当 + */ +class NotificationManager { + private user: IUser; + private note: any; + private list: Array<{ + user: ILocalUser['_id'], + reason: Reason; + }> = []; + + constructor(user, note) { + this.user = user; + this.note = note; + } + + public push(user: ILocalUser['_id'], reason: Reason) { + // 自分自身へは通知しない + if (this.user._id.equals(user)) return; + + const exist = this.list.find(x => x.user.equals(user)); + + if (exist) { + // 「メンションされているかつ返信されている」場合は、メンションとしての通知ではなく返信としての通知にする + if (reason != 'mention') { + exist.reason = reason; + } + } else { + this.list.push({ + user, reason + }); + } + } + + public deliver() { + this.list.forEach(async x => { + const mentionee = x.user; + + // ミュート情報を取得 + const mentioneeMutes = await Mute.find({ + muterId: mentionee + }); + + const mentioneesMutedUserIds = mentioneeMutes.map(m => m.muteeId.toString()); + + // 通知される側のユーザーが通知する側のユーザーをミュートしていない限りは通知する + if (!mentioneesMutedUserIds.includes(this.user._id.toString())) { + pushSw(mentionee, x.reason, this.note); + } + }); + } +} export default async (user: IUser, data: { createdAt?: Date; @@ -30,6 +86,7 @@ export default async (user: IUser, data: { tags?: string[]; cw?: string; visibility?: string; + visibleUsers?: IUser[]; uri?: string; app?: IApp; }, silent = false) => new Promise<INote>(async (res, rej) => { @@ -39,7 +96,7 @@ export default async (user: IUser, data: { const tags = data.tags || []; - let tokens = null; + let tokens: any[] = null; if (data.text) { // Analyze @@ -57,21 +114,29 @@ export default async (user: IUser, data: { }); } + if (data.visibleUsers) { + data.visibleUsers = data.visibleUsers.filter(x => x != null); + } + const insert: any = { createdAt: data.createdAt, mediaIds: data.media ? data.media.map(file => file._id) : [], replyId: data.reply ? data.reply._id : null, renoteId: data.renote ? data.renote._id : null, text: data.text, - textHtml: tokens === null ? null : html(tokens), poll: data.poll, - cw: data.cw, + cw: data.cw == null ? null : data.cw, tags, userId: user._id, viaMobile: data.viaMobile, geo: data.geo || null, appId: data.app ? data.app._id : null, visibility: data.visibility, + visibleUserIds: data.visibility == 'specified' + ? data.visibleUsers + ? data.visibleUsers.map(u => u._id) + : [] + : [], // 以下非正規化データ _reply: data.reply ? { userId: data.reply.userId } : null, @@ -85,143 +150,167 @@ export default async (user: IUser, data: { if (data.uri != null) insert.uri = data.uri; // 投稿を作成 - const note = await Note.insert(insert); + let note: INote; + try { + note = await Note.insert(insert); + } catch (e) { + // duplicate key error + if (e.code === 11000) { + return res(null); + } + + console.error(e); + return rej('something happened'); + } res(note); + // Increment notes count User.update({ _id: user._id }, { - // Increment notes count $inc: { notesCount: 1 - }, - // Update latest note - $set: { - latestNote: note } }); // Serialize const noteObj = await pack(note); - // タイムラインへの投稿 - if (note.channelId == null) { + const nm = new NotificationManager(user, noteObj); + + const render = async () => { + const content = data.renote && data.text == null + ? renderAnnounce(data.renote.uri ? data.renote.uri : await renderNote(data.renote)) + : renderCreate(await renderNote(note)); + return packAp(content); + }; + + if (!silent) { if (isLocalUser(user)) { - // Publish event to myself's stream - stream(note.userId, 'note', noteObj); + if (note.visibility == 'private' || note.visibility == 'followers' || note.visibility == 'specified') { + // Publish event to myself's stream + stream(note.userId, 'note', await pack(note, user, { + detail: true + })); + } else { + // Publish event to myself's stream + stream(note.userId, 'note', noteObj); - // Publish note to local timeline stream - publishLocalTimelineStream(noteObj); + // Publish note to local timeline stream + if (note.visibility != 'home') { + publishLocalTimelineStream(noteObj); + } + } } // Publish note to global timeline stream publishGlobalTimelineStream(noteObj); - // Fetch all followers - const followers = await Following.aggregate([{ - $lookup: { - from: 'users', - localField: 'followerId', - foreignField: '_id', - as: 'user' - } - }, { - $match: { + if (note.visibility == 'specified') { + data.visibleUsers.forEach(async u => { + stream(u._id, 'note', await pack(note, u, { + detail: true + })); + }); + } + + if (note.visibility == 'public' || note.visibility == 'home' || note.visibility == 'followers') { + // フォロワーに配信 + Following.find({ followeeId: note.userId - } - }], { - _id: false - }); + }).then(followers => { + followers.map(async following => { + const follower = following._follower; - if (!silent) { - const render = async () => { - const content = data.renote && data.text == null - ? renderAnnounce(data.renote.uri ? data.renote.uri : await renderNote(data.renote)) - : renderCreate(await renderNote(note)); - return packAp(content); - }; + if (isLocalUser(follower)) { + // ストーキングしていない場合 + if (!following.stalk) { + // この投稿が返信ならスキップ + if (note.replyId && !note._reply.userId.equals(following.followerId) && !note._reply.userId.equals(note.userId)) return; + } - // 投稿がリプライかつ投稿者がローカルユーザーかつリプライ先の投稿の投稿者がリモートユーザーなら配送 - if (data.reply && isLocalUser(user) && isRemoteUser(data.reply._user)) { - deliver(user, await render(), data.reply._user.inbox); - } + // Publish event to followers stream + stream(following.followerId, 'note', noteObj); + } else { + //#region AP配送 + // フォロワーがリモートユーザーかつ投稿者がローカルユーザーなら投稿を配信 + if (isLocalUser(user)) { + deliver(user, await render(), follower.inbox); + } + //#endergion + } + }); + }); + } - // 投稿がRenoteかつ投稿者がローカルユーザーかつRenote元の投稿の投稿者がリモートユーザーなら配送 - if (data.renote && isLocalUser(user) && isRemoteUser(data.renote._user)) { - deliver(user, await render(), data.renote._user.inbox); - } + // リストに配信 + UserList.find({ + userIds: note.userId + }).then(lists => { + lists.forEach(list => { + publishUserListStream(list._id, 'note', noteObj); + }); + }); + } - Promise.all(followers.map(async follower => { - follower = follower.user[0]; + //#region リプライとAnnounceのAP配送 - if (isLocalUser(follower)) { - // Publish event to followers stream - stream(follower._id, 'note', noteObj); - } else { - // フォロワーがリモートユーザーかつ投稿者がローカルユーザーなら投稿を配信 - if (isLocalUser(user)) { - deliver(user, await render(), follower.inbox); - } - } - })); - } + // 投稿がリプライかつ投稿者がローカルユーザーかつリプライ先の投稿の投稿者がリモートユーザーなら配送 + if (data.reply && isLocalUser(user) && isRemoteUser(data.reply._user)) { + deliver(user, await render(), data.reply._user.inbox); } - // チャンネルへの投稿 - /* TODO - if (note.channelId) { - promises.push( - // Increment channel index(notes count) - Channel.update({ _id: note.channelId }, { - $inc: { - index: 1 - } - }), + // 投稿がRenoteかつ投稿者がローカルユーザーかつRenote元の投稿の投稿者がリモートユーザーなら配送 + if (data.renote && isLocalUser(user) && isRemoteUser(data.renote._user)) { + deliver(user, await render(), data.renote._user.inbox); + } + //#endergion - // Publish event to channel - promisedNoteObj.then(noteObj => { - publishChannelStream(note.channelId, 'note', noteObj); - }), + //#region メンション + if (data.text) { + // TODO: Drop dupulicates + const mentions = tokens + .filter(t => t.type == 'mention'); - Promise.all([ - promisedNoteObj, + let mentionedUsers = await Promise.all(mentions.map(async m => { + try { + return await resolveUser(m.username, m.host); + } catch (e) { + return null; + } + })); - // Get channel watchers - ChannelWatching.find({ - channelId: note.channelId, - // 削除されたドキュメントは除く - deletedAt: { $exists: false } - }) - ]).then(([noteObj, watches]) => { - // チャンネルの視聴者(のタイムライン)に配信 - watches.forEach(w => { - stream(w.userId, 'note', noteObj); - }); - }) - ); - }*/ + // TODO: Drop dupulicates + mentionedUsers = mentionedUsers.filter(x => x != null); - const mentions = []; + mentionedUsers.filter(u => isLocalUser(u)).forEach(async u => { + // 既に言及されたユーザーに対する返信や引用renoteの場合も無視 + if (data.reply && data.reply.userId.equals(u._id)) return; + if (data.renote && data.renote.userId.equals(u._id)) return; - async function addMention(mentionee, reason) { - // Reject if already added - if (mentions.some(x => x.equals(mentionee))) return; + // Create notification + notify(u._id, user._id, 'mention', { + noteId: note._id + }); - // Add mention - mentions.push(mentionee); + nm.push(u._id, 'mention'); + }); - // Publish event - if (!user._id.equals(mentionee)) { - const mentioneeMutes = await Mute.find({ - muter_id: mentionee, - deleted_at: { $exists: false } + if (isLocalUser(user)) { + mentionedUsers.filter(u => isRemoteUser(u)).forEach(async u => { + deliver(user, await render(), (u as IRemoteUser).inbox); + }); + } + + // Append mentions data + if (mentionedUsers.length > 0) { + Note.update({ _id: note._id }, { + $set: { + mentions: mentionedUsers.map(u => u._id) + } }); - const mentioneesMutedUserIds = mentioneeMutes.map(m => m.muteeId.toString()); - if (mentioneesMutedUserIds.indexOf(user._id.toString()) == -1) { - event(mentionee, reason, noteObj); - pushSw(mentionee, reason, noteObj); - } } } + //#endregion // If has in reply to note if (data.reply) { @@ -260,8 +349,7 @@ export default async (user: IUser, data: { watch(user._id, data.reply); } - // Add mention - addMention(data.reply.userId, 'reply'); + nm.push(data.reply.userId, 'reply'); } // If it is renote @@ -296,7 +384,7 @@ export default async (user: IUser, data: { // If it is quote renote if (data.text) { // Add mention - addMention(data.renote.userId, 'quote'); + nm.push(data.renote.userId, 'quote'); } else { // Publish event if (!user._id.equals(data.renote.userId)) { @@ -304,14 +392,17 @@ export default async (user: IUser, data: { } } + //#region TODO: これ重い // 今までで同じ投稿をRenoteしているか - const existRenote = await Note.findOne({ - userId: user._id, - renoteId: data.renote._id, - _id: { - $ne: note._id - } - }); + //const existRenote = await Note.findOne({ + // userId: user._id, + // renoteId: data.renote._id, + // _id: { + // $ne: note._id + // } + //}); + const existRenote = null; + //#endregion if (!existRenote) { // Update renoteee status @@ -322,48 +413,4 @@ export default async (user: IUser, data: { }); } } - - // If has text content - if (data.text) { - // Extract an '@' mentions - const atMentions = tokens - .filter(t => t.type == 'mention') - .map(m => m.username) - // Drop dupulicates - .filter((v, i, s) => s.indexOf(v) == i); - - // Resolve all mentions - await Promise.all(atMentions.map(async mention => { - // Fetch mentioned user - // SELECT _id - const mentionee = await User - .findOne({ - usernameLower: mention.toLowerCase() - }, { _id: true }); - - // When mentioned user not found - if (mentionee == null) return; - - // 既に言及されたユーザーに対する返信や引用renoteの場合も無視 - if (data.reply && data.reply.userId.equals(mentionee._id)) return; - if (data.renote && data.renote.userId.equals(mentionee._id)) return; - - // Add mention - addMention(mentionee._id, 'mention'); - - // Create notification - notify(mentionee._id, user._id, 'mention', { - noteId: note._id - }); - })); - } - - // Append mentions data - if (mentions.length > 0) { - Note.update({ _id: note._id }, { - $set: { - mentions - } - }); - } }); diff --git a/src/services/note/reaction/create.ts b/src/services/note/reaction/create.ts index dd3d4be8b7..123c091c85 100644 --- a/src/services/note/reaction/create.ts +++ b/src/services/note/reaction/create.ts @@ -9,7 +9,6 @@ import watch from '../watch'; import renderLike from '../../../remote/activitypub/renderer/like'; import { deliver } from '../../../queue'; import pack from '../../../remote/activitypub/renderer'; -import { MongoError } from 'mongodb'; export default async (user: IUser, note: INote, reaction: string) => new Promise(async (res, rej) => { // Myself @@ -27,8 +26,8 @@ export default async (user: IUser, note: INote, reaction: string) => new Promise }); } catch (e) { // duplicate key error - if (e instanceof MongoError && e.code === 11000) { - return rej('already reacted'); + if (e.code === 11000) { + return res(null); } console.error(e); @@ -47,11 +46,13 @@ export default async (user: IUser, note: INote, reaction: string) => new Promise publishNoteStream(note._id, 'reacted'); - // Notify - notify(note.userId, user._id, 'reaction', { - noteId: note._id, - reaction: reaction - }); + // リアクションされたユーザーがローカルユーザーなら通知を作成 + if (isLocalUser(note._user)) { + notify(note.userId, user._id, 'reaction', { + noteId: note._id, + reaction: reaction + }); + } pushSw(note.userId, 'reaction', { user: await packUser(user, note.userId), @@ -86,7 +87,7 @@ export default async (user: IUser, note: INote, reaction: string) => new Promise //#region 配信 // リアクターがローカルユーザーかつリアクション対象がリモートユーザーの投稿なら配送 if (isLocalUser(user) && isRemoteUser(note._user)) { - const content = pack(renderLike(user, note)); + const content = pack(renderLike(user, note, reaction)); deliver(user, content, note._user.inbox); } //#endregion |