//! The orchestrator: collects node registrations, distributes the run configuration, and
//! aggregates benchmark results for a HotShot run.

/// Types shared with orchestrator clients, including benchmark results.
pub mod client;
use std::{
collections::{HashMap, HashSet},
fs,
fs::OpenOptions,
io::{self, ErrorKind},
time::Duration,
};
use async_lock::RwLock;
use client::{BenchResults, BenchResultsDownloadConfig};
use csv::Writer;
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use hotshot_types::{
network::{BuilderType, NetworkConfig, PublicKeysFile},
traits::signature_key::{SignatureKey, StakeTableEntryType},
PeerConfig,
};
use libp2p_identity::{
ed25519::{Keypair as EdKeypair, SecretKey},
Keypair, PeerId,
};
use multiaddr::Multiaddr;
use surf_disco::Url;
use tide_disco::{
api::ApiError,
error::ServerError,
method::{ReadState, WriteState},
Api, App, RequestError,
};
use vbs::{
version::{StaticVersion, StaticVersionType},
BinarySerializer,
};
/// The orchestrator API's major version.
pub const ORCHESTRATOR_MAJOR_VERSION: u16 = 0;
/// The orchestrator API's minor version.
pub const ORCHESTRATOR_MINOR_VERSION: u16 = 1;
/// The static version type used by the orchestrator's API.
pub type OrchestratorVersion =
    StaticVersion<ORCHESTRATOR_MAJOR_VERSION, ORCHESTRATOR_MINOR_VERSION>;
/// The static version instance used when serving the orchestrator's API.
pub const ORCHESTRATOR_VERSION: OrchestratorVersion = StaticVersion {};
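/// Derive a deterministic libp2p `Keypair` for a node: hash the shared seed together with the
/// node index using blake3 and use the digest as the ed25519 secret key.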
#[must_use]
pub fn libp2p_generate_indexed_identity(seed: [u8; 32], index: u64) -> Keypair {
let mut hasher = blake3::Hasher::new();
hasher.update(&seed);
hasher.update(&index.to_le_bytes());
let new_seed = *hasher.finalize().as_bytes();
let sk_bytes = SecretKey::try_from_bytes(new_seed).unwrap();
<EdKeypair as From<SecretKey>>::from(sk_bytes).into()
}
/// The state of the orchestrator.
#[derive(Default, Clone)]
#[allow(clippy::struct_excessive_bools)]
struct OrchestratorState<KEY: SignatureKey> {
    /// The latest node index that has been assigned.
    latest_index: u16,
    /// The latest temporary node index handed out for key-pair generation.
    tmp_latest_index: u16,
    /// The network configuration distributed to nodes.
    config: NetworkConfig<KEY>,
    /// Whether the public configs of all peers have been collected.
    peer_pub_ready: bool,
    /// Maps a posted public key to its assigned `(node_index, is_da)`.
    pub_posted: HashMap<Vec<u8>, (u64, bool)>,
    /// Whether nodes should start the run.
    start: bool,
    /// The peers that have reported they are ready to start.
    nodes_connected: HashSet<PeerConfig<KEY>>,
    /// Aggregated benchmark results.
    bench_results: BenchResults,
    /// The number of nodes that have posted results.
    nodes_post_results: u64,
    /// Whether the network may still be started manually.
    manual_start_allowed: bool,
    /// Whether new key registrations are still accepted.
    accepting_new_keys: bool,
    /// The registered builder URLs.
    builders: Vec<Url>,
    /// Whether the stake table was fixed ahead of time, disabling open registration.
    fixed_stake_table: bool,
}
impl<KEY: SignatureKey + 'static> OrchestratorState<KEY> {
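    /// Create a new orchestrator state from a network configuration. If the config already
    /// contains a stake table, registration is restricted to the keys listed there.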
pub fn new(network_config: NetworkConfig<KEY>) -> Self {
let mut peer_pub_ready = false;
let mut fixed_stake_table = false;
if network_config.config.known_nodes_with_stake.is_empty() {
println!("No nodes were loaded from the config file. Nodes will be allowed to register dynamically.");
} else {
println!("Initializing orchestrator with fixed stake table.");
peer_pub_ready = true;
fixed_stake_table = true;
}
let builders = if matches!(network_config.builder, BuilderType::External) {
network_config.config.builder_urls.clone().into()
} else {
vec![]
};
OrchestratorState {
latest_index: 0,
tmp_latest_index: 0,
config: network_config,
peer_pub_ready,
pub_posted: HashMap::new(),
nodes_connected: HashSet::new(),
start: false,
bench_results: BenchResults::default(),
nodes_post_results: 0,
manual_start_allowed: true,
accepting_new_keys: true,
builders,
fixed_stake_table,
}
}
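    /// Append the current benchmark results, together with the relevant run configuration,
    /// to `scripts/benchmarks_results/results.csv`.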
pub fn output_to_csv(&self) {
let output_csv = BenchResultsDownloadConfig {
commit_sha: self.config.commit_sha.clone(),
total_nodes: self.config.config.num_nodes_with_stake.into(),
da_committee_size: self.config.config.da_staked_committee_size,
fixed_leader_for_gpuvid: self.config.config.fixed_leader_for_gpuvid,
transactions_per_round: self.config.transactions_per_round,
transaction_size: self.bench_results.transaction_size_in_bytes,
rounds: self.config.rounds,
partial_results: self.bench_results.partial_results.clone(),
avg_latency_in_sec: self.bench_results.avg_latency_in_sec,
minimum_latency_in_sec: self.bench_results.minimum_latency_in_sec,
maximum_latency_in_sec: self.bench_results.maximum_latency_in_sec,
throughput_bytes_per_sec: self.bench_results.throughput_bytes_per_sec,
total_transactions_committed: self.bench_results.total_transactions_committed,
total_time_elapsed_in_sec: self.bench_results.total_time_elapsed_in_sec,
total_num_views: self.bench_results.total_num_views,
failed_num_views: self.bench_results.failed_num_views,
committee_type: self.bench_results.committee_type.clone(),
};
        let results_csv_file = OpenOptions::new()
            .create(true)
            .append(true)
            .open("scripts/benchmarks_results/results.csv")
            .unwrap();
let mut wtr = Writer::from_writer(results_csv_file);
let _ = wtr.serialize(output_csv);
let _ = wtr.flush();
println!("Results successfully saved in scripts/benchmarks_results/results.csv");
}
}
/// The API exposed by the orchestrator.
pub trait OrchestratorApi<KEY: SignatureKey> {
    /// Post an identity to the orchestrator, returning the assigned node index.
    fn post_identity(
        &mut self,
        libp2p_address: Option<Multiaddr>,
        libp2p_public_key: Option<PeerId>,
    ) -> Result<u16, ServerError>;
    /// Get the network configuration.
    fn post_getconfig(&mut self, _node_index: u16) -> Result<NetworkConfig<KEY>, ServerError>;
    /// Get a temporary node index, used when generating a key pair.
    fn get_tmp_node_index(&mut self) -> Result<u16, ServerError>;
    /// Register a public key, returning the assigned node index and DA membership.
    fn register_public_key(
        &mut self,
        pubkey: &mut Vec<u8>,
        is_da: bool,
        libp2p_address: Option<Multiaddr>,
        libp2p_public_key: Option<PeerId>,
    ) -> Result<(u64, bool), ServerError>;
    /// Check whether the public configs of all peers have been collected.
    fn peer_pub_ready(&self) -> Result<bool, ServerError>;
    /// Get the final network configuration once all peers have registered.
    fn post_config_after_peer_collected(&mut self) -> Result<NetworkConfig<KEY>, ServerError>;
    /// Check whether the run should start.
    fn get_start(&self) -> Result<bool, ServerError>;
    /// Post benchmark results for a node.
    fn post_run_results(&mut self, metrics: BenchResults) -> Result<(), ServerError>;
    /// Mark a registered node as ready to start.
    fn post_ready(&mut self, peer_config: &PeerConfig<KEY>) -> Result<(), ServerError>;
    /// Manually start the network, guarded by a password.
    fn post_manual_start(&mut self, password_bytes: Vec<u8>) -> Result<(), ServerError>;
    /// Register a builder URL.
    fn post_builder(&mut self, builder: Url) -> Result<(), ServerError>;
    /// Get the list of registered builder URLs.
    fn get_builders(&self) -> Result<Vec<Url>, ServerError>;
}
impl<KEY> OrchestratorState<KEY>
where
KEY: serde::Serialize + Clone + SignatureKey + 'static,
{
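    /// Register a public key when the stake table is built dynamically: assign the next node
    /// index, add the node to the stake table (and the DA committee if appropriate), and record
    /// any supplied libp2p bootstrap information.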
fn register_unknown(
&mut self,
pubkey: &mut Vec<u8>,
da_requested: bool,
libp2p_address: Option<Multiaddr>,
libp2p_public_key: Option<PeerId>,
) -> Result<(u64, bool), ServerError> {
if let Some((node_index, is_da)) = self.pub_posted.get(pubkey) {
return Ok((*node_index, *is_da));
}
if !self.accepting_new_keys {
return Err(ServerError {
status: tide_disco::StatusCode::FORBIDDEN,
message:
"Network has been started manually, and is no longer registering new keys."
.to_string(),
});
}
let node_index = self.pub_posted.len() as u64;
let staked_pubkey = PeerConfig::<KEY>::from_bytes(pubkey).unwrap();
self.config
.config
.known_nodes_with_stake
.push(staked_pubkey.clone());
let mut added_to_da = false;
let da_full =
self.config.config.known_da_nodes.len() >= self.config.config.da_staked_committee_size;
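        // Add the node to the DA committee if there is still room and either the run assigns DA
        // membership by index (`indexed_da`) or the node explicitly requested it.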
#[allow(clippy::nonminimal_bool)]
if (self.config.indexed_da || (!self.config.indexed_da && da_requested)) && !da_full {
self.config.config.known_da_nodes.push(staked_pubkey);
added_to_da = true;
}
self.pub_posted
.insert(pubkey.clone(), (node_index, added_to_da));
        if self.config.libp2p_config.is_some() {
if let (Some(libp2p_public_key), Some(libp2p_address)) =
(libp2p_public_key, libp2p_address)
{
self.config
.libp2p_config
.as_mut()
.unwrap()
.bootstrap_nodes
.push((libp2p_public_key, libp2p_address));
}
}
tracing::error!("Posted public key for node_index {node_index}");
if node_index + 1 >= (self.config.config.num_nodes_with_stake.get() as u64) {
self.peer_pub_ready = true;
self.accepting_new_keys = false;
}
Ok((node_index, added_to_da))
}
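    /// Register a public key against a fixed stake table: look the key up in the preconfigured
    /// `public_keys`, reject unknown keys or a mismatched DA flag, and record any supplied
    /// libp2p bootstrap information.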
fn register_from_list(
&mut self,
pubkey: &mut Vec<u8>,
da_requested: bool,
libp2p_address: Option<Multiaddr>,
libp2p_public_key: Option<PeerId>,
) -> Result<(u64, bool), ServerError> {
if let Some((node_index, is_da)) = self.pub_posted.get(pubkey) {
return Ok((*node_index, *is_da));
}
let staked_pubkey = PeerConfig::<KEY>::from_bytes(pubkey).unwrap();
let Some((node_index, node_config)) =
self.config.public_keys.iter().enumerate().find(|keys| {
keys.1.stake_table_key == staked_pubkey.stake_table_entry.public_key()
})
else {
return Err(ServerError {
status: tide_disco::StatusCode::FORBIDDEN,
message: "You are unauthorized to register with the orchestrator".to_string(),
});
};
if node_config.da != da_requested {
return Err(ServerError {
status: tide_disco::StatusCode::BAD_REQUEST,
message: format!("Mismatch in DA status in registration for node {}. DA requested: {}, expected: {}", node_index, da_requested, node_config.da),
});
}
let added_to_da = node_config.da;
self.pub_posted
.insert(pubkey.clone(), (node_index as u64, added_to_da));
        if self.config.libp2p_config.is_some() {
if let (Some(libp2p_public_key), Some(libp2p_address)) =
(libp2p_public_key, libp2p_address)
{
self.config
.libp2p_config
.as_mut()
.unwrap()
.bootstrap_nodes
.push((libp2p_public_key, libp2p_address));
}
}
tracing::error!("Node {node_index} has registered.");
Ok((node_index as u64, added_to_da))
}
}
impl<KEY> OrchestratorApi<KEY> for OrchestratorState<KEY>
where
KEY: serde::Serialize + Clone + SignatureKey + 'static,
{
fn post_identity(
&mut self,
libp2p_address: Option<Multiaddr>,
libp2p_public_key: Option<PeerId>,
) -> Result<u16, ServerError> {
let node_index = self.latest_index;
self.latest_index += 1;
if usize::from(node_index) >= self.config.config.num_nodes_with_stake.get() {
return Err(ServerError {
status: tide_disco::StatusCode::BAD_REQUEST,
message: "Network has reached capacity".to_string(),
});
}
        if self.config.libp2p_config.is_some() {
if let (Some(libp2p_public_key), Some(libp2p_address)) =
(libp2p_public_key, libp2p_address)
{
self.config
.libp2p_config
.as_mut()
.unwrap()
.bootstrap_nodes
.push((libp2p_public_key, libp2p_address));
}
}
Ok(node_index)
}
fn post_getconfig(&mut self, _node_index: u16) -> Result<NetworkConfig<KEY>, ServerError> {
Ok(self.config.clone())
}
fn get_tmp_node_index(&mut self) -> Result<u16, ServerError> {
let tmp_node_index = self.tmp_latest_index;
self.tmp_latest_index += 1;
if usize::from(tmp_node_index) >= self.config.config.num_nodes_with_stake.get() {
return Err(ServerError {
status: tide_disco::StatusCode::BAD_REQUEST,
message: "Node index getter for key pair generation has reached capacity"
.to_string(),
});
}
Ok(tmp_node_index)
}
#[allow(clippy::cast_possible_truncation)]
fn register_public_key(
&mut self,
pubkey: &mut Vec<u8>,
da_requested: bool,
libp2p_address: Option<Multiaddr>,
libp2p_public_key: Option<PeerId>,
) -> Result<(u64, bool), ServerError> {
if self.fixed_stake_table {
self.register_from_list(pubkey, da_requested, libp2p_address, libp2p_public_key)
} else {
self.register_unknown(pubkey, da_requested, libp2p_address, libp2p_public_key)
}
}
fn peer_pub_ready(&self) -> Result<bool, ServerError> {
if !self.peer_pub_ready {
return Err(ServerError {
status: tide_disco::StatusCode::BAD_REQUEST,
message: "Peer's public configs are not ready".to_string(),
});
}
Ok(self.peer_pub_ready)
}
fn post_config_after_peer_collected(&mut self) -> Result<NetworkConfig<KEY>, ServerError> {
if !self.peer_pub_ready {
return Err(ServerError {
status: tide_disco::StatusCode::BAD_REQUEST,
message: "Peer's public configs are not ready".to_string(),
});
}
Ok(self.config.clone())
}
fn get_start(&self) -> Result<bool, ServerError> {
if !self.start {
return Err(ServerError {
status: tide_disco::StatusCode::BAD_REQUEST,
message: "Network is not ready to start".to_string(),
});
}
Ok(self.start)
}
fn post_ready(&mut self, peer_config: &PeerConfig<KEY>) -> Result<(), ServerError> {
if !self
.config
.config
.known_nodes_with_stake
.contains(peer_config)
{
return Err(ServerError {
status: tide_disco::StatusCode::FORBIDDEN,
message: "You are unauthorized to register with the orchestrator".to_string(),
});
}
if self.nodes_connected.insert(peer_config.clone()) {
tracing::error!(
"Node {peer_config} connected. Total nodes connected: {}",
self.nodes_connected.len()
);
}
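        // Start the run once the fraction of ready nodes reaches the configured start threshold
        // (`start_threshold` is a numerator/denominator pair).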
if self.nodes_connected.len() as u64 * self.config.config.start_threshold.1
>= (self.config.config.num_nodes_with_stake.get() as u64)
* self.config.config.start_threshold.0
{
self.accepting_new_keys = false;
self.manual_start_allowed = false;
self.start = true;
}
Ok(())
}
fn post_manual_start(&mut self, password_bytes: Vec<u8>) -> Result<(), ServerError> {
if !self.manual_start_allowed {
return Err(ServerError {
status: tide_disco::StatusCode::FORBIDDEN,
message: "Configs have already been distributed to nodes, and the network can no longer be started manually.".to_string(),
});
}
let password = String::from_utf8(password_bytes)
.expect("Failed to decode raw password as UTF-8 string.");
if self.config.manual_start_password != Some(password) {
return Err(ServerError {
status: tide_disco::StatusCode::FORBIDDEN,
message: "Incorrect password.".to_string(),
});
}
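        // Shrink the configured node counts to match what actually registered, so the run can
        // start with fewer nodes than originally configured.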
let registered_nodes_with_stake = self.config.config.known_nodes_with_stake.len();
let registered_da_nodes = self.config.config.known_da_nodes.len();
if registered_da_nodes > 1 {
self.config.config.num_nodes_with_stake =
std::num::NonZeroUsize::new(registered_nodes_with_stake)
.expect("Failed to convert to NonZeroUsize; this should be impossible.");
self.config.config.da_staked_committee_size = registered_da_nodes;
} else {
return Err(ServerError {
status: tide_disco::StatusCode::FORBIDDEN,
message: format!("We cannot manually start the network, because we only have {registered_nodes_with_stake} nodes with stake registered, with {registered_da_nodes} DA nodes.")
});
}
self.accepting_new_keys = false;
self.manual_start_allowed = false;
self.peer_pub_ready = true;
self.start = true;
Ok(())
}
fn post_run_results(&mut self, metrics: BenchResults) -> Result<(), ServerError> {
if metrics.total_transactions_committed != 0 {
if self.bench_results.total_transactions_committed == 0 {
self.bench_results = metrics;
} else {
let cur_metrics = self.bench_results.clone();
self.bench_results.avg_latency_in_sec = (metrics.avg_latency_in_sec
* metrics.num_latency
+ cur_metrics.avg_latency_in_sec * cur_metrics.num_latency)
/ (metrics.num_latency + cur_metrics.num_latency);
self.bench_results.num_latency += metrics.num_latency;
self.bench_results.minimum_latency_in_sec = metrics
.minimum_latency_in_sec
.min(cur_metrics.minimum_latency_in_sec);
self.bench_results.maximum_latency_in_sec = metrics
.maximum_latency_in_sec
.max(cur_metrics.maximum_latency_in_sec);
self.bench_results.throughput_bytes_per_sec = metrics
.throughput_bytes_per_sec
.max(cur_metrics.throughput_bytes_per_sec);
self.bench_results.total_transactions_committed = metrics
.total_transactions_committed
.max(cur_metrics.total_transactions_committed);
self.bench_results.total_time_elapsed_in_sec = metrics
.total_time_elapsed_in_sec
.max(cur_metrics.total_time_elapsed_in_sec);
self.bench_results.total_num_views =
metrics.total_num_views.min(cur_metrics.total_num_views);
self.bench_results.failed_num_views =
metrics.failed_num_views.max(cur_metrics.failed_num_views);
}
}
self.nodes_post_results += 1;
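        // Emit progressively more complete snapshots of the results: after the first post, after
        // half of the DA committee has reported, after half of all nodes, and after every node.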
if self.bench_results.partial_results == "Unset" {
self.bench_results.partial_results = "One".to_string();
self.bench_results.printout();
self.output_to_csv();
}
if self.bench_results.partial_results == "One"
&& self.nodes_post_results >= (self.config.config.da_staked_committee_size as u64 / 2)
{
self.bench_results.partial_results = "HalfDA".to_string();
self.bench_results.printout();
self.output_to_csv();
}
if self.bench_results.partial_results == "HalfDA"
&& self.nodes_post_results >= (self.config.config.num_nodes_with_stake.get() as u64 / 2)
{
self.bench_results.partial_results = "Half".to_string();
self.bench_results.printout();
self.output_to_csv();
}
if self.bench_results.partial_results != "Full"
&& self.nodes_post_results >= (self.config.config.num_nodes_with_stake.get() as u64)
{
self.bench_results.partial_results = "Full".to_string();
self.bench_results.printout();
self.output_to_csv();
}
Ok(())
}
fn post_builder(&mut self, builder: Url) -> Result<(), ServerError> {
self.builders.push(builder);
Ok(())
}
fn get_builders(&self) -> Result<Vec<Url>, ServerError> {
if !matches!(self.config.builder, BuilderType::External)
&& self.builders.len() != self.config.config.da_staked_committee_size
{
return Err(ServerError {
status: tide_disco::StatusCode::NOT_FOUND,
message: "Not all builders are registered yet".to_string(),
});
}
Ok(self.builders.clone())
}
}
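/// Set up the orchestrator's tide-disco API, loading the route definitions from `api.toml`.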
#[allow(clippy::too_many_lines)]
fn define_api<KEY, State, VER>() -> Result<Api<State, ServerError, VER>, ApiError>
where
State: 'static + Send + Sync + ReadState + WriteState,
<State as ReadState>::State: Send + Sync + OrchestratorApi<KEY>,
KEY: serde::Serialize + SignatureKey,
VER: StaticVersionType + 'static,
{
let api_toml = toml::from_str::<toml::Value>(include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/api.toml"
)))
.expect("API file is not valid toml");
let mut api = Api::<State, ServerError, VER>::new(api_toml)?;
api.post("post_identity", |req, state| {
async move {
let mut body_bytes = req.body_bytes();
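            // Skip the fixed 12-byte prefix before the serialized payload; the remainder is the
            // vbs-encoded data.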
body_bytes.drain(..12);
let Ok((libp2p_address, libp2p_public_key)) =
vbs::Serializer::<OrchestratorVersion>::deserialize(&body_bytes)
else {
return Err(ServerError {
status: tide_disco::StatusCode::BAD_REQUEST,
message: "Malformed body".to_string(),
});
};
state.post_identity(libp2p_address, libp2p_public_key)
}
.boxed()
})?
.post("post_getconfig", |req, state| {
async move {
let node_index = req.integer_param("node_index")?;
state.post_getconfig(node_index)
}
.boxed()
})?
.post("get_tmp_node_index", |_req, state| {
async move { state.get_tmp_node_index() }.boxed()
})?
.post("post_pubkey", |req, state| {
async move {
let is_da = req.boolean_param("is_da")?;
let mut body_bytes = req.body_bytes();
body_bytes.drain(..12);
let Ok((mut pubkey, libp2p_address, libp2p_public_key)) =
vbs::Serializer::<OrchestratorVersion>::deserialize(&body_bytes)
else {
return Err(ServerError {
status: tide_disco::StatusCode::BAD_REQUEST,
message: "Malformed body".to_string(),
});
};
state.register_public_key(&mut pubkey, is_da, libp2p_address, libp2p_public_key)
}
.boxed()
})?
.get("peer_pubconfig_ready", |_req, state| {
async move { state.peer_pub_ready() }.boxed()
})?
.post("post_config_after_peer_collected", |_req, state| {
async move { state.post_config_after_peer_collected() }.boxed()
})?
.post(
"post_ready",
|req, state: &mut <State as ReadState>::State| {
async move {
let mut body_bytes = req.body_bytes();
body_bytes.drain(..12);
let Some(pubkey) = PeerConfig::<KEY>::from_bytes(&body_bytes) else {
return Err(ServerError {
status: tide_disco::StatusCode::BAD_REQUEST,
message: "Malformed body".to_string(),
});
};
state.post_ready(&pubkey)
}
.boxed()
},
)?
.post(
"post_manual_start",
|req, state: &mut <State as ReadState>::State| {
async move {
let password = req.body_bytes();
state.post_manual_start(password)
}
.boxed()
},
)?
.get("get_start", |_req, state| {
async move { state.get_start() }.boxed()
})?
.post("post_results", |req, state| {
async move {
            let metrics: Result<BenchResults, RequestError> = req.body_json();
            state.post_run_results(metrics?)
}
.boxed()
})?
.post("post_builder", |req, state| {
async move {
let mut body_bytes = req.body_bytes();
body_bytes.drain(..12);
let Ok(urls) =
vbs::Serializer::<OrchestratorVersion>::deserialize::<Vec<Url>>(&body_bytes)
else {
return Err(ServerError {
status: tide_disco::StatusCode::BAD_REQUEST,
message: "Malformed body".to_string(),
});
};
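            // Probe every supplied URL concurrently and register the first one that responds
            // within the timeout.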
let mut futures = urls
.into_iter()
.map(|url| async {
let client: surf_disco::Client<ServerError, OrchestratorVersion> =
surf_disco::client::Client::builder(url.clone()).build();
if client.connect(Some(Duration::from_secs(2))).await {
Some(url)
} else {
None
}
})
.collect::<FuturesUnordered<_>>()
.filter_map(futures::future::ready);
if let Some(url) = futures.next().await {
state.post_builder(url)
} else {
Err(ServerError {
status: tide_disco::StatusCode::BAD_REQUEST,
message: "No reachable addresses".to_string(),
})
}
}
.boxed()
})?
.get("get_builders", |_req, state| {
async move { state.get_builders() }.boxed()
})?;
Ok(api)
}
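/// Run an orchestrator at the given URL with the given network config, serving the API until the
/// server shuts down.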
pub async fn run_orchestrator<KEY>(
mut network_config: NetworkConfig<KEY>,
url: Url,
) -> io::Result<()>
where
KEY: SignatureKey + 'static + serde::Serialize,
{
let env_password = std::env::var("ORCHESTRATOR_MANUAL_START_PASSWORD");
if env_password.is_ok() {
tracing::warn!("Took orchestrator manual start password from the environment variable: ORCHESTRATOR_MANUAL_START_PASSWORD={:?}", env_password);
network_config.manual_start_password = env_password.ok();
}
{
let env_public_keys = std::env::var("ORCHESTRATOR_PUBLIC_KEYS");
if let Ok(filepath) = env_public_keys {
#[allow(clippy::panic)]
let config_file_as_string: String = fs::read_to_string(filepath.clone())
.unwrap_or_else(|_| panic!("Could not read config file located at {filepath}"));
            let file: PublicKeysFile<KEY> =
                toml::from_str::<PublicKeysFile<KEY>>(&config_file_as_string)
                    .expect("Unable to parse the public keys file as TOML");
network_config.public_keys = file.public_keys;
}
}
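    // Rebuild the stake table and DA committee from the (possibly file-supplied) public keys.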
network_config.config.known_nodes_with_stake = network_config
.public_keys
.iter()
.map(|keys| PeerConfig {
stake_table_entry: keys.stake_table_key.stake_table_entry(keys.stake),
state_ver_key: keys.state_ver_key.clone(),
})
.collect();
network_config.config.known_da_nodes = network_config
.public_keys
.iter()
.filter(|keys| keys.da)
.map(|keys| PeerConfig {
stake_table_entry: keys.stake_table_key.stake_table_entry(keys.stake),
state_ver_key: keys.state_ver_key.clone(),
})
.collect();
    let web_api =
        define_api().map_err(|_e| io::Error::new(ErrorKind::Other, "Failed to define api"))?;
let state: RwLock<OrchestratorState<KEY>> = RwLock::new(OrchestratorState::new(network_config));
let mut app = App::<RwLock<OrchestratorState<KEY>>, ServerError>::with_state(state);
app.register_module::<ServerError, OrchestratorVersion>("api", web_api.unwrap())
.expect("Error registering api");
tracing::error!("listening on {:?}", url);
app.serve(url, ORCHESTRATOR_VERSION).await
}