summaryrefslogtreecommitdiff
path: root/server/src/room/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'server/src/room/mod.rs')
-rw-r--r--server/src/room/mod.rs128
1 files changed, 128 insertions, 0 deletions
diff --git a/server/src/room/mod.rs b/server/src/room/mod.rs
new file mode 100644
index 0000000..8b3d8c2
--- /dev/null
+++ b/server/src/room/mod.rs
@@ -0,0 +1,128 @@
+use std::{time::Duration, collections::HashSet};
+
+use axum::extract::ws::WebSocket;
+use tokio::sync::mpsc;
+
+mod websocket;
+mod messages;
+mod handle;
+
+use messages::{ClientMessage, ServerMessage};
+
+pub enum RoomMessage {
+ Add(WebSocket),
+ Remove(usize),
+ WsMessage(usize, ClientMessage),
+}
+
+pub type Client = mpsc::Sender<ServerMessage>;
+pub type Clients = Vec<Option<Client>>;
+
+pub type Room = mpsc::Sender<RoomMessage>;
+
+// spawns a task for the room that listens for incoming messages from websockets as well as connections and disconnections
+pub fn start_room(room_id: String, room_service: super::rooms::RoomService) -> Room {
+ let (tx, rx) = mpsc::channel::<RoomMessage>(20);
+
+ let txret = tx.clone();
+
+ tokio::spawn(room_task(tx, rx, room_id, room_service));
+
+ txret
+}
+
+async fn room_task(tx: mpsc::Sender<RoomMessage>, mut rx: mpsc::Receiver<RoomMessage>, room_id: String, room_service: super::rooms::RoomService) {
+ let mut ws = Vec::new();
+ let mut state_requests = HashSet::new();
+ let mut pending: Vec<(Option<usize>, Option<usize>)> = Vec::new();
+
+ while let Some(message) = rx.recv().await {
+ match message {
+ RoomMessage::Add(w) => { // a new connection is added
+ // create channels for the websocket and start a task to send and receive from it
+ let (wstx, wsrx) = mpsc::channel(5);
+ let id = ws.len();
+ ws.push(Some(wstx));
+ tokio::spawn(websocket::start_ws(w, id, tx.clone(), wsrx));
+
+ if conn_count(&ws) < 2 { // the first connection is on frame 0
+ handle::send_connections(&mut ws, Some(id), None, 0).await;
+ } else {
+ // connections need to be added on a specific frame
+ // so ask the clients for a frame to put this event on
+ broadcast(&mut ws, ServerMessage::FrameRequest, Some(id)).await;
+ pending.push((Some(id), None));
+ }
+ },
+ RoomMessage::Remove(id) => { // a connection is closed (sent by the websocket task on exiting)
+ // only remove it if it exists
+ if let Some(item) = ws.get_mut(id) {
+ *item = None;
+ };
+ let count = conn_count(&ws);
+ if count == 0 { // remove rooms once they become empty
+ room_service.send(super::rooms::RoomServiceRequest::Remove(room_id.clone())).await.ok();
+ break;
+ }
+
+ // disconnections happen on a specific frame, ask the clients for a frame
+ broadcast(&mut ws, ServerMessage::FrameRequest, None).await;
+ pending.push((None, Some(id)));
+ },
+ RoomMessage::WsMessage(id, msg) => { // new data from a websocket
+ handle::handle(&mut ws, &mut state_requests, &mut pending, id, msg).await;
+ }
+ }
+ }
+}
+
+// send the websocket to the room task
+pub async fn add_connection(tx: &Room, ws: WebSocket) {
+ tx.send_timeout(RoomMessage::Add(ws), Duration::from_secs(1)).await.ok();
+}
+
+pub fn conn_count(v: &Clients) -> usize {
+ v.iter().filter(|i| i.is_some()).count()
+}
+
+// send a message to all or some of the clients, in parallel rather than series,
+// based on a callback
+pub async fn send(v: &mut Clients, create_message: impl Fn(usize, &Client) -> Option<ServerMessage>) -> usize {
+ let tasks = v.iter()
+ .enumerate()
+ .map(|(id, c)| {
+ // send to existing clients
+ let Some(client) = c.clone() else {
+ return None;
+ };
+
+ let Some(msg) = create_message(id, &client) else {
+ return None;
+ };
+
+ Some(tokio::spawn(async move {
+ client.send(msg).await.ok();
+ }))
+ });
+
+ let count = tasks.len();
+ // make sure all the tasks complete
+ for task in tasks {
+ if let Some(t) = task {
+ t.await.ok();
+ }
+ }
+
+ count
+}
+
+// send a message to all the websockets in the room (optionally excluding one)
+pub async fn broadcast(v: &mut Clients, msg: ServerMessage, except: Option<usize>) -> usize {
+ send(v, |id, _client| {
+ if Some(id) == except {
+ return None;
+ }
+
+ Some(msg.clone())
+ }).await
+}