Non blocking relay connection #44

Merged
CoCo_Sol merged 7 commits from relay-client into main 2024-02-12 14:31:56 +00:00
4 changed files with 34 additions and 169 deletions
Showing only changes of commit 74a2387a82 - Show all commits

96
Cargo.lock generated
View file

@ -1796,19 +1796,6 @@ dependencies = [
"syn 2.0.48",
]
[[package]]
name = "env_logger"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580"
dependencies = [
"humantime",
"is-terminal",
"log",
"regex",
"termcolor",
]
[[package]]
name = "epaint"
version = "0.24.1"
@ -2485,12 +2472,6 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
version = "1.1.0"
@ -2640,17 +2621,6 @@ dependencies = [
"mach2",
]
[[package]]
name = "is-terminal"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b"
dependencies = [
"hermit-abi",
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "itertools"
version = "0.12.1"
@ -3315,50 +3285,12 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "openssl"
version = "0.10.63"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15c9d69dd87a29568d4d017cfe8ec518706046a05184e5aea92d0af890b803c8"
dependencies = [
"bitflags 2.4.2",
"cfg-if",
"foreign-types 0.3.2",
"libc",
"once_cell",
"openssl-macros",
"openssl-sys",
]
[[package]]
name = "openssl-macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
]
[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22e1bf214306098e4832460f797824c05d25aacdf896f64a985fb0fd992454ae"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "orbclient"
version = "0.3.47"
@ -3523,16 +3455,6 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "pretty_env_logger"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "865724d4dbe39d9f3dd3b52b88d859d66bcb2d6a0acfd5ea68a65fb66d4bdc1c"
dependencies = [
"env_logger",
"log",
]
[[package]]
name = "proc-macro-crate"
version = "1.3.1"
@ -3695,8 +3617,6 @@ version = "0.2.0"
dependencies = [
"log",
"mio",
"openssl",
"pretty_env_logger",
"rand",
"tungstenite",
]
@ -4421,7 +4341,6 @@ dependencies = [
"thiserror",
"url",
"utf-8",
"webpki-roots",
]
[[package]]
@ -4522,12 +4441,6 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "vec_map"
version = "0.8.2"
@ -4666,15 +4579,6 @@ dependencies = [
"web-sys",
]
[[package]]
name = "webpki-roots"
version = "0.26.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3de34ae270483955a94f4b21bdaaeb83d508bb84a01435f393818edb0012009"
dependencies = [
"rustls-pki-types",
]
[[package]]
name = "weezl"
version = "0.1.8"

View file

@ -12,9 +12,7 @@ categories = ["network-programming", "game-development"]
workspace = true
[dependencies]
log = "0.4.20"
tungstenite = { version = "0.21.0", features = ["rustls-tls-native-roots"] }
mio = { version = "0.8.10", features = ["net", "os-poll"] }
openssl = "0.10.63"
pretty_env_logger = "0.5.0"
rand = "0.8.5"
tungstenite = { version = "0.21.0", features = ["rustls", "rustls-native-certs", "rustls-pki-types", "rustls-tls-native-roots", "rustls-tls-webpki-roots"] }
log = "0.4.20"

View file

@ -1,42 +1,39 @@
//! A client to use a relay server.
use std::borrow::Cow;
use std::io::{self, ErrorKind, Read};
use std::io::{self};
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::time::{Duration, Instant};
use log::{info, warn};
use log::warn;
use mio::net::TcpStream;
use mio::{Events, Interest, Poll, Token};
use rand::seq::{IteratorRandom, SliceRandom};
use tungstenite::client::{uri_mode, IntoClientRequest};
use tungstenite::handshake::client::Request;
use rand::seq::SliceRandom;
use tungstenite::handshake::MidHandshake;
use tungstenite::stream::MaybeTlsStream;
use tungstenite::{ClientHandshake, HandshakeError, Message, WebSocket};
/// The state of a [RelayConnection].
/// The state of a [Connection].
#[derive(Debug)]
pub enum ConnectionState {
/// The [RelayConnection] is not connected.
/// The [Connection] is not connected.
Disconnected,
/// The underlying [TcpStream] is connecting.
StreamConnecting(TcpStream, Instant),
Connecting(TcpStream, Instant),
/// The underlying [TcpStream] is connected.
StreamConnected(TcpStream),
Connected(TcpStream),
/// The websocket handshake is in progress.
Handshaking(MidHandshake<ClientHandshake<MaybeTlsStream<TcpStream>>>),
/// The [RelayConnection] is connected.
Connected(WebSocket<MaybeTlsStream<TcpStream>>),
/// The [Connection] is connected.
Active(WebSocket<MaybeTlsStream<TcpStream>>),
}
/// A connection to a relay server.
pub struct RelayConnection {
pub struct Connection {
/// The address list corresponding to the relay server.
address_list: Vec<SocketAddr>,
@ -45,25 +42,25 @@ pub struct RelayConnection {
/// The receiver part of the send channel.
///
/// This is used in [RelayConnection::update] to get messages that need to
/// This is used in [Connection::update] to get messages that need to
/// be sent to the relay server.
send_receiver: Receiver<Message>,
/// The sender part of the receive channel.
///
/// This is used in [RelayConnection::send] to store messages that need to
/// This is used in [Connection::send] to store messages that need to
/// be sent to the relay server.
send_sender: Sender<Message>,
/// The receiver part of the receive channel.
///
/// This is used in [RelayConnection::read] to get messages that have been
/// This is used in [Connection::read] to get messages that have been
/// received from the relay server.
receive_receiver: Receiver<(u32, Vec<u8>)>,
/// The sender part of the send channel.
///
/// This is used in [RelayConnection::update] to store messages that have
/// This is used in [Connection::update] to store messages that have
/// been received from the relay server.
receive_sender: Sender<(u32, Vec<u8>)>,
@ -71,8 +68,8 @@ pub struct RelayConnection {
pub state: ConnectionState,
}
impl RelayConnection {
/// Create a new [RelayConnection].
impl Connection {
/// Create a new [Connection].
pub fn new<'a>(domain: impl Into<Cow<'a, str>>) -> io::Result<Self> {
let domain = domain.into();
let (send_sender, send_receiver) = channel();
@ -110,7 +107,7 @@ impl RelayConnection {
// Create the new TCP stream.
match TcpStream::connect(address.to_owned()) {
Ok(stream) => ConnectionState::StreamConnecting(stream, Instant::now()),
Ok(stream) => ConnectionState::Connecting(stream, Instant::now()),
Err(e) => {
warn!("failed to start connection to the relay server: {e}");
ConnectionState::Disconnected
@ -118,8 +115,8 @@ impl RelayConnection {
}
}
/// Check if the [TcpStream] of the [RelayConnection] is connected.
fn check_connection(stream: TcpStream, start_time: Instant) -> ConnectionState {
/// Check if the [TcpStream] of the [Connection] is connected.
fn check_connection(&mut self, stream: TcpStream, start: Instant) -> ConnectionState {
// Check for connection errors.
if let Err(e) = stream.take_error() {
warn!("failed to connect to the relay server: {e}");
@ -138,7 +135,7 @@ impl RelayConnection {
};
// Check if the connection has timed out.
let elapsed = start_time.elapsed();
let elapsed = start.elapsed();
if elapsed > Duration::from_secs(5) {
warn!("connection to the relay server timed out");
return ConnectionState::Disconnected;
@ -146,15 +143,15 @@ impl RelayConnection {
// Update the connection state if connected.
match connected {
true => ConnectionState::StreamConnected(stream),
false => ConnectionState::StreamConnecting(stream, start_time),
true => ConnectionState::Connected(stream),
false => ConnectionState::Connecting(stream, start),
}
}
/// Start the websocket handshake.
fn start_handshake(&mut self, stream: TcpStream) -> ConnectionState {
match tungstenite::client_tls(format!("wss://{}", self.domain), stream) {
Ok((socket, _)) => ConnectionState::Connected(socket),
Ok((socket, _)) => ConnectionState::Active(socket),
Err(HandshakeError::Interrupted(handshake)) => ConnectionState::Handshaking(handshake),
Err(HandshakeError::Failure(e)) => {
warn!("handshake failed with the relay server: {e}");
@ -165,10 +162,11 @@ impl RelayConnection {
/// Continue the websocket handshake.
fn continue_handshake(
&mut self,
handshake: MidHandshake<ClientHandshake<MaybeTlsStream<TcpStream>>>,
) -> ConnectionState {
match handshake.handshake() {
Ok((socket, _)) => ConnectionState::Connected(socket),
Ok((socket, _)) => ConnectionState::Active(socket),
Err(HandshakeError::Interrupted(handshake)) => ConnectionState::Handshaking(handshake),
Err(HandshakeError::Failure(e)) => {
warn!("handshake failed with the relay server: {e}");
@ -177,7 +175,7 @@ impl RelayConnection {
}
}
/// Update the [RelayConnection] by receiving and sending messages.
/// Update the [Connection] by receiving and sending messages.
fn update_connection(
&mut self,
mut socket: WebSocket<MaybeTlsStream<TcpStream>>,
@ -236,10 +234,10 @@ impl RelayConnection {
}
// Keep the connection connected.
ConnectionState::Connected(socket)
ConnectionState::Active(socket)
}
/// Update the [RelayConnection].
/// Update the [Connection].
///
/// This function will connect to the relay server if it's not already
/// connected, and will send and receive messages from the relay server
@ -247,12 +245,10 @@ impl RelayConnection {
pub fn update(&mut self) {
self.state = match std::mem::replace(&mut self.state, ConnectionState::Disconnected) {
ConnectionState::Disconnected => self.create_stream(),
ConnectionState::StreamConnecting(stream, start_time) => {
Self::check_connection(stream, start_time)
}
ConnectionState::StreamConnected(stream) => self.start_handshake(stream),
ConnectionState::Handshaking(handshake) => Self::continue_handshake(handshake),
ConnectionState::Connected(socket) => self.update_connection(socket),
ConnectionState::Connecting(stream, start) => self.check_connection(stream, start),
ConnectionState::Connected(stream) => self.start_handshake(stream),
ConnectionState::Handshaking(handshake) => self.continue_handshake(handshake),
ConnectionState::Active(socket) => self.update_connection(socket),
}
}
}

View file

@ -1,33 +0,0 @@
//! TODO
use std::io::{stdout, Write};
use std::thread;
use std::time::Duration;
use relay_client::RelayConnection;
fn main() {
pretty_env_logger::init();
let mut connection = RelayConnection::new("relay.cocosol.fr").unwrap();
loop {
connection.update();
print!(
"\rState: {}",
match connection.state {
relay_client::ConnectionState::Disconnected => "Disconnected".to_string(),
relay_client::ConnectionState::StreamConnecting(_, instant) =>
format!("StreamConnecting {:?}", instant.elapsed()),
relay_client::ConnectionState::StreamConnected(_) => "StreamConnected".to_string(),
relay_client::ConnectionState::Handshaking(_) => "Handshaking".to_string(),
relay_client::ConnectionState::Connected(_) => "Connected".to_string(),
}
);
stdout().flush().unwrap();
thread::sleep(Duration::from_millis(10));
if let Some((sender_id, data)) = connection.read() {
println!("\nReceived message from {sender_id}: {:?}", data);
}
}
}