diff options
Diffstat (limited to 'src/room/mod.rs')
-rw-r--r-- | src/room/mod.rs | 128 |
1 files changed, 128 insertions, 0 deletions
diff --git a/src/room/mod.rs b/src/room/mod.rs new file mode 100644 index 0000000..8b3d8c2 --- /dev/null +++ b/src/room/mod.rs @@ -0,0 +1,128 @@ +use std::{time::Duration, collections::HashSet}; + +use axum::extract::ws::WebSocket; +use tokio::sync::mpsc; + +mod websocket; +mod messages; +mod handle; + +use messages::{ClientMessage, ServerMessage}; + +pub enum RoomMessage { + Add(WebSocket), + Remove(usize), + WsMessage(usize, ClientMessage), +} + +pub type Client = mpsc::Sender<ServerMessage>; +pub type Clients = Vec<Option<Client>>; + +pub type Room = mpsc::Sender<RoomMessage>; + +// spawns a task for the room that listens for incoming messages from websockets as well as connections and disconnections +pub fn start_room(room_id: String, room_service: super::rooms::RoomService) -> Room { + let (tx, rx) = mpsc::channel::<RoomMessage>(20); + + let txret = tx.clone(); + + tokio::spawn(room_task(tx, rx, room_id, room_service)); + + txret +} + +async fn room_task(tx: mpsc::Sender<RoomMessage>, mut rx: mpsc::Receiver<RoomMessage>, room_id: String, room_service: super::rooms::RoomService) { + let mut ws = Vec::new(); + let mut state_requests = HashSet::new(); + let mut pending: Vec<(Option<usize>, Option<usize>)> = Vec::new(); + + while let Some(message) = rx.recv().await { + match message { + RoomMessage::Add(w) => { // a new connection is added + // create channels for the websocket and start a task to send and receive from it + let (wstx, wsrx) = mpsc::channel(5); + let id = ws.len(); + ws.push(Some(wstx)); + tokio::spawn(websocket::start_ws(w, id, tx.clone(), wsrx)); + + if conn_count(&ws) < 2 { // the first connection is on frame 0 + handle::send_connections(&mut ws, Some(id), None, 0).await; + } else { + // connections need to be added on a specific frame + // so ask the clients for a frame to put this event on + broadcast(&mut ws, ServerMessage::FrameRequest, Some(id)).await; + pending.push((Some(id), None)); + } + }, + RoomMessage::Remove(id) => { // a connection is closed (sent by the websocket task on exiting) + // only remove it if it exists + if let Some(item) = ws.get_mut(id) { + *item = None; + }; + let count = conn_count(&ws); + if count == 0 { // remove rooms once they become empty + room_service.send(super::rooms::RoomServiceRequest::Remove(room_id.clone())).await.ok(); + break; + } + + // disconnections happen on a specific frame, ask the clients for a frame + broadcast(&mut ws, ServerMessage::FrameRequest, None).await; + pending.push((None, Some(id))); + }, + RoomMessage::WsMessage(id, msg) => { // new data from a websocket + handle::handle(&mut ws, &mut state_requests, &mut pending, id, msg).await; + } + } + } +} + +// send the websocket to the room task +pub async fn add_connection(tx: &Room, ws: WebSocket) { + tx.send_timeout(RoomMessage::Add(ws), Duration::from_secs(1)).await.ok(); +} + +pub fn conn_count(v: &Clients) -> usize { + v.iter().filter(|i| i.is_some()).count() +} + +// send a message to all or some of the clients, in parallel rather than series, +// based on a callback +pub async fn send(v: &mut Clients, create_message: impl Fn(usize, &Client) -> Option<ServerMessage>) -> usize { + let tasks = v.iter() + .enumerate() + .map(|(id, c)| { + // send to existing clients + let Some(client) = c.clone() else { + return None; + }; + + let Some(msg) = create_message(id, &client) else { + return None; + }; + + Some(tokio::spawn(async move { + client.send(msg).await.ok(); + })) + }); + + let count = tasks.len(); + // make sure all the tasks complete + for task in tasks { + if let Some(t) = task { + t.await.ok(); + } + } + + count +} + +// send a message to all the websockets in the room (optionally excluding one) +pub async fn broadcast(v: &mut Clients, msg: ServerMessage, except: Option<usize>) -> usize { + send(v, |id, _client| { + if Some(id) == except { + return None; + } + + Some(msg.clone()) + }).await +} |