Simple non-blocking tcp connection abstraction (#13)
Some checks failed
Rust Checks / checks (push) Failing after 10s

Closes #15

Reviewed-on: corentin/border-wars#13
Reviewed-by: Raphaël <r.lauray@outlook.fr>
This commit is contained in:
Tipragot 2024-02-07 08:26:55 +00:00
parent b46ccc8faa
commit 7ce491aee1
2 changed files with 280 additions and 0 deletions

12
crates/bevnet/Cargo.toml Normal file
View file

@ -0,0 +1,12 @@
[package]
name = "bevnet"
version = "0.2.0"
edition = "2021"
license = "MIT OR Apache-2.0"
description = "A library for networking in Bevy."
authors = ["Tipragot <contact@tipragot.fr>"]
keywords = ["bevy", "network", "game"]
categories = ["network-programming", "game-development"]
[lints]
workspace = true

268
crates/bevnet/src/lib.rs Normal file
View file

@ -0,0 +1,268 @@
//! A networking library for Bevy.
use std::collections::LinkedList;
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream, ToSocketAddrs};
/// A non-blocking tcp connection.
///
/// # Example
///
/// ```rust
/// use std::io;
///
/// use bevnet::{Connection, Listener};
///
/// # fn main() -> io::Result<()> {
/// let listener = Listener::bind("127.0.0.1:23732")?;
/// let mut connection = Connection::connect("127.0.0.1:23732")?;
///
/// // The accept operation is not blocking. So we need to loop here.
/// let mut server_connection;
/// loop {
/// if let Some(new_connection) = listener.accept()? {
/// server_connection = new_connection;
/// break;
/// }
/// }
///
/// // We don't need to loop here because the send operation just appends to the send buffer.
/// connection.send(b"Hello, world!")?;
///
/// // To be sure the message has been sent, we need to update the connection.
/// while !connection.update()? {
/// // Wait until the connection is updated.
/// }
///
/// // The receive operation is not blocking. So we need to loop here.
/// loop {
/// if let Some(message) = server_connection.receive()? {
/// assert_eq!(message, b"Hello, world!");
/// break;
/// }
/// }
/// # Ok(())
/// # }
/// ```
pub struct Connection {
/// The underlying [TcpStream] used for the connection.
stream: TcpStream,
/// Contains the buffers that are not yet being sent.
send_buffers: LinkedList<(usize, Vec<u8>)>,
/// The length of the next message to be received.
///
/// `None` if the message length is not yet received.
receive_message_len: Option<u16>,
/// The length of the received byte block.
///
/// Used by [Connection::receive_partial] to determine if the block is
/// complete.
receive_filled_len: usize,
/// The buffer used to receive a byte block.
receive_buffer: Vec<u8>,
}
impl Connection {
/// Creates a new [Connection] from a [TcpStream].
fn new(stream: TcpStream) -> io::Result<Self> {
stream.set_nonblocking(true)?;
Ok(Self {
stream,
send_buffers: LinkedList::new(),
receive_message_len: None,
receive_filled_len: 0,
receive_buffer: Vec::new(),
})
}
/// Creates a new [Connection] that connects to the given address.
///
/// This function is blocking.
pub fn connect(address: impl ToSocketAddrs) -> io::Result<Self> {
Self::new(TcpStream::connect(address)?)
}
/// Sends a message over the connection.
///
/// Returns `true` if the message has been sent directly and `false`
/// if the message is still in the send queue.
///
/// This function is not blocking.
pub fn send(&mut self, message: &[u8]) -> io::Result<bool> {
// Get the length of the message as a u16.
let message_len: u16 = match message.len().try_into() {
Ok(len) => len,
Err(_) => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("message length is too large: {}", message.len()),
));
}
};
// Add a new buffer to the send queue.
let mut buffer = Vec::with_capacity(message_len as usize + 2);
buffer.extend(message_len.to_ne_bytes());
buffer.extend(message);
self.send_buffers.push_back((0, buffer));
// Update the connection.
self.update()
}
/// Updates the connection.
///
/// This function sends any pending messages that have not been sent yet.
/// It returns `true` if there is no remaining data to send after updating
/// the connection and `false` otherwise.
///
/// This function is not blocking.
pub fn update(&mut self) -> io::Result<bool> {
// Looping over the send buffers.
while let Some((offset, buffer)) = self.send_buffers.front_mut() {
// Writing the buffer to the stream.
match self.stream.write(&buffer[*offset..]) {
Ok(n) => *offset += n,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => break,
Err(e) => return Err(e),
}
// Removing the buffer if it is fully sent.
if *offset >= buffer.len() {
self.send_buffers.pop_front();
}
}
// Returning success.
Ok(self.send_buffers.is_empty())
}
/// Receives a byte block from the connection.
///
/// This function fills the receive buffer and returns `true` if the
/// buffer is successfully filled with `len` bytes and `false` if the
/// buffer is not filled yet.
///
/// This function mustn't be called for different byte block sequentially
/// because the function can only process one byte block at a time.
///
/// This function is not blocking.
fn receive_partial(&mut self, len: u16) -> io::Result<bool> {
let len = len as usize;
// Resizing the buffer if it is not large enough.
if self.receive_buffer.len() < len {
self.receive_buffer.resize(len, 0);
}
// Checking if the buffer is already filled.
if self.receive_filled_len >= len {
self.receive_filled_len = 0;
return Ok(true);
}
// Reading from the stream.
let start_index = self.receive_filled_len;
let receive_buffer = &mut self.receive_buffer[start_index..start_index + len];
let received_len = self.stream.read(receive_buffer);
self.receive_filled_len += match received_len {
Ok(n) => n,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(false),
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => return Ok(false),
Err(e) => return Err(e),
};
// Checking if the buffer is filled.
if self.receive_filled_len >= len {
self.receive_filled_len = 0;
return Ok(true);
}
Ok(false)
}
/// Receives a message from the connection.
///
/// If no message is available, returns `None`.
///
/// This function is not blocking.
pub fn receive(&mut self) -> io::Result<Option<&[u8]>> {
// Receiving the message length.
let message_len = match self.receive_message_len {
Some(message_len) => message_len,
None => {
// If the message length is not received yet, return `None`.
if !self.receive_partial(2)? {
return Ok(None);
}
// Setting the message length.
let message_len =
u16::from_ne_bytes(self.receive_buffer[..2].try_into().map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "invalid message length")
})?);
self.receive_message_len = Some(message_len);
// Returning the message length.
message_len
}
};
// Receiving the message.
if !self.receive_partial(message_len)? {
return Ok(None);
}
// Returning the message.
Ok(Some(&self.receive_buffer[..message_len as usize]))
}
}
/// A non-blocking tcp listener.
///
/// ```rust
/// use std::io;
///
/// use bevnet::{Connection, Listener};
///
/// # fn main() -> io::Result<()> {
/// let listener = Listener::bind("127.0.0.1:23732")?;
/// let mut connection = Connection::connect("127.0.0.1:23732")?;
///
/// // The accept operation is not blocking. So we need to loop here.
/// let mut server_connection;
/// loop {
/// if let Some(new_connection) = listener.accept()? {
/// server_connection = new_connection;
/// break;
/// }
/// }
/// # Ok(())
/// # }
/// ```
pub struct Listener(TcpListener);
impl Listener {
/// Creates a new listener.
pub fn bind(addr: impl ToSocketAddrs) -> io::Result<Listener> {
let listener = TcpListener::bind(addr)?;
listener.set_nonblocking(true)?;
Ok(Listener(listener))
}
/// Accepts a new [Connection].
///
/// This function is not blocking.
pub fn accept(&self) -> io::Result<Option<Connection>> {
match self.0.accept() {
Ok((stream, _)) => Connection::new(stream).map(Some),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(None),
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => Ok(None),
Err(e) => Err(e),
}
}
}