summaryrefslogtreecommitdiff
path: root/src/room/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/room/mod.rs')
-rw-r--r--src/room/mod.rs128
1 files changed, 0 insertions, 128 deletions
diff --git a/src/room/mod.rs b/src/room/mod.rs
deleted file mode 100644
index 8b3d8c2..0000000
--- a/src/room/mod.rs
+++ /dev/null
@@ -1,128 +0,0 @@
-use std::{time::Duration, collections::HashSet};
-
-use axum::extract::ws::WebSocket;
-use tokio::sync::mpsc;
-
-mod websocket;
-mod messages;
-mod handle;
-
-use messages::{ClientMessage, ServerMessage};
-
-pub enum RoomMessage {
- Add(WebSocket),
- Remove(usize),
- WsMessage(usize, ClientMessage),
-}
-
-pub type Client = mpsc::Sender<ServerMessage>;
-pub type Clients = Vec<Option<Client>>;
-
-pub type Room = mpsc::Sender<RoomMessage>;
-
-// spawns a task for the room that listens for incoming messages from websockets as well as connections and disconnections
-pub fn start_room(room_id: String, room_service: super::rooms::RoomService) -> Room {
- let (tx, rx) = mpsc::channel::<RoomMessage>(20);
-
- let txret = tx.clone();
-
- tokio::spawn(room_task(tx, rx, room_id, room_service));
-
- txret
-}
-
-async fn room_task(tx: mpsc::Sender<RoomMessage>, mut rx: mpsc::Receiver<RoomMessage>, room_id: String, room_service: super::rooms::RoomService) {
- let mut ws = Vec::new();
- let mut state_requests = HashSet::new();
- let mut pending: Vec<(Option<usize>, Option<usize>)> = Vec::new();
-
- while let Some(message) = rx.recv().await {
- match message {
- RoomMessage::Add(w) => { // a new connection is added
- // create channels for the websocket and start a task to send and receive from it
- let (wstx, wsrx) = mpsc::channel(5);
- let id = ws.len();
- ws.push(Some(wstx));
- tokio::spawn(websocket::start_ws(w, id, tx.clone(), wsrx));
-
- if conn_count(&ws) < 2 { // the first connection is on frame 0
- handle::send_connections(&mut ws, Some(id), None, 0).await;
- } else {
- // connections need to be added on a specific frame
- // so ask the clients for a frame to put this event on
- broadcast(&mut ws, ServerMessage::FrameRequest, Some(id)).await;
- pending.push((Some(id), None));
- }
- },
- RoomMessage::Remove(id) => { // a connection is closed (sent by the websocket task on exiting)
- // only remove it if it exists
- if let Some(item) = ws.get_mut(id) {
- *item = None;
- };
- let count = conn_count(&ws);
- if count == 0 { // remove rooms once they become empty
- room_service.send(super::rooms::RoomServiceRequest::Remove(room_id.clone())).await.ok();
- break;
- }
-
- // disconnections happen on a specific frame, ask the clients for a frame
- broadcast(&mut ws, ServerMessage::FrameRequest, None).await;
- pending.push((None, Some(id)));
- },
- RoomMessage::WsMessage(id, msg) => { // new data from a websocket
- handle::handle(&mut ws, &mut state_requests, &mut pending, id, msg).await;
- }
- }
- }
-}
-
-// send the websocket to the room task
-pub async fn add_connection(tx: &Room, ws: WebSocket) {
- tx.send_timeout(RoomMessage::Add(ws), Duration::from_secs(1)).await.ok();
-}
-
-pub fn conn_count(v: &Clients) -> usize {
- v.iter().filter(|i| i.is_some()).count()
-}
-
-// send a message to all or some of the clients, in parallel rather than series,
-// based on a callback
-pub async fn send(v: &mut Clients, create_message: impl Fn(usize, &Client) -> Option<ServerMessage>) -> usize {
- let tasks = v.iter()
- .enumerate()
- .map(|(id, c)| {
- // send to existing clients
- let Some(client) = c.clone() else {
- return None;
- };
-
- let Some(msg) = create_message(id, &client) else {
- return None;
- };
-
- Some(tokio::spawn(async move {
- client.send(msg).await.ok();
- }))
- });
-
- let count = tasks.len();
- // make sure all the tasks complete
- for task in tasks {
- if let Some(t) = task {
- t.await.ok();
- }
- }
-
- count
-}
-
-// send a message to all the websockets in the room (optionally excluding one)
-pub async fn broadcast(v: &mut Clients, msg: ServerMessage, except: Option<usize>) -> usize {
- send(v, |id, _client| {
- if Some(id) == except {
- return None;
- }
-
- Some(msg.clone())
- }).await
-}