diff --git a/examples/ping_pong.rs b/examples/ping_pong.rs deleted file mode 100644 index 1445160..0000000 --- a/examples/ping_pong.rs +++ /dev/null @@ -1,99 +0,0 @@ -use bevnet::{ - impl_packet, AppClientNetwork, AppServerNetwork, ClientConnection, ClientListener, - ClientNetworkPlugin, PacketEvent, ServerConnection, ServerNetworkPlugin, -}; -use bevy::prelude::*; -use serde::{Deserialize, Serialize}; - -/// A ping packet. -#[derive(Serialize, Deserialize)] -pub struct PingPacket; -impl_packet!(PingPacket); - -/// A pong packet. -#[derive(Serialize, Deserialize)] -pub struct PongPacket; -impl_packet!(PongPacket); - -/// Launch the server. -fn start_server(mut commands: Commands, keys: Res>) { - if keys.just_pressed(KeyCode::B) { - println!("Starting server..."); - match ClientListener::bind("127.0.0.1:8000") { - Ok(listener) => { - println!("Listening on {}", listener.address()); - commands.insert_resource(listener); - } - Err(e) => println!("Failed to bind: {}", e), - } - } -} - -/// Show when a client connects. -fn client_connected(connection: Query<(Entity, &ClientConnection), Added>) { - for (entity, connection) in connection.iter() { - println!("Client connected: {} as {:?}", connection.address(), entity); - } -} - -/// Show when a client disconnects. -fn client_disconnected(mut removed: RemovedComponents) { - for entity in removed.iter() { - println!("Client disconnected: {:?}", entity); - } -} - -/// Receive the ping packets. -fn receive_ping(mut events: EventReader>) { - for PacketEvent { connection, .. } in events.iter() { - println!("Received ping from {}", connection.address()); - connection.send(PongPacket); - println!("Response sent!"); - } -} - -/// Connect the client to the server. -fn connect(mut commands: Commands, keys: Res>) { - if keys.just_pressed(KeyCode::C) { - println!("Connecting..."); - match ServerConnection::connect("127.0.0.1:8000") { - Ok(connection) => { - println!("Connected to {}", connection.address()); - commands.insert_resource(connection); - } - Err(e) => println!("Failed to connect: {}", e), - } - } -} - -/// Receive the pong packets. -fn receive_pong(mut events: EventReader) { - for _ in events.iter() { - println!("Received pong!"); - } -} - -/// Send a ping packet to the server. -fn send_ping(keys: Res>, connection: Res) { - if keys.just_pressed(KeyCode::P) { - connection.send(PingPacket); - println!("Ping sent!"); - } -} - -fn main() { - App::new() - .add_plugins(DefaultPlugins) - .add_plugin(ServerNetworkPlugin) - .add_system(start_server) - .add_system(client_connected) - .add_system(client_disconnected) - .add_system(receive_ping) - .register_server_packet::() - .add_plugin(ClientNetworkPlugin) - .add_system(connect) - .add_system(send_ping.run_if(resource_exists::())) - .add_system(receive_pong) - .register_client_packet::() - .run(); -} diff --git a/src/client/mod.rs b/src/client/mod.rs new file mode 100644 index 0000000..7a3214a --- /dev/null +++ b/src/client/mod.rs @@ -0,0 +1,103 @@ +use crate::{tcp::Connection, Packet}; +use bevy::prelude::*; +use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc}; + +pub mod sync; + +/// A function that handle a received [Packet] on the client. +pub type ClientPacketHandler = Box, &mut World) + Send + Sync>; + +/// A Bevy resource that store the packets handlers for the client. +#[derive(Resource)] +pub struct ClientHandlerManager(Arc>); + +/// A connection to a remote server. +#[derive(Resource)] +pub struct ServerConnection(Connection); + +impl ServerConnection { + /// Connects to a remote server. + pub fn connect(addr: A) -> io::Result { + Ok(Self(Connection::connect(addr)?)) + } + + /// Sends a packet through this connection. + pub fn send(&self, packet: &P) { + let mut data = bincode::serialize(packet).expect("Failed to serialize packet"); + data.extend(P::packet_id().to_be_bytes()); + self.0.send(data); + } +} + +/// A plugin that manage the network connections for a server. +pub struct ClientPlugin; + +impl ClientPlugin { + /// Handles a received [Packet] on the server. + pub fn handle_packets(world: &mut World) { + // Get all received packets + let mut packets = Vec::new(); + if let Some(connection) = world.get_resource::() { + while let Some(mut packet) = connection.0.recv() { + if packet.len() < 8 { + println!("Invalid packet received: {:?}", packet); + } else { + let id_buffer = packet.split_off(packet.len() - 8); + let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap()); + packets.push((packet_id, packet)); + } + } + } else { + return; + } + + // Get the packet handlers + let handlers = Arc::clone(&world.resource_mut::().0); + + // Handle all received packets + for (packet_id, packet) in packets { + if let Some(handler) = handlers.get(&packet_id) { + handler(packet, world); + } + } + } + + /// Remove [ServerConnection] if it's disconnected. + pub fn remove_disconnected(mut commands: Commands, connection: Option>) { + if let Some(connection) = connection { + if connection.0.closed() { + commands.remove_resource::(); + } + } + } +} + +impl Plugin for ClientPlugin { + fn build(&self, app: &mut App) { + app.insert_resource(ClientHandlerManager(Arc::new(HashMap::new()))); + app.add_system(ClientPlugin::handle_packets); + app.add_system(ClientPlugin::remove_disconnected); + } +} + +/// An extension to add packet handlers. +pub trait ClientAppExt { + /// Add a new packet handler. + fn add_packet_handler(&mut self, handler: H) -> &mut Self + where + P: Packet, + H: Fn(Vec, &mut World) + Send + Sync + 'static; +} + +impl ClientAppExt for App { + fn add_packet_handler(&mut self, handler: H) -> &mut Self + where + P: Packet, + H: Fn(Vec, &mut World) + Send + Sync + 'static, + { + Arc::get_mut(&mut self.world.resource_mut::().0) + .unwrap() + .insert(P::packet_id(), Box::new(handler)); + self + } +} diff --git a/src/client/sync.rs b/src/client/sync.rs new file mode 100644 index 0000000..fa1b195 --- /dev/null +++ b/src/client/sync.rs @@ -0,0 +1,129 @@ +use std::{any::type_name, marker::PhantomData}; + +use super::{ClientAppExt, ServerConnection}; +use crate::Packet; +use bevy::prelude::*; +use serde::{de::DeserializeOwned, Serialize}; + +/// An event that comes from the server. +#[derive(Deref)] +pub struct FromServer { + /// The event. + pub event: E, +} + +/// Mark an [Entity] as synced by the server. +#[derive(Component)] +pub struct ServerEntity(Entity); + +/// A plugin for the syncronization system. +pub struct ClientSyncPlugin; + +impl ClientSyncPlugin { + /// Removes [ServerEntity] when disconnected. + fn remove_synced(mut commands: Commands, entities: Query>) { + for entity in entities.iter() { + commands.entity(entity).despawn(); + } + } +} + +impl Plugin for ClientSyncPlugin { + fn build(&self, app: &mut App) { + app.add_packet_handler::(|data, world| { + match bincode::deserialize::(&data) { + Ok(entity) => { + if let Some((local_entity, _)) = world + .query::<(Entity, &ServerEntity)>() + .iter(world) + .find(|(_, server_entity)| server_entity.0 == entity) + { + println!("Despawning {:?}", local_entity); + world.despawn(local_entity); + } else { + println!("Spawning {:?}", entity); + world.spawn(ServerEntity(entity)); + } + } + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + } + }); + app.add_system(Self::remove_synced.run_if(resource_removed::())); + } +} + +/// An extention to add syncronization to the client. +pub trait ClientSyncExt { + /// Register syncronization for an [Event] that comes from the server. + fn server_event_sync(&mut self) -> &mut Self; + + /// Register syncronization for an [Event] that can be sent by the client. + fn client_event_sync(&mut self) -> &mut Self; + + /// Register a [Component] to be synced. + fn sync_component(&mut self) -> &mut Self; +} + +impl ClientSyncExt for App { + fn server_event_sync(&mut self) -> &mut Self { + self.add_event::>() + .add_packet_handler::(|data, world| match bincode::deserialize::(&data) { + Ok(event) => world.send_event(FromServer { event }), + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + }) + } + + fn client_event_sync(&mut self) -> &mut Self { + self.add_event::().add_system( + |mut events: EventReader, connection: Option>| { + if let Some(connection) = connection { + for event in events.iter() { + connection.send(event); + } + } + }, + ) + } + + fn sync_component(&mut self) -> &mut Self { + self.add_packet_handler::<(Entity, C), _>(|data, world| { + match bincode::deserialize::<(Entity, C)>(&data) { + Ok((entity, component)) => { + if let Some((local_entity, _)) = world + .query::<(Entity, &ServerEntity)>() + .iter(world) + .find(|(_, server_entity)| server_entity.0 == entity) + { + let mut local_entity = world.entity_mut(local_entity); + match local_entity.get_mut::() { + Some(mut local_component) => { + println!("CA CHANGE: {}", type_name::()); + *local_component = component; + } + None => { + local_entity.insert(component); + } + } + } else { + println!("Received component for unknown entity: {:?}", entity); + } + } + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + } + }) + .add_packet_handler::<(Entity, PhantomData), _>( + |data, world| match bincode::deserialize::<(Entity, PhantomData)>(&data) { + Ok((entity, _)) => { + if let Some((local_entity, _)) = world + .query::<(Entity, &ServerEntity)>() + .iter(world) + .find(|(_, server_entity)| server_entity.0 == entity) + { + world.entity_mut(local_entity).remove::(); + } + } + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + }, + ) + } +} diff --git a/src/lib.rs b/src/lib.rs index 3897209..bbfe983 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,218 +1,222 @@ -use bevy::prelude::*; -pub use packet::Packet; +use serde::{de::DeserializeOwned, Serialize}; use std::{ - io, - net::{SocketAddr, ToSocketAddrs}, - sync::Arc, + collections::hash_map::DefaultHasher, + hash::{Hash, Hasher}, }; -use tcp::{Connection, Listener}; -mod packet; +pub mod client; +pub mod server; mod tcp; -/// A connection to a server. +/// A packet that can be sent over a [Connection]. +pub trait Packet: DeserializeOwned + Serialize + Send + Sync { + /// Returns a unique identifier for this packet. + /// + /// This function uses [std::any::type_name] to get a string + /// representation of the type of the object and returns the + /// hash of that string. This is not perfect... but I didn't + /// find a better solution. + fn packet_id() -> u64 { + let mut hasher = DefaultHasher::new(); + std::any::type_name::().hash(&mut hasher); + hasher.finish() + } +} + +impl Packet for T {} + +/* use bevy::{prelude::*, utils::HashMap}; +use std::{ + io::{self, ErrorKind}, + net::{TcpStream, ToSocketAddrs}, + sync::{mpsc::TryRecvError, Arc}, +}; +use tcp::{Connection, Listener, Packet, RawPacket}; + +mod tcp; + +/// A Bevy resource that store the packets handlers for the client. +#[derive(Resource)] +pub struct ClientPacketHandler(Arc>>); + +/// A connection to a remote server. #[derive(Resource)] pub struct ServerConnection(Connection); impl ServerConnection { - /// Creates a [ServerConnection] to the given address. + /// Connects to a remote server. pub fn connect(addr: A) -> io::Result { - Ok(Self(Connection::connect(addr)?)) + Ok(Self(Connection::new(TcpStream::connect(addr)?)?)) } - /// Sends a [Packet] to the server. + /// Send a [Packet] to the remote server. pub fn send(&self, packet: P) { - self.0.send(packet); - } - - /// Gets the address of the server. - pub fn address(&self) -> SocketAddr { - self.0.address() + self.0.send(packet).ok(); } } -/// Used to listen for incoming [ClientConnection]s. +/// A Bevy resource that store the packets handlers for the server. +#[derive(Resource)] +struct ServerPacketHandler( + Arc>>, +); + +/// A [ClientConnection] listener. #[derive(Resource)] pub struct ClientListener(Listener); impl ClientListener { - /// Creates a [ClientListener] binded to the given address. - pub fn bind(address: A) -> io::Result { - Ok(Self(Listener::bind(address)?)) - } - - /// Returns the address the [ClientListener] is bound to. - pub fn address(&self) -> SocketAddr { - self.0.address() + /// Creates a new TCP listener on the given address. + pub fn bind(addr: A) -> io::Result { + Ok(Self(Listener::bind(addr)?)) } } -/// A connection to a client. +/// A connection to a remote client. #[derive(Component)] pub struct ClientConnection(Arc); impl ClientConnection { - /// Sends a [Packet] to the client. + /// Sends a [Packet] to the remote client. pub fn send(&self, packet: P) { - self.0.send(packet); - } - - /// Gets the address of the client. - pub fn address(&self) -> SocketAddr { - self.0.address() + self.0.send(packet).ok(); } } -/// A [Plugin] for client networking. -pub struct ClientNetworkPlugin; - -impl ClientNetworkPlugin { - /// Removes the [ServerConnection] resource when it's disconnected. - fn remove_disconnected(mut commands: Commands, connection: Res) { - if !connection.0.connected() { - commands.remove_resource::(); +/// A Bevy system to handle incoming [ClientConnection]s and remove the +/// [ClientListener] resource if it is no longer listening. +fn accept_connections(mut commands: Commands, listener: Option>) { + if let Some(listener) = listener { + match listener.0.accept() { + Ok(connection) => { + commands.spawn(ClientConnection(Arc::new(connection))); + } + Err(error) => match error.kind() { + ErrorKind::WouldBlock => {} + _ => commands.remove_resource::(), + }, } } - - /// Clears the packet cache of the [ServerConnection]. - fn clear_cache(connection: Res) { - connection.0.clear(); - } } -impl Plugin for ClientNetworkPlugin { - fn build(&self, app: &mut App) { - app.add_systems(( - Self::remove_disconnected.run_if(resource_exists::()), - Self::clear_cache - .run_if(resource_exists::()) - .after(Self::remove_disconnected), - )); - } -} - -/// A [Plugin] for server networking. -pub struct ServerNetworkPlugin; - -impl ServerNetworkPlugin { - /// Removes the [ClientConnection] components when it's disconnected. - fn remove_disconnected( - mut commands: Commands, - connections: Query<(Entity, &ClientConnection)>, - ) { - for (entity, connection) in connections.iter() { - if !connection.0.connected() { - commands.entity(entity).remove::(); +/// A Bevy system to handle incoming [Packet]s from +/// the [ClientConnection]s and the [ServerConnection]. +/// It removes them if they are no longer connected. +fn receive_packets(world: &mut World) { + // Handle client packets + let mut packets = Vec::new(); + let mut to_remove = false; + if let Some(connection) = world.get_resource::() { + if let Some(receiver) = connection.0.recv() { + loop { + match receiver.try_recv() { + Ok(packet) => packets.push(packet), + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + to_remove = true; + break; + } + } } } } - - /// Clears the packet cache of the [ClientConnection]s. - fn clear_cache(connections: Query<&ClientConnection>) { - for connection in connections.iter() { - connection.0.clear(); + if let Some(handlers) = world + .get_resource_mut::() + .map(|handlers| Arc::clone(&handlers.0)) + { + for packet in packets.into_iter() { + if let Some(handler) = handlers.get(&packet.packet_id()) { + (handler)(packet, world); + } } } - - /// Removes the [ClientListener] resource when it stop listening. - fn remove_not_listening(mut commands: Commands, listener: Res) { - if !listener.0.listening() { - commands.remove_resource::(); - } + if to_remove { + world.remove_resource::(); } - /// Accepts incoming connections. - fn accept_connections(mut commands: Commands, listener: Res) { - while let Some(connection) = listener.0.accept() { - commands.spawn(ClientConnection(Arc::new(connection))); + // Handle server packets + let mut packets = Vec::new(); + let mut to_remove = Vec::new(); + for (entity, connection) in world.query::<(Entity, &ClientConnection)>().iter(world) { + if let Some(receiver) = connection.0.recv() { + loop { + match receiver.try_recv() { + Ok(packet) => { + println!("YESSSS"); + packets.push((entity, ClientConnection(Arc::clone(&connection.0)), packet)); + } + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + to_remove.push(entity); + break; + } + } + } } } + if let Some(handlers) = world + .get_resource_mut::() + .map(|handlers| Arc::clone(&handlers.0)) + { + for (entity, connection, packet) in packets.into_iter() { + if let Some(handler) = handlers.get(&packet.packet_id()) { + (handler)(entity, connection, packet, world); + } + } + } + for entity in to_remove { + world.despawn(entity); + } } -impl Plugin for ServerNetworkPlugin { +/// A network plugin. +pub struct NetworkPlugin; + +impl Plugin for NetworkPlugin { fn build(&self, app: &mut App) { - app.add_systems(( - Self::remove_disconnected, - Self::clear_cache.after(Self::remove_disconnected), - Self::remove_not_listening.run_if(resource_exists::()), - Self::accept_connections - .run_if(resource_exists::()) - .after(Self::remove_not_listening), - )); + app.insert_resource(ServerPacketHandler(Arc::new(HashMap::new()))); + app.insert_resource(ClientPacketHandler(Arc::new(HashMap::new()))); + app.add_system(accept_connections); + app.add_system(receive_packets); } } -/// Receives [Packet]s and sends them as [PacketEvent]s. -fn receive_server_packets( - mut writer: EventWriter>, - connection: Query<(Entity, &ClientConnection)>, -) { - for (entity, connection) in connection.iter() { - for packet in connection.0.recv() { - writer.send(PacketEvent { - connection: ClientConnection(Arc::clone(&connection.0)), - entity, - packet, - }); - } +/// A extension trait to add packet handler. +pub trait NetworkAppExt { + /// Add a client packet handler. + fn add_client_packet_handler(&mut self, handler: H) -> &mut Self + where + P: Packet, + H: Fn(RawPacket, &mut World) + Send + Sync + 'static; + + /// Add a server packet handler. + fn add_server_packet_handler(&mut self, handler: H) -> &mut Self + where + P: Packet, + H: Fn(Entity, ClientConnection, RawPacket, &mut World) + Send + Sync + 'static; +} + +impl NetworkAppExt for App { + fn add_client_packet_handler(&mut self, handler: H) -> &mut Self + where + P: Packet, + H: Fn(RawPacket, &mut World) + Send + Sync + 'static, + { + Arc::get_mut(&mut self.world.resource_mut::().0) + .unwrap() + .insert(P::packet_id(), Box::new(handler)); + self } -} -/// An extention trait to easily register a [Packet] to the server. -pub trait AppServerNetwork { - /// Registers a [Packet] for the server. - fn register_server_packet(&mut self) -> &mut Self; -} - -impl AppServerNetwork for App { - fn register_server_packet(&mut self) -> &mut Self { - self.add_event::>(); - self.add_system( - receive_server_packets::

- .after(ServerNetworkPlugin::remove_disconnected) - .before(ServerNetworkPlugin::clear_cache), - ); - self - } -} - -/// An event for received [Packet]s on the server. -pub struct PacketEvent { - /// The [ClientConnection] from which the [Packet] was received. - pub connection: ClientConnection, - - /// The [Entity] of the [ClientConnection]. - pub entity: Entity, - - /// The [Packet] - pub packet: P, -} - -/// Receives [Packet]s and sends them as [Event]s. -fn receive_client_packets( - mut writer: EventWriter

, - connection: Res, -) { - for packet in connection.0.recv() { - writer.send(packet); - } -} - -/// An extention trait to easily register a [Packet] to the client. -pub trait AppClientNetwork { - /// Registers a [Packet] for the client. - fn register_client_packet(&mut self) -> &mut Self; -} - -impl AppClientNetwork for App { - fn register_client_packet(&mut self) -> &mut Self { - self.add_event::

(); - self.add_system( - receive_client_packets::

- .run_if(resource_exists::()) - .after(ClientNetworkPlugin::remove_disconnected) - .before(ClientNetworkPlugin::clear_cache), - ); + fn add_server_packet_handler(&mut self, handler: H) -> &mut Self + where + P: Packet, + H: Fn(Entity, ClientConnection, RawPacket, &mut World) + Send + Sync + 'static, + { + Arc::get_mut(&mut self.world.resource_mut::().0) + .unwrap() + .insert(P::packet_id(), Box::new(handler)); self } } + */ diff --git a/src/packet.rs b/src/packet.rs deleted file mode 100644 index 7a9a5e0..0000000 --- a/src/packet.rs +++ /dev/null @@ -1,57 +0,0 @@ -use dashmap::DashMap; -use serde::{de::DeserializeOwned, Serialize}; - -/// A [Packet] that can be sent over the network. -pub trait Packet: DeserializeOwned + Serialize + Sync + Send + 'static { - const ID: u32; -} - -/// A macro to easily implement [Packet]. -#[macro_export] -macro_rules! impl_packet { - ($t:ty) => { - impl ::bevnet::Packet for $t { - const ID: u32 = ::const_fnv1a_hash::fnv1a_hash_32( - concat!(module_path!(), "::", stringify!($t)).as_bytes(), - None, - ); - } - }; -} - -/// A container for the received [Packet]s. -pub struct PacketReceiver { - /// The received data. - data: DashMap>>, -} - -impl PacketReceiver { - /// Creates a new [PacketReceiver]. - pub fn new() -> Self { - Self { - data: DashMap::new(), - } - } - - /// Clears all the [Packet]s. - pub fn clear(&self) { - self.data.clear(); - } - - /// Inserts a the received raw [Packet] into the [PacketReceiver]. - pub fn insert(&self, id: u32, data: Vec) { - self.data.entry(id).or_default().push(data); - } - - /// Extract all the [Packet]s of a given type. - pub fn extract(&self) -> Vec

{ - match self.data.get_mut(&P::ID) { - Some(mut data) => data - .value_mut() - .drain(..) - .filter_map(|data| bincode::deserialize(&data).ok()) - .collect(), - None => Vec::new(), - } - } -} diff --git a/src/server/mod.rs b/src/server/mod.rs new file mode 100644 index 0000000..9a31ad3 --- /dev/null +++ b/src/server/mod.rs @@ -0,0 +1,129 @@ +use crate::{ + tcp::{Connection, Listener}, + Packet, +}; +use bevy::prelude::*; +use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc}; + +pub mod sync; + +/// A function that handle a received [Packet] on the server. +pub type ServerPacketHandler = + Box, &mut World) + Send + Sync>; + +/// A Bevy resource that store the packets handlers for the server. +#[derive(Resource)] +pub struct ServerHandlerManager(Arc>); + +/// A Bevy resource that listens for incoming [ClientConnection]s. +#[derive(Resource)] +pub struct ClientListener(Listener); + +impl ClientListener { + /// Creates a new listener on the given address. + pub fn bind(addr: A) -> io::Result { + Ok(Self(Listener::bind(addr)?)) + } +} + +/// A connection to a remote client. +#[derive(Component)] +pub struct ClientConnection(Arc); + +impl ClientConnection { + /// Sends a packet through this connection. + pub fn send(&self, packet: &P) { + let mut data = bincode::serialize(packet).expect("Failed to serialize packet"); + data.extend(P::packet_id().to_be_bytes()); + self.0.send(data); + } +} + +/// A plugin that manage the network connections for a server. +pub struct ServerPlugin; + +impl ServerPlugin { + /// Accept new [ClientConnection]s. + pub fn accept_connections(mut commands: Commands, listener: Option>) { + if let Some(listener) = listener { + if let Some(connection) = listener.0.accept() { + commands.spawn(ClientConnection(Arc::new(connection))); + } + } + } + + /// Handles a received [Packet] on the server. + pub fn handle_packets(world: &mut World) { + // Get all received packets + let mut packets = Vec::new(); + for (entity, connection) in world.query::<(Entity, &ClientConnection)>().iter(world) { + while let Some(mut packet) = connection.0.recv() { + if packet.len() < 8 { + println!("Invalid packet received: {:?}", packet); + } else { + let id_buffer = packet.split_off(packet.len() - 8); + let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap()); + packets.push(( + entity, + ClientConnection(Arc::clone(&connection.0)), + packet_id, + packet, + )); + } + } + } + + // Get the packet handlers + let handlers = Arc::clone(&world.resource_mut::().0); + + // Handle all received packets + for (entity, connection, packet_id, packet) in packets { + if let Some(handler) = handlers.get(&packet_id) { + handler(entity, connection, packet, world); + } + } + } + + /// Remove disconnected [ClientConnection]s. + pub fn remove_disconnected( + mut commands: Commands, + connections: Query<(Entity, &ClientConnection)>, + ) { + for (entity, connection) in connections.iter() { + if connection.0.closed() { + commands.entity(entity).remove::(); + } + } + } +} + +impl Plugin for ServerPlugin { + fn build(&self, app: &mut App) { + app.insert_resource(ServerHandlerManager(Arc::new(HashMap::new()))); + app.add_system(ServerPlugin::accept_connections); + app.add_system(ServerPlugin::handle_packets); + app.add_system(ServerPlugin::remove_disconnected); + } +} + +/// An extension to add packet handlers. +pub trait ServerAppExt { + /// Add a new packet handler. + fn add_packet_handler(&mut self, handler: H) -> &mut Self + where + P: Packet, + H: Fn(Entity, ClientConnection, Vec, &mut World) + Send + Sync + 'static; +} + +impl ServerAppExt for App { + fn add_packet_handler(&mut self, handler: H) -> &mut Self + where + P: Packet, + H: Fn(Entity, ClientConnection, Vec, &mut World) + Send + Sync + 'static, + { + Arc::get_mut(&mut self.world.resource_mut::().0) + .unwrap() + .insert(P::packet_id(), Box::new(handler)); + self + } +} diff --git a/src/server/sync.rs b/src/server/sync.rs new file mode 100644 index 0000000..ad2e7b7 --- /dev/null +++ b/src/server/sync.rs @@ -0,0 +1,154 @@ +use super::ServerAppExt; +use crate::{server::ClientConnection, Packet}; +use bevy::prelude::*; +use serde::{de::DeserializeOwned, Serialize}; +use std::{any::type_name, marker::PhantomData, ops::Deref}; + +/// An event that comes from a client. +pub struct FromClient { + /// The entity of the [ClientConnection] that sent the event. + pub entity: Entity, + + /// The [ClientConnection] that sent the event. + pub connection: ClientConnection, + + /// The event. + pub event: E, +} + +impl Deref for FromClient { + type Target = E; + + fn deref(&self) -> &Self::Target { + &self.event + } +} + +/// Mark an [Entity] to be synced. +#[derive(Component)] +pub struct Synced; + +/// A plugin for the syncronization system. +pub struct ServerSyncPlugin; + +impl ServerSyncPlugin { + /// Send to clients the [Synced] entity that has been added to the server. + fn send_added( + added_entities: Query>, + connections: Query<&ClientConnection>, + ) { + for entity in added_entities.iter() { + for connection in connections.iter() { + connection.send(&entity); + } + } + } + + /// Send [Synced] entities to new clients. + fn send_synced( + synced_entities: Query>, + new_connections: Query<&ClientConnection, Added>, + ) { + for entity in synced_entities.iter() { + for connection in new_connections.iter() { + connection.send(&entity); + } + } + } + + /// Send to clients the [Synced] entity that has been removed. + fn send_removed( + mut removed_entities: RemovedComponents, + connections: Query<&ClientConnection>, + ) { + for entity in removed_entities.iter() { + for connection in connections.iter() { + connection.send(&entity); + } + } + } +} + +impl Plugin for ServerSyncPlugin { + fn build(&self, app: &mut App) { + app.add_system(Self::send_added); + app.add_system(Self::send_synced); + app.add_system(Self::send_removed); + } +} + +/// An extention to add syncronization to the server. +pub trait ServerSyncExt { + /// Register syncronization for an [Event] that can be sent by the server. + fn server_event_sync(&mut self) -> &mut Self; + + /// Register syncronization for an [Event] that comes from the client. + fn client_event_sync(&mut self) -> &mut Self; + + /// Register a [Component] to be synced. + fn sync_component(&mut self) -> &mut Self; +} + +impl ServerSyncExt for App { + fn server_event_sync(&mut self) -> &mut Self { + self.add_event::().add_system( + |mut events: EventReader, connections: Query<&ClientConnection>| { + for event in events.iter() { + for connection in connections.iter() { + connection.send(event); + } + } + }, + ) + } + + fn client_event_sync(&mut self) -> &mut Self { + self.add_event::>() + .add_packet_handler::( + |entity, connection, data, world| match bincode::deserialize::(&data) { + Ok(event) => world.send_event(FromClient { + entity, + connection, + event, + }), + Err(_) => println!("Failed to deserialize packet: {}", type_name::()), + }, + ) + } + + fn sync_component(&mut self) -> &mut Self { + let update_components = + |changed_components: Query<(Entity, &C), (Changed, With)>, + connections: Query<&ClientConnection>| { + for (entity, component) in changed_components.iter() { + for connection in connections.iter() { + connection.send(&(entity, component.clone())); + } + } + }; + let send_components = + |components: Query<(Entity, &C), With>, + new_connections: Query<&ClientConnection, Added>| { + for (entity, component) in components.iter() { + for connection in new_connections.iter() { + connection.send(&(entity, component.clone())); + } + } + }; + let remove_components = + |mut removed_components: RemovedComponents, + synced_entities: Query>, + connections: Query<&ClientConnection>| { + for entity in removed_components.iter() { + if synced_entities.contains(entity) { + for connection in connections.iter() { + connection.send(&(entity, PhantomData::)); + } + } + } + }; + self.add_system(update_components.after(ServerSyncPlugin::send_added)) + .add_system(send_components.after(ServerSyncPlugin::send_synced)) + .add_system(remove_components.after(update_components)) + } +} diff --git a/src/tcp.rs b/src/tcp.rs index 092d562..413f287 100644 --- a/src/tcp.rs +++ b/src/tcp.rs @@ -1,7 +1,6 @@ -use crate::packet::{Packet, PacketReceiver}; use std::{ io::{self, Read, Write}, - net::{Shutdown, SocketAddr, TcpListener, TcpStream, ToSocketAddrs}, + net::{Shutdown, TcpListener, TcpStream, ToSocketAddrs}, sync::{ atomic::{AtomicBool, Ordering}, mpsc::{channel, Receiver, Sender}, @@ -10,224 +9,144 @@ use std::{ thread, }; -/// Used to send [Packet] to the sending thread. -type ConnectionSender = Arc)>>>; - -/// A TCP [Connection] that can send and receive [Packet]. +/// A non-blocking TCP connection. pub struct Connection { - /// Whether or not the [Connection] is currently connected. - connected: Arc, + /// Track if the connection has been closed. + closed: Arc, - /// Used to store the received [Packet]s. - packets: Arc, - - /// Used to send [Packet] to the sending thread. - sender: ConnectionSender, - - /// The [TcpStream] of the [Connection]. + /// The underlying TCP stream. stream: TcpStream, - /// The address of the [Connection]. - address: SocketAddr, + /// Used to receive packets from the receiving thread. + receiver: Mutex>>, + + /// Used to send packets to the sending thread. + sender: Mutex>>, } impl Connection { - /// Creates a new [Connection] with the given [TcpStream]. - pub fn new(stream: TcpStream) -> io::Result { - let connected = Arc::new(AtomicBool::new(true)); - let packets = Arc::new(PacketReceiver::new()); + /// Creates a new connection. + fn new(stream: TcpStream) -> io::Result { + stream.set_nonblocking(false)?; + let closed = Arc::new(AtomicBool::new(false)); - // Receiving part - let mut thread_stream = stream.try_clone()?; - let thread_packets = Arc::clone(&packets); - let thread_connected = Arc::clone(&connected); - thread::spawn(move || { - let mut int_buffer = [0; 4]; + // Spawn the receiving thread + let thread_stream = stream.try_clone()?; + let (thread_sender, receiver) = channel(); + let thread_closed = Arc::clone(&closed); + thread::spawn(move || Self::receiving_loop(thread_stream, thread_sender, thread_closed)); - loop { - // Check if the connection is closed - if !thread_connected.load(Ordering::Relaxed) { - return; - } - - // Read the length of the packet - if thread_stream.read_exact(&mut int_buffer).is_err() { - break; - } - let len = u32::from_be_bytes(int_buffer); - - // Read the packet identifier - if thread_stream.read_exact(&mut int_buffer).is_err() { - break; - } - let id = u32::from_be_bytes(int_buffer); - - // Read the packet - let mut buffer = vec![0; len as usize]; - if thread_stream.read_exact(&mut buffer).is_err() { - break; - } - - // Insert the packet - thread_packets.insert(id, buffer); - } - - // Close the connection - thread_connected.store(false, Ordering::Relaxed); - }); - - // Sending part - let mut thread_stream = stream.try_clone()?; - let (sender, receiver) = channel(); - let thread_connected = Arc::clone(&connected); - thread::spawn(move || { - loop { - // Check if the connection is closed - if !thread_connected.load(Ordering::Relaxed) { - return; - } - - // Get the data to send - let (id, buffer): (u32, Vec) = match receiver.recv() { - Ok(data) => data, - Err(_) => break, - }; - - // Send the length of the data - let len = buffer.len() as u32; - if thread_stream.write_all(&len.to_be_bytes()).is_err() { - break; - } - - // Send the packet identifier - if thread_stream.write_all(&id.to_be_bytes()).is_err() { - break; - } - - // Send the data - if thread_stream.write_all(&buffer).is_err() { - break; - } - - // Flush the stream - if thread_stream.flush().is_err() { - break; - } - } - - // Close the connection - thread_connected.store(false, Ordering::Relaxed); - }); + // Spawn the sending thread + let thread_stream = stream.try_clone()?; + let (sender, thread_receiver) = channel(); + let thread_closed = Arc::clone(&closed); + thread::spawn(move || Self::sending_loop(thread_stream, thread_receiver, thread_closed)); + // Return the connection Ok(Self { - connected, - packets, - sender: Arc::new(Mutex::new(sender)), - address: stream.peer_addr()?, + closed, stream, + receiver: Mutex::new(receiver), + sender: Mutex::new(sender), }) } - /// Creates a [Connection] to the given address. - pub fn connect(address: A) -> io::Result { - Self::new(TcpStream::connect(address)?) + /// Creates a new connection to the given address. + pub fn connect(addr: A) -> io::Result { + Self::new(TcpStream::connect(addr)?) } - /// Returns whether or not the [Connection] is currently connected. - pub fn connected(&self) -> bool { - self.connected.load(Ordering::Relaxed) - } - - /// Clears the [Packet] cache. - pub fn clear(&self) { - self.packets.clear(); - } - - /// Gets all the received [Packet]s of a certain type. - pub fn recv(&self) -> Vec

{ - self.packets.extract() - } - - /// Sends the given [Packet] to the [Connection]. - /// Does nothing if the [Connection] is closed. - pub fn send(&self, packet: P) { - let data = bincode::serialize(&packet).expect("Failed to serialize packet"); - self.sender - .lock() - .map(|sender| sender.send((P::ID, data))) - .ok(); - } - - /// Returns the address of the [Connection]. - pub fn address(&self) -> SocketAddr { - self.address - } -} - -impl Drop for Connection { - fn drop(&mut self) { - self.connected.store(false, Ordering::Relaxed); - self.stream.shutdown(Shutdown::Both).ok(); - } -} - -/// A TCP [Listener] that can accept [Connection]s. -pub struct Listener { - /// Whether the [Listener] is listening. - listening: Arc, - - /// The receiving part of the [Listener]. - receiver: Arc>>, - - /// The address the [Listener] is bound to. - address: SocketAddr, -} - -impl Listener { - /// Creates a new [Listener] binded to the given address. - pub fn bind(address: A) -> io::Result { - let listener = TcpListener::bind(address)?; - let address = listener.local_addr()?; - let listening = Arc::new(AtomicBool::new(true)); - let listening_thread = Arc::clone(&listening); - let (sender, receiver) = channel(); - thread::spawn(move || { - for stream in listener.incoming() { - let connection = match stream { - Ok(stream) => match Connection::new(stream) { - Ok(connection) => connection, - Err(_) => break, - }, - Err(_) => break, - }; - if sender.send(connection).is_err() { - break; - } + /// The receiving loop for this connection. + fn receiving_loop(mut stream: TcpStream, sender: Sender>, closed: Arc) { + let mut len_buffer = [0; 4]; + loop { + // Read the length of the next packet + if stream.read_exact(&mut len_buffer).is_err() { + break; } - listening_thread.store(false, Ordering::Relaxed); - }); - Ok(Self { - listening, - receiver: Arc::new(Mutex::new(receiver)), - address, - }) + let len = u32::from_be_bytes(len_buffer); + + // Read the packet + let mut packet = vec![0; len as usize]; + if stream.read_exact(&mut packet).is_err() { + break; + } + + // Send the packet + if sender.send(packet).is_err() { + break; + } + } + closed.store(true, Ordering::Relaxed); } - /// Returns whether or not the [Listener] is listening. - pub fn listening(&self) -> bool { - self.listening.load(Ordering::Relaxed) + /// The sending loop for this connection. + fn sending_loop(mut stream: TcpStream, receiver: Receiver>, closed: Arc) { + loop { + // Get the next packet to send + let packet = match receiver.recv() { + Ok(packet) => packet, + Err(_) => break, + }; + + // Send the length of the packet + let len_buffer = u32::to_be_bytes(packet.len() as u32); + if stream.write_all(&len_buffer).is_err() { + break; + } + + // Send the packet + if stream.write_all(&packet).is_err() { + break; + } + } + closed.store(true, Ordering::Relaxed); } - /// Receives the next [Connection] from the [Listener]. - pub fn accept(&self) -> Option { + /// Returns `true` if the connection has been closed. + pub fn closed(&self) -> bool { + self.closed.load(Ordering::Relaxed) + } + + /// Returns the next received packet. + pub fn recv(&self) -> Option> { self.receiver .lock() .ok() .and_then(|receiver| receiver.try_recv().ok()) } - /// Returns the address the [Listener] is bound to. - pub fn address(&self) -> SocketAddr { - self.address + /// Sends a packet through this connection. + pub fn send(&self, packet: Vec) { + self.sender.lock().map(|sender| sender.send(packet)).ok(); + } +} + +impl Drop for Connection { + fn drop(&mut self) { + self.stream.shutdown(Shutdown::Both).ok(); + } +} + +/// A [Connection] listener. +pub struct Listener { + /// The [TcpListener] of the listener. + listener: TcpListener, +} + +impl Listener { + /// Creates a new TCP listener on the given address. + pub fn bind(addr: A) -> io::Result { + let listener = TcpListener::bind(addr)?; + listener.set_nonblocking(true)?; + Ok(Self { listener }) + } + + /// Accepts a new [Connection]. + pub fn accept(&self) -> Option { + self.listener + .accept() + .and_then(|(stream, _)| Connection::new(stream)) + .ok() } }