From 9f233c6482d4ac8045c8d882f43d52ec146eaa81 Mon Sep 17 00:00:00 2001 From: Tipragot Date: Thu, 27 Apr 2023 10:00:33 +0200 Subject: [PATCH 1/4] Remove last system --- examples/ping_pong.rs | 99 ------------------ src/lib.rs | 218 --------------------------------------- src/packet.rs | 57 ----------- src/tcp.rs | 233 ------------------------------------------ 4 files changed, 607 deletions(-) delete mode 100644 examples/ping_pong.rs delete mode 100644 src/packet.rs delete mode 100644 src/tcp.rs diff --git a/examples/ping_pong.rs b/examples/ping_pong.rs deleted file mode 100644 index 1445160..0000000 --- a/examples/ping_pong.rs +++ /dev/null @@ -1,99 +0,0 @@ -use bevnet::{ - impl_packet, AppClientNetwork, AppServerNetwork, ClientConnection, ClientListener, - ClientNetworkPlugin, PacketEvent, ServerConnection, ServerNetworkPlugin, -}; -use bevy::prelude::*; -use serde::{Deserialize, Serialize}; - -/// A ping packet. -#[derive(Serialize, Deserialize)] -pub struct PingPacket; -impl_packet!(PingPacket); - -/// A pong packet. -#[derive(Serialize, Deserialize)] -pub struct PongPacket; -impl_packet!(PongPacket); - -/// Launch the server. -fn start_server(mut commands: Commands, keys: Res>) { - if keys.just_pressed(KeyCode::B) { - println!("Starting server..."); - match ClientListener::bind("127.0.0.1:8000") { - Ok(listener) => { - println!("Listening on {}", listener.address()); - commands.insert_resource(listener); - } - Err(e) => println!("Failed to bind: {}", e), - } - } -} - -/// Show when a client connects. -fn client_connected(connection: Query<(Entity, &ClientConnection), Added>) { - for (entity, connection) in connection.iter() { - println!("Client connected: {} as {:?}", connection.address(), entity); - } -} - -/// Show when a client disconnects. -fn client_disconnected(mut removed: RemovedComponents) { - for entity in removed.iter() { - println!("Client disconnected: {:?}", entity); - } -} - -/// Receive the ping packets. -fn receive_ping(mut events: EventReader>) { - for PacketEvent { connection, .. } in events.iter() { - println!("Received ping from {}", connection.address()); - connection.send(PongPacket); - println!("Response sent!"); - } -} - -/// Connect the client to the server. -fn connect(mut commands: Commands, keys: Res>) { - if keys.just_pressed(KeyCode::C) { - println!("Connecting..."); - match ServerConnection::connect("127.0.0.1:8000") { - Ok(connection) => { - println!("Connected to {}", connection.address()); - commands.insert_resource(connection); - } - Err(e) => println!("Failed to connect: {}", e), - } - } -} - -/// Receive the pong packets. -fn receive_pong(mut events: EventReader) { - for _ in events.iter() { - println!("Received pong!"); - } -} - -/// Send a ping packet to the server. -fn send_ping(keys: Res>, connection: Res) { - if keys.just_pressed(KeyCode::P) { - connection.send(PingPacket); - println!("Ping sent!"); - } -} - -fn main() { - App::new() - .add_plugins(DefaultPlugins) - .add_plugin(ServerNetworkPlugin) - .add_system(start_server) - .add_system(client_connected) - .add_system(client_disconnected) - .add_system(receive_ping) - .register_server_packet::() - .add_plugin(ClientNetworkPlugin) - .add_system(connect) - .add_system(send_ping.run_if(resource_exists::())) - .add_system(receive_pong) - .register_client_packet::() - .run(); -} diff --git a/src/lib.rs b/src/lib.rs index 3897209..e69de29 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,218 +0,0 @@ -use bevy::prelude::*; -pub use packet::Packet; -use std::{ - io, - net::{SocketAddr, ToSocketAddrs}, - sync::Arc, -}; -use tcp::{Connection, Listener}; - -mod packet; -mod tcp; - -/// A connection to a server. -#[derive(Resource)] -pub struct ServerConnection(Connection); - -impl ServerConnection { - /// Creates a [ServerConnection] to the given address. - pub fn connect(addr: A) -> io::Result { - Ok(Self(Connection::connect(addr)?)) - } - - /// Sends a [Packet] to the server. - pub fn send(&self, packet: P) { - self.0.send(packet); - } - - /// Gets the address of the server. - pub fn address(&self) -> SocketAddr { - self.0.address() - } -} - -/// Used to listen for incoming [ClientConnection]s. -#[derive(Resource)] -pub struct ClientListener(Listener); - -impl ClientListener { - /// Creates a [ClientListener] binded to the given address. - pub fn bind(address: A) -> io::Result { - Ok(Self(Listener::bind(address)?)) - } - - /// Returns the address the [ClientListener] is bound to. - pub fn address(&self) -> SocketAddr { - self.0.address() - } -} - -/// A connection to a client. -#[derive(Component)] -pub struct ClientConnection(Arc); - -impl ClientConnection { - /// Sends a [Packet] to the client. - pub fn send(&self, packet: P) { - self.0.send(packet); - } - - /// Gets the address of the client. - pub fn address(&self) -> SocketAddr { - self.0.address() - } -} - -/// A [Plugin] for client networking. -pub struct ClientNetworkPlugin; - -impl ClientNetworkPlugin { - /// Removes the [ServerConnection] resource when it's disconnected. - fn remove_disconnected(mut commands: Commands, connection: Res) { - if !connection.0.connected() { - commands.remove_resource::(); - } - } - - /// Clears the packet cache of the [ServerConnection]. - fn clear_cache(connection: Res) { - connection.0.clear(); - } -} - -impl Plugin for ClientNetworkPlugin { - fn build(&self, app: &mut App) { - app.add_systems(( - Self::remove_disconnected.run_if(resource_exists::()), - Self::clear_cache - .run_if(resource_exists::()) - .after(Self::remove_disconnected), - )); - } -} - -/// A [Plugin] for server networking. -pub struct ServerNetworkPlugin; - -impl ServerNetworkPlugin { - /// Removes the [ClientConnection] components when it's disconnected. - fn remove_disconnected( - mut commands: Commands, - connections: Query<(Entity, &ClientConnection)>, - ) { - for (entity, connection) in connections.iter() { - if !connection.0.connected() { - commands.entity(entity).remove::(); - } - } - } - - /// Clears the packet cache of the [ClientConnection]s. - fn clear_cache(connections: Query<&ClientConnection>) { - for connection in connections.iter() { - connection.0.clear(); - } - } - - /// Removes the [ClientListener] resource when it stop listening. - fn remove_not_listening(mut commands: Commands, listener: Res) { - if !listener.0.listening() { - commands.remove_resource::(); - } - } - - /// Accepts incoming connections. - fn accept_connections(mut commands: Commands, listener: Res) { - while let Some(connection) = listener.0.accept() { - commands.spawn(ClientConnection(Arc::new(connection))); - } - } -} - -impl Plugin for ServerNetworkPlugin { - fn build(&self, app: &mut App) { - app.add_systems(( - Self::remove_disconnected, - Self::clear_cache.after(Self::remove_disconnected), - Self::remove_not_listening.run_if(resource_exists::()), - Self::accept_connections - .run_if(resource_exists::()) - .after(Self::remove_not_listening), - )); - } -} - -/// Receives [Packet]s and sends them as [PacketEvent]s. -fn receive_server_packets( - mut writer: EventWriter>, - connection: Query<(Entity, &ClientConnection)>, -) { - for (entity, connection) in connection.iter() { - for packet in connection.0.recv() { - writer.send(PacketEvent { - connection: ClientConnection(Arc::clone(&connection.0)), - entity, - packet, - }); - } - } -} - -/// An extention trait to easily register a [Packet] to the server. -pub trait AppServerNetwork { - /// Registers a [Packet] for the server. - fn register_server_packet(&mut self) -> &mut Self; -} - -impl AppServerNetwork for App { - fn register_server_packet(&mut self) -> &mut Self { - self.add_event::>(); - self.add_system( - receive_server_packets::

- .after(ServerNetworkPlugin::remove_disconnected) - .before(ServerNetworkPlugin::clear_cache), - ); - self - } -} - -/// An event for received [Packet]s on the server. -pub struct PacketEvent { - /// The [ClientConnection] from which the [Packet] was received. - pub connection: ClientConnection, - - /// The [Entity] of the [ClientConnection]. - pub entity: Entity, - - /// The [Packet] - pub packet: P, -} - -/// Receives [Packet]s and sends them as [Event]s. -fn receive_client_packets( - mut writer: EventWriter

, - connection: Res, -) { - for packet in connection.0.recv() { - writer.send(packet); - } -} - -/// An extention trait to easily register a [Packet] to the client. -pub trait AppClientNetwork { - /// Registers a [Packet] for the client. - fn register_client_packet(&mut self) -> &mut Self; -} - -impl AppClientNetwork for App { - fn register_client_packet(&mut self) -> &mut Self { - self.add_event::

(); - self.add_system( - receive_client_packets::

- .run_if(resource_exists::()) - .after(ClientNetworkPlugin::remove_disconnected) - .before(ClientNetworkPlugin::clear_cache), - ); - self - } -} diff --git a/src/packet.rs b/src/packet.rs deleted file mode 100644 index 7a9a5e0..0000000 --- a/src/packet.rs +++ /dev/null @@ -1,57 +0,0 @@ -use dashmap::DashMap; -use serde::{de::DeserializeOwned, Serialize}; - -/// A [Packet] that can be sent over the network. -pub trait Packet: DeserializeOwned + Serialize + Sync + Send + 'static { - const ID: u32; -} - -/// A macro to easily implement [Packet]. -#[macro_export] -macro_rules! impl_packet { - ($t:ty) => { - impl ::bevnet::Packet for $t { - const ID: u32 = ::const_fnv1a_hash::fnv1a_hash_32( - concat!(module_path!(), "::", stringify!($t)).as_bytes(), - None, - ); - } - }; -} - -/// A container for the received [Packet]s. -pub struct PacketReceiver { - /// The received data. - data: DashMap>>, -} - -impl PacketReceiver { - /// Creates a new [PacketReceiver]. - pub fn new() -> Self { - Self { - data: DashMap::new(), - } - } - - /// Clears all the [Packet]s. - pub fn clear(&self) { - self.data.clear(); - } - - /// Inserts a the received raw [Packet] into the [PacketReceiver]. - pub fn insert(&self, id: u32, data: Vec) { - self.data.entry(id).or_default().push(data); - } - - /// Extract all the [Packet]s of a given type. - pub fn extract(&self) -> Vec

{ - match self.data.get_mut(&P::ID) { - Some(mut data) => data - .value_mut() - .drain(..) - .filter_map(|data| bincode::deserialize(&data).ok()) - .collect(), - None => Vec::new(), - } - } -} diff --git a/src/tcp.rs b/src/tcp.rs deleted file mode 100644 index 092d562..0000000 --- a/src/tcp.rs +++ /dev/null @@ -1,233 +0,0 @@ -use crate::packet::{Packet, PacketReceiver}; -use std::{ - io::{self, Read, Write}, - net::{Shutdown, SocketAddr, TcpListener, TcpStream, ToSocketAddrs}, - sync::{ - atomic::{AtomicBool, Ordering}, - mpsc::{channel, Receiver, Sender}, - Arc, Mutex, - }, - thread, -}; - -/// Used to send [Packet] to the sending thread. -type ConnectionSender = Arc)>>>; - -/// A TCP [Connection] that can send and receive [Packet]. -pub struct Connection { - /// Whether or not the [Connection] is currently connected. - connected: Arc, - - /// Used to store the received [Packet]s. - packets: Arc, - - /// Used to send [Packet] to the sending thread. - sender: ConnectionSender, - - /// The [TcpStream] of the [Connection]. - stream: TcpStream, - - /// The address of the [Connection]. - address: SocketAddr, -} - -impl Connection { - /// Creates a new [Connection] with the given [TcpStream]. - pub fn new(stream: TcpStream) -> io::Result { - let connected = Arc::new(AtomicBool::new(true)); - let packets = Arc::new(PacketReceiver::new()); - - // Receiving part - let mut thread_stream = stream.try_clone()?; - let thread_packets = Arc::clone(&packets); - let thread_connected = Arc::clone(&connected); - thread::spawn(move || { - let mut int_buffer = [0; 4]; - - loop { - // Check if the connection is closed - if !thread_connected.load(Ordering::Relaxed) { - return; - } - - // Read the length of the packet - if thread_stream.read_exact(&mut int_buffer).is_err() { - break; - } - let len = u32::from_be_bytes(int_buffer); - - // Read the packet identifier - if thread_stream.read_exact(&mut int_buffer).is_err() { - break; - } - let id = u32::from_be_bytes(int_buffer); - - // Read the packet - let mut buffer = vec![0; len as usize]; - if thread_stream.read_exact(&mut buffer).is_err() { - break; - } - - // Insert the packet - thread_packets.insert(id, buffer); - } - - // Close the connection - thread_connected.store(false, Ordering::Relaxed); - }); - - // Sending part - let mut thread_stream = stream.try_clone()?; - let (sender, receiver) = channel(); - let thread_connected = Arc::clone(&connected); - thread::spawn(move || { - loop { - // Check if the connection is closed - if !thread_connected.load(Ordering::Relaxed) { - return; - } - - // Get the data to send - let (id, buffer): (u32, Vec) = match receiver.recv() { - Ok(data) => data, - Err(_) => break, - }; - - // Send the length of the data - let len = buffer.len() as u32; - if thread_stream.write_all(&len.to_be_bytes()).is_err() { - break; - } - - // Send the packet identifier - if thread_stream.write_all(&id.to_be_bytes()).is_err() { - break; - } - - // Send the data - if thread_stream.write_all(&buffer).is_err() { - break; - } - - // Flush the stream - if thread_stream.flush().is_err() { - break; - } - } - - // Close the connection - thread_connected.store(false, Ordering::Relaxed); - }); - - Ok(Self { - connected, - packets, - sender: Arc::new(Mutex::new(sender)), - address: stream.peer_addr()?, - stream, - }) - } - - /// Creates a [Connection] to the given address. - pub fn connect(address: A) -> io::Result { - Self::new(TcpStream::connect(address)?) - } - - /// Returns whether or not the [Connection] is currently connected. - pub fn connected(&self) -> bool { - self.connected.load(Ordering::Relaxed) - } - - /// Clears the [Packet] cache. - pub fn clear(&self) { - self.packets.clear(); - } - - /// Gets all the received [Packet]s of a certain type. - pub fn recv(&self) -> Vec

{ - self.packets.extract() - } - - /// Sends the given [Packet] to the [Connection]. - /// Does nothing if the [Connection] is closed. - pub fn send(&self, packet: P) { - let data = bincode::serialize(&packet).expect("Failed to serialize packet"); - self.sender - .lock() - .map(|sender| sender.send((P::ID, data))) - .ok(); - } - - /// Returns the address of the [Connection]. - pub fn address(&self) -> SocketAddr { - self.address - } -} - -impl Drop for Connection { - fn drop(&mut self) { - self.connected.store(false, Ordering::Relaxed); - self.stream.shutdown(Shutdown::Both).ok(); - } -} - -/// A TCP [Listener] that can accept [Connection]s. -pub struct Listener { - /// Whether the [Listener] is listening. - listening: Arc, - - /// The receiving part of the [Listener]. - receiver: Arc>>, - - /// The address the [Listener] is bound to. - address: SocketAddr, -} - -impl Listener { - /// Creates a new [Listener] binded to the given address. - pub fn bind(address: A) -> io::Result { - let listener = TcpListener::bind(address)?; - let address = listener.local_addr()?; - let listening = Arc::new(AtomicBool::new(true)); - let listening_thread = Arc::clone(&listening); - let (sender, receiver) = channel(); - thread::spawn(move || { - for stream in listener.incoming() { - let connection = match stream { - Ok(stream) => match Connection::new(stream) { - Ok(connection) => connection, - Err(_) => break, - }, - Err(_) => break, - }; - if sender.send(connection).is_err() { - break; - } - } - listening_thread.store(false, Ordering::Relaxed); - }); - Ok(Self { - listening, - receiver: Arc::new(Mutex::new(receiver)), - address, - }) - } - - /// Returns whether or not the [Listener] is listening. - pub fn listening(&self) -> bool { - self.listening.load(Ordering::Relaxed) - } - - /// Receives the next [Connection] from the [Listener]. - pub fn accept(&self) -> Option { - self.receiver - .lock() - .ok() - .and_then(|receiver| receiver.try_recv().ok()) - } - - /// Returns the address the [Listener] is bound to. - pub fn address(&self) -> SocketAddr { - self.address - } -} From 942c726b9548739b446857aaa836fb0debc5fbf5 Mon Sep 17 00:00:00 2001 From: Tipragot Date: Fri, 28 Apr 2023 11:33:01 +0200 Subject: [PATCH 2/4] New network system --- src/lib.rs | 193 +++++++++++++++++++++++++++++++++++++++++++++++++++++ src/tcp.rs | 153 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 346 insertions(+) create mode 100644 src/tcp.rs diff --git a/src/lib.rs b/src/lib.rs index e69de29..906f3fd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -0,0 +1,193 @@ +use bevy::{prelude::*, utils::HashMap}; +use std::{ + io::{self, ErrorKind}, + net::{TcpStream, ToSocketAddrs}, + sync::{mpsc::TryRecvError, Arc}, +}; +use tcp::{Connection, Listener, Packet, RawPacket}; + +mod tcp; + +/// A Bevy resource that store the packets handlers for the client. +#[derive(Resource)] +pub struct ClientPacketHandler(Arc>>); + +/// A connection to a remote server. +#[derive(Resource)] +pub struct ServerConnection(Connection); + +impl ServerConnection { + /// Connects to a remote server. + pub fn connect(addr: A) -> io::Result { + Ok(Self(Connection::new(TcpStream::connect(addr)?)?)) + } + + /// Send a [Packet] to the remote server. + pub fn send(&self, packet: P) { + self.0.send(packet).ok(); + } +} + +/// A Bevy resource that store the packets handlers for the server. +#[derive(Resource)] +struct ServerPacketHandler( + Arc>>, +); + +/// A [ClientConnection] listener. +#[derive(Resource)] +pub struct ClientListener(Listener); + +impl ClientListener { + /// Creates a new TCP listener on the given address. + pub fn bind(addr: A) -> io::Result { + Ok(Self(Listener::bind(addr)?)) + } +} + +/// A connection to a remote client. +#[derive(Component)] +pub struct ClientConnection(Arc); + +impl ClientConnection { + /// Sends a [Packet] to the remote client. + pub fn send(&self, packet: P) { + self.0.send(packet).ok(); + } +} + +/// A Bevy system to handle incoming [ClientConnection]s and remove the +/// [ClientListener] resource if it is no longer listening. +fn accept_connections(mut commands: Commands, listener: Option>) { + if let Some(listener) = listener { + match listener.0.accept() { + Ok(connection) => { + commands.spawn(ClientConnection(Arc::new(connection))); + } + Err(error) => match error.kind() { + ErrorKind::WouldBlock => {} + _ => commands.remove_resource::(), + }, + } + } +} + +/// A Bevy system to handle incoming [Packet]s from +/// the [ClientConnection]s and the [ServerConnection]. +/// It removes them if they are no longer connected. +fn receive_packets(world: &mut World) { + // Handle client packets + let mut packets = Vec::new(); + let mut to_remove = false; + if let Some(connection) = world.get_resource::() { + if let Some(receiver) = connection.0.recv() { + loop { + match receiver.try_recv() { + Ok(packet) => packets.push(packet), + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + to_remove = true; + break; + } + } + } + } + } + if let Some(handlers) = world + .get_resource_mut::() + .map(|handlers| Arc::clone(&handlers.0)) + { + for packet in packets.into_iter() { + if let Some(handler) = handlers.get(&packet.packet_id()) { + (handler)(packet, world); + } + } + } + if to_remove { + world.remove_resource::(); + } + + // Handle server packets + let mut packets = Vec::new(); + let mut to_remove = Vec::new(); + for (entity, connection) in world.query::<(Entity, &ClientConnection)>().iter(world) { + if let Some(receiver) = connection.0.recv() { + loop { + match receiver.try_recv() { + Ok(packet) => { + packets.push((entity, ClientConnection(Arc::clone(&connection.0)), packet)); + } + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + to_remove.push(entity); + break; + } + } + } + } + } + if let Some(handlers) = world + .get_resource_mut::() + .map(|handlers| Arc::clone(&handlers.0)) + { + for (entity, connection, packet) in packets.into_iter() { + if let Some(handler) = handlers.get(&packet.packet_id()) { + (handler)(entity, connection, packet, world); + } + } + } + for entity in to_remove { + world.despawn(entity); + } +} + +/// A network plugin. +pub struct NetworkPlugin; + +impl Plugin for NetworkPlugin { + fn build(&self, app: &mut App) { + app.insert_resource(ServerPacketHandler(Arc::new(HashMap::new()))); + app.insert_resource(ClientPacketHandler(Arc::new(HashMap::new()))); + app.add_system(accept_connections); + app.add_system(receive_packets); + } +} + +/// A extension trait to add packet handler. +pub trait NetworkAppExt { + /// Add a client packet handler. + fn add_client_packet_handler(&mut self, handler: H) -> &mut Self + where + P: Packet, + H: Fn(RawPacket, &mut World) + Send + Sync + 'static; + + /// Add a server packet handler. + fn add_server_packet_handler(&mut self, handler: H) -> &mut Self + where + P: Packet, + H: Fn(Entity, ClientConnection, RawPacket, &mut World) + Send + Sync + 'static; +} + +impl NetworkAppExt for App { + fn add_client_packet_handler(&mut self, handler: H) -> &mut Self + where + P: Packet, + H: Fn(RawPacket, &mut World) + Send + Sync + 'static, + { + Arc::get_mut(&mut self.world.resource_mut::().0) + .unwrap() + .insert(P::packet_id(), Box::new(handler)); + self + } + + fn add_server_packet_handler(&mut self, handler: H) -> &mut Self + where + P: Packet, + H: Fn(Entity, ClientConnection, RawPacket, &mut World) + Send + Sync + 'static, + { + Arc::get_mut(&mut self.world.resource_mut::().0) + .unwrap() + .insert(P::packet_id(), Box::new(handler)); + self + } +} diff --git a/src/tcp.rs b/src/tcp.rs new file mode 100644 index 0000000..08af2a7 --- /dev/null +++ b/src/tcp.rs @@ -0,0 +1,153 @@ +use serde::{de::DeserializeOwned, Serialize}; +use std::{ + collections::hash_map::DefaultHasher, + hash::{Hash, Hasher}, + io::{self, ErrorKind, Read, Write}, + net::{Shutdown, TcpListener, TcpStream, ToSocketAddrs}, + sync::{ + mpsc::{channel, Receiver, Sender}, + Mutex, MutexGuard, + }, + thread, +}; + +/// A packet that can be sent over a [Connection]. +pub trait Packet: DeserializeOwned + Serialize + Send + Sync { + /// Returns a unique identifier for this packet. + /// + /// This function uses [std::any::type_name] to get a string + /// representation of the type of the object and returns the + /// hash of that string. This is not perfect... but I didn't + /// find a better solution. + fn packet_id() -> u64 { + let mut hasher = DefaultHasher::new(); + std::any::type_name::().hash(&mut hasher); + hasher.finish() + } +} + +impl Packet for T {} + +/// A raw packet. +pub struct RawPacket { + /// The identifier for this packet. + packet_id: u64, + + /// The serialized packet. + data: Vec, +} + +impl RawPacket { + /// Returns the identifier for this packet. + pub fn packet_id(&self) -> u64 { + self.packet_id + } + + /// Deserializes this packet to the given [Packet] type. + pub fn deserialize(&self) -> Option

{ + bincode::deserialize(&self.data).ok() + } +} + +/// A TCP connection that can send and receive [Packet]s. +pub struct Connection { + /// The [TcpStream] of the connection. + /// + /// It is used to send [Packet]s and to stop the receive + /// thread when the [Connection] is dropped. + stream: Mutex, + + /// The [Receiver] of the received [RawPacket]s. + receiver: Mutex>, +} + +impl Connection { + /// Creates a new TCP connection. + pub fn new(stream: TcpStream) -> io::Result { + let (sender, receiver) = channel(); + let thread_stream = stream.try_clone()?; + thread::spawn(move || Self::receive_loop(thread_stream, sender)); + Ok(Self { + stream: Mutex::new(stream), + receiver: Mutex::new(receiver), + }) + } + + /// The [Packet] receiving loop. + fn receive_loop(mut stream: TcpStream, sender: Sender) { + let mut len_buffer = [0; 4]; + let mut id_buffer = [0; 8]; + loop { + // Read the length of the packet + if stream.read_exact(&mut len_buffer).is_err() { + return; + } + let packet_len = u32::from_le_bytes(len_buffer); + + // Read the packet identifier + if stream.read_exact(&mut id_buffer).is_err() { + return; + } + let packet_id = u64::from_le_bytes(id_buffer); + + // Read the packet data + let mut data = vec![0; packet_len as usize]; + if stream.read_exact(&mut data).is_err() { + return; + } + + // Store the packet + if sender.send(RawPacket { packet_id, data }).is_err() { + return; + } + } + } + + /// Sends a [Packet] over this connection. + pub fn send(&self, packet: P) -> io::Result<()> { + let data = bincode::serialize(&packet).map_err(|e| io::Error::new(ErrorKind::Other, e))?; + let mut stream = self + .stream + .lock() + .map_err(|_| io::Error::new(ErrorKind::Other, "Failed to lock stream"))?; + stream.write_all(&data.len().to_le_bytes())?; + stream.write_all(&P::packet_id().to_le_bytes())?; + stream.write_all(&data) + } + + /// Gets the [RawPacket] receiver of this connection. + pub fn recv(&self) -> Option>> { + self.receiver.lock().ok() + } +} + +impl Drop for Connection { + fn drop(&mut self) { + self.stream + .lock() + .map(|stream| stream.shutdown(Shutdown::Both)) + .ok(); + } +} + +/// A [Connection] listener. +pub struct Listener { + /// The [TcpListener] of the listener. + listener: TcpListener, +} + +impl Listener { + /// Creates a new TCP listener on the given address. + pub fn bind(addr: A) -> io::Result { + let listener = TcpListener::bind(addr)?; + listener.set_nonblocking(true)?; + Ok(Self { listener }) + } + + /// Accepts a new [Connection]. + pub fn accept(&self) -> io::Result { + self.listener + .accept() + .and_then(|(stream, _)| Connection::new(stream)) + } +} From 69a7f375189bbf68e7d4c9bc1340dff0ded4a995 Mon Sep 17 00:00:00 2001 From: Tipragot Date: Fri, 28 Apr 2023 17:51:41 +0200 Subject: [PATCH 3/4] New packet handeling system --- examples/ping_pong.rs | 67 ++++++++++++++++ src/client.rs | 103 ++++++++++++++++++++++++ src/lib.rs | 31 ++++++- src/server.rs | 127 +++++++++++++++++++++++++++++ src/tcp.rs | 183 +++++++++++++++++++++--------------------- 5 files changed, 418 insertions(+), 93 deletions(-) create mode 100644 examples/ping_pong.rs create mode 100644 src/client.rs create mode 100644 src/server.rs diff --git a/examples/ping_pong.rs b/examples/ping_pong.rs new file mode 100644 index 0000000..4a666b7 --- /dev/null +++ b/examples/ping_pong.rs @@ -0,0 +1,67 @@ +use bevnet::{ + client::{ClientAppExt, ClientPlugin, ServerConnection}, + server::{ClientListener, ServerAppExt, ServerPlugin}, +}; +use bevy::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +struct Ping; + +#[derive(Serialize, Deserialize)] +struct Pong; + +fn start_server(mut commands: Commands, keys: Res>) { + if keys.just_pressed(KeyCode::B) { + println!("Starting server..."); + match ClientListener::bind("127.0.0.1:8000") { + Ok(listener) => { + commands.insert_resource(listener); + println!("Server started"); + } + Err(e) => println!("Failed to start server: {}", e), + } + } +} + +fn connect(mut commands: Commands, keys: Res>) { + if keys.just_pressed(KeyCode::C) { + println!("Connecting to server..."); + match ServerConnection::connect("127.0.0.1:8000") { + Ok(connection) => { + commands.insert_resource(connection); + println!("Connected to server"); + } + Err(e) => println!("Failed to connect: {}", e), + } + } +} + +fn send_ping(connection: Option>, keys: Res>) { + if keys.just_pressed(KeyCode::S) { + println!("Sending ping..."); + if let Some(connection) = connection { + connection.send(Ping); + println!("Ping sent"); + } + } +} + +fn main() { + App::new() + .add_plugins(DefaultPlugins) + .add_plugin(ServerPlugin) + .add_system(start_server) + .add_server_packet_handler::(|entity, connection, _, _| { + println!("Received ping from {:?}", entity); + connection.send(Pong); + println!("Sent pong to {:?}", entity); + }) + .add_plugin(ClientPlugin) + .add_system(connect) + .add_client_packet_handler::(|_, _| { + println!("Received pong"); + }) + .add_system(send_ping) + .run(); +} diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..d368e0f --- /dev/null +++ b/src/client.rs @@ -0,0 +1,103 @@ +use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc}; + +use bevy::prelude::*; + +use crate::{tcp::Connection, Packet}; + +/// A function that handle a received [Packet] on the client. +pub type ClientPacketHandler = Box, &mut World) + Send + Sync>; + +/// A Bevy resource that store the packets handlers for the client. +#[derive(Resource)] +pub struct ClientHandlerManager(Arc>); + +/// A connection to a remote server. +#[derive(Resource)] +pub struct ServerConnection(Connection); + +impl ServerConnection { + /// Connects to a remote server. + pub fn connect(addr: A) -> io::Result { + Ok(Self(Connection::connect(addr)?)) + } + + /// Sends a packet through this connection. + pub fn send(&self, packet: P) { + let mut data = bincode::serialize(&packet).expect("Failed to serialize packet"); + data.extend(P::packet_id().to_be_bytes()); + self.0.send(data); + } +} + +/// A plugin that manage the network connections for a server. +pub struct ClientPlugin; + +impl ClientPlugin { + /// Handles a received [Packet] on the server. + pub fn handle_packets(world: &mut World) { + // Get all received packets + let mut packets = Vec::new(); + if let Some(connection) = world.get_resource::() { + while let Some(mut packet) = connection.0.recv() { + if packet.len() < 8 { + println!("Invalid packet received: {:?}", packet); + } else { + let id_buffer = packet.split_off(packet.len() - 8); + let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap()); + packets.push((packet_id, packet)); + } + } + } else { + return; + } + + // Get the packet handlers + let handlers = Arc::clone(&world.resource_mut::().0); + + // Handle all received packets + for (packet_id, packet) in packets { + if let Some(handler) = handlers.get(&packet_id) { + handler(packet, world); + } + } + } + + /// Remove [ServerConnection] if it's disconnected. + pub fn remove_disconnected(mut commands: Commands, connection: Option>) { + if let Some(connection) = connection { + if connection.0.closed() { + commands.remove_resource::(); + } + } + } +} + +impl Plugin for ClientPlugin { + fn build(&self, app: &mut App) { + app.insert_resource(ClientHandlerManager(Arc::new(HashMap::new()))); + app.add_system(ClientPlugin::handle_packets); + app.add_system(ClientPlugin::remove_disconnected); + } +} + +/// An extension to add packet handlers. +pub trait ClientAppExt { + /// Add a new packet handler. + fn add_client_packet_handler(&mut self, handler: H) -> &mut Self + where + P: Packet, + H: Fn(Vec, &mut World) + Send + Sync + 'static; +} + +impl ClientAppExt for App { + fn add_client_packet_handler(&mut self, handler: H) -> &mut Self + where + P: Packet, + H: Fn(Vec, &mut World) + Send + Sync + 'static, + { + Arc::get_mut(&mut self.world.resource_mut::().0) + .unwrap() + .insert(P::packet_id(), Box::new(handler)); + self + } +} diff --git a/src/lib.rs b/src/lib.rs index 906f3fd..bbfe983 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,31 @@ -use bevy::{prelude::*, utils::HashMap}; +use serde::{de::DeserializeOwned, Serialize}; +use std::{ + collections::hash_map::DefaultHasher, + hash::{Hash, Hasher}, +}; + +pub mod client; +pub mod server; +mod tcp; + +/// A packet that can be sent over a [Connection]. +pub trait Packet: DeserializeOwned + Serialize + Send + Sync { + /// Returns a unique identifier for this packet. + /// + /// This function uses [std::any::type_name] to get a string + /// representation of the type of the object and returns the + /// hash of that string. This is not perfect... but I didn't + /// find a better solution. + fn packet_id() -> u64 { + let mut hasher = DefaultHasher::new(); + std::any::type_name::().hash(&mut hasher); + hasher.finish() + } +} + +impl Packet for T {} + +/* use bevy::{prelude::*, utils::HashMap}; use std::{ io::{self, ErrorKind}, net::{TcpStream, ToSocketAddrs}, @@ -115,6 +142,7 @@ fn receive_packets(world: &mut World) { loop { match receiver.try_recv() { Ok(packet) => { + println!("YESSSS"); packets.push((entity, ClientConnection(Arc::clone(&connection.0)), packet)); } Err(TryRecvError::Empty) => break, @@ -191,3 +219,4 @@ impl NetworkAppExt for App { self } } + */ diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..2bced84 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,127 @@ +use crate::{ + tcp::{Connection, Listener}, + Packet, +}; +use bevy::prelude::*; +use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc}; + +/// A function that handle a received [Packet] on the server. +pub type ServerPacketHandler = + Box, &mut World) + Send + Sync>; + +/// A Bevy resource that store the packets handlers for the server. +#[derive(Resource)] +pub struct ServerHandlerManager(Arc>); + +/// A Bevy resource that listens for incoming [ClientConnection]s. +#[derive(Resource)] +pub struct ClientListener(Listener); + +impl ClientListener { + /// Creates a new listener on the given address. + pub fn bind(addr: A) -> io::Result { + Ok(Self(Listener::bind(addr)?)) + } +} + +/// A connection to a remote client. +#[derive(Component)] +pub struct ClientConnection(Arc); + +impl ClientConnection { + /// Sends a packet through this connection. + pub fn send(&self, packet: P) { + let mut data = bincode::serialize(&packet).expect("Failed to serialize packet"); + data.extend(P::packet_id().to_be_bytes()); + self.0.send(data); + } +} + +/// A plugin that manage the network connections for a server. +pub struct ServerPlugin; + +impl ServerPlugin { + /// Accept new [ClientConnection]s. + pub fn accept_connections(mut commands: Commands, listener: Option>) { + if let Some(listener) = listener { + if let Some(connection) = listener.0.accept() { + commands.spawn(ClientConnection(Arc::new(connection))); + } + } + } + + /// Handles a received [Packet] on the server. + pub fn handle_packets(world: &mut World) { + // Get all received packets + let mut packets = Vec::new(); + for (entity, connection) in world.query::<(Entity, &ClientConnection)>().iter(world) { + while let Some(mut packet) = connection.0.recv() { + if packet.len() < 8 { + println!("Invalid packet received: {:?}", packet); + } else { + let id_buffer = packet.split_off(packet.len() - 8); + let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap()); + packets.push(( + entity, + ClientConnection(Arc::clone(&connection.0)), + packet_id, + packet, + )); + } + } + } + + // Get the packet handlers + let handlers = Arc::clone(&world.resource_mut::().0); + + // Handle all received packets + for (entity, connection, packet_id, packet) in packets { + if let Some(handler) = handlers.get(&packet_id) { + handler(entity, connection, packet, world); + } + } + } + + /// Remove disconnected [ClientConnection]s. + pub fn remove_disconnected( + mut commands: Commands, + connections: Query<(Entity, &ClientConnection)>, + ) { + for (entity, connection) in connections.iter() { + if connection.0.closed() { + commands.entity(entity).remove::(); + } + } + } +} + +impl Plugin for ServerPlugin { + fn build(&self, app: &mut App) { + app.insert_resource(ServerHandlerManager(Arc::new(HashMap::new()))); + app.add_system(ServerPlugin::accept_connections); + app.add_system(ServerPlugin::handle_packets); + app.add_system(ServerPlugin::remove_disconnected); + } +} + +/// An extension to add packet handlers. +pub trait ServerAppExt { + /// Add a new packet handler. + fn add_server_packet_handler(&mut self, handler: H) -> &mut Self + where + P: Packet, + H: Fn(Entity, ClientConnection, Vec, &mut World) + Send + Sync + 'static; +} + +impl ServerAppExt for App { + fn add_server_packet_handler(&mut self, handler: H) -> &mut Self + where + P: Packet, + H: Fn(Entity, ClientConnection, Vec, &mut World) + Send + Sync + 'static, + { + Arc::get_mut(&mut self.world.resource_mut::().0) + .unwrap() + .insert(P::packet_id(), Box::new(handler)); + self + } +} diff --git a/src/tcp.rs b/src/tcp.rs index 08af2a7..413f287 100644 --- a/src/tcp.rs +++ b/src/tcp.rs @@ -1,132 +1,130 @@ -use serde::{de::DeserializeOwned, Serialize}; use std::{ - collections::hash_map::DefaultHasher, - hash::{Hash, Hasher}, - io::{self, ErrorKind, Read, Write}, + io::{self, Read, Write}, net::{Shutdown, TcpListener, TcpStream, ToSocketAddrs}, sync::{ + atomic::{AtomicBool, Ordering}, mpsc::{channel, Receiver, Sender}, - Mutex, MutexGuard, + Arc, Mutex, }, thread, }; -/// A packet that can be sent over a [Connection]. -pub trait Packet: DeserializeOwned + Serialize + Send + Sync { - /// Returns a unique identifier for this packet. - /// - /// This function uses [std::any::type_name] to get a string - /// representation of the type of the object and returns the - /// hash of that string. This is not perfect... but I didn't - /// find a better solution. - fn packet_id() -> u64 { - let mut hasher = DefaultHasher::new(); - std::any::type_name::().hash(&mut hasher); - hasher.finish() - } -} - -impl Packet for T {} - -/// A raw packet. -pub struct RawPacket { - /// The identifier for this packet. - packet_id: u64, - - /// The serialized packet. - data: Vec, -} - -impl RawPacket { - /// Returns the identifier for this packet. - pub fn packet_id(&self) -> u64 { - self.packet_id - } - - /// Deserializes this packet to the given [Packet] type. - pub fn deserialize(&self) -> Option

{ - bincode::deserialize(&self.data).ok() - } -} - -/// A TCP connection that can send and receive [Packet]s. +/// A non-blocking TCP connection. pub struct Connection { - /// The [TcpStream] of the connection. - /// - /// It is used to send [Packet]s and to stop the receive - /// thread when the [Connection] is dropped. - stream: Mutex, + /// Track if the connection has been closed. + closed: Arc, - /// The [Receiver] of the received [RawPacket]s. - receiver: Mutex>, + /// The underlying TCP stream. + stream: TcpStream, + + /// Used to receive packets from the receiving thread. + receiver: Mutex>>, + + /// Used to send packets to the sending thread. + sender: Mutex>>, } impl Connection { - /// Creates a new TCP connection. - pub fn new(stream: TcpStream) -> io::Result { - let (sender, receiver) = channel(); + /// Creates a new connection. + fn new(stream: TcpStream) -> io::Result { + stream.set_nonblocking(false)?; + let closed = Arc::new(AtomicBool::new(false)); + + // Spawn the receiving thread let thread_stream = stream.try_clone()?; - thread::spawn(move || Self::receive_loop(thread_stream, sender)); + let (thread_sender, receiver) = channel(); + let thread_closed = Arc::clone(&closed); + thread::spawn(move || Self::receiving_loop(thread_stream, thread_sender, thread_closed)); + + // Spawn the sending thread + let thread_stream = stream.try_clone()?; + let (sender, thread_receiver) = channel(); + let thread_closed = Arc::clone(&closed); + thread::spawn(move || Self::sending_loop(thread_stream, thread_receiver, thread_closed)); + + // Return the connection Ok(Self { - stream: Mutex::new(stream), + closed, + stream, receiver: Mutex::new(receiver), + sender: Mutex::new(sender), }) } - /// The [Packet] receiving loop. - fn receive_loop(mut stream: TcpStream, sender: Sender) { + /// Creates a new connection to the given address. + pub fn connect(addr: A) -> io::Result { + Self::new(TcpStream::connect(addr)?) + } + + /// The receiving loop for this connection. + fn receiving_loop(mut stream: TcpStream, sender: Sender>, closed: Arc) { let mut len_buffer = [0; 4]; - let mut id_buffer = [0; 8]; loop { - // Read the length of the packet + // Read the length of the next packet if stream.read_exact(&mut len_buffer).is_err() { - return; + break; } - let packet_len = u32::from_le_bytes(len_buffer); + let len = u32::from_be_bytes(len_buffer); - // Read the packet identifier - if stream.read_exact(&mut id_buffer).is_err() { - return; - } - let packet_id = u64::from_le_bytes(id_buffer); - - // Read the packet data - let mut data = vec![0; packet_len as usize]; - if stream.read_exact(&mut data).is_err() { - return; + // Read the packet + let mut packet = vec![0; len as usize]; + if stream.read_exact(&mut packet).is_err() { + break; } - // Store the packet - if sender.send(RawPacket { packet_id, data }).is_err() { - return; + // Send the packet + if sender.send(packet).is_err() { + break; } } + closed.store(true, Ordering::Relaxed); } - /// Sends a [Packet] over this connection. - pub fn send(&self, packet: P) -> io::Result<()> { - let data = bincode::serialize(&packet).map_err(|e| io::Error::new(ErrorKind::Other, e))?; - let mut stream = self - .stream + /// The sending loop for this connection. + fn sending_loop(mut stream: TcpStream, receiver: Receiver>, closed: Arc) { + loop { + // Get the next packet to send + let packet = match receiver.recv() { + Ok(packet) => packet, + Err(_) => break, + }; + + // Send the length of the packet + let len_buffer = u32::to_be_bytes(packet.len() as u32); + if stream.write_all(&len_buffer).is_err() { + break; + } + + // Send the packet + if stream.write_all(&packet).is_err() { + break; + } + } + closed.store(true, Ordering::Relaxed); + } + + /// Returns `true` if the connection has been closed. + pub fn closed(&self) -> bool { + self.closed.load(Ordering::Relaxed) + } + + /// Returns the next received packet. + pub fn recv(&self) -> Option> { + self.receiver .lock() - .map_err(|_| io::Error::new(ErrorKind::Other, "Failed to lock stream"))?; - stream.write_all(&data.len().to_le_bytes())?; - stream.write_all(&P::packet_id().to_le_bytes())?; - stream.write_all(&data) + .ok() + .and_then(|receiver| receiver.try_recv().ok()) } - /// Gets the [RawPacket] receiver of this connection. - pub fn recv(&self) -> Option>> { - self.receiver.lock().ok() + /// Sends a packet through this connection. + pub fn send(&self, packet: Vec) { + self.sender.lock().map(|sender| sender.send(packet)).ok(); } } impl Drop for Connection { fn drop(&mut self) { - self.stream - .lock() - .map(|stream| stream.shutdown(Shutdown::Both)) - .ok(); + self.stream.shutdown(Shutdown::Both).ok(); } } @@ -145,9 +143,10 @@ impl Listener { } /// Accepts a new [Connection]. - pub fn accept(&self) -> io::Result { + pub fn accept(&self) -> Option { self.listener .accept() .and_then(|(stream, _)| Connection::new(stream)) + .ok() } } From 39db941d52b0c00c7cead98a1dc853e367c00eae Mon Sep 17 00:00:00 2001 From: Tipragot Date: Fri, 28 Apr 2023 21:24:45 +0200 Subject: [PATCH 4/4] Sync system --- examples/ping_pong.rs | 67 -------------- src/{client.rs => client/mod.rs} | 14 +-- src/client/sync.rs | 129 ++++++++++++++++++++++++++ src/{server.rs => server/mod.rs} | 10 +- src/server/sync.rs | 154 +++++++++++++++++++++++++++++++ 5 files changed, 296 insertions(+), 78 deletions(-) delete mode 100644 examples/ping_pong.rs rename src/{client.rs => client/mod.rs} (91%) create mode 100644 src/client/sync.rs rename src/{server.rs => server/mod.rs} (93%) create mode 100644 src/server/sync.rs diff --git a/examples/ping_pong.rs b/examples/ping_pong.rs deleted file mode 100644 index 4a666b7..0000000 --- a/examples/ping_pong.rs +++ /dev/null @@ -1,67 +0,0 @@ -use bevnet::{ - client::{ClientAppExt, ClientPlugin, ServerConnection}, - server::{ClientListener, ServerAppExt, ServerPlugin}, -}; -use bevy::prelude::*; -use serde::{Deserialize, Serialize}; - -#[derive(Serialize, Deserialize)] -struct Ping; - -#[derive(Serialize, Deserialize)] -struct Pong; - -fn start_server(mut commands: Commands, keys: Res>) { - if keys.just_pressed(KeyCode::B) { - println!("Starting server..."); - match ClientListener::bind("127.0.0.1:8000") { - Ok(listener) => { - commands.insert_resource(listener); - println!("Server started"); - } - Err(e) => println!("Failed to start server: {}", e), - } - } -} - -fn connect(mut commands: Commands, keys: Res>) { - if keys.just_pressed(KeyCode::C) { - println!("Connecting to server..."); - match ServerConnection::connect("127.0.0.1:8000") { - Ok(connection) => { - commands.insert_resource(connection); - println!("Connected to server"); - } - Err(e) => println!("Failed to connect: {}", e), - } - } -} - -fn send_ping(connection: Option>, keys: Res>) { - if keys.just_pressed(KeyCode::S) { - println!("Sending ping..."); - if let Some(connection) = connection { - connection.send(Ping); - println!("Ping sent"); - } - } -} - -fn main() { - App::new() - .add_plugins(DefaultPlugins) - .add_plugin(ServerPlugin) - .add_system(start_server) - .add_server_packet_handler::(|entity, connection, _, _| { - println!("Received ping from {:?}", entity); - connection.send(Pong); - println!("Sent pong to {:?}", entity); - }) - .add_plugin(ClientPlugin) - .add_system(connect) - .add_client_packet_handler::(|_, _| { - println!("Received pong"); - }) - .add_system(send_ping) - .run(); -} diff --git a/src/client.rs b/src/client/mod.rs similarity index 91% rename from src/client.rs rename to src/client/mod.rs index d368e0f..7a3214a 100644 --- a/src/client.rs +++ b/src/client/mod.rs @@ -1,8 +1,8 @@ +use crate::{tcp::Connection, Packet}; +use bevy::prelude::*; use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc}; -use bevy::prelude::*; - -use crate::{tcp::Connection, Packet}; +pub mod sync; /// A function that handle a received [Packet] on the client. pub type ClientPacketHandler = Box, &mut World) + Send + Sync>; @@ -22,8 +22,8 @@ impl ServerConnection { } /// Sends a packet through this connection. - pub fn send(&self, packet: P) { - let mut data = bincode::serialize(&packet).expect("Failed to serialize packet"); + pub fn send(&self, packet: &P) { + let mut data = bincode::serialize(packet).expect("Failed to serialize packet"); data.extend(P::packet_id().to_be_bytes()); self.0.send(data); } @@ -83,14 +83,14 @@ impl Plugin for ClientPlugin { /// An extension to add packet handlers. pub trait ClientAppExt { /// Add a new packet handler. - fn add_client_packet_handler(&mut self, handler: H) -> &mut Self + fn add_packet_handler(&mut self, handler: H) -> &mut Self where P: Packet, H: Fn(Vec, &mut World) + Send + Sync + 'static; } impl ClientAppExt for App { - fn add_client_packet_handler(&mut self, handler: H) -> &mut Self + fn add_packet_handler(&mut self, handler: H) -> &mut Self where P: Packet, H: Fn(Vec, &mut World) + Send + Sync + 'static, diff --git a/src/client/sync.rs b/src/client/sync.rs new file mode 100644 index 0000000..fa1b195 --- /dev/null +++ b/src/client/sync.rs @@ -0,0 +1,129 @@ +use std::{any::type_name, marker::PhantomData}; + +use super::{ClientAppExt, ServerConnection}; +use crate::Packet; +use bevy::prelude::*; +use serde::{de::DeserializeOwned, Serialize}; + +/// An event that comes from the server. +#[derive(Deref)] +pub struct FromServer { + /// The event. + pub event: E, +} + +/// Mark an [Entity] as synced by the server. +#[derive(Component)] +pub struct ServerEntity(Entity); + +/// A plugin for the syncronization system. +pub struct ClientSyncPlugin; + +impl ClientSyncPlugin { + /// Removes [ServerEntity] when disconnected. + fn remove_synced(mut commands: Commands, entities: Query>) { + for entity in entities.iter() { + commands.entity(entity).despawn(); + } + } +} + +impl Plugin for ClientSyncPlugin { + fn build(&self, app: &mut App) { + app.add_packet_handler::(|data, world| { + match bincode::deserialize::(&data) { + Ok(entity) => { + if let Some((local_entity, _)) = world + .query::<(Entity, &ServerEntity)>() + .iter(world) + .find(|(_, server_entity)| server_entity.0 == entity) + { + println!("Despawning {:?}", local_entity); + world.despawn(local_entity); + } else { + println!("Spawning {:?}", entity); + world.spawn(ServerEntity(entity)); + } + } + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + } + }); + app.add_system(Self::remove_synced.run_if(resource_removed::())); + } +} + +/// An extention to add syncronization to the client. +pub trait ClientSyncExt { + /// Register syncronization for an [Event] that comes from the server. + fn server_event_sync(&mut self) -> &mut Self; + + /// Register syncronization for an [Event] that can be sent by the client. + fn client_event_sync(&mut self) -> &mut Self; + + /// Register a [Component] to be synced. + fn sync_component(&mut self) -> &mut Self; +} + +impl ClientSyncExt for App { + fn server_event_sync(&mut self) -> &mut Self { + self.add_event::>() + .add_packet_handler::(|data, world| match bincode::deserialize::(&data) { + Ok(event) => world.send_event(FromServer { event }), + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + }) + } + + fn client_event_sync(&mut self) -> &mut Self { + self.add_event::().add_system( + |mut events: EventReader, connection: Option>| { + if let Some(connection) = connection { + for event in events.iter() { + connection.send(event); + } + } + }, + ) + } + + fn sync_component(&mut self) -> &mut Self { + self.add_packet_handler::<(Entity, C), _>(|data, world| { + match bincode::deserialize::<(Entity, C)>(&data) { + Ok((entity, component)) => { + if let Some((local_entity, _)) = world + .query::<(Entity, &ServerEntity)>() + .iter(world) + .find(|(_, server_entity)| server_entity.0 == entity) + { + let mut local_entity = world.entity_mut(local_entity); + match local_entity.get_mut::() { + Some(mut local_component) => { + println!("CA CHANGE: {}", type_name::()); + *local_component = component; + } + None => { + local_entity.insert(component); + } + } + } else { + println!("Received component for unknown entity: {:?}", entity); + } + } + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + } + }) + .add_packet_handler::<(Entity, PhantomData), _>( + |data, world| match bincode::deserialize::<(Entity, PhantomData)>(&data) { + Ok((entity, _)) => { + if let Some((local_entity, _)) = world + .query::<(Entity, &ServerEntity)>() + .iter(world) + .find(|(_, server_entity)| server_entity.0 == entity) + { + world.entity_mut(local_entity).remove::(); + } + } + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + }, + ) + } +} diff --git a/src/server.rs b/src/server/mod.rs similarity index 93% rename from src/server.rs rename to src/server/mod.rs index 2bced84..9a31ad3 100644 --- a/src/server.rs +++ b/src/server/mod.rs @@ -5,6 +5,8 @@ use crate::{ use bevy::prelude::*; use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc}; +pub mod sync; + /// A function that handle a received [Packet] on the server. pub type ServerPacketHandler = Box, &mut World) + Send + Sync>; @@ -30,8 +32,8 @@ pub struct ClientConnection(Arc); impl ClientConnection { /// Sends a packet through this connection. - pub fn send(&self, packet: P) { - let mut data = bincode::serialize(&packet).expect("Failed to serialize packet"); + pub fn send(&self, packet: &P) { + let mut data = bincode::serialize(packet).expect("Failed to serialize packet"); data.extend(P::packet_id().to_be_bytes()); self.0.send(data); } @@ -107,14 +109,14 @@ impl Plugin for ServerPlugin { /// An extension to add packet handlers. pub trait ServerAppExt { /// Add a new packet handler. - fn add_server_packet_handler(&mut self, handler: H) -> &mut Self + fn add_packet_handler(&mut self, handler: H) -> &mut Self where P: Packet, H: Fn(Entity, ClientConnection, Vec, &mut World) + Send + Sync + 'static; } impl ServerAppExt for App { - fn add_server_packet_handler(&mut self, handler: H) -> &mut Self + fn add_packet_handler(&mut self, handler: H) -> &mut Self where P: Packet, H: Fn(Entity, ClientConnection, Vec, &mut World) + Send + Sync + 'static, diff --git a/src/server/sync.rs b/src/server/sync.rs new file mode 100644 index 0000000..ad2e7b7 --- /dev/null +++ b/src/server/sync.rs @@ -0,0 +1,154 @@ +use super::ServerAppExt; +use crate::{server::ClientConnection, Packet}; +use bevy::prelude::*; +use serde::{de::DeserializeOwned, Serialize}; +use std::{any::type_name, marker::PhantomData, ops::Deref}; + +/// An event that comes from a client. +pub struct FromClient { + /// The entity of the [ClientConnection] that sent the event. + pub entity: Entity, + + /// The [ClientConnection] that sent the event. + pub connection: ClientConnection, + + /// The event. + pub event: E, +} + +impl Deref for FromClient { + type Target = E; + + fn deref(&self) -> &Self::Target { + &self.event + } +} + +/// Mark an [Entity] to be synced. +#[derive(Component)] +pub struct Synced; + +/// A plugin for the syncronization system. +pub struct ServerSyncPlugin; + +impl ServerSyncPlugin { + /// Send to clients the [Synced] entity that has been added to the server. + fn send_added( + added_entities: Query>, + connections: Query<&ClientConnection>, + ) { + for entity in added_entities.iter() { + for connection in connections.iter() { + connection.send(&entity); + } + } + } + + /// Send [Synced] entities to new clients. + fn send_synced( + synced_entities: Query>, + new_connections: Query<&ClientConnection, Added>, + ) { + for entity in synced_entities.iter() { + for connection in new_connections.iter() { + connection.send(&entity); + } + } + } + + /// Send to clients the [Synced] entity that has been removed. + fn send_removed( + mut removed_entities: RemovedComponents, + connections: Query<&ClientConnection>, + ) { + for entity in removed_entities.iter() { + for connection in connections.iter() { + connection.send(&entity); + } + } + } +} + +impl Plugin for ServerSyncPlugin { + fn build(&self, app: &mut App) { + app.add_system(Self::send_added); + app.add_system(Self::send_synced); + app.add_system(Self::send_removed); + } +} + +/// An extention to add syncronization to the server. +pub trait ServerSyncExt { + /// Register syncronization for an [Event] that can be sent by the server. + fn server_event_sync(&mut self) -> &mut Self; + + /// Register syncronization for an [Event] that comes from the client. + fn client_event_sync(&mut self) -> &mut Self; + + /// Register a [Component] to be synced. + fn sync_component(&mut self) -> &mut Self; +} + +impl ServerSyncExt for App { + fn server_event_sync(&mut self) -> &mut Self { + self.add_event::().add_system( + |mut events: EventReader, connections: Query<&ClientConnection>| { + for event in events.iter() { + for connection in connections.iter() { + connection.send(event); + } + } + }, + ) + } + + fn client_event_sync(&mut self) -> &mut Self { + self.add_event::>() + .add_packet_handler::( + |entity, connection, data, world| match bincode::deserialize::(&data) { + Ok(event) => world.send_event(FromClient { + entity, + connection, + event, + }), + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + }, + ) + } + + fn sync_component(&mut self) -> &mut Self { + let update_components = + |changed_components: Query<(Entity, &C), (Changed, With)>, + connections: Query<&ClientConnection>| { + for (entity, component) in changed_components.iter() { + for connection in connections.iter() { + connection.send(&(entity, component.clone())); + } + } + }; + let send_components = + |components: Query<(Entity, &C), With>, + new_connections: Query<&ClientConnection, Added>| { + for (entity, component) in components.iter() { + for connection in new_connections.iter() { + connection.send(&(entity, component.clone())); + } + } + }; + let remove_components = + |mut removed_components: RemovedComponents, + synced_entities: Query>, + connections: Query<&ClientConnection>| { + for entity in removed_components.iter() { + if synced_entities.contains(entity) { + for connection in connections.iter() { + connection.send(&(entity, PhantomData::)); + } + } + } + }; + self.add_system(update_components.after(ServerSyncPlugin::send_added)) + .add_system(send_components.after(ServerSyncPlugin::send_synced)) + .add_system(remove_components.after(update_components)) + } +}