diff --git a/Cargo.lock b/Cargo.lock index f60505e..1f4009e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1796,6 +1796,19 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "env_logger" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580" +dependencies = [ + "humantime", + "is-terminal", + "log", + "regex", + "termcolor", +] + [[package]] name = "epaint" version = "0.24.1" @@ -1826,16 +1839,6 @@ dependencies = [ "serde", ] -[[package]] -name = "errno" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" -dependencies = [ - "libc", - "windows-sys 0.52.0", -] - [[package]] name = "error-code" version = "2.3.1" @@ -2482,6 +2485,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "1.1.0" @@ -2631,6 +2640,17 @@ dependencies = [ "mach2", ] +[[package]] +name = "is-terminal" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "itertools" version = "0.12.1" @@ -2810,12 +2830,6 @@ dependencies = [ "pkg-config", ] -[[package]] -name = "linux-raw-sys" -version = "0.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" - [[package]] name = "local-ip-address" version = "0.5.7" @@ -2982,24 +2996,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "native-tls" -version = "0.2.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" -dependencies = [ - "lazy_static", - "libc", - "log", - "openssl", - "openssl-probe", - "openssl-sys", - "schannel", - "security-framework", - "security-framework-sys", - "tempfile", -] - [[package]] name = "ndk" version = "0.7.0" @@ -3527,6 +3523,16 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "pretty_env_logger" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "865724d4dbe39d9f3dd3b52b88d859d66bcb2d6a0acfd5ea68a65fb66d4bdc1c" +dependencies = [ + "env_logger", + "log", +] + [[package]] name = "proc-macro-crate" version = "1.3.1" @@ -3688,6 +3694,10 @@ name = "relay-client" version = "0.2.0" dependencies = [ "log", + "mio", + "openssl", + "pretty_env_logger", + "rand", "tungstenite", ] @@ -3709,6 +3719,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "216080ab382b992234dda86873c18d4c48358f5cfcb70fd693d7f6f2131b628b" +[[package]] +name = "ring" +version = "0.17.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74" +dependencies = [ + "cc", + "getrandom", + "libc", + "spin", + "untrusted", + "windows-sys 0.48.0", +] + [[package]] name = "rodio" version = "0.17.3" @@ -3744,16 +3768,57 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" [[package]] -name = "rustix" -version = "0.38.31" +name = "rustls" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" +checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41" dependencies = [ - "bitflags 2.4.2", - "errno", - "libc", - "linux-raw-sys", - "windows-sys 0.52.0", + "log", + "ring", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-native-certs" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35e4980fa29e4c4b212ffb3db068a564cbf560e51d3944b7c88bd8bf5bec64f4" +dependencies = [ + "base64 0.21.7", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a716eb65e3158e90e17cd93d855216e27bde02745ab842f2cab4a39dba1bacf" + +[[package]] +name = "rustls-webpki" +version = "0.102.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", ] [[package]] @@ -3957,6 +4022,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + [[package]] name = "spirv" version = "0.2.0+1.5.4" @@ -4045,19 +4116,6 @@ dependencies = [ "slotmap", ] -[[package]] -name = "tempfile" -version = "3.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01ce4141aa927a6d1bd34a041795abd0db1cccba5d5f24b009f694bdf3a1f3fa" -dependencies = [ - "cfg-if", - "fastrand 2.0.1", - "redox_syscall 0.4.1", - "rustix", - "windows-sys 0.52.0", -] - [[package]] name = "termcolor" version = "1.4.1" @@ -4355,12 +4413,15 @@ dependencies = [ "http 1.0.0", "httparse", "log", - "native-tls", "rand", + "rustls", + "rustls-native-certs", + "rustls-pki-types", "sha1", "thiserror", "url", "utf-8", + "webpki-roots", ] [[package]] @@ -4422,6 +4483,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.5.0" @@ -4599,6 +4666,15 @@ dependencies = [ "web-sys", ] +[[package]] +name = "webpki-roots" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3de34ae270483955a94f4b21bdaaeb83d508bb84a01435f393818edb0012009" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "weezl" version = "0.1.8" @@ -5136,3 +5212,9 @@ dependencies = [ "quote", "syn 2.0.48", ] + +[[package]] +name = "zeroize" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d" diff --git a/crates/relay-client/Cargo.toml b/crates/relay-client/Cargo.toml index ae07ac4..d93711f 100644 --- a/crates/relay-client/Cargo.toml +++ b/crates/relay-client/Cargo.toml @@ -13,4 +13,8 @@ workspace = true [dependencies] log = "0.4.20" -tungstenite = { version = "0.21.0", features = ["native-tls"] } +mio = { version = "0.8.10", features = ["net", "os-poll"] } +openssl = "0.10.63" +pretty_env_logger = "0.5.0" +rand = "0.8.5" +tungstenite = { version = "0.21.0", features = ["rustls", "rustls-native-certs", "rustls-pki-types", "rustls-tls-native-roots", "rustls-tls-webpki-roots"] } diff --git a/crates/relay-client/src/lib.rs b/crates/relay-client/src/lib.rs index c475d3c..a7a3247 100644 --- a/crates/relay-client/src/lib.rs +++ b/crates/relay-client/src/lib.rs @@ -1,23 +1,108 @@ //! A client to use a relay server. use std::borrow::Cow; -use std::collections::LinkedList; -use std::net::TcpStream; -use std::sync::mpsc::{Receiver, Sender}; +use std::io::ErrorKind; +use std::net::ToSocketAddrs; +use std::sync::mpsc::{channel, Receiver, Sender}; use log::warn; -use tungstenite::{Message, WebSocket}; +use mio::net::TcpStream; +use rand::seq::IteratorRandom; +use tungstenite::handshake::MidHandshake; +use tungstenite::stream::MaybeTlsStream; +use tungstenite::{ClientHandshake, HandshakeError, Message, WebSocket}; +/// The state of a [RelayConnection]. +#[derive(Debug, PartialEq, Eq)] +pub enum RelayConnectionState { + /// The [RelayConnection] is not connected. + Disconnected, + + /// The underlying [TcpStream] is connecting. + Connecting, + + /// The [RelayConnection] is making a tls handshake with the relay server. + TlsHandshaking, + + /// The [RelayConnection] is making a websocket handshake with the relay + /// server. + WebsocketHandshaking, + + /// The [RelayConnection] is connected. + Connected, +} + +/// A connection to a relay server. pub struct RelayConnection { + /// The address of the relay server. + server_address: String, + + /// The receiver part of the send channel. + /// + /// This is used in [RelayConnection::update] to get messages that need to + /// be sent to the relay server. send_receiver: Receiver, + + /// The sender part of the receive channel. + /// + /// This is used in [RelayConnection::send] to store messages that need to + /// be sent to the relay server. send_sender: Sender, - send_buffer: LinkedList, + + /// The receiver part of the receive channel. + /// + /// This is used in [RelayConnection::read] to get messages that have been + /// received from the relay server. receive_receiver: Receiver<(u32, Vec)>, + + /// The sender part of the send channel. + /// + /// This is used in [RelayConnection::update] to store messages that have + /// been received from the relay server. receive_sender: Sender<(u32, Vec)>, - stream: WebSocket, + + /// If the [TcpStream] is not currently connected it will be stored here. + stream: Option, + + /// If the websocket handshake is not complete it will be stored here. + handshake: Option>>>, + + /// When the websocket is correctly connected it will be stored here. + socket: Option>>, } impl RelayConnection { + /// Create a new [RelayConnection]. + pub fn new(server_address: String) -> Self { + let (send_sender, send_receiver) = channel(); + let (receive_sender, receive_receiver) = channel(); + Self { + server_address, + send_receiver, + send_sender, + receive_receiver, + receive_sender, + stream: None, + handshake: None, + socket: None, + } + } + + /// Returns the state of the [RelayConnection]. + pub fn state(&self) -> RelayConnectionState { + match ( + self.stream.is_some(), + self.handshake.is_some(), + self.socket.is_some(), + ) { + (false, false, false) => RelayConnectionState::Disconnected, + (true, false, false) => RelayConnectionState::Connecting, + (false, true, false) => RelayConnectionState::WebsocketHandshaking, + (false, false, true) => RelayConnectionState::Connected, + _ => unreachable!(), + } + } + /// Send a message to the target client. pub fn send(&self, target_id: u32, message: Cow<[u8]>) { let mut data = message.into_owned(); @@ -30,18 +115,266 @@ impl RelayConnection { self.receive_receiver.try_recv().ok() } - /// Update the [RelayConnection] by sending and receiving messages. + /// Update the [RelayConnection]. + /// + /// This function will connect to the relay server if it's not already + /// connected, and will send and receive messages from the relay server + /// if it's connected. pub fn update(&mut self) { - while let Ok(message) = self.send_receiver.try_recv() { - match self.stream.send(message) { - Ok(()) => (), - Err(tungstenite::Error::WriteBufferFull(frame)) => { - self.send_buffer.push_back(frame) - } - Err(e) => { - warn!("Relay connection closed with error: {}", e); + match ( + self.stream.take(), + self.handshake.take(), + self.socket.as_mut(), + ) { + (None, None, None) => { + // Resolve the relay address list. + let mut address = self.server_address.clone(); + address.push_str(":443"); + let address_list = match address.to_socket_addrs() { + Ok(address_list) => address_list, + Err(e) => { + warn!("failed to resolve relay address: {e}"); + return; + } + }; + + // Take a random relay address. + let Some(address) = address_list.choose(&mut rand::thread_rng()) else { + warn!("no relay address available"); + return; + }; + + // Start the connection to the relay. + match TcpStream::connect(address) { + Ok(stream) => self.stream = Some(stream), + Err(e) => warn!("failed to start connection to the relay server: {e}"), } } + (Some(stream), None, None) => { + // Check if there is an error while connecting. + if let Ok(Some(e)) | Err(e) = stream.take_error() { + warn!("failed to connect to relay: {e}"); + return; + } + + // Check if the stream is connected. + match stream.peer_addr() { + Ok(_) => { + // Start the websocket handshake. + match tungstenite::client_tls( + format!("wss://{}", self.server_address), + stream, + ) { + Ok((socket, _)) => self.socket = Some(socket), + Err(HandshakeError::Interrupted(handshake)) => { + self.handshake = Some(handshake); + } + Err(HandshakeError::Failure(e)) => { + warn!("relay handshake failed: {e}") + } + } + } + Err(ref e) if e.kind() == ErrorKind::NotConnected => { + self.stream = Some(stream); + } + Err(e) => warn!("failed to connect to relay: {e}"), + } + } + (None, Some(handshake), None) => { + // Check if the handshake is complete. + match handshake.handshake() { + Ok((socket, _)) => self.socket = Some(socket), + Err(HandshakeError::Interrupted(unfinished_handshake)) => { + self.handshake = Some(unfinished_handshake); + } + Err(HandshakeError::Failure(e)) => { + warn!("relay websocket handshake failed: {e}") + } + } + } + (None, None, Some(socket)) => { + // Send messages from the send channel to the socket. + while let Ok(message) = self.send_receiver.try_recv() { + match socket.send(message) { + Ok(()) => (), + Err(tungstenite::Error::Io(ref e)) + if e.kind() == std::io::ErrorKind::WouldBlock + || e.kind() == std::io::ErrorKind::Interrupted => + { + break; + } + Err(e) => { + warn!("relay connection closed with error: {e}"); + self.socket = None; + return; + } + } + } + + // Receive messages from the socket and send them to the receive channel. + loop { + match socket.read() { + Ok(message) => { + // Check the message length. + let mut data = message.into_data(); + if data.len() < 4 { + warn!("received malformed message with length: {}", data.len()); + continue; + } + + // Extract the sender ID. + let id_start = data.len() - 4; + let sender_id = u32::from_be_bytes( + data[id_start..] + .try_into() + .unwrap_or_else(|_| unreachable!()), + ); + data.truncate(id_start); + + // Send the message to the receive channel. + self.receive_sender.send((sender_id, data)).ok(); + } + Err(tungstenite::Error::Io(ref e)) + if e.kind() == std::io::ErrorKind::WouldBlock + || e.kind() == std::io::ErrorKind::Interrupted => + { + break; + } + Err(e) => { + warn!("relay connection closed with error: {e}"); + self.socket = None; + return; + } + } + } + } + _ => unreachable!(), } } + + // pub fn update(&mut self) { + // // If there's an unconnected stream, wait for it to be connected. + // if let Some(unconnected_stream) = self.unconnected_stream.take() { + // match unconnected_stream.peer_addr() { + // Ok(_) => { + // // Start the handshake. + // match tungstenite::client::client("wss://relay.cocosol.fr", + // unconnected_stream) { + // Ok((socket, _)) => self.socket = Some(socket), + // Err(HandshakeError::Interrupted(unfinished_handshake)) => + // { self.unfinished_handshake = + // Some(unfinished_handshake); return; + // } + // Err(HandshakeError::Failure(e)) => { + // warn!("relay handshake failed: {}", e); + // return; + // } + // } + // } + // Err(ref e) if e.kind() == std::io::ErrorKind::NotConnected => { + // self.unconnected_stream = Some(unconnected_stream); + // } + // Err(e) => warn!("failed to get peer address: {}", e), + // } + // } + + // // If there's an unfinished handshake, try to finish it. + // if let Some(unfinished_handshake) = self.unfinished_handshake.take() { + // match unfinished_handshake.handshake() { + // Ok((socket, _)) => self.socket = Some(socket), + // Err(HandshakeError::Interrupted(unfinished_handshake)) => { + // self.unfinished_handshake = Some(unfinished_handshake) + // } + // Err(HandshakeError::Failure(e)) => warn!("relay handshake failed: + // {}", e), } + // } + + // // If there's no socket yet, try to connect. + // let socket = match self.socket { + // Some(ref mut socket) => socket, + // None => { + // // Resolve the relay address list. + // let address_list = match "relay.cocosol.fr:443".to_socket_addrs() + // { Ok(address_list) => address_list, + // Err(e) => { + // warn!("failed to resolve relay address: {}", e); + // return; + // } + // }; + + // // Take a random relay address. + // let Some(address) = address_list.choose(&mut rand::thread_rng()) + // else { warn!("no relay address available"); + // return; + // }; + + // // Create a [TcpStream] connected to the relay. + // self.unconnected_stream = match TcpStream::connect(address) { + // Ok(stream) => Some(stream), + // Err(e) => { + // warn!("failed to connect to relay: {}", e); + // return; + // } + // }; + + // // Return because the socket is not connected yet. + // return; + // } + // }; + + // // Send messages from the send channel to the socket. + // while let Ok(message) = self.send_receiver.try_recv() { + // match socket.send(message) { + // Ok(()) => (), + // Err(tungstenite::Error::Io(ref e)) + // if e.kind() == std::io::ErrorKind::WouldBlock + // || e.kind() == std::io::ErrorKind::Interrupted => + // { + // break; + // } + // Err(e) => { + // warn!("relay connection closed with error: {}", e); + // self.socket = None; + // return; + // } + // } + // } + + // // Receive messages from the socket and send them to the receive channel. + // loop { + // match socket.read() { + // Ok(message) => { + // // Check the message length. + // let mut data = message.into_data(); + // if data.len() < 4 { + // warn!("received malformed message: {}", data.len()); + // continue; + // } + + // // Extract the sender ID. + // let id_start = data.len() - 4; + // let sender_id = u32::from_be_bytes( + // data[id_start..] + // .try_into() + // .unwrap_or_else(|_| unreachable!()), + // ); + // data.truncate(id_start); + + // // Send the message to the receive channel. + // self.receive_sender.send((sender_id, data)).ok(); + // } + // Err(tungstenite::Error::Io(ref e)) + // if e.kind() == std::io::ErrorKind::WouldBlock + // || e.kind() == std::io::ErrorKind::Interrupted => + // { + // break; + // } + // Err(e) => { + // warn!("relay connection closed with error: {}", e); + // self.socket = None; + // return; + // } + // } + // } + // } } diff --git a/crates/relay-client/src/main.rs b/crates/relay-client/src/main.rs new file mode 100644 index 0000000..73b7ad9 --- /dev/null +++ b/crates/relay-client/src/main.rs @@ -0,0 +1,20 @@ +//! TODO + +use std::net::TcpStream; +use std::thread; +use std::time::Duration; + +use relay_client::RelayConnection; + +fn main() { + pretty_env_logger::init(); + + let mut connection = RelayConnection::new("relay.cocosol.fr".to_string()); + + loop { + connection.update(); + if let Some((sender_id, data)) = connection.read() { + println!("Received message from {sender_id}: {:?}", data); + } + } +}