diff options
| author | syuilo <Syuilotan@yahoo.co.jp> | 2018-10-07 11:06:17 +0900 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-10-07 11:06:17 +0900 |
| commit | d0570d7fe3a3bf3c6b0312dece74bacc04c3534a (patch) | |
| tree | 698218279a38f9c78b0350e81b8ac77ae52e4a0d /src/server/api/stream/index.ts | |
| parent | Fix お知らせが確認中...のままになる(Announcement Fetching...) (... (diff) | |
| download | misskey-d0570d7fe3a3bf3c6b0312dece74bacc04c3534a.tar.gz misskey-d0570d7fe3a3bf3c6b0312dece74bacc04c3534a.tar.bz2 misskey-d0570d7fe3a3bf3c6b0312dece74bacc04c3534a.zip | |
V10 (#2826)
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* Update CHANGELOG.md
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* wip
* Update CHANGELOG.md
* Update CHANGELOG.md
* wip
* Update CHANGELOG.md
* wip
* wip
* wip
* wip
Diffstat (limited to 'src/server/api/stream/index.ts')
| -rw-r--r-- | src/server/api/stream/index.ts | 213 |
1 files changed, 213 insertions, 0 deletions
diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts new file mode 100644 index 0000000000..bd99f2755e --- /dev/null +++ b/src/server/api/stream/index.ts @@ -0,0 +1,213 @@ +import autobind from 'autobind-decorator'; +import * as websocket from 'websocket'; +import Xev from 'xev'; +import * as debug from 'debug'; + +import User, { IUser } from '../../../models/user'; +import readNotification from '../common/read-notification'; +import call from '../call'; +import { IApp } from '../../../models/app'; +import readNote from '../../../services/note/read'; + +import Channel from './channel'; +import channels from './channels'; + +const log = debug('misskey'); + +/** + * Main stream connection + */ +export default class Connection { + public user?: IUser; + public app: IApp; + private wsConnection: websocket.connection; + public subscriber: Xev; + private channels: Channel[] = []; + private subscribingNotes: any = {}; + + constructor( + wsConnection: websocket.connection, + subscriber: Xev, + user: IUser, + app: IApp + ) { + this.wsConnection = wsConnection; + this.user = user; + this.app = app; + this.subscriber = subscriber; + + this.wsConnection.on('message', this.onWsConnectionMessage); + } + + /** + * クライアントからメッセージ受信時 + */ + @autobind + private async onWsConnectionMessage(data: websocket.IMessage) { + const { type, body } = JSON.parse(data.utf8Data); + + switch (type) { + case 'api': this.onApiRequest(body); break; + case 'alive': this.onAlive(); break; + case 'readNotification': this.onReadNotification(body); break; + case 'subNote': this.onSubscribeNote(body); break; + case 'sn': this.onSubscribeNote(body); break; // alias + case 'unsubNote': this.onUnsubscribeNote(body); break; + case 'un': this.onUnsubscribeNote(body); break; // alias + case 'connect': this.onChannelConnectRequested(body); break; + case 'disconnect': this.onChannelDisconnectRequested(body); break; + case 'channel': this.onChannelMessageRequested(body); break; + } + } + + /** + * APIリクエスト要求時 + */ + @autobind + private async onApiRequest(payload: any) { + // 新鮮なデータを利用するためにユーザーをフェッチ + const user = this.user ? await User.findOne({ _id: this.user._id }) : null; + + const endpoint = payload.endpoint || payload.ep; // alias + + // 呼び出し + call(endpoint, user, this.app, payload.data).then(res => { + this.sendMessageToWs(`api:${payload.id}`, { res }); + }).catch(e => { + this.sendMessageToWs(`api:${payload.id}`, { e }); + }); + } + + @autobind + private onAlive() { + // Update lastUsedAt + User.update({ _id: this.user._id }, { + $set: { + 'lastUsedAt': new Date() + } + }); + } + + @autobind + private onReadNotification(payload: any) { + if (!payload.id) return; + readNotification(this.user._id, payload.id); + } + + /** + * 投稿購読要求時 + */ + @autobind + private onSubscribeNote(payload: any) { + if (!payload.id) return; + + if (this.subscribingNotes[payload.id] == null) { + this.subscribingNotes[payload.id] = 0; + } + + this.subscribingNotes[payload.id]++; + + if (this.subscribingNotes[payload.id] == 1) { + this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage); + } + + if (payload.read) { + readNote(this.user._id, payload.id); + } + } + + /** + * 投稿購読解除要求時 + */ + @autobind + private onUnsubscribeNote(payload: any) { + if (!payload.id) return; + + this.subscribingNotes[payload.id]--; + if (this.subscribingNotes[payload.id] <= 0) { + delete this.subscribingNotes[payload.id]; + this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage); + } + } + + @autobind + private async onNoteStreamMessage(data: any) { + this.sendMessageToWs('noteUpdated', { + id: data.body.id, + type: data.type, + body: data.body.body, + }); + } + + /** + * チャンネル接続要求時 + */ + @autobind + private onChannelConnectRequested(payload: any) { + const { channel, id, params } = payload; + log(`CH CONNECT: ${id} ${channel} by @${this.user.username}`); + this.connectChannel(id, params, (channels as any)[channel]); + } + + /** + * チャンネル切断要求時 + */ + @autobind + private onChannelDisconnectRequested(payload: any) { + const { id } = payload; + log(`CH DISCONNECT: ${id} by @${this.user.username}`); + this.disconnectChannel(id); + } + + /** + * クライアントにメッセージ送信 + */ + @autobind + public sendMessageToWs(type: string, payload: any) { + this.wsConnection.send(JSON.stringify({ + type: type, + body: payload + })); + } + + /** + * チャンネルに接続 + */ + @autobind + private connectChannel(id: string, params: any, channelClass: { new(id: string, connection: Connection): Channel }) { + const channel = new channelClass(id, this); + this.channels.push(channel); + channel.init(params); + } + + /** + * チャンネルから切断 + */ + @autobind + private disconnectChannel(id: string) { + const channel = this.channels.find(c => c.id === id); + + if (channel) { + if (channel.dispose) channel.dispose(); + this.channels = this.channels.filter(c => c.id !== id); + } + } + + @autobind + private onChannelMessageRequested(data: any) { + const channel = this.channels.find(c => c.id === data.id); + if (channel != null && channel.onMessage != null) { + channel.onMessage(data.type, data.body); + } + } + + /** + * ストリームが切れたとき + */ + @autobind + public dispose() { + this.channels.forEach(c => { + if (c.dispose) c.dispose(); + }); + } +} |