diff options
Diffstat (limited to 'src/server/api/stream/index.ts')
| -rw-r--r-- | src/server/api/stream/index.ts | 112 |
1 files changed, 88 insertions, 24 deletions
diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts index c56a0a157b..99ae558696 100644 --- a/src/server/api/stream/index.ts +++ b/src/server/api/stream/index.ts @@ -14,6 +14,7 @@ import { AccessToken } from '../../../models/entities/access-token'; import { UserProfile } from '../../../models/entities/user-profile'; import { publishChannelStream, publishGroupMessagingStream, publishMessagingStream } from '../../../services/stream'; import { UserGroup } from '../../../models/entities/user-group'; +import { PackedNote } from '../../../models/repositories/note'; /** * Main stream connection @@ -29,10 +30,7 @@ export default class Connection { public subscriber: EventEmitter; private channels: Channel[] = []; private subscribingNotes: any = {}; - private followingClock: ReturnType<typeof setInterval>; - private mutingClock: ReturnType<typeof setInterval>; - private followingChannelsClock: ReturnType<typeof setInterval>; - private userProfileClock: ReturnType<typeof setInterval>; + private cachedNotes: PackedNote[] = []; constructor( wsConnection: websocket.connection, @@ -53,16 +51,49 @@ export default class Connection { if (this.user) { this.updateFollowing(); - this.followingClock = setInterval(this.updateFollowing, 5000); - this.updateMuting(); - this.mutingClock = setInterval(this.updateMuting, 5000); - this.updateFollowingChannels(); - this.followingChannelsClock = setInterval(this.updateFollowingChannels, 5000); - this.updateUserProfile(); - this.userProfileClock = setInterval(this.updateUserProfile, 5000); + + this.subscriber.on(`user:${this.user.id}`, ({ type, body }) => { + this.onUserEvent(type, body); + }); + } + } + + @autobind + private onUserEvent(type: string, body: any) { + switch (type) { + case 'follow': + this.following.add(body.id); + break; + + case 'unfollow': + this.following.delete(body.id); + break; + + case 'mute': + this.muting.add(body.id); + break; + + case 'unmute': + this.muting.delete(body.id); + break; + + case 'followChannel': + this.followingChannels.add(body.id); + break; + + case 'unfollowChannel': + this.followingChannels.delete(body.id); + break; + + case 'updateUserProfile': + this.userProfile = body; + break; + + default: + break; } } @@ -86,9 +117,9 @@ export default class Connection { switch (type) { case 'api': this.onApiRequest(body); break; case 'readNotification': this.onReadNotification(body); break; - case 'subNote': this.onSubscribeNote(body, true); break; - case 'sn': this.onSubscribeNote(body, true); break; // alias - case 's': this.onSubscribeNote(body, false); break; + case 'subNote': this.onSubscribeNote(body); break; + case 's': this.onSubscribeNote(body); break; // alias + case 'sr': this.onSubscribeNote(body); this.readNote(body); break; case 'unsubNote': this.onUnsubscribeNote(body); break; case 'un': this.onUnsubscribeNote(body); break; // alias case 'connect': this.onChannelConnectRequested(body); break; @@ -109,6 +140,48 @@ export default class Connection { this.sendMessageToWs(type, body); } + @autobind + public cacheNote(note: PackedNote) { + const add = (note: PackedNote) => { + const existIndex = this.cachedNotes.findIndex(n => n.id === note.id); + if (existIndex > -1) { + this.cachedNotes[existIndex] = note; + return; + } + + this.cachedNotes.unshift(note); + if (this.cachedNotes.length > 32) { + this.cachedNotes.splice(32); + } + }; + + add(note); + if (note.reply) add(note.reply); + if (note.renote) add(note.renote); + } + + @autobind + private readNote(body: any) { + const id = body.id; + + const note = this.cachedNotes.find(n => n.id === id); + if (note == null) return; + + if (this.user && (note.userId !== this.user.id)) { + if (note.mentions && note.mentions.includes(this.user.id)) { + readNote(this.user.id, [note]); + } else if (note.visibleUserIds && note.visibleUserIds.includes(this.user.id)) { + readNote(this.user.id, [note]); + } + + if (this.followingChannels.has(note.channelId)) { + // TODO + } + + // TODO: アンテナの既読処理 + } + } + /** * APIリクエスト要求時 */ @@ -145,7 +218,7 @@ export default class Connection { * 投稿購読要求時 */ @autobind - private onSubscribeNote(payload: any, read: boolean) { + private onSubscribeNote(payload: any) { if (!payload.id) return; if (this.subscribingNotes[payload.id] == null) { @@ -157,10 +230,6 @@ export default class Connection { if (this.subscribingNotes[payload.id] === 1) { this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage); } - - if (this.user && read) { - readNote(this.user.id, payload.id); - } } /** @@ -335,10 +404,5 @@ export default class Connection { for (const c of this.channels.filter(c => c.dispose)) { if (c.dispose) c.dispose(); } - - if (this.followingClock) clearInterval(this.followingClock); - if (this.mutingClock) clearInterval(this.mutingClock); - if (this.followingChannelsClock) clearInterval(this.followingChannelsClock); - if (this.userProfileClock) clearInterval(this.userProfileClock); } } |