Add sync and refactor

This commit is contained in:
Tipragot 2023-04-29 14:23:24 +02:00
parent a69973b3fa
commit f849aab7d6
5 changed files with 281 additions and 551 deletions

View file

@ -1,103 +0,0 @@
use crate::{tcp::Connection, Packet};
use bevy::prelude::*;
use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc};
pub mod sync;
/// A function that handle a received [Packet] on the client.
pub type ClientPacketHandler = Box<dyn Fn(Vec<u8>, &mut World) + Send + Sync>;
/// A Bevy resource that store the packets handlers for the client.
#[derive(Resource)]
pub struct ClientHandlerManager(Arc<HashMap<u64, ClientPacketHandler>>);
/// A connection to a remote server.
#[derive(Resource)]
pub struct ServerConnection(Connection);
impl ServerConnection {
/// Connects to a remote server.
pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
Ok(Self(Connection::connect(addr)?))
}
/// Sends a packet through this connection.
pub fn send<P: Packet>(&self, packet: &P) {
let mut data = bincode::serialize(packet).expect("Failed to serialize packet");
data.extend(P::packet_id().to_be_bytes());
self.0.send(data);
}
}
/// A plugin that manage the network connections for a server.
pub struct ClientPlugin;
impl ClientPlugin {
/// Handles a received [Packet] on the server.
pub fn handle_packets(world: &mut World) {
// Get all received packets
let mut packets = Vec::new();
if let Some(connection) = world.get_resource::<ServerConnection>() {
while let Some(mut packet) = connection.0.recv() {
if packet.len() < 8 {
println!("Invalid packet received: {:?}", packet);
} else {
let id_buffer = packet.split_off(packet.len() - 8);
let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap());
packets.push((packet_id, packet));
}
}
} else {
return;
}
// Get the packet handlers
let handlers = Arc::clone(&world.resource_mut::<ClientHandlerManager>().0);
// Handle all received packets
for (packet_id, packet) in packets {
if let Some(handler) = handlers.get(&packet_id) {
handler(packet, world);
}
}
}
/// Remove [ServerConnection] if it's disconnected.
pub fn remove_disconnected(mut commands: Commands, connection: Option<Res<ServerConnection>>) {
if let Some(connection) = connection {
if connection.0.closed() {
commands.remove_resource::<ServerConnection>();
}
}
}
}
impl Plugin for ClientPlugin {
fn build(&self, app: &mut App) {
app.insert_resource(ClientHandlerManager(Arc::new(HashMap::new())));
app.add_system(ClientPlugin::handle_packets);
app.add_system(ClientPlugin::remove_disconnected);
}
}
/// An extension to add packet handlers.
pub trait ClientAppExt {
/// Add a new packet handler.
fn add_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
where
P: Packet,
H: Fn(Vec<u8>, &mut World) + Send + Sync + 'static;
}
impl ClientAppExt for App {
fn add_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
where
P: Packet,
H: Fn(Vec<u8>, &mut World) + Send + Sync + 'static,
{
Arc::get_mut(&mut self.world.resource_mut::<ClientHandlerManager>().0)
.unwrap()
.insert(P::packet_id(), Box::new(handler));
self
}
}

View file

@ -1,127 +0,0 @@
use std::{any::type_name, marker::PhantomData};
use super::{ClientAppExt, ServerConnection};
use crate::Packet;
use bevy::prelude::*;
use serde::{de::DeserializeOwned, Serialize};
/// An event that comes from the server.
#[derive(Deref)]
pub struct FromServer<E: Event + Packet> {
/// The event.
pub event: E,
}
/// Mark an [Entity] as synced by the server.
#[derive(Component)]
pub struct ServerEntity(Entity);
/// A plugin for the syncronization system.
pub struct ClientSyncPlugin;
impl ClientSyncPlugin {
/// Removes [ServerEntity] when disconnected.
fn remove_synced(mut commands: Commands, entities: Query<Entity, With<ServerEntity>>) {
for entity in entities.iter() {
commands.entity(entity).despawn();
}
}
}
impl Plugin for ClientSyncPlugin {
fn build(&self, app: &mut App) {
app.add_packet_handler::<Entity, _>(|data, world| {
match bincode::deserialize::<Entity>(&data) {
Ok(entity) => {
if let Some((local_entity, _)) = world
.query::<(Entity, &ServerEntity)>()
.iter(world)
.find(|(_, server_entity)| server_entity.0 == entity)
{
world.despawn(local_entity);
} else {
world.spawn(ServerEntity(entity));
}
}
Err(_) => println!("Failed to deserialize packet: {}", type_name::<Entity>()),
}
});
app.add_system(Self::remove_synced.run_if(resource_removed::<ServerConnection>()));
}
}
/// An extention to add syncronization to the client.
pub trait ClientSyncExt {
/// Register syncronization for an [Event] that comes from the server.
fn server_event_sync<E: Event + Packet>(&mut self) -> &mut Self;
/// Register syncronization for an [Event] that can be sent by the client.
fn client_event_sync<E: Event + Packet>(&mut self) -> &mut Self;
/// Register a [Component] to be synced.
fn sync_component<C: Component + DeserializeOwned + Serialize>(&mut self) -> &mut Self;
}
impl ClientSyncExt for App {
fn server_event_sync<E: Event + Packet>(&mut self) -> &mut Self {
self.add_event::<FromServer<E>>()
.add_packet_handler::<E, _>(|data, world| match bincode::deserialize::<E>(&data) {
Ok(event) => world.send_event(FromServer { event }),
Err(_) => println!("Failed to deserialize packet: {}", type_name::<E>()),
})
}
fn client_event_sync<E: Event + Packet>(&mut self) -> &mut Self {
self.add_event::<E>().add_system(
|mut events: EventReader<E>, connection: Option<Res<ServerConnection>>| {
if let Some(connection) = connection {
for event in events.iter() {
connection.send(event);
}
}
},
)
}
fn sync_component<C: Component + DeserializeOwned + Serialize>(&mut self) -> &mut Self {
self.add_packet_handler::<(Entity, C), _>(|data, world| {
match bincode::deserialize::<(Entity, C)>(&data) {
Ok((entity, component)) => {
if let Some((local_entity, _)) = world
.query::<(Entity, &ServerEntity)>()
.iter(world)
.find(|(_, server_entity)| server_entity.0 == entity)
{
let mut local_entity = world.entity_mut(local_entity);
match local_entity.get_mut::<C>() {
Some(mut local_component) => {
println!("CA CHANGE: {}", type_name::<C>());
*local_component = component;
}
None => {
local_entity.insert(component);
}
}
} else {
println!("Received component for unknown entity: {:?}", entity);
}
}
Err(_) => println!("Failed to deserialize packet: {}", type_name::<C>()),
}
})
.add_packet_handler::<(Entity, PhantomData<C>), _>(
|data, world| match bincode::deserialize::<(Entity, PhantomData<C>)>(&data) {
Ok((entity, _)) => {
if let Some((local_entity, _)) = world
.query::<(Entity, &ServerEntity)>()
.iter(world)
.find(|(_, server_entity)| server_entity.0 == entity)
{
world.entity_mut(local_entity).remove::<C>();
}
}
Err(_) => println!("Failed to deserialize packet: {}", type_name::<C>()),
},
)
}
}

View file

@ -6,6 +6,12 @@ use std::{
};
use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc};
#[cfg(feature = "sync")]
use std::{any::type_name, marker::PhantomData};
#[cfg(all(feature = "sync", feature = "server"))]
use std::ops::Deref;
mod tcp;
/// A packet that can be sent over a [Connection].
@ -26,12 +32,24 @@ pub trait Packet: DeserializeOwned + Serialize + Send + Sync {
impl<T: DeserializeOwned + Serialize + Send + Sync> Packet for T {}
#[cfg(feature = "server")]
/// A function that handle a received [Packet]s.
pub type PacketHandler = Box<dyn Fn(Entity, Connection, Vec<u8>, &mut World) + Send + Sync>;
/// A trait for a handler function.
pub trait PacketHandlerFn:
Fn(Entity, Connection, Vec<u8>, &mut World) + Send + Sync + 'static
{
}
#[cfg(feature = "server")]
impl<T: Fn(Entity, Connection, Vec<u8>, &mut World) + Send + Sync + 'static> PacketHandlerFn for T {}
#[cfg(feature = "client")]
/// A trait for a handler function.
pub trait PacketHandlerFn: Fn(Vec<u8>, &mut World) + Send + Sync + 'static {}
#[cfg(feature = "client")]
impl<T: Fn(Vec<u8>, &mut World) + Send + Sync + 'static> PacketHandlerFn for T {}
/// A function that handle a received [Packet]s.
pub type PacketHandler = Box<dyn Fn(Vec<u8>, &mut World) + Send + Sync>;
pub type PacketHandler = Box<dyn PacketHandlerFn>;
/// A Bevy resource that store the packets handlers.
#[derive(Resource)]
@ -44,19 +62,19 @@ pub struct Listener(tcp::Listener);
#[cfg(feature = "server")]
impl Listener {
/// Creates a new listener on the given address.
/// Creates a new [Listener] on the given address.
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
Ok(Self(tcp::Listener::bind(addr)?))
}
}
#[cfg(feature = "server")]
/// A connection to a remote client.
/// A [Connection] to a remote client.
#[derive(Component)]
pub struct Connection(Arc<tcp::Connection>);
#[cfg(feature = "client")]
/// A connection to a remote server.
/// A [Connection] to a remote server.
#[derive(Resource)]
pub struct Connection(tcp::Connection);
@ -75,7 +93,47 @@ impl Connection {
}
}
/// A plugin that manage the network connections.
#[cfg(feature = "server")]
/// An event that comes from a client.
pub struct PacketEvent<E: Event + Packet> {
/// The entity of the [Connection] that sent the event.
pub entity: Entity,
/// The [Connection] that sent the event.
pub connection: Connection,
/// The event.
pub event: E,
}
#[cfg(all(feature = "sync", feature = "server"))]
impl<E: Event + Packet> Deref for PacketEvent<E> {
type Target = E;
fn deref(&self) -> &Self::Target {
&self.event
}
}
#[cfg(all(feature = "sync", feature = "client"))]
/// An event that comes from the server.
#[derive(Deref)]
pub struct PacketEvent<E: Event + Packet> {
/// The event.
pub event: E,
}
#[cfg(all(feature = "sync", feature = "server"))]
/// Mark an [Entity] to be synced.
#[derive(Component)]
pub struct Synced;
#[cfg(all(feature = "sync", feature = "client"))]
/// Mark an [Entity] as synced by the server.
#[derive(Component)]
pub struct Synced(Entity);
/// A plugin that manage the network [Connection]s.
pub struct NetworkPlugin;
impl NetworkPlugin {
@ -163,7 +221,7 @@ impl NetworkPlugin {
}
#[cfg(feature = "client")]
/// Remove [ServerConnection] if it's disconnected.
/// Remove [Connection] if it's disconnected.
pub fn remove_disconnected(mut commands: Commands, connection: Option<Res<Connection>>) {
if let Some(connection) = connection {
if connection.0.closed() {
@ -171,58 +229,243 @@ impl NetworkPlugin {
}
}
}
#[cfg(all(feature = "sync", feature = "server"))]
/// Send to clients the [Synced] entity that has been added to the server.
fn send_added(added_entities: Query<Entity, Added<Synced>>, connections: Query<&Connection>) {
for entity in added_entities.iter() {
for connection in connections.iter() {
connection.send(&entity);
}
}
}
#[cfg(all(feature = "sync", feature = "server"))]
/// Send [Synced] entities to new clients.
fn send_synced(
synced_entities: Query<Entity, With<Synced>>,
new_connections: Query<&Connection, Added<Connection>>,
) {
for entity in synced_entities.iter() {
for connection in new_connections.iter() {
connection.send(&entity);
}
}
}
#[cfg(all(feature = "sync", feature = "server"))]
/// Send to clients the [Synced] entity that has been removed.
fn send_removed(
mut removed_entities: RemovedComponents<Synced>,
connections: Query<&Connection>,
) {
for entity in removed_entities.iter() {
for connection in connections.iter() {
connection.send(&entity);
}
}
}
#[cfg(all(feature = "sync", feature = "client"))]
/// Removes [ServerEntity] when disconnected.
fn remove_synced(mut commands: Commands, entities: Query<Entity, With<Synced>>) {
for entity in entities.iter() {
commands.entity(entity).despawn();
}
}
}
impl Plugin for NetworkPlugin {
fn build(&self, app: &mut App) {
app.insert_resource(HandlerManager(Arc::new(HashMap::new())));
app.add_system(NetworkPlugin::handle_packets);
app.add_system(NetworkPlugin::remove_disconnected);
app.add_system(Self::handle_packets);
app.add_system(Self::remove_disconnected);
#[cfg(feature = "server")]
app.add_system(NetworkPlugin::accept_connections);
app.add_system(Self::accept_connections);
#[cfg(all(feature = "sync", feature = "server"))]
{
app.add_system(Self::send_added);
app.add_system(Self::send_synced);
app.add_system(Self::send_removed);
}
#[cfg(all(feature = "sync", feature = "client"))]
{
app.add_packet_handler::<Entity, _>(|data, world| {
match bincode::deserialize::<Entity>(&data) {
Ok(entity) => {
if let Some((local_entity, _)) = world
.query::<(Entity, &Synced)>()
.iter(world)
.find(|(_, server_entity)| server_entity.0 == entity)
{
world.despawn(local_entity);
} else {
world.spawn(Synced(entity));
}
}
Err(_) => println!("Failed to deserialize packet: {}", type_name::<Entity>()),
}
});
app.add_system(Self::remove_synced.run_if(resource_removed::<Connection>()));
}
}
}
/// An extension to add packet handlers.
pub trait NetworkExt {
#[cfg(feature = "server")]
/// Add a new packet handler.
fn add_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
where
P: Packet,
H: Fn(Entity, Connection, Vec<u8>, &mut World) + Send + Sync + 'static;
fn add_packet_handler<P: Packet, H: PacketHandlerFn>(&mut self, handler: H) -> &mut Self;
#[cfg(feature = "client")]
/// Add a new packet handler.
fn add_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
where
P: Packet,
H: Fn(Vec<u8>, &mut World) + Send + Sync + 'static;
#[cfg(all(feature = "sync"))]
/// Register syncronization for an [Event] that can be sent by the server.
fn sync_server_event<E: Event + Packet>(&mut self) -> &mut Self;
#[cfg(all(feature = "sync"))]
/// Register syncronization for an [Event] that comes from the client.
fn sync_client_event<E: Event + Packet>(&mut self) -> &mut Self;
#[cfg(all(feature = "sync"))]
/// Register a [Component] to be synced.
fn sync_component<C: Component + DeserializeOwned + Serialize + Clone>(&mut self) -> &mut Self;
}
impl NetworkExt for App {
#[cfg(feature = "server")]
fn add_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
where
P: Packet,
H: Fn(Entity, Connection, Vec<u8>, &mut World) + Send + Sync + 'static,
{
fn add_packet_handler<P: Packet, H: PacketHandlerFn>(&mut self, handler: H) -> &mut Self {
Arc::get_mut(&mut self.world.resource_mut::<HandlerManager>().0)
.unwrap()
.insert(P::packet_id(), Box::new(handler));
self
}
#[cfg(feature = "client")]
fn add_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
where
P: Packet,
H: Fn(Vec<u8>, &mut World) + Send + Sync + 'static,
{
Arc::get_mut(&mut self.world.resource_mut::<HandlerManager>().0)
.unwrap()
.insert(P::packet_id(), Box::new(handler));
self
#[cfg(all(feature = "sync", feature = "server"))]
fn sync_server_event<E: Event + Packet>(&mut self) -> &mut Self {
self.add_event::<E>().add_system(
|mut events: EventReader<E>, connections: Query<&Connection>| {
for event in events.iter() {
for connection in connections.iter() {
connection.send(event);
}
}
},
)
}
#[cfg(all(feature = "sync", feature = "client"))]
fn sync_server_event<E: Event + Packet>(&mut self) -> &mut Self {
self.add_event::<PacketEvent<E>>()
.add_packet_handler::<E, _>(|data, world| match bincode::deserialize::<E>(&data) {
Ok(event) => world.send_event(PacketEvent { event }),
Err(_) => println!("Failed to deserialize packet: {}", type_name::<E>()),
})
}
#[cfg(all(feature = "sync", feature = "server"))]
fn sync_client_event<E: Event + Packet>(&mut self) -> &mut Self {
self.add_event::<PacketEvent<E>>()
.add_packet_handler::<E, _>(
|entity, connection, data, world| match bincode::deserialize::<E>(&data) {
Ok(event) => world.send_event(PacketEvent {
entity,
connection,
event,
}),
Err(_) => println!("Failed to deserialize packet: {}", type_name::<E>()),
},
)
}
#[cfg(all(feature = "sync", feature = "client"))]
fn sync_client_event<E: Event + Packet>(&mut self) -> &mut Self {
self.add_event::<E>().add_system(
|mut events: EventReader<E>, connection: Option<Res<Connection>>| {
if let Some(connection) = connection {
for event in events.iter() {
connection.send(event);
}
}
},
)
}
#[cfg(all(feature = "sync", feature = "server"))]
fn sync_component<C: Component + DeserializeOwned + Serialize + Clone>(&mut self) -> &mut Self {
let update_components =
|changed_components: Query<(Entity, &C), (Changed<C>, With<Synced>)>,
connections: Query<&Connection>| {
for (entity, component) in changed_components.iter() {
for connection in connections.iter() {
connection.send(&(entity, component.clone()));
}
}
};
let send_components =
|components: Query<(Entity, &C), With<Synced>>,
new_connections: Query<&Connection, Added<Connection>>| {
for (entity, component) in components.iter() {
for connection in new_connections.iter() {
connection.send(&(entity, component.clone()));
}
}
};
let remove_components = |mut removed_components: RemovedComponents<C>,
synced_entities: Query<Entity, With<Synced>>,
connections: Query<&Connection>| {
for entity in removed_components.iter() {
if synced_entities.contains(entity) {
for connection in connections.iter() {
connection.send(&(entity, PhantomData::<C>));
}
}
}
};
self.add_system(update_components.after(NetworkPlugin::send_added))
.add_system(send_components.after(NetworkPlugin::send_synced))
.add_system(remove_components.after(update_components))
}
#[cfg(all(feature = "sync", feature = "client"))]
fn sync_component<C: Component + DeserializeOwned + Serialize>(&mut self) -> &mut Self {
self.add_packet_handler::<(Entity, C), _>(|data, world| {
match bincode::deserialize::<(Entity, C)>(&data) {
Ok((entity, component)) => {
if let Some((local_entity, _)) = world
.query::<(Entity, &Synced)>()
.iter(world)
.find(|(_, server_entity)| server_entity.0 == entity)
{
let mut local_entity = world.entity_mut(local_entity);
match local_entity.get_mut::<C>() {
Some(mut local_component) => {
println!("CA CHANGE: {}", type_name::<C>());
*local_component = component;
}
None => {
local_entity.insert(component);
}
}
} else {
println!("Received component for unknown entity: {:?}", entity);
}
}
Err(_) => println!("Failed to deserialize packet: {}", type_name::<C>()),
}
})
.add_packet_handler::<(Entity, PhantomData<C>), _>(
|data, world| match bincode::deserialize::<(Entity, PhantomData<C>)>(&data) {
Ok((entity, _)) => {
if let Some((local_entity, _)) = world
.query::<(Entity, &Synced)>()
.iter(world)
.find(|(_, server_entity)| server_entity.0 == entity)
{
world.entity_mut(local_entity).remove::<C>();
}
}
Err(_) => println!("Failed to deserialize packet: {}", type_name::<C>()),
},
)
}
}

View file

@ -1,129 +0,0 @@
use crate::{
tcp::{Connection, Listener},
Packet,
};
use bevy::prelude::*;
use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc};
pub mod sync;
/// A function that handle a received [Packet] on the server.
pub type ServerPacketHandler =
Box<dyn Fn(Entity, ClientConnection, Vec<u8>, &mut World) + Send + Sync>;
/// A Bevy resource that store the packets handlers for the server.
#[derive(Resource)]
pub struct ServerHandlerManager(Arc<HashMap<u64, ServerPacketHandler>>);
/// A Bevy resource that listens for incoming [ClientConnection]s.
#[derive(Resource)]
pub struct ClientListener(Listener);
impl ClientListener {
/// Creates a new listener on the given address.
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
Ok(Self(Listener::bind(addr)?))
}
}
/// A connection to a remote client.
#[derive(Component)]
pub struct ClientConnection(Arc<Connection>);
impl ClientConnection {
/// Sends a packet through this connection.
pub fn send<P: Packet>(&self, packet: &P) {
let mut data = bincode::serialize(packet).expect("Failed to serialize packet");
data.extend(P::packet_id().to_be_bytes());
self.0.send(data);
}
}
/// A plugin that manage the network connections for a server.
pub struct ServerPlugin;
impl ServerPlugin {
/// Accept new [ClientConnection]s.
pub fn accept_connections(mut commands: Commands, listener: Option<Res<ClientListener>>) {
if let Some(listener) = listener {
if let Some(connection) = listener.0.accept() {
commands.spawn(ClientConnection(Arc::new(connection)));
}
}
}
/// Handles a received [Packet] on the server.
pub fn handle_packets(world: &mut World) {
// Get all received packets
let mut packets = Vec::new();
for (entity, connection) in world.query::<(Entity, &ClientConnection)>().iter(world) {
while let Some(mut packet) = connection.0.recv() {
if packet.len() < 8 {
println!("Invalid packet received: {:?}", packet);
} else {
let id_buffer = packet.split_off(packet.len() - 8);
let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap());
packets.push((
entity,
ClientConnection(Arc::clone(&connection.0)),
packet_id,
packet,
));
}
}
}
// Get the packet handlers
let handlers = Arc::clone(&world.resource_mut::<ServerHandlerManager>().0);
// Handle all received packets
for (entity, connection, packet_id, packet) in packets {
if let Some(handler) = handlers.get(&packet_id) {
handler(entity, connection, packet, world);
}
}
}
/// Remove disconnected [ClientConnection]s.
pub fn remove_disconnected(
mut commands: Commands,
connections: Query<(Entity, &ClientConnection)>,
) {
for (entity, connection) in connections.iter() {
if connection.0.closed() {
commands.entity(entity).remove::<ClientConnection>();
}
}
}
}
impl Plugin for ServerPlugin {
fn build(&self, app: &mut App) {
app.insert_resource(ServerHandlerManager(Arc::new(HashMap::new())));
app.add_system(ServerPlugin::accept_connections);
app.add_system(ServerPlugin::handle_packets);
app.add_system(ServerPlugin::remove_disconnected);
}
}
/// An extension to add packet handlers.
pub trait ServerAppExt {
/// Add a new packet handler.
fn add_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
where
P: Packet,
H: Fn(Entity, ClientConnection, Vec<u8>, &mut World) + Send + Sync + 'static;
}
impl ServerAppExt for App {
fn add_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
where
P: Packet,
H: Fn(Entity, ClientConnection, Vec<u8>, &mut World) + Send + Sync + 'static,
{
Arc::get_mut(&mut self.world.resource_mut::<ServerHandlerManager>().0)
.unwrap()
.insert(P::packet_id(), Box::new(handler));
self
}
}

View file

@ -1,154 +0,0 @@
use super::ServerAppExt;
use crate::{server::ClientConnection, Packet};
use bevy::prelude::*;
use serde::{de::DeserializeOwned, Serialize};
use std::{any::type_name, marker::PhantomData, ops::Deref};
/// An event that comes from a client.
pub struct FromClient<E: Event + Packet> {
/// The entity of the [ClientConnection] that sent the event.
pub entity: Entity,
/// The [ClientConnection] that sent the event.
pub connection: ClientConnection,
/// The event.
pub event: E,
}
impl<E: Event + Packet> Deref for FromClient<E> {
type Target = E;
fn deref(&self) -> &Self::Target {
&self.event
}
}
/// Mark an [Entity] to be synced.
#[derive(Component)]
pub struct Synced;
/// A plugin for the syncronization system.
pub struct ServerSyncPlugin;
impl ServerSyncPlugin {
/// Send to clients the [Synced] entity that has been added to the server.
fn send_added(
added_entities: Query<Entity, Added<Synced>>,
connections: Query<&ClientConnection>,
) {
for entity in added_entities.iter() {
for connection in connections.iter() {
connection.send(&entity);
}
}
}
/// Send [Synced] entities to new clients.
fn send_synced(
synced_entities: Query<Entity, With<Synced>>,
new_connections: Query<&ClientConnection, Added<ClientConnection>>,
) {
for entity in synced_entities.iter() {
for connection in new_connections.iter() {
connection.send(&entity);
}
}
}
/// Send to clients the [Synced] entity that has been removed.
fn send_removed(
mut removed_entities: RemovedComponents<Synced>,
connections: Query<&ClientConnection>,
) {
for entity in removed_entities.iter() {
for connection in connections.iter() {
connection.send(&entity);
}
}
}
}
impl Plugin for ServerSyncPlugin {
fn build(&self, app: &mut App) {
app.add_system(Self::send_added);
app.add_system(Self::send_synced);
app.add_system(Self::send_removed);
}
}
/// An extention to add syncronization to the server.
pub trait ServerSyncExt {
/// Register syncronization for an [Event] that can be sent by the server.
fn server_event_sync<E: Event + Packet>(&mut self) -> &mut Self;
/// Register syncronization for an [Event] that comes from the client.
fn client_event_sync<E: Event + Packet>(&mut self) -> &mut Self;
/// Register a [Component] to be synced.
fn sync_component<C: Component + DeserializeOwned + Serialize + Clone>(&mut self) -> &mut Self;
}
impl ServerSyncExt for App {
fn server_event_sync<E: Event + Packet>(&mut self) -> &mut Self {
self.add_event::<E>().add_system(
|mut events: EventReader<E>, connections: Query<&ClientConnection>| {
for event in events.iter() {
for connection in connections.iter() {
connection.send(event);
}
}
},
)
}
fn client_event_sync<E: Event + Packet>(&mut self) -> &mut Self {
self.add_event::<FromClient<E>>()
.add_packet_handler::<E, _>(
|entity, connection, data, world| match bincode::deserialize::<E>(&data) {
Ok(event) => world.send_event(FromClient {
entity,
connection,
event,
}),
Err(_) => println!("Failed to deserialize packet: {}", type_name::<E>()),
},
)
}
fn sync_component<C: Component + DeserializeOwned + Serialize + Clone>(&mut self) -> &mut Self {
let update_components =
|changed_components: Query<(Entity, &C), (Changed<C>, With<Synced>)>,
connections: Query<&ClientConnection>| {
for (entity, component) in changed_components.iter() {
for connection in connections.iter() {
connection.send(&(entity, component.clone()));
}
}
};
let send_components =
|components: Query<(Entity, &C), With<Synced>>,
new_connections: Query<&ClientConnection, Added<ClientConnection>>| {
for (entity, component) in components.iter() {
for connection in new_connections.iter() {
connection.send(&(entity, component.clone()));
}
}
};
let remove_components =
|mut removed_components: RemovedComponents<C>,
synced_entities: Query<Entity, With<Synced>>,
connections: Query<&ClientConnection>| {
for entity in removed_components.iter() {
if synced_entities.contains(entity) {
for connection in connections.iter() {
connection.send(&(entity, PhantomData::<C>));
}
}
}
};
self.add_system(update_components.after(ServerSyncPlugin::send_added))
.add_system(send_components.after(ServerSyncPlugin::send_synced))
.add_system(remove_components.after(update_components))
}
}