summaryrefslogtreecommitdiff
path: root/server/src/room/websocket.rs
blob: 50a45374fdc814ddbe7c2a279121fbfa75557905 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
use std::time::Duration;

use axum::extract::ws::{WebSocket, Message};
use tokio::sync::mpsc;

use super::RoomMessage;
use super::messages::ServerMessage;

// set up some senders and receivers so that the websocket can receive messages from the task, send messages to the task, and notify the task when it closes
pub async fn start_ws(mut ws: WebSocket, id: usize, tx: mpsc::Sender<RoomMessage>, mut rx: mpsc::Receiver<ServerMessage>) {
	loop {
		tokio::select! {
			m = ws.recv() => { // receive from the websocket and send it to `tx`
				if let Some(Ok(msg)) = m {
					// get the string contents
					let optionstring = match msg {
						Message::Text(s) => {
							Some(s)
						},
						Message::Binary(bin) => {
							String::from_utf8(bin).ok()
						},
						Message::Close(_) => { // quit the whole loop on disconnect
							break;
						},
						_ => None
					};
					
					// ignore things that aren't strings
					let Some(s) = optionstring else {
						continue;
					};
					
					// decode and send to the room
					match serde_json::from_str(&s) {
						Ok(message) => {
							tx.send_timeout(RoomMessage::WsMessage(id, message), Duration::from_secs(1)).await.ok();
						},
						Err(e) => { // let the client know if they sent a bad message
							if let Ok(text) = serde_json::to_string(&ServerMessage::Error{
								error: format!("Failed to decode JSON message: {}: {}", e, s),
							}) {
								ws.send(Message::Text(text)).await.ok();
							}
						}
					}
				} else { // websocket error
					break;
				}
			}
			s = rx.recv() => { // receive from `rx` and send it to the websocket
				if let Some(msg) = s {
					if let Ok(string) = serde_json::to_string(&msg) {
						ws.send(Message::Text(string)).await.ok();
					}
				} else { // shouldn't happen but this is if the room drops the sender, it should close the websocket anyways
					break;
				}
			}
		}
	}
	
	// websocket disconnect due to either error or normal disconnect
	// notify the room that the socket should be removed
	tx.send_timeout(RoomMessage::Remove(id), Duration::from_secs(1)).await.ok();
}