1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
import { EventEmitter } from 'eventemitter3';
import * as uuid from 'uuid';
import * as ReconnectingWebsocket from 'reconnecting-websocket';
import { wsUrl } from '../../../config';
import MiOS from '../../mios';
/**
 * Misskey stream connection.
 *
 * Opens a ReconnectingWebsocket to `${wsUrl}/${endpoint}` (plus an optional
 * query string built from `params`) and re-emits each received JSON message
 * of the form `{ type, body }` as an event named `type` with `body` as the
 * payload. `_connected_` and `_disconnected_` are emitted when the socket
 * opens and closes. Messages sent while the socket is not connected are
 * buffered and flushed on the next open.
 */
export default class Connection extends EventEmitter {
  /** 'initializing' until the first open, then 'connected' or 'reconnecting'. */
  public state: string;
  /** Messages queued by send() while the socket is not connected. */
  private buffer: any[];
  public socket: ReconnectingWebsocket;
  /** The endpoint this connection was created for. */
  public name: string;
  /** Time of the most recent socket open. */
  public connectedAt: Date;
  /** Not assigned anywhere inside this class. */
  public user: string = null;
  /** Number of received messages (only counted while `os.debug` is truthy). */
  public in: number = 0;
  /** Number of sent messages (only counted while `os.debug` is truthy). */
  public out: number = 0;
  /** Traffic log (only recorded while `os.debug` is truthy). */
  public inout: Array<{
    type: 'in' | 'out',
    at: Date,
    data: string
  }> = [];
  public id: string;
  /** While true, incoming messages are ignored and send() does nothing. */
  public isSuspended = false;
  private os: MiOS;

  /**
   * @param os       Owner instance; the connection registers itself on it.
   * @param endpoint Path appended to `wsUrl`.
   * @param params   Optional key/value pairs encoded into the query string.
   */
  constructor(os: MiOS, endpoint, params?) {
    super();

    //#region BIND
    // The handlers are passed around as bare callbacks, so pin `this`.
    this.onOpen = this.onOpen.bind(this);
    this.onClose = this.onClose.bind(this);
    this.onMessage = this.onMessage.bind(this);
    this.send = this.send.bind(this);
    this.close = this.close.bind(this);
    //#endregion

    this.id = uuid();
    this.os = os;
    this.name = endpoint;
    this.state = 'initializing';
    this.buffer = [];

    const query = params
      ? Object.keys(params)
        .map(k => encodeURIComponent(k) + '=' + encodeURIComponent(params[k]))
        .join('&')
      : null;

    this.socket = new ReconnectingWebsocket(`${wsUrl}/${endpoint}${query ? '?' + query : ''}`);
    this.socket.addEventListener('open', this.onOpen);
    this.socket.addEventListener('close', this.onClose);
    this.socket.addEventListener('message', this.onMessage);

    // Register this connection for debugging
    this.os.registerStreamConnection(this);
  }

  /**
   * Callback of when open connection
   */
  private onOpen() {
    this.state = 'connected';
    // Set the timestamp before notifying so listeners never see a stale value.
    this.connectedAt = new Date();
    this.emit('_connected_');

    // Flush the buffer. It is swapped out first so that anything queued
    // while flushing lands in a fresh buffer.
    const pending = this.buffer;
    this.buffer = [];
    for (const data of pending) {
      // send() records the debug statistics itself; recording them here as
      // well would count every buffered message twice.
      this.send(data);
    }
  }

  /**
   * Callback of when close connection
   */
  private onClose() {
    this.state = 'reconnecting';
    this.emit('_disconnected_');
  }

  /**
   * Callback of when received a message from connection
   */
  private onMessage(message) {
    if (this.isSuspended) return;

    if (this.os.debug) {
      this.in++;
      this.inout.push({ type: 'in', at: new Date(), data: message.data });
    }

    let msg;
    try {
      msg = JSON.parse(message.data);
    } catch (e) {
      // Payloads that are not valid JSON are ignored.
      return;
    }

    // Emit outside the try block so that errors thrown by listeners are not
    // silently swallowed together with parse errors.
    if (msg && msg.type) this.emit(msg.type, msg.body);
  }

  /**
   * Send a message to connection
   *
   * @param data Value to send; it is serialized with JSON.stringify.
   */
  public send(data) {
    if (this.isSuspended) return;

    // If the connection is not established yet, buffer the message and send
    // it the next time the socket opens.
    if (this.state !== 'connected') {
      this.buffer.push(data);
      return;
    }

    if (this.os.debug) {
      this.out++;
      this.inout.push({ type: 'out', at: new Date(), data });
    }

    this.socket.send(JSON.stringify(data));
  }

  /**
   * Close this connection
   *
   * Unregisters from the owner, detaches every listener added by the
   * constructor and closes the underlying socket.
   */
  public close() {
    this.os.unregisterStreamConnection(this);

    // Detach all three listeners first so closing does not emit
    // `_disconnected_` or flip the state to 'reconnecting'.
    this.socket.removeEventListener('open', this.onOpen);
    this.socket.removeEventListener('close', this.onClose);
    this.socket.removeEventListener('message', this.onMessage);

    // NOTE(review): assumes reconnecting-websocket v3, where a plain close()
    // reconnects unless `keepClosed` is set; the extra argument should be
    // ignored by versions whose close() only takes (code, reason) — confirm
    // against the installed version.
    (this.socket as {
      close(code?: number, reason?: string, options?: { keepClosed?: boolean }): void
    }).close(1000, '', { keepClosed: true });
  }
}
|