1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
use std::{time::Duration, collections::HashSet};
use axum::extract::ws::WebSocket;
use tokio::sync::mpsc;
mod websocket;
mod messages;
mod handle;
use messages::{ClientMessage, ServerMessage};
/// Events processed by a room task.
///
/// The `usize` carried by `Remove` and `WsMessage` is the connection id, i.e. the
/// index of that connection's slot in the room's client list.
pub enum RoomMessage {
/// A new websocket connection is added to the room.
Add(WebSocket),
/// The connection with this id was closed (sent by the websocket task on exiting).
Remove(usize),
/// New data arrived from the websocket with this id.
WsMessage(usize, ClientMessage),
}
/// Sending half of the channel on which the room passes `ServerMessage`s to one websocket task.
pub type Client = mpsc::Sender<ServerMessage>;
/// All client slots of a room, indexed by connection id; a removed connection leaves `None` behind.
pub type Clients = Vec<Option<Client>>;
/// Handle used to send `RoomMessage`s to a running room task.
pub type Room = mpsc::Sender<RoomMessage>;
/// Spawns the task that runs a room and returns the sender used to reach it.
///
/// The room task listens for incoming websocket messages as well as connections
/// and disconnections. Its inbox is a bounded channel holding up to 20 messages.
/// The task keeps its own clone of the sender so it can hand it on to the
/// websocket tasks it starts.
pub fn start_room(room_id: String, room_service: super::rooms::RoomService) -> Room {
    let (tx, rx) = mpsc::channel::<RoomMessage>(20);
    tokio::spawn(room_task(tx.clone(), rx, room_id, room_service));
    tx
}
/// Event loop of a single room.
///
/// Processes `RoomMessage`s from `rx` one at a time. The loop ends when the last
/// client has left (after asking `room_service` to remove the room) or when the
/// channel yields no more messages. `tx` is cloned into every websocket task so
/// that those tasks can report back to this loop.
async fn room_task(tx: mpsc::Sender<RoomMessage>, mut rx: mpsc::Receiver<RoomMessage>, room_id: String, room_service: super::rooms::RoomService) {
    // slot `i` belongs to the connection with id `i`; ids are never reused
    let mut clients = Vec::new();
    // bookkeeping that is only read and written by `handle::handle`
    let mut state_requests = HashSet::new();
    // (joining id, leaving id) events that still need a frame from the clients
    let mut pending: Vec<(Option<usize>, Option<usize>)> = Vec::new();

    loop {
        let Some(event) = rx.recv().await else {
            break;
        };
        match event {
            // new data from a websocket
            RoomMessage::WsMessage(id, msg) => {
                handle::handle(&mut clients, &mut state_requests, &mut pending, id, msg).await;
            }
            // a new connection is added
            RoomMessage::Add(socket) => {
                // the next free slot index becomes the connection id
                let id = clients.len();
                // channel towards the websocket, plus a task that sends and receives on it
                let (client_tx, client_rx) = mpsc::channel(5);
                clients.push(Some(client_tx));
                tokio::spawn(websocket::start_ws(socket, id, tx.clone(), client_rx));

                if conn_count(&clients) >= 2 {
                    // connections need to be added on a specific frame,
                    // so ask the other clients for a frame to put this event on
                    broadcast(&mut clients, ServerMessage::FrameRequest, Some(id)).await;
                    pending.push((Some(id), None));
                } else {
                    // the first connection is on frame 0
                    handle::send_connections(&mut clients, Some(id), None, 0).await;
                }
            }
            // a connection is closed (sent by the websocket task on exiting)
            RoomMessage::Remove(id) => {
                // clear the slot only if the id refers to an existing one
                if let Some(slot) = clients.get_mut(id) {
                    *slot = None;
                }
                // remove rooms once they become empty
                if conn_count(&clients) == 0 {
                    let request = super::rooms::RoomServiceRequest::Remove(room_id.clone());
                    room_service.send(request).await.ok();
                    break;
                }
                // disconnections happen on a specific frame, ask the clients for a frame
                broadcast(&mut clients, ServerMessage::FrameRequest, None).await;
                pending.push((None, Some(id)));
            }
        }
    }
}
/// Hands a freshly accepted websocket to the room task.
///
/// Waits at most one second for the room to accept the message. If the send
/// fails or times out, the error is discarded and nothing is reported to the caller.
pub async fn add_connection(tx: &Room, ws: WebSocket) {
    let request = RoomMessage::Add(ws);
    let patience = Duration::from_secs(1);
    // best effort: a failed hand-over is deliberately ignored
    let _ = tx.send_timeout(request, patience).await;
}
/// Returns how many slots in `v` currently hold a client, ignoring the empty ones.
pub fn conn_count(v: &Clients) -> usize {
    // flattening an iterator of `&Option<_>` yields only the `Some` entries
    v.iter().flatten().count()
}
/// Sends a message to all or some of the clients, concurrently rather than one
/// after another.
///
/// `create_message` is called once for every occupied slot with the slot's id
/// and its sender; returning `None` skips that client. Each send runs in its own
/// spawned task, and this function returns only after every spawned task has
/// finished. Send and join errors are ignored.
///
/// Returns the number of slots in `v`, including empty and skipped ones. This is
/// the value the function has always reported; it is not the number of messages
/// actually sent.
pub async fn send(v: &mut Clients, create_message: impl Fn(usize, &Client) -> Option<ServerMessage>) -> usize {
    // Iterator adaptors are lazy. The handles must be collected up front so that
    // every task is spawned before the first one is awaited; iterating the `map`
    // directly would spawn and await the tasks one by one, i.e. in series.
    let tasks: Vec<_> = v
        .iter()
        .enumerate()
        .map(|(id, slot)| {
            // only occupied slots can receive anything
            let client = slot.clone()?;
            // the callback decides whether this client gets a message at all
            let msg = create_message(id, &client)?;
            Some(tokio::spawn(async move {
                client.send(msg).await.ok();
            }))
        })
        .collect();
    let count = tasks.len();
    // make sure all the tasks complete
    for task in tasks.into_iter().flatten() {
        task.await.ok();
    }
    count
}
/// Sends `msg` to all the websockets in the room, optionally leaving out the
/// client whose id equals `except`.
///
/// Every recipient gets its own clone of `msg`. Returns whatever `send` returns.
pub async fn broadcast(v: &mut Clients, msg: ServerMessage, except: Option<usize>) -> usize {
    // `then` clones the message only for ids that are not excluded
    send(v, |id, _client| (except != Some(id)).then(|| msg.clone())).await
}
|