From f3868b17cf8afff18fb6026a5f2ab982dc186641 Mon Sep 17 00:00:00 2001 From: Tipragot Date: Sat, 29 Apr 2023 15:17:29 +0200 Subject: [PATCH] Separate things to make it works with all features --- Cargo.toml | 2 +- src/client.rs | 227 +++++++++++++++++++++++++ src/lib.rs | 449 +------------------------------------------------- src/server.rs | 273 ++++++++++++++++++++++++++++++ src/tcp.rs | 6 +- 5 files changed, 508 insertions(+), 449 deletions(-) create mode 100644 src/client.rs create mode 100644 src/server.rs diff --git a/Cargo.toml b/Cargo.toml index ff61668..78964fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ bincode = "1.3.3" bevy = "0.10.1" [features] -default = ["server", "sync"] +default = ["client", "server", "sync"] server = [] client = [] sync = [] diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..a92db51 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,227 @@ +use bevy::prelude::*; +use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc}; + +#[cfg(feature = "sync")] +use ::{ + serde::{de::DeserializeOwned, Serialize}, + std::{any::type_name, marker::PhantomData}, +}; + +use crate::{tcp, Packet}; + +/// A trait for a handler function. +pub trait PacketHandlerFn: Fn(Vec, &mut World) + Send + Sync + 'static {} + +impl, &mut World) + Send + Sync + 'static> PacketHandlerFn for T {} + +/// A function that handle a received [Packet]s. +pub type PacketHandler = Box; + +/// A Bevy resource that store the packets handlers. +#[derive(Resource)] +struct HandlerManager(Arc>); + +/// A [Connection] to a remote server. +#[derive(Resource)] +pub struct Connection(tcp::Connection); + +impl Connection { + /// Connects to a remote server. + pub fn connect(addr: A) -> io::Result { + Ok(Self(tcp::Connection::connect(addr)?)) + } + + /// Sends a packet through this connection. + pub fn send(&self, packet: &P) { + let mut data = bincode::serialize(packet).expect("Failed to serialize packet"); + data.extend(P::packet_id().to_be_bytes()); + self.0.send(data); + } +} + +#[cfg(feature = "sync")] +/// An event that comes from the server. +#[derive(Deref)] +pub struct PacketEvent { + /// The event. + pub event: E, +} + +#[cfg(feature = "sync")] +/// Mark an [Entity] as synced by the server. +#[derive(Component)] +pub struct Synced(Entity); + +/// A plugin that manage the network [Connection]s. +pub struct NetworkPlugin; + +impl NetworkPlugin { + #[cfg(feature = "client")] + /// Handles a received [Packet] on the server. + pub fn handle_packets(world: &mut World) { + // Get all received packets + let mut packets = Vec::new(); + if let Some(connection) = world.get_resource::() { + while let Some(mut packet) = connection.0.recv() { + if packet.len() < 8 { + println!("Invalid packet received: {:?}", packet); + } else { + let id_buffer = packet.split_off(packet.len() - 8); + let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap()); + packets.push((packet_id, packet)); + } + } + } else { + return; + } + + // Get the packet handlers + let handlers = Arc::clone(&world.resource_mut::().0); + + // Handle all received packets + for (packet_id, packet) in packets { + if let Some(handler) = handlers.get(&packet_id) { + handler(packet, world); + } + } + } + + #[cfg(feature = "client")] + /// Remove [Connection] if it's disconnected. + pub fn remove_disconnected(mut commands: Commands, connection: Option>) { + if let Some(connection) = connection { + if connection.0.closed() { + commands.remove_resource::(); + } + } + } + + #[cfg(feature = "sync")] + /// Removes [ServerEntity] when disconnected. + fn remove_synced(mut commands: Commands, entities: Query>) { + for entity in entities.iter() { + commands.entity(entity).despawn(); + } + } +} + +impl Plugin for NetworkPlugin { + fn build(&self, app: &mut App) { + app.insert_resource(HandlerManager(Arc::new(HashMap::new()))); + app.add_system(Self::handle_packets); + app.add_system(Self::remove_disconnected); + + #[cfg(feature = "sync")] + { + app.add_packet_handler::(|data, world| { + match bincode::deserialize::(&data) { + Ok(entity) => { + if let Some((local_entity, _)) = world + .query::<(Entity, &Synced)>() + .iter(world) + .find(|(_, server_entity)| server_entity.0 == entity) + { + world.despawn(local_entity); + } else { + world.spawn(Synced(entity)); + } + } + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + } + }); + app.add_system(Self::remove_synced.run_if(resource_removed::())); + } + } +} + +/// An extension to add packet handlers. +pub trait NetworkExt { + /// Add a new packet handler. + fn add_packet_handler(&mut self, handler: H) -> &mut Self; + + #[cfg(all(feature = "sync"))] + /// Register syncronization for an [Event] that can be sent by the server. + fn sync_server_event(&mut self) -> &mut Self; + + #[cfg(all(feature = "sync"))] + /// Register syncronization for an [Event] that comes from the client. + fn sync_client_event(&mut self) -> &mut Self; + + #[cfg(all(feature = "sync"))] + /// Register a [Component] to be synced. + fn sync_component(&mut self) -> &mut Self; +} + +impl NetworkExt for App { + fn add_packet_handler(&mut self, handler: H) -> &mut Self { + Arc::get_mut(&mut self.world.resource_mut::().0) + .unwrap() + .insert(P::packet_id(), Box::new(handler)); + self + } + + #[cfg(feature = "sync")] + fn sync_server_event(&mut self) -> &mut Self { + self.add_event::>() + .add_packet_handler::(|data, world| match bincode::deserialize::(&data) { + Ok(event) => world.send_event(PacketEvent { event }), + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + }) + } + + #[cfg(feature = "sync")] + fn sync_client_event(&mut self) -> &mut Self { + self.add_event::().add_system( + |mut events: EventReader, connection: Option>| { + if let Some(connection) = connection { + for event in events.iter() { + connection.send(event); + } + } + }, + ) + } + + #[cfg(feature = "sync")] + fn sync_component(&mut self) -> &mut Self { + self.add_packet_handler::<(Entity, C), _>(|data, world| { + match bincode::deserialize::<(Entity, C)>(&data) { + Ok((entity, component)) => { + if let Some((local_entity, _)) = world + .query::<(Entity, &Synced)>() + .iter(world) + .find(|(_, server_entity)| server_entity.0 == entity) + { + let mut local_entity = world.entity_mut(local_entity); + match local_entity.get_mut::() { + Some(mut local_component) => { + println!("CA CHANGE: {}", type_name::()); + *local_component = component; + } + None => { + local_entity.insert(component); + } + } + } else { + println!("Received component for unknown entity: {:?}", entity); + } + } + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + } + }) + .add_packet_handler::<(Entity, PhantomData), _>( + |data, world| match bincode::deserialize::<(Entity, PhantomData)>(&data) { + Ok((entity, _)) => { + if let Some((local_entity, _)) = world + .query::<(Entity, &Synced)>() + .iter(world) + .find(|(_, server_entity)| server_entity.0 == entity) + { + world.entity_mut(local_entity).remove::(); + } + } + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + }, + ) + } +} diff --git a/src/lib.rs b/src/lib.rs index 6ed0547..f9849ca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,16 +1,14 @@ -use bevy::prelude::*; use serde::{de::DeserializeOwned, Serialize}; use std::{ collections::hash_map::DefaultHasher, hash::{Hash, Hasher}, }; -use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc}; -#[cfg(feature = "sync")] -use std::{any::type_name, marker::PhantomData}; +#[cfg(feature = "client")] +pub mod client; -#[cfg(all(feature = "sync", feature = "server"))] -use std::ops::Deref; +#[cfg(feature = "server")] +pub mod server; mod tcp; @@ -30,442 +28,3 @@ pub trait Packet: DeserializeOwned + Serialize + Send + Sync { } impl Packet for T {} - -#[cfg(feature = "server")] -/// A trait for a handler function. -pub trait PacketHandlerFn: - Fn(Entity, Connection, Vec, &mut World) + Send + Sync + 'static -{ -} - -#[cfg(feature = "server")] -impl, &mut World) + Send + Sync + 'static> PacketHandlerFn for T {} - -#[cfg(feature = "client")] -/// A trait for a handler function. -pub trait PacketHandlerFn: Fn(Vec, &mut World) + Send + Sync + 'static {} - -#[cfg(feature = "client")] -impl, &mut World) + Send + Sync + 'static> PacketHandlerFn for T {} - -/// A function that handle a received [Packet]s. -pub type PacketHandler = Box; - -/// A Bevy resource that store the packets handlers. -#[derive(Resource)] -struct HandlerManager(Arc>); - -#[cfg(feature = "server")] -/// A Bevy resource that listens for incoming [Connection]s. -#[derive(Resource)] -pub struct Listener(tcp::Listener); - -#[cfg(feature = "server")] -impl Listener { - /// Creates a new [Listener] on the given address. - pub fn bind(addr: A) -> io::Result { - Ok(Self(tcp::Listener::bind(addr)?)) - } -} - -#[cfg(feature = "server")] -/// A [Connection] to a remote client. -#[derive(Component)] -pub struct Connection(Arc); - -#[cfg(feature = "client")] -/// A [Connection] to a remote server. -#[derive(Resource)] -pub struct Connection(tcp::Connection); - -impl Connection { - #[cfg(feature = "client")] - /// Connects to a remote server. - pub fn connect(addr: A) -> io::Result { - Ok(Self(tcp::Connection::connect(addr)?)) - } - - /// Sends a packet through this connection. - pub fn send(&self, packet: &P) { - let mut data = bincode::serialize(packet).expect("Failed to serialize packet"); - data.extend(P::packet_id().to_be_bytes()); - self.0.send(data); - } -} - -#[cfg(feature = "server")] -/// An event that comes from a client. -pub struct PacketEvent { - /// The entity of the [Connection] that sent the event. - pub entity: Entity, - - /// The [Connection] that sent the event. - pub connection: Connection, - - /// The event. - pub event: E, -} - -#[cfg(all(feature = "sync", feature = "server"))] -impl Deref for PacketEvent { - type Target = E; - - fn deref(&self) -> &Self::Target { - &self.event - } -} - -#[cfg(all(feature = "sync", feature = "client"))] -/// An event that comes from the server. -#[derive(Deref)] -pub struct PacketEvent { - /// The event. - pub event: E, -} - -#[cfg(all(feature = "sync", feature = "server"))] -/// Mark an [Entity] to be synced. -#[derive(Component)] -pub struct Synced; - -#[cfg(all(feature = "sync", feature = "client"))] -/// Mark an [Entity] as synced by the server. -#[derive(Component)] -pub struct Synced(Entity); - -/// A plugin that manage the network [Connection]s. -pub struct NetworkPlugin; - -impl NetworkPlugin { - #[cfg(feature = "server")] - /// Accept new [Connection]s. - fn accept_connections(mut commands: Commands, listener: Option>) { - if let Some(listener) = listener { - if let Some(connection) = listener.0.accept() { - commands.spawn(Connection(Arc::new(connection))); - } - } - } - - #[cfg(feature = "server")] - /// Handles a received [Packet]s. - fn handle_packets(world: &mut World) { - // Get all received packets - let mut packets = Vec::new(); - for (entity, connection) in world.query::<(Entity, &Connection)>().iter(world) { - while let Some(mut packet) = connection.0.recv() { - if packet.len() < 8 { - println!("Invalid packet received: {:?}", packet); - } else { - let id_buffer = packet.split_off(packet.len() - 8); - let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap()); - packets.push(( - entity, - Connection(Arc::clone(&connection.0)), - packet_id, - packet, - )); - } - } - } - - // Get the packet handlers - let handlers = Arc::clone(&world.resource_mut::().0); - - // Handle all received packets - for (entity, connection, packet_id, packet) in packets { - if let Some(handler) = handlers.get(&packet_id) { - handler(entity, connection, packet, world); - } - } - } - - #[cfg(feature = "client")] - /// Handles a received [Packet] on the server. - pub fn handle_packets(world: &mut World) { - // Get all received packets - let mut packets = Vec::new(); - if let Some(connection) = world.get_resource::() { - while let Some(mut packet) = connection.0.recv() { - if packet.len() < 8 { - println!("Invalid packet received: {:?}", packet); - } else { - let id_buffer = packet.split_off(packet.len() - 8); - let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap()); - packets.push((packet_id, packet)); - } - } - } else { - return; - } - - // Get the packet handlers - let handlers = Arc::clone(&world.resource_mut::().0); - - // Handle all received packets - for (packet_id, packet) in packets { - if let Some(handler) = handlers.get(&packet_id) { - handler(packet, world); - } - } - } - - #[cfg(feature = "server")] - /// Remove disconnected [Connection]s. - fn remove_disconnected(mut commands: Commands, connections: Query<(Entity, &Connection)>) { - for (entity, connection) in connections.iter() { - if connection.0.closed() { - commands.entity(entity).remove::(); - } - } - } - - #[cfg(feature = "client")] - /// Remove [Connection] if it's disconnected. - pub fn remove_disconnected(mut commands: Commands, connection: Option>) { - if let Some(connection) = connection { - if connection.0.closed() { - commands.remove_resource::(); - } - } - } - - #[cfg(all(feature = "sync", feature = "server"))] - /// Send to clients the [Synced] entity that has been added to the server. - fn send_added(added_entities: Query>, connections: Query<&Connection>) { - for entity in added_entities.iter() { - for connection in connections.iter() { - connection.send(&entity); - } - } - } - - #[cfg(all(feature = "sync", feature = "server"))] - /// Send [Synced] entities to new clients. - fn send_synced( - synced_entities: Query>, - new_connections: Query<&Connection, Added>, - ) { - for entity in synced_entities.iter() { - for connection in new_connections.iter() { - connection.send(&entity); - } - } - } - - #[cfg(all(feature = "sync", feature = "server"))] - /// Send to clients the [Synced] entity that has been removed. - fn send_removed( - mut removed_entities: RemovedComponents, - connections: Query<&Connection>, - ) { - for entity in removed_entities.iter() { - for connection in connections.iter() { - connection.send(&entity); - } - } - } - - #[cfg(all(feature = "sync", feature = "client"))] - /// Removes [ServerEntity] when disconnected. - fn remove_synced(mut commands: Commands, entities: Query>) { - for entity in entities.iter() { - commands.entity(entity).despawn(); - } - } -} - -impl Plugin for NetworkPlugin { - fn build(&self, app: &mut App) { - app.insert_resource(HandlerManager(Arc::new(HashMap::new()))); - app.add_system(Self::handle_packets); - app.add_system(Self::remove_disconnected); - - #[cfg(feature = "server")] - app.add_system(Self::accept_connections); - - #[cfg(all(feature = "sync", feature = "server"))] - { - app.add_system(Self::send_added); - app.add_system(Self::send_synced); - app.add_system(Self::send_removed); - } - - #[cfg(all(feature = "sync", feature = "client"))] - { - app.add_packet_handler::(|data, world| { - match bincode::deserialize::(&data) { - Ok(entity) => { - if let Some((local_entity, _)) = world - .query::<(Entity, &Synced)>() - .iter(world) - .find(|(_, server_entity)| server_entity.0 == entity) - { - world.despawn(local_entity); - } else { - world.spawn(Synced(entity)); - } - } - Err(_) => println!("Failed to deserialize packet: {}", type_name::()), - } - }); - app.add_system(Self::remove_synced.run_if(resource_removed::())); - } - } -} - -/// An extension to add packet handlers. -pub trait NetworkExt { - /// Add a new packet handler. - fn add_packet_handler(&mut self, handler: H) -> &mut Self; - - #[cfg(all(feature = "sync"))] - /// Register syncronization for an [Event] that can be sent by the server. - fn sync_server_event(&mut self) -> &mut Self; - - #[cfg(all(feature = "sync"))] - /// Register syncronization for an [Event] that comes from the client. - fn sync_client_event(&mut self) -> &mut Self; - - #[cfg(all(feature = "sync"))] - /// Register a [Component] to be synced. - fn sync_component(&mut self) -> &mut Self; -} - -impl NetworkExt for App { - fn add_packet_handler(&mut self, handler: H) -> &mut Self { - Arc::get_mut(&mut self.world.resource_mut::().0) - .unwrap() - .insert(P::packet_id(), Box::new(handler)); - self - } - - #[cfg(all(feature = "sync", feature = "server"))] - fn sync_server_event(&mut self) -> &mut Self { - self.add_event::().add_system( - |mut events: EventReader, connections: Query<&Connection>| { - for event in events.iter() { - for connection in connections.iter() { - connection.send(event); - } - } - }, - ) - } - - #[cfg(all(feature = "sync", feature = "client"))] - fn sync_server_event(&mut self) -> &mut Self { - self.add_event::>() - .add_packet_handler::(|data, world| match bincode::deserialize::(&data) { - Ok(event) => world.send_event(PacketEvent { event }), - Err(_) => println!("Failed to deserialize packet: {}", type_name::()), - }) - } - - #[cfg(all(feature = "sync", feature = "server"))] - fn sync_client_event(&mut self) -> &mut Self { - self.add_event::>() - .add_packet_handler::( - |entity, connection, data, world| match bincode::deserialize::(&data) { - Ok(event) => world.send_event(PacketEvent { - entity, - connection, - event, - }), - Err(_) => println!("Failed to deserialize packet: {}", type_name::()), - }, - ) - } - - #[cfg(all(feature = "sync", feature = "client"))] - fn sync_client_event(&mut self) -> &mut Self { - self.add_event::().add_system( - |mut events: EventReader, connection: Option>| { - if let Some(connection) = connection { - for event in events.iter() { - connection.send(event); - } - } - }, - ) - } - - #[cfg(all(feature = "sync", feature = "server"))] - fn sync_component(&mut self) -> &mut Self { - let update_components = - |changed_components: Query<(Entity, &C), (Changed, With)>, - connections: Query<&Connection>| { - for (entity, component) in changed_components.iter() { - for connection in connections.iter() { - connection.send(&(entity, component.clone())); - } - } - }; - let send_components = - |components: Query<(Entity, &C), With>, - new_connections: Query<&Connection, Added>| { - for (entity, component) in components.iter() { - for connection in new_connections.iter() { - connection.send(&(entity, component.clone())); - } - } - }; - let remove_components = |mut removed_components: RemovedComponents, - synced_entities: Query>, - connections: Query<&Connection>| { - for entity in removed_components.iter() { - if synced_entities.contains(entity) { - for connection in connections.iter() { - connection.send(&(entity, PhantomData::)); - } - } - } - }; - self.add_system(update_components.after(NetworkPlugin::send_added)) - .add_system(send_components.after(NetworkPlugin::send_synced)) - .add_system(remove_components.after(update_components)) - } - - #[cfg(all(feature = "sync", feature = "client"))] - fn sync_component(&mut self) -> &mut Self { - self.add_packet_handler::<(Entity, C), _>(|data, world| { - match bincode::deserialize::<(Entity, C)>(&data) { - Ok((entity, component)) => { - if let Some((local_entity, _)) = world - .query::<(Entity, &Synced)>() - .iter(world) - .find(|(_, server_entity)| server_entity.0 == entity) - { - let mut local_entity = world.entity_mut(local_entity); - match local_entity.get_mut::() { - Some(mut local_component) => { - println!("CA CHANGE: {}", type_name::()); - *local_component = component; - } - None => { - local_entity.insert(component); - } - } - } else { - println!("Received component for unknown entity: {:?}", entity); - } - } - Err(_) => println!("Failed to deserialize packet: {}", type_name::()), - } - }) - .add_packet_handler::<(Entity, PhantomData), _>( - |data, world| match bincode::deserialize::<(Entity, PhantomData)>(&data) { - Ok((entity, _)) => { - if let Some((local_entity, _)) = world - .query::<(Entity, &Synced)>() - .iter(world) - .find(|(_, server_entity)| server_entity.0 == entity) - { - world.entity_mut(local_entity).remove::(); - } - } - Err(_) => println!("Failed to deserialize packet: {}", type_name::()), - }, - ) - } -} diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..8ac9277 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,273 @@ +use crate::{tcp, Packet}; +use bevy::prelude::*; +use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc}; + +#[cfg(feature = "sync")] +use ::{ + serde::{de::DeserializeOwned, Serialize}, + std::{any::type_name, marker::PhantomData, ops::Deref}, +}; + +/// A trait for a handler function. +pub trait PacketHandlerFn: + Fn(Entity, Connection, Vec, &mut World) + Send + Sync + 'static +{ +} + +impl, &mut World) + Send + Sync + 'static> PacketHandlerFn for T {} + +/// A function that handle a received [Packet]s. +pub type PacketHandler = Box; + +/// A Bevy resource that store the packets handlers. +#[derive(Resource)] +struct HandlerManager(Arc>); + +/// A Bevy resource that listens for incoming [Connection]s. +#[derive(Resource)] +pub struct Listener(tcp::Listener); + +impl Listener { + /// Creates a new [Listener] on the given address. + pub fn bind(addr: A) -> io::Result { + Ok(Self(tcp::Listener::bind(addr)?)) + } +} + +/// A [Connection] to a remote client. +#[derive(Component)] +pub struct Connection(Arc); + +impl Connection { + /// Sends a packet through this connection. + pub fn send(&self, packet: &P) { + let mut data = bincode::serialize(packet).expect("Failed to serialize packet"); + data.extend(P::packet_id().to_be_bytes()); + self.0.send(data); + } +} + +#[cfg(feature = "sync")] +/// An event that comes from a client. +pub struct PacketEvent { + /// The entity of the [Connection] that sent the event. + pub entity: Entity, + + /// The [Connection] that sent the event. + pub connection: Connection, + + /// The event. + pub event: E, +} + +#[cfg(feature = "sync")] +impl Deref for PacketEvent { + type Target = E; + + fn deref(&self) -> &Self::Target { + &self.event + } +} + +#[cfg(feature = "sync")] +/// Mark an [Entity] to be synced. +#[derive(Component)] +pub struct Synced; + +/// A plugin that manage the network [Connection]s. +pub struct NetworkPlugin; + +impl NetworkPlugin { + /// Accept new [Connection]s. + fn accept_connections(mut commands: Commands, listener: Option>) { + if let Some(listener) = listener { + if let Some(connection) = listener.0.accept() { + commands.spawn(Connection(Arc::new(connection))); + } + } + } + + /// Handles a received [Packet]s. + fn handle_packets(world: &mut World) { + // Get all received packets + let mut packets = Vec::new(); + for (entity, connection) in world.query::<(Entity, &Connection)>().iter(world) { + while let Some(mut packet) = connection.0.recv() { + if packet.len() < 8 { + println!("Invalid packet received: {:?}", packet); + } else { + let id_buffer = packet.split_off(packet.len() - 8); + let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap()); + packets.push(( + entity, + Connection(Arc::clone(&connection.0)), + packet_id, + packet, + )); + } + } + } + + // Get the packet handlers + let handlers = Arc::clone(&world.resource_mut::().0); + + // Handle all received packets + for (entity, connection, packet_id, packet) in packets { + if let Some(handler) = handlers.get(&packet_id) { + handler(entity, connection, packet, world); + } + } + } + + /// Remove disconnected [Connection]s. + fn remove_disconnected(mut commands: Commands, connections: Query<(Entity, &Connection)>) { + for (entity, connection) in connections.iter() { + if connection.0.closed() { + commands.entity(entity).remove::(); + } + } + } + + #[cfg(feature = "sync")] + /// Send to clients the [Synced] entity that has been added to the server. + fn send_added(added_entities: Query>, connections: Query<&Connection>) { + for entity in added_entities.iter() { + for connection in connections.iter() { + connection.send(&entity); + } + } + } + + #[cfg(feature = "sync")] + /// Send [Synced] entities to new clients. + fn send_synced( + synced_entities: Query>, + new_connections: Query<&Connection, Added>, + ) { + for entity in synced_entities.iter() { + for connection in new_connections.iter() { + connection.send(&entity); + } + } + } + + #[cfg(feature = "sync")] + /// Send to clients the [Synced] entity that has been removed. + fn send_removed( + mut removed_entities: RemovedComponents, + connections: Query<&Connection>, + ) { + for entity in removed_entities.iter() { + for connection in connections.iter() { + connection.send(&entity); + } + } + } +} + +impl Plugin for NetworkPlugin { + fn build(&self, app: &mut App) { + app.insert_resource(HandlerManager(Arc::new(HashMap::new()))); + app.add_system(Self::handle_packets); + app.add_system(Self::remove_disconnected); + app.add_system(Self::accept_connections); + + #[cfg(feature = "sync")] + { + app.add_system(Self::send_added); + app.add_system(Self::send_synced); + app.add_system(Self::send_removed); + } + } +} + +/// An extension to add packet handlers. +pub trait NetworkExt { + /// Add a new packet handler. + fn add_packet_handler(&mut self, handler: H) -> &mut Self; + + #[cfg(all(feature = "sync"))] + /// Register syncronization for an [Event] that can be sent by the server. + fn sync_server_event(&mut self) -> &mut Self; + + #[cfg(all(feature = "sync"))] + /// Register syncronization for an [Event] that comes from the client. + fn sync_client_event(&mut self) -> &mut Self; + + #[cfg(all(feature = "sync"))] + /// Register a [Component] to be synced. + fn sync_component(&mut self) -> &mut Self; +} + +impl NetworkExt for App { + fn add_packet_handler(&mut self, handler: H) -> &mut Self { + Arc::get_mut(&mut self.world.resource_mut::().0) + .unwrap() + .insert(P::packet_id(), Box::new(handler)); + self + } + + #[cfg(feature = "sync")] + fn sync_server_event(&mut self) -> &mut Self { + self.add_event::().add_system( + |mut events: EventReader, connections: Query<&Connection>| { + for event in events.iter() { + for connection in connections.iter() { + connection.send(event); + } + } + }, + ) + } + + #[cfg(feature = "sync")] + fn sync_client_event(&mut self) -> &mut Self { + self.add_event::>() + .add_packet_handler::( + |entity, connection, data, world| match bincode::deserialize::(&data) { + Ok(event) => world.send_event(PacketEvent { + entity, + connection, + event, + }), + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + }, + ) + } + + #[cfg(feature = "sync")] + fn sync_component(&mut self) -> &mut Self { + let update_components = + |changed_components: Query<(Entity, &C), (Changed, With)>, + connections: Query<&Connection>| { + for (entity, component) in changed_components.iter() { + for connection in connections.iter() { + connection.send(&(entity, component.clone())); + } + } + }; + let send_components = + |components: Query<(Entity, &C), With>, + new_connections: Query<&Connection, Added>| { + for (entity, component) in components.iter() { + for connection in new_connections.iter() { + connection.send(&(entity, component.clone())); + } + } + }; + let remove_components = |mut removed_components: RemovedComponents, + synced_entities: Query>, + connections: Query<&Connection>| { + for entity in removed_components.iter() { + if synced_entities.contains(entity) { + for connection in connections.iter() { + connection.send(&(entity, PhantomData::)); + } + } + } + }; + self.add_system(update_components.after(NetworkPlugin::send_added)) + .add_system(send_components.after(NetworkPlugin::send_synced)) + .add_system(remove_components.after(update_components)) + } +} diff --git a/src/tcp.rs b/src/tcp.rs index d6611bf..0905256 100644 --- a/src/tcp.rs +++ b/src/tcp.rs @@ -9,6 +9,9 @@ use std::{ thread, }; +#[cfg(feature = "server")] +use std::net::TcpListener; + /// A non-blocking TCP connection. pub struct Connection { /// Track if the connection has been closed. @@ -123,9 +126,6 @@ impl Drop for Connection { } } -#[cfg(feature = "server")] -use std::net::TcpListener; - #[cfg(feature = "server")] /// A [Connection] listener. pub struct Listener {