summaryrefslogtreecommitdiff
path: root/src/client/app/common/scripts/streaming/stream.ts
blob: 4ab78f1190f55a3142e02762966b9ed2ac6f9a57 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import { EventEmitter } from 'eventemitter3';
import * as uuid from 'uuid';
import * as ReconnectingWebsocket from 'reconnecting-websocket';
import { wsUrl } from '../../../config';
import MiOS from '../../../mios';

/**
 * Misskey stream connection.
 *
 * Opens a reconnecting WebSocket at `${wsUrl}/${endpoint}` (plus an optional
 * query string) and re-emits each incoming JSON message `{ type, body }` as an
 * event named `type` carrying `body`. Messages sent while the socket is not
 * connected are buffered and flushed on the next 'open' event.
 *
 * Besides message events, `_connected_` is emitted when the socket opens and
 * `_disconnected_` when it closes.
 */
export default class Connection extends EventEmitter {
	/** One of 'initializing', 'connected', 'reconnecting' or 'closed'. */
	public state: string;
	/** Messages queued by send() while the socket is not connected. */
	private buffer: any[];
	public socket: ReconnectingWebsocket;
	/** The endpoint this connection was opened for. */
	public name: string;
	/** Time of the most recent 'open' event. */
	public connectedAt: Date;
	// Never assigned inside this class; presumably filled in by whoever owns the connection.
	public user: string = null;
	/** Number of received messages (only counted while os.debug is truthy). */
	public in: number = 0;
	/** Number of sent messages (only counted while os.debug is truthy). */
	public out: number = 0;
	/** Traffic log (only appended to while os.debug is truthy). */
	public inout: Array<{
		type: 'in' | 'out',
		at: Date,
		data: string
	}> = [];
	public id: string;
	/** While true, incoming messages are ignored and send() is a no-op. */
	public isSuspended = false;
	private os: MiOS;

	/**
	 * @param os       MiOS instance; the connection registers itself with it
	 * @param endpoint Path appended to wsUrl
	 * @param params   Optional key/value pairs, URL-encoded into the query string
	 */
	constructor(os: MiOS, endpoint: string, params?: { [key: string]: any }) {
		super();

		//#region BIND
		this.onOpen =    this.onOpen.bind(this);
		this.onClose =   this.onClose.bind(this);
		this.onMessage = this.onMessage.bind(this);
		this.send =      this.send.bind(this);
		this.close =     this.close.bind(this);
		//#endregion

		this.id = uuid();
		this.os = os;
		this.name = endpoint;
		this.state = 'initializing';
		this.buffer = [];

		const query = params
			? Object.keys(params)
				.map(k => `${encodeURIComponent(k)}=${encodeURIComponent(params[k])}`)
				.join('&')
			: null;

		this.socket = new ReconnectingWebsocket(`${wsUrl}/${endpoint}${query ? `?${query}` : ''}`);
		this.socket.addEventListener('open', this.onOpen);
		this.socket.addEventListener('close', this.onClose);
		this.socket.addEventListener('message', this.onMessage);

		// Register this connection for debugging
		this.os.registerStreamConnection(this);
	}

	/**
	 * Callback of when open connection
	 */
	private onOpen() {
		this.state = 'connected';
		this.emit('_connected_');

		this.connectedAt = new Date();

		// Flush the buffer. Swap it out first so anything queued during the
		// flush lands in a fresh array instead of the one being iterated.
		const pending = this.buffer;
		this.buffer = [];

		// send() records the debug statistics itself, so they must not be
		// recorded again here (doing so counted every flushed message twice).
		pending.forEach(data => {
			this.send(data);
		});
	}

	/**
	 * Callback of when close connection
	 */
	private onClose() {
		this.state = 'reconnecting';
		this.emit('_disconnected_');
	}

	/**
	 * Callback of when received a message from connection
	 */
	private onMessage(message) {
		if (this.isSuspended) return;

		if (this.os.debug) {
			this.in++;
			this.inout.push({ type: 'in', at: new Date(), data: message.data });
		}

		let msg;
		try {
			msg = JSON.parse(message.data);
		} catch (e) {
			// Frames that are not valid JSON are deliberately ignored
			return;
		}

		// Emit outside of the try block so that an exception thrown by a
		// subscriber surfaces instead of being mistaken for a parse failure.
		if (msg != null && msg.type) this.emit(msg.type, msg.body);
	}

	/**
	 * Send a message to connection
	 *
	 * The message is dropped when the connection is suspended or closed, and
	 * buffered until the next 'open' event when the socket is not connected.
	 */
	public send(data) {
		if (this.isSuspended) return;

		// Nothing will ever flush the buffer again once close() has run
		if (this.state === 'closed') return;

		// If the connection is not established yet, buffer the message and
		// send it the next time the socket connects
		if (this.state !== 'connected') {
			this.buffer.push(data);
			return;
		}

		if (this.os.debug) {
			this.out++;
			this.inout.push({ type: 'out', at: new Date(), data });
		}

		this.socket.send(JSON.stringify(data));
	}

	/**
	 * Close this connection
	 *
	 * Unregisters from MiOS, detaches every socket listener and shuts the
	 * underlying socket down. Calling it more than once is harmless.
	 */
	public close() {
		if (this.state === 'closed') return;

		this.os.unregisterStreamConnection(this);
		this.socket.removeEventListener('open', this.onOpen);
		this.socket.removeEventListener('close', this.onClose);
		this.socket.removeEventListener('message', this.onMessage);

		this.state = 'closed';
		this.buffer = [];

		// Actually release the socket; otherwise it stays open and keeps
		// reconnecting. NOTE(review): `keepClosed` is the reconnecting-websocket
		// v3 option that prevents an automatic reconnect after close(); the
		// cast avoids depending on whether the installed typings declare the
		// third argument - confirm against the installed version.
		(this.socket as any).close(1000, '', { keepClosed: true });
	}
}