Remove last system
Some checks are pending
Rust Checks / checks (push) Waiting to run

Co-authored-by: CoCoSol <CoCoSol007@users.noreply.github.com>
This commit is contained in:
Tipragot 2024-04-08 17:45:18 +02:00
parent 106cabe951
commit 1363883dd5
10 changed files with 23 additions and 1538 deletions

811
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,20 +0,0 @@
[package]
name = "bevnet"
version = "0.2.0"
edition = "2021"
license = "GPL-3.0-or-later"
description = "A library for networking in Bevy."
authors = ["Tipragot <contact@tipragot.fr>"]
keywords = ["bevy", "network", "game"]
categories = ["network-programming", "game-development"]
[lints]
workspace = true
[dependencies]
relay-client = { path = "../relay-client" }
serde = "1.0.196"
bincode = "1.3.3"
dashmap = "5.5.3"
bevy = "0.12.1"
uuid = "1.7.0"

View file

@ -1,152 +0,0 @@
//! A networking library for Bevy.
use std::borrow::Cow;
use std::collections::LinkedList;
use bevy::prelude::*;
use dashmap::DashMap;
use serde::de::DeserializeOwned;
use serde::Serialize;
pub use uuid::Uuid;
/// A connection to a relay server.
#[derive(Resource)]
pub struct Connection(relay_client::Connection);
/// A resource that stores the received messages.
#[derive(Resource)]
pub struct ReceivedMessages(DashMap<u16, LinkedList<(Uuid, Vec<u8>)>>);
impl Connection {
/// Returns the identifier of the connection.
pub const fn identifier(&self) -> Option<Uuid> {
self.0.identifier()
}
}
/// A bevy plugin to make multiplayer game using a relay server.
pub struct NetworkPlugin(String);
impl NetworkPlugin {
/// Create a new [NetworkPlugin] plugin with the given domain for the relay
/// server.
pub fn new<'a>(domain: impl Into<Cow<'a, str>>) -> Self {
Self(domain.into().into_owned())
}
}
/// Update the relay connection.
fn update_connection(mut connection: ResMut<Connection>, received_messages: Res<ReceivedMessages>) {
let messages = connection.0.update();
for (sender, mut message) in messages {
if message.len() < 2 {
error!("message too short received");
continue;
}
let id_start = message.len() - 2;
let event_id = u16::from_be_bytes([message[id_start], message[id_start + 1]]);
message.truncate(id_start);
received_messages
.0
.entry(event_id)
.or_default()
.push_back((sender, message));
}
}
/// A system that clear the received messages.
fn clear_received_messages(received_messages: Res<ReceivedMessages>) {
received_messages.0.clear();
}
impl Plugin for NetworkPlugin {
fn build(&self, app: &mut App) {
app.insert_resource(Connection(
relay_client::Connection::new(&self.0).expect("could not create connection"),
))
.insert_resource(ReceivedMessages(DashMap::new()))
.add_systems(PreUpdate, update_connection)
.add_systems(PreUpdate, clear_received_messages.after(update_connection));
}
}
/// A resource that store the last event id used to register an [Event].
///
/// This is used to give an unique id to each event.
#[derive(Resource, Default)]
struct LastEventId(u16);
/// An [Event] used to send an [Event] to another client on the relay server.
#[derive(Event)]
pub struct SendTo<T: Event + DeserializeOwned + Serialize>(pub Uuid, pub T);
/// An [Event] used to receive an [Event] from another client on the relay
/// server.
#[derive(Event)]
pub struct Receive<T: Event + DeserializeOwned + Serialize>(pub Uuid, pub T);
/// A trait that extends a bevy [App] to add multiplayer support.
pub trait NetworkAppExt {
/// Setup the application to manage network events of type `T`.
fn add_network_event<T: Event + DeserializeOwned + Serialize>(&mut self) -> &mut Self;
}
impl NetworkAppExt for App {
fn add_network_event<T: Event + DeserializeOwned + Serialize>(&mut self) -> &mut Self {
// Get a new event id.
let mut event_id = self.world.get_resource_or_insert_with(LastEventId::default);
event_id.0 += 1;
let event_id = event_id.0;
// Register the event.
self.add_event::<SendTo<T>>()
.add_event::<Receive<T>>()
.add_systems(
PreUpdate,
(move |mut events: EventReader<SendTo<T>>, connection: Res<Connection>| {
for event in events.read() {
// Get the size of the serialized event.
let size = match bincode::serialized_size(&event.1) {
Ok(size) => size,
Err(e) => {
error!("failed to serialize event: {}", e);
continue;
}
};
// Serialize the event we add 18 here because we will add the event id (2
// bytes) at the end and after that, the relay client will add the target id
// at the end (16 bytes).
let mut data = Vec::with_capacity(size as usize + 18);
if let Err(e) = bincode::serialize_into(&mut data, &event.1) {
error!("failed to serialize event: {}", e);
continue;
}
// Add the event id.
data.extend_from_slice(&event_id.to_be_bytes());
// Send the event.
connection.0.send(event.0, data);
}
})
.before(update_connection),
)
.add_systems(
PreUpdate,
(move |mut writer: EventWriter<Receive<T>>,
received_messages: Res<ReceivedMessages>| {
if let Some(mut messages) = received_messages.0.get_mut(&event_id) {
while let Some((sender, message)) = messages.pop_front() {
match bincode::deserialize(&message) {
Ok(event) => writer.send(Receive(sender, event)),
Err(e) => error!("failed to deserialize event: {}", e),
}
}
}
})
.before(clear_received_messages)
.after(update_connection),
)
}
}

View file

@ -5,7 +5,6 @@ edition = "2021"
license = "GPL-3.0-or-later"
description = "An online turn based game."
repository = "https://git.tipragot.fr/corentin/border-wars.git"
authors = ["CoCoSol"]
[lints]
workspace = true
@ -15,6 +14,5 @@ bevy = "0.12.1"
bevy_egui = "0.24.0"
noise = "0.8.2"
paste = "1.0.14"
bevnet = { path = "../bevnet" }
serde = "1.0.197"
rand = "0.8.5"

View file

@ -1,20 +0,0 @@
[package]
name = "relay-client"
version = "0.2.0"
edition = "2021"
license = "GPL-3.0-or-later"
description = "A client to use a relay server."
authors = ["Tipragot <contact@tipragot.fr>"]
keywords = ["bevy", "network", "game"]
categories = ["network-programming", "game-development"]
[lints]
workspace = true
[dependencies]
tungstenite = { version = "0.21.0", features = ["rustls-tls-native-roots"] }
mio = { version = "0.8.10", features = ["net", "os-poll"] }
uuid = "1.7.0"
rand = "0.8.5"
home = "0.5.9"
log = "0.4.20"

View file

@ -1,361 +0,0 @@
//! A library containing a client to use a relay server.
use std::borrow::Cow;
use std::collections::LinkedList;
use std::fs;
use std::io::{self};
use std::net::{SocketAddr, ToSocketAddrs};
use std::path::PathBuf;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use log::warn;
use mio::net::TcpStream;
use rand::seq::SliceRandom;
use tungstenite::handshake::MidHandshake;
use tungstenite::stream::MaybeTlsStream;
use tungstenite::{ClientHandshake, HandshakeError, Message, WebSocket};
use uuid::Uuid;
/// The state of a [Connection].
#[derive(Debug)]
enum ConnectionState {
/// The [Connection] is not connected.
Disconnected,
/// The underlying [TcpStream] is connecting.
Connecting(TcpStream, Instant),
/// The underlying [TcpStream] is connected.
Connected(TcpStream),
/// The websocket handshake is in progress.
Handshaking(MidHandshake<ClientHandshake<MaybeTlsStream<TcpStream>>>),
/// The websocket handshake is finished.
Handshaked(WebSocket<MaybeTlsStream<TcpStream>>),
/// The [Connection] is registering with the relay server.
Registering(WebSocket<MaybeTlsStream<TcpStream>>),
/// The [Connection] is connected.
Active(WebSocket<MaybeTlsStream<TcpStream>>),
}
/// A connection to a relay server.
pub struct Connection {
/// The address list corresponding to the relay server.
address_list: Vec<SocketAddr>,
/// The domain of the relay server.
domain: String,
/// The path to the file where the identifier and secret key are stored.
data_path: PathBuf,
/// The identifier of the connection for the relay server.
identifier: Option<Uuid>,
/// The secret key used to authenticate with the relay server.
secret: Option<Uuid>,
/// A list of messages that needs to be sent.
to_send: Mutex<LinkedList<Message>>,
/// The state of the connection.
state: ConnectionState,
}
impl Connection {
/// Create a new [Connection].
pub fn new<'a>(domain: impl Into<Cow<'a, str>>) -> io::Result<Self> {
let domain = domain.into();
// Loads the identifier and secret key from disk.
let (data_path, identifier, secret) = {
// Find the relay data file path.
let mut path = home::home_dir().ok_or_else(|| {
io::Error::new(io::ErrorKind::NotFound, "could not find home directory")
})?;
path.push(".relay-data");
// Check if the file exists.
match false {
true => {
// Read the file and parse the identifier and secret key.
let contents = fs::read(&path)?;
if contents.len() != 32 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid data in .relay-data",
));
}
let identifier = Uuid::from_slice(&contents[..16]).map_err(io::Error::other)?;
let secret = Uuid::from_slice(&contents[16..]).map_err(io::Error::other)?;
(path, Some(identifier), Some(secret))
}
false => (path, None, None),
}
};
// Create the connection and return it.
Ok(Self {
address_list: (domain.as_ref(), 443).to_socket_addrs()?.collect(),
domain: domain.into_owned(),
data_path,
identifier,
secret,
to_send: Mutex::new(LinkedList::new()),
state: ConnectionState::Disconnected,
})
}
/// Get the identifier of the connection.
pub const fn identifier(&self) -> Option<Uuid> {
self.identifier
}
/// Send a message to the target client.
pub fn send<'a>(&self, target_id: Uuid, message: impl Into<Cow<'a, [u8]>>) {
let mut data = message.into().into_owned();
data.extend_from_slice(target_id.as_bytes());
if let Ok(mut to_send) = self.to_send.lock() {
to_send.push_back(Message::binary(data));
}
}
/// Create a new [TcpStream] to the relay server.
fn create_stream(&mut self) -> ConnectionState {
// Take a random relay address.
let Some(address) = self.address_list.choose(&mut rand::thread_rng()) else {
warn!("no relay address available");
return ConnectionState::Disconnected;
};
// Create the new TCP stream.
match TcpStream::connect(address.to_owned()) {
Ok(stream) => ConnectionState::Connecting(stream, Instant::now()),
Err(e) => {
warn!("failed to start connection to the relay server: {e}");
ConnectionState::Disconnected
}
}
}
/// Check if the [TcpStream] of the [Connection] is connected.
fn check_connection(&mut self, stream: TcpStream, start: Instant) -> ConnectionState {
// Check for connection errors.
if let Err(e) = stream.take_error() {
warn!("failed to connect to the relay server: {e}");
return ConnectionState::Disconnected;
}
// Check if the stream is connected.
let connected = match stream.peek(&mut [0]) {
Ok(_) => true,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true,
Err(ref e) if e.kind() == io::ErrorKind::NotConnected => false,
Err(e) => {
warn!("failed to connect to the relay server: {e}");
return ConnectionState::Disconnected;
}
};
// Check if the connection has timed out.
let elapsed = start.elapsed();
if elapsed > Duration::from_secs(5) {
warn!("connection to the relay server timed out");
return ConnectionState::Disconnected;
}
// Update the connection state if connected.
match connected {
true => ConnectionState::Connected(stream),
false => ConnectionState::Connecting(stream, start),
}
}
/// Start the websocket handshake.
fn start_handshake(&mut self, stream: TcpStream) -> ConnectionState {
match tungstenite::client_tls(format!("wss://{}", self.domain), stream) {
Ok((socket, _)) => ConnectionState::Handshaked(socket),
Err(HandshakeError::Interrupted(handshake)) => ConnectionState::Handshaking(handshake),
Err(HandshakeError::Failure(e)) => {
warn!("handshake failed with the relay server: {e}");
ConnectionState::Disconnected
}
}
}
/// Continue the websocket handshake.
fn continue_handshake(
&mut self,
handshake: MidHandshake<ClientHandshake<MaybeTlsStream<TcpStream>>>,
) -> ConnectionState {
match handshake.handshake() {
Ok((socket, _)) => ConnectionState::Handshaked(socket),
Err(HandshakeError::Interrupted(handshake)) => ConnectionState::Handshaking(handshake),
Err(HandshakeError::Failure(e)) => {
warn!("handshake failed with the relay server: {e}");
ConnectionState::Disconnected
}
}
}
/// Start authentication with the relay server.
fn start_authentication(
&mut self,
mut socket: WebSocket<MaybeTlsStream<TcpStream>>,
) -> ConnectionState {
match (self.identifier, self.secret) {
(Some(identifier), Some(secret)) => {
// Create the authentication message.
let mut data = Vec::with_capacity(32);
data.extend(identifier.as_bytes());
data.extend(secret.as_bytes());
// Send the authentication message.
match socket.send(Message::Binary(data)) {
Ok(()) => ConnectionState::Active(socket),
Err(e) => {
warn!("failed to send authentication message: {e}");
ConnectionState::Disconnected
}
}
}
_ => {
// Send empty authentication message to request a new identifier and secret key.
match socket.send(Message::Binary(vec![])) {
Ok(()) => ConnectionState::Registering(socket),
Err(e) => {
warn!("failed to send registration message: {e}");
ConnectionState::Disconnected
}
}
}
}
}
/// Wait for the registration response.
fn get_registration_response(
&mut self,
mut socket: WebSocket<MaybeTlsStream<TcpStream>>,
) -> ConnectionState {
match socket.read() {
Ok(message) => {
// Check the message length.
let data = message.into_data();
if data.len() != 32 {
warn!("received malformed registration response");
return ConnectionState::Disconnected;
}
// Extract the client identifier and secret.
self.identifier = Some(Uuid::from_slice(&data[..16]).expect("invalid identifier"));
self.secret = Some(Uuid::from_slice(&data[16..]).expect("invalid secret"));
// Save the client identifier and secret.
fs::write(&self.data_path, data).ok();
// Activate the connection.
ConnectionState::Active(socket)
}
Err(tungstenite::Error::Io(ref e))
if e.kind() == std::io::ErrorKind::WouldBlock
|| e.kind() == std::io::ErrorKind::Interrupted =>
{
ConnectionState::Registering(socket)
}
Err(e) => {
warn!("failed to receive registration response: {e}");
ConnectionState::Disconnected
}
}
}
/// Update the [Connection] by receiving and sending messages.
fn update_connection(
&mut self,
mut socket: WebSocket<MaybeTlsStream<TcpStream>>,
messages: &mut LinkedList<(Uuid, Vec<u8>)>,
) -> ConnectionState {
// Unlock the sending list.
let Ok(mut to_send) = self.to_send.lock() else {
warn!("sending list closed");
return ConnectionState::Disconnected;
};
// Send messages from the send channel to the socket.
while let Some(message) = to_send.pop_front() {
match socket.send(message) {
Ok(()) => (),
Err(tungstenite::Error::Io(ref e))
if e.kind() == std::io::ErrorKind::WouldBlock
|| e.kind() == std::io::ErrorKind::Interrupted =>
{
break;
}
Err(e) => {
warn!("relay connection closed: {e}");
return ConnectionState::Disconnected;
}
}
}
// Receive messages from the socket and send them to the receive channel.
loop {
match socket.read() {
Ok(message) => {
// Check the message length.
let mut data = message.into_data();
if data.len() < 16 {
warn!("received malformed message with length: {}", data.len());
continue;
}
// Extract the sender ID.
let id_start = data.len() - 16;
let sender_id = Uuid::from_slice(&data[id_start..]).expect("invalid sender id");
data.truncate(id_start);
// Add the message to the message list.
messages.push_back((sender_id, data));
}
Err(tungstenite::Error::Io(ref e))
if e.kind() == std::io::ErrorKind::WouldBlock
|| e.kind() == std::io::ErrorKind::Interrupted =>
{
break;
}
Err(e) => {
warn!("relay connection closed: {e}");
return ConnectionState::Disconnected;
}
}
}
// Keep the connection connected.
ConnectionState::Active(socket)
}
/// Update the [Connection] and return the received messages.
///
/// This function will connect to the relay server if it's not already
/// connected, and will send and receive messages from the relay server
/// if it's connected.
///
/// This function will not block the current thread.
pub fn update(&mut self) -> LinkedList<(Uuid, Vec<u8>)> {
let mut messages = LinkedList::new();
self.state = match std::mem::replace(&mut self.state, ConnectionState::Disconnected) {
ConnectionState::Disconnected => self.create_stream(),
ConnectionState::Connecting(stream, start) => self.check_connection(stream, start),
ConnectionState::Connected(stream) => self.start_handshake(stream),
ConnectionState::Handshaking(handshake) => self.continue_handshake(handshake),
ConnectionState::Handshaked(socket) => self.start_authentication(socket),
ConnectionState::Registering(socket) => self.get_registration_response(socket),
ConnectionState::Active(socket) => self.update_connection(socket, &mut messages),
};
messages
}
}

View file

@ -1,22 +0,0 @@
[package]
name = "relay-server"
version = "0.2.0"
edition = "2021"
license = "GPL-3.0-or-later"
description = "A relay server for bevnet."
authors = ["Tipragot <contact@tipragot.fr>"]
keywords = ["bevy", "network", "game"]
categories = ["network-programming", "game-development"]
[lints]
workspace = true
[dependencies]
tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread"] }
axum = { version = "0.7.4", features = ["ws"] }
uuid = { version = "1.7.0", features = ["v4"] }
lazy_static = "1.4.0"
futures = "0.3.30"
dashmap = "5.5.3"
anyhow = "1.0.79"
sled = "0.34.7"

View file

@ -1,160 +0,0 @@
//! A relay server for bevnet.
use std::io;
use anyhow::bail;
use axum::extract::ws::{Message, WebSocket};
use axum::extract::WebSocketUpgrade;
use axum::routing::get;
use axum::Router;
use dashmap::DashMap;
use futures::{SinkExt, StreamExt};
use lazy_static::lazy_static;
use sled::transaction::{ConflictableTransactionResult, TransactionalTree};
use sled::{Db, IVec};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use uuid::Uuid;
lazy_static! {
static ref CLIENTS: DashMap<Uuid, Sender<Vec<u8>>> = DashMap::new();
static ref DB: Db = sled::open("/data/secrets.db").expect("unable to open the database");
}
#[tokio::main]
async fn main() {
let app = Router::new().route(
"/",
get(|ws: WebSocketUpgrade| async {
ws.on_upgrade(|socket| async {
handle(socket).await.ok();
})
}),
);
let listener = tokio::net::TcpListener::bind("0.0.0.0:80")
.await
.expect("failed to bind");
axum::serve(listener, app).await.expect("failed to serve");
}
/// Create a new client and add it to the database.
fn create_client(tx: &TransactionalTree) -> ConflictableTransactionResult<(Uuid, Uuid), io::Error> {
// Generates a new identifier for the client.
let client_id = loop {
// Generates a new random identifier.
let id = Uuid::new_v4();
// Check if the id isn't already in the database.
if tx.get(id.as_bytes())?.is_none() {
break id;
}
};
// Generate a random secret for the client.
let secret = Uuid::new_v4();
// Add the new client to the database.
tx.insert(client_id.as_bytes(), secret.as_bytes())?;
// Returns the client identifier and his secret.
Ok((client_id, secret))
}
/// Handle the websocket connection.
async fn handle(mut socket: WebSocket) -> anyhow::Result<()> {
// Receive the first request from the client.
let data = match socket.recv().await {
Some(Ok(message)) => message.into_data(),
_ => return Ok(()),
};
// If the request is empty it means that the client want a new identifier and
// secret, so we create them and send them to the client.
let client_id = if data.is_empty() {
// Generate the new client.
let (client_id, secret) = DB.transaction(create_client)?;
DB.flush_async().await?;
println!("{client_id} created");
// Send the data to the client.
let mut data = Vec::with_capacity(32);
data.extend_from_slice(client_id.as_bytes());
data.extend_from_slice(secret.as_bytes());
socket.send(Message::Binary(data)).await?;
// Returns the client identifier.
client_id
}
// Otherwise it means that the client want to reuse an identifier, so it will
// send it along with his secret to prove that he is the right client.
else {
// Check for the message length to detect malformed messages.
if data.len() != 32 {
bail!("malformed message");
}
// Get the client identifier and secret from the message.
let client_id = Uuid::from_slice(&data[..16])?;
let secret = Uuid::from_slice(&data[16..])?;
// Check with the database if the secret is correct.
if DB.get(client_id.as_bytes())? != Some(IVec::from(secret.as_bytes())) {
bail!("invalid secret")
}
// Returns the client identifier.
client_id
};
// Handle the client connection.
println!("{client_id} connected");
let (sender, receiver) = channel(128);
CLIENTS.insert(client_id, sender);
handle_client(socket, client_id, receiver).await.ok();
CLIENTS.remove(&client_id);
println!("{client_id} disconnected");
// Returns success.
Ok(())
}
/// Handle the client connection.
async fn handle_client(
socket: WebSocket,
client_id: Uuid,
mut receiver: Receiver<Vec<u8>>,
) -> anyhow::Result<()> {
// Split the socket into sender and receiver.
let (mut writer, mut reader) = socket.split();
// Handle sending messages to the client.
tokio::spawn(async move {
while let Some(message) = receiver.recv().await {
writer.send(Message::Binary(message)).await?;
}
Ok::<(), axum::Error>(())
});
// Handle messages from the client.
while let Some(Ok(message)) = reader.next().await {
// Get the target ID from the message.
let mut data = message.into_data();
if data.len() < 16 {
bail!("malformed message");
}
let id_start = data.len() - 16;
let target_id = Uuid::from_slice(&data[id_start..])?;
// Write the sender ID to the message.
for (i, &byte) in client_id.as_bytes().iter().enumerate() {
data[id_start + i] = byte;
}
// Send the message to the target client.
if let Some(sender) = CLIENTS.get(&target_id) {
sender.send(data).await?;
}
}
// Returns success.
Ok(())
}

10
crates/server/Cargo.toml Normal file
View file

@ -0,0 +1,10 @@
[package]
name = "server"
version = "0.1.0"
edition = "2021"
license = "GPL-3.0-or-later"
description = "The server of Border Wars"
repository = "https://git.tipragot.fr/corentin/border-wars.git"
# [lints]
# workspace = true

View file

@ -0,0 +1,3 @@
fn main() {
println!("Hello, world!");
}