diff options
Diffstat (limited to 'src/room/mod.rs')
-rw-r--r-- | src/room/mod.rs | 128 |
1 files changed, 0 insertions, 128 deletions
diff --git a/src/room/mod.rs b/src/room/mod.rs deleted file mode 100644 index 8b3d8c2..0000000 --- a/src/room/mod.rs +++ /dev/null @@ -1,128 +0,0 @@ -use std::{time::Duration, collections::HashSet}; - -use axum::extract::ws::WebSocket; -use tokio::sync::mpsc; - -mod websocket; -mod messages; -mod handle; - -use messages::{ClientMessage, ServerMessage}; - -pub enum RoomMessage { - Add(WebSocket), - Remove(usize), - WsMessage(usize, ClientMessage), -} - -pub type Client = mpsc::Sender<ServerMessage>; -pub type Clients = Vec<Option<Client>>; - -pub type Room = mpsc::Sender<RoomMessage>; - -// spawns a task for the room that listens for incoming messages from websockets as well as connections and disconnections -pub fn start_room(room_id: String, room_service: super::rooms::RoomService) -> Room { - let (tx, rx) = mpsc::channel::<RoomMessage>(20); - - let txret = tx.clone(); - - tokio::spawn(room_task(tx, rx, room_id, room_service)); - - txret -} - -async fn room_task(tx: mpsc::Sender<RoomMessage>, mut rx: mpsc::Receiver<RoomMessage>, room_id: String, room_service: super::rooms::RoomService) { - let mut ws = Vec::new(); - let mut state_requests = HashSet::new(); - let mut pending: Vec<(Option<usize>, Option<usize>)> = Vec::new(); - - while let Some(message) = rx.recv().await { - match message { - RoomMessage::Add(w) => { // a new connection is added - // create channels for the websocket and start a task to send and receive from it - let (wstx, wsrx) = mpsc::channel(5); - let id = ws.len(); - ws.push(Some(wstx)); - tokio::spawn(websocket::start_ws(w, id, tx.clone(), wsrx)); - - if conn_count(&ws) < 2 { // the first connection is on frame 0 - handle::send_connections(&mut ws, Some(id), None, 0).await; - } else { - // connections need to be added on a specific frame - // so ask the clients for a frame to put this event on - broadcast(&mut ws, ServerMessage::FrameRequest, Some(id)).await; - pending.push((Some(id), None)); - } - }, - RoomMessage::Remove(id) => { // a connection is closed (sent by the websocket task on exiting) - // only remove it if it exists - if let Some(item) = ws.get_mut(id) { - *item = None; - }; - let count = conn_count(&ws); - if count == 0 { // remove rooms once they become empty - room_service.send(super::rooms::RoomServiceRequest::Remove(room_id.clone())).await.ok(); - break; - } - - // disconnections happen on a specific frame, ask the clients for a frame - broadcast(&mut ws, ServerMessage::FrameRequest, None).await; - pending.push((None, Some(id))); - }, - RoomMessage::WsMessage(id, msg) => { // new data from a websocket - handle::handle(&mut ws, &mut state_requests, &mut pending, id, msg).await; - } - } - } -} - -// send the websocket to the room task -pub async fn add_connection(tx: &Room, ws: WebSocket) { - tx.send_timeout(RoomMessage::Add(ws), Duration::from_secs(1)).await.ok(); -} - -pub fn conn_count(v: &Clients) -> usize { - v.iter().filter(|i| i.is_some()).count() -} - -// send a message to all or some of the clients, in parallel rather than series, -// based on a callback -pub async fn send(v: &mut Clients, create_message: impl Fn(usize, &Client) -> Option<ServerMessage>) -> usize { - let tasks = v.iter() - .enumerate() - .map(|(id, c)| { - // send to existing clients - let Some(client) = c.clone() else { - return None; - }; - - let Some(msg) = create_message(id, &client) else { - return None; - }; - - Some(tokio::spawn(async move { - client.send(msg).await.ok(); - })) - }); - - let count = tasks.len(); - // make sure all the tasks complete - for task in tasks { - if let Some(t) = task { - t.await.ok(); - } - } - - count -} - -// send a message to all the websockets in the room (optionally excluding one) -pub async fn broadcast(v: &mut Clients, msg: ServerMessage, except: Option<usize>) -> usize { - send(v, |id, _client| { - if Some(id) == except { - return None; - } - - Some(msg.clone()) - }).await -} |