Simple non-blocking tcp connection abstraction
All checks were successful
Rust Checks / checks (push) Successful in 6s
Rust Checks / checks (pull_request) Successful in 6s

This commit is contained in:
Tipragot 2024-02-06 23:16:23 +01:00
parent 2cbf784840
commit 672e8af818
2 changed files with 191 additions and 0 deletions

12
crates/bevnet/Cargo.toml Normal file
View file

@ -0,0 +1,12 @@
[package]
name = "bevnet"
version = "0.2.0"
edition = "2021"
license = "MIT OR Apache-2.0"
description = "A library for networking in Bevy."
authors = ["Tipragot <contact@tipragot.fr>"]
keywords = ["bevy", "network", "game"]
categories = ["network-programming", "game-development"]
[lints]
workspace = true

179
crates/bevnet/src/lib.rs Normal file
View file

@ -0,0 +1,179 @@
//! A networking library for Bevy.
use std::collections::LinkedList;
use std::io::{self, Read, Write};
use std::net::{TcpStream, ToSocketAddrs};
/// A non-blocking tcp connection.
pub struct Connection {
/// The underlying [TcpStream] used for the connection.
stream: TcpStream,
/// Contains the buffers that are not yet being sent.
send_buffers: LinkedList<(usize, Vec<u8>)>,
/// The length of the next message to be received.
///
/// `None` if the message length is not yet received.
receive_message_len: Option<u16>,
/// The length of the received byte block.
///
/// Used by [Connection::receive_partial] to determine if the block is
/// complete.
receive_filled_len: usize,
/// The buffer used to receive a byte block.
receive_buffer: Vec<u8>,
}
impl Connection {
/// Creates a new [Connection] from a [TcpStream].
fn new(stream: TcpStream) -> io::Result<Self> {
stream.set_nonblocking(true)?;
Ok(Self {
stream,
send_buffers: LinkedList::new(),
receive_message_len: None,
receive_filled_len: 0,
receive_buffer: Vec::new(),
})
}
/// Creates a new [Connection] that connects to the given address.
///
/// This function is blocking.
pub fn connect(address: impl ToSocketAddrs) -> io::Result<Self> {
Self::new(TcpStream::connect(address)?)
}
/// Sends a message over the connection.
///
/// This function is not blocking.
pub fn send(&mut self, message: &[u8]) -> io::Result<()> {
// Get the length of the message as a u16.
let message_len: u16 = match message.len().try_into() {
Ok(len) => len,
Err(_) => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("message length is too large: {}", message.len()),
));
}
};
// Add a new buffer to the send queue.
let mut buffer = Vec::with_capacity(message_len as usize + 2);
buffer.extend(message_len.to_ne_bytes());
buffer.extend(message);
self.send_buffers.push_back((0, buffer));
// Update the connection.
self.update()
}
/// Updates the connection.
///
/// This function sends any pending messages that have not been sent yet.
///
/// This function is not blocking.
pub fn update(&mut self) -> io::Result<()> {
// Looping over the send buffers.
while let Some((offset, buffer)) = self.send_buffers.front_mut() {
// Writing the buffer to the stream.
match self.stream.write(&buffer[*offset..]) {
Ok(n) => *offset += n,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => break,
Err(e) => return Err(e),
}
// Removing the buffer if it is fully sent.
if *offset >= buffer.len() {
self.send_buffers.pop_front();
}
}
// Returning success.
Ok(())
}
/// Receives a byte block from the connection.
///
/// This function fills the receive buffer and returns `true` if the
/// buffer is successfully filled with `len` bytes and `false` if the
/// buffer is not filled yet.
///
/// This function mustn't be called for different byte block sequentially
/// because the function can only process one byte block at a time.
///
/// This function is not blocking.
fn receive_partial(&mut self, len: u16) -> io::Result<bool> {
let len = len as usize;
// Resizing the buffer if it is not large enough.
if self.receive_buffer.len() < len {
self.receive_buffer.resize(len, 0);
}
// Checking if the buffer is already filled.
if self.receive_filled_len >= len {
self.receive_filled_len = 0;
return Ok(true);
}
// Reading from the stream.
let start_index = self.receive_filled_len;
let receive_buffer = &mut self.receive_buffer[start_index..start_index + len];
let received_len = self.stream.read(receive_buffer);
self.receive_filled_len += match received_len {
Ok(n) => n,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(false),
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => return Ok(false),
Err(e) => return Err(e),
};
// Checking if the buffer is filled.
if self.receive_filled_len >= len {
self.receive_filled_len = 0;
return Ok(true);
}
Ok(false)
}
/// Receives a message from the connection.
///
/// If no message is available, returns `None`.
///
/// This function is not blocking.
pub fn receive(&mut self) -> io::Result<Option<&[u8]>> {
// Receiving the message length.
let message_len = match self.receive_message_len {
Some(message_len) => message_len,
None => {
// If the message length is not received yet, return `None`.
if !self.receive_partial(2)? {
return Ok(None);
}
// Setting the message length.
let message_len =
u16::from_ne_bytes(self.receive_buffer[..2].try_into().map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "invalid message length")
})?);
self.receive_message_len = Some(message_len);
// Returning the message length.
message_len
}
};
// Receiving the message.
if !self.receive_partial(message_len)? {
return Ok(None);
}
// Returning the message.
Ok(Some(&self.receive_buffer[..message_len as usize]))
}
}