use std::time::Duration; use axum::extract::ws::{WebSocket, Message}; use tokio::sync::mpsc; use super::RoomMessage; use super::messages::ServerMessage; // set up some senders and receivers so that the websocket can receive messages from the task, send messages to the task, and notify the task when it closes pub async fn start_ws(mut ws: WebSocket, id: usize, tx: mpsc::Sender, mut rx: mpsc::Receiver) { loop { tokio::select! { m = ws.recv() => { // receive from the websocket and send it to `tx` if let Some(Ok(msg)) = m { // get the string contents let optionstring = match msg { Message::Text(s) => { Some(s) }, Message::Binary(bin) => { String::from_utf8(bin).ok() }, Message::Close(_) => { // quit the whole loop on disconnect break; }, _ => None }; // ignore things that aren't strings let Some(s) = optionstring else { continue; }; // decode and send to the room match serde_json::from_str(&s) { Ok(message) => { tx.send_timeout(RoomMessage::WsMessage(id, message), Duration::from_secs(1)).await.ok(); }, Err(e) => { // let the client know if they sent a bad message if let Ok(text) = serde_json::to_string(&ServerMessage::Error{ error: format!("Failed to decode JSON message: {}: {}", e, s), }) { ws.send(Message::Text(text)).await.ok(); } } } } else { // websocket error break; } } s = rx.recv() => { // receive from `rx` and send it to the websocket if let Some(msg) = s { if let Ok(string) = serde_json::to_string(&msg) { ws.send(Message::Text(string)).await.ok(); } } else { // shouldn't happen but this is if the room drops the sender, it should close the websocket anyways break; } } } } // websocket disconnect due to either error or normal disconnect // notify the room that the socket should be removed tx.send_timeout(RoomMessage::Remove(id), Duration::from_secs(1)).await.ok(); }