diff --git a/crates/relay-raft/src/lib.rs b/crates/relay-raft/src/lib.rs index e47ff73..8e4e692 100644 --- a/crates/relay-raft/src/lib.rs +++ b/crates/relay-raft/src/lib.rs @@ -2,9 +2,7 @@ use std::borrow::Cow; use std::collections::{BTreeSet, LinkedList}; -use std::io; -use prost::bytes::BufMut; use prost::Message; use rand_chacha::ChaChaRng; use rand_core::SeedableRng; @@ -16,33 +14,62 @@ use uuid::Uuid; /// A Raft node. pub struct RaftConnection { + /// The connection to the relay server. connection: Connection, + + /// The Raft node. node: RaftNode, + + /// The peers of the Raft cluster. peers: BTreeSet, } +pub struct RaftConnectionConfig { + /// The minimum number of timer ticks between leadership elections. + pub election_timeout_ticks: u32, + + /// The number of timer ticks between sending heartbeats to peers. + pub heartbeat_interval_ticks: u32, + + /// The maximum number of bytes to replicate to a peer at a time. + pub replication_chunk_size: usize, +} + +impl From for RaftConfig { + fn from(val: RaftConnectionConfig) -> Self { + RaftConfig { + election_timeout_ticks: val.election_timeout_ticks, + heartbeat_interval_ticks: val.heartbeat_interval_ticks, + replication_chunk_size: val.replication_chunk_size, + } + } +} + impl RaftConnection { /// Creates a new Raft connection from a current connection. /// Returns an error if the connection does not have an identifier. - pub fn from(connection: Connection, peers: BTreeSet) -> Result { + pub fn from( + connection: Connection, + peers: BTreeSet, + raft_config: RaftConnectionConfig, + ) -> Result { let Some(identifier) = connection.identifier() else { return Err(connection); }; - Ok(Self { + + let raft_node = Self { connection, node: RaftNode::new( identifier, peers.clone(), RaftLogMemory::new_unbounded(), ChaChaRng::seed_from_u64(identifier.as_u64_pair().0), - RaftConfig { - election_timeout_ticks: 10, - heartbeat_interval_ticks: 1, - replication_chunk_size: usize::max_value(), - }, + raft_config.into(), ), peers, - }) + }; + + Ok(raft_node) } /// Envoit un message à tous les noeuds du cluster. @@ -50,8 +77,9 @@ impl RaftConnection { let mut data = message.into().into_owned(); if self.node.is_leader() { let Ok(messages) = self.node.append(data) else { - panic!("OOOOOOOOH!"); + panic!("Message just cancelled."); }; + Self::send_raft_messages(&self.connection, &self.peers, messages); } else { data.push(1); if let (Some(leader), _) = self.node.leader() { @@ -70,24 +98,28 @@ impl RaftConnection { sendable.message.encode(&mut data).ok(); data.push(0); - match sendable.dest { - RaftMessageDestination::Broadcast => { - for peer in peers - .iter() - .filter(|&peer| Some(*peer) != connection.identifier()) - { - connection.send(*peer, &data); - } - } - RaftMessageDestination::To(target) => connection.send(target, data), + // Send the message to the target node. + if let RaftMessageDestination::To(target) = sendable.dest { + connection.send(target, data); + return; } + + // Broadcast the message to all peers. + peers + .iter() + .filter(|&peer| Some(*peer) != connection.identifier()) + .for_each(|peer| { + connection.send(*peer, &data); + }); } } pub fn update(&mut self) -> LinkedList> { + // Update the Raft node. let messages = self.node.timer_tick(); Self::send_raft_messages(&self.connection, &self.peers, messages); + // Update the connection. let messages = self.connection.update(); for (sender_id, mut message) in messages { let message_type = message[message.len() - 1]; @@ -100,18 +132,17 @@ impl RaftConnection { } 1 if self.node.is_leader() => { let Ok(messages) = self.node.append(message) else { - panic!("OOOOOOOOH!"); + panic!("Message just cancelled."); }; Self::send_raft_messages(&self.connection, &self.peers, messages); } - _ => panic!("AAAAAH!"), + _ => (), } } - let mut result = LinkedList::new(); - for message in self.node.take_committed() { - result.push_back(message.data.to_vec()); - } - result + self.node + .take_committed() + .map(|v| v.data.to_vec()) + .collect() } } diff --git a/crates/relay-raft/src/main.rs b/crates/relay-raft/src/main.rs index 982fdd6..53d2ceb 100644 --- a/crates/relay-raft/src/main.rs +++ b/crates/relay-raft/src/main.rs @@ -4,7 +4,7 @@ use std::time::Duration; use std::{io, thread}; use relay_client::Connection; -use relay_raft::RaftConnection; +use relay_raft::{RaftConnection, RaftConnectionConfig}; use uuid::Uuid; fn main() { @@ -24,7 +24,15 @@ fn main() { .map(|s| Uuid::parse_str(s).unwrap()) .collect(); - let Ok(mut connection) = RaftConnection::from(connection, peers) else { + let Ok(mut connection) = RaftConnection::from( + connection, + peers, + RaftConnectionConfig { + election_timeout_ticks: 10, + heartbeat_interval_ticks: 1, + replication_chunk_size: usize::max_value(), + }, + ) else { panic!("Failed to create raft connection"); }; @@ -33,9 +41,7 @@ fn main() { loop { let mut message = String::new(); io::stdin().read_line(&mut message).unwrap(); - sender - .send(message.replace('\n', "").replace('\r', "")) - .unwrap(); + sender.send(message.replace(['\n', '\r'], "")).unwrap(); } });