From 543b6973bbdfd0a2222ebbed8c7efd07b76aacbc Mon Sep 17 00:00:00 2001 From: Tipragot Date: Sun, 11 Feb 2024 15:20:41 +0100 Subject: [PATCH 1/7] Save --- Cargo.lock | 151 +++++++++++++++++++++++++++++++++ crates/relay-client/Cargo.toml | 16 ++++ crates/relay-client/src/lib.rs | 47 ++++++++++ 3 files changed, 214 insertions(+) create mode 100644 crates/relay-client/Cargo.toml create mode 100644 crates/relay-client/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 9e70aac..f60505e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1826,6 +1826,16 @@ dependencies = [ "serde", ] +[[package]] +name = "errno" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "error-code" version = "2.3.1" @@ -2800,6 +2810,12 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "linux-raw-sys" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" + [[package]] name = "local-ip-address" version = "0.5.7" @@ -2966,6 +2982,24 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "native-tls" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "ndk" version = "0.7.0" @@ -3285,6 +3319,50 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "openssl" +version = "0.10.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15c9d69dd87a29568d4d017cfe8ec518706046a05184e5aea92d0af890b803c8" +dependencies = [ + "bitflags 2.4.2", + "cfg-if", + "foreign-types 0.3.2", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + +[[package]] +name = "openssl-sys" +version = "0.9.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22e1bf214306098e4832460f797824c05d25aacdf896f64a985fb0fd992454ae" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "orbclient" version = "0.3.47" @@ -3605,6 +3683,14 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +[[package]] +name = "relay-client" +version = "0.2.0" +dependencies = [ + "log", + "tungstenite", +] + [[package]] name = "relay-server" version = "0.2.0" @@ -3657,6 +3743,19 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustix" +version = "0.38.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" +dependencies = [ + "bitflags 2.4.2", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.52.0", +] + [[package]] name = "rustversion" version = "1.0.14" @@ -3689,12 +3788,44 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schannel" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "security-framework" +version = "2.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e932934257d3b408ed8f30db49d85ea163bfe74961f017f405b025af298f0c7a" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.196" @@ -3914,6 +4045,19 @@ dependencies = [ "slotmap", ] +[[package]] +name = "tempfile" +version = "3.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01ce4141aa927a6d1bd34a041795abd0db1cccba5d5f24b009f694bdf3a1f3fa" +dependencies = [ + "cfg-if", + "fastrand 2.0.1", + "redox_syscall 0.4.1", + "rustix", + "windows-sys 0.52.0", +] + [[package]] name = "termcolor" version = "1.4.1" @@ -4211,6 +4355,7 @@ dependencies = [ "http 1.0.0", "httparse", "log", + "native-tls", "rand", "sha1", "thiserror", @@ -4310,6 +4455,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "vec_map" version = "0.8.2" diff --git a/crates/relay-client/Cargo.toml b/crates/relay-client/Cargo.toml new file mode 100644 index 0000000..ae07ac4 --- /dev/null +++ b/crates/relay-client/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "relay-client" +version = "0.2.0" +edition = "2021" +license = "GPL-3.0-or-later" +description = "A client to use a relay server." +authors = ["Tipragot "] +keywords = ["bevy", "network", "game"] +categories = ["network-programming", "game-development"] + +[lints] +workspace = true + +[dependencies] +log = "0.4.20" +tungstenite = { version = "0.21.0", features = ["native-tls"] } diff --git a/crates/relay-client/src/lib.rs b/crates/relay-client/src/lib.rs new file mode 100644 index 0000000..c475d3c --- /dev/null +++ b/crates/relay-client/src/lib.rs @@ -0,0 +1,47 @@ +//! A client to use a relay server. + +use std::borrow::Cow; +use std::collections::LinkedList; +use std::net::TcpStream; +use std::sync::mpsc::{Receiver, Sender}; + +use log::warn; +use tungstenite::{Message, WebSocket}; + +pub struct RelayConnection { + send_receiver: Receiver, + send_sender: Sender, + send_buffer: LinkedList, + receive_receiver: Receiver<(u32, Vec)>, + receive_sender: Sender<(u32, Vec)>, + stream: WebSocket, +} + +impl RelayConnection { + /// Send a message to the target client. + pub fn send(&self, target_id: u32, message: Cow<[u8]>) { + let mut data = message.into_owned(); + data.extend_from_slice(&target_id.to_be_bytes()); + self.send_sender.send(Message::Binary(data)).ok(); + } + + /// Receive a message from the target client. + pub fn read(&self) -> Option<(u32, Vec)> { + self.receive_receiver.try_recv().ok() + } + + /// Update the [RelayConnection] by sending and receiving messages. + pub fn update(&mut self) { + while let Ok(message) = self.send_receiver.try_recv() { + match self.stream.send(message) { + Ok(()) => (), + Err(tungstenite::Error::WriteBufferFull(frame)) => { + self.send_buffer.push_back(frame) + } + Err(e) => { + warn!("Relay connection closed with error: {}", e); + } + } + } + } +} -- 2.43.4 From 787dde0bb7402fd9c3d572950850c355033283aa Mon Sep 17 00:00:00 2001 From: Tipragot Date: Sun, 11 Feb 2024 21:19:42 +0100 Subject: [PATCH 2/7] Save --- Cargo.lock | 194 ++++++++++++----- crates/relay-client/Cargo.toml | 6 +- crates/relay-client/src/lib.rs | 363 ++++++++++++++++++++++++++++++-- crates/relay-client/src/main.rs | 20 ++ 4 files changed, 511 insertions(+), 72 deletions(-) create mode 100644 crates/relay-client/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index f60505e..1f4009e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1796,6 +1796,19 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "env_logger" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580" +dependencies = [ + "humantime", + "is-terminal", + "log", + "regex", + "termcolor", +] + [[package]] name = "epaint" version = "0.24.1" @@ -1826,16 +1839,6 @@ dependencies = [ "serde", ] -[[package]] -name = "errno" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" -dependencies = [ - "libc", - "windows-sys 0.52.0", -] - [[package]] name = "error-code" version = "2.3.1" @@ -2482,6 +2485,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "1.1.0" @@ -2631,6 +2640,17 @@ dependencies = [ "mach2", ] +[[package]] +name = "is-terminal" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "itertools" version = "0.12.1" @@ -2810,12 +2830,6 @@ dependencies = [ "pkg-config", ] -[[package]] -name = "linux-raw-sys" -version = "0.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" - [[package]] name = "local-ip-address" version = "0.5.7" @@ -2982,24 +2996,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "native-tls" -version = "0.2.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" -dependencies = [ - "lazy_static", - "libc", - "log", - "openssl", - "openssl-probe", - "openssl-sys", - "schannel", - "security-framework", - "security-framework-sys", - "tempfile", -] - [[package]] name = "ndk" version = "0.7.0" @@ -3527,6 +3523,16 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "pretty_env_logger" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "865724d4dbe39d9f3dd3b52b88d859d66bcb2d6a0acfd5ea68a65fb66d4bdc1c" +dependencies = [ + "env_logger", + "log", +] + [[package]] name = "proc-macro-crate" version = "1.3.1" @@ -3688,6 +3694,10 @@ name = "relay-client" version = "0.2.0" dependencies = [ "log", + "mio", + "openssl", + "pretty_env_logger", + "rand", "tungstenite", ] @@ -3709,6 +3719,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "216080ab382b992234dda86873c18d4c48358f5cfcb70fd693d7f6f2131b628b" +[[package]] +name = "ring" +version = "0.17.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74" +dependencies = [ + "cc", + "getrandom", + "libc", + "spin", + "untrusted", + "windows-sys 0.48.0", +] + [[package]] name = "rodio" version = "0.17.3" @@ -3744,16 +3768,57 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" [[package]] -name = "rustix" -version = "0.38.31" +name = "rustls" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" +checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41" dependencies = [ - "bitflags 2.4.2", - "errno", - "libc", - "linux-raw-sys", - "windows-sys 0.52.0", + "log", + "ring", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-native-certs" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35e4980fa29e4c4b212ffb3db068a564cbf560e51d3944b7c88bd8bf5bec64f4" +dependencies = [ + "base64 0.21.7", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a716eb65e3158e90e17cd93d855216e27bde02745ab842f2cab4a39dba1bacf" + +[[package]] +name = "rustls-webpki" +version = "0.102.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", ] [[package]] @@ -3957,6 +4022,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + [[package]] name = "spirv" version = "0.2.0+1.5.4" @@ -4045,19 +4116,6 @@ dependencies = [ "slotmap", ] -[[package]] -name = "tempfile" -version = "3.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01ce4141aa927a6d1bd34a041795abd0db1cccba5d5f24b009f694bdf3a1f3fa" -dependencies = [ - "cfg-if", - "fastrand 2.0.1", - "redox_syscall 0.4.1", - "rustix", - "windows-sys 0.52.0", -] - [[package]] name = "termcolor" version = "1.4.1" @@ -4355,12 +4413,15 @@ dependencies = [ "http 1.0.0", "httparse", "log", - "native-tls", "rand", + "rustls", + "rustls-native-certs", + "rustls-pki-types", "sha1", "thiserror", "url", "utf-8", + "webpki-roots", ] [[package]] @@ -4422,6 +4483,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.5.0" @@ -4599,6 +4666,15 @@ dependencies = [ "web-sys", ] +[[package]] +name = "webpki-roots" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3de34ae270483955a94f4b21bdaaeb83d508bb84a01435f393818edb0012009" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "weezl" version = "0.1.8" @@ -5136,3 +5212,9 @@ dependencies = [ "quote", "syn 2.0.48", ] + +[[package]] +name = "zeroize" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d" diff --git a/crates/relay-client/Cargo.toml b/crates/relay-client/Cargo.toml index ae07ac4..d93711f 100644 --- a/crates/relay-client/Cargo.toml +++ b/crates/relay-client/Cargo.toml @@ -13,4 +13,8 @@ workspace = true [dependencies] log = "0.4.20" -tungstenite = { version = "0.21.0", features = ["native-tls"] } +mio = { version = "0.8.10", features = ["net", "os-poll"] } +openssl = "0.10.63" +pretty_env_logger = "0.5.0" +rand = "0.8.5" +tungstenite = { version = "0.21.0", features = ["rustls", "rustls-native-certs", "rustls-pki-types", "rustls-tls-native-roots", "rustls-tls-webpki-roots"] } diff --git a/crates/relay-client/src/lib.rs b/crates/relay-client/src/lib.rs index c475d3c..a7a3247 100644 --- a/crates/relay-client/src/lib.rs +++ b/crates/relay-client/src/lib.rs @@ -1,23 +1,108 @@ //! A client to use a relay server. use std::borrow::Cow; -use std::collections::LinkedList; -use std::net::TcpStream; -use std::sync::mpsc::{Receiver, Sender}; +use std::io::ErrorKind; +use std::net::ToSocketAddrs; +use std::sync::mpsc::{channel, Receiver, Sender}; use log::warn; -use tungstenite::{Message, WebSocket}; +use mio::net::TcpStream; +use rand::seq::IteratorRandom; +use tungstenite::handshake::MidHandshake; +use tungstenite::stream::MaybeTlsStream; +use tungstenite::{ClientHandshake, HandshakeError, Message, WebSocket}; +/// The state of a [RelayConnection]. +#[derive(Debug, PartialEq, Eq)] +pub enum RelayConnectionState { + /// The [RelayConnection] is not connected. + Disconnected, + + /// The underlying [TcpStream] is connecting. + Connecting, + + /// The [RelayConnection] is making a tls handshake with the relay server. + TlsHandshaking, + + /// The [RelayConnection] is making a websocket handshake with the relay + /// server. + WebsocketHandshaking, + + /// The [RelayConnection] is connected. + Connected, +} + +/// A connection to a relay server. pub struct RelayConnection { + /// The address of the relay server. + server_address: String, + + /// The receiver part of the send channel. + /// + /// This is used in [RelayConnection::update] to get messages that need to + /// be sent to the relay server. send_receiver: Receiver, + + /// The sender part of the receive channel. + /// + /// This is used in [RelayConnection::send] to store messages that need to + /// be sent to the relay server. send_sender: Sender, - send_buffer: LinkedList, + + /// The receiver part of the receive channel. + /// + /// This is used in [RelayConnection::read] to get messages that have been + /// received from the relay server. receive_receiver: Receiver<(u32, Vec)>, + + /// The sender part of the send channel. + /// + /// This is used in [RelayConnection::update] to store messages that have + /// been received from the relay server. receive_sender: Sender<(u32, Vec)>, - stream: WebSocket, + + /// If the [TcpStream] is not currently connected it will be stored here. + stream: Option, + + /// If the websocket handshake is not complete it will be stored here. + handshake: Option>>>, + + /// When the websocket is correctly connected it will be stored here. + socket: Option>>, } impl RelayConnection { + /// Create a new [RelayConnection]. + pub fn new(server_address: String) -> Self { + let (send_sender, send_receiver) = channel(); + let (receive_sender, receive_receiver) = channel(); + Self { + server_address, + send_receiver, + send_sender, + receive_receiver, + receive_sender, + stream: None, + handshake: None, + socket: None, + } + } + + /// Returns the state of the [RelayConnection]. + pub fn state(&self) -> RelayConnectionState { + match ( + self.stream.is_some(), + self.handshake.is_some(), + self.socket.is_some(), + ) { + (false, false, false) => RelayConnectionState::Disconnected, + (true, false, false) => RelayConnectionState::Connecting, + (false, true, false) => RelayConnectionState::WebsocketHandshaking, + (false, false, true) => RelayConnectionState::Connected, + _ => unreachable!(), + } + } + /// Send a message to the target client. pub fn send(&self, target_id: u32, message: Cow<[u8]>) { let mut data = message.into_owned(); @@ -30,18 +115,266 @@ impl RelayConnection { self.receive_receiver.try_recv().ok() } - /// Update the [RelayConnection] by sending and receiving messages. + /// Update the [RelayConnection]. + /// + /// This function will connect to the relay server if it's not already + /// connected, and will send and receive messages from the relay server + /// if it's connected. pub fn update(&mut self) { - while let Ok(message) = self.send_receiver.try_recv() { - match self.stream.send(message) { - Ok(()) => (), - Err(tungstenite::Error::WriteBufferFull(frame)) => { - self.send_buffer.push_back(frame) - } - Err(e) => { - warn!("Relay connection closed with error: {}", e); + match ( + self.stream.take(), + self.handshake.take(), + self.socket.as_mut(), + ) { + (None, None, None) => { + // Resolve the relay address list. + let mut address = self.server_address.clone(); + address.push_str(":443"); + let address_list = match address.to_socket_addrs() { + Ok(address_list) => address_list, + Err(e) => { + warn!("failed to resolve relay address: {e}"); + return; + } + }; + + // Take a random relay address. + let Some(address) = address_list.choose(&mut rand::thread_rng()) else { + warn!("no relay address available"); + return; + }; + + // Start the connection to the relay. + match TcpStream::connect(address) { + Ok(stream) => self.stream = Some(stream), + Err(e) => warn!("failed to start connection to the relay server: {e}"), } } + (Some(stream), None, None) => { + // Check if there is an error while connecting. + if let Ok(Some(e)) | Err(e) = stream.take_error() { + warn!("failed to connect to relay: {e}"); + return; + } + + // Check if the stream is connected. + match stream.peer_addr() { + Ok(_) => { + // Start the websocket handshake. + match tungstenite::client_tls( + format!("wss://{}", self.server_address), + stream, + ) { + Ok((socket, _)) => self.socket = Some(socket), + Err(HandshakeError::Interrupted(handshake)) => { + self.handshake = Some(handshake); + } + Err(HandshakeError::Failure(e)) => { + warn!("relay handshake failed: {e}") + } + } + } + Err(ref e) if e.kind() == ErrorKind::NotConnected => { + self.stream = Some(stream); + } + Err(e) => warn!("failed to connect to relay: {e}"), + } + } + (None, Some(handshake), None) => { + // Check if the handshake is complete. + match handshake.handshake() { + Ok((socket, _)) => self.socket = Some(socket), + Err(HandshakeError::Interrupted(unfinished_handshake)) => { + self.handshake = Some(unfinished_handshake); + } + Err(HandshakeError::Failure(e)) => { + warn!("relay websocket handshake failed: {e}") + } + } + } + (None, None, Some(socket)) => { + // Send messages from the send channel to the socket. + while let Ok(message) = self.send_receiver.try_recv() { + match socket.send(message) { + Ok(()) => (), + Err(tungstenite::Error::Io(ref e)) + if e.kind() == std::io::ErrorKind::WouldBlock + || e.kind() == std::io::ErrorKind::Interrupted => + { + break; + } + Err(e) => { + warn!("relay connection closed with error: {e}"); + self.socket = None; + return; + } + } + } + + // Receive messages from the socket and send them to the receive channel. + loop { + match socket.read() { + Ok(message) => { + // Check the message length. + let mut data = message.into_data(); + if data.len() < 4 { + warn!("received malformed message with length: {}", data.len()); + continue; + } + + // Extract the sender ID. + let id_start = data.len() - 4; + let sender_id = u32::from_be_bytes( + data[id_start..] + .try_into() + .unwrap_or_else(|_| unreachable!()), + ); + data.truncate(id_start); + + // Send the message to the receive channel. + self.receive_sender.send((sender_id, data)).ok(); + } + Err(tungstenite::Error::Io(ref e)) + if e.kind() == std::io::ErrorKind::WouldBlock + || e.kind() == std::io::ErrorKind::Interrupted => + { + break; + } + Err(e) => { + warn!("relay connection closed with error: {e}"); + self.socket = None; + return; + } + } + } + } + _ => unreachable!(), } } + + // pub fn update(&mut self) { + // // If there's an unconnected stream, wait for it to be connected. + // if let Some(unconnected_stream) = self.unconnected_stream.take() { + // match unconnected_stream.peer_addr() { + // Ok(_) => { + // // Start the handshake. + // match tungstenite::client::client("wss://relay.cocosol.fr", + // unconnected_stream) { + // Ok((socket, _)) => self.socket = Some(socket), + // Err(HandshakeError::Interrupted(unfinished_handshake)) => + // { self.unfinished_handshake = + // Some(unfinished_handshake); return; + // } + // Err(HandshakeError::Failure(e)) => { + // warn!("relay handshake failed: {}", e); + // return; + // } + // } + // } + // Err(ref e) if e.kind() == std::io::ErrorKind::NotConnected => { + // self.unconnected_stream = Some(unconnected_stream); + // } + // Err(e) => warn!("failed to get peer address: {}", e), + // } + // } + + // // If there's an unfinished handshake, try to finish it. + // if let Some(unfinished_handshake) = self.unfinished_handshake.take() { + // match unfinished_handshake.handshake() { + // Ok((socket, _)) => self.socket = Some(socket), + // Err(HandshakeError::Interrupted(unfinished_handshake)) => { + // self.unfinished_handshake = Some(unfinished_handshake) + // } + // Err(HandshakeError::Failure(e)) => warn!("relay handshake failed: + // {}", e), } + // } + + // // If there's no socket yet, try to connect. + // let socket = match self.socket { + // Some(ref mut socket) => socket, + // None => { + // // Resolve the relay address list. + // let address_list = match "relay.cocosol.fr:443".to_socket_addrs() + // { Ok(address_list) => address_list, + // Err(e) => { + // warn!("failed to resolve relay address: {}", e); + // return; + // } + // }; + + // // Take a random relay address. + // let Some(address) = address_list.choose(&mut rand::thread_rng()) + // else { warn!("no relay address available"); + // return; + // }; + + // // Create a [TcpStream] connected to the relay. + // self.unconnected_stream = match TcpStream::connect(address) { + // Ok(stream) => Some(stream), + // Err(e) => { + // warn!("failed to connect to relay: {}", e); + // return; + // } + // }; + + // // Return because the socket is not connected yet. + // return; + // } + // }; + + // // Send messages from the send channel to the socket. + // while let Ok(message) = self.send_receiver.try_recv() { + // match socket.send(message) { + // Ok(()) => (), + // Err(tungstenite::Error::Io(ref e)) + // if e.kind() == std::io::ErrorKind::WouldBlock + // || e.kind() == std::io::ErrorKind::Interrupted => + // { + // break; + // } + // Err(e) => { + // warn!("relay connection closed with error: {}", e); + // self.socket = None; + // return; + // } + // } + // } + + // // Receive messages from the socket and send them to the receive channel. + // loop { + // match socket.read() { + // Ok(message) => { + // // Check the message length. + // let mut data = message.into_data(); + // if data.len() < 4 { + // warn!("received malformed message: {}", data.len()); + // continue; + // } + + // // Extract the sender ID. + // let id_start = data.len() - 4; + // let sender_id = u32::from_be_bytes( + // data[id_start..] + // .try_into() + // .unwrap_or_else(|_| unreachable!()), + // ); + // data.truncate(id_start); + + // // Send the message to the receive channel. + // self.receive_sender.send((sender_id, data)).ok(); + // } + // Err(tungstenite::Error::Io(ref e)) + // if e.kind() == std::io::ErrorKind::WouldBlock + // || e.kind() == std::io::ErrorKind::Interrupted => + // { + // break; + // } + // Err(e) => { + // warn!("relay connection closed with error: {}", e); + // self.socket = None; + // return; + // } + // } + // } + // } } diff --git a/crates/relay-client/src/main.rs b/crates/relay-client/src/main.rs new file mode 100644 index 0000000..73b7ad9 --- /dev/null +++ b/crates/relay-client/src/main.rs @@ -0,0 +1,20 @@ +//! TODO + +use std::net::TcpStream; +use std::thread; +use std::time::Duration; + +use relay_client::RelayConnection; + +fn main() { + pretty_env_logger::init(); + + let mut connection = RelayConnection::new("relay.cocosol.fr".to_string()); + + loop { + connection.update(); + if let Some((sender_id, data)) = connection.read() { + println!("Received message from {sender_id}: {:?}", data); + } + } +} -- 2.43.4 From c9c95bbf1ba1d708b3db3f1c0aa16e7928da0dce Mon Sep 17 00:00:00 2001 From: Tipragot Date: Mon, 12 Feb 2024 02:57:51 +0100 Subject: [PATCH 3/7] Working system --- crates/relay-client/src/lib.rs | 474 ++++++++++++-------------------- crates/relay-client/src/main.rs | 19 +- 2 files changed, 192 insertions(+), 301 deletions(-) diff --git a/crates/relay-client/src/lib.rs b/crates/relay-client/src/lib.rs index a7a3247..2bdc5c6 100644 --- a/crates/relay-client/src/lib.rs +++ b/crates/relay-client/src/lib.rs @@ -1,41 +1,47 @@ //! A client to use a relay server. use std::borrow::Cow; -use std::io::ErrorKind; -use std::net::ToSocketAddrs; +use std::io::{self, ErrorKind, Read}; +use std::net::{SocketAddr, ToSocketAddrs}; use std::sync::mpsc::{channel, Receiver, Sender}; +use std::time::{Duration, Instant}; -use log::warn; +use log::{info, warn}; use mio::net::TcpStream; -use rand::seq::IteratorRandom; +use mio::{Events, Interest, Poll, Token}; +use rand::seq::{IteratorRandom, SliceRandom}; +use tungstenite::client::{uri_mode, IntoClientRequest}; +use tungstenite::handshake::client::Request; use tungstenite::handshake::MidHandshake; use tungstenite::stream::MaybeTlsStream; use tungstenite::{ClientHandshake, HandshakeError, Message, WebSocket}; /// The state of a [RelayConnection]. -#[derive(Debug, PartialEq, Eq)] -pub enum RelayConnectionState { +#[derive(Debug)] +pub enum ConnectionState { /// The [RelayConnection] is not connected. Disconnected, /// The underlying [TcpStream] is connecting. - Connecting, + StreamConnecting(TcpStream, Instant), - /// The [RelayConnection] is making a tls handshake with the relay server. - TlsHandshaking, + /// The underlying [TcpStream] is connected. + StreamConnected(TcpStream), - /// The [RelayConnection] is making a websocket handshake with the relay - /// server. - WebsocketHandshaking, + /// The websocket handshake is in progress. + Handshaking(MidHandshake>>), /// The [RelayConnection] is connected. - Connected, + Connected(WebSocket>), } /// A connection to a relay server. pub struct RelayConnection { - /// The address of the relay server. - server_address: String, + /// The address list corresponding to the relay server. + address_list: Vec, + + /// The domain of the relay server. + domain: String, /// The receiver part of the send channel. /// @@ -61,46 +67,25 @@ pub struct RelayConnection { /// been received from the relay server. receive_sender: Sender<(u32, Vec)>, - /// If the [TcpStream] is not currently connected it will be stored here. - stream: Option, - - /// If the websocket handshake is not complete it will be stored here. - handshake: Option>>>, - - /// When the websocket is correctly connected it will be stored here. - socket: Option>>, + /// The state of the connection. + pub state: ConnectionState, } impl RelayConnection { /// Create a new [RelayConnection]. - pub fn new(server_address: String) -> Self { + pub fn new<'a>(domain: impl Into>) -> io::Result { + let domain = domain.into(); let (send_sender, send_receiver) = channel(); let (receive_sender, receive_receiver) = channel(); - Self { - server_address, + Ok(Self { + address_list: (domain.as_ref(), 443).to_socket_addrs()?.collect(), + domain: domain.into_owned(), send_receiver, send_sender, receive_receiver, receive_sender, - stream: None, - handshake: None, - socket: None, - } - } - - /// Returns the state of the [RelayConnection]. - pub fn state(&self) -> RelayConnectionState { - match ( - self.stream.is_some(), - self.handshake.is_some(), - self.socket.is_some(), - ) { - (false, false, false) => RelayConnectionState::Disconnected, - (true, false, false) => RelayConnectionState::Connecting, - (false, true, false) => RelayConnectionState::WebsocketHandshaking, - (false, false, true) => RelayConnectionState::Connected, - _ => unreachable!(), - } + state: ConnectionState::Disconnected, + }) } /// Send a message to the target client. @@ -115,266 +100,159 @@ impl RelayConnection { self.receive_receiver.try_recv().ok() } + /// Create a new [TcpStream] to the relay server. + fn create_stream(&mut self) -> ConnectionState { + // Take a random relay address. + let Some(address) = self.address_list.choose(&mut rand::thread_rng()) else { + warn!("no relay address available"); + return ConnectionState::Disconnected; + }; + + // Create the new TCP stream. + match TcpStream::connect(address.to_owned()) { + Ok(stream) => ConnectionState::StreamConnecting(stream, Instant::now()), + Err(e) => { + warn!("failed to start connection to the relay server: {e}"); + ConnectionState::Disconnected + } + } + } + + /// Check if the [TcpStream] of the [RelayConnection] is connected. + fn check_connection(stream: TcpStream, start_time: Instant) -> ConnectionState { + // Check for connection errors. + if let Err(e) = stream.take_error() { + warn!("failed to connect to the relay server: {e}"); + return ConnectionState::Disconnected; + } + + // Check if the stream is connected. + let connected = match stream.peek(&mut [0]) { + Ok(_) => true, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true, + Err(ref e) if e.kind() == io::ErrorKind::NotConnected => false, + Err(e) => { + warn!("failed to connect to the relay server: {e}"); + return ConnectionState::Disconnected; + } + }; + + // Check if the connection has timed out. + let elapsed = start_time.elapsed(); + if elapsed > Duration::from_secs(5) { + warn!("connection to the relay server timed out"); + return ConnectionState::Disconnected; + } + + // Update the connection state if connected. + match connected { + true => ConnectionState::StreamConnected(stream), + false => ConnectionState::StreamConnecting(stream, start_time), + } + } + + /// Start the websocket handshake. + fn start_handshake(&mut self, stream: TcpStream) -> ConnectionState { + match tungstenite::client_tls(format!("wss://{}", self.domain), stream) { + Ok((socket, _)) => ConnectionState::Connected(socket), + Err(HandshakeError::Interrupted(handshake)) => ConnectionState::Handshaking(handshake), + Err(HandshakeError::Failure(e)) => { + warn!("handshake failed with the relay server: {e}"); + ConnectionState::Disconnected + } + } + } + + /// Continue the websocket handshake. + fn continue_handshake( + handshake: MidHandshake>>, + ) -> ConnectionState { + match handshake.handshake() { + Ok((socket, _)) => ConnectionState::Connected(socket), + Err(HandshakeError::Interrupted(handshake)) => ConnectionState::Handshaking(handshake), + Err(HandshakeError::Failure(e)) => { + warn!("handshake failed with the relay server: {e}"); + ConnectionState::Disconnected + } + } + } + + /// Update the [RelayConnection] by receiving and sending messages. + fn update_connection( + &mut self, + mut socket: WebSocket>, + ) -> ConnectionState { + // Send messages from the send channel to the socket. + while let Ok(message) = self.send_receiver.try_recv() { + match socket.send(message) { + Ok(()) => (), + Err(tungstenite::Error::Io(ref e)) + if e.kind() == std::io::ErrorKind::WouldBlock + || e.kind() == std::io::ErrorKind::Interrupted => + { + break; + } + Err(e) => { + warn!("relay connection closed: {e}"); + return ConnectionState::Disconnected; + } + } + } + + // Receive messages from the socket and send them to the receive channel. + loop { + match socket.read() { + Ok(message) => { + // Check the message length. + let mut data = message.into_data(); + if data.len() < 4 { + warn!("received malformed message with length: {}", data.len()); + continue; + } + + // Extract the sender ID. + let id_start = data.len() - 4; + let sender_id = u32::from_be_bytes( + data[id_start..] + .try_into() + .unwrap_or_else(|_| unreachable!()), + ); + data.truncate(id_start); + + // Send the message to the receive channel. + self.receive_sender.send((sender_id, data)).ok(); + } + Err(tungstenite::Error::Io(ref e)) + if e.kind() == std::io::ErrorKind::WouldBlock + || e.kind() == std::io::ErrorKind::Interrupted => + { + break; + } + Err(e) => { + warn!("relay connection closed: {e}"); + return ConnectionState::Disconnected; + } + } + } + + // Keep the connection connected. + ConnectionState::Connected(socket) + } + /// Update the [RelayConnection]. /// /// This function will connect to the relay server if it's not already /// connected, and will send and receive messages from the relay server /// if it's connected. pub fn update(&mut self) { - match ( - self.stream.take(), - self.handshake.take(), - self.socket.as_mut(), - ) { - (None, None, None) => { - // Resolve the relay address list. - let mut address = self.server_address.clone(); - address.push_str(":443"); - let address_list = match address.to_socket_addrs() { - Ok(address_list) => address_list, - Err(e) => { - warn!("failed to resolve relay address: {e}"); - return; - } - }; - - // Take a random relay address. - let Some(address) = address_list.choose(&mut rand::thread_rng()) else { - warn!("no relay address available"); - return; - }; - - // Start the connection to the relay. - match TcpStream::connect(address) { - Ok(stream) => self.stream = Some(stream), - Err(e) => warn!("failed to start connection to the relay server: {e}"), - } + self.state = match std::mem::replace(&mut self.state, ConnectionState::Disconnected) { + ConnectionState::Disconnected => self.create_stream(), + ConnectionState::StreamConnecting(stream, start_time) => { + Self::check_connection(stream, start_time) } - (Some(stream), None, None) => { - // Check if there is an error while connecting. - if let Ok(Some(e)) | Err(e) = stream.take_error() { - warn!("failed to connect to relay: {e}"); - return; - } - - // Check if the stream is connected. - match stream.peer_addr() { - Ok(_) => { - // Start the websocket handshake. - match tungstenite::client_tls( - format!("wss://{}", self.server_address), - stream, - ) { - Ok((socket, _)) => self.socket = Some(socket), - Err(HandshakeError::Interrupted(handshake)) => { - self.handshake = Some(handshake); - } - Err(HandshakeError::Failure(e)) => { - warn!("relay handshake failed: {e}") - } - } - } - Err(ref e) if e.kind() == ErrorKind::NotConnected => { - self.stream = Some(stream); - } - Err(e) => warn!("failed to connect to relay: {e}"), - } - } - (None, Some(handshake), None) => { - // Check if the handshake is complete. - match handshake.handshake() { - Ok((socket, _)) => self.socket = Some(socket), - Err(HandshakeError::Interrupted(unfinished_handshake)) => { - self.handshake = Some(unfinished_handshake); - } - Err(HandshakeError::Failure(e)) => { - warn!("relay websocket handshake failed: {e}") - } - } - } - (None, None, Some(socket)) => { - // Send messages from the send channel to the socket. - while let Ok(message) = self.send_receiver.try_recv() { - match socket.send(message) { - Ok(()) => (), - Err(tungstenite::Error::Io(ref e)) - if e.kind() == std::io::ErrorKind::WouldBlock - || e.kind() == std::io::ErrorKind::Interrupted => - { - break; - } - Err(e) => { - warn!("relay connection closed with error: {e}"); - self.socket = None; - return; - } - } - } - - // Receive messages from the socket and send them to the receive channel. - loop { - match socket.read() { - Ok(message) => { - // Check the message length. - let mut data = message.into_data(); - if data.len() < 4 { - warn!("received malformed message with length: {}", data.len()); - continue; - } - - // Extract the sender ID. - let id_start = data.len() - 4; - let sender_id = u32::from_be_bytes( - data[id_start..] - .try_into() - .unwrap_or_else(|_| unreachable!()), - ); - data.truncate(id_start); - - // Send the message to the receive channel. - self.receive_sender.send((sender_id, data)).ok(); - } - Err(tungstenite::Error::Io(ref e)) - if e.kind() == std::io::ErrorKind::WouldBlock - || e.kind() == std::io::ErrorKind::Interrupted => - { - break; - } - Err(e) => { - warn!("relay connection closed with error: {e}"); - self.socket = None; - return; - } - } - } - } - _ => unreachable!(), + ConnectionState::StreamConnected(stream) => self.start_handshake(stream), + ConnectionState::Handshaking(handshake) => Self::continue_handshake(handshake), + ConnectionState::Connected(socket) => self.update_connection(socket), } } - - // pub fn update(&mut self) { - // // If there's an unconnected stream, wait for it to be connected. - // if let Some(unconnected_stream) = self.unconnected_stream.take() { - // match unconnected_stream.peer_addr() { - // Ok(_) => { - // // Start the handshake. - // match tungstenite::client::client("wss://relay.cocosol.fr", - // unconnected_stream) { - // Ok((socket, _)) => self.socket = Some(socket), - // Err(HandshakeError::Interrupted(unfinished_handshake)) => - // { self.unfinished_handshake = - // Some(unfinished_handshake); return; - // } - // Err(HandshakeError::Failure(e)) => { - // warn!("relay handshake failed: {}", e); - // return; - // } - // } - // } - // Err(ref e) if e.kind() == std::io::ErrorKind::NotConnected => { - // self.unconnected_stream = Some(unconnected_stream); - // } - // Err(e) => warn!("failed to get peer address: {}", e), - // } - // } - - // // If there's an unfinished handshake, try to finish it. - // if let Some(unfinished_handshake) = self.unfinished_handshake.take() { - // match unfinished_handshake.handshake() { - // Ok((socket, _)) => self.socket = Some(socket), - // Err(HandshakeError::Interrupted(unfinished_handshake)) => { - // self.unfinished_handshake = Some(unfinished_handshake) - // } - // Err(HandshakeError::Failure(e)) => warn!("relay handshake failed: - // {}", e), } - // } - - // // If there's no socket yet, try to connect. - // let socket = match self.socket { - // Some(ref mut socket) => socket, - // None => { - // // Resolve the relay address list. - // let address_list = match "relay.cocosol.fr:443".to_socket_addrs() - // { Ok(address_list) => address_list, - // Err(e) => { - // warn!("failed to resolve relay address: {}", e); - // return; - // } - // }; - - // // Take a random relay address. - // let Some(address) = address_list.choose(&mut rand::thread_rng()) - // else { warn!("no relay address available"); - // return; - // }; - - // // Create a [TcpStream] connected to the relay. - // self.unconnected_stream = match TcpStream::connect(address) { - // Ok(stream) => Some(stream), - // Err(e) => { - // warn!("failed to connect to relay: {}", e); - // return; - // } - // }; - - // // Return because the socket is not connected yet. - // return; - // } - // }; - - // // Send messages from the send channel to the socket. - // while let Ok(message) = self.send_receiver.try_recv() { - // match socket.send(message) { - // Ok(()) => (), - // Err(tungstenite::Error::Io(ref e)) - // if e.kind() == std::io::ErrorKind::WouldBlock - // || e.kind() == std::io::ErrorKind::Interrupted => - // { - // break; - // } - // Err(e) => { - // warn!("relay connection closed with error: {}", e); - // self.socket = None; - // return; - // } - // } - // } - - // // Receive messages from the socket and send them to the receive channel. - // loop { - // match socket.read() { - // Ok(message) => { - // // Check the message length. - // let mut data = message.into_data(); - // if data.len() < 4 { - // warn!("received malformed message: {}", data.len()); - // continue; - // } - - // // Extract the sender ID. - // let id_start = data.len() - 4; - // let sender_id = u32::from_be_bytes( - // data[id_start..] - // .try_into() - // .unwrap_or_else(|_| unreachable!()), - // ); - // data.truncate(id_start); - - // // Send the message to the receive channel. - // self.receive_sender.send((sender_id, data)).ok(); - // } - // Err(tungstenite::Error::Io(ref e)) - // if e.kind() == std::io::ErrorKind::WouldBlock - // || e.kind() == std::io::ErrorKind::Interrupted => - // { - // break; - // } - // Err(e) => { - // warn!("relay connection closed with error: {}", e); - // self.socket = None; - // return; - // } - // } - // } - // } } diff --git a/crates/relay-client/src/main.rs b/crates/relay-client/src/main.rs index 73b7ad9..c207650 100644 --- a/crates/relay-client/src/main.rs +++ b/crates/relay-client/src/main.rs @@ -1,6 +1,6 @@ //! TODO -use std::net::TcpStream; +use std::io::{stdout, Write}; use std::thread; use std::time::Duration; @@ -9,12 +9,25 @@ use relay_client::RelayConnection; fn main() { pretty_env_logger::init(); - let mut connection = RelayConnection::new("relay.cocosol.fr".to_string()); + let mut connection = RelayConnection::new("relay.cocosol.fr").unwrap(); loop { connection.update(); + print!( + "\rState: {}", + match connection.state { + relay_client::ConnectionState::Disconnected => "Disconnected".to_string(), + relay_client::ConnectionState::StreamConnecting(_, instant) => + format!("StreamConnecting {:?}", instant.elapsed()), + relay_client::ConnectionState::StreamConnected(_) => "StreamConnected".to_string(), + relay_client::ConnectionState::Handshaking(_) => "Handshaking".to_string(), + relay_client::ConnectionState::Connected(_) => "Connected".to_string(), + } + ); + stdout().flush().unwrap(); + thread::sleep(Duration::from_millis(10)); if let Some((sender_id, data)) = connection.read() { - println!("Received message from {sender_id}: {:?}", data); + println!("\nReceived message from {sender_id}: {:?}", data); } } } -- 2.43.4 From 74a2387a8234e75d303d7788d6dd56b141a1fd96 Mon Sep 17 00:00:00 2001 From: Tipragot Date: Mon, 12 Feb 2024 03:08:07 +0100 Subject: [PATCH 4/7] Cleaning --- Cargo.lock | 96 --------------------------------- crates/relay-client/Cargo.toml | 6 +-- crates/relay-client/src/lib.rs | 68 +++++++++++------------ crates/relay-client/src/main.rs | 33 ------------ 4 files changed, 34 insertions(+), 169 deletions(-) delete mode 100644 crates/relay-client/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 1f4009e..0198598 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1796,19 +1796,6 @@ dependencies = [ "syn 2.0.48", ] -[[package]] -name = "env_logger" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580" -dependencies = [ - "humantime", - "is-terminal", - "log", - "regex", - "termcolor", -] - [[package]] name = "epaint" version = "0.24.1" @@ -2485,12 +2472,6 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" -[[package]] -name = "humantime" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" - [[package]] name = "hyper" version = "1.1.0" @@ -2640,17 +2621,6 @@ dependencies = [ "mach2", ] -[[package]] -name = "is-terminal" -version = "0.4.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b" -dependencies = [ - "hermit-abi", - "libc", - "windows-sys 0.52.0", -] - [[package]] name = "itertools" version = "0.12.1" @@ -3315,50 +3285,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" -[[package]] -name = "openssl" -version = "0.10.63" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15c9d69dd87a29568d4d017cfe8ec518706046a05184e5aea92d0af890b803c8" -dependencies = [ - "bitflags 2.4.2", - "cfg-if", - "foreign-types 0.3.2", - "libc", - "once_cell", - "openssl-macros", - "openssl-sys", -] - -[[package]] -name = "openssl-macros" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.48", -] - [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" -[[package]] -name = "openssl-sys" -version = "0.9.99" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22e1bf214306098e4832460f797824c05d25aacdf896f64a985fb0fd992454ae" -dependencies = [ - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "orbclient" version = "0.3.47" @@ -3523,16 +3455,6 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" -[[package]] -name = "pretty_env_logger" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "865724d4dbe39d9f3dd3b52b88d859d66bcb2d6a0acfd5ea68a65fb66d4bdc1c" -dependencies = [ - "env_logger", - "log", -] - [[package]] name = "proc-macro-crate" version = "1.3.1" @@ -3695,8 +3617,6 @@ version = "0.2.0" dependencies = [ "log", "mio", - "openssl", - "pretty_env_logger", "rand", "tungstenite", ] @@ -4421,7 +4341,6 @@ dependencies = [ "thiserror", "url", "utf-8", - "webpki-roots", ] [[package]] @@ -4522,12 +4441,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" -[[package]] -name = "vcpkg" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" - [[package]] name = "vec_map" version = "0.8.2" @@ -4666,15 +4579,6 @@ dependencies = [ "web-sys", ] -[[package]] -name = "webpki-roots" -version = "0.26.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3de34ae270483955a94f4b21bdaaeb83d508bb84a01435f393818edb0012009" -dependencies = [ - "rustls-pki-types", -] - [[package]] name = "weezl" version = "0.1.8" diff --git a/crates/relay-client/Cargo.toml b/crates/relay-client/Cargo.toml index d93711f..3df6c3a 100644 --- a/crates/relay-client/Cargo.toml +++ b/crates/relay-client/Cargo.toml @@ -12,9 +12,7 @@ categories = ["network-programming", "game-development"] workspace = true [dependencies] -log = "0.4.20" +tungstenite = { version = "0.21.0", features = ["rustls-tls-native-roots"] } mio = { version = "0.8.10", features = ["net", "os-poll"] } -openssl = "0.10.63" -pretty_env_logger = "0.5.0" rand = "0.8.5" -tungstenite = { version = "0.21.0", features = ["rustls", "rustls-native-certs", "rustls-pki-types", "rustls-tls-native-roots", "rustls-tls-webpki-roots"] } +log = "0.4.20" diff --git a/crates/relay-client/src/lib.rs b/crates/relay-client/src/lib.rs index 2bdc5c6..ba6e23a 100644 --- a/crates/relay-client/src/lib.rs +++ b/crates/relay-client/src/lib.rs @@ -1,42 +1,39 @@ //! A client to use a relay server. use std::borrow::Cow; -use std::io::{self, ErrorKind, Read}; +use std::io::{self}; use std::net::{SocketAddr, ToSocketAddrs}; use std::sync::mpsc::{channel, Receiver, Sender}; use std::time::{Duration, Instant}; -use log::{info, warn}; +use log::warn; use mio::net::TcpStream; -use mio::{Events, Interest, Poll, Token}; -use rand::seq::{IteratorRandom, SliceRandom}; -use tungstenite::client::{uri_mode, IntoClientRequest}; -use tungstenite::handshake::client::Request; +use rand::seq::SliceRandom; use tungstenite::handshake::MidHandshake; use tungstenite::stream::MaybeTlsStream; use tungstenite::{ClientHandshake, HandshakeError, Message, WebSocket}; -/// The state of a [RelayConnection]. +/// The state of a [Connection]. #[derive(Debug)] pub enum ConnectionState { - /// The [RelayConnection] is not connected. + /// The [Connection] is not connected. Disconnected, /// The underlying [TcpStream] is connecting. - StreamConnecting(TcpStream, Instant), + Connecting(TcpStream, Instant), /// The underlying [TcpStream] is connected. - StreamConnected(TcpStream), + Connected(TcpStream), /// The websocket handshake is in progress. Handshaking(MidHandshake>>), - /// The [RelayConnection] is connected. - Connected(WebSocket>), + /// The [Connection] is connected. + Active(WebSocket>), } /// A connection to a relay server. -pub struct RelayConnection { +pub struct Connection { /// The address list corresponding to the relay server. address_list: Vec, @@ -45,25 +42,25 @@ pub struct RelayConnection { /// The receiver part of the send channel. /// - /// This is used in [RelayConnection::update] to get messages that need to + /// This is used in [Connection::update] to get messages that need to /// be sent to the relay server. send_receiver: Receiver, /// The sender part of the receive channel. /// - /// This is used in [RelayConnection::send] to store messages that need to + /// This is used in [Connection::send] to store messages that need to /// be sent to the relay server. send_sender: Sender, /// The receiver part of the receive channel. /// - /// This is used in [RelayConnection::read] to get messages that have been + /// This is used in [Connection::read] to get messages that have been /// received from the relay server. receive_receiver: Receiver<(u32, Vec)>, /// The sender part of the send channel. /// - /// This is used in [RelayConnection::update] to store messages that have + /// This is used in [Connection::update] to store messages that have /// been received from the relay server. receive_sender: Sender<(u32, Vec)>, @@ -71,8 +68,8 @@ pub struct RelayConnection { pub state: ConnectionState, } -impl RelayConnection { - /// Create a new [RelayConnection]. +impl Connection { + /// Create a new [Connection]. pub fn new<'a>(domain: impl Into>) -> io::Result { let domain = domain.into(); let (send_sender, send_receiver) = channel(); @@ -110,7 +107,7 @@ impl RelayConnection { // Create the new TCP stream. match TcpStream::connect(address.to_owned()) { - Ok(stream) => ConnectionState::StreamConnecting(stream, Instant::now()), + Ok(stream) => ConnectionState::Connecting(stream, Instant::now()), Err(e) => { warn!("failed to start connection to the relay server: {e}"); ConnectionState::Disconnected @@ -118,8 +115,8 @@ impl RelayConnection { } } - /// Check if the [TcpStream] of the [RelayConnection] is connected. - fn check_connection(stream: TcpStream, start_time: Instant) -> ConnectionState { + /// Check if the [TcpStream] of the [Connection] is connected. + fn check_connection(&mut self, stream: TcpStream, start: Instant) -> ConnectionState { // Check for connection errors. if let Err(e) = stream.take_error() { warn!("failed to connect to the relay server: {e}"); @@ -138,7 +135,7 @@ impl RelayConnection { }; // Check if the connection has timed out. - let elapsed = start_time.elapsed(); + let elapsed = start.elapsed(); if elapsed > Duration::from_secs(5) { warn!("connection to the relay server timed out"); return ConnectionState::Disconnected; @@ -146,15 +143,15 @@ impl RelayConnection { // Update the connection state if connected. match connected { - true => ConnectionState::StreamConnected(stream), - false => ConnectionState::StreamConnecting(stream, start_time), + true => ConnectionState::Connected(stream), + false => ConnectionState::Connecting(stream, start), } } /// Start the websocket handshake. fn start_handshake(&mut self, stream: TcpStream) -> ConnectionState { match tungstenite::client_tls(format!("wss://{}", self.domain), stream) { - Ok((socket, _)) => ConnectionState::Connected(socket), + Ok((socket, _)) => ConnectionState::Active(socket), Err(HandshakeError::Interrupted(handshake)) => ConnectionState::Handshaking(handshake), Err(HandshakeError::Failure(e)) => { warn!("handshake failed with the relay server: {e}"); @@ -165,10 +162,11 @@ impl RelayConnection { /// Continue the websocket handshake. fn continue_handshake( + &mut self, handshake: MidHandshake>>, ) -> ConnectionState { match handshake.handshake() { - Ok((socket, _)) => ConnectionState::Connected(socket), + Ok((socket, _)) => ConnectionState::Active(socket), Err(HandshakeError::Interrupted(handshake)) => ConnectionState::Handshaking(handshake), Err(HandshakeError::Failure(e)) => { warn!("handshake failed with the relay server: {e}"); @@ -177,7 +175,7 @@ impl RelayConnection { } } - /// Update the [RelayConnection] by receiving and sending messages. + /// Update the [Connection] by receiving and sending messages. fn update_connection( &mut self, mut socket: WebSocket>, @@ -236,10 +234,10 @@ impl RelayConnection { } // Keep the connection connected. - ConnectionState::Connected(socket) + ConnectionState::Active(socket) } - /// Update the [RelayConnection]. + /// Update the [Connection]. /// /// This function will connect to the relay server if it's not already /// connected, and will send and receive messages from the relay server @@ -247,12 +245,10 @@ impl RelayConnection { pub fn update(&mut self) { self.state = match std::mem::replace(&mut self.state, ConnectionState::Disconnected) { ConnectionState::Disconnected => self.create_stream(), - ConnectionState::StreamConnecting(stream, start_time) => { - Self::check_connection(stream, start_time) - } - ConnectionState::StreamConnected(stream) => self.start_handshake(stream), - ConnectionState::Handshaking(handshake) => Self::continue_handshake(handshake), - ConnectionState::Connected(socket) => self.update_connection(socket), + ConnectionState::Connecting(stream, start) => self.check_connection(stream, start), + ConnectionState::Connected(stream) => self.start_handshake(stream), + ConnectionState::Handshaking(handshake) => self.continue_handshake(handshake), + ConnectionState::Active(socket) => self.update_connection(socket), } } } diff --git a/crates/relay-client/src/main.rs b/crates/relay-client/src/main.rs deleted file mode 100644 index c207650..0000000 --- a/crates/relay-client/src/main.rs +++ /dev/null @@ -1,33 +0,0 @@ -//! TODO - -use std::io::{stdout, Write}; -use std::thread; -use std::time::Duration; - -use relay_client::RelayConnection; - -fn main() { - pretty_env_logger::init(); - - let mut connection = RelayConnection::new("relay.cocosol.fr").unwrap(); - - loop { - connection.update(); - print!( - "\rState: {}", - match connection.state { - relay_client::ConnectionState::Disconnected => "Disconnected".to_string(), - relay_client::ConnectionState::StreamConnecting(_, instant) => - format!("StreamConnecting {:?}", instant.elapsed()), - relay_client::ConnectionState::StreamConnected(_) => "StreamConnected".to_string(), - relay_client::ConnectionState::Handshaking(_) => "Handshaking".to_string(), - relay_client::ConnectionState::Connected(_) => "Connected".to_string(), - } - ); - stdout().flush().unwrap(); - thread::sleep(Duration::from_millis(10)); - if let Some((sender_id, data)) = connection.read() { - println!("\nReceived message from {sender_id}: {:?}", data); - } - } -} -- 2.43.4 From 2bd5b441792dd7f616e30cfd38ce0c630a902d5d Mon Sep 17 00:00:00 2001 From: Tipragot Date: Mon, 12 Feb 2024 03:08:57 +0100 Subject: [PATCH 5/7] dqsdqsd --- crates/relay-client/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/relay-client/src/lib.rs b/crates/relay-client/src/lib.rs index ba6e23a..267f086 100644 --- a/crates/relay-client/src/lib.rs +++ b/crates/relay-client/src/lib.rs @@ -242,6 +242,8 @@ impl Connection { /// This function will connect to the relay server if it's not already /// connected, and will send and receive messages from the relay server /// if it's connected. + /// + /// This function will not block the current thread. pub fn update(&mut self) { self.state = match std::mem::replace(&mut self.state, ConnectionState::Disconnected) { ConnectionState::Disconnected => self.create_stream(), -- 2.43.4 From 3847bbd3ebd891376abcdbfef39b89fca3a9abcd Mon Sep 17 00:00:00 2001 From: Tipragot Date: Mon, 12 Feb 2024 03:10:04 +0100 Subject: [PATCH 6/7] dqsdqsdqsd --- crates/relay-client/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/relay-client/src/lib.rs b/crates/relay-client/src/lib.rs index 267f086..46ee8f3 100644 --- a/crates/relay-client/src/lib.rs +++ b/crates/relay-client/src/lib.rs @@ -1,4 +1,4 @@ -//! A client to use a relay server. +//! A library containing a client to use a relay server. use std::borrow::Cow; use std::io::{self}; -- 2.43.4 From 9e6c0306976bcb972eacbf0b9ae27ea2bbfa48f3 Mon Sep 17 00:00:00 2001 From: Tipragot Date: Mon, 12 Feb 2024 13:08:34 +0100 Subject: [PATCH 7/7] Good --- crates/relay-client/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/relay-client/src/lib.rs b/crates/relay-client/src/lib.rs index 46ee8f3..ffb9fb5 100644 --- a/crates/relay-client/src/lib.rs +++ b/crates/relay-client/src/lib.rs @@ -15,7 +15,7 @@ use tungstenite::{ClientHandshake, HandshakeError, Message, WebSocket}; /// The state of a [Connection]. #[derive(Debug)] -pub enum ConnectionState { +enum ConnectionState { /// The [Connection] is not connected. Disconnected, @@ -65,7 +65,7 @@ pub struct Connection { receive_sender: Sender<(u32, Vec)>, /// The state of the connection. - pub state: ConnectionState, + state: ConnectionState, } impl Connection { -- 2.43.4