From c9c95bbf1ba1d708b3db3f1c0aa16e7928da0dce Mon Sep 17 00:00:00 2001 From: Tipragot Date: Mon, 12 Feb 2024 02:57:51 +0100 Subject: [PATCH] Working system --- crates/relay-client/src/lib.rs | 474 ++++++++++++-------------------- crates/relay-client/src/main.rs | 19 +- 2 files changed, 192 insertions(+), 301 deletions(-) diff --git a/crates/relay-client/src/lib.rs b/crates/relay-client/src/lib.rs index a7a3247..2bdc5c6 100644 --- a/crates/relay-client/src/lib.rs +++ b/crates/relay-client/src/lib.rs @@ -1,41 +1,47 @@ //! A client to use a relay server. use std::borrow::Cow; -use std::io::ErrorKind; -use std::net::ToSocketAddrs; +use std::io::{self, ErrorKind, Read}; +use std::net::{SocketAddr, ToSocketAddrs}; use std::sync::mpsc::{channel, Receiver, Sender}; +use std::time::{Duration, Instant}; -use log::warn; +use log::{info, warn}; use mio::net::TcpStream; -use rand::seq::IteratorRandom; +use mio::{Events, Interest, Poll, Token}; +use rand::seq::{IteratorRandom, SliceRandom}; +use tungstenite::client::{uri_mode, IntoClientRequest}; +use tungstenite::handshake::client::Request; use tungstenite::handshake::MidHandshake; use tungstenite::stream::MaybeTlsStream; use tungstenite::{ClientHandshake, HandshakeError, Message, WebSocket}; /// The state of a [RelayConnection]. -#[derive(Debug, PartialEq, Eq)] -pub enum RelayConnectionState { +#[derive(Debug)] +pub enum ConnectionState { /// The [RelayConnection] is not connected. Disconnected, /// The underlying [TcpStream] is connecting. - Connecting, + StreamConnecting(TcpStream, Instant), - /// The [RelayConnection] is making a tls handshake with the relay server. - TlsHandshaking, + /// The underlying [TcpStream] is connected. + StreamConnected(TcpStream), - /// The [RelayConnection] is making a websocket handshake with the relay - /// server. - WebsocketHandshaking, + /// The websocket handshake is in progress. + Handshaking(MidHandshake>>), /// The [RelayConnection] is connected. - Connected, + Connected(WebSocket>), } /// A connection to a relay server. pub struct RelayConnection { - /// The address of the relay server. - server_address: String, + /// The address list corresponding to the relay server. + address_list: Vec, + + /// The domain of the relay server. + domain: String, /// The receiver part of the send channel. /// @@ -61,46 +67,25 @@ pub struct RelayConnection { /// been received from the relay server. receive_sender: Sender<(u32, Vec)>, - /// If the [TcpStream] is not currently connected it will be stored here. - stream: Option, - - /// If the websocket handshake is not complete it will be stored here. - handshake: Option>>>, - - /// When the websocket is correctly connected it will be stored here. - socket: Option>>, + /// The state of the connection. + pub state: ConnectionState, } impl RelayConnection { /// Create a new [RelayConnection]. - pub fn new(server_address: String) -> Self { + pub fn new<'a>(domain: impl Into>) -> io::Result { + let domain = domain.into(); let (send_sender, send_receiver) = channel(); let (receive_sender, receive_receiver) = channel(); - Self { - server_address, + Ok(Self { + address_list: (domain.as_ref(), 443).to_socket_addrs()?.collect(), + domain: domain.into_owned(), send_receiver, send_sender, receive_receiver, receive_sender, - stream: None, - handshake: None, - socket: None, - } - } - - /// Returns the state of the [RelayConnection]. - pub fn state(&self) -> RelayConnectionState { - match ( - self.stream.is_some(), - self.handshake.is_some(), - self.socket.is_some(), - ) { - (false, false, false) => RelayConnectionState::Disconnected, - (true, false, false) => RelayConnectionState::Connecting, - (false, true, false) => RelayConnectionState::WebsocketHandshaking, - (false, false, true) => RelayConnectionState::Connected, - _ => unreachable!(), - } + state: ConnectionState::Disconnected, + }) } /// Send a message to the target client. @@ -115,266 +100,159 @@ impl RelayConnection { self.receive_receiver.try_recv().ok() } + /// Create a new [TcpStream] to the relay server. + fn create_stream(&mut self) -> ConnectionState { + // Take a random relay address. + let Some(address) = self.address_list.choose(&mut rand::thread_rng()) else { + warn!("no relay address available"); + return ConnectionState::Disconnected; + }; + + // Create the new TCP stream. + match TcpStream::connect(address.to_owned()) { + Ok(stream) => ConnectionState::StreamConnecting(stream, Instant::now()), + Err(e) => { + warn!("failed to start connection to the relay server: {e}"); + ConnectionState::Disconnected + } + } + } + + /// Check if the [TcpStream] of the [RelayConnection] is connected. + fn check_connection(stream: TcpStream, start_time: Instant) -> ConnectionState { + // Check for connection errors. + if let Err(e) = stream.take_error() { + warn!("failed to connect to the relay server: {e}"); + return ConnectionState::Disconnected; + } + + // Check if the stream is connected. + let connected = match stream.peek(&mut [0]) { + Ok(_) => true, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true, + Err(ref e) if e.kind() == io::ErrorKind::NotConnected => false, + Err(e) => { + warn!("failed to connect to the relay server: {e}"); + return ConnectionState::Disconnected; + } + }; + + // Check if the connection has timed out. + let elapsed = start_time.elapsed(); + if elapsed > Duration::from_secs(5) { + warn!("connection to the relay server timed out"); + return ConnectionState::Disconnected; + } + + // Update the connection state if connected. + match connected { + true => ConnectionState::StreamConnected(stream), + false => ConnectionState::StreamConnecting(stream, start_time), + } + } + + /// Start the websocket handshake. + fn start_handshake(&mut self, stream: TcpStream) -> ConnectionState { + match tungstenite::client_tls(format!("wss://{}", self.domain), stream) { + Ok((socket, _)) => ConnectionState::Connected(socket), + Err(HandshakeError::Interrupted(handshake)) => ConnectionState::Handshaking(handshake), + Err(HandshakeError::Failure(e)) => { + warn!("handshake failed with the relay server: {e}"); + ConnectionState::Disconnected + } + } + } + + /// Continue the websocket handshake. + fn continue_handshake( + handshake: MidHandshake>>, + ) -> ConnectionState { + match handshake.handshake() { + Ok((socket, _)) => ConnectionState::Connected(socket), + Err(HandshakeError::Interrupted(handshake)) => ConnectionState::Handshaking(handshake), + Err(HandshakeError::Failure(e)) => { + warn!("handshake failed with the relay server: {e}"); + ConnectionState::Disconnected + } + } + } + + /// Update the [RelayConnection] by receiving and sending messages. + fn update_connection( + &mut self, + mut socket: WebSocket>, + ) -> ConnectionState { + // Send messages from the send channel to the socket. + while let Ok(message) = self.send_receiver.try_recv() { + match socket.send(message) { + Ok(()) => (), + Err(tungstenite::Error::Io(ref e)) + if e.kind() == std::io::ErrorKind::WouldBlock + || e.kind() == std::io::ErrorKind::Interrupted => + { + break; + } + Err(e) => { + warn!("relay connection closed: {e}"); + return ConnectionState::Disconnected; + } + } + } + + // Receive messages from the socket and send them to the receive channel. + loop { + match socket.read() { + Ok(message) => { + // Check the message length. + let mut data = message.into_data(); + if data.len() < 4 { + warn!("received malformed message with length: {}", data.len()); + continue; + } + + // Extract the sender ID. + let id_start = data.len() - 4; + let sender_id = u32::from_be_bytes( + data[id_start..] + .try_into() + .unwrap_or_else(|_| unreachable!()), + ); + data.truncate(id_start); + + // Send the message to the receive channel. + self.receive_sender.send((sender_id, data)).ok(); + } + Err(tungstenite::Error::Io(ref e)) + if e.kind() == std::io::ErrorKind::WouldBlock + || e.kind() == std::io::ErrorKind::Interrupted => + { + break; + } + Err(e) => { + warn!("relay connection closed: {e}"); + return ConnectionState::Disconnected; + } + } + } + + // Keep the connection connected. + ConnectionState::Connected(socket) + } + /// Update the [RelayConnection]. /// /// This function will connect to the relay server if it's not already /// connected, and will send and receive messages from the relay server /// if it's connected. pub fn update(&mut self) { - match ( - self.stream.take(), - self.handshake.take(), - self.socket.as_mut(), - ) { - (None, None, None) => { - // Resolve the relay address list. - let mut address = self.server_address.clone(); - address.push_str(":443"); - let address_list = match address.to_socket_addrs() { - Ok(address_list) => address_list, - Err(e) => { - warn!("failed to resolve relay address: {e}"); - return; - } - }; - - // Take a random relay address. - let Some(address) = address_list.choose(&mut rand::thread_rng()) else { - warn!("no relay address available"); - return; - }; - - // Start the connection to the relay. - match TcpStream::connect(address) { - Ok(stream) => self.stream = Some(stream), - Err(e) => warn!("failed to start connection to the relay server: {e}"), - } + self.state = match std::mem::replace(&mut self.state, ConnectionState::Disconnected) { + ConnectionState::Disconnected => self.create_stream(), + ConnectionState::StreamConnecting(stream, start_time) => { + Self::check_connection(stream, start_time) } - (Some(stream), None, None) => { - // Check if there is an error while connecting. - if let Ok(Some(e)) | Err(e) = stream.take_error() { - warn!("failed to connect to relay: {e}"); - return; - } - - // Check if the stream is connected. - match stream.peer_addr() { - Ok(_) => { - // Start the websocket handshake. - match tungstenite::client_tls( - format!("wss://{}", self.server_address), - stream, - ) { - Ok((socket, _)) => self.socket = Some(socket), - Err(HandshakeError::Interrupted(handshake)) => { - self.handshake = Some(handshake); - } - Err(HandshakeError::Failure(e)) => { - warn!("relay handshake failed: {e}") - } - } - } - Err(ref e) if e.kind() == ErrorKind::NotConnected => { - self.stream = Some(stream); - } - Err(e) => warn!("failed to connect to relay: {e}"), - } - } - (None, Some(handshake), None) => { - // Check if the handshake is complete. - match handshake.handshake() { - Ok((socket, _)) => self.socket = Some(socket), - Err(HandshakeError::Interrupted(unfinished_handshake)) => { - self.handshake = Some(unfinished_handshake); - } - Err(HandshakeError::Failure(e)) => { - warn!("relay websocket handshake failed: {e}") - } - } - } - (None, None, Some(socket)) => { - // Send messages from the send channel to the socket. - while let Ok(message) = self.send_receiver.try_recv() { - match socket.send(message) { - Ok(()) => (), - Err(tungstenite::Error::Io(ref e)) - if e.kind() == std::io::ErrorKind::WouldBlock - || e.kind() == std::io::ErrorKind::Interrupted => - { - break; - } - Err(e) => { - warn!("relay connection closed with error: {e}"); - self.socket = None; - return; - } - } - } - - // Receive messages from the socket and send them to the receive channel. - loop { - match socket.read() { - Ok(message) => { - // Check the message length. - let mut data = message.into_data(); - if data.len() < 4 { - warn!("received malformed message with length: {}", data.len()); - continue; - } - - // Extract the sender ID. - let id_start = data.len() - 4; - let sender_id = u32::from_be_bytes( - data[id_start..] - .try_into() - .unwrap_or_else(|_| unreachable!()), - ); - data.truncate(id_start); - - // Send the message to the receive channel. - self.receive_sender.send((sender_id, data)).ok(); - } - Err(tungstenite::Error::Io(ref e)) - if e.kind() == std::io::ErrorKind::WouldBlock - || e.kind() == std::io::ErrorKind::Interrupted => - { - break; - } - Err(e) => { - warn!("relay connection closed with error: {e}"); - self.socket = None; - return; - } - } - } - } - _ => unreachable!(), + ConnectionState::StreamConnected(stream) => self.start_handshake(stream), + ConnectionState::Handshaking(handshake) => Self::continue_handshake(handshake), + ConnectionState::Connected(socket) => self.update_connection(socket), } } - - // pub fn update(&mut self) { - // // If there's an unconnected stream, wait for it to be connected. - // if let Some(unconnected_stream) = self.unconnected_stream.take() { - // match unconnected_stream.peer_addr() { - // Ok(_) => { - // // Start the handshake. - // match tungstenite::client::client("wss://relay.cocosol.fr", - // unconnected_stream) { - // Ok((socket, _)) => self.socket = Some(socket), - // Err(HandshakeError::Interrupted(unfinished_handshake)) => - // { self.unfinished_handshake = - // Some(unfinished_handshake); return; - // } - // Err(HandshakeError::Failure(e)) => { - // warn!("relay handshake failed: {}", e); - // return; - // } - // } - // } - // Err(ref e) if e.kind() == std::io::ErrorKind::NotConnected => { - // self.unconnected_stream = Some(unconnected_stream); - // } - // Err(e) => warn!("failed to get peer address: {}", e), - // } - // } - - // // If there's an unfinished handshake, try to finish it. - // if let Some(unfinished_handshake) = self.unfinished_handshake.take() { - // match unfinished_handshake.handshake() { - // Ok((socket, _)) => self.socket = Some(socket), - // Err(HandshakeError::Interrupted(unfinished_handshake)) => { - // self.unfinished_handshake = Some(unfinished_handshake) - // } - // Err(HandshakeError::Failure(e)) => warn!("relay handshake failed: - // {}", e), } - // } - - // // If there's no socket yet, try to connect. - // let socket = match self.socket { - // Some(ref mut socket) => socket, - // None => { - // // Resolve the relay address list. - // let address_list = match "relay.cocosol.fr:443".to_socket_addrs() - // { Ok(address_list) => address_list, - // Err(e) => { - // warn!("failed to resolve relay address: {}", e); - // return; - // } - // }; - - // // Take a random relay address. - // let Some(address) = address_list.choose(&mut rand::thread_rng()) - // else { warn!("no relay address available"); - // return; - // }; - - // // Create a [TcpStream] connected to the relay. - // self.unconnected_stream = match TcpStream::connect(address) { - // Ok(stream) => Some(stream), - // Err(e) => { - // warn!("failed to connect to relay: {}", e); - // return; - // } - // }; - - // // Return because the socket is not connected yet. - // return; - // } - // }; - - // // Send messages from the send channel to the socket. - // while let Ok(message) = self.send_receiver.try_recv() { - // match socket.send(message) { - // Ok(()) => (), - // Err(tungstenite::Error::Io(ref e)) - // if e.kind() == std::io::ErrorKind::WouldBlock - // || e.kind() == std::io::ErrorKind::Interrupted => - // { - // break; - // } - // Err(e) => { - // warn!("relay connection closed with error: {}", e); - // self.socket = None; - // return; - // } - // } - // } - - // // Receive messages from the socket and send them to the receive channel. - // loop { - // match socket.read() { - // Ok(message) => { - // // Check the message length. - // let mut data = message.into_data(); - // if data.len() < 4 { - // warn!("received malformed message: {}", data.len()); - // continue; - // } - - // // Extract the sender ID. - // let id_start = data.len() - 4; - // let sender_id = u32::from_be_bytes( - // data[id_start..] - // .try_into() - // .unwrap_or_else(|_| unreachable!()), - // ); - // data.truncate(id_start); - - // // Send the message to the receive channel. - // self.receive_sender.send((sender_id, data)).ok(); - // } - // Err(tungstenite::Error::Io(ref e)) - // if e.kind() == std::io::ErrorKind::WouldBlock - // || e.kind() == std::io::ErrorKind::Interrupted => - // { - // break; - // } - // Err(e) => { - // warn!("relay connection closed with error: {}", e); - // self.socket = None; - // return; - // } - // } - // } - // } } diff --git a/crates/relay-client/src/main.rs b/crates/relay-client/src/main.rs index 73b7ad9..c207650 100644 --- a/crates/relay-client/src/main.rs +++ b/crates/relay-client/src/main.rs @@ -1,6 +1,6 @@ //! TODO -use std::net::TcpStream; +use std::io::{stdout, Write}; use std::thread; use std::time::Duration; @@ -9,12 +9,25 @@ use relay_client::RelayConnection; fn main() { pretty_env_logger::init(); - let mut connection = RelayConnection::new("relay.cocosol.fr".to_string()); + let mut connection = RelayConnection::new("relay.cocosol.fr").unwrap(); loop { connection.update(); + print!( + "\rState: {}", + match connection.state { + relay_client::ConnectionState::Disconnected => "Disconnected".to_string(), + relay_client::ConnectionState::StreamConnecting(_, instant) => + format!("StreamConnecting {:?}", instant.elapsed()), + relay_client::ConnectionState::StreamConnected(_) => "StreamConnected".to_string(), + relay_client::ConnectionState::Handshaking(_) => "Handshaking".to_string(), + relay_client::ConnectionState::Connected(_) => "Connected".to_string(), + } + ); + stdout().flush().unwrap(); + thread::sleep(Duration::from_millis(10)); if let Some((sender_id, data)) = connection.read() { - println!("Received message from {sender_id}: {:?}", data); + println!("\nReceived message from {sender_id}: {:?}", data); } } }