Merge branch 'remake' into 'main'
Remake the system to add synchronisation See merge request tipragot/bevnet!3
This commit is contained in:
commit
6ec3b0e509
|
@ -1,99 +0,0 @@
|
|||
use bevnet::{
|
||||
impl_packet, AppClientNetwork, AppServerNetwork, ClientConnection, ClientListener,
|
||||
ClientNetworkPlugin, PacketEvent, ServerConnection, ServerNetworkPlugin,
|
||||
};
|
||||
use bevy::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// A ping packet.
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct PingPacket;
|
||||
impl_packet!(PingPacket);
|
||||
|
||||
/// A pong packet.
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct PongPacket;
|
||||
impl_packet!(PongPacket);
|
||||
|
||||
/// Launch the server.
|
||||
fn start_server(mut commands: Commands, keys: Res<Input<KeyCode>>) {
|
||||
if keys.just_pressed(KeyCode::B) {
|
||||
println!("Starting server...");
|
||||
match ClientListener::bind("127.0.0.1:8000") {
|
||||
Ok(listener) => {
|
||||
println!("Listening on {}", listener.address());
|
||||
commands.insert_resource(listener);
|
||||
}
|
||||
Err(e) => println!("Failed to bind: {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Show when a client connects.
|
||||
fn client_connected(connection: Query<(Entity, &ClientConnection), Added<ClientConnection>>) {
|
||||
for (entity, connection) in connection.iter() {
|
||||
println!("Client connected: {} as {:?}", connection.address(), entity);
|
||||
}
|
||||
}
|
||||
|
||||
/// Show when a client disconnects.
|
||||
fn client_disconnected(mut removed: RemovedComponents<ClientConnection>) {
|
||||
for entity in removed.iter() {
|
||||
println!("Client disconnected: {:?}", entity);
|
||||
}
|
||||
}
|
||||
|
||||
/// Receive the ping packets.
|
||||
fn receive_ping(mut events: EventReader<PacketEvent<PingPacket>>) {
|
||||
for PacketEvent { connection, .. } in events.iter() {
|
||||
println!("Received ping from {}", connection.address());
|
||||
connection.send(PongPacket);
|
||||
println!("Response sent!");
|
||||
}
|
||||
}
|
||||
|
||||
/// Connect the client to the server.
|
||||
fn connect(mut commands: Commands, keys: Res<Input<KeyCode>>) {
|
||||
if keys.just_pressed(KeyCode::C) {
|
||||
println!("Connecting...");
|
||||
match ServerConnection::connect("127.0.0.1:8000") {
|
||||
Ok(connection) => {
|
||||
println!("Connected to {}", connection.address());
|
||||
commands.insert_resource(connection);
|
||||
}
|
||||
Err(e) => println!("Failed to connect: {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Receive the pong packets.
|
||||
fn receive_pong(mut events: EventReader<PongPacket>) {
|
||||
for _ in events.iter() {
|
||||
println!("Received pong!");
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a ping packet to the server.
|
||||
fn send_ping(keys: Res<Input<KeyCode>>, connection: Res<ServerConnection>) {
|
||||
if keys.just_pressed(KeyCode::P) {
|
||||
connection.send(PingPacket);
|
||||
println!("Ping sent!");
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
App::new()
|
||||
.add_plugins(DefaultPlugins)
|
||||
.add_plugin(ServerNetworkPlugin)
|
||||
.add_system(start_server)
|
||||
.add_system(client_connected)
|
||||
.add_system(client_disconnected)
|
||||
.add_system(receive_ping)
|
||||
.register_server_packet::<PingPacket>()
|
||||
.add_plugin(ClientNetworkPlugin)
|
||||
.add_system(connect)
|
||||
.add_system(send_ping.run_if(resource_exists::<ServerConnection>()))
|
||||
.add_system(receive_pong)
|
||||
.register_client_packet::<PongPacket>()
|
||||
.run();
|
||||
}
|
103
src/client/mod.rs
Normal file
103
src/client/mod.rs
Normal file
|
@ -0,0 +1,103 @@
|
|||
use crate::{tcp::Connection, Packet};
|
||||
use bevy::prelude::*;
|
||||
use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc};
|
||||
|
||||
pub mod sync;
|
||||
|
||||
/// A function that handle a received [Packet] on the client.
|
||||
pub type ClientPacketHandler = Box<dyn Fn(Vec<u8>, &mut World) + Send + Sync>;
|
||||
|
||||
/// A Bevy resource that store the packets handlers for the client.
|
||||
#[derive(Resource)]
|
||||
pub struct ClientHandlerManager(Arc<HashMap<u64, ClientPacketHandler>>);
|
||||
|
||||
/// A connection to a remote server.
|
||||
#[derive(Resource)]
|
||||
pub struct ServerConnection(Connection);
|
||||
|
||||
impl ServerConnection {
|
||||
/// Connects to a remote server.
|
||||
pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
|
||||
Ok(Self(Connection::connect(addr)?))
|
||||
}
|
||||
|
||||
/// Sends a packet through this connection.
|
||||
pub fn send<P: Packet>(&self, packet: &P) {
|
||||
let mut data = bincode::serialize(packet).expect("Failed to serialize packet");
|
||||
data.extend(P::packet_id().to_be_bytes());
|
||||
self.0.send(data);
|
||||
}
|
||||
}
|
||||
|
||||
/// A plugin that manage the network connections for a server.
|
||||
pub struct ClientPlugin;
|
||||
|
||||
impl ClientPlugin {
|
||||
/// Handles a received [Packet] on the server.
|
||||
pub fn handle_packets(world: &mut World) {
|
||||
// Get all received packets
|
||||
let mut packets = Vec::new();
|
||||
if let Some(connection) = world.get_resource::<ServerConnection>() {
|
||||
while let Some(mut packet) = connection.0.recv() {
|
||||
if packet.len() < 8 {
|
||||
println!("Invalid packet received: {:?}", packet);
|
||||
} else {
|
||||
let id_buffer = packet.split_off(packet.len() - 8);
|
||||
let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap());
|
||||
packets.push((packet_id, packet));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
|
||||
// Get the packet handlers
|
||||
let handlers = Arc::clone(&world.resource_mut::<ClientHandlerManager>().0);
|
||||
|
||||
// Handle all received packets
|
||||
for (packet_id, packet) in packets {
|
||||
if let Some(handler) = handlers.get(&packet_id) {
|
||||
handler(packet, world);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove [ServerConnection] if it's disconnected.
|
||||
pub fn remove_disconnected(mut commands: Commands, connection: Option<Res<ServerConnection>>) {
|
||||
if let Some(connection) = connection {
|
||||
if connection.0.closed() {
|
||||
commands.remove_resource::<ServerConnection>();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Plugin for ClientPlugin {
|
||||
fn build(&self, app: &mut App) {
|
||||
app.insert_resource(ClientHandlerManager(Arc::new(HashMap::new())));
|
||||
app.add_system(ClientPlugin::handle_packets);
|
||||
app.add_system(ClientPlugin::remove_disconnected);
|
||||
}
|
||||
}
|
||||
|
||||
/// An extension to add packet handlers.
|
||||
pub trait ClientAppExt {
|
||||
/// Add a new packet handler.
|
||||
fn add_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
|
||||
where
|
||||
P: Packet,
|
||||
H: Fn(Vec<u8>, &mut World) + Send + Sync + 'static;
|
||||
}
|
||||
|
||||
impl ClientAppExt for App {
|
||||
fn add_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
|
||||
where
|
||||
P: Packet,
|
||||
H: Fn(Vec<u8>, &mut World) + Send + Sync + 'static,
|
||||
{
|
||||
Arc::get_mut(&mut self.world.resource_mut::<ClientHandlerManager>().0)
|
||||
.unwrap()
|
||||
.insert(P::packet_id(), Box::new(handler));
|
||||
self
|
||||
}
|
||||
}
|
129
src/client/sync.rs
Normal file
129
src/client/sync.rs
Normal file
|
@ -0,0 +1,129 @@
|
|||
use std::{any::type_name, marker::PhantomData};
|
||||
|
||||
use super::{ClientAppExt, ServerConnection};
|
||||
use crate::Packet;
|
||||
use bevy::prelude::*;
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
|
||||
/// An event that comes from the server.
|
||||
#[derive(Deref)]
|
||||
pub struct FromServer<E: Event + Packet> {
|
||||
/// The event.
|
||||
pub event: E,
|
||||
}
|
||||
|
||||
/// Mark an [Entity] as synced by the server.
|
||||
#[derive(Component)]
|
||||
pub struct ServerEntity(Entity);
|
||||
|
||||
/// A plugin for the syncronization system.
|
||||
pub struct ClientSyncPlugin;
|
||||
|
||||
impl ClientSyncPlugin {
|
||||
/// Removes [ServerEntity] when disconnected.
|
||||
fn remove_synced(mut commands: Commands, entities: Query<Entity, With<ServerEntity>>) {
|
||||
for entity in entities.iter() {
|
||||
commands.entity(entity).despawn();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Plugin for ClientSyncPlugin {
|
||||
fn build(&self, app: &mut App) {
|
||||
app.add_packet_handler::<Entity, _>(|data, world| {
|
||||
match bincode::deserialize::<Entity>(&data) {
|
||||
Ok(entity) => {
|
||||
if let Some((local_entity, _)) = world
|
||||
.query::<(Entity, &ServerEntity)>()
|
||||
.iter(world)
|
||||
.find(|(_, server_entity)| server_entity.0 == entity)
|
||||
{
|
||||
println!("Despawning {:?}", local_entity);
|
||||
world.despawn(local_entity);
|
||||
} else {
|
||||
println!("Spawning {:?}", entity);
|
||||
world.spawn(ServerEntity(entity));
|
||||
}
|
||||
}
|
||||
Err(_) => println!("Failed to deserialize packet: {}", type_name::<Entity>()),
|
||||
}
|
||||
});
|
||||
app.add_system(Self::remove_synced.run_if(resource_removed::<ServerConnection>()));
|
||||
}
|
||||
}
|
||||
|
||||
/// An extention to add syncronization to the client.
|
||||
pub trait ClientSyncExt {
|
||||
/// Register syncronization for an [Event] that comes from the server.
|
||||
fn server_event_sync<E: Event + Packet>(&mut self) -> &mut Self;
|
||||
|
||||
/// Register syncronization for an [Event] that can be sent by the client.
|
||||
fn client_event_sync<E: Event + Packet>(&mut self) -> &mut Self;
|
||||
|
||||
/// Register a [Component] to be synced.
|
||||
fn sync_component<C: Component + DeserializeOwned + Serialize>(&mut self) -> &mut Self;
|
||||
}
|
||||
|
||||
impl ClientSyncExt for App {
|
||||
fn server_event_sync<E: Event + Packet>(&mut self) -> &mut Self {
|
||||
self.add_event::<FromServer<E>>()
|
||||
.add_packet_handler::<E, _>(|data, world| match bincode::deserialize::<E>(&data) {
|
||||
Ok(event) => world.send_event(FromServer { event }),
|
||||
Err(_) => println!("Failed to deserialize packet: {}", type_name::<E>()),
|
||||
})
|
||||
}
|
||||
|
||||
fn client_event_sync<E: Event + Packet>(&mut self) -> &mut Self {
|
||||
self.add_event::<E>().add_system(
|
||||
|mut events: EventReader<E>, connection: Option<Res<ServerConnection>>| {
|
||||
if let Some(connection) = connection {
|
||||
for event in events.iter() {
|
||||
connection.send(event);
|
||||
}
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
fn sync_component<C: Component + DeserializeOwned + Serialize>(&mut self) -> &mut Self {
|
||||
self.add_packet_handler::<(Entity, C), _>(|data, world| {
|
||||
match bincode::deserialize::<(Entity, C)>(&data) {
|
||||
Ok((entity, component)) => {
|
||||
if let Some((local_entity, _)) = world
|
||||
.query::<(Entity, &ServerEntity)>()
|
||||
.iter(world)
|
||||
.find(|(_, server_entity)| server_entity.0 == entity)
|
||||
{
|
||||
let mut local_entity = world.entity_mut(local_entity);
|
||||
match local_entity.get_mut::<C>() {
|
||||
Some(mut local_component) => {
|
||||
println!("CA CHANGE: {}", type_name::<C>());
|
||||
*local_component = component;
|
||||
}
|
||||
None => {
|
||||
local_entity.insert(component);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
println!("Received component for unknown entity: {:?}", entity);
|
||||
}
|
||||
}
|
||||
Err(_) => println!("Failed to deserialize packet: {}", type_name::<C>()),
|
||||
}
|
||||
})
|
||||
.add_packet_handler::<(Entity, PhantomData<C>), _>(
|
||||
|data, world| match bincode::deserialize::<(Entity, PhantomData<C>)>(&data) {
|
||||
Ok((entity, _)) => {
|
||||
if let Some((local_entity, _)) = world
|
||||
.query::<(Entity, &ServerEntity)>()
|
||||
.iter(world)
|
||||
.find(|(_, server_entity)| server_entity.0 == entity)
|
||||
{
|
||||
world.entity_mut(local_entity).remove::<C>();
|
||||
}
|
||||
}
|
||||
Err(_) => println!("Failed to deserialize packet: {}", type_name::<C>()),
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
346
src/lib.rs
346
src/lib.rs
|
@ -1,218 +1,222 @@
|
|||
use bevy::prelude::*;
|
||||
pub use packet::Packet;
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use std::{
|
||||
io,
|
||||
net::{SocketAddr, ToSocketAddrs},
|
||||
sync::Arc,
|
||||
collections::hash_map::DefaultHasher,
|
||||
hash::{Hash, Hasher},
|
||||
};
|
||||
use tcp::{Connection, Listener};
|
||||
|
||||
mod packet;
|
||||
pub mod client;
|
||||
pub mod server;
|
||||
mod tcp;
|
||||
|
||||
/// A connection to a server.
|
||||
/// A packet that can be sent over a [Connection].
|
||||
pub trait Packet: DeserializeOwned + Serialize + Send + Sync {
|
||||
/// Returns a unique identifier for this packet.
|
||||
///
|
||||
/// This function uses [std::any::type_name] to get a string
|
||||
/// representation of the type of the object and returns the
|
||||
/// hash of that string. This is not perfect... but I didn't
|
||||
/// find a better solution.
|
||||
fn packet_id() -> u64 {
|
||||
let mut hasher = DefaultHasher::new();
|
||||
std::any::type_name::<Self>().hash(&mut hasher);
|
||||
hasher.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: DeserializeOwned + Serialize + Send + Sync> Packet for T {}
|
||||
|
||||
/* use bevy::{prelude::*, utils::HashMap};
|
||||
use std::{
|
||||
io::{self, ErrorKind},
|
||||
net::{TcpStream, ToSocketAddrs},
|
||||
sync::{mpsc::TryRecvError, Arc},
|
||||
};
|
||||
use tcp::{Connection, Listener, Packet, RawPacket};
|
||||
|
||||
mod tcp;
|
||||
|
||||
/// A Bevy resource that store the packets handlers for the client.
|
||||
#[derive(Resource)]
|
||||
pub struct ClientPacketHandler(Arc<HashMap<u64, Box<dyn Fn(RawPacket, &mut World) + Send + Sync>>>);
|
||||
|
||||
/// A connection to a remote server.
|
||||
#[derive(Resource)]
|
||||
pub struct ServerConnection(Connection);
|
||||
|
||||
impl ServerConnection {
|
||||
/// Creates a [ServerConnection] to the given address.
|
||||
/// Connects to a remote server.
|
||||
pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
|
||||
Ok(Self(Connection::connect(addr)?))
|
||||
Ok(Self(Connection::new(TcpStream::connect(addr)?)?))
|
||||
}
|
||||
|
||||
/// Sends a [Packet] to the server.
|
||||
/// Send a [Packet] to the remote server.
|
||||
pub fn send<P: Packet>(&self, packet: P) {
|
||||
self.0.send(packet);
|
||||
}
|
||||
|
||||
/// Gets the address of the server.
|
||||
pub fn address(&self) -> SocketAddr {
|
||||
self.0.address()
|
||||
self.0.send(packet).ok();
|
||||
}
|
||||
}
|
||||
|
||||
/// Used to listen for incoming [ClientConnection]s.
|
||||
/// A Bevy resource that store the packets handlers for the server.
|
||||
#[derive(Resource)]
|
||||
struct ServerPacketHandler(
|
||||
Arc<HashMap<u64, Box<dyn Fn(Entity, ClientConnection, RawPacket, &mut World) + Send + Sync>>>,
|
||||
);
|
||||
|
||||
/// A [ClientConnection] listener.
|
||||
#[derive(Resource)]
|
||||
pub struct ClientListener(Listener);
|
||||
|
||||
impl ClientListener {
|
||||
/// Creates a [ClientListener] binded to the given address.
|
||||
pub fn bind<A: ToSocketAddrs>(address: A) -> io::Result<Self> {
|
||||
Ok(Self(Listener::bind(address)?))
|
||||
}
|
||||
|
||||
/// Returns the address the [ClientListener] is bound to.
|
||||
pub fn address(&self) -> SocketAddr {
|
||||
self.0.address()
|
||||
/// Creates a new TCP listener on the given address.
|
||||
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
|
||||
Ok(Self(Listener::bind(addr)?))
|
||||
}
|
||||
}
|
||||
|
||||
/// A connection to a client.
|
||||
/// A connection to a remote client.
|
||||
#[derive(Component)]
|
||||
pub struct ClientConnection(Arc<Connection>);
|
||||
|
||||
impl ClientConnection {
|
||||
/// Sends a [Packet] to the client.
|
||||
/// Sends a [Packet] to the remote client.
|
||||
pub fn send<P: Packet>(&self, packet: P) {
|
||||
self.0.send(packet);
|
||||
}
|
||||
|
||||
/// Gets the address of the client.
|
||||
pub fn address(&self) -> SocketAddr {
|
||||
self.0.address()
|
||||
self.0.send(packet).ok();
|
||||
}
|
||||
}
|
||||
|
||||
/// A [Plugin] for client networking.
|
||||
pub struct ClientNetworkPlugin;
|
||||
|
||||
impl ClientNetworkPlugin {
|
||||
/// Removes the [ServerConnection] resource when it's disconnected.
|
||||
fn remove_disconnected(mut commands: Commands, connection: Res<ServerConnection>) {
|
||||
if !connection.0.connected() {
|
||||
commands.remove_resource::<ServerConnection>();
|
||||
}
|
||||
}
|
||||
|
||||
/// Clears the packet cache of the [ServerConnection].
|
||||
fn clear_cache(connection: Res<ServerConnection>) {
|
||||
connection.0.clear();
|
||||
}
|
||||
}
|
||||
|
||||
impl Plugin for ClientNetworkPlugin {
|
||||
fn build(&self, app: &mut App) {
|
||||
app.add_systems((
|
||||
Self::remove_disconnected.run_if(resource_exists::<ServerConnection>()),
|
||||
Self::clear_cache
|
||||
.run_if(resource_exists::<ServerConnection>())
|
||||
.after(Self::remove_disconnected),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
/// A [Plugin] for server networking.
|
||||
pub struct ServerNetworkPlugin;
|
||||
|
||||
impl ServerNetworkPlugin {
|
||||
/// Removes the [ClientConnection] components when it's disconnected.
|
||||
fn remove_disconnected(
|
||||
mut commands: Commands,
|
||||
connections: Query<(Entity, &ClientConnection)>,
|
||||
) {
|
||||
for (entity, connection) in connections.iter() {
|
||||
if !connection.0.connected() {
|
||||
commands.entity(entity).remove::<ClientConnection>();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Clears the packet cache of the [ClientConnection]s.
|
||||
fn clear_cache(connections: Query<&ClientConnection>) {
|
||||
for connection in connections.iter() {
|
||||
connection.0.clear();
|
||||
}
|
||||
}
|
||||
|
||||
/// Removes the [ClientListener] resource when it stop listening.
|
||||
fn remove_not_listening(mut commands: Commands, listener: Res<ClientListener>) {
|
||||
if !listener.0.listening() {
|
||||
commands.remove_resource::<ClientListener>();
|
||||
}
|
||||
}
|
||||
|
||||
/// Accepts incoming connections.
|
||||
fn accept_connections(mut commands: Commands, listener: Res<ClientListener>) {
|
||||
while let Some(connection) = listener.0.accept() {
|
||||
/// A Bevy system to handle incoming [ClientConnection]s and remove the
|
||||
/// [ClientListener] resource if it is no longer listening.
|
||||
fn accept_connections(mut commands: Commands, listener: Option<Res<ClientListener>>) {
|
||||
if let Some(listener) = listener {
|
||||
match listener.0.accept() {
|
||||
Ok(connection) => {
|
||||
commands.spawn(ClientConnection(Arc::new(connection)));
|
||||
}
|
||||
Err(error) => match error.kind() {
|
||||
ErrorKind::WouldBlock => {}
|
||||
_ => commands.remove_resource::<ClientListener>(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Plugin for ServerNetworkPlugin {
|
||||
/// A Bevy system to handle incoming [Packet]s from
|
||||
/// the [ClientConnection]s and the [ServerConnection].
|
||||
/// It removes them if they are no longer connected.
|
||||
fn receive_packets(world: &mut World) {
|
||||
// Handle client packets
|
||||
let mut packets = Vec::new();
|
||||
let mut to_remove = false;
|
||||
if let Some(connection) = world.get_resource::<ServerConnection>() {
|
||||
if let Some(receiver) = connection.0.recv() {
|
||||
loop {
|
||||
match receiver.try_recv() {
|
||||
Ok(packet) => packets.push(packet),
|
||||
Err(TryRecvError::Empty) => break,
|
||||
Err(TryRecvError::Disconnected) => {
|
||||
to_remove = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(handlers) = world
|
||||
.get_resource_mut::<ClientPacketHandler>()
|
||||
.map(|handlers| Arc::clone(&handlers.0))
|
||||
{
|
||||
for packet in packets.into_iter() {
|
||||
if let Some(handler) = handlers.get(&packet.packet_id()) {
|
||||
(handler)(packet, world);
|
||||
}
|
||||
}
|
||||
}
|
||||
if to_remove {
|
||||
world.remove_resource::<ServerConnection>();
|
||||
}
|
||||
|
||||
// Handle server packets
|
||||
let mut packets = Vec::new();
|
||||
let mut to_remove = Vec::new();
|
||||
for (entity, connection) in world.query::<(Entity, &ClientConnection)>().iter(world) {
|
||||
if let Some(receiver) = connection.0.recv() {
|
||||
loop {
|
||||
match receiver.try_recv() {
|
||||
Ok(packet) => {
|
||||
println!("YESSSS");
|
||||
packets.push((entity, ClientConnection(Arc::clone(&connection.0)), packet));
|
||||
}
|
||||
Err(TryRecvError::Empty) => break,
|
||||
Err(TryRecvError::Disconnected) => {
|
||||
to_remove.push(entity);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(handlers) = world
|
||||
.get_resource_mut::<ServerPacketHandler>()
|
||||
.map(|handlers| Arc::clone(&handlers.0))
|
||||
{
|
||||
for (entity, connection, packet) in packets.into_iter() {
|
||||
if let Some(handler) = handlers.get(&packet.packet_id()) {
|
||||
(handler)(entity, connection, packet, world);
|
||||
}
|
||||
}
|
||||
}
|
||||
for entity in to_remove {
|
||||
world.despawn(entity);
|
||||
}
|
||||
}
|
||||
|
||||
/// A network plugin.
|
||||
pub struct NetworkPlugin;
|
||||
|
||||
impl Plugin for NetworkPlugin {
|
||||
fn build(&self, app: &mut App) {
|
||||
app.add_systems((
|
||||
Self::remove_disconnected,
|
||||
Self::clear_cache.after(Self::remove_disconnected),
|
||||
Self::remove_not_listening.run_if(resource_exists::<ClientListener>()),
|
||||
Self::accept_connections
|
||||
.run_if(resource_exists::<ClientListener>())
|
||||
.after(Self::remove_not_listening),
|
||||
));
|
||||
app.insert_resource(ServerPacketHandler(Arc::new(HashMap::new())));
|
||||
app.insert_resource(ClientPacketHandler(Arc::new(HashMap::new())));
|
||||
app.add_system(accept_connections);
|
||||
app.add_system(receive_packets);
|
||||
}
|
||||
}
|
||||
|
||||
/// Receives [Packet]s and sends them as [PacketEvent]s.
|
||||
fn receive_server_packets<P: Packet>(
|
||||
mut writer: EventWriter<PacketEvent<P>>,
|
||||
connection: Query<(Entity, &ClientConnection)>,
|
||||
) {
|
||||
for (entity, connection) in connection.iter() {
|
||||
for packet in connection.0.recv() {
|
||||
writer.send(PacketEvent {
|
||||
connection: ClientConnection(Arc::clone(&connection.0)),
|
||||
entity,
|
||||
packet,
|
||||
});
|
||||
}
|
||||
}
|
||||
/// A extension trait to add packet handler.
|
||||
pub trait NetworkAppExt {
|
||||
/// Add a client packet handler.
|
||||
fn add_client_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
|
||||
where
|
||||
P: Packet,
|
||||
H: Fn(RawPacket, &mut World) + Send + Sync + 'static;
|
||||
|
||||
/// Add a server packet handler.
|
||||
fn add_server_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
|
||||
where
|
||||
P: Packet,
|
||||
H: Fn(Entity, ClientConnection, RawPacket, &mut World) + Send + Sync + 'static;
|
||||
}
|
||||
|
||||
/// An extention trait to easily register a [Packet] to the server.
|
||||
pub trait AppServerNetwork {
|
||||
/// Registers a [Packet] for the server.
|
||||
fn register_server_packet<P: Packet>(&mut self) -> &mut Self;
|
||||
impl NetworkAppExt for App {
|
||||
fn add_client_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
|
||||
where
|
||||
P: Packet,
|
||||
H: Fn(RawPacket, &mut World) + Send + Sync + 'static,
|
||||
{
|
||||
Arc::get_mut(&mut self.world.resource_mut::<ClientPacketHandler>().0)
|
||||
.unwrap()
|
||||
.insert(P::packet_id(), Box::new(handler));
|
||||
self
|
||||
}
|
||||
|
||||
impl AppServerNetwork for App {
|
||||
fn register_server_packet<P: Packet>(&mut self) -> &mut Self {
|
||||
self.add_event::<PacketEvent<P>>();
|
||||
self.add_system(
|
||||
receive_server_packets::<P>
|
||||
.after(ServerNetworkPlugin::remove_disconnected)
|
||||
.before(ServerNetworkPlugin::clear_cache),
|
||||
);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// An event for received [Packet]s on the server.
|
||||
pub struct PacketEvent<P: Packet> {
|
||||
/// The [ClientConnection] from which the [Packet] was received.
|
||||
pub connection: ClientConnection,
|
||||
|
||||
/// The [Entity] of the [ClientConnection].
|
||||
pub entity: Entity,
|
||||
|
||||
/// The [Packet]
|
||||
pub packet: P,
|
||||
}
|
||||
|
||||
/// Receives [Packet]s and sends them as [Event]s.
|
||||
fn receive_client_packets<P: Packet>(
|
||||
mut writer: EventWriter<P>,
|
||||
connection: Res<ServerConnection>,
|
||||
) {
|
||||
for packet in connection.0.recv() {
|
||||
writer.send(packet);
|
||||
}
|
||||
}
|
||||
|
||||
/// An extention trait to easily register a [Packet] to the client.
|
||||
pub trait AppClientNetwork {
|
||||
/// Registers a [Packet] for the client.
|
||||
fn register_client_packet<P: Packet>(&mut self) -> &mut Self;
|
||||
}
|
||||
|
||||
impl AppClientNetwork for App {
|
||||
fn register_client_packet<P: Packet>(&mut self) -> &mut Self {
|
||||
self.add_event::<P>();
|
||||
self.add_system(
|
||||
receive_client_packets::<P>
|
||||
.run_if(resource_exists::<ServerConnection>())
|
||||
.after(ClientNetworkPlugin::remove_disconnected)
|
||||
.before(ClientNetworkPlugin::clear_cache),
|
||||
);
|
||||
fn add_server_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
|
||||
where
|
||||
P: Packet,
|
||||
H: Fn(Entity, ClientConnection, RawPacket, &mut World) + Send + Sync + 'static,
|
||||
{
|
||||
Arc::get_mut(&mut self.world.resource_mut::<ServerPacketHandler>().0)
|
||||
.unwrap()
|
||||
.insert(P::packet_id(), Box::new(handler));
|
||||
self
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
|
|
@ -1,57 +0,0 @@
|
|||
use dashmap::DashMap;
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
|
||||
/// A [Packet] that can be sent over the network.
|
||||
pub trait Packet: DeserializeOwned + Serialize + Sync + Send + 'static {
|
||||
const ID: u32;
|
||||
}
|
||||
|
||||
/// A macro to easily implement [Packet].
|
||||
#[macro_export]
|
||||
macro_rules! impl_packet {
|
||||
($t:ty) => {
|
||||
impl ::bevnet::Packet for $t {
|
||||
const ID: u32 = ::const_fnv1a_hash::fnv1a_hash_32(
|
||||
concat!(module_path!(), "::", stringify!($t)).as_bytes(),
|
||||
None,
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// A container for the received [Packet]s.
|
||||
pub struct PacketReceiver {
|
||||
/// The received data.
|
||||
data: DashMap<u32, Vec<Vec<u8>>>,
|
||||
}
|
||||
|
||||
impl PacketReceiver {
|
||||
/// Creates a new [PacketReceiver].
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
data: DashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Clears all the [Packet]s.
|
||||
pub fn clear(&self) {
|
||||
self.data.clear();
|
||||
}
|
||||
|
||||
/// Inserts a the received raw [Packet] into the [PacketReceiver].
|
||||
pub fn insert(&self, id: u32, data: Vec<u8>) {
|
||||
self.data.entry(id).or_default().push(data);
|
||||
}
|
||||
|
||||
/// Extract all the [Packet]s of a given type.
|
||||
pub fn extract<P: Packet>(&self) -> Vec<P> {
|
||||
match self.data.get_mut(&P::ID) {
|
||||
Some(mut data) => data
|
||||
.value_mut()
|
||||
.drain(..)
|
||||
.filter_map(|data| bincode::deserialize(&data).ok())
|
||||
.collect(),
|
||||
None => Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
129
src/server/mod.rs
Normal file
129
src/server/mod.rs
Normal file
|
@ -0,0 +1,129 @@
|
|||
use crate::{
|
||||
tcp::{Connection, Listener},
|
||||
Packet,
|
||||
};
|
||||
use bevy::prelude::*;
|
||||
use std::{collections::HashMap, io, net::ToSocketAddrs, sync::Arc};
|
||||
|
||||
pub mod sync;
|
||||
|
||||
/// A function that handle a received [Packet] on the server.
|
||||
pub type ServerPacketHandler =
|
||||
Box<dyn Fn(Entity, ClientConnection, Vec<u8>, &mut World) + Send + Sync>;
|
||||
|
||||
/// A Bevy resource that store the packets handlers for the server.
|
||||
#[derive(Resource)]
|
||||
pub struct ServerHandlerManager(Arc<HashMap<u64, ServerPacketHandler>>);
|
||||
|
||||
/// A Bevy resource that listens for incoming [ClientConnection]s.
|
||||
#[derive(Resource)]
|
||||
pub struct ClientListener(Listener);
|
||||
|
||||
impl ClientListener {
|
||||
/// Creates a new listener on the given address.
|
||||
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
|
||||
Ok(Self(Listener::bind(addr)?))
|
||||
}
|
||||
}
|
||||
|
||||
/// A connection to a remote client.
|
||||
#[derive(Component)]
|
||||
pub struct ClientConnection(Arc<Connection>);
|
||||
|
||||
impl ClientConnection {
|
||||
/// Sends a packet through this connection.
|
||||
pub fn send<P: Packet>(&self, packet: &P) {
|
||||
let mut data = bincode::serialize(packet).expect("Failed to serialize packet");
|
||||
data.extend(P::packet_id().to_be_bytes());
|
||||
self.0.send(data);
|
||||
}
|
||||
}
|
||||
|
||||
/// A plugin that manage the network connections for a server.
|
||||
pub struct ServerPlugin;
|
||||
|
||||
impl ServerPlugin {
|
||||
/// Accept new [ClientConnection]s.
|
||||
pub fn accept_connections(mut commands: Commands, listener: Option<Res<ClientListener>>) {
|
||||
if let Some(listener) = listener {
|
||||
if let Some(connection) = listener.0.accept() {
|
||||
commands.spawn(ClientConnection(Arc::new(connection)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles a received [Packet] on the server.
|
||||
pub fn handle_packets(world: &mut World) {
|
||||
// Get all received packets
|
||||
let mut packets = Vec::new();
|
||||
for (entity, connection) in world.query::<(Entity, &ClientConnection)>().iter(world) {
|
||||
while let Some(mut packet) = connection.0.recv() {
|
||||
if packet.len() < 8 {
|
||||
println!("Invalid packet received: {:?}", packet);
|
||||
} else {
|
||||
let id_buffer = packet.split_off(packet.len() - 8);
|
||||
let packet_id = u64::from_be_bytes(id_buffer.try_into().unwrap());
|
||||
packets.push((
|
||||
entity,
|
||||
ClientConnection(Arc::clone(&connection.0)),
|
||||
packet_id,
|
||||
packet,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get the packet handlers
|
||||
let handlers = Arc::clone(&world.resource_mut::<ServerHandlerManager>().0);
|
||||
|
||||
// Handle all received packets
|
||||
for (entity, connection, packet_id, packet) in packets {
|
||||
if let Some(handler) = handlers.get(&packet_id) {
|
||||
handler(entity, connection, packet, world);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove disconnected [ClientConnection]s.
|
||||
pub fn remove_disconnected(
|
||||
mut commands: Commands,
|
||||
connections: Query<(Entity, &ClientConnection)>,
|
||||
) {
|
||||
for (entity, connection) in connections.iter() {
|
||||
if connection.0.closed() {
|
||||
commands.entity(entity).remove::<ClientConnection>();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Plugin for ServerPlugin {
|
||||
fn build(&self, app: &mut App) {
|
||||
app.insert_resource(ServerHandlerManager(Arc::new(HashMap::new())));
|
||||
app.add_system(ServerPlugin::accept_connections);
|
||||
app.add_system(ServerPlugin::handle_packets);
|
||||
app.add_system(ServerPlugin::remove_disconnected);
|
||||
}
|
||||
}
|
||||
|
||||
/// An extension to add packet handlers.
|
||||
pub trait ServerAppExt {
|
||||
/// Add a new packet handler.
|
||||
fn add_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
|
||||
where
|
||||
P: Packet,
|
||||
H: Fn(Entity, ClientConnection, Vec<u8>, &mut World) + Send + Sync + 'static;
|
||||
}
|
||||
|
||||
impl ServerAppExt for App {
|
||||
fn add_packet_handler<P, H>(&mut self, handler: H) -> &mut Self
|
||||
where
|
||||
P: Packet,
|
||||
H: Fn(Entity, ClientConnection, Vec<u8>, &mut World) + Send + Sync + 'static,
|
||||
{
|
||||
Arc::get_mut(&mut self.world.resource_mut::<ServerHandlerManager>().0)
|
||||
.unwrap()
|
||||
.insert(P::packet_id(), Box::new(handler));
|
||||
self
|
||||
}
|
||||
}
|
154
src/server/sync.rs
Normal file
154
src/server/sync.rs
Normal file
|
@ -0,0 +1,154 @@
|
|||
use super::ServerAppExt;
|
||||
use crate::{server::ClientConnection, Packet};
|
||||
use bevy::prelude::*;
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use std::{any::type_name, marker::PhantomData, ops::Deref};
|
||||
|
||||
/// An event that comes from a client.
|
||||
pub struct FromClient<E: Event + Packet> {
|
||||
/// The entity of the [ClientConnection] that sent the event.
|
||||
pub entity: Entity,
|
||||
|
||||
/// The [ClientConnection] that sent the event.
|
||||
pub connection: ClientConnection,
|
||||
|
||||
/// The event.
|
||||
pub event: E,
|
||||
}
|
||||
|
||||
impl<E: Event + Packet> Deref for FromClient<E> {
|
||||
type Target = E;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.event
|
||||
}
|
||||
}
|
||||
|
||||
/// Mark an [Entity] to be synced.
|
||||
#[derive(Component)]
|
||||
pub struct Synced;
|
||||
|
||||
/// A plugin for the syncronization system.
|
||||
pub struct ServerSyncPlugin;
|
||||
|
||||
impl ServerSyncPlugin {
|
||||
/// Send to clients the [Synced] entity that has been added to the server.
|
||||
fn send_added(
|
||||
added_entities: Query<Entity, Added<Synced>>,
|
||||
connections: Query<&ClientConnection>,
|
||||
) {
|
||||
for entity in added_entities.iter() {
|
||||
for connection in connections.iter() {
|
||||
connection.send(&entity);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Send [Synced] entities to new clients.
|
||||
fn send_synced(
|
||||
synced_entities: Query<Entity, With<Synced>>,
|
||||
new_connections: Query<&ClientConnection, Added<ClientConnection>>,
|
||||
) {
|
||||
for entity in synced_entities.iter() {
|
||||
for connection in new_connections.iter() {
|
||||
connection.send(&entity);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Send to clients the [Synced] entity that has been removed.
|
||||
fn send_removed(
|
||||
mut removed_entities: RemovedComponents<Synced>,
|
||||
connections: Query<&ClientConnection>,
|
||||
) {
|
||||
for entity in removed_entities.iter() {
|
||||
for connection in connections.iter() {
|
||||
connection.send(&entity);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Plugin for ServerSyncPlugin {
|
||||
fn build(&self, app: &mut App) {
|
||||
app.add_system(Self::send_added);
|
||||
app.add_system(Self::send_synced);
|
||||
app.add_system(Self::send_removed);
|
||||
}
|
||||
}
|
||||
|
||||
/// An extention to add syncronization to the server.
|
||||
pub trait ServerSyncExt {
|
||||
/// Register syncronization for an [Event] that can be sent by the server.
|
||||
fn server_event_sync<E: Event + Packet>(&mut self) -> &mut Self;
|
||||
|
||||
/// Register syncronization for an [Event] that comes from the client.
|
||||
fn client_event_sync<E: Event + Packet>(&mut self) -> &mut Self;
|
||||
|
||||
/// Register a [Component] to be synced.
|
||||
fn sync_component<C: Component + DeserializeOwned + Serialize + Clone>(&mut self) -> &mut Self;
|
||||
}
|
||||
|
||||
impl ServerSyncExt for App {
|
||||
fn server_event_sync<E: Event + Packet>(&mut self) -> &mut Self {
|
||||
self.add_event::<E>().add_system(
|
||||
|mut events: EventReader<E>, connections: Query<&ClientConnection>| {
|
||||
for event in events.iter() {
|
||||
for connection in connections.iter() {
|
||||
connection.send(event);
|
||||
}
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
fn client_event_sync<E: Event + Packet>(&mut self) -> &mut Self {
|
||||
self.add_event::<FromClient<E>>()
|
||||
.add_packet_handler::<E, _>(
|
||||
|entity, connection, data, world| match bincode::deserialize::<E>(&data) {
|
||||
Ok(event) => world.send_event(FromClient {
|
||||
entity,
|
||||
connection,
|
||||
event,
|
||||
}),
|
||||
Err(_) => println!("Failed to deserialize packet: {}", type_name::<E>()),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
fn sync_component<C: Component + DeserializeOwned + Serialize + Clone>(&mut self) -> &mut Self {
|
||||
let update_components =
|
||||
|changed_components: Query<(Entity, &C), (Changed<C>, With<Synced>)>,
|
||||
connections: Query<&ClientConnection>| {
|
||||
for (entity, component) in changed_components.iter() {
|
||||
for connection in connections.iter() {
|
||||
connection.send(&(entity, component.clone()));
|
||||
}
|
||||
}
|
||||
};
|
||||
let send_components =
|
||||
|components: Query<(Entity, &C), With<Synced>>,
|
||||
new_connections: Query<&ClientConnection, Added<ClientConnection>>| {
|
||||
for (entity, component) in components.iter() {
|
||||
for connection in new_connections.iter() {
|
||||
connection.send(&(entity, component.clone()));
|
||||
}
|
||||
}
|
||||
};
|
||||
let remove_components =
|
||||
|mut removed_components: RemovedComponents<C>,
|
||||
synced_entities: Query<Entity, With<Synced>>,
|
||||
connections: Query<&ClientConnection>| {
|
||||
for entity in removed_components.iter() {
|
||||
if synced_entities.contains(entity) {
|
||||
for connection in connections.iter() {
|
||||
connection.send(&(entity, PhantomData::<C>));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
self.add_system(update_components.after(ServerSyncPlugin::send_added))
|
||||
.add_system(send_components.after(ServerSyncPlugin::send_synced))
|
||||
.add_system(remove_components.after(update_components))
|
||||
}
|
||||
}
|
283
src/tcp.rs
283
src/tcp.rs
|
@ -1,7 +1,6 @@
|
|||
use crate::packet::{Packet, PacketReceiver};
|
||||
use std::{
|
||||
io::{self, Read, Write},
|
||||
net::{Shutdown, SocketAddr, TcpListener, TcpStream, ToSocketAddrs},
|
||||
net::{Shutdown, TcpListener, TcpStream, ToSocketAddrs},
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
mpsc::{channel, Receiver, Sender},
|
||||
|
@ -10,224 +9,144 @@ use std::{
|
|||
thread,
|
||||
};
|
||||
|
||||
/// Used to send [Packet] to the sending thread.
|
||||
type ConnectionSender = Arc<Mutex<Sender<(u32, Vec<u8>)>>>;
|
||||
|
||||
/// A TCP [Connection] that can send and receive [Packet].
|
||||
/// A non-blocking TCP connection.
|
||||
pub struct Connection {
|
||||
/// Whether or not the [Connection] is currently connected.
|
||||
connected: Arc<AtomicBool>,
|
||||
/// Track if the connection has been closed.
|
||||
closed: Arc<AtomicBool>,
|
||||
|
||||
/// Used to store the received [Packet]s.
|
||||
packets: Arc<PacketReceiver>,
|
||||
|
||||
/// Used to send [Packet] to the sending thread.
|
||||
sender: ConnectionSender,
|
||||
|
||||
/// The [TcpStream] of the [Connection].
|
||||
/// The underlying TCP stream.
|
||||
stream: TcpStream,
|
||||
|
||||
/// The address of the [Connection].
|
||||
address: SocketAddr,
|
||||
/// Used to receive packets from the receiving thread.
|
||||
receiver: Mutex<Receiver<Vec<u8>>>,
|
||||
|
||||
/// Used to send packets to the sending thread.
|
||||
sender: Mutex<Sender<Vec<u8>>>,
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
/// Creates a new [Connection] with the given [TcpStream].
|
||||
pub fn new(stream: TcpStream) -> io::Result<Self> {
|
||||
let connected = Arc::new(AtomicBool::new(true));
|
||||
let packets = Arc::new(PacketReceiver::new());
|
||||
/// Creates a new connection.
|
||||
fn new(stream: TcpStream) -> io::Result<Self> {
|
||||
stream.set_nonblocking(false)?;
|
||||
let closed = Arc::new(AtomicBool::new(false));
|
||||
|
||||
// Receiving part
|
||||
let mut thread_stream = stream.try_clone()?;
|
||||
let thread_packets = Arc::clone(&packets);
|
||||
let thread_connected = Arc::clone(&connected);
|
||||
thread::spawn(move || {
|
||||
let mut int_buffer = [0; 4];
|
||||
// Spawn the receiving thread
|
||||
let thread_stream = stream.try_clone()?;
|
||||
let (thread_sender, receiver) = channel();
|
||||
let thread_closed = Arc::clone(&closed);
|
||||
thread::spawn(move || Self::receiving_loop(thread_stream, thread_sender, thread_closed));
|
||||
|
||||
// Spawn the sending thread
|
||||
let thread_stream = stream.try_clone()?;
|
||||
let (sender, thread_receiver) = channel();
|
||||
let thread_closed = Arc::clone(&closed);
|
||||
thread::spawn(move || Self::sending_loop(thread_stream, thread_receiver, thread_closed));
|
||||
|
||||
// Return the connection
|
||||
Ok(Self {
|
||||
closed,
|
||||
stream,
|
||||
receiver: Mutex::new(receiver),
|
||||
sender: Mutex::new(sender),
|
||||
})
|
||||
}
|
||||
|
||||
/// Creates a new connection to the given address.
|
||||
pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
|
||||
Self::new(TcpStream::connect(addr)?)
|
||||
}
|
||||
|
||||
/// The receiving loop for this connection.
|
||||
fn receiving_loop(mut stream: TcpStream, sender: Sender<Vec<u8>>, closed: Arc<AtomicBool>) {
|
||||
let mut len_buffer = [0; 4];
|
||||
loop {
|
||||
// Check if the connection is closed
|
||||
if !thread_connected.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Read the length of the packet
|
||||
if thread_stream.read_exact(&mut int_buffer).is_err() {
|
||||
// Read the length of the next packet
|
||||
if stream.read_exact(&mut len_buffer).is_err() {
|
||||
break;
|
||||
}
|
||||
let len = u32::from_be_bytes(int_buffer);
|
||||
|
||||
// Read the packet identifier
|
||||
if thread_stream.read_exact(&mut int_buffer).is_err() {
|
||||
break;
|
||||
}
|
||||
let id = u32::from_be_bytes(int_buffer);
|
||||
let len = u32::from_be_bytes(len_buffer);
|
||||
|
||||
// Read the packet
|
||||
let mut buffer = vec![0; len as usize];
|
||||
if thread_stream.read_exact(&mut buffer).is_err() {
|
||||
let mut packet = vec![0; len as usize];
|
||||
if stream.read_exact(&mut packet).is_err() {
|
||||
break;
|
||||
}
|
||||
|
||||
// Insert the packet
|
||||
thread_packets.insert(id, buffer);
|
||||
// Send the packet
|
||||
if sender.send(packet).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
closed.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
// Close the connection
|
||||
thread_connected.store(false, Ordering::Relaxed);
|
||||
});
|
||||
|
||||
// Sending part
|
||||
let mut thread_stream = stream.try_clone()?;
|
||||
let (sender, receiver) = channel();
|
||||
let thread_connected = Arc::clone(&connected);
|
||||
thread::spawn(move || {
|
||||
/// The sending loop for this connection.
|
||||
fn sending_loop(mut stream: TcpStream, receiver: Receiver<Vec<u8>>, closed: Arc<AtomicBool>) {
|
||||
loop {
|
||||
// Check if the connection is closed
|
||||
if !thread_connected.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Get the data to send
|
||||
let (id, buffer): (u32, Vec<u8>) = match receiver.recv() {
|
||||
Ok(data) => data,
|
||||
// Get the next packet to send
|
||||
let packet = match receiver.recv() {
|
||||
Ok(packet) => packet,
|
||||
Err(_) => break,
|
||||
};
|
||||
|
||||
// Send the length of the data
|
||||
let len = buffer.len() as u32;
|
||||
if thread_stream.write_all(&len.to_be_bytes()).is_err() {
|
||||
// Send the length of the packet
|
||||
let len_buffer = u32::to_be_bytes(packet.len() as u32);
|
||||
if stream.write_all(&len_buffer).is_err() {
|
||||
break;
|
||||
}
|
||||
|
||||
// Send the packet identifier
|
||||
if thread_stream.write_all(&id.to_be_bytes()).is_err() {
|
||||
break;
|
||||
}
|
||||
|
||||
// Send the data
|
||||
if thread_stream.write_all(&buffer).is_err() {
|
||||
break;
|
||||
}
|
||||
|
||||
// Flush the stream
|
||||
if thread_stream.flush().is_err() {
|
||||
// Send the packet
|
||||
if stream.write_all(&packet).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Close the connection
|
||||
thread_connected.store(false, Ordering::Relaxed);
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
connected,
|
||||
packets,
|
||||
sender: Arc::new(Mutex::new(sender)),
|
||||
address: stream.peer_addr()?,
|
||||
stream,
|
||||
})
|
||||
closed.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Creates a [Connection] to the given address.
|
||||
pub fn connect<A: ToSocketAddrs>(address: A) -> io::Result<Self> {
|
||||
Self::new(TcpStream::connect(address)?)
|
||||
/// Returns `true` if the connection has been closed.
|
||||
pub fn closed(&self) -> bool {
|
||||
self.closed.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Returns whether or not the [Connection] is currently connected.
|
||||
pub fn connected(&self) -> bool {
|
||||
self.connected.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Clears the [Packet] cache.
|
||||
pub fn clear(&self) {
|
||||
self.packets.clear();
|
||||
}
|
||||
|
||||
/// Gets all the received [Packet]s of a certain type.
|
||||
pub fn recv<P: Packet>(&self) -> Vec<P> {
|
||||
self.packets.extract()
|
||||
}
|
||||
|
||||
/// Sends the given [Packet] to the [Connection].
|
||||
/// Does nothing if the [Connection] is closed.
|
||||
pub fn send<P: Packet>(&self, packet: P) {
|
||||
let data = bincode::serialize(&packet).expect("Failed to serialize packet");
|
||||
self.sender
|
||||
.lock()
|
||||
.map(|sender| sender.send((P::ID, data)))
|
||||
.ok();
|
||||
}
|
||||
|
||||
/// Returns the address of the [Connection].
|
||||
pub fn address(&self) -> SocketAddr {
|
||||
self.address
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Connection {
|
||||
fn drop(&mut self) {
|
||||
self.connected.store(false, Ordering::Relaxed);
|
||||
self.stream.shutdown(Shutdown::Both).ok();
|
||||
}
|
||||
}
|
||||
|
||||
/// A TCP [Listener] that can accept [Connection]s.
|
||||
pub struct Listener {
|
||||
/// Whether the [Listener] is listening.
|
||||
listening: Arc<AtomicBool>,
|
||||
|
||||
/// The receiving part of the [Listener].
|
||||
receiver: Arc<Mutex<Receiver<Connection>>>,
|
||||
|
||||
/// The address the [Listener] is bound to.
|
||||
address: SocketAddr,
|
||||
}
|
||||
|
||||
impl Listener {
|
||||
/// Creates a new [Listener] binded to the given address.
|
||||
pub fn bind<A: ToSocketAddrs>(address: A) -> io::Result<Self> {
|
||||
let listener = TcpListener::bind(address)?;
|
||||
let address = listener.local_addr()?;
|
||||
let listening = Arc::new(AtomicBool::new(true));
|
||||
let listening_thread = Arc::clone(&listening);
|
||||
let (sender, receiver) = channel();
|
||||
thread::spawn(move || {
|
||||
for stream in listener.incoming() {
|
||||
let connection = match stream {
|
||||
Ok(stream) => match Connection::new(stream) {
|
||||
Ok(connection) => connection,
|
||||
Err(_) => break,
|
||||
},
|
||||
Err(_) => break,
|
||||
};
|
||||
if sender.send(connection).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
listening_thread.store(false, Ordering::Relaxed);
|
||||
});
|
||||
Ok(Self {
|
||||
listening,
|
||||
receiver: Arc::new(Mutex::new(receiver)),
|
||||
address,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns whether or not the [Listener] is listening.
|
||||
pub fn listening(&self) -> bool {
|
||||
self.listening.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Receives the next [Connection] from the [Listener].
|
||||
pub fn accept(&self) -> Option<Connection> {
|
||||
/// Returns the next received packet.
|
||||
pub fn recv(&self) -> Option<Vec<u8>> {
|
||||
self.receiver
|
||||
.lock()
|
||||
.ok()
|
||||
.and_then(|receiver| receiver.try_recv().ok())
|
||||
}
|
||||
|
||||
/// Returns the address the [Listener] is bound to.
|
||||
pub fn address(&self) -> SocketAddr {
|
||||
self.address
|
||||
/// Sends a packet through this connection.
|
||||
pub fn send(&self, packet: Vec<u8>) {
|
||||
self.sender.lock().map(|sender| sender.send(packet)).ok();
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Connection {
|
||||
fn drop(&mut self) {
|
||||
self.stream.shutdown(Shutdown::Both).ok();
|
||||
}
|
||||
}
|
||||
|
||||
/// A [Connection] listener.
|
||||
pub struct Listener {
|
||||
/// The [TcpListener] of the listener.
|
||||
listener: TcpListener,
|
||||
}
|
||||
|
||||
impl Listener {
|
||||
/// Creates a new TCP listener on the given address.
|
||||
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
|
||||
let listener = TcpListener::bind(addr)?;
|
||||
listener.set_nonblocking(true)?;
|
||||
Ok(Self { listener })
|
||||
}
|
||||
|
||||
/// Accepts a new [Connection].
|
||||
pub fn accept(&self) -> Option<Connection> {
|
||||
self.listener
|
||||
.accept()
|
||||
.and_then(|(stream, _)| Connection::new(stream))
|
||||
.ok()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue