Merge branch 'refactor' into 'main'
Refactor the code See merge request tipragot/bevnet!5
This commit is contained in:
commit
84ff6d6947
12
Cargo.toml
12
Cargo.toml
|
@ -10,8 +10,12 @@ categories = ["network-programming", "game-development"]
|
||||||
repository = "https://git.tipragot.fr/tipragot/bevnet"
|
repository = "https://git.tipragot.fr/tipragot/bevnet"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
bevy = "0.10.1"
|
|
||||||
bincode = "1.3.3"
|
|
||||||
const-fnv1a-hash = "1.1.0"
|
|
||||||
dashmap = "5.4.0"
|
|
||||||
serde = { version = "1.0.160", features = ["derive"] }
|
serde = { version = "1.0.160", features = ["derive"] }
|
||||||
|
bincode = "1.3.3"
|
||||||
|
bevy = "0.10.1"
|
||||||
|
|
||||||
|
[features]
|
||||||
|
default = ["client", "server", "sync"]
|
||||||
|
server = []
|
||||||
|
client = []
|
||||||
|
sync = []
|
||||||
|
|
227
src/client.rs
Normal file
227
src/client.rs
Normal file
|
@ -0,0 +1,227 @@
|
||||||
|
use bevy::prelude::*;
|
||||||
|
use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc};
|
||||||
|
|
||||||
|
#[cfg(feature = "sync")]
|
||||||
|
use ::{
|
||||||
|
serde::{de::DeserializeOwned, Serialize},
|
||||||
|
std::{any::type_name, marker::PhantomData},
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::{tcp, Packet};
|
||||||
|
|
||||||
|
/// A trait for a handler function.
|
||||||
|
pub trait PacketHandlerFn: Fn(Vec<u8>, &mut World) + Send + Sync + 'static {}
|
||||||
|
|
||||||
|
impl<T: Fn(Vec<u8>, &mut World) + Send + Sync + 'static> PacketHandlerFn for T {}
|
||||||
|
|
||||||
|
/// A function that handle a received [Packet]s.
|
||||||
|
type PacketHandler = Box<dyn PacketHandlerFn>;
|
||||||
|
|
||||||
|
/// A Bevy resource that store the packets handlers.
|
||||||
|
#[derive(Resource)]
|
||||||
|
struct HandlerManager(Arc<HashMap<u64, PacketHandler>>);
|
||||||
|
|
||||||
|
/// A [Connection] to a remote server.
|
||||||
|
#[derive(Resource)]
|
||||||
|
pub struct ServerConnection(tcp::Connection);
|
||||||
|
|
||||||
|
impl ServerConnection {
|
||||||
|
/// Connects to a remote server.
|
||||||
|
pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
|
||||||
|
Ok(Self(tcp::Connection::connect(addr)?))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sends a packet through this connection.
|
||||||
|
pub fn send<P: Packet>(&self, packet: &P) {
|
||||||
|
let mut data = bincode::serialize(packet).expect("Failed to serialize packet");
|
||||||
|
data.extend(P::packet_id().to_be_bytes());
|
||||||
|
self.0.send(data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "sync")]
|
||||||
|
/// An event that comes from the server.
|
||||||
|
#[derive(Deref)]
|
||||||
|
pub struct FromServer<E: Event + Packet> {
|
||||||
|
/// The event.
|
||||||
|
pub event: E,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "sync")]
|
||||||
|
/// Mark an [Entity] as synced by the server.
|
||||||
|
#[derive(Component)]
|
||||||
|
pub struct ServerEntity(Entity);
|
||||||
|
|
||||||
|
/// A plugin that manage the network [Connection]s.
|
||||||
|
pub struct ClientPlugin;
|
||||||
|
|
||||||
|
impl ClientPlugin {
|
||||||
|
#[cfg(feature = "client")]
|
||||||
|
/// Handles a received [Packet] on the server.
|
||||||
|
pub fn handle_packets(world: &mut World) {
|
||||||
|
// Get all received packets
|
||||||
|
let mut packets = Vec::new();
|
||||||
|
if let Some(connection) = world.get_resource::<ServerConnection>() {
|
||||||
|
while let Some(mut packet) = connection.0.recv() {
|
||||||
|
if packet.len() < 8 {
|
||||||
|
println!("Invalid packet received: {:?}", packet);
|
||||||
|
} else {
|
||||||
|
let id_buffer = packet.split_off(packet.len() - 8);
|
||||||
|
let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap());
|
||||||
|
packets.push((packet_id, packet));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the packet handlers
|
||||||
|
let handlers = Arc::clone(&world.resource_mut::<HandlerManager>().0);
|
||||||
|
|
||||||
|
// Handle all received packets
|
||||||
|
for (packet_id, packet) in packets {
|
||||||
|
if let Some(handler) = handlers.get(&packet_id) {
|
||||||
|
handler(packet, world);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "client")]
|
||||||
|
/// Remove [Connection] if it's disconnected.
|
||||||
|
pub fn remove_disconnected(mut commands: Commands, connection: Option<Res<ServerConnection>>) {
|
||||||
|
if let Some(connection) = connection {
|
||||||
|
if connection.0.closed() {
|
||||||
|
commands.remove_resource::<ServerConnection>();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "sync")]
|
||||||
|
/// Removes [ServerEntity] when disconnected.
|
||||||
|
fn remove_synced(mut commands: Commands, entities: Query<Entity, With<ServerEntity>>) {
|
||||||
|
for entity in entities.iter() {
|
||||||
|
commands.entity(entity).despawn();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Plugin for ClientPlugin {
|
||||||
|
fn build(&self, app: &mut App) {
|
||||||
|
app.insert_resource(HandlerManager(Arc::new(HashMap::new())));
|
||||||
|
app.add_system(Self::handle_packets);
|
||||||
|
app.add_system(Self::remove_disconnected);
|
||||||
|
|
||||||
|
#[cfg(feature = "sync")]
|
||||||
|
{
|
||||||
|
app.add_packet_handler::<Entity, _>(|data, world| {
|
||||||
|
match bincode::deserialize::<Entity>(&data) {
|
||||||
|
Ok(entity) => {
|
||||||
|
if let Some((local_entity, _)) = world
|
||||||
|
.query::<(Entity, &ServerEntity)>()
|
||||||
|
.iter(world)
|
||||||
|
.find(|(_, server_entity)| server_entity.0 == entity)
|
||||||
|
{
|
||||||
|
world.despawn(local_entity);
|
||||||
|
} else {
|
||||||
|
world.spawn(ServerEntity(entity));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => println!("Failed to deserialize packet: {}", type_name::<Entity>()),
|
||||||
|
}
|
||||||
|
});
|
||||||
|
app.add_system(Self::remove_synced.run_if(resource_removed::<ServerConnection>()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An extension to add packet handlers.
|
||||||
|
pub trait NetworkExt {
|
||||||
|
/// Add a new packet handler.
|
||||||
|
fn add_packet_handler<P: Packet, H: PacketHandlerFn>(&mut self, handler: H) -> &mut Self;
|
||||||
|
|
||||||
|
#[cfg(all(feature = "sync"))]
|
||||||
|
/// Register syncronization for an [Event] that can be sent by the server.
|
||||||
|
fn sync_server_event<E: Event + Packet>(&mut self) -> &mut Self;
|
||||||
|
|
||||||
|
#[cfg(all(feature = "sync"))]
|
||||||
|
/// Register syncronization for an [Event] that comes from the client.
|
||||||
|
fn sync_client_event<E: Event + Packet>(&mut self) -> &mut Self;
|
||||||
|
|
||||||
|
#[cfg(all(feature = "sync"))]
|
||||||
|
/// Register a [Component] to be synced.
|
||||||
|
fn sync_component<C: Component + DeserializeOwned + Serialize + Clone>(&mut self) -> &mut Self;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NetworkExt for App {
|
||||||
|
fn add_packet_handler<P: Packet, H: PacketHandlerFn>(&mut self, handler: H) -> &mut Self {
|
||||||
|
Arc::get_mut(&mut self.world.resource_mut::<HandlerManager>().0)
|
||||||
|
.unwrap()
|
||||||
|
.insert(P::packet_id(), Box::new(handler));
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "sync")]
|
||||||
|
fn sync_server_event<E: Event + Packet>(&mut self) -> &mut Self {
|
||||||
|
self.add_event::<FromServer<E>>()
|
||||||
|
.add_packet_handler::<E, _>(|data, world| match bincode::deserialize::<E>(&data) {
|
||||||
|
Ok(event) => world.send_event(FromServer { event }),
|
||||||
|
Err(_) => println!("Failed to deserialize packet: {}", type_name::<E>()),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "sync")]
|
||||||
|
fn sync_client_event<E: Event + Packet>(&mut self) -> &mut Self {
|
||||||
|
self.add_event::<E>().add_system(
|
||||||
|
|mut events: EventReader<E>, connection: Option<Res<ServerConnection>>| {
|
||||||
|
if let Some(connection) = connection {
|
||||||
|
for event in events.iter() {
|
||||||
|
connection.send(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "sync")]
|
||||||
|
fn sync_component<C: Component + DeserializeOwned + Serialize>(&mut self) -> &mut Self {
|
||||||
|
self.add_packet_handler::<(Entity, C), _>(|data, world| {
|
||||||
|
match bincode::deserialize::<(Entity, C)>(&data) {
|
||||||
|
Ok((entity, component)) => {
|
||||||
|
if let Some((local_entity, _)) = world
|
||||||
|
.query::<(Entity, &ServerEntity)>()
|
||||||
|
.iter(world)
|
||||||
|
.find(|(_, server_entity)| server_entity.0 == entity)
|
||||||
|
{
|
||||||
|
let mut local_entity = world.entity_mut(local_entity);
|
||||||
|
match local_entity.get_mut::<C>() {
|
||||||
|
Some(mut local_component) => {
|
||||||
|
println!("CA CHANGE: {}", type_name::<C>());
|
||||||
|
*local_component = component;
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
local_entity.insert(component);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
println!("Received component for unknown entity: {:?}", entity);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => println!("Failed to deserialize packet: {}", type_name::<C>()),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.add_packet_handler::<(Entity, PhantomData<C>), _>(
|
||||||
|
|data, world| match bincode::deserialize::<(Entity, PhantomData<C>)>(&data) {
|
||||||
|
Ok((entity, _)) => {
|
||||||
|
if let Some((local_entity, _)) = world
|
||||||
|
.query::<(Entity, &ServerEntity)>()
|
||||||
|
.iter(world)
|
||||||
|
.find(|(_, server_entity)| server_entity.0 == entity)
|
||||||
|
{
|
||||||
|
world.entity_mut(local_entity).remove::<C>();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => println!("Failed to deserialize packet: {}", type_name::<C>()),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,103 +0,0 @@
|
||||||
use crate::{tcp::Connection, Packet};
|
|
||||||
use bevy::prelude::*;
|
|
||||||
use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc};
|
|
||||||
|
|
||||||
pub mod sync;
|
|
||||||
|
|
||||||
/// A function that handle a received [Packet] on the client.
|
|
||||||
pub type ClientPacketHandler = Box<dyn Fn(Vec<u8>, &mut World) + Send + Sync>;
|
|
||||||
|
|
||||||
/// A Bevy resource that store the packets handlers for the client.
|
|
||||||
#[derive(Resource)]
|
|
||||||
pub struct ClientHandlerManager(Arc<HashMap<u64, ClientPacketHandler>>);
|
|
||||||
|
|
||||||
/// A connection to a remote server.
|
|
||||||
#[derive(Resource)]
|
|
||||||
pub struct ServerConnection(Connection);
|
|
||||||
|
|
||||||
impl ServerConnection {
|
|
||||||
/// Connects to a remote server.
|
|
||||||
pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
|
|
||||||
Ok(Self(Connection::connect(addr)?))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sends a packet through this connection.
|
|
||||||
pub fn send<P: Packet>(&self, packet: &P) {
|
|
||||||
let mut data = bincode::serialize(packet).expect("Failed to serialize packet");
|
|
||||||
data.extend(P::packet_id().to_be_bytes());
|
|
||||||
self.0.send(data);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A plugin that manage the network connections for a server.
|
|
||||||
pub struct ClientPlugin;
|
|
||||||
|
|
||||||
impl ClientPlugin {
|
|
||||||
/// Handles a received [Packet] on the server.
|
|
||||||
pub fn handle_packets(world: &mut World) {
|
|
||||||
// Get all received packets
|
|
||||||
let mut packets = Vec::new();
|
|
||||||
if let Some(connection) = world.get_resource::<ServerConnection>() {
|
|
||||||
while let Some(mut packet) = connection.0.recv() {
|
|
||||||
if packet.len() < 8 {
|
|
||||||
println!("Invalid packet received: {:?}", packet);
|
|
||||||
} else {
|
|
||||||
let id_buffer = packet.split_off(packet.len() - 8);
|
|
||||||
let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap());
|
|
||||||
packets.push((packet_id, packet));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get the packet handlers
|
|
||||||
let handlers = Arc::clone(&world.resource_mut::<ClientHandlerManager>().0);
|
|
||||||
|
|
||||||
// Handle all received packets
|
|
||||||
for (packet_id, packet) in packets {
|
|
||||||
if let Some(handler) = handlers.get(&packet_id) {
|
|
||||||
handler(packet, world);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Remove [ServerConnection] if it's disconnected.
|
|
||||||
pub fn remove_disconnected(mut commands: Commands, connection: Option<Res<ServerConnection>>) {
|
|
||||||
if let Some(connection) = connection {
|
|
||||||
if connection.0.closed() {
|
|
||||||
commands.remove_resource::<ServerConnection>();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Plugin for ClientPlugin {
|
|
||||||
fn build(&self, app: &mut App) {
|
|
||||||
app.insert_resource(ClientHandlerManager(Arc::new(HashMap::new())));
|
|
||||||
app.add_system(ClientPlugin::handle_packets);
|
|
||||||
app.add_system(ClientPlugin::remove_disconnected);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// An extension to add packet handlers.
|
|
||||||
pub trait ClientAppExt {
|
|
||||||
/// Add a new packet handler.
|
|
||||||
fn add_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
|
|
||||||
where
|
|
||||||
P: Packet,
|
|
||||||
H: Fn(Vec<u8>, &mut World) + Send + Sync + 'static;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ClientAppExt for App {
|
|
||||||
fn add_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
|
|
||||||
where
|
|
||||||
P: Packet,
|
|
||||||
H: Fn(Vec<u8>, &mut World) + Send + Sync + 'static,
|
|
||||||
{
|
|
||||||
Arc::get_mut(&mut self.world.resource_mut::<ClientHandlerManager>().0)
|
|
||||||
.unwrap()
|
|
||||||
.insert(P::packet_id(), Box::new(handler));
|
|
||||||
self
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,127 +0,0 @@
|
||||||
use std::{any::type_name, marker::PhantomData};
|
|
||||||
|
|
||||||
use super::{ClientAppExt, ServerConnection};
|
|
||||||
use crate::Packet;
|
|
||||||
use bevy::prelude::*;
|
|
||||||
use serde::{de::DeserializeOwned, Serialize};
|
|
||||||
|
|
||||||
/// An event that comes from the server.
|
|
||||||
#[derive(Deref)]
|
|
||||||
pub struct FromServer<E: Event + Packet> {
|
|
||||||
/// The event.
|
|
||||||
pub event: E,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Mark an [Entity] as synced by the server.
|
|
||||||
#[derive(Component)]
|
|
||||||
pub struct ServerEntity(Entity);
|
|
||||||
|
|
||||||
/// A plugin for the syncronization system.
|
|
||||||
pub struct ClientSyncPlugin;
|
|
||||||
|
|
||||||
impl ClientSyncPlugin {
|
|
||||||
/// Removes [ServerEntity] when disconnected.
|
|
||||||
fn remove_synced(mut commands: Commands, entities: Query<Entity, With<ServerEntity>>) {
|
|
||||||
for entity in entities.iter() {
|
|
||||||
commands.entity(entity).despawn();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Plugin for ClientSyncPlugin {
|
|
||||||
fn build(&self, app: &mut App) {
|
|
||||||
app.add_packet_handler::<Entity, _>(|data, world| {
|
|
||||||
match bincode::deserialize::<Entity>(&data) {
|
|
||||||
Ok(entity) => {
|
|
||||||
if let Some((local_entity, _)) = world
|
|
||||||
.query::<(Entity, &ServerEntity)>()
|
|
||||||
.iter(world)
|
|
||||||
.find(|(_, server_entity)| server_entity.0 == entity)
|
|
||||||
{
|
|
||||||
world.despawn(local_entity);
|
|
||||||
} else {
|
|
||||||
world.spawn(ServerEntity(entity));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(_) => println!("Failed to deserialize packet: {}", type_name::<Entity>()),
|
|
||||||
}
|
|
||||||
});
|
|
||||||
app.add_system(Self::remove_synced.run_if(resource_removed::<ServerConnection>()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// An extention to add syncronization to the client.
|
|
||||||
pub trait ClientSyncExt {
|
|
||||||
/// Register syncronization for an [Event] that comes from the server.
|
|
||||||
fn server_event_sync<E: Event + Packet>(&mut self) -> &mut Self;
|
|
||||||
|
|
||||||
/// Register syncronization for an [Event] that can be sent by the client.
|
|
||||||
fn client_event_sync<E: Event + Packet>(&mut self) -> &mut Self;
|
|
||||||
|
|
||||||
/// Register a [Component] to be synced.
|
|
||||||
fn sync_component<C: Component + DeserializeOwned + Serialize>(&mut self) -> &mut Self;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ClientSyncExt for App {
|
|
||||||
fn server_event_sync<E: Event + Packet>(&mut self) -> &mut Self {
|
|
||||||
self.add_event::<FromServer<E>>()
|
|
||||||
.add_packet_handler::<E, _>(|data, world| match bincode::deserialize::<E>(&data) {
|
|
||||||
Ok(event) => world.send_event(FromServer { event }),
|
|
||||||
Err(_) => println!("Failed to deserialize packet: {}", type_name::<E>()),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn client_event_sync<E: Event + Packet>(&mut self) -> &mut Self {
|
|
||||||
self.add_event::<E>().add_system(
|
|
||||||
|mut events: EventReader<E>, connection: Option<Res<ServerConnection>>| {
|
|
||||||
if let Some(connection) = connection {
|
|
||||||
for event in events.iter() {
|
|
||||||
connection.send(event);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn sync_component<C: Component + DeserializeOwned + Serialize>(&mut self) -> &mut Self {
|
|
||||||
self.add_packet_handler::<(Entity, C), _>(|data, world| {
|
|
||||||
match bincode::deserialize::<(Entity, C)>(&data) {
|
|
||||||
Ok((entity, component)) => {
|
|
||||||
if let Some((local_entity, _)) = world
|
|
||||||
.query::<(Entity, &ServerEntity)>()
|
|
||||||
.iter(world)
|
|
||||||
.find(|(_, server_entity)| server_entity.0 == entity)
|
|
||||||
{
|
|
||||||
let mut local_entity = world.entity_mut(local_entity);
|
|
||||||
match local_entity.get_mut::<C>() {
|
|
||||||
Some(mut local_component) => {
|
|
||||||
println!("CA CHANGE: {}", type_name::<C>());
|
|
||||||
*local_component = component;
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
local_entity.insert(component);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
println!("Received component for unknown entity: {:?}", entity);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(_) => println!("Failed to deserialize packet: {}", type_name::<C>()),
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.add_packet_handler::<(Entity, PhantomData<C>), _>(
|
|
||||||
|data, world| match bincode::deserialize::<(Entity, PhantomData<C>)>(&data) {
|
|
||||||
Ok((entity, _)) => {
|
|
||||||
if let Some((local_entity, _)) = world
|
|
||||||
.query::<(Entity, &ServerEntity)>()
|
|
||||||
.iter(world)
|
|
||||||
.find(|(_, server_entity)| server_entity.0 == entity)
|
|
||||||
{
|
|
||||||
world.entity_mut(local_entity).remove::<C>();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(_) => println!("Failed to deserialize packet: {}", type_name::<C>()),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
200
src/lib.rs
200
src/lib.rs
|
@ -4,8 +4,12 @@ use std::{
|
||||||
hash::{Hash, Hasher},
|
hash::{Hash, Hasher},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#[cfg(feature = "client")]
|
||||||
pub mod client;
|
pub mod client;
|
||||||
|
|
||||||
|
#[cfg(feature = "server")]
|
||||||
pub mod server;
|
pub mod server;
|
||||||
|
|
||||||
mod tcp;
|
mod tcp;
|
||||||
|
|
||||||
/// A packet that can be sent over a [Connection].
|
/// A packet that can be sent over a [Connection].
|
||||||
|
@ -24,199 +28,3 @@ pub trait Packet: DeserializeOwned + Serialize + Send + Sync {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: DeserializeOwned + Serialize + Send + Sync> Packet for T {}
|
impl<T: DeserializeOwned + Serialize + Send + Sync> Packet for T {}
|
||||||
|
|
||||||
/* use bevy::{prelude::*, utils::HashMap};
|
|
||||||
use std::{
|
|
||||||
io::{self, ErrorKind},
|
|
||||||
net::{TcpStream, ToSocketAddrs},
|
|
||||||
sync::{mpsc::TryRecvError, Arc},
|
|
||||||
};
|
|
||||||
use tcp::{Connection, Listener, Packet, RawPacket};
|
|
||||||
|
|
||||||
mod tcp;
|
|
||||||
|
|
||||||
/// A Bevy resource that store the packets handlers for the client.
|
|
||||||
#[derive(Resource)]
|
|
||||||
pub struct ClientPacketHandler(Arc<HashMap<u64, Box<dyn Fn(RawPacket, &mut World) + Send + Sync>>>);
|
|
||||||
|
|
||||||
/// A connection to a remote server.
|
|
||||||
#[derive(Resource)]
|
|
||||||
pub struct ServerConnection(Connection);
|
|
||||||
|
|
||||||
impl ServerConnection {
|
|
||||||
/// Connects to a remote server.
|
|
||||||
pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
|
|
||||||
Ok(Self(Connection::new(TcpStream::connect(addr)?)?))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Send a [Packet] to the remote server.
|
|
||||||
pub fn send<P: Packet>(&self, packet: P) {
|
|
||||||
self.0.send(packet).ok();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A Bevy resource that store the packets handlers for the server.
|
|
||||||
#[derive(Resource)]
|
|
||||||
struct ServerPacketHandler(
|
|
||||||
Arc<HashMap<u64, Box<dyn Fn(Entity, ClientConnection, RawPacket, &mut World) + Send + Sync>>>,
|
|
||||||
);
|
|
||||||
|
|
||||||
/// A [ClientConnection] listener.
|
|
||||||
#[derive(Resource)]
|
|
||||||
pub struct ClientListener(Listener);
|
|
||||||
|
|
||||||
impl ClientListener {
|
|
||||||
/// Creates a new TCP listener on the given address.
|
|
||||||
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
|
|
||||||
Ok(Self(Listener::bind(addr)?))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A connection to a remote client.
|
|
||||||
#[derive(Component)]
|
|
||||||
pub struct ClientConnection(Arc<Connection>);
|
|
||||||
|
|
||||||
impl ClientConnection {
|
|
||||||
/// Sends a [Packet] to the remote client.
|
|
||||||
pub fn send<P: Packet>(&self, packet: P) {
|
|
||||||
self.0.send(packet).ok();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A Bevy system to handle incoming [ClientConnection]s and remove the
|
|
||||||
/// [ClientListener] resource if it is no longer listening.
|
|
||||||
fn accept_connections(mut commands: Commands, listener: Option<Res<ClientListener>>) {
|
|
||||||
if let Some(listener) = listener {
|
|
||||||
match listener.0.accept() {
|
|
||||||
Ok(connection) => {
|
|
||||||
commands.spawn(ClientConnection(Arc::new(connection)));
|
|
||||||
}
|
|
||||||
Err(error) => match error.kind() {
|
|
||||||
ErrorKind::WouldBlock => {}
|
|
||||||
_ => commands.remove_resource::<ClientListener>(),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A Bevy system to handle incoming [Packet]s from
|
|
||||||
/// the [ClientConnection]s and the [ServerConnection].
|
|
||||||
/// It removes them if they are no longer connected.
|
|
||||||
fn receive_packets(world: &mut World) {
|
|
||||||
// Handle client packets
|
|
||||||
let mut packets = Vec::new();
|
|
||||||
let mut to_remove = false;
|
|
||||||
if let Some(connection) = world.get_resource::<ServerConnection>() {
|
|
||||||
if let Some(receiver) = connection.0.recv() {
|
|
||||||
loop {
|
|
||||||
match receiver.try_recv() {
|
|
||||||
Ok(packet) => packets.push(packet),
|
|
||||||
Err(TryRecvError::Empty) => break,
|
|
||||||
Err(TryRecvError::Disconnected) => {
|
|
||||||
to_remove = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if let Some(handlers) = world
|
|
||||||
.get_resource_mut::<ClientPacketHandler>()
|
|
||||||
.map(|handlers| Arc::clone(&handlers.0))
|
|
||||||
{
|
|
||||||
for packet in packets.into_iter() {
|
|
||||||
if let Some(handler) = handlers.get(&packet.packet_id()) {
|
|
||||||
(handler)(packet, world);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if to_remove {
|
|
||||||
world.remove_resource::<ServerConnection>();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Handle server packets
|
|
||||||
let mut packets = Vec::new();
|
|
||||||
let mut to_remove = Vec::new();
|
|
||||||
for (entity, connection) in world.query::<(Entity, &ClientConnection)>().iter(world) {
|
|
||||||
if let Some(receiver) = connection.0.recv() {
|
|
||||||
loop {
|
|
||||||
match receiver.try_recv() {
|
|
||||||
Ok(packet) => {
|
|
||||||
println!("YESSSS");
|
|
||||||
packets.push((entity, ClientConnection(Arc::clone(&connection.0)), packet));
|
|
||||||
}
|
|
||||||
Err(TryRecvError::Empty) => break,
|
|
||||||
Err(TryRecvError::Disconnected) => {
|
|
||||||
to_remove.push(entity);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if let Some(handlers) = world
|
|
||||||
.get_resource_mut::<ServerPacketHandler>()
|
|
||||||
.map(|handlers| Arc::clone(&handlers.0))
|
|
||||||
{
|
|
||||||
for (entity, connection, packet) in packets.into_iter() {
|
|
||||||
if let Some(handler) = handlers.get(&packet.packet_id()) {
|
|
||||||
(handler)(entity, connection, packet, world);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for entity in to_remove {
|
|
||||||
world.despawn(entity);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A network plugin.
|
|
||||||
pub struct NetworkPlugin;
|
|
||||||
|
|
||||||
impl Plugin for NetworkPlugin {
|
|
||||||
fn build(&self, app: &mut App) {
|
|
||||||
app.insert_resource(ServerPacketHandler(Arc::new(HashMap::new())));
|
|
||||||
app.insert_resource(ClientPacketHandler(Arc::new(HashMap::new())));
|
|
||||||
app.add_system(accept_connections);
|
|
||||||
app.add_system(receive_packets);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A extension trait to add packet handler.
|
|
||||||
pub trait NetworkAppExt {
|
|
||||||
/// Add a client packet handler.
|
|
||||||
fn add_client_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
|
|
||||||
where
|
|
||||||
P: Packet,
|
|
||||||
H: Fn(RawPacket, &mut World) + Send + Sync + 'static;
|
|
||||||
|
|
||||||
/// Add a server packet handler.
|
|
||||||
fn add_server_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
|
|
||||||
where
|
|
||||||
P: Packet,
|
|
||||||
H: Fn(Entity, ClientConnection, RawPacket, &mut World) + Send + Sync + 'static;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl NetworkAppExt for App {
|
|
||||||
fn add_client_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
|
|
||||||
where
|
|
||||||
P: Packet,
|
|
||||||
H: Fn(RawPacket, &mut World) + Send + Sync + 'static,
|
|
||||||
{
|
|
||||||
Arc::get_mut(&mut self.world.resource_mut::<ClientPacketHandler>().0)
|
|
||||||
.unwrap()
|
|
||||||
.insert(P::packet_id(), Box::new(handler));
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
fn add_server_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
|
|
||||||
where
|
|
||||||
P: Packet,
|
|
||||||
H: Fn(Entity, ClientConnection, RawPacket, &mut World) + Send + Sync + 'static,
|
|
||||||
{
|
|
||||||
Arc::get_mut(&mut self.world.resource_mut::<ServerPacketHandler>().0)
|
|
||||||
.unwrap()
|
|
||||||
.insert(P::packet_id(), Box::new(handler));
|
|
||||||
self
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
283
src/server.rs
Normal file
283
src/server.rs
Normal file
|
@ -0,0 +1,283 @@
|
||||||
|
use crate::{tcp, Packet};
|
||||||
|
use bevy::prelude::*;
|
||||||
|
use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc};
|
||||||
|
|
||||||
|
#[cfg(feature = "sync")]
|
||||||
|
use ::{
|
||||||
|
serde::{de::DeserializeOwned, Serialize},
|
||||||
|
std::{any::type_name, marker::PhantomData, ops::Deref},
|
||||||
|
};
|
||||||
|
|
||||||
|
/// A trait for a handler function.
|
||||||
|
pub trait PacketHandlerFn:
|
||||||
|
Fn(Entity, ClientConnection, Vec<u8>, &mut World) + Send + Sync + 'static
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Fn(Entity, ClientConnection, Vec<u8>, &mut World) + Send + Sync + 'static> PacketHandlerFn
|
||||||
|
for T
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A function that handle a received [Packet]s.
|
||||||
|
type PacketHandler = Box<dyn PacketHandlerFn>;
|
||||||
|
|
||||||
|
/// A Bevy resource that store the packets handlers.
|
||||||
|
#[derive(Resource)]
|
||||||
|
struct HandlerManager(Arc<HashMap<u64, PacketHandler>>);
|
||||||
|
|
||||||
|
/// A Bevy resource that listens for incoming [Connection]s.
|
||||||
|
#[derive(Resource)]
|
||||||
|
pub struct Listener(tcp::Listener);
|
||||||
|
|
||||||
|
impl Listener {
|
||||||
|
/// Creates a new [Listener] on the given address.
|
||||||
|
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
|
||||||
|
Ok(Self(tcp::Listener::bind(addr)?))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A [Connection] to a remote client.
|
||||||
|
#[derive(Component)]
|
||||||
|
pub struct ClientConnection(Arc<tcp::Connection>);
|
||||||
|
|
||||||
|
impl ClientConnection {
|
||||||
|
/// Sends a packet through this connection.
|
||||||
|
pub fn send<P: Packet>(&self, packet: &P) {
|
||||||
|
let mut data = bincode::serialize(packet).expect("Failed to serialize packet");
|
||||||
|
data.extend(P::packet_id().to_be_bytes());
|
||||||
|
self.0.send(data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "sync")]
|
||||||
|
/// An event that comes from a client.
|
||||||
|
pub struct FromClient<E: Event + Packet> {
|
||||||
|
/// The entity of the [Connection] that sent the event.
|
||||||
|
pub entity: Entity,
|
||||||
|
|
||||||
|
/// The [Connection] that sent the event.
|
||||||
|
pub connection: ClientConnection,
|
||||||
|
|
||||||
|
/// The event.
|
||||||
|
pub event: E,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "sync")]
|
||||||
|
impl<E: Event + Packet> Deref for FromClient<E> {
|
||||||
|
type Target = E;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.event
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "sync")]
|
||||||
|
/// Mark an [Entity] to be synced.
|
||||||
|
#[derive(Component)]
|
||||||
|
pub struct Synced;
|
||||||
|
|
||||||
|
/// A plugin that manage the network [Connection]s.
|
||||||
|
pub struct ServerPlugin;
|
||||||
|
|
||||||
|
impl ServerPlugin {
|
||||||
|
/// Accept new [Connection]s.
|
||||||
|
fn accept_connections(mut commands: Commands, listener: Option<Res<Listener>>) {
|
||||||
|
if let Some(listener) = listener {
|
||||||
|
if let Some(connection) = listener.0.accept() {
|
||||||
|
commands.spawn(ClientConnection(Arc::new(connection)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handles a received [Packet]s.
|
||||||
|
fn handle_packets(world: &mut World) {
|
||||||
|
// Get all received packets
|
||||||
|
let mut packets = Vec::new();
|
||||||
|
for (entity, connection) in world.query::<(Entity, &ClientConnection)>().iter(world) {
|
||||||
|
while let Some(mut packet) = connection.0.recv() {
|
||||||
|
if packet.len() < 8 {
|
||||||
|
println!("Invalid packet received: {:?}", packet);
|
||||||
|
} else {
|
||||||
|
let id_buffer = packet.split_off(packet.len() - 8);
|
||||||
|
let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap());
|
||||||
|
packets.push((
|
||||||
|
entity,
|
||||||
|
ClientConnection(Arc::clone(&connection.0)),
|
||||||
|
packet_id,
|
||||||
|
packet,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the packet handlers
|
||||||
|
let handlers = Arc::clone(&world.resource_mut::<HandlerManager>().0);
|
||||||
|
|
||||||
|
// Handle all received packets
|
||||||
|
for (entity, connection, packet_id, packet) in packets {
|
||||||
|
if let Some(handler) = handlers.get(&packet_id) {
|
||||||
|
handler(entity, connection, packet, world);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove disconnected [Connection]s.
|
||||||
|
fn remove_disconnected(
|
||||||
|
mut commands: Commands,
|
||||||
|
connections: Query<(Entity, &ClientConnection)>,
|
||||||
|
) {
|
||||||
|
for (entity, connection) in connections.iter() {
|
||||||
|
if connection.0.closed() {
|
||||||
|
commands.entity(entity).remove::<ClientConnection>();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "sync")]
|
||||||
|
/// Send to clients the [Synced] entity that has been added to the server.
|
||||||
|
fn send_added(
|
||||||
|
added_entities: Query<Entity, Added<Synced>>,
|
||||||
|
connections: Query<&ClientConnection>,
|
||||||
|
) {
|
||||||
|
for entity in added_entities.iter() {
|
||||||
|
for connection in connections.iter() {
|
||||||
|
connection.send(&entity);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "sync")]
|
||||||
|
/// Send [Synced] entities to new clients.
|
||||||
|
fn send_synced(
|
||||||
|
synced_entities: Query<Entity, With<Synced>>,
|
||||||
|
new_connections: Query<&ClientConnection, Added<ClientConnection>>,
|
||||||
|
) {
|
||||||
|
for entity in synced_entities.iter() {
|
||||||
|
for connection in new_connections.iter() {
|
||||||
|
connection.send(&entity);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "sync")]
|
||||||
|
/// Send to clients the [Synced] entity that has been removed.
|
||||||
|
fn send_removed(
|
||||||
|
mut removed_entities: RemovedComponents<Synced>,
|
||||||
|
connections: Query<&ClientConnection>,
|
||||||
|
) {
|
||||||
|
for entity in removed_entities.iter() {
|
||||||
|
for connection in connections.iter() {
|
||||||
|
connection.send(&entity);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Plugin for ServerPlugin {
|
||||||
|
fn build(&self, app: &mut App) {
|
||||||
|
app.insert_resource(HandlerManager(Arc::new(HashMap::new())));
|
||||||
|
app.add_system(Self::handle_packets);
|
||||||
|
app.add_system(Self::remove_disconnected);
|
||||||
|
app.add_system(Self::accept_connections);
|
||||||
|
|
||||||
|
#[cfg(feature = "sync")]
|
||||||
|
{
|
||||||
|
app.add_system(Self::send_added);
|
||||||
|
app.add_system(Self::send_synced);
|
||||||
|
app.add_system(Self::send_removed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An extension to add packet handlers.
|
||||||
|
pub trait NetworkExt {
|
||||||
|
/// Add a new packet handler.
|
||||||
|
fn add_packet_handler<P: Packet, H: PacketHandlerFn>(&mut self, handler: H) -> &mut Self;
|
||||||
|
|
||||||
|
#[cfg(all(feature = "sync"))]
|
||||||
|
/// Register syncronization for an [Event] that can be sent by the server.
|
||||||
|
fn sync_server_event<E: Event + Packet>(&mut self) -> &mut Self;
|
||||||
|
|
||||||
|
#[cfg(all(feature = "sync"))]
|
||||||
|
/// Register syncronization for an [Event] that comes from the client.
|
||||||
|
fn sync_client_event<E: Event + Packet>(&mut self) -> &mut Self;
|
||||||
|
|
||||||
|
#[cfg(all(feature = "sync"))]
|
||||||
|
/// Register a [Component] to be synced.
|
||||||
|
fn sync_component<C: Component + DeserializeOwned + Serialize + Clone>(&mut self) -> &mut Self;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NetworkExt for App {
|
||||||
|
fn add_packet_handler<P: Packet, H: PacketHandlerFn>(&mut self, handler: H) -> &mut Self {
|
||||||
|
Arc::get_mut(&mut self.world.resource_mut::<HandlerManager>().0)
|
||||||
|
.unwrap()
|
||||||
|
.insert(P::packet_id(), Box::new(handler));
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "sync")]
|
||||||
|
fn sync_server_event<E: Event + Packet>(&mut self) -> &mut Self {
|
||||||
|
self.add_event::<E>().add_system(
|
||||||
|
|mut events: EventReader<E>, connections: Query<&ClientConnection>| {
|
||||||
|
for event in events.iter() {
|
||||||
|
for connection in connections.iter() {
|
||||||
|
connection.send(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "sync")]
|
||||||
|
fn sync_client_event<E: Event + Packet>(&mut self) -> &mut Self {
|
||||||
|
self.add_event::<FromClient<E>>()
|
||||||
|
.add_packet_handler::<E, _>(
|
||||||
|
|entity, connection, data, world| match bincode::deserialize::<E>(&data) {
|
||||||
|
Ok(event) => world.send_event(FromClient {
|
||||||
|
entity,
|
||||||
|
connection,
|
||||||
|
event,
|
||||||
|
}),
|
||||||
|
Err(_) => println!("Failed to deserialize packet: {}", type_name::<E>()),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "sync")]
|
||||||
|
fn sync_component<C: Component + DeserializeOwned + Serialize + Clone>(&mut self) -> &mut Self {
|
||||||
|
let update_components =
|
||||||
|
|changed_components: Query<(Entity, &C), (Changed<C>, With<Synced>)>,
|
||||||
|
connections: Query<&ClientConnection>| {
|
||||||
|
for (entity, component) in changed_components.iter() {
|
||||||
|
for connection in connections.iter() {
|
||||||
|
connection.send(&(entity, component.clone()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let send_components =
|
||||||
|
|components: Query<(Entity, &C), With<Synced>>,
|
||||||
|
new_connections: Query<&ClientConnection, Added<ClientConnection>>| {
|
||||||
|
for (entity, component) in components.iter() {
|
||||||
|
for connection in new_connections.iter() {
|
||||||
|
connection.send(&(entity, component.clone()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let remove_components =
|
||||||
|
|mut removed_components: RemovedComponents<C>,
|
||||||
|
synced_entities: Query<Entity, With<Synced>>,
|
||||||
|
connections: Query<&ClientConnection>| {
|
||||||
|
for entity in removed_components.iter() {
|
||||||
|
if synced_entities.contains(entity) {
|
||||||
|
for connection in connections.iter() {
|
||||||
|
connection.send(&(entity, PhantomData::<C>));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
self.add_system(update_components.after(ServerPlugin::send_added))
|
||||||
|
.add_system(send_components.after(ServerPlugin::send_synced))
|
||||||
|
.add_system(remove_components.after(update_components))
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,129 +0,0 @@
|
||||||
use crate::{
|
|
||||||
tcp::{Connection, Listener},
|
|
||||||
Packet,
|
|
||||||
};
|
|
||||||
use bevy::prelude::*;
|
|
||||||
use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc};
|
|
||||||
|
|
||||||
pub mod sync;
|
|
||||||
|
|
||||||
/// A function that handle a received [Packet] on the server.
|
|
||||||
pub type ServerPacketHandler =
|
|
||||||
Box<dyn Fn(Entity, ClientConnection, Vec<u8>, &mut World) + Send + Sync>;
|
|
||||||
|
|
||||||
/// A Bevy resource that store the packets handlers for the server.
|
|
||||||
#[derive(Resource)]
|
|
||||||
pub struct ServerHandlerManager(Arc<HashMap<u64, ServerPacketHandler>>);
|
|
||||||
|
|
||||||
/// A Bevy resource that listens for incoming [ClientConnection]s.
|
|
||||||
#[derive(Resource)]
|
|
||||||
pub struct ClientListener(Listener);
|
|
||||||
|
|
||||||
impl ClientListener {
|
|
||||||
/// Creates a new listener on the given address.
|
|
||||||
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
|
|
||||||
Ok(Self(Listener::bind(addr)?))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A connection to a remote client.
|
|
||||||
#[derive(Component)]
|
|
||||||
pub struct ClientConnection(Arc<Connection>);
|
|
||||||
|
|
||||||
impl ClientConnection {
|
|
||||||
/// Sends a packet through this connection.
|
|
||||||
pub fn send<P: Packet>(&self, packet: &P) {
|
|
||||||
let mut data = bincode::serialize(packet).expect("Failed to serialize packet");
|
|
||||||
data.extend(P::packet_id().to_be_bytes());
|
|
||||||
self.0.send(data);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A plugin that manage the network connections for a server.
|
|
||||||
pub struct ServerPlugin;
|
|
||||||
|
|
||||||
impl ServerPlugin {
|
|
||||||
/// Accept new [ClientConnection]s.
|
|
||||||
pub fn accept_connections(mut commands: Commands, listener: Option<Res<ClientListener>>) {
|
|
||||||
if let Some(listener) = listener {
|
|
||||||
if let Some(connection) = listener.0.accept() {
|
|
||||||
commands.spawn(ClientConnection(Arc::new(connection)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Handles a received [Packet] on the server.
|
|
||||||
pub fn handle_packets(world: &mut World) {
|
|
||||||
// Get all received packets
|
|
||||||
let mut packets = Vec::new();
|
|
||||||
for (entity, connection) in world.query::<(Entity, &ClientConnection)>().iter(world) {
|
|
||||||
while let Some(mut packet) = connection.0.recv() {
|
|
||||||
if packet.len() < 8 {
|
|
||||||
println!("Invalid packet received: {:?}", packet);
|
|
||||||
} else {
|
|
||||||
let id_buffer = packet.split_off(packet.len() - 8);
|
|
||||||
let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap());
|
|
||||||
packets.push((
|
|
||||||
entity,
|
|
||||||
ClientConnection(Arc::clone(&connection.0)),
|
|
||||||
packet_id,
|
|
||||||
packet,
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get the packet handlers
|
|
||||||
let handlers = Arc::clone(&world.resource_mut::<ServerHandlerManager>().0);
|
|
||||||
|
|
||||||
// Handle all received packets
|
|
||||||
for (entity, connection, packet_id, packet) in packets {
|
|
||||||
if let Some(handler) = handlers.get(&packet_id) {
|
|
||||||
handler(entity, connection, packet, world);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Remove disconnected [ClientConnection]s.
|
|
||||||
pub fn remove_disconnected(
|
|
||||||
mut commands: Commands,
|
|
||||||
connections: Query<(Entity, &ClientConnection)>,
|
|
||||||
) {
|
|
||||||
for (entity, connection) in connections.iter() {
|
|
||||||
if connection.0.closed() {
|
|
||||||
commands.entity(entity).remove::<ClientConnection>();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Plugin for ServerPlugin {
|
|
||||||
fn build(&self, app: &mut App) {
|
|
||||||
app.insert_resource(ServerHandlerManager(Arc::new(HashMap::new())));
|
|
||||||
app.add_system(ServerPlugin::accept_connections);
|
|
||||||
app.add_system(ServerPlugin::handle_packets);
|
|
||||||
app.add_system(ServerPlugin::remove_disconnected);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// An extension to add packet handlers.
|
|
||||||
pub trait ServerAppExt {
|
|
||||||
/// Add a new packet handler.
|
|
||||||
fn add_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
|
|
||||||
where
|
|
||||||
P: Packet,
|
|
||||||
H: Fn(Entity, ClientConnection, Vec<u8>, &mut World) + Send + Sync + 'static;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ServerAppExt for App {
|
|
||||||
fn add_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
|
|
||||||
where
|
|
||||||
P: Packet,
|
|
||||||
H: Fn(Entity, ClientConnection, Vec<u8>, &mut World) + Send + Sync + 'static,
|
|
||||||
{
|
|
||||||
Arc::get_mut(&mut self.world.resource_mut::<ServerHandlerManager>().0)
|
|
||||||
.unwrap()
|
|
||||||
.insert(P::packet_id(), Box::new(handler));
|
|
||||||
self
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,154 +0,0 @@
|
||||||
use super::ServerAppExt;
|
|
||||||
use crate::{server::ClientConnection, Packet};
|
|
||||||
use bevy::prelude::*;
|
|
||||||
use serde::{de::DeserializeOwned, Serialize};
|
|
||||||
use std::{any::type_name, marker::PhantomData, ops::Deref};
|
|
||||||
|
|
||||||
/// An event that comes from a client.
|
|
||||||
pub struct FromClient<E: Event + Packet> {
|
|
||||||
/// The entity of the [ClientConnection] that sent the event.
|
|
||||||
pub entity: Entity,
|
|
||||||
|
|
||||||
/// The [ClientConnection] that sent the event.
|
|
||||||
pub connection: ClientConnection,
|
|
||||||
|
|
||||||
/// The event.
|
|
||||||
pub event: E,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<E: Event + Packet> Deref for FromClient<E> {
|
|
||||||
type Target = E;
|
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
|
||||||
&self.event
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Mark an [Entity] to be synced.
|
|
||||||
#[derive(Component)]
|
|
||||||
pub struct Synced;
|
|
||||||
|
|
||||||
/// A plugin for the syncronization system.
|
|
||||||
pub struct ServerSyncPlugin;
|
|
||||||
|
|
||||||
impl ServerSyncPlugin {
|
|
||||||
/// Send to clients the [Synced] entity that has been added to the server.
|
|
||||||
fn send_added(
|
|
||||||
added_entities: Query<Entity, Added<Synced>>,
|
|
||||||
connections: Query<&ClientConnection>,
|
|
||||||
) {
|
|
||||||
for entity in added_entities.iter() {
|
|
||||||
for connection in connections.iter() {
|
|
||||||
connection.send(&entity);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Send [Synced] entities to new clients.
|
|
||||||
fn send_synced(
|
|
||||||
synced_entities: Query<Entity, With<Synced>>,
|
|
||||||
new_connections: Query<&ClientConnection, Added<ClientConnection>>,
|
|
||||||
) {
|
|
||||||
for entity in synced_entities.iter() {
|
|
||||||
for connection in new_connections.iter() {
|
|
||||||
connection.send(&entity);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Send to clients the [Synced] entity that has been removed.
|
|
||||||
fn send_removed(
|
|
||||||
mut removed_entities: RemovedComponents<Synced>,
|
|
||||||
connections: Query<&ClientConnection>,
|
|
||||||
) {
|
|
||||||
for entity in removed_entities.iter() {
|
|
||||||
for connection in connections.iter() {
|
|
||||||
connection.send(&entity);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Plugin for ServerSyncPlugin {
|
|
||||||
fn build(&self, app: &mut App) {
|
|
||||||
app.add_system(Self::send_added);
|
|
||||||
app.add_system(Self::send_synced);
|
|
||||||
app.add_system(Self::send_removed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// An extention to add syncronization to the server.
|
|
||||||
pub trait ServerSyncExt {
|
|
||||||
/// Register syncronization for an [Event] that can be sent by the server.
|
|
||||||
fn server_event_sync<E: Event + Packet>(&mut self) -> &mut Self;
|
|
||||||
|
|
||||||
/// Register syncronization for an [Event] that comes from the client.
|
|
||||||
fn client_event_sync<E: Event + Packet>(&mut self) -> &mut Self;
|
|
||||||
|
|
||||||
/// Register a [Component] to be synced.
|
|
||||||
fn sync_component<C: Component + DeserializeOwned + Serialize + Clone>(&mut self) -> &mut Self;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ServerSyncExt for App {
|
|
||||||
fn server_event_sync<E: Event + Packet>(&mut self) -> &mut Self {
|
|
||||||
self.add_event::<E>().add_system(
|
|
||||||
|mut events: EventReader<E>, connections: Query<&ClientConnection>| {
|
|
||||||
for event in events.iter() {
|
|
||||||
for connection in connections.iter() {
|
|
||||||
connection.send(event);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn client_event_sync<E: Event + Packet>(&mut self) -> &mut Self {
|
|
||||||
self.add_event::<FromClient<E>>()
|
|
||||||
.add_packet_handler::<E, _>(
|
|
||||||
|entity, connection, data, world| match bincode::deserialize::<E>(&data) {
|
|
||||||
Ok(event) => world.send_event(FromClient {
|
|
||||||
entity,
|
|
||||||
connection,
|
|
||||||
event,
|
|
||||||
}),
|
|
||||||
Err(_) => println!("Failed to deserialize packet: {}", type_name::<E>()),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn sync_component<C: Component + DeserializeOwned + Serialize + Clone>(&mut self) -> &mut Self {
|
|
||||||
let update_components =
|
|
||||||
|changed_components: Query<(Entity, &C), (Changed<C>, With<Synced>)>,
|
|
||||||
connections: Query<&ClientConnection>| {
|
|
||||||
for (entity, component) in changed_components.iter() {
|
|
||||||
for connection in connections.iter() {
|
|
||||||
connection.send(&(entity, component.clone()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let send_components =
|
|
||||||
|components: Query<(Entity, &C), With<Synced>>,
|
|
||||||
new_connections: Query<&ClientConnection, Added<ClientConnection>>| {
|
|
||||||
for (entity, component) in components.iter() {
|
|
||||||
for connection in new_connections.iter() {
|
|
||||||
connection.send(&(entity, component.clone()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let remove_components =
|
|
||||||
|mut removed_components: RemovedComponents<C>,
|
|
||||||
synced_entities: Query<Entity, With<Synced>>,
|
|
||||||
connections: Query<&ClientConnection>| {
|
|
||||||
for entity in removed_components.iter() {
|
|
||||||
if synced_entities.contains(entity) {
|
|
||||||
for connection in connections.iter() {
|
|
||||||
connection.send(&(entity, PhantomData::<C>));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
self.add_system(update_components.after(ServerSyncPlugin::send_added))
|
|
||||||
.add_system(send_components.after(ServerSyncPlugin::send_synced))
|
|
||||||
.add_system(remove_components.after(update_components))
|
|
||||||
}
|
|
||||||
}
|
|
16
src/tcp.rs
16
src/tcp.rs
|
@ -1,6 +1,6 @@
|
||||||
use std::{
|
use std::{
|
||||||
io::{self, Read, Write},
|
io::{self, Read, Write},
|
||||||
net::{Shutdown, TcpListener, TcpStream, ToSocketAddrs},
|
net::{Shutdown, TcpStream, ToSocketAddrs},
|
||||||
sync::{
|
sync::{
|
||||||
atomic::{AtomicBool, Ordering},
|
atomic::{AtomicBool, Ordering},
|
||||||
mpsc::{channel, Receiver, Sender},
|
mpsc::{channel, Receiver, Sender},
|
||||||
|
@ -9,6 +9,9 @@ use std::{
|
||||||
thread,
|
thread,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#[cfg(feature = "server")]
|
||||||
|
use std::net::TcpListener;
|
||||||
|
|
||||||
/// A non-blocking TCP connection.
|
/// A non-blocking TCP connection.
|
||||||
pub struct Connection {
|
pub struct Connection {
|
||||||
/// Track if the connection has been closed.
|
/// Track if the connection has been closed.
|
||||||
|
@ -51,6 +54,7 @@ impl Connection {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "client")]
|
||||||
/// Creates a new connection to the given address.
|
/// Creates a new connection to the given address.
|
||||||
pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
|
pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
|
||||||
Self::new(TcpStream::connect(addr)?)
|
Self::new(TcpStream::connect(addr)?)
|
||||||
|
@ -82,13 +86,7 @@ impl Connection {
|
||||||
|
|
||||||
/// The sending loop for this connection.
|
/// The sending loop for this connection.
|
||||||
fn sending_loop(mut stream: TcpStream, receiver: Receiver<Vec<u8>>, closed: Arc<AtomicBool>) {
|
fn sending_loop(mut stream: TcpStream, receiver: Receiver<Vec<u8>>, closed: Arc<AtomicBool>) {
|
||||||
loop {
|
while let Ok(packet) = receiver.recv() {
|
||||||
// Get the next packet to send
|
|
||||||
let packet = match receiver.recv() {
|
|
||||||
Ok(packet) => packet,
|
|
||||||
Err(_) => break,
|
|
||||||
};
|
|
||||||
|
|
||||||
// Send the length of the packet
|
// Send the length of the packet
|
||||||
let len_buffer = u32::to_be_bytes(packet.len() as u32);
|
let len_buffer = u32::to_be_bytes(packet.len() as u32);
|
||||||
if stream.write_all(&len_buffer).is_err() {
|
if stream.write_all(&len_buffer).is_err() {
|
||||||
|
@ -128,12 +126,14 @@ impl Drop for Connection {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "server")]
|
||||||
/// A [Connection] listener.
|
/// A [Connection] listener.
|
||||||
pub struct Listener {
|
pub struct Listener {
|
||||||
/// The [TcpListener] of the listener.
|
/// The [TcpListener] of the listener.
|
||||||
listener: TcpListener,
|
listener: TcpListener,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "server")]
|
||||||
impl Listener {
|
impl Listener {
|
||||||
/// Creates a new TCP listener on the given address.
|
/// Creates a new TCP listener on the given address.
|
||||||
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
|
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
|
||||||
|
|
Loading…
Reference in a new issue