Wip: Add raft system
Some checks failed
Rust Checks / checks (push) Failing after 20m27s

Co-authored-by: Tipragot <Tipragot@users.noreply.github.com>
This commit is contained in:
CoCo_Sol 2024-04-08 01:01:19 +02:00
parent 106cabe951
commit d8a81646a6
5 changed files with 254 additions and 4 deletions

62
Cargo.lock generated
View file

@ -1206,7 +1206,7 @@ dependencies = [
"bitflags 2.4.2",
"cexpr",
"clang-sys",
"itertools",
"itertools 0.12.1",
"lazy_static",
"lazycell",
"proc-macro2",
@ -2572,6 +2572,15 @@ dependencies = [
"mach2",
]
[[package]]
name = "itertools"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "284f18f85651fe11e8a991b2adb42cb078325c996ed026d994719efcfca1d54b"
dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.12.1"
@ -3412,6 +3421,29 @@ version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f0f7f43585c34e4fdd7497d746bc32e14458cf11c69341cc0587b1d825dde42"
[[package]]
name = "prost"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e6984d2f1a23009bd270b8bb56d0926810a3d483f59c987d77969e9d8e840b2"
dependencies = [
"bytes",
"prost-derive",
]
[[package]]
name = "prost-derive"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "169a15f3008ecb5160cba7d37bcd690a7601b6d30cfb87a117d45e59d52af5d4"
dependencies = [
"anyhow",
"itertools 0.9.0",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "quote"
version = "1.0.35"
@ -3614,6 +3646,18 @@ dependencies = [
"uuid",
]
[[package]]
name = "relay-raft"
version = "0.2.0"
dependencies = [
"prost",
"rand_chacha 0.3.1",
"rand_core 0.6.4",
"relay-client",
"simple-raft",
"uuid",
]
[[package]]
name = "relay-server"
version = "0.2.0"
@ -3891,6 +3935,18 @@ version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe"
[[package]]
name = "simple-raft"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe321f515be05fa16fc544233b1ce28c3b44bdb61f51a1e781767ad5f6ff7b0c"
dependencies = [
"bytes",
"log",
"prost",
"rand_core 0.6.4",
]
[[package]]
name = "slab"
version = "0.4.9"
@ -4428,9 +4484,9 @@ checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "uuid"
version = "1.7.0"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a"
checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0"
dependencies = [
"getrandom 0.2.12",
"serde",

View file

@ -80,7 +80,7 @@ impl Connection {
path.push(".relay-data");
// Check if the file exists.
match false {
match path.exists() {
true => {
// Read the file and parse the identifier and secret key.
let contents = fs::read(&path)?;

View file

@ -0,0 +1,23 @@
[package]
name = "relay-raft"
version = "0.2.0"
edition = "2021"
license = "GPL-3.0-or-later"
description = "Raft implementation for the relay client."
authors = [
"Tipragot <contact@tipragot.fr>",
"CoCoSol <solois.corentin@proton.me>",
]
keywords = ["bevy", "network", "game", "raft"]
categories = ["network-programming", "game-development"]
# [lints]
# workspace = true
[dependencies]
relay-client = { path = "../relay-client" }
simple-raft = "0.2.0"
prost = { version = "0.7.0" }
rand_chacha = "0.3.1"
rand_core = "0.6.4"
uuid = "1.8.0"

View file

@ -0,0 +1,117 @@
//! Raft implementation for the relay client.
use std::borrow::Cow;
use std::collections::{BTreeSet, LinkedList};
use std::io;
use prost::bytes::BufMut;
use prost::Message;
use rand_chacha::ChaChaRng;
use rand_core::SeedableRng;
use relay_client::Connection;
use simple_raft::log::mem::RaftLogMemory;
use simple_raft::message::{RaftMessage, RaftMessageDestination, SendableRaftMessage};
use simple_raft::node::{RaftConfig, RaftNode};
use uuid::Uuid;
/// A Raft node.
pub struct RaftConnection {
connection: Connection,
node: RaftNode<RaftLogMemory, ChaChaRng, Uuid>,
peers: BTreeSet<Uuid>,
}
impl RaftConnection {
/// Creates a new Raft connection from a current connection.
/// Returns an error if the connection does not have an identifier.
pub fn from(connection: Connection, peers: BTreeSet<Uuid>) -> Result<Self, Connection> {
let Some(identifier) = connection.identifier() else {
return Err(connection);
};
Ok(Self {
connection,
node: RaftNode::new(
identifier,
peers.clone(),
RaftLogMemory::new_unbounded(),
ChaChaRng::seed_from_u64(identifier.as_u64_pair().0),
RaftConfig {
election_timeout_ticks: 10,
heartbeat_interval_ticks: 1,
replication_chunk_size: usize::max_value(),
},
),
peers,
})
}
/// Envoit un message à tous les noeuds du cluster.
pub fn append<'a>(&mut self, message: impl Into<Cow<'a, [u8]>>) {
let mut data = message.into().into_owned();
if self.node.is_leader() {
let Ok(messages) = self.node.append(data) else {
panic!("OOOOOOOOH!");
};
} else {
data.push(1);
if let (Some(leader), _) = self.node.leader() {
self.connection.send(*leader, data);
}
}
}
fn send_raft_messages(
connection: &Connection,
peers: &BTreeSet<Uuid>,
sendables: impl Iterator<Item = SendableRaftMessage<Uuid>>,
) {
for sendable in sendables {
let mut data: Vec<u8> = Vec::with_capacity(sendable.message.encoded_len() + 19);
sendable.message.encode(&mut data).ok();
data.push(0);
match sendable.dest {
RaftMessageDestination::Broadcast => {
for peer in peers
.iter()
.filter(|&peer| Some(*peer) != connection.identifier())
{
connection.send(*peer, &data);
}
}
RaftMessageDestination::To(target) => connection.send(target, data),
}
}
}
pub fn update(&mut self) -> LinkedList<Vec<u8>> {
let messages = self.node.timer_tick();
Self::send_raft_messages(&self.connection, &self.peers, messages);
let messages = self.connection.update();
for (sender_id, mut message) in messages {
let message_type = message[message.len() - 1];
message.truncate(message.len() - 1);
match message_type {
0 => {
let message = RaftMessage::decode(&*message).unwrap();
let messages = self.node.receive(message, sender_id);
Self::send_raft_messages(&self.connection, &self.peers, messages);
}
1 if self.node.is_leader() => {
let Ok(messages) = self.node.append(message) else {
panic!("OOOOOOOOH!");
};
Self::send_raft_messages(&self.connection, &self.peers, messages);
}
_ => panic!("AAAAAH!"),
}
}
let mut result = LinkedList::new();
for message in self.node.take_committed() {
result.push_back(message.data.to_vec());
}
result
}
}

View file

@ -0,0 +1,54 @@
use std::collections::BTreeSet;
use std::sync::mpsc::channel;
use std::time::Duration;
use std::{io, thread};
use relay_client::Connection;
use relay_raft::RaftConnection;
use uuid::Uuid;
fn main() {
let mut connection = Connection::new("relay.cocosol.fr").unwrap();
while connection.identifier().is_none() {
connection.update();
thread::sleep(Duration::from_millis(100));
}
let identifier = connection.identifier().unwrap();
println!("Identifier: {}", identifier);
let mut user_input = String::new();
io::stdin().read_line(&mut user_input).unwrap();
let peers: BTreeSet<Uuid> = user_input
.trim()
.split(',')
.map(|s| Uuid::parse_str(s).unwrap())
.collect();
let Ok(mut connection) = RaftConnection::from(connection, peers) else {
panic!("Failed to create raft connection");
};
let (sender, receiver) = channel();
thread::spawn(move || {
loop {
let mut message = String::new();
io::stdin().read_line(&mut message).unwrap();
sender
.send(message.replace('\n', "").replace('\r', ""))
.unwrap();
}
});
loop {
if let Ok(message) = receiver.try_recv() {
connection.append(message.as_bytes());
}
let messages = connection.update();
for message in messages {
println!("Received message: {}", String::from_utf8_lossy(&message));
}
thread::sleep(Duration::from_millis(100));
}
}