generated from tipragot/rust
Non blocking relay connection #44
|
@ -1,41 +1,47 @@
|
|||
//! A client to use a relay server.
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::io::ErrorKind;
|
||||
use std::net::ToSocketAddrs;
|
||||
use std::io::{self, ErrorKind, Read};
|
||||
use std::net::{SocketAddr, ToSocketAddrs};
|
||||
use std::sync::mpsc::{channel, Receiver, Sender};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use log::warn;
|
||||
use log::{info, warn};
|
||||
use mio::net::TcpStream;
|
||||
use rand::seq::IteratorRandom;
|
||||
use mio::{Events, Interest, Poll, Token};
|
||||
use rand::seq::{IteratorRandom, SliceRandom};
|
||||
use tungstenite::client::{uri_mode, IntoClientRequest};
|
||||
use tungstenite::handshake::client::Request;
|
||||
use tungstenite::handshake::MidHandshake;
|
||||
use tungstenite::stream::MaybeTlsStream;
|
||||
use tungstenite::{ClientHandshake, HandshakeError, Message, WebSocket};
|
||||
|
||||
/// The state of a [RelayConnection].
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub enum RelayConnectionState {
|
||||
#[derive(Debug)]
|
||||
pub enum ConnectionState {
|
||||
/// The [RelayConnection] is not connected.
|
||||
Disconnected,
|
||||
|
||||
/// The underlying [TcpStream] is connecting.
|
||||
Connecting,
|
||||
StreamConnecting(TcpStream, Instant),
|
||||
|
||||
/// The [RelayConnection] is making a tls handshake with the relay server.
|
||||
TlsHandshaking,
|
||||
/// The underlying [TcpStream] is connected.
|
||||
StreamConnected(TcpStream),
|
||||
|
||||
/// The [RelayConnection] is making a websocket handshake with the relay
|
||||
/// server.
|
||||
WebsocketHandshaking,
|
||||
/// The websocket handshake is in progress.
|
||||
Handshaking(MidHandshake<ClientHandshake<MaybeTlsStream<TcpStream>>>),
|
||||
|
||||
/// The [RelayConnection] is connected.
|
||||
Connected,
|
||||
Connected(WebSocket<MaybeTlsStream<TcpStream>>),
|
||||
}
|
||||
|
||||
/// A connection to a relay server.
|
||||
pub struct RelayConnection {
|
||||
/// The address of the relay server.
|
||||
server_address: String,
|
||||
/// The address list corresponding to the relay server.
|
||||
address_list: Vec<SocketAddr>,
|
||||
|
||||
/// The domain of the relay server.
|
||||
domain: String,
|
||||
|
||||
/// The receiver part of the send channel.
|
||||
///
|
||||
|
@ -61,46 +67,25 @@ pub struct RelayConnection {
|
|||
/// been received from the relay server.
|
||||
receive_sender: Sender<(u32, Vec<u8>)>,
|
||||
|
||||
/// If the [TcpStream] is not currently connected it will be stored here.
|
||||
stream: Option<TcpStream>,
|
||||
|
||||
/// If the websocket handshake is not complete it will be stored here.
|
||||
handshake: Option<MidHandshake<ClientHandshake<MaybeTlsStream<TcpStream>>>>,
|
||||
|
||||
/// When the websocket is correctly connected it will be stored here.
|
||||
socket: Option<WebSocket<MaybeTlsStream<TcpStream>>>,
|
||||
/// The state of the connection.
|
||||
pub state: ConnectionState,
|
||||
}
|
||||
|
||||
impl RelayConnection {
|
||||
/// Create a new [RelayConnection].
|
||||
pub fn new(server_address: String) -> Self {
|
||||
pub fn new<'a>(domain: impl Into<Cow<'a, str>>) -> io::Result<Self> {
|
||||
let domain = domain.into();
|
||||
let (send_sender, send_receiver) = channel();
|
||||
let (receive_sender, receive_receiver) = channel();
|
||||
Self {
|
||||
server_address,
|
||||
Ok(Self {
|
||||
address_list: (domain.as_ref(), 443).to_socket_addrs()?.collect(),
|
||||
domain: domain.into_owned(),
|
||||
send_receiver,
|
||||
send_sender,
|
||||
receive_receiver,
|
||||
receive_sender,
|
||||
stream: None,
|
||||
handshake: None,
|
||||
socket: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the state of the [RelayConnection].
|
||||
pub fn state(&self) -> RelayConnectionState {
|
||||
match (
|
||||
self.stream.is_some(),
|
||||
self.handshake.is_some(),
|
||||
self.socket.is_some(),
|
||||
) {
|
||||
(false, false, false) => RelayConnectionState::Disconnected,
|
||||
(true, false, false) => RelayConnectionState::Connecting,
|
||||
(false, true, false) => RelayConnectionState::WebsocketHandshaking,
|
||||
(false, false, true) => RelayConnectionState::Connected,
|
||||
_ => unreachable!(),
|
||||
}
|
||||
state: ConnectionState::Disconnected,
|
||||
})
|
||||
}
|
||||
|
||||
/// Send a message to the target client.
|
||||
|
@ -115,84 +100,88 @@ impl RelayConnection {
|
|||
self.receive_receiver.try_recv().ok()
|
||||
}
|
||||
|
||||
/// Update the [RelayConnection].
|
||||
///
|
||||
/// This function will connect to the relay server if it's not already
|
||||
/// connected, and will send and receive messages from the relay server
|
||||
/// if it's connected.
|
||||
pub fn update(&mut self) {
|
||||
match (
|
||||
self.stream.take(),
|
||||
self.handshake.take(),
|
||||
self.socket.as_mut(),
|
||||
) {
|
||||
(None, None, None) => {
|
||||
// Resolve the relay address list.
|
||||
let mut address = self.server_address.clone();
|
||||
address.push_str(":443");
|
||||
let address_list = match address.to_socket_addrs() {
|
||||
Ok(address_list) => address_list,
|
||||
Err(e) => {
|
||||
warn!("failed to resolve relay address: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
/// Create a new [TcpStream] to the relay server.
|
||||
fn create_stream(&mut self) -> ConnectionState {
|
||||
// Take a random relay address.
|
||||
let Some(address) = address_list.choose(&mut rand::thread_rng()) else {
|
||||
let Some(address) = self.address_list.choose(&mut rand::thread_rng()) else {
|
||||
warn!("no relay address available");
|
||||
return;
|
||||
return ConnectionState::Disconnected;
|
||||
};
|
||||
|
||||
// Start the connection to the relay.
|
||||
match TcpStream::connect(address) {
|
||||
Ok(stream) => self.stream = Some(stream),
|
||||
Err(e) => warn!("failed to start connection to the relay server: {e}"),
|
||||
// Create the new TCP stream.
|
||||
match TcpStream::connect(address.to_owned()) {
|
||||
Ok(stream) => ConnectionState::StreamConnecting(stream, Instant::now()),
|
||||
Err(e) => {
|
||||
warn!("failed to start connection to the relay server: {e}");
|
||||
ConnectionState::Disconnected
|
||||
}
|
||||
}
|
||||
(Some(stream), None, None) => {
|
||||
// Check if there is an error while connecting.
|
||||
if let Ok(Some(e)) | Err(e) = stream.take_error() {
|
||||
warn!("failed to connect to relay: {e}");
|
||||
return;
|
||||
}
|
||||
|
||||
/// Check if the [TcpStream] of the [RelayConnection] is connected.
|
||||
fn check_connection(stream: TcpStream, start_time: Instant) -> ConnectionState {
|
||||
// Check for connection errors.
|
||||
if let Err(e) = stream.take_error() {
|
||||
warn!("failed to connect to the relay server: {e}");
|
||||
return ConnectionState::Disconnected;
|
||||
}
|
||||
|
||||
// Check if the stream is connected.
|
||||
match stream.peer_addr() {
|
||||
Ok(_) => {
|
||||
// Start the websocket handshake.
|
||||
match tungstenite::client_tls(
|
||||
format!("wss://{}", self.server_address),
|
||||
stream,
|
||||
) {
|
||||
Ok((socket, _)) => self.socket = Some(socket),
|
||||
Err(HandshakeError::Interrupted(handshake)) => {
|
||||
self.handshake = Some(handshake);
|
||||
let connected = match stream.peek(&mut [0]) {
|
||||
Ok(_) => true,
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true,
|
||||
Err(ref e) if e.kind() == io::ErrorKind::NotConnected => false,
|
||||
Err(e) => {
|
||||
warn!("failed to connect to the relay server: {e}");
|
||||
return ConnectionState::Disconnected;
|
||||
}
|
||||
};
|
||||
|
||||
// Check if the connection has timed out.
|
||||
let elapsed = start_time.elapsed();
|
||||
if elapsed > Duration::from_secs(5) {
|
||||
warn!("connection to the relay server timed out");
|
||||
return ConnectionState::Disconnected;
|
||||
}
|
||||
|
||||
// Update the connection state if connected.
|
||||
match connected {
|
||||
true => ConnectionState::StreamConnected(stream),
|
||||
false => ConnectionState::StreamConnecting(stream, start_time),
|
||||
}
|
||||
}
|
||||
|
||||
/// Start the websocket handshake.
|
||||
fn start_handshake(&mut self, stream: TcpStream) -> ConnectionState {
|
||||
match tungstenite::client_tls(format!("wss://{}", self.domain), stream) {
|
||||
Ok((socket, _)) => ConnectionState::Connected(socket),
|
||||
Err(HandshakeError::Interrupted(handshake)) => ConnectionState::Handshaking(handshake),
|
||||
Err(HandshakeError::Failure(e)) => {
|
||||
warn!("relay handshake failed: {e}")
|
||||
warn!("handshake failed with the relay server: {e}");
|
||||
ConnectionState::Disconnected
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(ref e) if e.kind() == ErrorKind::NotConnected => {
|
||||
self.stream = Some(stream);
|
||||
}
|
||||
Err(e) => warn!("failed to connect to relay: {e}"),
|
||||
}
|
||||
}
|
||||
(None, Some(handshake), None) => {
|
||||
// Check if the handshake is complete.
|
||||
|
||||
/// Continue the websocket handshake.
|
||||
fn continue_handshake(
|
||||
handshake: MidHandshake<ClientHandshake<MaybeTlsStream<TcpStream>>>,
|
||||
) -> ConnectionState {
|
||||
match handshake.handshake() {
|
||||
Ok((socket, _)) => self.socket = Some(socket),
|
||||
Err(HandshakeError::Interrupted(unfinished_handshake)) => {
|
||||
self.handshake = Some(unfinished_handshake);
|
||||
}
|
||||
Ok((socket, _)) => ConnectionState::Connected(socket),
|
||||
Err(HandshakeError::Interrupted(handshake)) => ConnectionState::Handshaking(handshake),
|
||||
Err(HandshakeError::Failure(e)) => {
|
||||
warn!("relay websocket handshake failed: {e}")
|
||||
warn!("handshake failed with the relay server: {e}");
|
||||
ConnectionState::Disconnected
|
||||
}
|
||||
}
|
||||
}
|
||||
(None, None, Some(socket)) => {
|
||||
|
||||
/// Update the [RelayConnection] by receiving and sending messages.
|
||||
fn update_connection(
|
||||
&mut self,
|
||||
mut socket: WebSocket<MaybeTlsStream<TcpStream>>,
|
||||
) -> ConnectionState {
|
||||
// Send messages from the send channel to the socket.
|
||||
while let Ok(message) = self.send_receiver.try_recv() {
|
||||
match socket.send(message) {
|
||||
|
@ -204,9 +193,8 @@ impl RelayConnection {
|
|||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("relay connection closed with error: {e}");
|
||||
self.socket = None;
|
||||
return;
|
||||
warn!("relay connection closed: {e}");
|
||||
return ConnectionState::Disconnected;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -241,140 +229,30 @@ impl RelayConnection {
|
|||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("relay connection closed with error: {e}");
|
||||
self.socket = None;
|
||||
return;
|
||||
warn!("relay connection closed: {e}");
|
||||
return ConnectionState::Disconnected;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
|
||||
// Keep the connection connected.
|
||||
ConnectionState::Connected(socket)
|
||||
}
|
||||
|
||||
// pub fn update(&mut self) {
|
||||
// // If there's an unconnected stream, wait for it to be connected.
|
||||
// if let Some(unconnected_stream) = self.unconnected_stream.take() {
|
||||
// match unconnected_stream.peer_addr() {
|
||||
// Ok(_) => {
|
||||
// // Start the handshake.
|
||||
// match tungstenite::client::client("wss://relay.cocosol.fr",
|
||||
// unconnected_stream) {
|
||||
// Ok((socket, _)) => self.socket = Some(socket),
|
||||
// Err(HandshakeError::Interrupted(unfinished_handshake)) =>
|
||||
// { self.unfinished_handshake =
|
||||
// Some(unfinished_handshake); return;
|
||||
// }
|
||||
// Err(HandshakeError::Failure(e)) => {
|
||||
// warn!("relay handshake failed: {}", e);
|
||||
// return;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// Err(ref e) if e.kind() == std::io::ErrorKind::NotConnected => {
|
||||
// self.unconnected_stream = Some(unconnected_stream);
|
||||
// }
|
||||
// Err(e) => warn!("failed to get peer address: {}", e),
|
||||
// }
|
||||
// }
|
||||
|
||||
// // If there's an unfinished handshake, try to finish it.
|
||||
// if let Some(unfinished_handshake) = self.unfinished_handshake.take() {
|
||||
// match unfinished_handshake.handshake() {
|
||||
// Ok((socket, _)) => self.socket = Some(socket),
|
||||
// Err(HandshakeError::Interrupted(unfinished_handshake)) => {
|
||||
// self.unfinished_handshake = Some(unfinished_handshake)
|
||||
// }
|
||||
// Err(HandshakeError::Failure(e)) => warn!("relay handshake failed:
|
||||
// {}", e), }
|
||||
// }
|
||||
|
||||
// // If there's no socket yet, try to connect.
|
||||
// let socket = match self.socket {
|
||||
// Some(ref mut socket) => socket,
|
||||
// None => {
|
||||
// // Resolve the relay address list.
|
||||
// let address_list = match "relay.cocosol.fr:443".to_socket_addrs()
|
||||
// { Ok(address_list) => address_list,
|
||||
// Err(e) => {
|
||||
// warn!("failed to resolve relay address: {}", e);
|
||||
// return;
|
||||
// }
|
||||
// };
|
||||
|
||||
// // Take a random relay address.
|
||||
// let Some(address) = address_list.choose(&mut rand::thread_rng())
|
||||
// else { warn!("no relay address available");
|
||||
// return;
|
||||
// };
|
||||
|
||||
// // Create a [TcpStream] connected to the relay.
|
||||
// self.unconnected_stream = match TcpStream::connect(address) {
|
||||
// Ok(stream) => Some(stream),
|
||||
// Err(e) => {
|
||||
// warn!("failed to connect to relay: {}", e);
|
||||
// return;
|
||||
// }
|
||||
// };
|
||||
|
||||
// // Return because the socket is not connected yet.
|
||||
// return;
|
||||
// }
|
||||
// };
|
||||
|
||||
// // Send messages from the send channel to the socket.
|
||||
// while let Ok(message) = self.send_receiver.try_recv() {
|
||||
// match socket.send(message) {
|
||||
// Ok(()) => (),
|
||||
// Err(tungstenite::Error::Io(ref e))
|
||||
// if e.kind() == std::io::ErrorKind::WouldBlock
|
||||
// || e.kind() == std::io::ErrorKind::Interrupted =>
|
||||
// {
|
||||
// break;
|
||||
// }
|
||||
// Err(e) => {
|
||||
// warn!("relay connection closed with error: {}", e);
|
||||
// self.socket = None;
|
||||
// return;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// // Receive messages from the socket and send them to the receive channel.
|
||||
// loop {
|
||||
// match socket.read() {
|
||||
// Ok(message) => {
|
||||
// // Check the message length.
|
||||
// let mut data = message.into_data();
|
||||
// if data.len() < 4 {
|
||||
// warn!("received malformed message: {}", data.len());
|
||||
// continue;
|
||||
// }
|
||||
|
||||
// // Extract the sender ID.
|
||||
// let id_start = data.len() - 4;
|
||||
// let sender_id = u32::from_be_bytes(
|
||||
// data[id_start..]
|
||||
// .try_into()
|
||||
// .unwrap_or_else(|_| unreachable!()),
|
||||
// );
|
||||
// data.truncate(id_start);
|
||||
|
||||
// // Send the message to the receive channel.
|
||||
// self.receive_sender.send((sender_id, data)).ok();
|
||||
// }
|
||||
// Err(tungstenite::Error::Io(ref e))
|
||||
// if e.kind() == std::io::ErrorKind::WouldBlock
|
||||
// || e.kind() == std::io::ErrorKind::Interrupted =>
|
||||
// {
|
||||
// break;
|
||||
// }
|
||||
// Err(e) => {
|
||||
// warn!("relay connection closed with error: {}", e);
|
||||
// self.socket = None;
|
||||
// return;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
/// Update the [RelayConnection].
|
||||
///
|
||||
/// This function will connect to the relay server if it's not already
|
||||
/// connected, and will send and receive messages from the relay server
|
||||
/// if it's connected.
|
||||
pub fn update(&mut self) {
|
||||
self.state = match std::mem::replace(&mut self.state, ConnectionState::Disconnected) {
|
||||
ConnectionState::Disconnected => self.create_stream(),
|
||||
ConnectionState::StreamConnecting(stream, start_time) => {
|
||||
Self::check_connection(stream, start_time)
|
||||
}
|
||||
ConnectionState::StreamConnected(stream) => self.start_handshake(stream),
|
||||
ConnectionState::Handshaking(handshake) => Self::continue_handshake(handshake),
|
||||
ConnectionState::Connected(socket) => self.update_connection(socket),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
//! TODO
|
||||
|
||||
use std::net::TcpStream;
|
||||
use std::io::{stdout, Write};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
|
@ -9,12 +9,25 @@ use relay_client::RelayConnection;
|
|||
fn main() {
|
||||
pretty_env_logger::init();
|
||||
|
||||
let mut connection = RelayConnection::new("relay.cocosol.fr".to_string());
|
||||
let mut connection = RelayConnection::new("relay.cocosol.fr").unwrap();
|
||||
|
||||
loop {
|
||||
connection.update();
|
||||
print!(
|
||||
"\rState: {}",
|
||||
match connection.state {
|
||||
relay_client::ConnectionState::Disconnected => "Disconnected".to_string(),
|
||||
relay_client::ConnectionState::StreamConnecting(_, instant) =>
|
||||
format!("StreamConnecting {:?}", instant.elapsed()),
|
||||
relay_client::ConnectionState::StreamConnected(_) => "StreamConnected".to_string(),
|
||||
relay_client::ConnectionState::Handshaking(_) => "Handshaking".to_string(),
|
||||
relay_client::ConnectionState::Connected(_) => "Connected".to_string(),
|
||||
}
|
||||
);
|
||||
stdout().flush().unwrap();
|
||||
thread::sleep(Duration::from_millis(10));
|
||||
if let Some((sender_id, data)) = connection.read() {
|
||||
println!("Received message from {sender_id}: {:?}", data);
|
||||
println!("\nReceived message from {sender_id}: {:?}", data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue