From 95fc4368ae0fcc3f0aa9262e3f3bfde8dee49059 Mon Sep 17 00:00:00 2001 From: Tipragot Date: Tue, 13 Feb 2024 13:57:21 +0100 Subject: [PATCH] Make the relay connection Send and Sync --- crates/relay-client/src/lib.rs | 69 ++++++++++++---------------------- 1 file changed, 24 insertions(+), 45 deletions(-) diff --git a/crates/relay-client/src/lib.rs b/crates/relay-client/src/lib.rs index d920d31..b052784 100644 --- a/crates/relay-client/src/lib.rs +++ b/crates/relay-client/src/lib.rs @@ -1,11 +1,12 @@ //! A library containing a client to use a relay server. use std::borrow::Cow; +use std::collections::LinkedList; use std::fs; use std::io::{self}; use std::net::{SocketAddr, ToSocketAddrs}; use std::path::PathBuf; -use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::Mutex; use std::time::{Duration, Instant}; use log::warn; @@ -58,29 +59,8 @@ pub struct Connection { /// The secret key used to authenticate with the relay server. secret: Option, - /// The receiver part of the send channel. - /// - /// This is used in [Connection::update] to get messages that need to - /// be sent to the relay server. - send_receiver: Receiver, - - /// The sender part of the receive channel. - /// - /// This is used in [Connection::send] to store messages that need to - /// be sent to the relay server. - send_sender: Sender, - - /// The receiver part of the receive channel. - /// - /// This is used in [Connection::read] to get messages that have been - /// received from the relay server. - receive_receiver: Receiver<(Uuid, Vec)>, - - /// The sender part of the send channel. - /// - /// This is used in [Connection::update] to store messages that have - /// been received from the relay server. - receive_sender: Sender<(Uuid, Vec)>, + /// A list of messages that needs to be sent. + to_send: Mutex>, /// The state of the connection. state: ConnectionState, @@ -118,10 +98,6 @@ impl Connection { } }; - // Create the communication channels. - let (send_sender, send_receiver) = channel(); - let (receive_sender, receive_receiver) = channel(); - // Create the connection and return it. Ok(Self { address_list: (domain.as_ref(), 443).to_socket_addrs()?.collect(), @@ -129,10 +105,7 @@ impl Connection { data_path, identifier, secret, - send_receiver, - send_sender, - receive_receiver, - receive_sender, + to_send: Mutex::new(LinkedList::new()), state: ConnectionState::Disconnected, }) } @@ -146,12 +119,9 @@ impl Connection { pub fn send<'a>(&self, target_id: Uuid, message: impl Into>) { let mut data = message.into().into_owned(); data.extend_from_slice(target_id.as_bytes()); - self.send_sender.send(Message::Binary(data)).ok(); - } - - /// Receive a message from the relay connection. - pub fn read(&self) -> Option<(Uuid, Vec)> { - self.receive_receiver.try_recv().ok() + if let Ok(mut to_send) = self.to_send.lock() { + to_send.push_back(Message::binary(data)); + } } /// Create a new [TcpStream] to the relay server. @@ -307,9 +277,16 @@ impl Connection { fn update_connection( &mut self, mut socket: WebSocket>, + messages: &mut LinkedList<(Uuid, Vec)>, ) -> ConnectionState { + // Unlock the sending list. + let Ok(mut to_send) = self.to_send.lock() else { + warn!("sending list closed"); + return ConnectionState::Disconnected; + }; + // Send messages from the send channel to the socket. - while let Ok(message) = self.send_receiver.try_recv() { + while let Some(message) = to_send.pop_front() { match socket.send(message) { Ok(()) => (), Err(tungstenite::Error::Io(ref e)) @@ -341,8 +318,8 @@ impl Connection { let sender_id = Uuid::from_slice(&data[id_start..]).expect("invalid sender id"); data.truncate(id_start); - // Send the message to the receive channel. - self.receive_sender.send((sender_id, data)).ok(); + // Add the message to the message list. + messages.push_back((sender_id, data)); } Err(tungstenite::Error::Io(ref e)) if e.kind() == std::io::ErrorKind::WouldBlock @@ -361,14 +338,15 @@ impl Connection { ConnectionState::Active(socket) } - /// Update the [Connection]. + /// Update the [Connection] and return the received messages. /// /// This function will connect to the relay server if it's not already /// connected, and will send and receive messages from the relay server /// if it's connected. /// /// This function will not block the current thread. - pub fn update(&mut self) { + pub fn update(&mut self) -> LinkedList<(Uuid, Vec)> { + let mut messages = LinkedList::new(); self.state = match std::mem::replace(&mut self.state, ConnectionState::Disconnected) { ConnectionState::Disconnected => self.create_stream(), ConnectionState::Connecting(stream, start) => self.check_connection(stream, start), @@ -376,7 +354,8 @@ impl Connection { ConnectionState::Handshaking(handshake) => self.continue_handshake(handshake), ConnectionState::Handshaked(socket) => self.start_authentication(socket), ConnectionState::Registering(socket) => self.get_registration_response(socket), - ConnectionState::Active(socket) => self.update_connection(socket), - } + ConnectionState::Active(socket) => self.update_connection(socket, &mut messages), + }; + messages } }