commit 29ece8505191645bd029fadc15de91f1e50afddd Author: Tipragot Date: Thu Apr 27 00:39:27 2023 +0200 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4fffb2f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +/Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..bbc1c38 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "bevnet" +version = "0.1.0" +edition = "2021" +license = "Apache-2.0" +description = "A library for networking in Bevy." +authors = ["Tipragot "] +keywords = ["bevy", "network", "game"] +categories = ["network-programming", "game-development "] +repository = "https://git.tipragot.fr/tipragot/bevnet" + +[dependencies] +bevy = "0.10.1" +bincode = "1.3.3" +const-fnv1a-hash = "1.1.0" +dashmap = "5.4.0" +serde = { version = "1.0.160", features = ["derive"] } diff --git a/examples/ping_pong.rs b/examples/ping_pong.rs new file mode 100644 index 0000000..1445160 --- /dev/null +++ b/examples/ping_pong.rs @@ -0,0 +1,99 @@ +use bevnet::{ + impl_packet, AppClientNetwork, AppServerNetwork, ClientConnection, ClientListener, + ClientNetworkPlugin, PacketEvent, ServerConnection, ServerNetworkPlugin, +}; +use bevy::prelude::*; +use serde::{Deserialize, Serialize}; + +/// A ping packet. +#[derive(Serialize, Deserialize)] +pub struct PingPacket; +impl_packet!(PingPacket); + +/// A pong packet. +#[derive(Serialize, Deserialize)] +pub struct PongPacket; +impl_packet!(PongPacket); + +/// Launch the server. +fn start_server(mut commands: Commands, keys: Res>) { + if keys.just_pressed(KeyCode::B) { + println!("Starting server..."); + match ClientListener::bind("127.0.0.1:8000") { + Ok(listener) => { + println!("Listening on {}", listener.address()); + commands.insert_resource(listener); + } + Err(e) => println!("Failed to bind: {}", e), + } + } +} + +/// Show when a client connects. +fn client_connected(connection: Query<(Entity, &ClientConnection), Added>) { + for (entity, connection) in connection.iter() { + println!("Client connected: {} as {:?}", connection.address(), entity); + } +} + +/// Show when a client disconnects. +fn client_disconnected(mut removed: RemovedComponents) { + for entity in removed.iter() { + println!("Client disconnected: {:?}", entity); + } +} + +/// Receive the ping packets. +fn receive_ping(mut events: EventReader>) { + for PacketEvent { connection, .. } in events.iter() { + println!("Received ping from {}", connection.address()); + connection.send(PongPacket); + println!("Response sent!"); + } +} + +/// Connect the client to the server. +fn connect(mut commands: Commands, keys: Res>) { + if keys.just_pressed(KeyCode::C) { + println!("Connecting..."); + match ServerConnection::connect("127.0.0.1:8000") { + Ok(connection) => { + println!("Connected to {}", connection.address()); + commands.insert_resource(connection); + } + Err(e) => println!("Failed to connect: {}", e), + } + } +} + +/// Receive the pong packets. +fn receive_pong(mut events: EventReader) { + for _ in events.iter() { + println!("Received pong!"); + } +} + +/// Send a ping packet to the server. +fn send_ping(keys: Res>, connection: Res) { + if keys.just_pressed(KeyCode::P) { + connection.send(PingPacket); + println!("Ping sent!"); + } +} + +fn main() { + App::new() + .add_plugins(DefaultPlugins) + .add_plugin(ServerNetworkPlugin) + .add_system(start_server) + .add_system(client_connected) + .add_system(client_disconnected) + .add_system(receive_ping) + .register_server_packet::() + .add_plugin(ClientNetworkPlugin) + .add_system(connect) + .add_system(send_ping.run_if(resource_exists::())) + .add_system(receive_pong) + .register_client_packet::() + .run(); +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..3897209 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,218 @@ +use bevy::prelude::*; +pub use packet::Packet; +use std::{ + io, + net::{SocketAddr, ToSocketAddrs}, + sync::Arc, +}; +use tcp::{Connection, Listener}; + +mod packet; +mod tcp; + +/// A connection to a server. +#[derive(Resource)] +pub struct ServerConnection(Connection); + +impl ServerConnection { + /// Creates a [ServerConnection] to the given address. + pub fn connect(addr: A) -> io::Result { + Ok(Self(Connection::connect(addr)?)) + } + + /// Sends a [Packet] to the server. + pub fn send(&self, packet: P) { + self.0.send(packet); + } + + /// Gets the address of the server. + pub fn address(&self) -> SocketAddr { + self.0.address() + } +} + +/// Used to listen for incoming [ClientConnection]s. +#[derive(Resource)] +pub struct ClientListener(Listener); + +impl ClientListener { + /// Creates a [ClientListener] binded to the given address. + pub fn bind(address: A) -> io::Result { + Ok(Self(Listener::bind(address)?)) + } + + /// Returns the address the [ClientListener] is bound to. + pub fn address(&self) -> SocketAddr { + self.0.address() + } +} + +/// A connection to a client. +#[derive(Component)] +pub struct ClientConnection(Arc); + +impl ClientConnection { + /// Sends a [Packet] to the client. + pub fn send(&self, packet: P) { + self.0.send(packet); + } + + /// Gets the address of the client. + pub fn address(&self) -> SocketAddr { + self.0.address() + } +} + +/// A [Plugin] for client networking. +pub struct ClientNetworkPlugin; + +impl ClientNetworkPlugin { + /// Removes the [ServerConnection] resource when it's disconnected. + fn remove_disconnected(mut commands: Commands, connection: Res) { + if !connection.0.connected() { + commands.remove_resource::(); + } + } + + /// Clears the packet cache of the [ServerConnection]. + fn clear_cache(connection: Res) { + connection.0.clear(); + } +} + +impl Plugin for ClientNetworkPlugin { + fn build(&self, app: &mut App) { + app.add_systems(( + Self::remove_disconnected.run_if(resource_exists::()), + Self::clear_cache + .run_if(resource_exists::()) + .after(Self::remove_disconnected), + )); + } +} + +/// A [Plugin] for server networking. +pub struct ServerNetworkPlugin; + +impl ServerNetworkPlugin { + /// Removes the [ClientConnection] components when it's disconnected. + fn remove_disconnected( + mut commands: Commands, + connections: Query<(Entity, &ClientConnection)>, + ) { + for (entity, connection) in connections.iter() { + if !connection.0.connected() { + commands.entity(entity).remove::(); + } + } + } + + /// Clears the packet cache of the [ClientConnection]s. + fn clear_cache(connections: Query<&ClientConnection>) { + for connection in connections.iter() { + connection.0.clear(); + } + } + + /// Removes the [ClientListener] resource when it stop listening. + fn remove_not_listening(mut commands: Commands, listener: Res) { + if !listener.0.listening() { + commands.remove_resource::(); + } + } + + /// Accepts incoming connections. + fn accept_connections(mut commands: Commands, listener: Res) { + while let Some(connection) = listener.0.accept() { + commands.spawn(ClientConnection(Arc::new(connection))); + } + } +} + +impl Plugin for ServerNetworkPlugin { + fn build(&self, app: &mut App) { + app.add_systems(( + Self::remove_disconnected, + Self::clear_cache.after(Self::remove_disconnected), + Self::remove_not_listening.run_if(resource_exists::()), + Self::accept_connections + .run_if(resource_exists::()) + .after(Self::remove_not_listening), + )); + } +} + +/// Receives [Packet]s and sends them as [PacketEvent]s. +fn receive_server_packets( + mut writer: EventWriter>, + connection: Query<(Entity, &ClientConnection)>, +) { + for (entity, connection) in connection.iter() { + for packet in connection.0.recv() { + writer.send(PacketEvent { + connection: ClientConnection(Arc::clone(&connection.0)), + entity, + packet, + }); + } + } +} + +/// An extention trait to easily register a [Packet] to the server. +pub trait AppServerNetwork { + /// Registers a [Packet] for the server. + fn register_server_packet(&mut self) -> &mut Self; +} + +impl AppServerNetwork for App { + fn register_server_packet(&mut self) -> &mut Self { + self.add_event::>(); + self.add_system( + receive_server_packets::

+ .after(ServerNetworkPlugin::remove_disconnected) + .before(ServerNetworkPlugin::clear_cache), + ); + self + } +} + +/// An event for received [Packet]s on the server. +pub struct PacketEvent { + /// The [ClientConnection] from which the [Packet] was received. + pub connection: ClientConnection, + + /// The [Entity] of the [ClientConnection]. + pub entity: Entity, + + /// The [Packet] + pub packet: P, +} + +/// Receives [Packet]s and sends them as [Event]s. +fn receive_client_packets( + mut writer: EventWriter

, + connection: Res, +) { + for packet in connection.0.recv() { + writer.send(packet); + } +} + +/// An extention trait to easily register a [Packet] to the client. +pub trait AppClientNetwork { + /// Registers a [Packet] for the client. + fn register_client_packet(&mut self) -> &mut Self; +} + +impl AppClientNetwork for App { + fn register_client_packet(&mut self) -> &mut Self { + self.add_event::

(); + self.add_system( + receive_client_packets::

+ .run_if(resource_exists::()) + .after(ClientNetworkPlugin::remove_disconnected) + .before(ClientNetworkPlugin::clear_cache), + ); + self + } +} diff --git a/src/packet.rs b/src/packet.rs new file mode 100644 index 0000000..7a9a5e0 --- /dev/null +++ b/src/packet.rs @@ -0,0 +1,57 @@ +use dashmap::DashMap; +use serde::{de::DeserializeOwned, Serialize}; + +/// A [Packet] that can be sent over the network. +pub trait Packet: DeserializeOwned + Serialize + Sync + Send + 'static { + const ID: u32; +} + +/// A macro to easily implement [Packet]. +#[macro_export] +macro_rules! impl_packet { + ($t:ty) => { + impl ::bevnet::Packet for $t { + const ID: u32 = ::const_fnv1a_hash::fnv1a_hash_32( + concat!(module_path!(), "::", stringify!($t)).as_bytes(), + None, + ); + } + }; +} + +/// A container for the received [Packet]s. +pub struct PacketReceiver { + /// The received data. + data: DashMap>>, +} + +impl PacketReceiver { + /// Creates a new [PacketReceiver]. + pub fn new() -> Self { + Self { + data: DashMap::new(), + } + } + + /// Clears all the [Packet]s. + pub fn clear(&self) { + self.data.clear(); + } + + /// Inserts a the received raw [Packet] into the [PacketReceiver]. + pub fn insert(&self, id: u32, data: Vec) { + self.data.entry(id).or_default().push(data); + } + + /// Extract all the [Packet]s of a given type. + pub fn extract(&self) -> Vec

{ + match self.data.get_mut(&P::ID) { + Some(mut data) => data + .value_mut() + .drain(..) + .filter_map(|data| bincode::deserialize(&data).ok()) + .collect(), + None => Vec::new(), + } + } +} diff --git a/src/tcp.rs b/src/tcp.rs new file mode 100644 index 0000000..092d562 --- /dev/null +++ b/src/tcp.rs @@ -0,0 +1,233 @@ +use crate::packet::{Packet, PacketReceiver}; +use std::{ + io::{self, Read, Write}, + net::{Shutdown, SocketAddr, TcpListener, TcpStream, ToSocketAddrs}, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::{channel, Receiver, Sender}, + Arc, Mutex, + }, + thread, +}; + +/// Used to send [Packet] to the sending thread. +type ConnectionSender = Arc)>>>; + +/// A TCP [Connection] that can send and receive [Packet]. +pub struct Connection { + /// Whether or not the [Connection] is currently connected. + connected: Arc, + + /// Used to store the received [Packet]s. + packets: Arc, + + /// Used to send [Packet] to the sending thread. + sender: ConnectionSender, + + /// The [TcpStream] of the [Connection]. + stream: TcpStream, + + /// The address of the [Connection]. + address: SocketAddr, +} + +impl Connection { + /// Creates a new [Connection] with the given [TcpStream]. + pub fn new(stream: TcpStream) -> io::Result { + let connected = Arc::new(AtomicBool::new(true)); + let packets = Arc::new(PacketReceiver::new()); + + // Receiving part + let mut thread_stream = stream.try_clone()?; + let thread_packets = Arc::clone(&packets); + let thread_connected = Arc::clone(&connected); + thread::spawn(move || { + let mut int_buffer = [0; 4]; + + loop { + // Check if the connection is closed + if !thread_connected.load(Ordering::Relaxed) { + return; + } + + // Read the length of the packet + if thread_stream.read_exact(&mut int_buffer).is_err() { + break; + } + let len = u32::from_be_bytes(int_buffer); + + // Read the packet identifier + if thread_stream.read_exact(&mut int_buffer).is_err() { + break; + } + let id = u32::from_be_bytes(int_buffer); + + // Read the packet + let mut buffer = vec![0; len as usize]; + if thread_stream.read_exact(&mut buffer).is_err() { + break; + } + + // Insert the packet + thread_packets.insert(id, buffer); + } + + // Close the connection + thread_connected.store(false, Ordering::Relaxed); + }); + + // Sending part + let mut thread_stream = stream.try_clone()?; + let (sender, receiver) = channel(); + let thread_connected = Arc::clone(&connected); + thread::spawn(move || { + loop { + // Check if the connection is closed + if !thread_connected.load(Ordering::Relaxed) { + return; + } + + // Get the data to send + let (id, buffer): (u32, Vec) = match receiver.recv() { + Ok(data) => data, + Err(_) => break, + }; + + // Send the length of the data + let len = buffer.len() as u32; + if thread_stream.write_all(&len.to_be_bytes()).is_err() { + break; + } + + // Send the packet identifier + if thread_stream.write_all(&id.to_be_bytes()).is_err() { + break; + } + + // Send the data + if thread_stream.write_all(&buffer).is_err() { + break; + } + + // Flush the stream + if thread_stream.flush().is_err() { + break; + } + } + + // Close the connection + thread_connected.store(false, Ordering::Relaxed); + }); + + Ok(Self { + connected, + packets, + sender: Arc::new(Mutex::new(sender)), + address: stream.peer_addr()?, + stream, + }) + } + + /// Creates a [Connection] to the given address. + pub fn connect(address: A) -> io::Result { + Self::new(TcpStream::connect(address)?) + } + + /// Returns whether or not the [Connection] is currently connected. + pub fn connected(&self) -> bool { + self.connected.load(Ordering::Relaxed) + } + + /// Clears the [Packet] cache. + pub fn clear(&self) { + self.packets.clear(); + } + + /// Gets all the received [Packet]s of a certain type. + pub fn recv(&self) -> Vec

{ + self.packets.extract() + } + + /// Sends the given [Packet] to the [Connection]. + /// Does nothing if the [Connection] is closed. + pub fn send(&self, packet: P) { + let data = bincode::serialize(&packet).expect("Failed to serialize packet"); + self.sender + .lock() + .map(|sender| sender.send((P::ID, data))) + .ok(); + } + + /// Returns the address of the [Connection]. + pub fn address(&self) -> SocketAddr { + self.address + } +} + +impl Drop for Connection { + fn drop(&mut self) { + self.connected.store(false, Ordering::Relaxed); + self.stream.shutdown(Shutdown::Both).ok(); + } +} + +/// A TCP [Listener] that can accept [Connection]s. +pub struct Listener { + /// Whether the [Listener] is listening. + listening: Arc, + + /// The receiving part of the [Listener]. + receiver: Arc>>, + + /// The address the [Listener] is bound to. + address: SocketAddr, +} + +impl Listener { + /// Creates a new [Listener] binded to the given address. + pub fn bind(address: A) -> io::Result { + let listener = TcpListener::bind(address)?; + let address = listener.local_addr()?; + let listening = Arc::new(AtomicBool::new(true)); + let listening_thread = Arc::clone(&listening); + let (sender, receiver) = channel(); + thread::spawn(move || { + for stream in listener.incoming() { + let connection = match stream { + Ok(stream) => match Connection::new(stream) { + Ok(connection) => connection, + Err(_) => break, + }, + Err(_) => break, + }; + if sender.send(connection).is_err() { + break; + } + } + listening_thread.store(false, Ordering::Relaxed); + }); + Ok(Self { + listening, + receiver: Arc::new(Mutex::new(receiver)), + address, + }) + } + + /// Returns whether or not the [Listener] is listening. + pub fn listening(&self) -> bool { + self.listening.load(Ordering::Relaxed) + } + + /// Receives the next [Connection] from the [Listener]. + pub fn accept(&self) -> Option { + self.receiver + .lock() + .ok() + .and_then(|receiver| receiver.try_recv().ok()) + } + + /// Returns the address the [Listener] is bound to. + pub fn address(&self) -> SocketAddr { + self.address + } +}