From 672e8af818fc51687161cec18715f2d670913ab9 Mon Sep 17 00:00:00 2001 From: Tipragot Date: Tue, 6 Feb 2024 23:16:23 +0100 Subject: [PATCH] Simple non-blocking tcp connection abstraction --- crates/bevnet/Cargo.toml | 12 +++ crates/bevnet/src/lib.rs | 179 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 191 insertions(+) create mode 100644 crates/bevnet/Cargo.toml create mode 100644 crates/bevnet/src/lib.rs diff --git a/crates/bevnet/Cargo.toml b/crates/bevnet/Cargo.toml new file mode 100644 index 0000000..b8ba9c8 --- /dev/null +++ b/crates/bevnet/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "bevnet" +version = "0.2.0" +edition = "2021" +license = "MIT OR Apache-2.0" +description = "A library for networking in Bevy." +authors = ["Tipragot "] +keywords = ["bevy", "network", "game"] +categories = ["network-programming", "game-development"] + +[lints] +workspace = true diff --git a/crates/bevnet/src/lib.rs b/crates/bevnet/src/lib.rs new file mode 100644 index 0000000..78b3ff3 --- /dev/null +++ b/crates/bevnet/src/lib.rs @@ -0,0 +1,179 @@ +//! A networking library for Bevy. + +use std::collections::LinkedList; +use std::io::{self, Read, Write}; +use std::net::{TcpStream, ToSocketAddrs}; + +/// A non-blocking tcp connection. +pub struct Connection { + /// The underlying [TcpStream] used for the connection. + stream: TcpStream, + + /// Contains the buffers that are not yet being sent. + send_buffers: LinkedList<(usize, Vec)>, + + /// The length of the next message to be received. + /// + /// `None` if the message length is not yet received. + receive_message_len: Option, + + /// The length of the received byte block. + /// + /// Used by [Connection::receive_partial] to determine if the block is + /// complete. + receive_filled_len: usize, + + /// The buffer used to receive a byte block. + receive_buffer: Vec, +} + +impl Connection { + /// Creates a new [Connection] from a [TcpStream]. + fn new(stream: TcpStream) -> io::Result { + stream.set_nonblocking(true)?; + Ok(Self { + stream, + send_buffers: LinkedList::new(), + receive_message_len: None, + receive_filled_len: 0, + receive_buffer: Vec::new(), + }) + } + + /// Creates a new [Connection] that connects to the given address. + /// + /// This function is blocking. + pub fn connect(address: impl ToSocketAddrs) -> io::Result { + Self::new(TcpStream::connect(address)?) + } + + /// Sends a message over the connection. + /// + /// This function is not blocking. + pub fn send(&mut self, message: &[u8]) -> io::Result<()> { + // Get the length of the message as a u16. + let message_len: u16 = match message.len().try_into() { + Ok(len) => len, + Err(_) => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("message length is too large: {}", message.len()), + )); + } + }; + + // Add a new buffer to the send queue. + let mut buffer = Vec::with_capacity(message_len as usize + 2); + buffer.extend(message_len.to_ne_bytes()); + buffer.extend(message); + self.send_buffers.push_back((0, buffer)); + + // Update the connection. + self.update() + } + + /// Updates the connection. + /// + /// This function sends any pending messages that have not been sent yet. + /// + /// This function is not blocking. + pub fn update(&mut self) -> io::Result<()> { + // Looping over the send buffers. + while let Some((offset, buffer)) = self.send_buffers.front_mut() { + // Writing the buffer to the stream. + match self.stream.write(&buffer[*offset..]) { + Ok(n) => *offset += n, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break, + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => break, + Err(e) => return Err(e), + } + + // Removing the buffer if it is fully sent. + if *offset >= buffer.len() { + self.send_buffers.pop_front(); + } + } + + // Returning success. + Ok(()) + } + + /// Receives a byte block from the connection. + /// + /// This function fills the receive buffer and returns `true` if the + /// buffer is successfully filled with `len` bytes and `false` if the + /// buffer is not filled yet. + /// + /// This function mustn't be called for different byte block sequentially + /// because the function can only process one byte block at a time. + /// + /// This function is not blocking. + fn receive_partial(&mut self, len: u16) -> io::Result { + let len = len as usize; + + // Resizing the buffer if it is not large enough. + if self.receive_buffer.len() < len { + self.receive_buffer.resize(len, 0); + } + + // Checking if the buffer is already filled. + if self.receive_filled_len >= len { + self.receive_filled_len = 0; + return Ok(true); + } + + // Reading from the stream. + let start_index = self.receive_filled_len; + let receive_buffer = &mut self.receive_buffer[start_index..start_index + len]; + let received_len = self.stream.read(receive_buffer); + self.receive_filled_len += match received_len { + Ok(n) => n, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(false), + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => return Ok(false), + Err(e) => return Err(e), + }; + + // Checking if the buffer is filled. + if self.receive_filled_len >= len { + self.receive_filled_len = 0; + return Ok(true); + } + Ok(false) + } + + /// Receives a message from the connection. + /// + /// If no message is available, returns `None`. + /// + /// This function is not blocking. + pub fn receive(&mut self) -> io::Result> { + // Receiving the message length. + let message_len = match self.receive_message_len { + Some(message_len) => message_len, + None => { + // If the message length is not received yet, return `None`. + if !self.receive_partial(2)? { + return Ok(None); + } + + // Setting the message length. + let message_len = + u16::from_ne_bytes(self.receive_buffer[..2].try_into().map_err(|_| { + io::Error::new(io::ErrorKind::InvalidData, "invalid message length") + })?); + self.receive_message_len = Some(message_len); + + // Returning the message length. + message_len + } + }; + + // Receiving the message. + if !self.receive_partial(message_len)? { + return Ok(None); + } + + // Returning the message. + Ok(Some(&self.receive_buffer[..message_len as usize])) + } +}