From 9f233c6482d4ac8045c8d882f43d52ec146eaa81 Mon Sep 17 00:00:00 2001 From: Tipragot Date: Thu, 27 Apr 2023 10:00:33 +0200 Subject: [PATCH] Remove last system --- examples/ping_pong.rs | 99 ------------------ src/lib.rs | 218 --------------------------------------- src/packet.rs | 57 ----------- src/tcp.rs | 233 ------------------------------------------ 4 files changed, 607 deletions(-) delete mode 100644 examples/ping_pong.rs delete mode 100644 src/packet.rs delete mode 100644 src/tcp.rs diff --git a/examples/ping_pong.rs b/examples/ping_pong.rs deleted file mode 100644 index 1445160..0000000 --- a/examples/ping_pong.rs +++ /dev/null @@ -1,99 +0,0 @@ -use bevnet::{ - impl_packet, AppClientNetwork, AppServerNetwork, ClientConnection, ClientListener, - ClientNetworkPlugin, PacketEvent, ServerConnection, ServerNetworkPlugin, -}; -use bevy::prelude::*; -use serde::{Deserialize, Serialize}; - -/// A ping packet. -#[derive(Serialize, Deserialize)] -pub struct PingPacket; -impl_packet!(PingPacket); - -/// A pong packet. -#[derive(Serialize, Deserialize)] -pub struct PongPacket; -impl_packet!(PongPacket); - -/// Launch the server. -fn start_server(mut commands: Commands, keys: Res>) { - if keys.just_pressed(KeyCode::B) { - println!("Starting server..."); - match ClientListener::bind("127.0.0.1:8000") { - Ok(listener) => { - println!("Listening on {}", listener.address()); - commands.insert_resource(listener); - } - Err(e) => println!("Failed to bind: {}", e), - } - } -} - -/// Show when a client connects. -fn client_connected(connection: Query<(Entity, &ClientConnection), Added>) { - for (entity, connection) in connection.iter() { - println!("Client connected: {} as {:?}", connection.address(), entity); - } -} - -/// Show when a client disconnects. -fn client_disconnected(mut removed: RemovedComponents) { - for entity in removed.iter() { - println!("Client disconnected: {:?}", entity); - } -} - -/// Receive the ping packets. -fn receive_ping(mut events: EventReader>) { - for PacketEvent { connection, .. } in events.iter() { - println!("Received ping from {}", connection.address()); - connection.send(PongPacket); - println!("Response sent!"); - } -} - -/// Connect the client to the server. -fn connect(mut commands: Commands, keys: Res>) { - if keys.just_pressed(KeyCode::C) { - println!("Connecting..."); - match ServerConnection::connect("127.0.0.1:8000") { - Ok(connection) => { - println!("Connected to {}", connection.address()); - commands.insert_resource(connection); - } - Err(e) => println!("Failed to connect: {}", e), - } - } -} - -/// Receive the pong packets. -fn receive_pong(mut events: EventReader) { - for _ in events.iter() { - println!("Received pong!"); - } -} - -/// Send a ping packet to the server. -fn send_ping(keys: Res>, connection: Res) { - if keys.just_pressed(KeyCode::P) { - connection.send(PingPacket); - println!("Ping sent!"); - } -} - -fn main() { - App::new() - .add_plugins(DefaultPlugins) - .add_plugin(ServerNetworkPlugin) - .add_system(start_server) - .add_system(client_connected) - .add_system(client_disconnected) - .add_system(receive_ping) - .register_server_packet::() - .add_plugin(ClientNetworkPlugin) - .add_system(connect) - .add_system(send_ping.run_if(resource_exists::())) - .add_system(receive_pong) - .register_client_packet::() - .run(); -} diff --git a/src/lib.rs b/src/lib.rs index 3897209..e69de29 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,218 +0,0 @@ -use bevy::prelude::*; -pub use packet::Packet; -use std::{ - io, - net::{SocketAddr, ToSocketAddrs}, - sync::Arc, -}; -use tcp::{Connection, Listener}; - -mod packet; -mod tcp; - -/// A connection to a server. -#[derive(Resource)] -pub struct ServerConnection(Connection); - -impl ServerConnection { - /// Creates a [ServerConnection] to the given address. - pub fn connect(addr: A) -> io::Result { - Ok(Self(Connection::connect(addr)?)) - } - - /// Sends a [Packet] to the server. - pub fn send(&self, packet: P) { - self.0.send(packet); - } - - /// Gets the address of the server. - pub fn address(&self) -> SocketAddr { - self.0.address() - } -} - -/// Used to listen for incoming [ClientConnection]s. -#[derive(Resource)] -pub struct ClientListener(Listener); - -impl ClientListener { - /// Creates a [ClientListener] binded to the given address. - pub fn bind(address: A) -> io::Result { - Ok(Self(Listener::bind(address)?)) - } - - /// Returns the address the [ClientListener] is bound to. - pub fn address(&self) -> SocketAddr { - self.0.address() - } -} - -/// A connection to a client. -#[derive(Component)] -pub struct ClientConnection(Arc); - -impl ClientConnection { - /// Sends a [Packet] to the client. - pub fn send(&self, packet: P) { - self.0.send(packet); - } - - /// Gets the address of the client. - pub fn address(&self) -> SocketAddr { - self.0.address() - } -} - -/// A [Plugin] for client networking. -pub struct ClientNetworkPlugin; - -impl ClientNetworkPlugin { - /// Removes the [ServerConnection] resource when it's disconnected. - fn remove_disconnected(mut commands: Commands, connection: Res) { - if !connection.0.connected() { - commands.remove_resource::(); - } - } - - /// Clears the packet cache of the [ServerConnection]. - fn clear_cache(connection: Res) { - connection.0.clear(); - } -} - -impl Plugin for ClientNetworkPlugin { - fn build(&self, app: &mut App) { - app.add_systems(( - Self::remove_disconnected.run_if(resource_exists::()), - Self::clear_cache - .run_if(resource_exists::()) - .after(Self::remove_disconnected), - )); - } -} - -/// A [Plugin] for server networking. -pub struct ServerNetworkPlugin; - -impl ServerNetworkPlugin { - /// Removes the [ClientConnection] components when it's disconnected. - fn remove_disconnected( - mut commands: Commands, - connections: Query<(Entity, &ClientConnection)>, - ) { - for (entity, connection) in connections.iter() { - if !connection.0.connected() { - commands.entity(entity).remove::(); - } - } - } - - /// Clears the packet cache of the [ClientConnection]s. - fn clear_cache(connections: Query<&ClientConnection>) { - for connection in connections.iter() { - connection.0.clear(); - } - } - - /// Removes the [ClientListener] resource when it stop listening. - fn remove_not_listening(mut commands: Commands, listener: Res) { - if !listener.0.listening() { - commands.remove_resource::(); - } - } - - /// Accepts incoming connections. - fn accept_connections(mut commands: Commands, listener: Res) { - while let Some(connection) = listener.0.accept() { - commands.spawn(ClientConnection(Arc::new(connection))); - } - } -} - -impl Plugin for ServerNetworkPlugin { - fn build(&self, app: &mut App) { - app.add_systems(( - Self::remove_disconnected, - Self::clear_cache.after(Self::remove_disconnected), - Self::remove_not_listening.run_if(resource_exists::()), - Self::accept_connections - .run_if(resource_exists::()) - .after(Self::remove_not_listening), - )); - } -} - -/// Receives [Packet]s and sends them as [PacketEvent]s. -fn receive_server_packets( - mut writer: EventWriter>, - connection: Query<(Entity, &ClientConnection)>, -) { - for (entity, connection) in connection.iter() { - for packet in connection.0.recv() { - writer.send(PacketEvent { - connection: ClientConnection(Arc::clone(&connection.0)), - entity, - packet, - }); - } - } -} - -/// An extention trait to easily register a [Packet] to the server. -pub trait AppServerNetwork { - /// Registers a [Packet] for the server. - fn register_server_packet(&mut self) -> &mut Self; -} - -impl AppServerNetwork for App { - fn register_server_packet(&mut self) -> &mut Self { - self.add_event::>(); - self.add_system( - receive_server_packets::

- .after(ServerNetworkPlugin::remove_disconnected) - .before(ServerNetworkPlugin::clear_cache), - ); - self - } -} - -/// An event for received [Packet]s on the server. -pub struct PacketEvent { - /// The [ClientConnection] from which the [Packet] was received. - pub connection: ClientConnection, - - /// The [Entity] of the [ClientConnection]. - pub entity: Entity, - - /// The [Packet] - pub packet: P, -} - -/// Receives [Packet]s and sends them as [Event]s. -fn receive_client_packets( - mut writer: EventWriter

, - connection: Res, -) { - for packet in connection.0.recv() { - writer.send(packet); - } -} - -/// An extention trait to easily register a [Packet] to the client. -pub trait AppClientNetwork { - /// Registers a [Packet] for the client. - fn register_client_packet(&mut self) -> &mut Self; -} - -impl AppClientNetwork for App { - fn register_client_packet(&mut self) -> &mut Self { - self.add_event::

(); - self.add_system( - receive_client_packets::

- .run_if(resource_exists::()) - .after(ClientNetworkPlugin::remove_disconnected) - .before(ClientNetworkPlugin::clear_cache), - ); - self - } -} diff --git a/src/packet.rs b/src/packet.rs deleted file mode 100644 index 7a9a5e0..0000000 --- a/src/packet.rs +++ /dev/null @@ -1,57 +0,0 @@ -use dashmap::DashMap; -use serde::{de::DeserializeOwned, Serialize}; - -/// A [Packet] that can be sent over the network. -pub trait Packet: DeserializeOwned + Serialize + Sync + Send + 'static { - const ID: u32; -} - -/// A macro to easily implement [Packet]. -#[macro_export] -macro_rules! impl_packet { - ($t:ty) => { - impl ::bevnet::Packet for $t { - const ID: u32 = ::const_fnv1a_hash::fnv1a_hash_32( - concat!(module_path!(), "::", stringify!($t)).as_bytes(), - None, - ); - } - }; -} - -/// A container for the received [Packet]s. -pub struct PacketReceiver { - /// The received data. - data: DashMap>>, -} - -impl PacketReceiver { - /// Creates a new [PacketReceiver]. - pub fn new() -> Self { - Self { - data: DashMap::new(), - } - } - - /// Clears all the [Packet]s. - pub fn clear(&self) { - self.data.clear(); - } - - /// Inserts a the received raw [Packet] into the [PacketReceiver]. - pub fn insert(&self, id: u32, data: Vec) { - self.data.entry(id).or_default().push(data); - } - - /// Extract all the [Packet]s of a given type. - pub fn extract(&self) -> Vec

{ - match self.data.get_mut(&P::ID) { - Some(mut data) => data - .value_mut() - .drain(..) - .filter_map(|data| bincode::deserialize(&data).ok()) - .collect(), - None => Vec::new(), - } - } -} diff --git a/src/tcp.rs b/src/tcp.rs deleted file mode 100644 index 092d562..0000000 --- a/src/tcp.rs +++ /dev/null @@ -1,233 +0,0 @@ -use crate::packet::{Packet, PacketReceiver}; -use std::{ - io::{self, Read, Write}, - net::{Shutdown, SocketAddr, TcpListener, TcpStream, ToSocketAddrs}, - sync::{ - atomic::{AtomicBool, Ordering}, - mpsc::{channel, Receiver, Sender}, - Arc, Mutex, - }, - thread, -}; - -/// Used to send [Packet] to the sending thread. -type ConnectionSender = Arc)>>>; - -/// A TCP [Connection] that can send and receive [Packet]. -pub struct Connection { - /// Whether or not the [Connection] is currently connected. - connected: Arc, - - /// Used to store the received [Packet]s. - packets: Arc, - - /// Used to send [Packet] to the sending thread. - sender: ConnectionSender, - - /// The [TcpStream] of the [Connection]. - stream: TcpStream, - - /// The address of the [Connection]. - address: SocketAddr, -} - -impl Connection { - /// Creates a new [Connection] with the given [TcpStream]. - pub fn new(stream: TcpStream) -> io::Result { - let connected = Arc::new(AtomicBool::new(true)); - let packets = Arc::new(PacketReceiver::new()); - - // Receiving part - let mut thread_stream = stream.try_clone()?; - let thread_packets = Arc::clone(&packets); - let thread_connected = Arc::clone(&connected); - thread::spawn(move || { - let mut int_buffer = [0; 4]; - - loop { - // Check if the connection is closed - if !thread_connected.load(Ordering::Relaxed) { - return; - } - - // Read the length of the packet - if thread_stream.read_exact(&mut int_buffer).is_err() { - break; - } - let len = u32::from_be_bytes(int_buffer); - - // Read the packet identifier - if thread_stream.read_exact(&mut int_buffer).is_err() { - break; - } - let id = u32::from_be_bytes(int_buffer); - - // Read the packet - let mut buffer = vec![0; len as usize]; - if thread_stream.read_exact(&mut buffer).is_err() { - break; - } - - // Insert the packet - thread_packets.insert(id, buffer); - } - - // Close the connection - thread_connected.store(false, Ordering::Relaxed); - }); - - // Sending part - let mut thread_stream = stream.try_clone()?; - let (sender, receiver) = channel(); - let thread_connected = Arc::clone(&connected); - thread::spawn(move || { - loop { - // Check if the connection is closed - if !thread_connected.load(Ordering::Relaxed) { - return; - } - - // Get the data to send - let (id, buffer): (u32, Vec) = match receiver.recv() { - Ok(data) => data, - Err(_) => break, - }; - - // Send the length of the data - let len = buffer.len() as u32; - if thread_stream.write_all(&len.to_be_bytes()).is_err() { - break; - } - - // Send the packet identifier - if thread_stream.write_all(&id.to_be_bytes()).is_err() { - break; - } - - // Send the data - if thread_stream.write_all(&buffer).is_err() { - break; - } - - // Flush the stream - if thread_stream.flush().is_err() { - break; - } - } - - // Close the connection - thread_connected.store(false, Ordering::Relaxed); - }); - - Ok(Self { - connected, - packets, - sender: Arc::new(Mutex::new(sender)), - address: stream.peer_addr()?, - stream, - }) - } - - /// Creates a [Connection] to the given address. - pub fn connect(address: A) -> io::Result { - Self::new(TcpStream::connect(address)?) - } - - /// Returns whether or not the [Connection] is currently connected. - pub fn connected(&self) -> bool { - self.connected.load(Ordering::Relaxed) - } - - /// Clears the [Packet] cache. - pub fn clear(&self) { - self.packets.clear(); - } - - /// Gets all the received [Packet]s of a certain type. - pub fn recv(&self) -> Vec

{ - self.packets.extract() - } - - /// Sends the given [Packet] to the [Connection]. - /// Does nothing if the [Connection] is closed. - pub fn send(&self, packet: P) { - let data = bincode::serialize(&packet).expect("Failed to serialize packet"); - self.sender - .lock() - .map(|sender| sender.send((P::ID, data))) - .ok(); - } - - /// Returns the address of the [Connection]. - pub fn address(&self) -> SocketAddr { - self.address - } -} - -impl Drop for Connection { - fn drop(&mut self) { - self.connected.store(false, Ordering::Relaxed); - self.stream.shutdown(Shutdown::Both).ok(); - } -} - -/// A TCP [Listener] that can accept [Connection]s. -pub struct Listener { - /// Whether the [Listener] is listening. - listening: Arc, - - /// The receiving part of the [Listener]. - receiver: Arc>>, - - /// The address the [Listener] is bound to. - address: SocketAddr, -} - -impl Listener { - /// Creates a new [Listener] binded to the given address. - pub fn bind(address: A) -> io::Result { - let listener = TcpListener::bind(address)?; - let address = listener.local_addr()?; - let listening = Arc::new(AtomicBool::new(true)); - let listening_thread = Arc::clone(&listening); - let (sender, receiver) = channel(); - thread::spawn(move || { - for stream in listener.incoming() { - let connection = match stream { - Ok(stream) => match Connection::new(stream) { - Ok(connection) => connection, - Err(_) => break, - }, - Err(_) => break, - }; - if sender.send(connection).is_err() { - break; - } - } - listening_thread.store(false, Ordering::Relaxed); - }); - Ok(Self { - listening, - receiver: Arc::new(Mutex::new(receiver)), - address, - }) - } - - /// Returns whether or not the [Listener] is listening. - pub fn listening(&self) -> bool { - self.listening.load(Ordering::Relaxed) - } - - /// Receives the next [Connection] from the [Listener]. - pub fn accept(&self) -> Option { - self.receiver - .lock() - .ok() - .and_then(|receiver| receiver.try_recv().ok()) - } - - /// Returns the address the [Listener] is bound to. - pub fn address(&self) -> SocketAddr { - self.address - } -}