diff options
| author | syuilo <syuilotan@yahoo.co.jp> | 2018-10-10 20:59:10 +0900 |
|---|---|---|
| committer | syuilo <syuilotan@yahoo.co.jp> | 2018-10-10 20:59:10 +0900 |
| commit | 51c53f64d031b2da75ccd33d3d2c245e45395ce5 (patch) | |
| tree | e64f19ed6737600cf10c64ad4ef61355f0a25632 /src/client/app | |
| parent | Fix syntax error (diff) | |
| download | sharkey-51c53f64d031b2da75ccd33d3d2c245e45395ce5.tar.gz sharkey-51c53f64d031b2da75ccd33d3d2c245e45395ce5.tar.bz2 sharkey-51c53f64d031b2da75ccd33d3d2c245e45395ce5.zip | |
Fix #2881, Fix #2879
Diffstat (limited to 'src/client/app')
| -rw-r--r-- | src/client/app/common/scripts/stream.ts | 174 |
1 files changed, 122 insertions, 52 deletions
diff --git a/src/client/app/common/scripts/stream.ts b/src/client/app/common/scripts/stream.ts index c588d1bb39..215f93703a 100644 --- a/src/client/app/common/scripts/stream.ts +++ b/src/client/app/common/scripts/stream.ts @@ -11,6 +11,7 @@ export default class Stream extends EventEmitter { private stream: ReconnectingWebsocket; private state: string; private buffer: any[]; + private sharedConnectionPools: Pool[] = []; private sharedConnections: SharedConnection[] = []; private nonSharedConnections: NonSharedConnection[] = []; @@ -29,22 +30,21 @@ export default class Stream extends EventEmitter { } public useSharedConnection = (channel: string): SharedConnection => { - const existConnection = this.sharedConnections.find(c => c.channel === channel); + let pool = this.sharedConnectionPools.find(p => p.channel === channel); - if (existConnection) { - existConnection.use(); - return existConnection; - } else { - const connection = new SharedConnection(this, channel); - connection.use(); - this.sharedConnections.push(connection); - return connection; + if (pool == null) { + pool = new Pool(this, channel); + this.sharedConnectionPools.push(pool); } + + const connection = new SharedConnection(this, channel, pool); + this.sharedConnections.push(connection); + return connection; } @autobind public removeSharedConnection(connection: SharedConnection) { - this.sharedConnections = this.sharedConnections.filter(c => c.id !== connection.id); + this.sharedConnections = this.sharedConnections.filter(c => c !== connection); } public connectToChannel = (channel: string, params?: any): NonSharedConnection => { @@ -55,7 +55,7 @@ export default class Stream extends EventEmitter { @autobind public disconnectToChannel(connection: NonSharedConnection) { - this.nonSharedConnections = this.nonSharedConnections.filter(c => c.id !== connection.id); + this.nonSharedConnections = this.nonSharedConnections.filter(c => c !== connection); } /** @@ -77,8 +77,8 @@ export default class Stream extends EventEmitter { // チャンネル再接続 if (isReconnect) { - this.sharedConnections.forEach(c => { - c.connect(); + this.sharedConnectionPools.forEach(p => { + p.connect(); }); this.nonSharedConnections.forEach(c => { c.connect(); @@ -104,8 +104,18 @@ export default class Stream extends EventEmitter { if (type == 'channel') { const id = body.id; - const connection = this.sharedConnections.find(c => c.id === id) || this.nonSharedConnections.find(c => c.id === id); - connection.emit(body.type, body.body); + + let connections: Connection[]; + + connections = this.sharedConnections.filter(c => c.id === id); + + if (connections.length === 0) { + connections = [this.nonSharedConnections.find(c => c.id === id)]; + } + + connections.filter(c => c != null).forEach(c => { + c.emit(body.type, body.body); + }); } else { this.emit(type, body); } @@ -140,38 +150,86 @@ export default class Stream extends EventEmitter { } } -abstract class Connection extends EventEmitter { +class Pool { public channel: string; public id: string; - protected params: any; protected stream: Stream; + private users = 0; + private disposeTimerId: any; + private isConnected = false; - constructor(stream: Stream, channel: string, params?: any) { - super(); - - this.stream = stream; + constructor(stream: Stream, channel: string) { this.channel = channel; - this.params = params; + this.stream = stream; + this.id = Math.random().toString(); - this.connect(); + } + + @autobind + public inc() { + if (this.users === 0 && !this.isConnected) { + this.connect(); + } + + this.users++; + + // タイマー解除 + if (this.disposeTimerId) { + clearTimeout(this.disposeTimerId); + this.disposeTimerId = null; + } + } + + @autobind + public dec() { + this.users--; + + // そのコネクションの利用者が誰もいなくなったら + if (this.users === 0) { + // また直ぐに再利用される可能性があるので、一定時間待ち、 + // 新たな利用者が現れなければコネクションを切断する + this.disposeTimerId = setTimeout(() => { + this.disconnect(); + }, 3000); + } } @autobind public connect() { + this.isConnected = true; this.stream.send('connect', { channel: this.channel, - id: this.id, - params: this.params + id: this.id }); } @autobind - public send(typeOrPayload, payload?) { + private disconnect() { + this.isConnected = false; + this.disposeTimerId = null; + this.stream.send('disconnect', { id: this.id }); + } +} + +abstract class Connection extends EventEmitter { + public channel: string; + protected stream: Stream; + public abstract id: string; + + constructor(stream: Stream, channel: string) { + super(); + + this.stream = stream; + this.channel = channel; + } + + @autobind + public send(id: string, typeOrPayload, payload?) { const type = payload === undefined ? typeOrPayload.type : typeOrPayload; const body = payload === undefined ? typeOrPayload.body : payload; this.stream.send('ch', { - id: this.id, + id: id, type: type, body: body }); @@ -181,45 +239,57 @@ abstract class Connection extends EventEmitter { } class SharedConnection extends Connection { - private users = 0; - private disposeTimerId: any; + private pool: Pool; - constructor(stream: Stream, channel: string) { + public get id(): string { + return this.pool.id; + } + + constructor(stream: Stream, channel: string, pool: Pool) { super(stream, channel); + + this.pool = pool; + this.pool.inc(); } @autobind - public use() { - this.users++; - - // タイマー解除 - if (this.disposeTimerId) { - clearTimeout(this.disposeTimerId); - this.disposeTimerId = null; - } + public send(typeOrPayload, payload?) { + super.send(this.pool.id, typeOrPayload, payload); } @autobind public dispose() { - this.users--; - - // そのコネクションの利用者が誰もいなくなったら - if (this.users === 0) { - // また直ぐに再利用される可能性があるので、一定時間待ち、 - // 新たな利用者が現れなければコネクションを切断する - this.disposeTimerId = setTimeout(() => { - this.disposeTimerId = null; - this.removeAllListeners(); - this.stream.send('disconnect', { id: this.id }); - this.stream.removeSharedConnection(this); - }, 3000); - } + this.pool.dec(); + this.removeAllListeners(); + this.stream.removeSharedConnection(this); } } class NonSharedConnection extends Connection { + public id: string; + protected params: any; + constructor(stream: Stream, channel: string, params?: any) { - super(stream, channel, params); + super(stream, channel); + + this.params = params; + this.id = Math.random().toString(); + + this.connect(); + } + + @autobind + public connect() { + this.stream.send('connect', { + channel: this.channel, + id: this.id, + params: this.params + }); + } + + @autobind + public send(typeOrPayload, payload?) { + super.send(this.id, typeOrPayload, payload); } @autobind |