summaryrefslogtreecommitdiff
path: root/src/web/app/common/scripts/streaming/stream.ts
diff options
context:
space:
mode:
authorsyuilo <syuilotan@yahoo.co.jp>2017-11-17 01:24:44 +0900
committersyuilo <syuilotan@yahoo.co.jp>2017-11-17 01:24:44 +0900
commitc614e6f5d73b3b4314c7f6abf52260e58cc176ad (patch)
treeb8ad5a47d532a9af4060022e5ef3de4166f884c5 /src/web/app/common/scripts/streaming/stream.ts
parenttypo (diff)
downloadsharkey-c614e6f5d73b3b4314c7f6abf52260e58cc176ad.tar.gz
sharkey-c614e6f5d73b3b4314c7f6abf52260e58cc176ad.tar.bz2
sharkey-c614e6f5d73b3b4314c7f6abf52260e58cc176ad.zip
#925, #926, and refactoring
Diffstat (limited to 'src/web/app/common/scripts/streaming/stream.ts')
-rw-r--r--src/web/app/common/scripts/streaming/stream.ts95
1 files changed, 95 insertions, 0 deletions
diff --git a/src/web/app/common/scripts/streaming/stream.ts b/src/web/app/common/scripts/streaming/stream.ts
new file mode 100644
index 0000000000..97ebdcdd74
--- /dev/null
+++ b/src/web/app/common/scripts/streaming/stream.ts
@@ -0,0 +1,95 @@
+import { EventEmitter } from 'eventemitter3';
+import * as ReconnectingWebsocket from 'reconnecting-websocket';
+import CONFIG from '../config';
+
+/**
+ * Misskey stream connection
+ */
+export default class Connection extends EventEmitter {
+ private state: string;
+ private buffer: any[];
+ private socket: ReconnectingWebsocket;
+
+ constructor(endpoint, params?) {
+ super();
+
+ //#region BIND
+ this.onOpen = this.onOpen.bind(this);
+ this.onClose = this.onClose.bind(this);
+ this.onMessage = this.onMessage.bind(this);
+ this.send = this.send.bind(this);
+ this.close = this.close.bind(this);
+ //#endregion
+
+ this.state = 'initializing';
+ this.buffer = [];
+
+ const host = CONFIG.apiUrl.replace('http', 'ws');
+ const query = params
+ ? Object.keys(params)
+ .map(k => encodeURIComponent(k) + '=' + encodeURIComponent(params[k]))
+ .join('&')
+ : null;
+
+ this.socket = new ReconnectingWebsocket(`${host}/${endpoint}${query ? '?' + query : ''}`);
+ this.socket.addEventListener('open', this.onOpen);
+ this.socket.addEventListener('close', this.onClose);
+ this.socket.addEventListener('message', this.onMessage);
+ }
+
+ /**
+ * Callback of when open connection
+ */
+ private onOpen() {
+ this.state = 'connected';
+ this.emit('_connected_');
+
+ // バッファーを処理
+ const _buffer = [].concat(this.buffer); // Shallow copy
+ this.buffer = []; // Clear buffer
+ _buffer.forEach(message => {
+ this.send(message); // Resend each buffered messages
+ });
+ }
+
+ /**
+ * Callback of when close connection
+ */
+ private onClose() {
+ this.state = 'reconnecting';
+ this.emit('_closed_');
+ }
+
+ /**
+ * Callback of when received a message from connection
+ */
+ private onMessage(message) {
+ try {
+ const msg = JSON.parse(message.data);
+ if (msg.type) this.emit(msg.type, msg.body);
+ } catch (e) {
+ // noop
+ }
+ }
+
+ /**
+ * Send a message to connection
+ */
+ public send(message) {
+ // まだ接続が確立されていなかったらバッファリングして次に接続した時に送信する
+ if (this.state != 'connected') {
+ this.buffer.push(message);
+ return;
+ }
+
+ this.socket.send(JSON.stringify(message));
+ }
+
+ /**
+ * Close this connection
+ */
+ public close() {
+ this.socket.removeEventListener('open', this.onOpen);
+ this.socket.removeEventListener('message', this.onMessage);
+ }
+}