From d381d31e5eb66205759bba492e4eef9d6c09fde3 Mon Sep 17 00:00:00 2001 From: syuilo Date: Fri, 19 Mar 2021 11:15:05 +0900 Subject: add note --- src/server/api/stream/index.ts | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/server/api/stream') diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts index c56a0a157b..bb37cfa622 100644 --- a/src/server/api/stream/index.ts +++ b/src/server/api/stream/index.ts @@ -159,6 +159,8 @@ export default class Connection { } if (this.user && read) { + // TODO: クライアントでタイムライン読み込みなどすると、一度に大量のreadNoteが発生しクエリ数がすごいことになるので、ある程度まとめてreadNoteするようにする + // 具体的には、この箇所ではキュー的な配列にread予定ノートを溜めておくに留めて、別の箇所で定期的にキューにあるノートを配列でreadNoteに渡すような実装にする readNote(this.user.id, payload.id); } } -- cgit v1.2.3-freya From 87c8f9ff953499340496e9c5db09c93eaff08851 Mon Sep 17 00:00:00 2001 From: syuilo Date: Fri, 19 Mar 2021 20:43:24 +0900 Subject: perf: Reduce database query --- src/client/components/note-detailed.vue | 10 ++- src/client/components/note.vue | 10 ++- src/client/ui/chat/note.vue | 10 ++- src/server/api/endpoints/notes/mentions.ts | 6 +- src/server/api/stream/index.ts | 38 ++++++++--- src/services/note/read-mention.ts | 29 ++++++++ src/services/note/read-specified-note.ts | 29 ++++++++ src/services/note/read.ts | 105 ----------------------------- 8 files changed, 114 insertions(+), 123 deletions(-) create mode 100644 src/services/note/read-mention.ts create mode 100644 src/services/note/read-specified-note.ts delete mode 100644 src/services/note/read.ts (limited to 'src/server/api/stream') diff --git a/src/client/components/note-detailed.vue b/src/client/components/note-detailed.vue index 1ef3f43389..4ad3d2d898 100644 --- a/src/client/components/note-detailed.vue +++ b/src/client/components/note-detailed.vue @@ -350,7 +350,15 @@ export default defineComponent({ capture(withHandler = false) { if (this.$i) { - this.connection.send(document.body.contains(this.$el) ? 'sn' : 's', { id: this.appearNote.id }); + this.connection.send('sn', { id: this.appearNote.id }); + if (this.appearNote.userId !== this.$i.id) { + if (this.appearNote.mentions && this.appearNote.mentions.includes(this.$i.id)) { + this.connection.send('readMention', { id: this.appearNote.id }); + } + if (this.appearNote.visibleUserIds && this.appearNote.visibleUserIds.includes(this.$i.id)) { + this.connection.send('readSpecifiedNote', { id: this.appearNote.id }); + } + } if (withHandler) this.connection.on('noteUpdated', this.onStreamNoteUpdated); } }, diff --git a/src/client/components/note.vue b/src/client/components/note.vue index 65e09b7802..3b59afd71d 100644 --- a/src/client/components/note.vue +++ b/src/client/components/note.vue @@ -325,7 +325,15 @@ export default defineComponent({ capture(withHandler = false) { if (this.$i) { - this.connection.send(document.body.contains(this.$el) ? 'sn' : 's', { id: this.appearNote.id }); + this.connection.send('sn', { id: this.appearNote.id }); + if (this.appearNote.userId !== this.$i.id) { + if (this.appearNote.mentions && this.appearNote.mentions.includes(this.$i.id)) { + this.connection.send('readMention', { id: this.appearNote.id }); + } + if (this.appearNote.visibleUserIds && this.appearNote.visibleUserIds.includes(this.$i.id)) { + this.connection.send('readSpecifiedNote', { id: this.appearNote.id }); + } + } if (withHandler) this.connection.on('noteUpdated', this.onStreamNoteUpdated); } }, diff --git a/src/client/ui/chat/note.vue b/src/client/ui/chat/note.vue index 5a4a13d889..29bc61d9c5 100644 --- a/src/client/ui/chat/note.vue +++ b/src/client/ui/chat/note.vue @@ -325,7 +325,15 @@ export default defineComponent({ capture(withHandler = false) { if (this.$i) { - this.connection.send(document.body.contains(this.$el) ? 'sn' : 's', { id: this.appearNote.id }); + this.connection.send('sn', { id: this.appearNote.id }); + if (this.appearNote.userId !== this.$i.id) { + if (this.appearNote.mentions && this.appearNote.mentions.includes(this.$i.id)) { + this.connection.send('readMention', { id: this.appearNote.id }); + } + if (this.appearNote.visibleUserIds && this.appearNote.visibleUserIds.includes(this.$i.id)) { + this.connection.send('readSpecifiedNote', { id: this.appearNote.id }); + } + } if (withHandler) this.connection.on('noteUpdated', this.onStreamNoteUpdated); } }, diff --git a/src/server/api/endpoints/notes/mentions.ts b/src/server/api/endpoints/notes/mentions.ts index 8a9d295d38..1e3014bd46 100644 --- a/src/server/api/endpoints/notes/mentions.ts +++ b/src/server/api/endpoints/notes/mentions.ts @@ -1,12 +1,12 @@ import $ from 'cafy'; import { ID } from '../../../../misc/cafy-id'; import define from '../../define'; -import read from '../../../../services/note/read'; import { Notes, Followings } from '../../../../models'; import { generateVisibilityQuery } from '../../common/generate-visibility-query'; import { generateMutedUserQuery } from '../../common/generate-muted-user-query'; import { makePaginationQuery } from '../../common/make-pagination-query'; import { Brackets } from 'typeorm'; +import { readMention } from '../../../../services/note/read-mention'; export const meta = { desc: { @@ -79,9 +79,7 @@ export default define(meta, async (ps, user) => { const mentions = await query.take(ps.limit!).getMany(); - for (const note of mentions) { - read(user.id, note.id); - } + readMention(user.id, mentions.map(n => n.id)); return await Notes.packMany(mentions, user); }); diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts index bb37cfa622..4a87f61e7f 100644 --- a/src/server/api/stream/index.ts +++ b/src/server/api/stream/index.ts @@ -2,7 +2,6 @@ import autobind from 'autobind-decorator'; import * as websocket from 'websocket'; import { readNotification } from '../common/read-notification'; import call from '../call'; -import readNote from '../../../services/note/read'; import Channel from './channel'; import channels from './channels'; import { EventEmitter } from 'events'; @@ -14,6 +13,8 @@ import { AccessToken } from '../../../models/entities/access-token'; import { UserProfile } from '../../../models/entities/user-profile'; import { publishChannelStream, publishGroupMessagingStream, publishMessagingStream } from '../../../services/stream'; import { UserGroup } from '../../../models/entities/user-group'; +import { readMention } from '../../../services/note/read-mention'; +import { readSpecifiedNote } from '../../../services/note/read-specified-note'; /** * Main stream connection @@ -86,9 +87,10 @@ export default class Connection { switch (type) { case 'api': this.onApiRequest(body); break; case 'readNotification': this.onReadNotification(body); break; - case 'subNote': this.onSubscribeNote(body, true); break; - case 'sn': this.onSubscribeNote(body, true); break; // alias - case 's': this.onSubscribeNote(body, false); break; + case 'readMention': this.onReadMention(body); break; + case 'readSpecifiedNote': this.onReadSpecifiedNote(body); break; + case 'subNote': this.onSubscribeNote(body); break; + case 'sn': this.onSubscribeNote(body); break; // alias case 'unsubNote': this.onUnsubscribeNote(body); break; case 'un': this.onUnsubscribeNote(body); break; // alias case 'connect': this.onChannelConnectRequested(body); break; @@ -141,11 +143,31 @@ export default class Connection { readNotification(this.user!.id, [payload.id]); } + @autobind + private onReadMention(payload: any) { + if (!payload.id) return; + if (this.user) { + // TODO: ある程度まとめてreadMentionするようにする + // 具体的には、この箇所ではキュー的な配列にread予定ノートを溜めておくに留めて、別の箇所で定期的にキューにあるノートを配列でreadMentionに渡すような実装にする + readMention(this.user.id, [payload.id]); + } + } + + @autobind + private onReadSpecifiedNote(payload: any) { + if (!payload.id) return; + if (this.user) { + // TODO: ある程度まとめてreadSpecifiedNoteするようにする + // 具体的には、この箇所ではキュー的な配列にread予定ノートを溜めておくに留めて、別の箇所で定期的にキューにあるノートを配列でreadSpecifiedNoteに渡すような実装にする + readSpecifiedNote(this.user.id, [payload.id]); + } + } + /** * 投稿購読要求時 */ @autobind - private onSubscribeNote(payload: any, read: boolean) { + private onSubscribeNote(payload: any) { if (!payload.id) return; if (this.subscribingNotes[payload.id] == null) { @@ -157,12 +179,6 @@ export default class Connection { if (this.subscribingNotes[payload.id] === 1) { this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage); } - - if (this.user && read) { - // TODO: クライアントでタイムライン読み込みなどすると、一度に大量のreadNoteが発生しクエリ数がすごいことになるので、ある程度まとめてreadNoteするようにする - // 具体的には、この箇所ではキュー的な配列にread予定ノートを溜めておくに留めて、別の箇所で定期的にキューにあるノートを配列でreadNoteに渡すような実装にする - readNote(this.user.id, payload.id); - } } /** diff --git a/src/services/note/read-mention.ts b/src/services/note/read-mention.ts new file mode 100644 index 0000000000..2a668ecd6c --- /dev/null +++ b/src/services/note/read-mention.ts @@ -0,0 +1,29 @@ +import { publishMainStream } from '../stream'; +import { Note } from '../../models/entities/note'; +import { User } from '../../models/entities/user'; +import { NoteUnreads } from '../../models'; +import { In } from 'typeorm'; + +/** + * Mark a mention note as read + */ +export async function readMention( + userId: User['id'], + noteIds: Note['id'][] +) { + // Remove the records + await NoteUnreads.delete({ + userId: userId, + noteId: In(noteIds), + }); + + const mentionsCount = await NoteUnreads.count({ + userId: userId, + isMentioned: true + }); + + if (mentionsCount === 0) { + // 全て既読になったイベントを発行 + publishMainStream(userId, 'readAllUnreadMentions'); + } +} diff --git a/src/services/note/read-specified-note.ts b/src/services/note/read-specified-note.ts new file mode 100644 index 0000000000..0fcb66bf98 --- /dev/null +++ b/src/services/note/read-specified-note.ts @@ -0,0 +1,29 @@ +import { publishMainStream } from '../stream'; +import { Note } from '../../models/entities/note'; +import { User } from '../../models/entities/user'; +import { NoteUnreads } from '../../models'; +import { In } from 'typeorm'; + +/** + * Mark a specified note as read + */ +export async function readSpecifiedNote( + userId: User['id'], + noteIds: Note['id'][] +) { + // Remove the records + await NoteUnreads.delete({ + userId: userId, + noteId: In(noteIds), + }); + + const specifiedCount = await NoteUnreads.count({ + userId: userId, + isSpecified: true + }); + + if (specifiedCount === 0) { + // 全て既読になったイベントを発行 + publishMainStream(userId, 'readAllUnreadSpecifiedNotes'); + } +} diff --git a/src/services/note/read.ts b/src/services/note/read.ts deleted file mode 100644 index 5a39ab30b7..0000000000 --- a/src/services/note/read.ts +++ /dev/null @@ -1,105 +0,0 @@ -import { publishMainStream } from '../stream'; -import { Note } from '../../models/entities/note'; -import { User } from '../../models/entities/user'; -import { NoteUnreads, Antennas, AntennaNotes, Users } from '../../models'; -import { Not, IsNull } from 'typeorm'; - -/** - * Mark a note as read - */ -export default async function( - userId: User['id'], - noteId: Note['id'] -) { - async function careNoteUnreads() { - const exist = await NoteUnreads.findOne({ - userId: userId, - noteId: noteId, - }); - - if (!exist) return; - - // Remove the record - await NoteUnreads.delete({ - userId: userId, - noteId: noteId, - }); - - if (exist.isMentioned) { - NoteUnreads.count({ - userId: userId, - isMentioned: true - }).then(mentionsCount => { - if (mentionsCount === 0) { - // 全て既読になったイベントを発行 - publishMainStream(userId, 'readAllUnreadMentions'); - } - }); - } - - if (exist.isSpecified) { - NoteUnreads.count({ - userId: userId, - isSpecified: true - }).then(specifiedCount => { - if (specifiedCount === 0) { - // 全て既読になったイベントを発行 - publishMainStream(userId, 'readAllUnreadSpecifiedNotes'); - } - }); - } - - if (exist.noteChannelId) { - NoteUnreads.count({ - userId: userId, - noteChannelId: Not(IsNull()) - }).then(channelNoteCount => { - if (channelNoteCount === 0) { - // 全て既読になったイベントを発行 - publishMainStream(userId, 'readAllChannels'); - } - }); - } - } - - async function careAntenna() { - const beforeUnread = await Users.getHasUnreadAntenna(userId); - if (!beforeUnread) return; - - const antennas = await Antennas.find({ userId }); - - await Promise.all(antennas.map(async antenna => { - const countBefore = await AntennaNotes.count({ - antennaId: antenna.id, - read: false - }); - - if (countBefore === 0) return; - - await AntennaNotes.update({ - antennaId: antenna.id, - noteId: noteId - }, { - read: true - }); - - const countAfter = await AntennaNotes.count({ - antennaId: antenna.id, - read: false - }); - - if (countAfter === 0) { - publishMainStream(userId, 'readAntenna', antenna); - } - })); - - Users.getHasUnreadAntenna(userId).then(unread => { - if (!unread) { - publishMainStream(userId, 'readAllAntennas'); - } - }); - } - - careNoteUnreads(); - careAntenna(); -} -- cgit v1.2.3-freya From b6d0d4eb99d7b93d3bfa47c37d74de26ac87567c Mon Sep 17 00:00:00 2001 From: syuilo Date: Sun, 21 Mar 2021 14:33:18 +0900 Subject: add note --- src/server/api/stream/index.ts | 1 + 1 file changed, 1 insertion(+) (limited to 'src/server/api/stream') diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts index 4a87f61e7f..a94923484d 100644 --- a/src/server/api/stream/index.ts +++ b/src/server/api/stream/index.ts @@ -52,6 +52,7 @@ export default class Connection { this.onBroadcastMessage(type, body); }); + // TODO: reidsでイベントをもらうようにし、ポーリングはしないようにする if (this.user) { this.updateFollowing(); this.followingClock = setInterval(this.updateFollowing, 5000); -- cgit v1.2.3-freya From 8050352ad88798be222f735a3217367acaee277f Mon Sep 17 00:00:00 2001 From: syuilo Date: Sun, 21 Mar 2021 15:14:03 +0900 Subject: perf: 各ストリーミング接続ごとにポーリングしないように MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/api/endpoints/channels/follow.ts | 3 ++ src/server/api/endpoints/channels/unfollow.ts | 3 ++ src/server/api/endpoints/i/update.ts | 3 +- src/server/api/endpoints/mute/create.ts | 3 ++ src/server/api/endpoints/mute/delete.ts | 3 ++ src/server/api/stream/index.ts | 57 +++++++++++++++++++-------- src/services/blocking/create.ts | 12 ++++-- src/services/following/create.ts | 7 +++- src/services/following/delete.ts | 7 +++- src/services/following/requests/reject.ts | 7 +++- src/services/stream.ts | 5 +++ 11 files changed, 83 insertions(+), 27 deletions(-) (limited to 'src/server/api/stream') diff --git a/src/server/api/endpoints/channels/follow.ts b/src/server/api/endpoints/channels/follow.ts index bf2f2bbb57..11c6e37ff7 100644 --- a/src/server/api/endpoints/channels/follow.ts +++ b/src/server/api/endpoints/channels/follow.ts @@ -4,6 +4,7 @@ import define from '../../define'; import { ApiError } from '../../error'; import { Channels, ChannelFollowings } from '../../../../models'; import { genId } from '../../../../misc/gen-id'; +import { publishUserEvent } from '../../../../services/stream'; export const meta = { tags: ['channels'], @@ -42,4 +43,6 @@ export default define(meta, async (ps, user) => { followerId: user.id, followeeId: channel.id, }); + + publishUserEvent(user.id, 'followChannel', channel); }); diff --git a/src/server/api/endpoints/channels/unfollow.ts b/src/server/api/endpoints/channels/unfollow.ts index 8cab5c36a6..3eb0f1519b 100644 --- a/src/server/api/endpoints/channels/unfollow.ts +++ b/src/server/api/endpoints/channels/unfollow.ts @@ -3,6 +3,7 @@ import { ID } from '../../../../misc/cafy-id'; import define from '../../define'; import { ApiError } from '../../error'; import { Channels, ChannelFollowings } from '../../../../models'; +import { publishUserEvent } from '../../../../services/stream'; export const meta = { tags: ['channels'], @@ -39,4 +40,6 @@ export default define(meta, async (ps, user) => { followerId: user.id, followeeId: channel.id, }); + + publishUserEvent(user.id, 'unfollowChannel', channel); }); diff --git a/src/server/api/endpoints/i/update.ts b/src/server/api/endpoints/i/update.ts index a1faf8f1c2..92be2e9e6d 100644 --- a/src/server/api/endpoints/i/update.ts +++ b/src/server/api/endpoints/i/update.ts @@ -1,6 +1,6 @@ import $ from 'cafy'; import { ID } from '../../../../misc/cafy-id'; -import { publishMainStream } from '../../../../services/stream'; +import { publishMainStream, publishUserEvent } from '../../../../services/stream'; import acceptAllFollowRequests from '../../../../services/following/requests/accept-all'; import { publishToFollowers } from '../../../../services/i/update'; import define from '../../define'; @@ -317,6 +317,7 @@ export default define(meta, async (ps, user, token) => { // Publish meUpdated event publishMainStream(user.id, 'meUpdated', iObj); + publishUserEvent(user.id, 'updateUserProfile', await UserProfiles.findOne(user.id)); // 鍵垢を解除したとき、溜まっていたフォローリクエストがあるならすべて承認 if (user.isLocked && ps.isLocked === false) { diff --git a/src/server/api/endpoints/mute/create.ts b/src/server/api/endpoints/mute/create.ts index 437ad96107..ebfc6028ed 100644 --- a/src/server/api/endpoints/mute/create.ts +++ b/src/server/api/endpoints/mute/create.ts @@ -6,6 +6,7 @@ import { getUser } from '../../common/getters'; import { genId } from '../../../../misc/gen-id'; import { Mutings, NoteWatchings } from '../../../../models'; import { Muting } from '../../../../models/entities/muting'; +import { publishUserEvent } from '../../../../services/stream'; export const meta = { desc: { @@ -82,6 +83,8 @@ export default define(meta, async (ps, user) => { muteeId: mutee.id, } as Muting); + publishUserEvent(user.id, 'mute', mutee); + NoteWatchings.delete({ userId: muter.id, noteUserId: mutee.id diff --git a/src/server/api/endpoints/mute/delete.ts b/src/server/api/endpoints/mute/delete.ts index 217352acb4..67a59e3ae4 100644 --- a/src/server/api/endpoints/mute/delete.ts +++ b/src/server/api/endpoints/mute/delete.ts @@ -4,6 +4,7 @@ import define from '../../define'; import { ApiError } from '../../error'; import { getUser } from '../../common/getters'; import { Mutings } from '../../../../models'; +import { publishUserEvent } from '../../../../services/stream'; export const meta = { desc: { @@ -76,4 +77,6 @@ export default define(meta, async (ps, user) => { await Mutings.delete({ id: exist.id }); + + publishUserEvent(user.id, 'unmute', mutee); }); diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts index a94923484d..748e894f83 100644 --- a/src/server/api/stream/index.ts +++ b/src/server/api/stream/index.ts @@ -30,10 +30,6 @@ export default class Connection { public subscriber: EventEmitter; private channels: Channel[] = []; private subscribingNotes: any = {}; - private followingClock: ReturnType; - private mutingClock: ReturnType; - private followingChannelsClock: ReturnType; - private userProfileClock: ReturnType; constructor( wsConnection: websocket.connection, @@ -52,19 +48,51 @@ export default class Connection { this.onBroadcastMessage(type, body); }); - // TODO: reidsでイベントをもらうようにし、ポーリングはしないようにする if (this.user) { this.updateFollowing(); - this.followingClock = setInterval(this.updateFollowing, 5000); - this.updateMuting(); - this.mutingClock = setInterval(this.updateMuting, 5000); - this.updateFollowingChannels(); - this.followingChannelsClock = setInterval(this.updateFollowingChannels, 5000); - this.updateUserProfile(); - this.userProfileClock = setInterval(this.updateUserProfile, 5000); + + this.subscriber.on(`user:${this.user.id}`, ({ type, body }) => { + this.onUserEvent(type, body); + }); + } + } + + @autobind + private onUserEvent(type: string, body: any) { + switch (type) { + case 'follow': + this.following.add(body.id); + break; + + case 'unfollow': + this.following.delete(body.id); + break; + + case 'mute': + this.muting.add(body.id); + break; + + case 'unmute': + this.muting.delete(body.id); + break; + + case 'followChannel': + this.followingChannels.add(body.id); + break; + + case 'unfollowChannel': + this.followingChannels.delete(body.id); + break; + + case 'updateUserProfile': + this.userProfile = body; + break; + + default: + break; } } @@ -354,10 +382,5 @@ export default class Connection { for (const c of this.channels.filter(c => c.dispose)) { if (c.dispose) c.dispose(); } - - if (this.followingClock) clearInterval(this.followingClock); - if (this.mutingClock) clearInterval(this.mutingClock); - if (this.followingChannelsClock) clearInterval(this.followingChannelsClock); - if (this.userProfileClock) clearInterval(this.userProfileClock); } } diff --git a/src/services/blocking/create.ts b/src/services/blocking/create.ts index def4f33585..4f0238db91 100644 --- a/src/services/blocking/create.ts +++ b/src/services/blocking/create.ts @@ -1,4 +1,4 @@ -import { publishMainStream } from '../stream'; +import { publishMainStream, publishUserEvent } from '../stream'; import { renderActivity } from '../../remote/activitypub/renderer'; import renderFollow from '../../remote/activitypub/renderer/follow'; import renderUndo from '../../remote/activitypub/renderer/undo'; @@ -55,7 +55,10 @@ async function cancelRequest(follower: User, followee: User) { if (Users.isLocalUser(follower)) { Users.pack(followee, follower, { detail: true - }).then(packed => publishMainStream(follower.id, 'unfollow', packed)); + }).then(packed => { + publishUserEvent(follower.id, 'unfollow', packed); + publishMainStream(follower.id, 'unfollow', packed); + }); } // リモートにフォローリクエストをしていたらUndoFollow送信 @@ -97,7 +100,10 @@ async function unFollow(follower: User, followee: User) { if (Users.isLocalUser(follower)) { Users.pack(followee, follower, { detail: true - }).then(packed => publishMainStream(follower.id, 'unfollow', packed)); + }).then(packed => { + publishUserEvent(follower.id, 'unfollow', packed); + publishMainStream(follower.id, 'unfollow', packed); + }); } // リモートにフォローをしていたらUndoFollow送信 diff --git a/src/services/following/create.ts b/src/services/following/create.ts index 6bc98aee87..eb6699b0bf 100644 --- a/src/services/following/create.ts +++ b/src/services/following/create.ts @@ -1,4 +1,4 @@ -import { publishMainStream } from '../stream'; +import { publishMainStream, publishUserEvent } from '../stream'; import { renderActivity } from '../../remote/activitypub/renderer'; import renderFollow from '../../remote/activitypub/renderer/follow'; import renderAccept from '../../remote/activitypub/renderer/accept'; @@ -88,7 +88,10 @@ export async function insertFollowingDoc(followee: User, follower: User) { if (Users.isLocalUser(follower)) { Users.pack(followee, follower, { detail: true - }).then(packed => publishMainStream(follower.id, 'follow', packed)); + }).then(packed => { + publishUserEvent(follower.id, 'follow', packed); + publishMainStream(follower.id, 'follow', packed); + }); } // Publish followed event diff --git a/src/services/following/delete.ts b/src/services/following/delete.ts index 8821611515..32c47ea7f4 100644 --- a/src/services/following/delete.ts +++ b/src/services/following/delete.ts @@ -1,4 +1,4 @@ -import { publishMainStream } from '../stream'; +import { publishMainStream, publishUserEvent } from '../stream'; import { renderActivity } from '../../remote/activitypub/renderer'; import renderFollow from '../../remote/activitypub/renderer/follow'; import renderUndo from '../../remote/activitypub/renderer/undo'; @@ -30,7 +30,10 @@ export default async function(follower: User, followee: User, silent = false) { if (!silent && Users.isLocalUser(follower)) { Users.pack(followee, follower, { detail: true - }).then(packed => publishMainStream(follower.id, 'unfollow', packed)); + }).then(packed => { + publishUserEvent(follower.id, 'unfollow', packed); + publishMainStream(follower.id, 'unfollow', packed); + }); } if (Users.isLocalUser(follower) && Users.isRemoteUser(followee)) { diff --git a/src/services/following/requests/reject.ts b/src/services/following/requests/reject.ts index 9a8b14bbfd..d8d3788088 100644 --- a/src/services/following/requests/reject.ts +++ b/src/services/following/requests/reject.ts @@ -2,7 +2,7 @@ import { renderActivity } from '../../../remote/activitypub/renderer'; import renderFollow from '../../../remote/activitypub/renderer/follow'; import renderReject from '../../../remote/activitypub/renderer/reject'; import { deliver } from '../../../queue'; -import { publishMainStream } from '../../stream'; +import { publishMainStream, publishUserEvent } from '../../stream'; import { User, ILocalUser } from '../../../models/entities/user'; import { Users, FollowRequests, Followings } from '../../../models'; import { decrementFollowing } from '../delete'; @@ -39,5 +39,8 @@ export default async function(followee: User, follower: User) { Users.pack(followee, follower, { detail: true - }).then(packed => publishMainStream(follower.id, 'unfollow', packed)); + }).then(packed => { + publishUserEvent(follower.id, 'unfollow', packed); + publishMainStream(follower.id, 'unfollow', packed); + }); } diff --git a/src/services/stream.ts b/src/services/stream.ts index d833d700fe..75385847ce 100644 --- a/src/services/stream.ts +++ b/src/services/stream.ts @@ -20,6 +20,10 @@ class Publisher { })); } + public publishUserEvent = (userId: User['id'], type: string, value?: any): void => { + this.publish(`user:${userId}`, type, typeof value === 'undefined' ? null : value); + } + public publishBroadcastStream = (type: string, value?: any): void => { this.publish('broadcast', type, typeof value === 'undefined' ? null : value); } @@ -84,6 +88,7 @@ const publisher = new Publisher(); export default publisher; +export const publishUserEvent = publisher.publishUserEvent; export const publishBroadcastStream = publisher.publishBroadcastStream; export const publishMainStream = publisher.publishMainStream; export const publishDriveStream = publisher.publishDriveStream; -- cgit v1.2.3-freya From 630464f38d2524ddc5d11d2abd4fddcccc4240d4 Mon Sep 17 00:00:00 2001 From: syuilo Date: Sun, 21 Mar 2021 15:35:02 +0900 Subject: Revert "perf: Reduce database query" This reverts commit 87c8f9ff953499340496e9c5db09c93eaff08851. --- src/client/components/note-detailed.vue | 10 +-- src/client/components/note.vue | 10 +-- src/client/ui/chat/note.vue | 10 +-- src/server/api/endpoints/notes/mentions.ts | 6 +- src/server/api/stream/index.ts | 38 +++-------- src/services/note/read-mention.ts | 29 -------- src/services/note/read-specified-note.ts | 29 -------- src/services/note/read.ts | 105 +++++++++++++++++++++++++++++ 8 files changed, 123 insertions(+), 114 deletions(-) delete mode 100644 src/services/note/read-mention.ts delete mode 100644 src/services/note/read-specified-note.ts create mode 100644 src/services/note/read.ts (limited to 'src/server/api/stream') diff --git a/src/client/components/note-detailed.vue b/src/client/components/note-detailed.vue index 4ad3d2d898..1ef3f43389 100644 --- a/src/client/components/note-detailed.vue +++ b/src/client/components/note-detailed.vue @@ -350,15 +350,7 @@ export default defineComponent({ capture(withHandler = false) { if (this.$i) { - this.connection.send('sn', { id: this.appearNote.id }); - if (this.appearNote.userId !== this.$i.id) { - if (this.appearNote.mentions && this.appearNote.mentions.includes(this.$i.id)) { - this.connection.send('readMention', { id: this.appearNote.id }); - } - if (this.appearNote.visibleUserIds && this.appearNote.visibleUserIds.includes(this.$i.id)) { - this.connection.send('readSpecifiedNote', { id: this.appearNote.id }); - } - } + this.connection.send(document.body.contains(this.$el) ? 'sn' : 's', { id: this.appearNote.id }); if (withHandler) this.connection.on('noteUpdated', this.onStreamNoteUpdated); } }, diff --git a/src/client/components/note.vue b/src/client/components/note.vue index 3b59afd71d..65e09b7802 100644 --- a/src/client/components/note.vue +++ b/src/client/components/note.vue @@ -325,15 +325,7 @@ export default defineComponent({ capture(withHandler = false) { if (this.$i) { - this.connection.send('sn', { id: this.appearNote.id }); - if (this.appearNote.userId !== this.$i.id) { - if (this.appearNote.mentions && this.appearNote.mentions.includes(this.$i.id)) { - this.connection.send('readMention', { id: this.appearNote.id }); - } - if (this.appearNote.visibleUserIds && this.appearNote.visibleUserIds.includes(this.$i.id)) { - this.connection.send('readSpecifiedNote', { id: this.appearNote.id }); - } - } + this.connection.send(document.body.contains(this.$el) ? 'sn' : 's', { id: this.appearNote.id }); if (withHandler) this.connection.on('noteUpdated', this.onStreamNoteUpdated); } }, diff --git a/src/client/ui/chat/note.vue b/src/client/ui/chat/note.vue index 29bc61d9c5..5a4a13d889 100644 --- a/src/client/ui/chat/note.vue +++ b/src/client/ui/chat/note.vue @@ -325,15 +325,7 @@ export default defineComponent({ capture(withHandler = false) { if (this.$i) { - this.connection.send('sn', { id: this.appearNote.id }); - if (this.appearNote.userId !== this.$i.id) { - if (this.appearNote.mentions && this.appearNote.mentions.includes(this.$i.id)) { - this.connection.send('readMention', { id: this.appearNote.id }); - } - if (this.appearNote.visibleUserIds && this.appearNote.visibleUserIds.includes(this.$i.id)) { - this.connection.send('readSpecifiedNote', { id: this.appearNote.id }); - } - } + this.connection.send(document.body.contains(this.$el) ? 'sn' : 's', { id: this.appearNote.id }); if (withHandler) this.connection.on('noteUpdated', this.onStreamNoteUpdated); } }, diff --git a/src/server/api/endpoints/notes/mentions.ts b/src/server/api/endpoints/notes/mentions.ts index 56640ec1ab..30844774e0 100644 --- a/src/server/api/endpoints/notes/mentions.ts +++ b/src/server/api/endpoints/notes/mentions.ts @@ -1,12 +1,12 @@ import $ from 'cafy'; import { ID } from '../../../../misc/cafy-id'; import define from '../../define'; +import read from '../../../../services/note/read'; import { Notes, Followings } from '../../../../models'; import { generateVisibilityQuery } from '../../common/generate-visibility-query'; import { generateMutedUserQuery } from '../../common/generate-muted-user-query'; import { makePaginationQuery } from '../../common/make-pagination-query'; import { Brackets } from 'typeorm'; -import { readMention } from '../../../../services/note/read-mention'; export const meta = { desc: { @@ -83,7 +83,9 @@ export default define(meta, async (ps, user) => { const mentions = await query.take(ps.limit!).getMany(); - readMention(user.id, mentions.map(n => n.id)); + for (const note of mentions) { + read(user.id, note.id); + } return await Notes.packMany(mentions, user); }); diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts index 748e894f83..f67faee1ce 100644 --- a/src/server/api/stream/index.ts +++ b/src/server/api/stream/index.ts @@ -2,6 +2,7 @@ import autobind from 'autobind-decorator'; import * as websocket from 'websocket'; import { readNotification } from '../common/read-notification'; import call from '../call'; +import readNote from '../../../services/note/read'; import Channel from './channel'; import channels from './channels'; import { EventEmitter } from 'events'; @@ -13,8 +14,6 @@ import { AccessToken } from '../../../models/entities/access-token'; import { UserProfile } from '../../../models/entities/user-profile'; import { publishChannelStream, publishGroupMessagingStream, publishMessagingStream } from '../../../services/stream'; import { UserGroup } from '../../../models/entities/user-group'; -import { readMention } from '../../../services/note/read-mention'; -import { readSpecifiedNote } from '../../../services/note/read-specified-note'; /** * Main stream connection @@ -116,10 +115,9 @@ export default class Connection { switch (type) { case 'api': this.onApiRequest(body); break; case 'readNotification': this.onReadNotification(body); break; - case 'readMention': this.onReadMention(body); break; - case 'readSpecifiedNote': this.onReadSpecifiedNote(body); break; - case 'subNote': this.onSubscribeNote(body); break; - case 'sn': this.onSubscribeNote(body); break; // alias + case 'subNote': this.onSubscribeNote(body, true); break; + case 'sn': this.onSubscribeNote(body, true); break; // alias + case 's': this.onSubscribeNote(body, false); break; case 'unsubNote': this.onUnsubscribeNote(body); break; case 'un': this.onUnsubscribeNote(body); break; // alias case 'connect': this.onChannelConnectRequested(body); break; @@ -172,31 +170,11 @@ export default class Connection { readNotification(this.user!.id, [payload.id]); } - @autobind - private onReadMention(payload: any) { - if (!payload.id) return; - if (this.user) { - // TODO: ある程度まとめてreadMentionするようにする - // 具体的には、この箇所ではキュー的な配列にread予定ノートを溜めておくに留めて、別の箇所で定期的にキューにあるノートを配列でreadMentionに渡すような実装にする - readMention(this.user.id, [payload.id]); - } - } - - @autobind - private onReadSpecifiedNote(payload: any) { - if (!payload.id) return; - if (this.user) { - // TODO: ある程度まとめてreadSpecifiedNoteするようにする - // 具体的には、この箇所ではキュー的な配列にread予定ノートを溜めておくに留めて、別の箇所で定期的にキューにあるノートを配列でreadSpecifiedNoteに渡すような実装にする - readSpecifiedNote(this.user.id, [payload.id]); - } - } - /** * 投稿購読要求時 */ @autobind - private onSubscribeNote(payload: any) { + private onSubscribeNote(payload: any, read: boolean) { if (!payload.id) return; if (this.subscribingNotes[payload.id] == null) { @@ -208,6 +186,12 @@ export default class Connection { if (this.subscribingNotes[payload.id] === 1) { this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage); } + + if (this.user && read) { + // TODO: クライアントでタイムライン読み込みなどすると、一度に大量のreadNoteが発生しクエリ数がすごいことになるので、ある程度まとめてreadNoteするようにする + // 具体的には、この箇所ではキュー的な配列にread予定ノートを溜めておくに留めて、別の箇所で定期的にキューにあるノートを配列でreadNoteに渡すような実装にする + readNote(this.user.id, payload.id); + } } /** diff --git a/src/services/note/read-mention.ts b/src/services/note/read-mention.ts deleted file mode 100644 index 2a668ecd6c..0000000000 --- a/src/services/note/read-mention.ts +++ /dev/null @@ -1,29 +0,0 @@ -import { publishMainStream } from '../stream'; -import { Note } from '../../models/entities/note'; -import { User } from '../../models/entities/user'; -import { NoteUnreads } from '../../models'; -import { In } from 'typeorm'; - -/** - * Mark a mention note as read - */ -export async function readMention( - userId: User['id'], - noteIds: Note['id'][] -) { - // Remove the records - await NoteUnreads.delete({ - userId: userId, - noteId: In(noteIds), - }); - - const mentionsCount = await NoteUnreads.count({ - userId: userId, - isMentioned: true - }); - - if (mentionsCount === 0) { - // 全て既読になったイベントを発行 - publishMainStream(userId, 'readAllUnreadMentions'); - } -} diff --git a/src/services/note/read-specified-note.ts b/src/services/note/read-specified-note.ts deleted file mode 100644 index 0fcb66bf98..0000000000 --- a/src/services/note/read-specified-note.ts +++ /dev/null @@ -1,29 +0,0 @@ -import { publishMainStream } from '../stream'; -import { Note } from '../../models/entities/note'; -import { User } from '../../models/entities/user'; -import { NoteUnreads } from '../../models'; -import { In } from 'typeorm'; - -/** - * Mark a specified note as read - */ -export async function readSpecifiedNote( - userId: User['id'], - noteIds: Note['id'][] -) { - // Remove the records - await NoteUnreads.delete({ - userId: userId, - noteId: In(noteIds), - }); - - const specifiedCount = await NoteUnreads.count({ - userId: userId, - isSpecified: true - }); - - if (specifiedCount === 0) { - // 全て既読になったイベントを発行 - publishMainStream(userId, 'readAllUnreadSpecifiedNotes'); - } -} diff --git a/src/services/note/read.ts b/src/services/note/read.ts new file mode 100644 index 0000000000..5a39ab30b7 --- /dev/null +++ b/src/services/note/read.ts @@ -0,0 +1,105 @@ +import { publishMainStream } from '../stream'; +import { Note } from '../../models/entities/note'; +import { User } from '../../models/entities/user'; +import { NoteUnreads, Antennas, AntennaNotes, Users } from '../../models'; +import { Not, IsNull } from 'typeorm'; + +/** + * Mark a note as read + */ +export default async function( + userId: User['id'], + noteId: Note['id'] +) { + async function careNoteUnreads() { + const exist = await NoteUnreads.findOne({ + userId: userId, + noteId: noteId, + }); + + if (!exist) return; + + // Remove the record + await NoteUnreads.delete({ + userId: userId, + noteId: noteId, + }); + + if (exist.isMentioned) { + NoteUnreads.count({ + userId: userId, + isMentioned: true + }).then(mentionsCount => { + if (mentionsCount === 0) { + // 全て既読になったイベントを発行 + publishMainStream(userId, 'readAllUnreadMentions'); + } + }); + } + + if (exist.isSpecified) { + NoteUnreads.count({ + userId: userId, + isSpecified: true + }).then(specifiedCount => { + if (specifiedCount === 0) { + // 全て既読になったイベントを発行 + publishMainStream(userId, 'readAllUnreadSpecifiedNotes'); + } + }); + } + + if (exist.noteChannelId) { + NoteUnreads.count({ + userId: userId, + noteChannelId: Not(IsNull()) + }).then(channelNoteCount => { + if (channelNoteCount === 0) { + // 全て既読になったイベントを発行 + publishMainStream(userId, 'readAllChannels'); + } + }); + } + } + + async function careAntenna() { + const beforeUnread = await Users.getHasUnreadAntenna(userId); + if (!beforeUnread) return; + + const antennas = await Antennas.find({ userId }); + + await Promise.all(antennas.map(async antenna => { + const countBefore = await AntennaNotes.count({ + antennaId: antenna.id, + read: false + }); + + if (countBefore === 0) return; + + await AntennaNotes.update({ + antennaId: antenna.id, + noteId: noteId + }, { + read: true + }); + + const countAfter = await AntennaNotes.count({ + antennaId: antenna.id, + read: false + }); + + if (countAfter === 0) { + publishMainStream(userId, 'readAntenna', antenna); + } + })); + + Users.getHasUnreadAntenna(userId).then(unread => { + if (!unread) { + publishMainStream(userId, 'readAllAntennas'); + } + }); + } + + careNoteUnreads(); + careAntenna(); +} -- cgit v1.2.3-freya From 667d58bad4544d6e9dc75cfc4e6216179e2bc1aa Mon Sep 17 00:00:00 2001 From: syuilo Date: Sun, 21 Mar 2021 17:38:09 +0900 Subject: better note read handling --- src/client/components/note-detailed.vue | 3 +- src/client/components/note.vue | 3 +- src/client/ui/chat/note.vue | 3 +- src/daemons/janitor.ts | 2 + src/server/api/endpoints/notes/mentions.ts | 4 +- src/server/api/stream/channels/antenna.ts | 2 + src/server/api/stream/channels/channel.ts | 2 + src/server/api/stream/channels/global-timeline.ts | 2 + src/server/api/stream/channels/hashtag.ts | 2 + src/server/api/stream/channels/home-timeline.ts | 2 + src/server/api/stream/channels/hybrid-timeline.ts | 2 + src/server/api/stream/channels/local-timeline.ts | 2 + src/server/api/stream/channels/main.ts | 8 ++- src/server/api/stream/index.ts | 58 +++++++++++++--- src/services/note/read.ts | 80 +++++++++-------------- 15 files changed, 109 insertions(+), 66 deletions(-) (limited to 'src/server/api/stream') diff --git a/src/client/components/note-detailed.vue b/src/client/components/note-detailed.vue index 1ef3f43389..ea26d31100 100644 --- a/src/client/components/note-detailed.vue +++ b/src/client/components/note-detailed.vue @@ -350,7 +350,8 @@ export default defineComponent({ capture(withHandler = false) { if (this.$i) { - this.connection.send(document.body.contains(this.$el) ? 'sn' : 's', { id: this.appearNote.id }); + // TODO: このノートがストリーミング経由で流れてきた場合のみ sr する + this.connection.send(document.body.contains(this.$el) ? 'sr' : 's', { id: this.appearNote.id }); if (withHandler) this.connection.on('noteUpdated', this.onStreamNoteUpdated); } }, diff --git a/src/client/components/note.vue b/src/client/components/note.vue index 65e09b7802..70f49fef7e 100644 --- a/src/client/components/note.vue +++ b/src/client/components/note.vue @@ -325,7 +325,8 @@ export default defineComponent({ capture(withHandler = false) { if (this.$i) { - this.connection.send(document.body.contains(this.$el) ? 'sn' : 's', { id: this.appearNote.id }); + // TODO: このノートがストリーミング経由で流れてきた場合のみ sr する + this.connection.send(document.body.contains(this.$el) ? 'sr' : 's', { id: this.appearNote.id }); if (withHandler) this.connection.on('noteUpdated', this.onStreamNoteUpdated); } }, diff --git a/src/client/ui/chat/note.vue b/src/client/ui/chat/note.vue index 5a4a13d889..97275875ca 100644 --- a/src/client/ui/chat/note.vue +++ b/src/client/ui/chat/note.vue @@ -325,7 +325,8 @@ export default defineComponent({ capture(withHandler = false) { if (this.$i) { - this.connection.send(document.body.contains(this.$el) ? 'sn' : 's', { id: this.appearNote.id }); + // TODO: このノートがストリーミング経由で流れてきた場合のみ sr する + this.connection.send(document.body.contains(this.$el) ? 'sr' : 's', { id: this.appearNote.id }); if (withHandler) this.connection.on('noteUpdated', this.onStreamNoteUpdated); } }, diff --git a/src/daemons/janitor.ts b/src/daemons/janitor.ts index 462ebf915c..c079086427 100644 --- a/src/daemons/janitor.ts +++ b/src/daemons/janitor.ts @@ -1,3 +1,5 @@ +// TODO: 消したい + const interval = 30 * 60 * 1000; import { AttestationChallenges } from '../models'; import { LessThan } from 'typeorm'; diff --git a/src/server/api/endpoints/notes/mentions.ts b/src/server/api/endpoints/notes/mentions.ts index 30844774e0..30368ea578 100644 --- a/src/server/api/endpoints/notes/mentions.ts +++ b/src/server/api/endpoints/notes/mentions.ts @@ -83,9 +83,7 @@ export default define(meta, async (ps, user) => { const mentions = await query.take(ps.limit!).getMany(); - for (const note of mentions) { - read(user.id, note.id); - } + read(user.id, mentions.map(note => note.id)); return await Notes.packMany(mentions, user); }); diff --git a/src/server/api/stream/channels/antenna.ts b/src/server/api/stream/channels/antenna.ts index b5a792f814..36a474f2ac 100644 --- a/src/server/api/stream/channels/antenna.ts +++ b/src/server/api/stream/channels/antenna.ts @@ -27,6 +27,8 @@ export default class extends Channel { // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する if (isMutedUserRelated(note, this.muting)) return; + this.connection.cacheNote(note); + this.send('note', note); } else { this.send(type, body); diff --git a/src/server/api/stream/channels/channel.ts b/src/server/api/stream/channels/channel.ts index aa570d1ef4..47a52465b2 100644 --- a/src/server/api/stream/channels/channel.ts +++ b/src/server/api/stream/channels/channel.ts @@ -43,6 +43,8 @@ export default class extends Channel { // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する if (isMutedUserRelated(note, this.muting)) return; + this.connection.cacheNote(note); + this.send('note', note); } diff --git a/src/server/api/stream/channels/global-timeline.ts b/src/server/api/stream/channels/global-timeline.ts index 8c97e67226..8353f45323 100644 --- a/src/server/api/stream/channels/global-timeline.ts +++ b/src/server/api/stream/channels/global-timeline.ts @@ -56,6 +56,8 @@ export default class extends Channel { // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; + this.connection.cacheNote(note); + this.send('note', note); } diff --git a/src/server/api/stream/channels/hashtag.ts b/src/server/api/stream/channels/hashtag.ts index 41447039d5..1b7f8efcc1 100644 --- a/src/server/api/stream/channels/hashtag.ts +++ b/src/server/api/stream/channels/hashtag.ts @@ -37,6 +37,8 @@ export default class extends Channel { // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する if (isMutedUserRelated(note, this.muting)) return; + this.connection.cacheNote(note); + this.send('note', note); } diff --git a/src/server/api/stream/channels/home-timeline.ts b/src/server/api/stream/channels/home-timeline.ts index 6cfa6eae7b..59ba31c316 100644 --- a/src/server/api/stream/channels/home-timeline.ts +++ b/src/server/api/stream/channels/home-timeline.ts @@ -64,6 +64,8 @@ export default class extends Channel { // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; + this.connection.cacheNote(note); + this.send('note', note); } diff --git a/src/server/api/stream/channels/hybrid-timeline.ts b/src/server/api/stream/channels/hybrid-timeline.ts index a9e577cacb..9715e9973f 100644 --- a/src/server/api/stream/channels/hybrid-timeline.ts +++ b/src/server/api/stream/channels/hybrid-timeline.ts @@ -73,6 +73,8 @@ export default class extends Channel { // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; + this.connection.cacheNote(note); + this.send('note', note); } diff --git a/src/server/api/stream/channels/local-timeline.ts b/src/server/api/stream/channels/local-timeline.ts index a3a5e491fc..e159c72d60 100644 --- a/src/server/api/stream/channels/local-timeline.ts +++ b/src/server/api/stream/channels/local-timeline.ts @@ -58,6 +58,8 @@ export default class extends Channel { // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; + this.connection.cacheNote(note); + this.send('note', note); } diff --git a/src/server/api/stream/channels/main.ts b/src/server/api/stream/channels/main.ts index b69c2ec355..780bc0b89f 100644 --- a/src/server/api/stream/channels/main.ts +++ b/src/server/api/stream/channels/main.ts @@ -18,18 +18,22 @@ export default class extends Channel { case 'notification': { if (this.muting.has(body.userId)) return; if (body.note && body.note.isHidden) { - body.note = await Notes.pack(body.note.id, this.user, { + const note = await Notes.pack(body.note.id, this.user, { detail: true }); + this.connection.cacheNote(note); + body.note = note; } break; } case 'mention': { if (this.muting.has(body.userId)) return; if (body.isHidden) { - body = await Notes.pack(body.id, this.user, { + const note = await Notes.pack(body.id, this.user, { detail: true }); + this.connection.cacheNote(note); + body = note; } break; } diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts index f67faee1ce..99ae558696 100644 --- a/src/server/api/stream/index.ts +++ b/src/server/api/stream/index.ts @@ -14,6 +14,7 @@ import { AccessToken } from '../../../models/entities/access-token'; import { UserProfile } from '../../../models/entities/user-profile'; import { publishChannelStream, publishGroupMessagingStream, publishMessagingStream } from '../../../services/stream'; import { UserGroup } from '../../../models/entities/user-group'; +import { PackedNote } from '../../../models/repositories/note'; /** * Main stream connection @@ -29,6 +30,7 @@ export default class Connection { public subscriber: EventEmitter; private channels: Channel[] = []; private subscribingNotes: any = {}; + private cachedNotes: PackedNote[] = []; constructor( wsConnection: websocket.connection, @@ -115,9 +117,9 @@ export default class Connection { switch (type) { case 'api': this.onApiRequest(body); break; case 'readNotification': this.onReadNotification(body); break; - case 'subNote': this.onSubscribeNote(body, true); break; - case 'sn': this.onSubscribeNote(body, true); break; // alias - case 's': this.onSubscribeNote(body, false); break; + case 'subNote': this.onSubscribeNote(body); break; + case 's': this.onSubscribeNote(body); break; // alias + case 'sr': this.onSubscribeNote(body); this.readNote(body); break; case 'unsubNote': this.onUnsubscribeNote(body); break; case 'un': this.onUnsubscribeNote(body); break; // alias case 'connect': this.onChannelConnectRequested(body); break; @@ -138,6 +140,48 @@ export default class Connection { this.sendMessageToWs(type, body); } + @autobind + public cacheNote(note: PackedNote) { + const add = (note: PackedNote) => { + const existIndex = this.cachedNotes.findIndex(n => n.id === note.id); + if (existIndex > -1) { + this.cachedNotes[existIndex] = note; + return; + } + + this.cachedNotes.unshift(note); + if (this.cachedNotes.length > 32) { + this.cachedNotes.splice(32); + } + }; + + add(note); + if (note.reply) add(note.reply); + if (note.renote) add(note.renote); + } + + @autobind + private readNote(body: any) { + const id = body.id; + + const note = this.cachedNotes.find(n => n.id === id); + if (note == null) return; + + if (this.user && (note.userId !== this.user.id)) { + if (note.mentions && note.mentions.includes(this.user.id)) { + readNote(this.user.id, [note]); + } else if (note.visibleUserIds && note.visibleUserIds.includes(this.user.id)) { + readNote(this.user.id, [note]); + } + + if (this.followingChannels.has(note.channelId)) { + // TODO + } + + // TODO: アンテナの既読処理 + } + } + /** * APIリクエスト要求時 */ @@ -174,7 +218,7 @@ export default class Connection { * 投稿購読要求時 */ @autobind - private onSubscribeNote(payload: any, read: boolean) { + private onSubscribeNote(payload: any) { if (!payload.id) return; if (this.subscribingNotes[payload.id] == null) { @@ -186,12 +230,6 @@ export default class Connection { if (this.subscribingNotes[payload.id] === 1) { this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage); } - - if (this.user && read) { - // TODO: クライアントでタイムライン読み込みなどすると、一度に大量のreadNoteが発生しクエリ数がすごいことになるので、ある程度まとめてreadNoteするようにする - // 具体的には、この箇所ではキュー的な配列にread予定ノートを溜めておくに留めて、別の箇所で定期的にキューにあるノートを配列でreadNoteに渡すような実装にする - readNote(this.user.id, payload.id); - } } /** diff --git a/src/services/note/read.ts b/src/services/note/read.ts index 5a39ab30b7..35279db411 100644 --- a/src/services/note/read.ts +++ b/src/services/note/read.ts @@ -2,70 +2,54 @@ import { publishMainStream } from '../stream'; import { Note } from '../../models/entities/note'; import { User } from '../../models/entities/user'; import { NoteUnreads, Antennas, AntennaNotes, Users } from '../../models'; -import { Not, IsNull } from 'typeorm'; +import { Not, IsNull, In } from 'typeorm'; /** - * Mark a note as read + * Mark notes as read */ export default async function( userId: User['id'], - noteId: Note['id'] + noteIds: Note['id'][] ) { async function careNoteUnreads() { - const exist = await NoteUnreads.findOne({ - userId: userId, - noteId: noteId, - }); - - if (!exist) return; - // Remove the record await NoteUnreads.delete({ userId: userId, - noteId: noteId, + noteId: In(noteIds), }); - if (exist.isMentioned) { - NoteUnreads.count({ - userId: userId, - isMentioned: true - }).then(mentionsCount => { - if (mentionsCount === 0) { - // 全て既読になったイベントを発行 - publishMainStream(userId, 'readAllUnreadMentions'); - } - }); - } + NoteUnreads.count({ + userId: userId, + isMentioned: true + }).then(mentionsCount => { + if (mentionsCount === 0) { + // 全て既読になったイベントを発行 + publishMainStream(userId, 'readAllUnreadMentions'); + } + }); - if (exist.isSpecified) { - NoteUnreads.count({ - userId: userId, - isSpecified: true - }).then(specifiedCount => { - if (specifiedCount === 0) { - // 全て既読になったイベントを発行 - publishMainStream(userId, 'readAllUnreadSpecifiedNotes'); - } - }); - } + NoteUnreads.count({ + userId: userId, + isSpecified: true + }).then(specifiedCount => { + if (specifiedCount === 0) { + // 全て既読になったイベントを発行 + publishMainStream(userId, 'readAllUnreadSpecifiedNotes'); + } + }); - if (exist.noteChannelId) { - NoteUnreads.count({ - userId: userId, - noteChannelId: Not(IsNull()) - }).then(channelNoteCount => { - if (channelNoteCount === 0) { - // 全て既読になったイベントを発行 - publishMainStream(userId, 'readAllChannels'); - } - }); - } + NoteUnreads.count({ + userId: userId, + noteChannelId: Not(IsNull()) + }).then(channelNoteCount => { + if (channelNoteCount === 0) { + // 全て既読になったイベントを発行 + publishMainStream(userId, 'readAllChannels'); + } + }); } async function careAntenna() { - const beforeUnread = await Users.getHasUnreadAntenna(userId); - if (!beforeUnread) return; - const antennas = await Antennas.find({ userId }); await Promise.all(antennas.map(async antenna => { @@ -78,7 +62,7 @@ export default async function( await AntennaNotes.update({ antennaId: antenna.id, - noteId: noteId + noteId: In(noteIds) }, { read: true }); -- cgit v1.2.3-freya