diff options
author | Tyler Murphy <tylerm@tylerm.dev> | 2023-06-12 23:47:43 -0400 |
---|---|---|
committer | Tyler Murphy <tylerm@tylerm.dev> | 2023-06-12 23:47:43 -0400 |
commit | 0a06f163d1664bfd0ff938569223294edf9902f6 (patch) | |
tree | ac0dc03c010d3e5666b6a62249347ac434f76b44 /src | |
download | tuxman-0a06f163d1664bfd0ff938569223294edf9902f6.tar.gz tuxman-0a06f163d1664bfd0ff938569223294edf9902f6.tar.bz2 tuxman-0a06f163d1664bfd0ff938569223294edf9902f6.zip |
initial
Diffstat (limited to '')
-rw-r--r-- | src/main.rs | 22 | ||||
-rw-r--r-- | src/room/handle.rs | 106 | ||||
-rw-r--r-- | src/room/messages.rs | 69 | ||||
-rw-r--r-- | src/room/mod.rs | 128 | ||||
-rw-r--r-- | src/room/websocket.rs | 66 | ||||
-rw-r--r-- | src/rooms.rs | 59 | ||||
-rw-r--r-- | src/routes.rs | 51 |
7 files changed, 501 insertions, 0 deletions
diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..34783ac --- /dev/null +++ b/src/main.rs @@ -0,0 +1,22 @@ +mod routes; +mod rooms; +mod room; + +#[tokio::main] +async fn main() { + let port = std::env::var("PORT") + .unwrap_or("8080".to_owned()) + .parse::<u16>() + .unwrap_or(8080); + + axum::Server::bind(&std::net::SocketAddr::new( + std::net::IpAddr::V6(std::net::Ipv6Addr::from(0)), + port + )) + .serve( + routes::routes() + .into_make_service() + ) + .await + .expect("Error running the web server"); +} diff --git a/src/room/handle.rs b/src/room/handle.rs new file mode 100644 index 0000000..d397c70 --- /dev/null +++ b/src/room/handle.rs @@ -0,0 +1,106 @@ +use std::collections::HashSet; + +use super::messages::{ClientMessage, ServerMessage}; + +// send a ServerMessage::Connections to all sockets +pub async fn send_connections(v: &mut super::Clients, added: Option<usize>, removed: Option<usize>, frame: u64) { + // get the list of connection IDs + let connections: Vec<usize> = v.iter() + .enumerate() + .filter(|(_, n)| n.is_some()) + .map(|(id, _)| id) + .collect(); + + super::send(v, |id, _c| { + Some(ServerMessage::Connections { + connections: connections.clone(), + added, + removed, + id, + frame, + }) + }).await; +} + +// handle incoming websocket messages +pub async fn handle( + v: &mut super::Clients, + requests: &mut HashSet<(u64, Option<usize>, usize)>, // frame, connection, client id + pending: &mut Vec<(Option<usize>, Option<usize>)>, + id: usize, + msg: ClientMessage, +) { + match msg { + // broadcast inputs to every other connection + ClientMessage::Input { data, frame } => { + super::broadcast(v, ServerMessage::Input { + data, + frame, + connection: id + }, Some(id)).await; + }, + // a client needs the current game state, grab it from another client + ClientMessage::RequestState { frame, connection } => { + let count = super::conn_count(v); + + if count < 2 { // nobody to request state *from* + if let Some(Some(client)) = v.get(id) { + client.send(ServerMessage::State { + state: serde_json::Value::Null, + frame: 0, + connection: None, + }).await.ok(); + } + return; + } + + // request state from other clients + requests.insert((frame, connection, id)); + + match connection { + None => { + super::broadcast(v, ServerMessage::RequestState { frame }, Some(id)).await; + }, + Some(id) => { // it's to a specific connection + let Some(Some(client)) = v.get(id) else { + return; + }; + client.send(ServerMessage::RequestState { frame }).await.ok(); + }, + } + }, + // a client responded to a request for game state, tell all the requestees + ClientMessage::State { state, frame } => { + let mut new_requests = HashSet::new(); + for (fr, conn, cid) in requests.drain() { + if + fr != frame || // this isn't the requested frame + (conn.is_some() && Some(id) != conn) // this isn't the requested connection + { + new_requests.insert((fr, conn, cid)); + continue; + } + if let Some(Some(client)) = v.get(cid) { + client.send(ServerMessage::State { + state: state.clone(), + frame, + connection: Some(id), + }).await.ok(); + } + } + *requests = new_requests; + }, + // a client said what frame they're on, actually send the connections message + ClientMessage::Frame { frame } => { + for (added, removed) in pending.into_iter() { + send_connections(v, *added, *removed, frame).await; + } + *pending = Vec::new(); + }, + ClientMessage::Ping { frame } => { + if let Some(Some(client)) = v.get(id) { + client.send(ServerMessage::Pong { frame }).await.ok(); + } + } + } +} diff --git a/src/room/messages.rs b/src/room/messages.rs new file mode 100644 index 0000000..72958a6 --- /dev/null +++ b/src/room/messages.rs @@ -0,0 +1,69 @@ +use serde::{Serialize, Deserialize}; +use serde_json::Value; + +#[derive(Deserialize, Clone, Debug)] +#[serde(tag = "type")] +pub enum ClientMessage { + #[serde(rename = "frame")] + Frame { + frame: u64, + }, + #[serde(rename = "input")] + Input { + data: Value, + frame: u64, + }, + #[serde(rename = "requeststate")] + RequestState { + connection: Option<usize>, + frame: u64, + }, + #[serde(rename = "state")] + State { + state: Value, + frame: u64, + }, + #[serde(rename = "ping")] + Ping { + frame: u64, + }, +} + +#[derive(Serialize, Clone, Debug)] +#[serde(tag = "type")] +pub enum ServerMessage { + #[serde(rename = "framerequest")] + FrameRequest, + #[serde(rename = "connections")] + Connections { + connections: Vec<usize>, + added: Option<usize>, + removed: Option<usize>, + id: usize, + frame: u64, + }, + #[serde(rename = "input")] + Input { + data: Value, + frame: u64, + connection: usize, + }, + #[serde(rename = "requeststate")] + RequestState { + frame: u64, + }, + #[serde(rename = "state")] + State { + state: Value, + frame: u64, + connection: Option<usize>, + }, + #[serde(rename = "pong")] + Pong { + frame: u64, + }, + #[serde(rename = "error")] + Error { + error: String, + }, +} diff --git a/src/room/mod.rs b/src/room/mod.rs new file mode 100644 index 0000000..8b3d8c2 --- /dev/null +++ b/src/room/mod.rs @@ -0,0 +1,128 @@ +use std::{time::Duration, collections::HashSet}; + +use axum::extract::ws::WebSocket; +use tokio::sync::mpsc; + +mod websocket; +mod messages; +mod handle; + +use messages::{ClientMessage, ServerMessage}; + +pub enum RoomMessage { + Add(WebSocket), + Remove(usize), + WsMessage(usize, ClientMessage), +} + +pub type Client = mpsc::Sender<ServerMessage>; +pub type Clients = Vec<Option<Client>>; + +pub type Room = mpsc::Sender<RoomMessage>; + +// spawns a task for the room that listens for incoming messages from websockets as well as connections and disconnections +pub fn start_room(room_id: String, room_service: super::rooms::RoomService) -> Room { + let (tx, rx) = mpsc::channel::<RoomMessage>(20); + + let txret = tx.clone(); + + tokio::spawn(room_task(tx, rx, room_id, room_service)); + + txret +} + +async fn room_task(tx: mpsc::Sender<RoomMessage>, mut rx: mpsc::Receiver<RoomMessage>, room_id: String, room_service: super::rooms::RoomService) { + let mut ws = Vec::new(); + let mut state_requests = HashSet::new(); + let mut pending: Vec<(Option<usize>, Option<usize>)> = Vec::new(); + + while let Some(message) = rx.recv().await { + match message { + RoomMessage::Add(w) => { // a new connection is added + // create channels for the websocket and start a task to send and receive from it + let (wstx, wsrx) = mpsc::channel(5); + let id = ws.len(); + ws.push(Some(wstx)); + tokio::spawn(websocket::start_ws(w, id, tx.clone(), wsrx)); + + if conn_count(&ws) < 2 { // the first connection is on frame 0 + handle::send_connections(&mut ws, Some(id), None, 0).await; + } else { + // connections need to be added on a specific frame + // so ask the clients for a frame to put this event on + broadcast(&mut ws, ServerMessage::FrameRequest, Some(id)).await; + pending.push((Some(id), None)); + } + }, + RoomMessage::Remove(id) => { // a connection is closed (sent by the websocket task on exiting) + // only remove it if it exists + if let Some(item) = ws.get_mut(id) { + *item = None; + }; + let count = conn_count(&ws); + if count == 0 { // remove rooms once they become empty + room_service.send(super::rooms::RoomServiceRequest::Remove(room_id.clone())).await.ok(); + break; + } + + // disconnections happen on a specific frame, ask the clients for a frame + broadcast(&mut ws, ServerMessage::FrameRequest, None).await; + pending.push((None, Some(id))); + }, + RoomMessage::WsMessage(id, msg) => { // new data from a websocket + handle::handle(&mut ws, &mut state_requests, &mut pending, id, msg).await; + } + } + } +} + +// send the websocket to the room task +pub async fn add_connection(tx: &Room, ws: WebSocket) { + tx.send_timeout(RoomMessage::Add(ws), Duration::from_secs(1)).await.ok(); +} + +pub fn conn_count(v: &Clients) -> usize { + v.iter().filter(|i| i.is_some()).count() +} + +// send a message to all or some of the clients, in parallel rather than series, +// based on a callback +pub async fn send(v: &mut Clients, create_message: impl Fn(usize, &Client) -> Option<ServerMessage>) -> usize { + let tasks = v.iter() + .enumerate() + .map(|(id, c)| { + // send to existing clients + let Some(client) = c.clone() else { + return None; + }; + + let Some(msg) = create_message(id, &client) else { + return None; + }; + + Some(tokio::spawn(async move { + client.send(msg).await.ok(); + })) + }); + + let count = tasks.len(); + // make sure all the tasks complete + for task in tasks { + if let Some(t) = task { + t.await.ok(); + } + } + + count +} + +// send a message to all the websockets in the room (optionally excluding one) +pub async fn broadcast(v: &mut Clients, msg: ServerMessage, except: Option<usize>) -> usize { + send(v, |id, _client| { + if Some(id) == except { + return None; + } + + Some(msg.clone()) + }).await +} diff --git a/src/room/websocket.rs b/src/room/websocket.rs new file mode 100644 index 0000000..50a4537 --- /dev/null +++ b/src/room/websocket.rs @@ -0,0 +1,66 @@ +use std::time::Duration; + +use axum::extract::ws::{WebSocket, Message}; +use tokio::sync::mpsc; + +use super::RoomMessage; +use super::messages::ServerMessage; + +// set up some senders and receivers so that the websocket can receive messages from the task, send messages to the task, and notify the task when it closes +pub async fn start_ws(mut ws: WebSocket, id: usize, tx: mpsc::Sender<RoomMessage>, mut rx: mpsc::Receiver<ServerMessage>) { + loop { + tokio::select! { + m = ws.recv() => { // receive from the websocket and send it to `tx` + if let Some(Ok(msg)) = m { + // get the string contents + let optionstring = match msg { + Message::Text(s) => { + Some(s) + }, + Message::Binary(bin) => { + String::from_utf8(bin).ok() + }, + Message::Close(_) => { // quit the whole loop on disconnect + break; + }, + _ => None + }; + + // ignore things that aren't strings + let Some(s) = optionstring else { + continue; + }; + + // decode and send to the room + match serde_json::from_str(&s) { + Ok(message) => { + tx.send_timeout(RoomMessage::WsMessage(id, message), Duration::from_secs(1)).await.ok(); + }, + Err(e) => { // let the client know if they sent a bad message + if let Ok(text) = serde_json::to_string(&ServerMessage::Error{ + error: format!("Failed to decode JSON message: {}: {}", e, s), + }) { + ws.send(Message::Text(text)).await.ok(); + } + } + } + } else { // websocket error + break; + } + } + s = rx.recv() => { // receive from `rx` and send it to the websocket + if let Some(msg) = s { + if let Ok(string) = serde_json::to_string(&msg) { + ws.send(Message::Text(string)).await.ok(); + } + } else { // shouldn't happen but this is if the room drops the sender, it should close the websocket anyways + break; + } + } + } + } + + // websocket disconnect due to either error or normal disconnect + // notify the room that the socket should be removed + tx.send_timeout(RoomMessage::Remove(id), Duration::from_secs(1)).await.ok(); +} diff --git a/src/rooms.rs b/src/rooms.rs new file mode 100644 index 0000000..c8199d1 --- /dev/null +++ b/src/rooms.rs @@ -0,0 +1,59 @@ +use std::collections::HashMap; + +use axum::extract::ws::WebSocket; +use tokio::sync::mpsc; +use tokio::sync::oneshot; + +use super::room; + +pub enum RoomServiceRequest { + Exists(String, oneshot::Sender<bool>), + Join(String, WebSocket), + Remove(String), +} + +pub type RoomService = mpsc::Sender<RoomServiceRequest>; + +type RoomMap = HashMap<String, room::Room>; + +async fn handle_room_server_message(rooms: &mut RoomMap, req: RoomServiceRequest, tx: RoomService) { + match req { + // check whether a given room exists + // the sender must provide a tokio::sync::oneshot sender to receive a response + RoomServiceRequest::Exists(code, reply) => { + reply.send(rooms.get(&code).is_some()).ok(); + }, + // send a websocket into the given room, starting it if it doesn't exist + RoomServiceRequest::Join(code, ws) => { + let room = match rooms.get(&code) { + Some(rm) => rm, + None => { + let rm = room::start_room(code.clone(), tx); + rooms.insert(code.clone(), rm); + &rooms[&code] + } + }; + + room::add_connection(room, ws).await; + }, + // remove a room (called by the room task itself once there are no more connections to it) + RoomServiceRequest::Remove(code) => { + rooms.remove(&code); + } + } +} + +// a task to manage a hashmap holding the room task senders +// returns a sender to interface with the task +pub fn start_room_server() -> RoomService { + let (tx, mut rx) = mpsc::channel::<RoomServiceRequest>(10); + let txret = tx.clone(); + + tokio::spawn(async move { + let mut rooms: RoomMap = HashMap::new(); + while let Some(req) = rx.recv().await { + handle_room_server_message(&mut rooms, req, tx.clone()).await; + } + }); + txret +} diff --git a/src/routes.rs b/src/routes.rs new file mode 100644 index 0000000..1fa16c7 --- /dev/null +++ b/src/routes.rs @@ -0,0 +1,51 @@ +use axum::{ + extract::{ws::WebSocketUpgrade, Path}, + routing::get, + response::Response, + Router, Extension, +}; +use tokio::sync::oneshot; +use tower_http::services::ServeDir; + +use super::rooms; + +pub fn routes() -> Router { + let room_server: rooms::RoomService = rooms::start_room_server(); + + Router::new() + .route("/api/check", get(|| async {"ok"})) + .route("/api/exists/:code", get(game_exists)) + .route("/api/join/:code", get(game_join)) + .nest_service("/", ServeDir::new("client")) + .layer(Extension(room_server)) +} + +// check if a given room code exists already +async fn game_exists( + Path(code): Path<String>, + Extension(room_server): Extension<rooms::RoomService> +) -> &'static str { + let (tx, rx) = oneshot::channel(); + room_server.send(rooms::RoomServiceRequest::Exists(code, tx)).await.ok(); + + if let Ok(res) = rx.await { + if res { + "true" + } else { + "false" + } + } else { + return "error"; + } +} + +// start a websocket connection and join it to the room +async fn game_join( + Path(code): Path<String>, + ws: WebSocketUpgrade, + Extension(room_server): Extension<rooms::RoomService> +) -> Response { + ws.on_upgrade(|s| async move { + room_server.send(rooms::RoomServiceRequest::Join(code, s)).await.ok(); + }) +} |