From 3aa5856753b6be3a1be0ec6befc45753afea0796 Mon Sep 17 00:00:00 2001 From: Tipragot Date: Sat, 29 Apr 2023 00:54:03 +0200 Subject: [PATCH 1/7] Remove useless dependencies --- Cargo.toml | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 745162b..9128e5b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,8 +10,6 @@ categories = ["network-programming", "game-development"] repository = "https://git.tipragot.fr/tipragot/bevnet" [dependencies] -bevy = "0.10.1" -bincode = "1.3.3" -const-fnv1a-hash = "1.1.0" -dashmap = "5.4.0" serde = { version = "1.0.160", features = ["derive"] } +bincode = "1.3.3" +bevy = "0.10.1" From e109b67cfb71b25229d245b23c0b25c7e55bf30c Mon Sep 17 00:00:00 2001 From: Tipragot Date: Sat, 29 Apr 2023 00:54:35 +0200 Subject: [PATCH 2/7] Remove commented code --- src/lib.rs | 196 ----------------------------------------------------- 1 file changed, 196 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index bbfe983..817daed 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,199 +24,3 @@ pub trait Packet: DeserializeOwned + Serialize + Send + Sync { } impl Packet for T {} - -/* use bevy::{prelude::*, utils::HashMap}; -use std::{ - io::{self, ErrorKind}, - net::{TcpStream, ToSocketAddrs}, - sync::{mpsc::TryRecvError, Arc}, -}; -use tcp::{Connection, Listener, Packet, RawPacket}; - -mod tcp; - -/// A Bevy resource that store the packets handlers for the client. -#[derive(Resource)] -pub struct ClientPacketHandler(Arc>>); - -/// A connection to a remote server. -#[derive(Resource)] -pub struct ServerConnection(Connection); - -impl ServerConnection { - /// Connects to a remote server. - pub fn connect(addr: A) -> io::Result { - Ok(Self(Connection::new(TcpStream::connect(addr)?)?)) - } - - /// Send a [Packet] to the remote server. - pub fn send(&self, packet: P) { - self.0.send(packet).ok(); - } -} - -/// A Bevy resource that store the packets handlers for the server. -#[derive(Resource)] -struct ServerPacketHandler( - Arc>>, -); - -/// A [ClientConnection] listener. -#[derive(Resource)] -pub struct ClientListener(Listener); - -impl ClientListener { - /// Creates a new TCP listener on the given address. - pub fn bind(addr: A) -> io::Result { - Ok(Self(Listener::bind(addr)?)) - } -} - -/// A connection to a remote client. -#[derive(Component)] -pub struct ClientConnection(Arc); - -impl ClientConnection { - /// Sends a [Packet] to the remote client. - pub fn send(&self, packet: P) { - self.0.send(packet).ok(); - } -} - -/// A Bevy system to handle incoming [ClientConnection]s and remove the -/// [ClientListener] resource if it is no longer listening. -fn accept_connections(mut commands: Commands, listener: Option>) { - if let Some(listener) = listener { - match listener.0.accept() { - Ok(connection) => { - commands.spawn(ClientConnection(Arc::new(connection))); - } - Err(error) => match error.kind() { - ErrorKind::WouldBlock => {} - _ => commands.remove_resource::(), - }, - } - } -} - -/// A Bevy system to handle incoming [Packet]s from -/// the [ClientConnection]s and the [ServerConnection]. -/// It removes them if they are no longer connected. -fn receive_packets(world: &mut World) { - // Handle client packets - let mut packets = Vec::new(); - let mut to_remove = false; - if let Some(connection) = world.get_resource::() { - if let Some(receiver) = connection.0.recv() { - loop { - match receiver.try_recv() { - Ok(packet) => packets.push(packet), - Err(TryRecvError::Empty) => break, - Err(TryRecvError::Disconnected) => { - to_remove = true; - break; - } - } - } - } - } - if let Some(handlers) = world - .get_resource_mut::() - .map(|handlers| Arc::clone(&handlers.0)) - { - for packet in packets.into_iter() { - if let Some(handler) = handlers.get(&packet.packet_id()) { - (handler)(packet, world); - } - } - } - if to_remove { - world.remove_resource::(); - } - - // Handle server packets - let mut packets = Vec::new(); - let mut to_remove = Vec::new(); - for (entity, connection) in world.query::<(Entity, &ClientConnection)>().iter(world) { - if let Some(receiver) = connection.0.recv() { - loop { - match receiver.try_recv() { - Ok(packet) => { - println!("YESSSS"); - packets.push((entity, ClientConnection(Arc::clone(&connection.0)), packet)); - } - Err(TryRecvError::Empty) => break, - Err(TryRecvError::Disconnected) => { - to_remove.push(entity); - break; - } - } - } - } - } - if let Some(handlers) = world - .get_resource_mut::() - .map(|handlers| Arc::clone(&handlers.0)) - { - for (entity, connection, packet) in packets.into_iter() { - if let Some(handler) = handlers.get(&packet.packet_id()) { - (handler)(entity, connection, packet, world); - } - } - } - for entity in to_remove { - world.despawn(entity); - } -} - -/// A network plugin. -pub struct NetworkPlugin; - -impl Plugin for NetworkPlugin { - fn build(&self, app: &mut App) { - app.insert_resource(ServerPacketHandler(Arc::new(HashMap::new()))); - app.insert_resource(ClientPacketHandler(Arc::new(HashMap::new()))); - app.add_system(accept_connections); - app.add_system(receive_packets); - } -} - -/// A extension trait to add packet handler. -pub trait NetworkAppExt { - /// Add a client packet handler. - fn add_client_packet_handler(&mut self, handler: H) -> &mut Self - where - P: Packet, - H: Fn(RawPacket, &mut World) + Send + Sync + 'static; - - /// Add a server packet handler. - fn add_server_packet_handler(&mut self, handler: H) -> &mut Self - where - P: Packet, - H: Fn(Entity, ClientConnection, RawPacket, &mut World) + Send + Sync + 'static; -} - -impl NetworkAppExt for App { - fn add_client_packet_handler(&mut self, handler: H) -> &mut Self - where - P: Packet, - H: Fn(RawPacket, &mut World) + Send + Sync + 'static, - { - Arc::get_mut(&mut self.world.resource_mut::().0) - .unwrap() - .insert(P::packet_id(), Box::new(handler)); - self - } - - fn add_server_packet_handler(&mut self, handler: H) -> &mut Self - where - P: Packet, - H: Fn(Entity, ClientConnection, RawPacket, &mut World) + Send + Sync + 'static, - { - Arc::get_mut(&mut self.world.resource_mut::().0) - .unwrap() - .insert(P::packet_id(), Box::new(handler)); - self - } -} - */ From a69973b3fae36ac9420367451ffa2dfdc27c68f9 Mon Sep 17 00:00:00 2001 From: Tipragot Date: Sat, 29 Apr 2023 01:29:58 +0200 Subject: [PATCH 3/7] Put base system in the same file using features --- Cargo.toml | 6 ++ src/lib.rs | 206 ++++++++++++++++++++++++++++++++++++++++++++++++++++- src/tcp.rs | 16 ++--- 3 files changed, 218 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9128e5b..ff61668 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,3 +13,9 @@ repository = "https://git.tipragot.fr/tipragot/bevnet" serde = { version = "1.0.160", features = ["derive"] } bincode = "1.3.3" bevy = "0.10.1" + +[features] +default = ["server", "sync"] +server = [] +client = [] +sync = [] diff --git a/src/lib.rs b/src/lib.rs index 817daed..51a7a32 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,11 +1,11 @@ +use bevy::prelude::*; use serde::{de::DeserializeOwned, Serialize}; use std::{ collections::hash_map::DefaultHasher, hash::{Hash, Hasher}, }; +use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc}; -pub mod client; -pub mod server; mod tcp; /// A packet that can be sent over a [Connection]. @@ -24,3 +24,205 @@ pub trait Packet: DeserializeOwned + Serialize + Send + Sync { } impl Packet for T {} + +#[cfg(feature = "server")] +/// A function that handle a received [Packet]s. +pub type PacketHandler = Box, &mut World) + Send + Sync>; + +#[cfg(feature = "client")] +/// A function that handle a received [Packet]s. +pub type PacketHandler = Box, &mut World) + Send + Sync>; + +/// A Bevy resource that store the packets handlers. +#[derive(Resource)] +struct HandlerManager(Arc>); + +#[cfg(feature = "server")] +/// A Bevy resource that listens for incoming [Connection]s. +#[derive(Resource)] +pub struct Listener(tcp::Listener); + +#[cfg(feature = "server")] +impl Listener { + /// Creates a new listener on the given address. + pub fn bind(addr: A) -> io::Result { + Ok(Self(tcp::Listener::bind(addr)?)) + } +} + +#[cfg(feature = "server")] +/// A connection to a remote client. +#[derive(Component)] +pub struct Connection(Arc); + +#[cfg(feature = "client")] +/// A connection to a remote server. +#[derive(Resource)] +pub struct Connection(tcp::Connection); + +impl Connection { + #[cfg(feature = "client")] + /// Connects to a remote server. + pub fn connect(addr: A) -> io::Result { + Ok(Self(tcp::Connection::connect(addr)?)) + } + + /// Sends a packet through this connection. + pub fn send(&self, packet: &P) { + let mut data = bincode::serialize(packet).expect("Failed to serialize packet"); + data.extend(P::packet_id().to_be_bytes()); + self.0.send(data); + } +} + +/// A plugin that manage the network connections. +pub struct NetworkPlugin; + +impl NetworkPlugin { + #[cfg(feature = "server")] + /// Accept new [Connection]s. + fn accept_connections(mut commands: Commands, listener: Option>) { + if let Some(listener) = listener { + if let Some(connection) = listener.0.accept() { + commands.spawn(Connection(Arc::new(connection))); + } + } + } + + #[cfg(feature = "server")] + /// Handles a received [Packet]s. + fn handle_packets(world: &mut World) { + // Get all received packets + let mut packets = Vec::new(); + for (entity, connection) in world.query::<(Entity, &Connection)>().iter(world) { + while let Some(mut packet) = connection.0.recv() { + if packet.len() < 8 { + println!("Invalid packet received: {:?}", packet); + } else { + let id_buffer = packet.split_off(packet.len() - 8); + let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap()); + packets.push(( + entity, + Connection(Arc::clone(&connection.0)), + packet_id, + packet, + )); + } + } + } + + // Get the packet handlers + let handlers = Arc::clone(&world.resource_mut::().0); + + // Handle all received packets + for (entity, connection, packet_id, packet) in packets { + if let Some(handler) = handlers.get(&packet_id) { + handler(entity, connection, packet, world); + } + } + } + + #[cfg(feature = "client")] + /// Handles a received [Packet] on the server. + pub fn handle_packets(world: &mut World) { + // Get all received packets + let mut packets = Vec::new(); + if let Some(connection) = world.get_resource::() { + while let Some(mut packet) = connection.0.recv() { + if packet.len() < 8 { + println!("Invalid packet received: {:?}", packet); + } else { + let id_buffer = packet.split_off(packet.len() - 8); + let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap()); + packets.push((packet_id, packet)); + } + } + } else { + return; + } + + // Get the packet handlers + let handlers = Arc::clone(&world.resource_mut::().0); + + // Handle all received packets + for (packet_id, packet) in packets { + if let Some(handler) = handlers.get(&packet_id) { + handler(packet, world); + } + } + } + + #[cfg(feature = "server")] + /// Remove disconnected [Connection]s. + fn remove_disconnected(mut commands: Commands, connections: Query<(Entity, &Connection)>) { + for (entity, connection) in connections.iter() { + if connection.0.closed() { + commands.entity(entity).remove::(); + } + } + } + + #[cfg(feature = "client")] + /// Remove [ServerConnection] if it's disconnected. + pub fn remove_disconnected(mut commands: Commands, connection: Option>) { + if let Some(connection) = connection { + if connection.0.closed() { + commands.remove_resource::(); + } + } + } +} + +impl Plugin for NetworkPlugin { + fn build(&self, app: &mut App) { + app.insert_resource(HandlerManager(Arc::new(HashMap::new()))); + app.add_system(NetworkPlugin::handle_packets); + app.add_system(NetworkPlugin::remove_disconnected); + + #[cfg(feature = "server")] + app.add_system(NetworkPlugin::accept_connections); + } +} + +/// An extension to add packet handlers. +pub trait NetworkExt { + #[cfg(feature = "server")] + /// Add a new packet handler. + fn add_packet_handler(&mut self, handler: H) -> &mut Self + where + P: Packet, + H: Fn(Entity, Connection, Vec, &mut World) + Send + Sync + 'static; + + #[cfg(feature = "client")] + /// Add a new packet handler. + fn add_packet_handler(&mut self, handler: H) -> &mut Self + where + P: Packet, + H: Fn(Vec, &mut World) + Send + Sync + 'static; +} + +impl NetworkExt for App { + #[cfg(feature = "server")] + fn add_packet_handler(&mut self, handler: H) -> &mut Self + where + P: Packet, + H: Fn(Entity, Connection, Vec, &mut World) + Send + Sync + 'static, + { + Arc::get_mut(&mut self.world.resource_mut::().0) + .unwrap() + .insert(P::packet_id(), Box::new(handler)); + self + } + + #[cfg(feature = "client")] + fn add_packet_handler(&mut self, handler: H) -> &mut Self + where + P: Packet, + H: Fn(Vec, &mut World) + Send + Sync + 'static, + { + Arc::get_mut(&mut self.world.resource_mut::().0) + .unwrap() + .insert(P::packet_id(), Box::new(handler)); + self + } +} diff --git a/src/tcp.rs b/src/tcp.rs index 413f287..d6611bf 100644 --- a/src/tcp.rs +++ b/src/tcp.rs @@ -1,6 +1,6 @@ use std::{ io::{self, Read, Write}, - net::{Shutdown, TcpListener, TcpStream, ToSocketAddrs}, + net::{Shutdown, TcpStream, ToSocketAddrs}, sync::{ atomic::{AtomicBool, Ordering}, mpsc::{channel, Receiver, Sender}, @@ -51,6 +51,7 @@ impl Connection { }) } + #[cfg(feature = "client")] /// Creates a new connection to the given address. pub fn connect(addr: A) -> io::Result { Self::new(TcpStream::connect(addr)?) @@ -82,13 +83,7 @@ impl Connection { /// The sending loop for this connection. fn sending_loop(mut stream: TcpStream, receiver: Receiver>, closed: Arc) { - loop { - // Get the next packet to send - let packet = match receiver.recv() { - Ok(packet) => packet, - Err(_) => break, - }; - + while let Ok(packet) = receiver.recv() { // Send the length of the packet let len_buffer = u32::to_be_bytes(packet.len() as u32); if stream.write_all(&len_buffer).is_err() { @@ -128,12 +123,17 @@ impl Drop for Connection { } } +#[cfg(feature = "server")] +use std::net::TcpListener; + +#[cfg(feature = "server")] /// A [Connection] listener. pub struct Listener { /// The [TcpListener] of the listener. listener: TcpListener, } +#[cfg(feature = "server")] impl Listener { /// Creates a new TCP listener on the given address. pub fn bind(addr: A) -> io::Result { From f849aab7d62be5213f61f2f95596e429933d53a0 Mon Sep 17 00:00:00 2001 From: Tipragot Date: Sat, 29 Apr 2023 14:23:24 +0200 Subject: [PATCH 4/7] Add sync and refactor --- src/client/mod.rs | 103 --------------- src/client/sync.rs | 127 ------------------ src/lib.rs | 319 +++++++++++++++++++++++++++++++++++++++------ src/server/mod.rs | 129 ------------------ src/server/sync.rs | 154 ---------------------- 5 files changed, 281 insertions(+), 551 deletions(-) delete mode 100644 src/client/mod.rs delete mode 100644 src/client/sync.rs delete mode 100644 src/server/mod.rs delete mode 100644 src/server/sync.rs diff --git a/src/client/mod.rs b/src/client/mod.rs deleted file mode 100644 index 7a3214a..0000000 --- a/src/client/mod.rs +++ /dev/null @@ -1,103 +0,0 @@ -use crate::{tcp::Connection, Packet}; -use bevy::prelude::*; -use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc}; - -pub mod sync; - -/// A function that handle a received [Packet] on the client. -pub type ClientPacketHandler = Box, &mut World) + Send + Sync>; - -/// A Bevy resource that store the packets handlers for the client. -#[derive(Resource)] -pub struct ClientHandlerManager(Arc>); - -/// A connection to a remote server. -#[derive(Resource)] -pub struct ServerConnection(Connection); - -impl ServerConnection { - /// Connects to a remote server. - pub fn connect(addr: A) -> io::Result { - Ok(Self(Connection::connect(addr)?)) - } - - /// Sends a packet through this connection. - pub fn send(&self, packet: &P) { - let mut data = bincode::serialize(packet).expect("Failed to serialize packet"); - data.extend(P::packet_id().to_be_bytes()); - self.0.send(data); - } -} - -/// A plugin that manage the network connections for a server. -pub struct ClientPlugin; - -impl ClientPlugin { - /// Handles a received [Packet] on the server. - pub fn handle_packets(world: &mut World) { - // Get all received packets - let mut packets = Vec::new(); - if let Some(connection) = world.get_resource::() { - while let Some(mut packet) = connection.0.recv() { - if packet.len() < 8 { - println!("Invalid packet received: {:?}", packet); - } else { - let id_buffer = packet.split_off(packet.len() - 8); - let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap()); - packets.push((packet_id, packet)); - } - } - } else { - return; - } - - // Get the packet handlers - let handlers = Arc::clone(&world.resource_mut::().0); - - // Handle all received packets - for (packet_id, packet) in packets { - if let Some(handler) = handlers.get(&packet_id) { - handler(packet, world); - } - } - } - - /// Remove [ServerConnection] if it's disconnected. - pub fn remove_disconnected(mut commands: Commands, connection: Option>) { - if let Some(connection) = connection { - if connection.0.closed() { - commands.remove_resource::(); - } - } - } -} - -impl Plugin for ClientPlugin { - fn build(&self, app: &mut App) { - app.insert_resource(ClientHandlerManager(Arc::new(HashMap::new()))); - app.add_system(ClientPlugin::handle_packets); - app.add_system(ClientPlugin::remove_disconnected); - } -} - -/// An extension to add packet handlers. -pub trait ClientAppExt { - /// Add a new packet handler. - fn add_packet_handler(&mut self, handler: H) -> &mut Self - where - P: Packet, - H: Fn(Vec, &mut World) + Send + Sync + 'static; -} - -impl ClientAppExt for App { - fn add_packet_handler(&mut self, handler: H) -> &mut Self - where - P: Packet, - H: Fn(Vec, &mut World) + Send + Sync + 'static, - { - Arc::get_mut(&mut self.world.resource_mut::().0) - .unwrap() - .insert(P::packet_id(), Box::new(handler)); - self - } -} diff --git a/src/client/sync.rs b/src/client/sync.rs deleted file mode 100644 index f427c86..0000000 --- a/src/client/sync.rs +++ /dev/null @@ -1,127 +0,0 @@ -use std::{any::type_name, marker::PhantomData}; - -use super::{ClientAppExt, ServerConnection}; -use crate::Packet; -use bevy::prelude::*; -use serde::{de::DeserializeOwned, Serialize}; - -/// An event that comes from the server. -#[derive(Deref)] -pub struct FromServer { - /// The event. - pub event: E, -} - -/// Mark an [Entity] as synced by the server. -#[derive(Component)] -pub struct ServerEntity(Entity); - -/// A plugin for the syncronization system. -pub struct ClientSyncPlugin; - -impl ClientSyncPlugin { - /// Removes [ServerEntity] when disconnected. - fn remove_synced(mut commands: Commands, entities: Query>) { - for entity in entities.iter() { - commands.entity(entity).despawn(); - } - } -} - -impl Plugin for ClientSyncPlugin { - fn build(&self, app: &mut App) { - app.add_packet_handler::(|data, world| { - match bincode::deserialize::(&data) { - Ok(entity) => { - if let Some((local_entity, _)) = world - .query::<(Entity, &ServerEntity)>() - .iter(world) - .find(|(_, server_entity)| server_entity.0 == entity) - { - world.despawn(local_entity); - } else { - world.spawn(ServerEntity(entity)); - } - } - Err(_) => println!("Failed to deserialize packet: {}", type_name::()), - } - }); - app.add_system(Self::remove_synced.run_if(resource_removed::())); - } -} - -/// An extention to add syncronization to the client. -pub trait ClientSyncExt { - /// Register syncronization for an [Event] that comes from the server. - fn server_event_sync(&mut self) -> &mut Self; - - /// Register syncronization for an [Event] that can be sent by the client. - fn client_event_sync(&mut self) -> &mut Self; - - /// Register a [Component] to be synced. - fn sync_component(&mut self) -> &mut Self; -} - -impl ClientSyncExt for App { - fn server_event_sync(&mut self) -> &mut Self { - self.add_event::>() - .add_packet_handler::(|data, world| match bincode::deserialize::(&data) { - Ok(event) => world.send_event(FromServer { event }), - Err(_) => println!("Failed to deserialize packet: {}", type_name::()), - }) - } - - fn client_event_sync(&mut self) -> &mut Self { - self.add_event::().add_system( - |mut events: EventReader, connection: Option>| { - if let Some(connection) = connection { - for event in events.iter() { - connection.send(event); - } - } - }, - ) - } - - fn sync_component(&mut self) -> &mut Self { - self.add_packet_handler::<(Entity, C), _>(|data, world| { - match bincode::deserialize::<(Entity, C)>(&data) { - Ok((entity, component)) => { - if let Some((local_entity, _)) = world - .query::<(Entity, &ServerEntity)>() - .iter(world) - .find(|(_, server_entity)| server_entity.0 == entity) - { - let mut local_entity = world.entity_mut(local_entity); - match local_entity.get_mut::() { - Some(mut local_component) => { - println!("CA CHANGE: {}", type_name::()); - *local_component = component; - } - None => { - local_entity.insert(component); - } - } - } else { - println!("Received component for unknown entity: {:?}", entity); - } - } - Err(_) => println!("Failed to deserialize packet: {}", type_name::()), - } - }) - .add_packet_handler::<(Entity, PhantomData), _>( - |data, world| match bincode::deserialize::<(Entity, PhantomData)>(&data) { - Ok((entity, _)) => { - if let Some((local_entity, _)) = world - .query::<(Entity, &ServerEntity)>() - .iter(world) - .find(|(_, server_entity)| server_entity.0 == entity) - { - world.entity_mut(local_entity).remove::(); - } - } - Err(_) => println!("Failed to deserialize packet: {}", type_name::()), - }, - ) - } -} diff --git a/src/lib.rs b/src/lib.rs index 51a7a32..6ed0547 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,12 @@ use std::{ }; use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc}; +#[cfg(feature = "sync")] +use std::{any::type_name, marker::PhantomData}; + +#[cfg(all(feature = "sync", feature = "server"))] +use std::ops::Deref; + mod tcp; /// A packet that can be sent over a [Connection]. @@ -26,12 +32,24 @@ pub trait Packet: DeserializeOwned + Serialize + Send + Sync { impl Packet for T {} #[cfg(feature = "server")] -/// A function that handle a received [Packet]s. -pub type PacketHandler = Box, &mut World) + Send + Sync>; +/// A trait for a handler function. +pub trait PacketHandlerFn: + Fn(Entity, Connection, Vec, &mut World) + Send + Sync + 'static +{ +} + +#[cfg(feature = "server")] +impl, &mut World) + Send + Sync + 'static> PacketHandlerFn for T {} #[cfg(feature = "client")] +/// A trait for a handler function. +pub trait PacketHandlerFn: Fn(Vec, &mut World) + Send + Sync + 'static {} + +#[cfg(feature = "client")] +impl, &mut World) + Send + Sync + 'static> PacketHandlerFn for T {} + /// A function that handle a received [Packet]s. -pub type PacketHandler = Box, &mut World) + Send + Sync>; +pub type PacketHandler = Box; /// A Bevy resource that store the packets handlers. #[derive(Resource)] @@ -44,19 +62,19 @@ pub struct Listener(tcp::Listener); #[cfg(feature = "server")] impl Listener { - /// Creates a new listener on the given address. + /// Creates a new [Listener] on the given address. pub fn bind(addr: A) -> io::Result { Ok(Self(tcp::Listener::bind(addr)?)) } } #[cfg(feature = "server")] -/// A connection to a remote client. +/// A [Connection] to a remote client. #[derive(Component)] pub struct Connection(Arc); #[cfg(feature = "client")] -/// A connection to a remote server. +/// A [Connection] to a remote server. #[derive(Resource)] pub struct Connection(tcp::Connection); @@ -75,7 +93,47 @@ impl Connection { } } -/// A plugin that manage the network connections. +#[cfg(feature = "server")] +/// An event that comes from a client. +pub struct PacketEvent { + /// The entity of the [Connection] that sent the event. + pub entity: Entity, + + /// The [Connection] that sent the event. + pub connection: Connection, + + /// The event. + pub event: E, +} + +#[cfg(all(feature = "sync", feature = "server"))] +impl Deref for PacketEvent { + type Target = E; + + fn deref(&self) -> &Self::Target { + &self.event + } +} + +#[cfg(all(feature = "sync", feature = "client"))] +/// An event that comes from the server. +#[derive(Deref)] +pub struct PacketEvent { + /// The event. + pub event: E, +} + +#[cfg(all(feature = "sync", feature = "server"))] +/// Mark an [Entity] to be synced. +#[derive(Component)] +pub struct Synced; + +#[cfg(all(feature = "sync", feature = "client"))] +/// Mark an [Entity] as synced by the server. +#[derive(Component)] +pub struct Synced(Entity); + +/// A plugin that manage the network [Connection]s. pub struct NetworkPlugin; impl NetworkPlugin { @@ -163,7 +221,7 @@ impl NetworkPlugin { } #[cfg(feature = "client")] - /// Remove [ServerConnection] if it's disconnected. + /// Remove [Connection] if it's disconnected. pub fn remove_disconnected(mut commands: Commands, connection: Option>) { if let Some(connection) = connection { if connection.0.closed() { @@ -171,58 +229,243 @@ impl NetworkPlugin { } } } + + #[cfg(all(feature = "sync", feature = "server"))] + /// Send to clients the [Synced] entity that has been added to the server. + fn send_added(added_entities: Query>, connections: Query<&Connection>) { + for entity in added_entities.iter() { + for connection in connections.iter() { + connection.send(&entity); + } + } + } + + #[cfg(all(feature = "sync", feature = "server"))] + /// Send [Synced] entities to new clients. + fn send_synced( + synced_entities: Query>, + new_connections: Query<&Connection, Added>, + ) { + for entity in synced_entities.iter() { + for connection in new_connections.iter() { + connection.send(&entity); + } + } + } + + #[cfg(all(feature = "sync", feature = "server"))] + /// Send to clients the [Synced] entity that has been removed. + fn send_removed( + mut removed_entities: RemovedComponents, + connections: Query<&Connection>, + ) { + for entity in removed_entities.iter() { + for connection in connections.iter() { + connection.send(&entity); + } + } + } + + #[cfg(all(feature = "sync", feature = "client"))] + /// Removes [ServerEntity] when disconnected. + fn remove_synced(mut commands: Commands, entities: Query>) { + for entity in entities.iter() { + commands.entity(entity).despawn(); + } + } } impl Plugin for NetworkPlugin { fn build(&self, app: &mut App) { app.insert_resource(HandlerManager(Arc::new(HashMap::new()))); - app.add_system(NetworkPlugin::handle_packets); - app.add_system(NetworkPlugin::remove_disconnected); + app.add_system(Self::handle_packets); + app.add_system(Self::remove_disconnected); #[cfg(feature = "server")] - app.add_system(NetworkPlugin::accept_connections); + app.add_system(Self::accept_connections); + + #[cfg(all(feature = "sync", feature = "server"))] + { + app.add_system(Self::send_added); + app.add_system(Self::send_synced); + app.add_system(Self::send_removed); + } + + #[cfg(all(feature = "sync", feature = "client"))] + { + app.add_packet_handler::(|data, world| { + match bincode::deserialize::(&data) { + Ok(entity) => { + if let Some((local_entity, _)) = world + .query::<(Entity, &Synced)>() + .iter(world) + .find(|(_, server_entity)| server_entity.0 == entity) + { + world.despawn(local_entity); + } else { + world.spawn(Synced(entity)); + } + } + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + } + }); + app.add_system(Self::remove_synced.run_if(resource_removed::())); + } } } /// An extension to add packet handlers. pub trait NetworkExt { - #[cfg(feature = "server")] /// Add a new packet handler. - fn add_packet_handler(&mut self, handler: H) -> &mut Self - where - P: Packet, - H: Fn(Entity, Connection, Vec, &mut World) + Send + Sync + 'static; + fn add_packet_handler(&mut self, handler: H) -> &mut Self; - #[cfg(feature = "client")] - /// Add a new packet handler. - fn add_packet_handler(&mut self, handler: H) -> &mut Self - where - P: Packet, - H: Fn(Vec, &mut World) + Send + Sync + 'static; + #[cfg(all(feature = "sync"))] + /// Register syncronization for an [Event] that can be sent by the server. + fn sync_server_event(&mut self) -> &mut Self; + + #[cfg(all(feature = "sync"))] + /// Register syncronization for an [Event] that comes from the client. + fn sync_client_event(&mut self) -> &mut Self; + + #[cfg(all(feature = "sync"))] + /// Register a [Component] to be synced. + fn sync_component(&mut self) -> &mut Self; } impl NetworkExt for App { - #[cfg(feature = "server")] - fn add_packet_handler(&mut self, handler: H) -> &mut Self - where - P: Packet, - H: Fn(Entity, Connection, Vec, &mut World) + Send + Sync + 'static, - { + fn add_packet_handler(&mut self, handler: H) -> &mut Self { Arc::get_mut(&mut self.world.resource_mut::().0) .unwrap() .insert(P::packet_id(), Box::new(handler)); self } - #[cfg(feature = "client")] - fn add_packet_handler(&mut self, handler: H) -> &mut Self - where - P: Packet, - H: Fn(Vec, &mut World) + Send + Sync + 'static, - { - Arc::get_mut(&mut self.world.resource_mut::().0) - .unwrap() - .insert(P::packet_id(), Box::new(handler)); - self + #[cfg(all(feature = "sync", feature = "server"))] + fn sync_server_event(&mut self) -> &mut Self { + self.add_event::().add_system( + |mut events: EventReader, connections: Query<&Connection>| { + for event in events.iter() { + for connection in connections.iter() { + connection.send(event); + } + } + }, + ) + } + + #[cfg(all(feature = "sync", feature = "client"))] + fn sync_server_event(&mut self) -> &mut Self { + self.add_event::>() + .add_packet_handler::(|data, world| match bincode::deserialize::(&data) { + Ok(event) => world.send_event(PacketEvent { event }), + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + }) + } + + #[cfg(all(feature = "sync", feature = "server"))] + fn sync_client_event(&mut self) -> &mut Self { + self.add_event::>() + .add_packet_handler::( + |entity, connection, data, world| match bincode::deserialize::(&data) { + Ok(event) => world.send_event(PacketEvent { + entity, + connection, + event, + }), + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + }, + ) + } + + #[cfg(all(feature = "sync", feature = "client"))] + fn sync_client_event(&mut self) -> &mut Self { + self.add_event::().add_system( + |mut events: EventReader, connection: Option>| { + if let Some(connection) = connection { + for event in events.iter() { + connection.send(event); + } + } + }, + ) + } + + #[cfg(all(feature = "sync", feature = "server"))] + fn sync_component(&mut self) -> &mut Self { + let update_components = + |changed_components: Query<(Entity, &C), (Changed, With)>, + connections: Query<&Connection>| { + for (entity, component) in changed_components.iter() { + for connection in connections.iter() { + connection.send(&(entity, component.clone())); + } + } + }; + let send_components = + |components: Query<(Entity, &C), With>, + new_connections: Query<&Connection, Added>| { + for (entity, component) in components.iter() { + for connection in new_connections.iter() { + connection.send(&(entity, component.clone())); + } + } + }; + let remove_components = |mut removed_components: RemovedComponents, + synced_entities: Query>, + connections: Query<&Connection>| { + for entity in removed_components.iter() { + if synced_entities.contains(entity) { + for connection in connections.iter() { + connection.send(&(entity, PhantomData::)); + } + } + } + }; + self.add_system(update_components.after(NetworkPlugin::send_added)) + .add_system(send_components.after(NetworkPlugin::send_synced)) + .add_system(remove_components.after(update_components)) + } + + #[cfg(all(feature = "sync", feature = "client"))] + fn sync_component(&mut self) -> &mut Self { + self.add_packet_handler::<(Entity, C), _>(|data, world| { + match bincode::deserialize::<(Entity, C)>(&data) { + Ok((entity, component)) => { + if let Some((local_entity, _)) = world + .query::<(Entity, &Synced)>() + .iter(world) + .find(|(_, server_entity)| server_entity.0 == entity) + { + let mut local_entity = world.entity_mut(local_entity); + match local_entity.get_mut::() { + Some(mut local_component) => { + println!("CA CHANGE: {}", type_name::()); + *local_component = component; + } + None => { + local_entity.insert(component); + } + } + } else { + println!("Received component for unknown entity: {:?}", entity); + } + } + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + } + }) + .add_packet_handler::<(Entity, PhantomData), _>( + |data, world| match bincode::deserialize::<(Entity, PhantomData)>(&data) { + Ok((entity, _)) => { + if let Some((local_entity, _)) = world + .query::<(Entity, &Synced)>() + .iter(world) + .find(|(_, server_entity)| server_entity.0 == entity) + { + world.entity_mut(local_entity).remove::(); + } + } + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + }, + ) } } diff --git a/src/server/mod.rs b/src/server/mod.rs deleted file mode 100644 index 9a31ad3..0000000 --- a/src/server/mod.rs +++ /dev/null @@ -1,129 +0,0 @@ -use crate::{ - tcp::{Connection, Listener}, - Packet, -}; -use bevy::prelude::*; -use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc}; - -pub mod sync; - -/// A function that handle a received [Packet] on the server. -pub type ServerPacketHandler = - Box, &mut World) + Send + Sync>; - -/// A Bevy resource that store the packets handlers for the server. -#[derive(Resource)] -pub struct ServerHandlerManager(Arc>); - -/// A Bevy resource that listens for incoming [ClientConnection]s. -#[derive(Resource)] -pub struct ClientListener(Listener); - -impl ClientListener { - /// Creates a new listener on the given address. - pub fn bind(addr: A) -> io::Result { - Ok(Self(Listener::bind(addr)?)) - } -} - -/// A connection to a remote client. -#[derive(Component)] -pub struct ClientConnection(Arc); - -impl ClientConnection { - /// Sends a packet through this connection. - pub fn send(&self, packet: &P) { - let mut data = bincode::serialize(packet).expect("Failed to serialize packet"); - data.extend(P::packet_id().to_be_bytes()); - self.0.send(data); - } -} - -/// A plugin that manage the network connections for a server. -pub struct ServerPlugin; - -impl ServerPlugin { - /// Accept new [ClientConnection]s. - pub fn accept_connections(mut commands: Commands, listener: Option>) { - if let Some(listener) = listener { - if let Some(connection) = listener.0.accept() { - commands.spawn(ClientConnection(Arc::new(connection))); - } - } - } - - /// Handles a received [Packet] on the server. - pub fn handle_packets(world: &mut World) { - // Get all received packets - let mut packets = Vec::new(); - for (entity, connection) in world.query::<(Entity, &ClientConnection)>().iter(world) { - while let Some(mut packet) = connection.0.recv() { - if packet.len() < 8 { - println!("Invalid packet received: {:?}", packet); - } else { - let id_buffer = packet.split_off(packet.len() - 8); - let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap()); - packets.push(( - entity, - ClientConnection(Arc::clone(&connection.0)), - packet_id, - packet, - )); - } - } - } - - // Get the packet handlers - let handlers = Arc::clone(&world.resource_mut::().0); - - // Handle all received packets - for (entity, connection, packet_id, packet) in packets { - if let Some(handler) = handlers.get(&packet_id) { - handler(entity, connection, packet, world); - } - } - } - - /// Remove disconnected [ClientConnection]s. - pub fn remove_disconnected( - mut commands: Commands, - connections: Query<(Entity, &ClientConnection)>, - ) { - for (entity, connection) in connections.iter() { - if connection.0.closed() { - commands.entity(entity).remove::(); - } - } - } -} - -impl Plugin for ServerPlugin { - fn build(&self, app: &mut App) { - app.insert_resource(ServerHandlerManager(Arc::new(HashMap::new()))); - app.add_system(ServerPlugin::accept_connections); - app.add_system(ServerPlugin::handle_packets); - app.add_system(ServerPlugin::remove_disconnected); - } -} - -/// An extension to add packet handlers. -pub trait ServerAppExt { - /// Add a new packet handler. - fn add_packet_handler(&mut self, handler: H) -> &mut Self - where - P: Packet, - H: Fn(Entity, ClientConnection, Vec, &mut World) + Send + Sync + 'static; -} - -impl ServerAppExt for App { - fn add_packet_handler(&mut self, handler: H) -> &mut Self - where - P: Packet, - H: Fn(Entity, ClientConnection, Vec, &mut World) + Send + Sync + 'static, - { - Arc::get_mut(&mut self.world.resource_mut::().0) - .unwrap() - .insert(P::packet_id(), Box::new(handler)); - self - } -} diff --git a/src/server/sync.rs b/src/server/sync.rs deleted file mode 100644 index ad2e7b7..0000000 --- a/src/server/sync.rs +++ /dev/null @@ -1,154 +0,0 @@ -use super::ServerAppExt; -use crate::{server::ClientConnection, Packet}; -use bevy::prelude::*; -use serde::{de::DeserializeOwned, Serialize}; -use std::{any::type_name, marker::PhantomData, ops::Deref}; - -/// An event that comes from a client. -pub struct FromClient { - /// The entity of the [ClientConnection] that sent the event. - pub entity: Entity, - - /// The [ClientConnection] that sent the event. - pub connection: ClientConnection, - - /// The event. - pub event: E, -} - -impl Deref for FromClient { - type Target = E; - - fn deref(&self) -> &Self::Target { - &self.event - } -} - -/// Mark an [Entity] to be synced. -#[derive(Component)] -pub struct Synced; - -/// A plugin for the syncronization system. -pub struct ServerSyncPlugin; - -impl ServerSyncPlugin { - /// Send to clients the [Synced] entity that has been added to the server. - fn send_added( - added_entities: Query>, - connections: Query<&ClientConnection>, - ) { - for entity in added_entities.iter() { - for connection in connections.iter() { - connection.send(&entity); - } - } - } - - /// Send [Synced] entities to new clients. - fn send_synced( - synced_entities: Query>, - new_connections: Query<&ClientConnection, Added>, - ) { - for entity in synced_entities.iter() { - for connection in new_connections.iter() { - connection.send(&entity); - } - } - } - - /// Send to clients the [Synced] entity that has been removed. - fn send_removed( - mut removed_entities: RemovedComponents, - connections: Query<&ClientConnection>, - ) { - for entity in removed_entities.iter() { - for connection in connections.iter() { - connection.send(&entity); - } - } - } -} - -impl Plugin for ServerSyncPlugin { - fn build(&self, app: &mut App) { - app.add_system(Self::send_added); - app.add_system(Self::send_synced); - app.add_system(Self::send_removed); - } -} - -/// An extention to add syncronization to the server. -pub trait ServerSyncExt { - /// Register syncronization for an [Event] that can be sent by the server. - fn server_event_sync(&mut self) -> &mut Self; - - /// Register syncronization for an [Event] that comes from the client. - fn client_event_sync(&mut self) -> &mut Self; - - /// Register a [Component] to be synced. - fn sync_component(&mut self) -> &mut Self; -} - -impl ServerSyncExt for App { - fn server_event_sync(&mut self) -> &mut Self { - self.add_event::().add_system( - |mut events: EventReader, connections: Query<&ClientConnection>| { - for event in events.iter() { - for connection in connections.iter() { - connection.send(event); - } - } - }, - ) - } - - fn client_event_sync(&mut self) -> &mut Self { - self.add_event::>() - .add_packet_handler::( - |entity, connection, data, world| match bincode::deserialize::(&data) { - Ok(event) => world.send_event(FromClient { - entity, - connection, - event, - }), - Err(_) => println!("Failed to deserialize packet: {}", type_name::()), - }, - ) - } - - fn sync_component(&mut self) -> &mut Self { - let update_components = - |changed_components: Query<(Entity, &C), (Changed, With)>, - connections: Query<&ClientConnection>| { - for (entity, component) in changed_components.iter() { - for connection in connections.iter() { - connection.send(&(entity, component.clone())); - } - } - }; - let send_components = - |components: Query<(Entity, &C), With>, - new_connections: Query<&ClientConnection, Added>| { - for (entity, component) in components.iter() { - for connection in new_connections.iter() { - connection.send(&(entity, component.clone())); - } - } - }; - let remove_components = - |mut removed_components: RemovedComponents, - synced_entities: Query>, - connections: Query<&ClientConnection>| { - for entity in removed_components.iter() { - if synced_entities.contains(entity) { - for connection in connections.iter() { - connection.send(&(entity, PhantomData::)); - } - } - } - }; - self.add_system(update_components.after(ServerSyncPlugin::send_added)) - .add_system(send_components.after(ServerSyncPlugin::send_synced)) - .add_system(remove_components.after(update_components)) - } -} From f3868b17cf8afff18fb6026a5f2ab982dc186641 Mon Sep 17 00:00:00 2001 From: Tipragot Date: Sat, 29 Apr 2023 15:17:29 +0200 Subject: [PATCH 5/7] Separate things to make it works with all features --- Cargo.toml | 2 +- src/client.rs | 227 +++++++++++++++++++++++++ src/lib.rs | 449 +------------------------------------------------- src/server.rs | 273 ++++++++++++++++++++++++++++++ src/tcp.rs | 6 +- 5 files changed, 508 insertions(+), 449 deletions(-) create mode 100644 src/client.rs create mode 100644 src/server.rs diff --git a/Cargo.toml b/Cargo.toml index ff61668..78964fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ bincode = "1.3.3" bevy = "0.10.1" [features] -default = ["server", "sync"] +default = ["client", "server", "sync"] server = [] client = [] sync = [] diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..a92db51 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,227 @@ +use bevy::prelude::*; +use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc}; + +#[cfg(feature = "sync")] +use ::{ + serde::{de::DeserializeOwned, Serialize}, + std::{any::type_name, marker::PhantomData}, +}; + +use crate::{tcp, Packet}; + +/// A trait for a handler function. +pub trait PacketHandlerFn: Fn(Vec, &mut World) + Send + Sync + 'static {} + +impl, &mut World) + Send + Sync + 'static> PacketHandlerFn for T {} + +/// A function that handle a received [Packet]s. +pub type PacketHandler = Box; + +/// A Bevy resource that store the packets handlers. +#[derive(Resource)] +struct HandlerManager(Arc>); + +/// A [Connection] to a remote server. +#[derive(Resource)] +pub struct Connection(tcp::Connection); + +impl Connection { + /// Connects to a remote server. + pub fn connect(addr: A) -> io::Result { + Ok(Self(tcp::Connection::connect(addr)?)) + } + + /// Sends a packet through this connection. + pub fn send(&self, packet: &P) { + let mut data = bincode::serialize(packet).expect("Failed to serialize packet"); + data.extend(P::packet_id().to_be_bytes()); + self.0.send(data); + } +} + +#[cfg(feature = "sync")] +/// An event that comes from the server. +#[derive(Deref)] +pub struct PacketEvent { + /// The event. + pub event: E, +} + +#[cfg(feature = "sync")] +/// Mark an [Entity] as synced by the server. +#[derive(Component)] +pub struct Synced(Entity); + +/// A plugin that manage the network [Connection]s. +pub struct NetworkPlugin; + +impl NetworkPlugin { + #[cfg(feature = "client")] + /// Handles a received [Packet] on the server. + pub fn handle_packets(world: &mut World) { + // Get all received packets + let mut packets = Vec::new(); + if let Some(connection) = world.get_resource::() { + while let Some(mut packet) = connection.0.recv() { + if packet.len() < 8 { + println!("Invalid packet received: {:?}", packet); + } else { + let id_buffer = packet.split_off(packet.len() - 8); + let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap()); + packets.push((packet_id, packet)); + } + } + } else { + return; + } + + // Get the packet handlers + let handlers = Arc::clone(&world.resource_mut::().0); + + // Handle all received packets + for (packet_id, packet) in packets { + if let Some(handler) = handlers.get(&packet_id) { + handler(packet, world); + } + } + } + + #[cfg(feature = "client")] + /// Remove [Connection] if it's disconnected. + pub fn remove_disconnected(mut commands: Commands, connection: Option>) { + if let Some(connection) = connection { + if connection.0.closed() { + commands.remove_resource::(); + } + } + } + + #[cfg(feature = "sync")] + /// Removes [ServerEntity] when disconnected. + fn remove_synced(mut commands: Commands, entities: Query>) { + for entity in entities.iter() { + commands.entity(entity).despawn(); + } + } +} + +impl Plugin for NetworkPlugin { + fn build(&self, app: &mut App) { + app.insert_resource(HandlerManager(Arc::new(HashMap::new()))); + app.add_system(Self::handle_packets); + app.add_system(Self::remove_disconnected); + + #[cfg(feature = "sync")] + { + app.add_packet_handler::(|data, world| { + match bincode::deserialize::(&data) { + Ok(entity) => { + if let Some((local_entity, _)) = world + .query::<(Entity, &Synced)>() + .iter(world) + .find(|(_, server_entity)| server_entity.0 == entity) + { + world.despawn(local_entity); + } else { + world.spawn(Synced(entity)); + } + } + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + } + }); + app.add_system(Self::remove_synced.run_if(resource_removed::())); + } + } +} + +/// An extension to add packet handlers. +pub trait NetworkExt { + /// Add a new packet handler. + fn add_packet_handler(&mut self, handler: H) -> &mut Self; + + #[cfg(all(feature = "sync"))] + /// Register syncronization for an [Event] that can be sent by the server. + fn sync_server_event(&mut self) -> &mut Self; + + #[cfg(all(feature = "sync"))] + /// Register syncronization for an [Event] that comes from the client. + fn sync_client_event(&mut self) -> &mut Self; + + #[cfg(all(feature = "sync"))] + /// Register a [Component] to be synced. + fn sync_component(&mut self) -> &mut Self; +} + +impl NetworkExt for App { + fn add_packet_handler(&mut self, handler: H) -> &mut Self { + Arc::get_mut(&mut self.world.resource_mut::().0) + .unwrap() + .insert(P::packet_id(), Box::new(handler)); + self + } + + #[cfg(feature = "sync")] + fn sync_server_event(&mut self) -> &mut Self { + self.add_event::>() + .add_packet_handler::(|data, world| match bincode::deserialize::(&data) { + Ok(event) => world.send_event(PacketEvent { event }), + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + }) + } + + #[cfg(feature = "sync")] + fn sync_client_event(&mut self) -> &mut Self { + self.add_event::().add_system( + |mut events: EventReader, connection: Option>| { + if let Some(connection) = connection { + for event in events.iter() { + connection.send(event); + } + } + }, + ) + } + + #[cfg(feature = "sync")] + fn sync_component(&mut self) -> &mut Self { + self.add_packet_handler::<(Entity, C), _>(|data, world| { + match bincode::deserialize::<(Entity, C)>(&data) { + Ok((entity, component)) => { + if let Some((local_entity, _)) = world + .query::<(Entity, &Synced)>() + .iter(world) + .find(|(_, server_entity)| server_entity.0 == entity) + { + let mut local_entity = world.entity_mut(local_entity); + match local_entity.get_mut::() { + Some(mut local_component) => { + println!("CA CHANGE: {}", type_name::()); + *local_component = component; + } + None => { + local_entity.insert(component); + } + } + } else { + println!("Received component for unknown entity: {:?}", entity); + } + } + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + } + }) + .add_packet_handler::<(Entity, PhantomData), _>( + |data, world| match bincode::deserialize::<(Entity, PhantomData)>(&data) { + Ok((entity, _)) => { + if let Some((local_entity, _)) = world + .query::<(Entity, &Synced)>() + .iter(world) + .find(|(_, server_entity)| server_entity.0 == entity) + { + world.entity_mut(local_entity).remove::(); + } + } + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + }, + ) + } +} diff --git a/src/lib.rs b/src/lib.rs index 6ed0547..f9849ca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,16 +1,14 @@ -use bevy::prelude::*; use serde::{de::DeserializeOwned, Serialize}; use std::{ collections::hash_map::DefaultHasher, hash::{Hash, Hasher}, }; -use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc}; -#[cfg(feature = "sync")] -use std::{any::type_name, marker::PhantomData}; +#[cfg(feature = "client")] +pub mod client; -#[cfg(all(feature = "sync", feature = "server"))] -use std::ops::Deref; +#[cfg(feature = "server")] +pub mod server; mod tcp; @@ -30,442 +28,3 @@ pub trait Packet: DeserializeOwned + Serialize + Send + Sync { } impl Packet for T {} - -#[cfg(feature = "server")] -/// A trait for a handler function. -pub trait PacketHandlerFn: - Fn(Entity, Connection, Vec, &mut World) + Send + Sync + 'static -{ -} - -#[cfg(feature = "server")] -impl, &mut World) + Send + Sync + 'static> PacketHandlerFn for T {} - -#[cfg(feature = "client")] -/// A trait for a handler function. -pub trait PacketHandlerFn: Fn(Vec, &mut World) + Send + Sync + 'static {} - -#[cfg(feature = "client")] -impl, &mut World) + Send + Sync + 'static> PacketHandlerFn for T {} - -/// A function that handle a received [Packet]s. -pub type PacketHandler = Box; - -/// A Bevy resource that store the packets handlers. -#[derive(Resource)] -struct HandlerManager(Arc>); - -#[cfg(feature = "server")] -/// A Bevy resource that listens for incoming [Connection]s. -#[derive(Resource)] -pub struct Listener(tcp::Listener); - -#[cfg(feature = "server")] -impl Listener { - /// Creates a new [Listener] on the given address. - pub fn bind(addr: A) -> io::Result { - Ok(Self(tcp::Listener::bind(addr)?)) - } -} - -#[cfg(feature = "server")] -/// A [Connection] to a remote client. -#[derive(Component)] -pub struct Connection(Arc); - -#[cfg(feature = "client")] -/// A [Connection] to a remote server. -#[derive(Resource)] -pub struct Connection(tcp::Connection); - -impl Connection { - #[cfg(feature = "client")] - /// Connects to a remote server. - pub fn connect(addr: A) -> io::Result { - Ok(Self(tcp::Connection::connect(addr)?)) - } - - /// Sends a packet through this connection. - pub fn send(&self, packet: &P) { - let mut data = bincode::serialize(packet).expect("Failed to serialize packet"); - data.extend(P::packet_id().to_be_bytes()); - self.0.send(data); - } -} - -#[cfg(feature = "server")] -/// An event that comes from a client. -pub struct PacketEvent { - /// The entity of the [Connection] that sent the event. - pub entity: Entity, - - /// The [Connection] that sent the event. - pub connection: Connection, - - /// The event. - pub event: E, -} - -#[cfg(all(feature = "sync", feature = "server"))] -impl Deref for PacketEvent { - type Target = E; - - fn deref(&self) -> &Self::Target { - &self.event - } -} - -#[cfg(all(feature = "sync", feature = "client"))] -/// An event that comes from the server. -#[derive(Deref)] -pub struct PacketEvent { - /// The event. - pub event: E, -} - -#[cfg(all(feature = "sync", feature = "server"))] -/// Mark an [Entity] to be synced. -#[derive(Component)] -pub struct Synced; - -#[cfg(all(feature = "sync", feature = "client"))] -/// Mark an [Entity] as synced by the server. -#[derive(Component)] -pub struct Synced(Entity); - -/// A plugin that manage the network [Connection]s. -pub struct NetworkPlugin; - -impl NetworkPlugin { - #[cfg(feature = "server")] - /// Accept new [Connection]s. - fn accept_connections(mut commands: Commands, listener: Option>) { - if let Some(listener) = listener { - if let Some(connection) = listener.0.accept() { - commands.spawn(Connection(Arc::new(connection))); - } - } - } - - #[cfg(feature = "server")] - /// Handles a received [Packet]s. - fn handle_packets(world: &mut World) { - // Get all received packets - let mut packets = Vec::new(); - for (entity, connection) in world.query::<(Entity, &Connection)>().iter(world) { - while let Some(mut packet) = connection.0.recv() { - if packet.len() < 8 { - println!("Invalid packet received: {:?}", packet); - } else { - let id_buffer = packet.split_off(packet.len() - 8); - let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap()); - packets.push(( - entity, - Connection(Arc::clone(&connection.0)), - packet_id, - packet, - )); - } - } - } - - // Get the packet handlers - let handlers = Arc::clone(&world.resource_mut::().0); - - // Handle all received packets - for (entity, connection, packet_id, packet) in packets { - if let Some(handler) = handlers.get(&packet_id) { - handler(entity, connection, packet, world); - } - } - } - - #[cfg(feature = "client")] - /// Handles a received [Packet] on the server. - pub fn handle_packets(world: &mut World) { - // Get all received packets - let mut packets = Vec::new(); - if let Some(connection) = world.get_resource::() { - while let Some(mut packet) = connection.0.recv() { - if packet.len() < 8 { - println!("Invalid packet received: {:?}", packet); - } else { - let id_buffer = packet.split_off(packet.len() - 8); - let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap()); - packets.push((packet_id, packet)); - } - } - } else { - return; - } - - // Get the packet handlers - let handlers = Arc::clone(&world.resource_mut::().0); - - // Handle all received packets - for (packet_id, packet) in packets { - if let Some(handler) = handlers.get(&packet_id) { - handler(packet, world); - } - } - } - - #[cfg(feature = "server")] - /// Remove disconnected [Connection]s. - fn remove_disconnected(mut commands: Commands, connections: Query<(Entity, &Connection)>) { - for (entity, connection) in connections.iter() { - if connection.0.closed() { - commands.entity(entity).remove::(); - } - } - } - - #[cfg(feature = "client")] - /// Remove [Connection] if it's disconnected. - pub fn remove_disconnected(mut commands: Commands, connection: Option>) { - if let Some(connection) = connection { - if connection.0.closed() { - commands.remove_resource::(); - } - } - } - - #[cfg(all(feature = "sync", feature = "server"))] - /// Send to clients the [Synced] entity that has been added to the server. - fn send_added(added_entities: Query>, connections: Query<&Connection>) { - for entity in added_entities.iter() { - for connection in connections.iter() { - connection.send(&entity); - } - } - } - - #[cfg(all(feature = "sync", feature = "server"))] - /// Send [Synced] entities to new clients. - fn send_synced( - synced_entities: Query>, - new_connections: Query<&Connection, Added>, - ) { - for entity in synced_entities.iter() { - for connection in new_connections.iter() { - connection.send(&entity); - } - } - } - - #[cfg(all(feature = "sync", feature = "server"))] - /// Send to clients the [Synced] entity that has been removed. - fn send_removed( - mut removed_entities: RemovedComponents, - connections: Query<&Connection>, - ) { - for entity in removed_entities.iter() { - for connection in connections.iter() { - connection.send(&entity); - } - } - } - - #[cfg(all(feature = "sync", feature = "client"))] - /// Removes [ServerEntity] when disconnected. - fn remove_synced(mut commands: Commands, entities: Query>) { - for entity in entities.iter() { - commands.entity(entity).despawn(); - } - } -} - -impl Plugin for NetworkPlugin { - fn build(&self, app: &mut App) { - app.insert_resource(HandlerManager(Arc::new(HashMap::new()))); - app.add_system(Self::handle_packets); - app.add_system(Self::remove_disconnected); - - #[cfg(feature = "server")] - app.add_system(Self::accept_connections); - - #[cfg(all(feature = "sync", feature = "server"))] - { - app.add_system(Self::send_added); - app.add_system(Self::send_synced); - app.add_system(Self::send_removed); - } - - #[cfg(all(feature = "sync", feature = "client"))] - { - app.add_packet_handler::(|data, world| { - match bincode::deserialize::(&data) { - Ok(entity) => { - if let Some((local_entity, _)) = world - .query::<(Entity, &Synced)>() - .iter(world) - .find(|(_, server_entity)| server_entity.0 == entity) - { - world.despawn(local_entity); - } else { - world.spawn(Synced(entity)); - } - } - Err(_) => println!("Failed to deserialize packet: {}", type_name::()), - } - }); - app.add_system(Self::remove_synced.run_if(resource_removed::())); - } - } -} - -/// An extension to add packet handlers. -pub trait NetworkExt { - /// Add a new packet handler. - fn add_packet_handler(&mut self, handler: H) -> &mut Self; - - #[cfg(all(feature = "sync"))] - /// Register syncronization for an [Event] that can be sent by the server. - fn sync_server_event(&mut self) -> &mut Self; - - #[cfg(all(feature = "sync"))] - /// Register syncronization for an [Event] that comes from the client. - fn sync_client_event(&mut self) -> &mut Self; - - #[cfg(all(feature = "sync"))] - /// Register a [Component] to be synced. - fn sync_component(&mut self) -> &mut Self; -} - -impl NetworkExt for App { - fn add_packet_handler(&mut self, handler: H) -> &mut Self { - Arc::get_mut(&mut self.world.resource_mut::().0) - .unwrap() - .insert(P::packet_id(), Box::new(handler)); - self - } - - #[cfg(all(feature = "sync", feature = "server"))] - fn sync_server_event(&mut self) -> &mut Self { - self.add_event::().add_system( - |mut events: EventReader, connections: Query<&Connection>| { - for event in events.iter() { - for connection in connections.iter() { - connection.send(event); - } - } - }, - ) - } - - #[cfg(all(feature = "sync", feature = "client"))] - fn sync_server_event(&mut self) -> &mut Self { - self.add_event::>() - .add_packet_handler::(|data, world| match bincode::deserialize::(&data) { - Ok(event) => world.send_event(PacketEvent { event }), - Err(_) => println!("Failed to deserialize packet: {}", type_name::()), - }) - } - - #[cfg(all(feature = "sync", feature = "server"))] - fn sync_client_event(&mut self) -> &mut Self { - self.add_event::>() - .add_packet_handler::( - |entity, connection, data, world| match bincode::deserialize::(&data) { - Ok(event) => world.send_event(PacketEvent { - entity, - connection, - event, - }), - Err(_) => println!("Failed to deserialize packet: {}", type_name::()), - }, - ) - } - - #[cfg(all(feature = "sync", feature = "client"))] - fn sync_client_event(&mut self) -> &mut Self { - self.add_event::().add_system( - |mut events: EventReader, connection: Option>| { - if let Some(connection) = connection { - for event in events.iter() { - connection.send(event); - } - } - }, - ) - } - - #[cfg(all(feature = "sync", feature = "server"))] - fn sync_component(&mut self) -> &mut Self { - let update_components = - |changed_components: Query<(Entity, &C), (Changed, With)>, - connections: Query<&Connection>| { - for (entity, component) in changed_components.iter() { - for connection in connections.iter() { - connection.send(&(entity, component.clone())); - } - } - }; - let send_components = - |components: Query<(Entity, &C), With>, - new_connections: Query<&Connection, Added>| { - for (entity, component) in components.iter() { - for connection in new_connections.iter() { - connection.send(&(entity, component.clone())); - } - } - }; - let remove_components = |mut removed_components: RemovedComponents, - synced_entities: Query>, - connections: Query<&Connection>| { - for entity in removed_components.iter() { - if synced_entities.contains(entity) { - for connection in connections.iter() { - connection.send(&(entity, PhantomData::)); - } - } - } - }; - self.add_system(update_components.after(NetworkPlugin::send_added)) - .add_system(send_components.after(NetworkPlugin::send_synced)) - .add_system(remove_components.after(update_components)) - } - - #[cfg(all(feature = "sync", feature = "client"))] - fn sync_component(&mut self) -> &mut Self { - self.add_packet_handler::<(Entity, C), _>(|data, world| { - match bincode::deserialize::<(Entity, C)>(&data) { - Ok((entity, component)) => { - if let Some((local_entity, _)) = world - .query::<(Entity, &Synced)>() - .iter(world) - .find(|(_, server_entity)| server_entity.0 == entity) - { - let mut local_entity = world.entity_mut(local_entity); - match local_entity.get_mut::() { - Some(mut local_component) => { - println!("CA CHANGE: {}", type_name::()); - *local_component = component; - } - None => { - local_entity.insert(component); - } - } - } else { - println!("Received component for unknown entity: {:?}", entity); - } - } - Err(_) => println!("Failed to deserialize packet: {}", type_name::()), - } - }) - .add_packet_handler::<(Entity, PhantomData), _>( - |data, world| match bincode::deserialize::<(Entity, PhantomData)>(&data) { - Ok((entity, _)) => { - if let Some((local_entity, _)) = world - .query::<(Entity, &Synced)>() - .iter(world) - .find(|(_, server_entity)| server_entity.0 == entity) - { - world.entity_mut(local_entity).remove::(); - } - } - Err(_) => println!("Failed to deserialize packet: {}", type_name::()), - }, - ) - } -} diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..8ac9277 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,273 @@ +use crate::{tcp, Packet}; +use bevy::prelude::*; +use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc}; + +#[cfg(feature = "sync")] +use ::{ + serde::{de::DeserializeOwned, Serialize}, + std::{any::type_name, marker::PhantomData, ops::Deref}, +}; + +/// A trait for a handler function. +pub trait PacketHandlerFn: + Fn(Entity, Connection, Vec, &mut World) + Send + Sync + 'static +{ +} + +impl, &mut World) + Send + Sync + 'static> PacketHandlerFn for T {} + +/// A function that handle a received [Packet]s. +pub type PacketHandler = Box; + +/// A Bevy resource that store the packets handlers. +#[derive(Resource)] +struct HandlerManager(Arc>); + +/// A Bevy resource that listens for incoming [Connection]s. +#[derive(Resource)] +pub struct Listener(tcp::Listener); + +impl Listener { + /// Creates a new [Listener] on the given address. + pub fn bind(addr: A) -> io::Result { + Ok(Self(tcp::Listener::bind(addr)?)) + } +} + +/// A [Connection] to a remote client. +#[derive(Component)] +pub struct Connection(Arc); + +impl Connection { + /// Sends a packet through this connection. + pub fn send(&self, packet: &P) { + let mut data = bincode::serialize(packet).expect("Failed to serialize packet"); + data.extend(P::packet_id().to_be_bytes()); + self.0.send(data); + } +} + +#[cfg(feature = "sync")] +/// An event that comes from a client. +pub struct PacketEvent { + /// The entity of the [Connection] that sent the event. + pub entity: Entity, + + /// The [Connection] that sent the event. + pub connection: Connection, + + /// The event. + pub event: E, +} + +#[cfg(feature = "sync")] +impl Deref for PacketEvent { + type Target = E; + + fn deref(&self) -> &Self::Target { + &self.event + } +} + +#[cfg(feature = "sync")] +/// Mark an [Entity] to be synced. +#[derive(Component)] +pub struct Synced; + +/// A plugin that manage the network [Connection]s. +pub struct NetworkPlugin; + +impl NetworkPlugin { + /// Accept new [Connection]s. + fn accept_connections(mut commands: Commands, listener: Option>) { + if let Some(listener) = listener { + if let Some(connection) = listener.0.accept() { + commands.spawn(Connection(Arc::new(connection))); + } + } + } + + /// Handles a received [Packet]s. + fn handle_packets(world: &mut World) { + // Get all received packets + let mut packets = Vec::new(); + for (entity, connection) in world.query::<(Entity, &Connection)>().iter(world) { + while let Some(mut packet) = connection.0.recv() { + if packet.len() < 8 { + println!("Invalid packet received: {:?}", packet); + } else { + let id_buffer = packet.split_off(packet.len() - 8); + let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap()); + packets.push(( + entity, + Connection(Arc::clone(&connection.0)), + packet_id, + packet, + )); + } + } + } + + // Get the packet handlers + let handlers = Arc::clone(&world.resource_mut::().0); + + // Handle all received packets + for (entity, connection, packet_id, packet) in packets { + if let Some(handler) = handlers.get(&packet_id) { + handler(entity, connection, packet, world); + } + } + } + + /// Remove disconnected [Connection]s. + fn remove_disconnected(mut commands: Commands, connections: Query<(Entity, &Connection)>) { + for (entity, connection) in connections.iter() { + if connection.0.closed() { + commands.entity(entity).remove::(); + } + } + } + + #[cfg(feature = "sync")] + /// Send to clients the [Synced] entity that has been added to the server. + fn send_added(added_entities: Query>, connections: Query<&Connection>) { + for entity in added_entities.iter() { + for connection in connections.iter() { + connection.send(&entity); + } + } + } + + #[cfg(feature = "sync")] + /// Send [Synced] entities to new clients. + fn send_synced( + synced_entities: Query>, + new_connections: Query<&Connection, Added>, + ) { + for entity in synced_entities.iter() { + for connection in new_connections.iter() { + connection.send(&entity); + } + } + } + + #[cfg(feature = "sync")] + /// Send to clients the [Synced] entity that has been removed. + fn send_removed( + mut removed_entities: RemovedComponents, + connections: Query<&Connection>, + ) { + for entity in removed_entities.iter() { + for connection in connections.iter() { + connection.send(&entity); + } + } + } +} + +impl Plugin for NetworkPlugin { + fn build(&self, app: &mut App) { + app.insert_resource(HandlerManager(Arc::new(HashMap::new()))); + app.add_system(Self::handle_packets); + app.add_system(Self::remove_disconnected); + app.add_system(Self::accept_connections); + + #[cfg(feature = "sync")] + { + app.add_system(Self::send_added); + app.add_system(Self::send_synced); + app.add_system(Self::send_removed); + } + } +} + +/// An extension to add packet handlers. +pub trait NetworkExt { + /// Add a new packet handler. + fn add_packet_handler(&mut self, handler: H) -> &mut Self; + + #[cfg(all(feature = "sync"))] + /// Register syncronization for an [Event] that can be sent by the server. + fn sync_server_event(&mut self) -> &mut Self; + + #[cfg(all(feature = "sync"))] + /// Register syncronization for an [Event] that comes from the client. + fn sync_client_event(&mut self) -> &mut Self; + + #[cfg(all(feature = "sync"))] + /// Register a [Component] to be synced. + fn sync_component(&mut self) -> &mut Self; +} + +impl NetworkExt for App { + fn add_packet_handler(&mut self, handler: H) -> &mut Self { + Arc::get_mut(&mut self.world.resource_mut::().0) + .unwrap() + .insert(P::packet_id(), Box::new(handler)); + self + } + + #[cfg(feature = "sync")] + fn sync_server_event(&mut self) -> &mut Self { + self.add_event::().add_system( + |mut events: EventReader, connections: Query<&Connection>| { + for event in events.iter() { + for connection in connections.iter() { + connection.send(event); + } + } + }, + ) + } + + #[cfg(feature = "sync")] + fn sync_client_event(&mut self) -> &mut Self { + self.add_event::>() + .add_packet_handler::( + |entity, connection, data, world| match bincode::deserialize::(&data) { + Ok(event) => world.send_event(PacketEvent { + entity, + connection, + event, + }), + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + }, + ) + } + + #[cfg(feature = "sync")] + fn sync_component(&mut self) -> &mut Self { + let update_components = + |changed_components: Query<(Entity, &C), (Changed, With)>, + connections: Query<&Connection>| { + for (entity, component) in changed_components.iter() { + for connection in connections.iter() { + connection.send(&(entity, component.clone())); + } + } + }; + let send_components = + |components: Query<(Entity, &C), With>, + new_connections: Query<&Connection, Added>| { + for (entity, component) in components.iter() { + for connection in new_connections.iter() { + connection.send(&(entity, component.clone())); + } + } + }; + let remove_components = |mut removed_components: RemovedComponents, + synced_entities: Query>, + connections: Query<&Connection>| { + for entity in removed_components.iter() { + if synced_entities.contains(entity) { + for connection in connections.iter() { + connection.send(&(entity, PhantomData::)); + } + } + } + }; + self.add_system(update_components.after(NetworkPlugin::send_added)) + .add_system(send_components.after(NetworkPlugin::send_synced)) + .add_system(remove_components.after(update_components)) + } +} diff --git a/src/tcp.rs b/src/tcp.rs index d6611bf..0905256 100644 --- a/src/tcp.rs +++ b/src/tcp.rs @@ -9,6 +9,9 @@ use std::{ thread, }; +#[cfg(feature = "server")] +use std::net::TcpListener; + /// A non-blocking TCP connection. pub struct Connection { /// Track if the connection has been closed. @@ -123,9 +126,6 @@ impl Drop for Connection { } } -#[cfg(feature = "server")] -use std::net::TcpListener; - #[cfg(feature = "server")] /// A [Connection] listener. pub struct Listener { From 1a8136b078fc88cfdccbae7da9ed157816244adb Mon Sep 17 00:00:00 2001 From: Tipragot Date: Sat, 29 Apr 2023 15:19:09 +0200 Subject: [PATCH 6/7] Remove useless public identifier --- src/client.rs | 2 +- src/server.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/client.rs b/src/client.rs index a92db51..a014ab8 100644 --- a/src/client.rs +++ b/src/client.rs @@ -15,7 +15,7 @@ pub trait PacketHandlerFn: Fn(Vec, &mut World) + Send + Sync + 'static {} impl, &mut World) + Send + Sync + 'static> PacketHandlerFn for T {} /// A function that handle a received [Packet]s. -pub type PacketHandler = Box; +type PacketHandler = Box; /// A Bevy resource that store the packets handlers. #[derive(Resource)] diff --git a/src/server.rs b/src/server.rs index 8ac9277..e6d3d05 100644 --- a/src/server.rs +++ b/src/server.rs @@ -17,7 +17,7 @@ pub trait PacketHandlerFn: impl, &mut World) + Send + Sync + 'static> PacketHandlerFn for T {} /// A function that handle a received [Packet]s. -pub type PacketHandler = Box; +type PacketHandler = Box; /// A Bevy resource that store the packets handlers. #[derive(Resource)] From 59dc588c39b612de7641efaa8b5173cefb4787e1 Mon Sep 17 00:00:00 2001 From: Tipragot Date: Sat, 29 Apr 2023 15:35:15 +0200 Subject: [PATCH 7/7] Rename things to make them much clear --- src/client.rs | 38 ++++++++++++------------- src/server.rs | 78 +++++++++++++++++++++++++++++---------------------- 2 files changed, 63 insertions(+), 53 deletions(-) diff --git a/src/client.rs b/src/client.rs index a014ab8..cbec75f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -23,9 +23,9 @@ struct HandlerManager(Arc>); /// A [Connection] to a remote server. #[derive(Resource)] -pub struct Connection(tcp::Connection); +pub struct ServerConnection(tcp::Connection); -impl Connection { +impl ServerConnection { /// Connects to a remote server. pub fn connect(addr: A) -> io::Result { Ok(Self(tcp::Connection::connect(addr)?)) @@ -42,7 +42,7 @@ impl Connection { #[cfg(feature = "sync")] /// An event that comes from the server. #[derive(Deref)] -pub struct PacketEvent { +pub struct FromServer { /// The event. pub event: E, } @@ -50,18 +50,18 @@ pub struct PacketEvent { #[cfg(feature = "sync")] /// Mark an [Entity] as synced by the server. #[derive(Component)] -pub struct Synced(Entity); +pub struct ServerEntity(Entity); /// A plugin that manage the network [Connection]s. -pub struct NetworkPlugin; +pub struct ClientPlugin; -impl NetworkPlugin { +impl ClientPlugin { #[cfg(feature = "client")] /// Handles a received [Packet] on the server. pub fn handle_packets(world: &mut World) { // Get all received packets let mut packets = Vec::new(); - if let Some(connection) = world.get_resource::() { + if let Some(connection) = world.get_resource::() { while let Some(mut packet) = connection.0.recv() { if packet.len() < 8 { println!("Invalid packet received: {:?}", packet); @@ -88,24 +88,24 @@ impl NetworkPlugin { #[cfg(feature = "client")] /// Remove [Connection] if it's disconnected. - pub fn remove_disconnected(mut commands: Commands, connection: Option>) { + pub fn remove_disconnected(mut commands: Commands, connection: Option>) { if let Some(connection) = connection { if connection.0.closed() { - commands.remove_resource::(); + commands.remove_resource::(); } } } #[cfg(feature = "sync")] /// Removes [ServerEntity] when disconnected. - fn remove_synced(mut commands: Commands, entities: Query>) { + fn remove_synced(mut commands: Commands, entities: Query>) { for entity in entities.iter() { commands.entity(entity).despawn(); } } } -impl Plugin for NetworkPlugin { +impl Plugin for ClientPlugin { fn build(&self, app: &mut App) { app.insert_resource(HandlerManager(Arc::new(HashMap::new()))); app.add_system(Self::handle_packets); @@ -117,19 +117,19 @@ impl Plugin for NetworkPlugin { match bincode::deserialize::(&data) { Ok(entity) => { if let Some((local_entity, _)) = world - .query::<(Entity, &Synced)>() + .query::<(Entity, &ServerEntity)>() .iter(world) .find(|(_, server_entity)| server_entity.0 == entity) { world.despawn(local_entity); } else { - world.spawn(Synced(entity)); + world.spawn(ServerEntity(entity)); } } Err(_) => println!("Failed to deserialize packet: {}", type_name::()), } }); - app.add_system(Self::remove_synced.run_if(resource_removed::())); + app.add_system(Self::remove_synced.run_if(resource_removed::())); } } } @@ -162,9 +162,9 @@ impl NetworkExt for App { #[cfg(feature = "sync")] fn sync_server_event(&mut self) -> &mut Self { - self.add_event::>() + self.add_event::>() .add_packet_handler::(|data, world| match bincode::deserialize::(&data) { - Ok(event) => world.send_event(PacketEvent { event }), + Ok(event) => world.send_event(FromServer { event }), Err(_) => println!("Failed to deserialize packet: {}", type_name::()), }) } @@ -172,7 +172,7 @@ impl NetworkExt for App { #[cfg(feature = "sync")] fn sync_client_event(&mut self) -> &mut Self { self.add_event::().add_system( - |mut events: EventReader, connection: Option>| { + |mut events: EventReader, connection: Option>| { if let Some(connection) = connection { for event in events.iter() { connection.send(event); @@ -188,7 +188,7 @@ impl NetworkExt for App { match bincode::deserialize::<(Entity, C)>(&data) { Ok((entity, component)) => { if let Some((local_entity, _)) = world - .query::<(Entity, &Synced)>() + .query::<(Entity, &ServerEntity)>() .iter(world) .find(|(_, server_entity)| server_entity.0 == entity) { @@ -213,7 +213,7 @@ impl NetworkExt for App { |data, world| match bincode::deserialize::<(Entity, PhantomData)>(&data) { Ok((entity, _)) => { if let Some((local_entity, _)) = world - .query::<(Entity, &Synced)>() + .query::<(Entity, &ServerEntity)>() .iter(world) .find(|(_, server_entity)| server_entity.0 == entity) { diff --git a/src/server.rs b/src/server.rs index e6d3d05..381227a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -10,11 +10,14 @@ use ::{ /// A trait for a handler function. pub trait PacketHandlerFn: - Fn(Entity, Connection, Vec, &mut World) + Send + Sync + 'static + Fn(Entity, ClientConnection, Vec, &mut World) + Send + Sync + 'static { } -impl, &mut World) + Send + Sync + 'static> PacketHandlerFn for T {} +impl, &mut World) + Send + Sync + 'static> PacketHandlerFn + for T +{ +} /// A function that handle a received [Packet]s. type PacketHandler = Box; @@ -36,9 +39,9 @@ impl Listener { /// A [Connection] to a remote client. #[derive(Component)] -pub struct Connection(Arc); +pub struct ClientConnection(Arc); -impl Connection { +impl ClientConnection { /// Sends a packet through this connection. pub fn send(&self, packet: &P) { let mut data = bincode::serialize(packet).expect("Failed to serialize packet"); @@ -49,19 +52,19 @@ impl Connection { #[cfg(feature = "sync")] /// An event that comes from a client. -pub struct PacketEvent { +pub struct FromClient { /// The entity of the [Connection] that sent the event. pub entity: Entity, /// The [Connection] that sent the event. - pub connection: Connection, + pub connection: ClientConnection, /// The event. pub event: E, } #[cfg(feature = "sync")] -impl Deref for PacketEvent { +impl Deref for FromClient { type Target = E; fn deref(&self) -> &Self::Target { @@ -75,14 +78,14 @@ impl Deref for PacketEvent { pub struct Synced; /// A plugin that manage the network [Connection]s. -pub struct NetworkPlugin; +pub struct ServerPlugin; -impl NetworkPlugin { +impl ServerPlugin { /// Accept new [Connection]s. fn accept_connections(mut commands: Commands, listener: Option>) { if let Some(listener) = listener { if let Some(connection) = listener.0.accept() { - commands.spawn(Connection(Arc::new(connection))); + commands.spawn(ClientConnection(Arc::new(connection))); } } } @@ -91,7 +94,7 @@ impl NetworkPlugin { fn handle_packets(world: &mut World) { // Get all received packets let mut packets = Vec::new(); - for (entity, connection) in world.query::<(Entity, &Connection)>().iter(world) { + for (entity, connection) in world.query::<(Entity, &ClientConnection)>().iter(world) { while let Some(mut packet) = connection.0.recv() { if packet.len() < 8 { println!("Invalid packet received: {:?}", packet); @@ -100,7 +103,7 @@ impl NetworkPlugin { let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap()); packets.push(( entity, - Connection(Arc::clone(&connection.0)), + ClientConnection(Arc::clone(&connection.0)), packet_id, packet, )); @@ -120,17 +123,23 @@ impl NetworkPlugin { } /// Remove disconnected [Connection]s. - fn remove_disconnected(mut commands: Commands, connections: Query<(Entity, &Connection)>) { + fn remove_disconnected( + mut commands: Commands, + connections: Query<(Entity, &ClientConnection)>, + ) { for (entity, connection) in connections.iter() { if connection.0.closed() { - commands.entity(entity).remove::(); + commands.entity(entity).remove::(); } } } #[cfg(feature = "sync")] /// Send to clients the [Synced] entity that has been added to the server. - fn send_added(added_entities: Query>, connections: Query<&Connection>) { + fn send_added( + added_entities: Query>, + connections: Query<&ClientConnection>, + ) { for entity in added_entities.iter() { for connection in connections.iter() { connection.send(&entity); @@ -142,7 +151,7 @@ impl NetworkPlugin { /// Send [Synced] entities to new clients. fn send_synced( synced_entities: Query>, - new_connections: Query<&Connection, Added>, + new_connections: Query<&ClientConnection, Added>, ) { for entity in synced_entities.iter() { for connection in new_connections.iter() { @@ -155,7 +164,7 @@ impl NetworkPlugin { /// Send to clients the [Synced] entity that has been removed. fn send_removed( mut removed_entities: RemovedComponents, - connections: Query<&Connection>, + connections: Query<&ClientConnection>, ) { for entity in removed_entities.iter() { for connection in connections.iter() { @@ -165,7 +174,7 @@ impl NetworkPlugin { } } -impl Plugin for NetworkPlugin { +impl Plugin for ServerPlugin { fn build(&self, app: &mut App) { app.insert_resource(HandlerManager(Arc::new(HashMap::new()))); app.add_system(Self::handle_packets); @@ -210,7 +219,7 @@ impl NetworkExt for App { #[cfg(feature = "sync")] fn sync_server_event(&mut self) -> &mut Self { self.add_event::().add_system( - |mut events: EventReader, connections: Query<&Connection>| { + |mut events: EventReader, connections: Query<&ClientConnection>| { for event in events.iter() { for connection in connections.iter() { connection.send(event); @@ -222,10 +231,10 @@ impl NetworkExt for App { #[cfg(feature = "sync")] fn sync_client_event(&mut self) -> &mut Self { - self.add_event::>() + self.add_event::>() .add_packet_handler::( |entity, connection, data, world| match bincode::deserialize::(&data) { - Ok(event) => world.send_event(PacketEvent { + Ok(event) => world.send_event(FromClient { entity, connection, event, @@ -239,7 +248,7 @@ impl NetworkExt for App { fn sync_component(&mut self) -> &mut Self { let update_components = |changed_components: Query<(Entity, &C), (Changed, With)>, - connections: Query<&Connection>| { + connections: Query<&ClientConnection>| { for (entity, component) in changed_components.iter() { for connection in connections.iter() { connection.send(&(entity, component.clone())); @@ -248,26 +257,27 @@ impl NetworkExt for App { }; let send_components = |components: Query<(Entity, &C), With>, - new_connections: Query<&Connection, Added>| { + new_connections: Query<&ClientConnection, Added>| { for (entity, component) in components.iter() { for connection in new_connections.iter() { connection.send(&(entity, component.clone())); } } }; - let remove_components = |mut removed_components: RemovedComponents, - synced_entities: Query>, - connections: Query<&Connection>| { - for entity in removed_components.iter() { - if synced_entities.contains(entity) { - for connection in connections.iter() { - connection.send(&(entity, PhantomData::)); + let remove_components = + |mut removed_components: RemovedComponents, + synced_entities: Query>, + connections: Query<&ClientConnection>| { + for entity in removed_components.iter() { + if synced_entities.contains(entity) { + for connection in connections.iter() { + connection.send(&(entity, PhantomData::)); + } } } - } - }; - self.add_system(update_components.after(NetworkPlugin::send_added)) - .add_system(send_components.after(NetworkPlugin::send_synced)) + }; + self.add_system(update_components.after(ServerPlugin::send_added)) + .add_system(send_components.after(ServerPlugin::send_synced)) .add_system(remove_components.after(update_components)) } }