websocket

This commit is contained in:
Murphy 2023-08-21 15:16:48 -04:00
parent a2c89301d5
commit 2adbe0d999
No known key found for this signature in database
GPG key ID: D1EC66EA6BAA4C47
6 changed files with 397 additions and 44 deletions

203
Cargo.lock generated
View file

@ -21,13 +21,13 @@ dependencies = [
[[package]]
name = "async-trait"
version = "0.1.63"
version = "0.1.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eff18d764974428cf3a9328e23fc5c986f5fbed46e6cd4cdf42544df5d297ec1"
checksum = "cc6dde6e4ed435a4c1ee4e73592f5ba9da2151af10076cc04858746af9352d09"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.27",
]
[[package]]
@ -38,12 +38,13 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.6.4"
version = "0.6.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5694b64066a2459918d8074c2ce0d5a88f409431994c2356617c8ae0c4721fc"
checksum = "a6a1de45611fdb535bfde7b7de4fd54f4fd2b17b1737c0a59b69bf9b92074b8c"
dependencies = [
"async-trait",
"axum-core",
"base64 0.21.2",
"bitflags",
"bytes",
"futures-util",
@ -62,19 +63,20 @@ dependencies = [
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sha1",
"sync_wrapper",
"tokio",
"tokio-tungstenite",
"tower",
"tower-http",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.3.2"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cae3e661676ffbacb30f1a824089a8c9150e71017f7e1e38f2aa32009188d34"
checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c"
dependencies = [
"async-trait",
"bytes",
@ -93,6 +95,12 @@ version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
name = "base64"
version = "0.21.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d"
[[package]]
name = "bit_field"
version = "0.10.1"
@ -244,6 +252,12 @@ dependencies = [
"typenum",
]
[[package]]
name = "data-encoding"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308"
[[package]]
name = "digest"
version = "0.10.6"
@ -336,41 +350,42 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.26"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec90ff4d0fe1f57d600049061dc6bb68ed03c7d2fbd697274c41805dcb3f8608"
checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
[[package]]
name = "futures-macro"
version = "0.3.26"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95a73af87da33b5acf53acfebdc339fe592ecf5357ac7c0a7734ab9d8c876a70"
checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.27",
]
[[package]]
name = "futures-sink"
version = "0.3.26"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f310820bb3e8cfd46c80db4d7fb8353e15dfff853a127158425f31e0be6c8364"
checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e"
[[package]]
name = "futures-task"
version = "0.3.26"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcf79a1bf610b10f42aea489289c5a2c478a786509693b80cd39c44ccd936366"
checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
[[package]]
name = "futures-util"
version = "0.3.26"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c1d6de3acfef38d2be4b1f543f553131788603495be83da675e180c8d6b7bd1"
checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
dependencies = [
"futures-core",
"futures-macro",
"futures-sink",
"futures-task",
"pin-project-lite",
"pin-utils",
@ -443,7 +458,7 @@ version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3e372db8e5c0d213e0cd0b9be18be2aca3d44cf2fe30a9d46a65581cd454584"
dependencies = [
"base64",
"base64 0.13.1",
"bitflags",
"bytes",
"headers-core",
@ -473,9 +488,9 @@ dependencies = [
[[package]]
name = "http"
version = "0.2.8"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399"
checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482"
dependencies = [
"bytes",
"fnv",
@ -513,9 +528,9 @@ checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]]
name = "hyper"
version = "0.14.23"
version = "0.14.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "034711faac9d2166cb1baf1a2fb0b60b1f277f8492fd72176c17f3515e1abd3c"
checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468"
dependencies = [
"bytes",
"futures-channel",
@ -534,6 +549,16 @@ dependencies = [
"want",
]
[[package]]
name = "idna"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]]
name = "image"
version = "0.24.5"
@ -800,7 +825,7 @@ checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 1.0.107",
]
[[package]]
@ -841,18 +866,18 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "proc-macro2"
version = "1.0.50"
version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ef7d57beacfaf2d8aee5937dab7b7f28de3cb8b1828479bb5de2a7106f2bae2"
checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.23"
version = "1.0.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b"
checksum = "50f3b39ccfb720540debaa0164757101c08ecb8d326b15358ce76a62c7e85965"
dependencies = [
"proc-macro2",
]
@ -973,7 +998,7 @@ checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 1.0.107",
]
[[package]]
@ -1082,12 +1107,43 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "syn"
version = "2.0.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b60f673f44a8255b9c8c657daf66a596d435f2da81a555b06dc644d080ba45e0"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "sync_wrapper"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8"
[[package]]
name = "thiserror"
version = "1.0.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "611040a08a0439f8248d1990b111c95baa9c704c805fa1f62104b39655fd7f90"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "090198534930841fab3a5d1bb637cde49e339654e606195f8d9c76eeb081dc96"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.27",
]
[[package]]
name = "thread_local"
version = "1.1.4"
@ -1144,6 +1200,21 @@ dependencies = [
"time-core",
]
[[package]]
name = "tinyvec"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50"
dependencies = [
"tinyvec_macros",
]
[[package]]
name = "tinyvec_macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.25.0"
@ -1172,7 +1243,19 @@ checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 1.0.107",
]
[[package]]
name = "tokio-tungstenite"
version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec509ac96e9a0c43427c74f003127d953a265737636129424288d27cb5c4b12c"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite",
]
[[package]]
@ -1241,7 +1324,6 @@ dependencies = [
"pin-project-lite",
"tokio",
"tokio-util",
"tower",
"tower-layer",
"tower-service",
]
@ -1279,7 +1361,7 @@ checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 1.0.107",
]
[[package]]
@ -1323,6 +1405,25 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"
[[package]]
name = "tungstenite"
version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15fba1a6d6bb030745759a9a2a588bfe8490fc8b4751a277db3a0be1c9ebbf67"
dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http",
"httparse",
"log",
"rand",
"sha1",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "typenum"
version = "1.16.0"
@ -1338,12 +1439,44 @@ dependencies = [
"version_check",
]
[[package]]
name = "unicode-bidi"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460"
[[package]]
name = "unicode-ident"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc"
[[package]]
name = "unicode-normalization"
version = "0.1.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921"
dependencies = [
"tinyvec",
]
[[package]]
name = "url"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643"
dependencies = [
"form_urlencoded",
"idna",
"percent-encoding",
]
[[package]]
name = "utf-8"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "valuable"
version = "0.1.0"
@ -1399,7 +1532,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn",
"syn 1.0.107",
"wasm-bindgen-shared",
]
@ -1421,7 +1554,7 @@ checksum = "07bc0c051dc5f23e307b13285f9d75df86bfdf816c5721e573dec1f9b8aa193c"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 1.0.107",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]

View file

@ -5,7 +5,7 @@ edition = "2021"
[dependencies]
tokio = { version = "1.25.0", features = ["full"] }
axum = { version = "0.6.4", features = ["headers", "query"] }
axum = { version = "0.6.12", features = ["headers", "query", "ws"] }
tower-http = { version = "0.3.5", features = ["fs"] }
tower-cookies = "0.8.0"
tower = "0.4.13"

View file

@ -123,6 +123,10 @@ h2 {
background-color: #bfa354;
}
.get {
background-color: #00cc00;
}
.delete {
background-color: #cc0000;
}

View file

@ -1,13 +1,63 @@
use axum::{response::Response, Router, routing::{post, patch, delete}};
use std::collections::HashMap;
use axum::{response::Response, Router, routing::{post, patch, delete, get}, extract::{ws::Message, WebSocketUpgrade}};
use serde::Deserialize;
use tokio::sync::{Mutex, mpsc::{Sender, self}};
use crate::{
public::docs::{EndpointDocumentation, EndpointMethod},
types::{
extract::{AuthorizedUser, Check, CheckResult, Database, Json},
http::ResponseCode,
chat::ChatRoom, user::User,
chat::{ChatRoom, ChatEvent}, user::User,
},
};
use std::collections::hash_map::Values;
use lazy_static::lazy_static;
lazy_static!(
static ref CONNECTIONS: Mutex<HashMap<u64, ConnectionPool>> = Mutex::new(HashMap::new());
);
struct ConnectionPool {
inner: HashMap<usize, Sender<ChatEvent>>,
index: usize
}
impl ConnectionPool {
fn new() -> Self {
Self {
inner: HashMap::new(),
index: 0
}
}
fn add(&mut self, send: Sender<ChatEvent>) -> usize {
let idx = self.index;
self.index += 1;
self.inner.insert(idx, send);
idx
}
fn del(&mut self, idx: &usize) {
self.inner.remove(idx);
}
fn values(&self) -> Values<'_, usize, Sender<ChatEvent>> {
self.inner.values()
}
}
async fn send_event(event: ChatEvent, room: &ChatRoom) {
for user in &room.users {
let lock = CONNECTIONS.lock().await;
let Some(connection) = lock.get(&user) else {
continue
};
for channel in connection.values() {
channel.send(event.clone()).await.ok();
}
}
}
pub const CHAT_LIST: EndpointDocumentation = EndpointDocumentation {
uri: "/api/chat/list",
@ -80,11 +130,18 @@ async fn create (
Database(db): Database,
Json(body): Json<RoomCreateRequest>,
) -> Response {
let Ok(post) = ChatRoom::new(&db, vec![user.user_id], body.name) else {
let Ok(room) = ChatRoom::new(&db, vec![user.user_id], body.name) else {
return ResponseCode::InternalServerError.text("Failed to create room")
};
let Ok(json) = serde_json::to_string(&post) else {
for user in &room.users {
send_event(ChatEvent::Add {
user_id: *user,
room_id: room.room_id
}, &room).await;
}
let Ok(json) = serde_json::to_string(&room) else {
return ResponseCode::InternalServerError.text("Failed to create room")
};
@ -145,6 +202,11 @@ async fn add (
if !success {
return ResponseCode::BadRequest.text("User is already in the room")
}
send_event(ChatEvent::Add {
user_id: to_add.user_id,
room_id: room.room_id
}, &room).await;
ResponseCode::Success.text("Successfully added user")
}
@ -197,6 +259,11 @@ async fn leave (
if !success {
return ResponseCode::BadRequest.text("You are currently not in this room (how did this happen?)")
}
send_event(ChatEvent::Leave {
user_id: user.user_id,
room_id: room.room_id
}, &room).await;
ResponseCode::Success.text("Successfully left room")
}
@ -250,9 +317,17 @@ async fn send (
return ResponseCode::BadRequest.text("Room doesnt exist or you are not in it")
};
let Ok(_msg) = room.send_message(&db, user.user_id, body.content) else {
let Ok(msg) = room.send_message(&db, user.user_id, body.content) else {
return ResponseCode::InternalServerError.text("Failed to send message")
};
send_event(ChatEvent::Message {
user_id: msg.user_id,
room_id: msg.room_id,
message_id: msg.message_id,
content: msg.content,
date: msg.date
}, &room).await;
ResponseCode::Created.text("Successfully sent message")
}
@ -302,7 +377,7 @@ async fn load (
return ResponseCode::BadRequest.text("Room doesnt exist or you are not in it")
};
let Ok(msgs) = room.load_old_chat_messagegs(&db, body.newest_msg, body.page) else {
let Ok(msgs) = room.load_old_chat_messages(&db, body.newest_msg, body.page) else {
return ResponseCode::InternalServerError.text("Failed to load messages")
};
@ -313,6 +388,110 @@ async fn load (
ResponseCode::Created.json(&json)
}
pub const CHAT_TYPING: EndpointDocumentation = EndpointDocumentation {
uri: "/api/chat/typing",
method: EndpointMethod::Post,
description: "Set if your typing in a given room",
body: Some(
r#"
{
"room_id": 69,
}
"#,
),
responses: &[
(201, "Successfully sent typing indicator"),
(400, "Body does not match parameters"),
(401, "Unauthorized"),
(500, "Failed to send typing indicator"),
],
cookie: Some("auth"),
};
#[derive(Deserialize)]
struct TypingRequest {
room_id: u64,
}
impl Check for TypingRequest {
fn check(&self) -> CheckResult {
Ok(())
}
}
async fn typing (
AuthorizedUser(user): AuthorizedUser,
Database(db): Database,
Json(body): Json<TypingRequest>,
) -> Response {
let Ok(room) = ChatRoom::from_user_and_room_id(&db, user.user_id, body.room_id) else {
return ResponseCode::BadRequest.text("Room doesnt exist or you are not in it")
};
send_event(ChatEvent::Typing {
user_id: user.user_id,
room_id: room.room_id,
}, &room).await;
ResponseCode::Success.text("Successfully sent typing indicator")
}
pub const CHAT_CONNECT: EndpointDocumentation = EndpointDocumentation {
uri: "/api/chat/connect",
method: EndpointMethod::Get,
description: "Start a websocket connection for chat events",
body: None,
responses: &[],
cookie: Some("auth"),
};
async fn connect (
AuthorizedUser(user): AuthorizedUser,
ws: WebSocketUpgrade
) -> Response {
ws.on_upgrade(|mut ws| async move {
let user = user;
let (send, mut recv) = mpsc::channel::<ChatEvent>(20);
let id: usize;
{
let mut lock = CONNECTIONS.lock().await;
match lock.get_mut(&user.user_id) {
Some(pool) => {
id = pool.add(send);
},
None => {
let mut pool = ConnectionPool::new();
id = pool.add(send);
lock.insert(user.user_id, pool);
}
};
}
loop {
tokio::select! {
m = ws.recv() => {
let Some(Ok(_)) = m else {
break;
};
}
s = recv.recv() => {
let Some(msg) = s else {
break;
};
if let Ok(string) = serde_json::to_string(&msg) {
ws.send(Message::Text(string)).await.ok();
}
}
}
}
let mut lock = CONNECTIONS.lock().await;
if let Some(conn) = lock.get_mut(&user.user_id) {
conn.del(&id);
};
})
}
pub fn router() -> Router {
Router::new()
.route("/create", post(create))
@ -321,4 +500,6 @@ pub fn router() -> Router {
.route("/leave", delete(leave))
.route("/send", post(send))
.route("/load", post(load))
.route("/typing", post(typing))
.route("/connect", get(connect))
}

View file

@ -10,6 +10,7 @@ use crate::{
use super::console::beautify;
pub enum EndpointMethod {
Get,
Post,
Put,
Patch,
@ -19,6 +20,7 @@ pub enum EndpointMethod {
impl ToString for EndpointMethod {
fn to_string(&self) -> String {
match self {
Self::Get => "GET".to_owned(),
Self::Post => "POST".to_owned(),
Self::Put => "PUT".to_owned(),
Self::Patch => "PATCH".to_owned(),
@ -147,6 +149,8 @@ pub async fn init() {
chat::CHAT_LEAVE,
chat::CHAT_SEND,
chat::CHAT_LOAD,
chat::CHAT_TYPING,
chat::CHAT_CONNECT,
admin::ADMIN_AUTH,
admin::ADMIN_QUERY,
admin::ADMIN_POSTS,

View file

@ -1,4 +1,4 @@
use serde::Serialize;
use serde::{Serialize, Deserialize};
use tracing::instrument;
use crate::{types::http::{ResponseCode, Result}, database::Database};
@ -18,6 +18,37 @@ pub struct ChatMessage {
pub content: String
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(tag = "type")]
pub enum ChatEvent {
#[serde(rename = "message")]
Message {
user_id: u64,
message_id: u64,
room_id: u64,
content: String,
date: u64
},
#[serde(rename = "add")]
Add {
user_id: u64,
room_id: u64
},
#[serde(rename = "leave")]
Leave {
user_id: u64,
room_id: u64
},
#[serde(rename = "typing")]
Typing {
user_id: u64,
room_id: u64
}
}
impl ChatRoom {
#[instrument(skip(db))]
@ -87,7 +118,7 @@ impl ChatRoom {
}
#[instrument(skip(db))]
pub fn load_old_chat_messagegs(&self, db: &Database, newest_message: u64, page: u64) -> Result<Vec<ChatMessage>> {
pub fn load_old_chat_messages(&self, db: &Database, newest_message: u64, page: u64) -> Result<Vec<ChatMessage>> {
let Ok(msgs) = db.load_old_chat_messages(self.room_id, newest_message, page) else {
tracing::error!("Failed to load messgaes");
return Err(ResponseCode::InternalServerError.text("Failed to load messages"))