From d8a81646a6494fe1ecfe28fa92010564e623ec94 Mon Sep 17 00:00:00 2001 From: CoCo_Sol Date: Mon, 8 Apr 2024 01:01:19 +0200 Subject: [PATCH] Wip: Add raft system Co-authored-by: Tipragot --- Cargo.lock | 62 ++++++++++++++++- crates/relay-client/src/lib.rs | 2 +- crates/relay-raft/Cargo.toml | 23 +++++++ crates/relay-raft/src/lib.rs | 117 +++++++++++++++++++++++++++++++++ crates/relay-raft/src/main.rs | 54 +++++++++++++++ 5 files changed, 254 insertions(+), 4 deletions(-) create mode 100644 crates/relay-raft/Cargo.toml create mode 100644 crates/relay-raft/src/lib.rs create mode 100644 crates/relay-raft/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 7621658..9d399e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1206,7 +1206,7 @@ dependencies = [ "bitflags 2.4.2", "cexpr", "clang-sys", - "itertools", + "itertools 0.12.1", "lazy_static", "lazycell", "proc-macro2", @@ -2572,6 +2572,15 @@ dependencies = [ "mach2", ] +[[package]] +name = "itertools" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "284f18f85651fe11e8a991b2adb42cb078325c996ed026d994719efcfca1d54b" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.12.1" @@ -3412,6 +3421,29 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0f0f7f43585c34e4fdd7497d746bc32e14458cf11c69341cc0587b1d825dde42" +[[package]] +name = "prost" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e6984d2f1a23009bd270b8bb56d0926810a3d483f59c987d77969e9d8e840b2" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "169a15f3008ecb5160cba7d37bcd690a7601b6d30cfb87a117d45e59d52af5d4" +dependencies = [ + "anyhow", + "itertools 0.9.0", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "quote" version = "1.0.35" @@ -3614,6 +3646,18 @@ dependencies = [ "uuid", ] +[[package]] +name = "relay-raft" +version = "0.2.0" +dependencies = [ + "prost", + "rand_chacha 0.3.1", + "rand_core 0.6.4", + "relay-client", + "simple-raft", + "uuid", +] + [[package]] name = "relay-server" version = "0.2.0" @@ -3891,6 +3935,18 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe" +[[package]] +name = "simple-raft" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe321f515be05fa16fc544233b1ce28c3b44bdb61f51a1e781767ad5f6ff7b0c" +dependencies = [ + "bytes", + "log", + "prost", + "rand_core 0.6.4", +] + [[package]] name = "slab" version = "0.4.9" @@ -4428,9 +4484,9 @@ checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" [[package]] name = "uuid" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a" +checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" dependencies = [ "getrandom 0.2.12", "serde", diff --git a/crates/relay-client/src/lib.rs b/crates/relay-client/src/lib.rs index b390d21..b052784 100644 --- a/crates/relay-client/src/lib.rs +++ b/crates/relay-client/src/lib.rs @@ -80,7 +80,7 @@ impl Connection { path.push(".relay-data"); // Check if the file exists. - match false { + match path.exists() { true => { // Read the file and parse the identifier and secret key. let contents = fs::read(&path)?; diff --git a/crates/relay-raft/Cargo.toml b/crates/relay-raft/Cargo.toml new file mode 100644 index 0000000..9e56dec --- /dev/null +++ b/crates/relay-raft/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "relay-raft" +version = "0.2.0" +edition = "2021" +license = "GPL-3.0-or-later" +description = "Raft implementation for the relay client." +authors = [ + "Tipragot ", + "CoCoSol ", +] +keywords = ["bevy", "network", "game", "raft"] +categories = ["network-programming", "game-development"] + +# [lints] +# workspace = true + +[dependencies] +relay-client = { path = "../relay-client" } +simple-raft = "0.2.0" +prost = { version = "0.7.0" } +rand_chacha = "0.3.1" +rand_core = "0.6.4" +uuid = "1.8.0" diff --git a/crates/relay-raft/src/lib.rs b/crates/relay-raft/src/lib.rs new file mode 100644 index 0000000..e47ff73 --- /dev/null +++ b/crates/relay-raft/src/lib.rs @@ -0,0 +1,117 @@ +//! Raft implementation for the relay client. + +use std::borrow::Cow; +use std::collections::{BTreeSet, LinkedList}; +use std::io; + +use prost::bytes::BufMut; +use prost::Message; +use rand_chacha::ChaChaRng; +use rand_core::SeedableRng; +use relay_client::Connection; +use simple_raft::log::mem::RaftLogMemory; +use simple_raft::message::{RaftMessage, RaftMessageDestination, SendableRaftMessage}; +use simple_raft::node::{RaftConfig, RaftNode}; +use uuid::Uuid; + +/// A Raft node. +pub struct RaftConnection { + connection: Connection, + node: RaftNode, + peers: BTreeSet, +} + +impl RaftConnection { + /// Creates a new Raft connection from a current connection. + /// Returns an error if the connection does not have an identifier. + pub fn from(connection: Connection, peers: BTreeSet) -> Result { + let Some(identifier) = connection.identifier() else { + return Err(connection); + }; + Ok(Self { + connection, + node: RaftNode::new( + identifier, + peers.clone(), + RaftLogMemory::new_unbounded(), + ChaChaRng::seed_from_u64(identifier.as_u64_pair().0), + RaftConfig { + election_timeout_ticks: 10, + heartbeat_interval_ticks: 1, + replication_chunk_size: usize::max_value(), + }, + ), + peers, + }) + } + + /// Envoit un message à tous les noeuds du cluster. + pub fn append<'a>(&mut self, message: impl Into>) { + let mut data = message.into().into_owned(); + if self.node.is_leader() { + let Ok(messages) = self.node.append(data) else { + panic!("OOOOOOOOH!"); + }; + } else { + data.push(1); + if let (Some(leader), _) = self.node.leader() { + self.connection.send(*leader, data); + } + } + } + + fn send_raft_messages( + connection: &Connection, + peers: &BTreeSet, + sendables: impl Iterator>, + ) { + for sendable in sendables { + let mut data: Vec = Vec::with_capacity(sendable.message.encoded_len() + 19); + sendable.message.encode(&mut data).ok(); + data.push(0); + + match sendable.dest { + RaftMessageDestination::Broadcast => { + for peer in peers + .iter() + .filter(|&peer| Some(*peer) != connection.identifier()) + { + connection.send(*peer, &data); + } + } + RaftMessageDestination::To(target) => connection.send(target, data), + } + } + } + + pub fn update(&mut self) -> LinkedList> { + let messages = self.node.timer_tick(); + Self::send_raft_messages(&self.connection, &self.peers, messages); + + let messages = self.connection.update(); + for (sender_id, mut message) in messages { + let message_type = message[message.len() - 1]; + message.truncate(message.len() - 1); + match message_type { + 0 => { + let message = RaftMessage::decode(&*message).unwrap(); + let messages = self.node.receive(message, sender_id); + Self::send_raft_messages(&self.connection, &self.peers, messages); + } + 1 if self.node.is_leader() => { + let Ok(messages) = self.node.append(message) else { + panic!("OOOOOOOOH!"); + }; + Self::send_raft_messages(&self.connection, &self.peers, messages); + } + _ => panic!("AAAAAH!"), + } + } + + let mut result = LinkedList::new(); + for message in self.node.take_committed() { + result.push_back(message.data.to_vec()); + } + result + } +} diff --git a/crates/relay-raft/src/main.rs b/crates/relay-raft/src/main.rs new file mode 100644 index 0000000..982fdd6 --- /dev/null +++ b/crates/relay-raft/src/main.rs @@ -0,0 +1,54 @@ +use std::collections::BTreeSet; +use std::sync::mpsc::channel; +use std::time::Duration; +use std::{io, thread}; + +use relay_client::Connection; +use relay_raft::RaftConnection; +use uuid::Uuid; + +fn main() { + let mut connection = Connection::new("relay.cocosol.fr").unwrap(); + while connection.identifier().is_none() { + connection.update(); + thread::sleep(Duration::from_millis(100)); + } + let identifier = connection.identifier().unwrap(); + println!("Identifier: {}", identifier); + + let mut user_input = String::new(); + io::stdin().read_line(&mut user_input).unwrap(); + let peers: BTreeSet = user_input + .trim() + .split(',') + .map(|s| Uuid::parse_str(s).unwrap()) + .collect(); + + let Ok(mut connection) = RaftConnection::from(connection, peers) else { + panic!("Failed to create raft connection"); + }; + + let (sender, receiver) = channel(); + thread::spawn(move || { + loop { + let mut message = String::new(); + io::stdin().read_line(&mut message).unwrap(); + sender + .send(message.replace('\n', "").replace('\r', "")) + .unwrap(); + } + }); + + loop { + if let Ok(message) = receiver.try_recv() { + connection.append(message.as_bytes()); + } + let messages = connection.update(); + + for message in messages { + println!("Received message: {}", String::from_utf8_lossy(&message)); + } + + thread::sleep(Duration::from_millis(100)); + } +}