use std::{time::Duration, collections::HashSet}; use axum::extract::ws::WebSocket; use tokio::sync::mpsc; mod websocket; mod messages; mod handle; use messages::{ClientMessage, ServerMessage}; pub enum RoomMessage { Add(WebSocket), Remove(usize), WsMessage(usize, ClientMessage), } pub type Client = mpsc::Sender; pub type Clients = Vec>; pub type Room = mpsc::Sender; // spawns a task for the room that listens for incoming messages from websockets as well as connections and disconnections pub fn start_room(room_id: String, room_service: super::rooms::RoomService) -> Room { let (tx, rx) = mpsc::channel::(20); let txret = tx.clone(); tokio::spawn(room_task(tx, rx, room_id, room_service)); txret } async fn room_task(tx: mpsc::Sender, mut rx: mpsc::Receiver, room_id: String, room_service: super::rooms::RoomService) { let mut ws = Vec::new(); let mut state_requests = HashSet::new(); let mut pending: Vec<(Option, Option)> = Vec::new(); while let Some(message) = rx.recv().await { match message { RoomMessage::Add(w) => { // a new connection is added // create channels for the websocket and start a task to send and receive from it let (wstx, wsrx) = mpsc::channel(5); let id = ws.len(); ws.push(Some(wstx)); tokio::spawn(websocket::start_ws(w, id, tx.clone(), wsrx)); if conn_count(&ws) < 2 { // the first connection is on frame 0 handle::send_connections(&mut ws, Some(id), None, 0).await; } else { // connections need to be added on a specific frame // so ask the clients for a frame to put this event on broadcast(&mut ws, ServerMessage::FrameRequest, Some(id)).await; pending.push((Some(id), None)); } }, RoomMessage::Remove(id) => { // a connection is closed (sent by the websocket task on exiting) // only remove it if it exists if let Some(item) = ws.get_mut(id) { *item = None; }; let count = conn_count(&ws); if count == 0 { // remove rooms once they become empty room_service.send(super::rooms::RoomServiceRequest::Remove(room_id.clone())).await.ok(); break; } // disconnections happen on a specific frame, ask the clients for a frame broadcast(&mut ws, ServerMessage::FrameRequest, None).await; pending.push((None, Some(id))); }, RoomMessage::WsMessage(id, msg) => { // new data from a websocket handle::handle(&mut ws, &mut state_requests, &mut pending, id, msg).await; } } } } // send the websocket to the room task pub async fn add_connection(tx: &Room, ws: WebSocket) { tx.send_timeout(RoomMessage::Add(ws), Duration::from_secs(1)).await.ok(); } pub fn conn_count(v: &Clients) -> usize { v.iter().filter(|i| i.is_some()).count() } // send a message to all or some of the clients, in parallel rather than series, // based on a callback pub async fn send(v: &mut Clients, create_message: impl Fn(usize, &Client) -> Option) -> usize { let tasks = v.iter() .enumerate() .map(|(id, c)| { // send to existing clients let Some(client) = c.clone() else { return None; }; let Some(msg) = create_message(id, &client) else { return None; }; Some(tokio::spawn(async move { client.send(msg).await.ok(); })) }); let count = tasks.len(); // make sure all the tasks complete for task in tasks { if let Some(t) = task { t.await.ok(); } } count } // send a message to all the websockets in the room (optionally excluding one) pub async fn broadcast(v: &mut Clients, msg: ServerMessage, except: Option) -> usize { send(v, |id, _client| { if Some(id) == except { return None; } Some(msg.clone()) }).await }