use std::{
any::TypeId, collections::HashMap, num::NonZeroUsize, rc::Rc, sync::Arc, time::Duration,
};
use anyhow::{ensure, Result};
use async_lock::RwLock;
use hotshot::{
tasks::EventTransformerState,
traits::{NetworkReliability, NodeImplementation, TestableNodeImplementation},
types::SystemContextHandle,
HotShotInitializer, MarketplaceConfig, SystemContext, TwinsHandlerState,
};
use hotshot_example_types::{
auction_results_provider_types::TestAuctionResultsProvider, node_types::EpochsTestVersions,
state_types::TestInstanceState, storage_types::TestStorage, testable_delay::DelayConfig,
};
use hotshot_types::{
consensus::ConsensusMetricsValue,
traits::node_implementation::{NodeType, Versions},
HotShotConfig, ValidatorConfig,
};
use tide_disco::Url;
use vec1::Vec1;
use super::{
completion_task::{CompletionTaskDescription, TimeBasedCompletionTaskDescription},
overall_safety_task::OverallSafetyPropertiesDescription,
txn_task::TxnTaskDescription,
};
use crate::{
spinning_task::SpinningTaskDescription,
test_launcher::{Network, ResourceGenerators, TestLauncher},
test_task::TestTaskStateSeed,
view_sync_task::ViewSyncTaskDescription,
};
pub type TransactionValidator = Arc<dyn Fn(&Vec<(u64, u64)>) -> Result<()> + Send + Sync>;
#[derive(Clone, Debug, Copy)]
pub struct TimingData {
pub next_view_timeout: u64,
pub builder_timeout: Duration,
pub data_request_delay: Duration,
pub secondary_network_delay: Duration,
pub view_sync_timeout: Duration,
}
#[derive(Clone)]
pub struct TestDescription<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
pub num_nodes_with_stake: usize,
pub start_nodes: usize,
pub skip_late: bool,
pub num_bootstrap_nodes: usize,
pub da_staked_committee_size: usize,
pub overall_safety_properties: OverallSafetyPropertiesDescription<TYPES>,
pub spinning_properties: SpinningTaskDescription,
pub txn_description: TxnTaskDescription,
pub completion_task_description: CompletionTaskDescription,
pub timing_data: TimingData,
pub unreliable_network: Option<Box<dyn NetworkReliability>>,
pub view_sync_properties: ViewSyncTaskDescription,
pub builders: Vec1<BuilderDescription>,
pub fallback_builder: BuilderDescription,
pub solver: FakeSolverApiDescription,
pub behaviour: Rc<dyn Fn(u64) -> Behaviour<TYPES, I, V>>,
pub async_delay_config: DelayConfig,
pub upgrade_view: Option<u64>,
pub start_solver: bool,
pub validate_transactions: TransactionValidator,
}
pub fn nonempty_block_threshold(threshold: (u64, u64)) -> TransactionValidator {
Arc::new(move |transactions| {
if matches!(threshold, (0, _)) {
return Ok(());
}
let blocks: Vec<_> = transactions.iter().filter(|(view, _)| *view != 0).collect();
let num_blocks = blocks.len() as u64;
let mut num_nonempty_blocks = 0;
ensure!(num_blocks > 0, "Failed to commit any non-genesis blocks");
for (_, num_transactions) in blocks {
if *num_transactions > 0 {
num_nonempty_blocks += 1;
}
}
ensure!(
num_nonempty_blocks * threshold.1 >= threshold.0 * num_blocks,
"Failed to meet nonempty block threshold of {}/{}; got {num_nonempty_blocks} nonempty blocks out of a total of {num_blocks}", threshold.0, threshold.1
);
Ok(())
})
}
pub fn nonempty_block_limit(limit: (u64, u64)) -> TransactionValidator {
Arc::new(move |transactions| {
if matches!(limit, (_, 0)) {
return Ok(());
}
let blocks: Vec<_> = transactions.iter().filter(|(view, _)| *view != 0).collect();
let num_blocks = blocks.len() as u64;
let mut num_nonempty_blocks = 0;
ensure!(num_blocks > 0, "Failed to commit any non-genesis blocks");
for (_, num_transactions) in blocks {
if *num_transactions > 0 {
num_nonempty_blocks += 1;
}
}
ensure!(
num_nonempty_blocks * limit.1 <= limit.0 * num_blocks,
"Exceeded nonempty block limit of {}/{}; got {num_nonempty_blocks} nonempty blocks out of a total of {num_blocks}", limit.0, limit.1
);
Ok(())
})
}
#[derive(Debug)]
pub enum Behaviour<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
ByzantineTwins(Box<dyn TwinsHandlerState<TYPES, I, V>>),
Byzantine(Box<dyn EventTransformerState<TYPES, I, V>>),
Standard,
}
pub async fn create_test_handle<
TYPES: NodeType<InstanceState = TestInstanceState>,
I: NodeImplementation<TYPES>,
V: Versions,
>(
metadata: TestDescription<TYPES, I, V>,
node_id: u64,
network: Network<TYPES, I>,
memberships: Arc<RwLock<TYPES::Membership>>,
config: HotShotConfig<TYPES::SignatureKey>,
storage: I::Storage,
marketplace_config: MarketplaceConfig<TYPES, I>,
) -> SystemContextHandle<TYPES, I, V> {
let initializer = HotShotInitializer::<TYPES>::from_genesis::<V>(TestInstanceState::new(
metadata.async_delay_config,
))
.await
.unwrap();
let is_da = node_id < config.da_staked_committee_size as u64;
let validator_config: ValidatorConfig<TYPES::SignatureKey> =
ValidatorConfig::generated_from_seed_indexed([0u8; 32], node_id, 1, is_da);
let private_key = validator_config.private_key.clone();
let public_key = validator_config.public_key.clone();
let behaviour = (metadata.behaviour)(node_id);
match behaviour {
Behaviour::ByzantineTwins(state) => {
let state = Box::leak(state);
let (left_handle, _right_handle) = state
.spawn_twin_handles(
public_key,
private_key,
node_id,
config,
memberships,
network,
initializer,
ConsensusMetricsValue::default(),
storage,
marketplace_config,
)
.await;
left_handle
}
Behaviour::Byzantine(state) => {
let state = Box::leak(state);
state
.spawn_handle(
public_key,
private_key,
node_id,
config,
memberships,
network,
initializer,
ConsensusMetricsValue::default(),
storage,
marketplace_config,
)
.await
}
Behaviour::Standard => {
let hotshot = SystemContext::<TYPES, I, V>::new(
public_key,
private_key,
node_id,
config,
memberships,
network,
initializer,
ConsensusMetricsValue::default(),
storage,
marketplace_config,
)
.await;
hotshot.run_tasks().await
}
}
}
#[derive(Clone, Debug)]
pub enum BuilderChange {
Up,
Down,
FailClaims(bool),
}
#[derive(Clone, Debug, Default)]
pub struct BuilderDescription {
pub changes: HashMap<u64, BuilderChange>,
}
#[derive(Clone, Debug)]
pub struct FakeSolverApiDescription {
pub error_pct: f32,
}
impl Default for TimingData {
fn default() -> Self {
Self {
next_view_timeout: 4000,
builder_timeout: Duration::from_millis(500),
data_request_delay: Duration::from_millis(200),
secondary_network_delay: Duration::from_millis(1000),
view_sync_timeout: Duration::from_millis(2000),
}
}
}
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TestDescription<TYPES, I, V> {
#[must_use]
#[allow(clippy::redundant_field_names)]
pub fn default_stress() -> Self {
let num_nodes_with_stake = 100;
Self {
num_bootstrap_nodes: num_nodes_with_stake,
num_nodes_with_stake,
start_nodes: num_nodes_with_stake,
overall_safety_properties: OverallSafetyPropertiesDescription::<TYPES> {
num_successful_views: 50,
check_leaf: true,
check_block: true,
num_failed_views: 15,
transaction_threshold: 0,
threshold_calculator: Arc::new(|_active, total| (2 * total / 3 + 1)),
expected_views_to_fail: HashMap::new(),
},
timing_data: TimingData {
next_view_timeout: 2000,
..TimingData::default()
},
view_sync_properties: ViewSyncTaskDescription::Threshold(0, num_nodes_with_stake),
..Self::default()
}
}
#[must_use]
#[allow(clippy::redundant_field_names)]
pub fn default_multiple_rounds() -> Self {
let num_nodes_with_stake = 10;
TestDescription::<TYPES, I, V> {
num_bootstrap_nodes: num_nodes_with_stake,
num_nodes_with_stake,
start_nodes: num_nodes_with_stake,
overall_safety_properties: OverallSafetyPropertiesDescription::<TYPES> {
num_successful_views: 20,
check_leaf: true,
check_block: true,
num_failed_views: 8,
transaction_threshold: 0,
threshold_calculator: Arc::new(|_active, total| (2 * total / 3 + 1)),
expected_views_to_fail: HashMap::new(),
},
timing_data: TimingData {
..TimingData::default()
},
view_sync_properties: ViewSyncTaskDescription::Threshold(0, num_nodes_with_stake),
..TestDescription::<TYPES, I, V>::default()
}
}
#[must_use]
#[allow(clippy::redundant_field_names)]
pub fn default_more_nodes() -> Self {
let num_nodes_with_stake = 20;
Self {
num_nodes_with_stake,
start_nodes: num_nodes_with_stake,
num_bootstrap_nodes: num_nodes_with_stake,
da_staked_committee_size: 14,
completion_task_description: CompletionTaskDescription::TimeBasedCompletionTaskBuilder(
TimeBasedCompletionTaskDescription {
duration: Duration::new(340, 0),
},
),
overall_safety_properties: OverallSafetyPropertiesDescription {
..Default::default()
},
timing_data: TimingData {
next_view_timeout: 5000,
..TimingData::default()
},
view_sync_properties: ViewSyncTaskDescription::Threshold(0, num_nodes_with_stake),
..Self::default()
}
}
}
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> Default
for TestDescription<TYPES, I, V>
{
#[allow(clippy::redundant_field_names)]
fn default() -> Self {
let num_nodes_with_stake = 7;
Self {
timing_data: TimingData::default(),
num_nodes_with_stake,
start_nodes: num_nodes_with_stake,
skip_late: false,
num_bootstrap_nodes: num_nodes_with_stake,
da_staked_committee_size: num_nodes_with_stake,
spinning_properties: SpinningTaskDescription {
node_changes: vec![],
},
overall_safety_properties: OverallSafetyPropertiesDescription::default(),
txn_description: TxnTaskDescription::RoundRobinTimeBased(Duration::from_millis(100)),
completion_task_description: CompletionTaskDescription::TimeBasedCompletionTaskBuilder(
TimeBasedCompletionTaskDescription {
duration: Duration::from_millis(10000),
},
),
unreliable_network: None,
view_sync_properties: ViewSyncTaskDescription::Threshold(0, num_nodes_with_stake),
builders: vec1::vec1![BuilderDescription::default(), BuilderDescription::default(),],
fallback_builder: BuilderDescription::default(),
solver: FakeSolverApiDescription {
error_pct: 0.1,
},
behaviour: Rc::new(|_| Behaviour::Standard),
async_delay_config: DelayConfig::default(),
upgrade_view: None,
start_solver: true,
validate_transactions: Arc::new(|_| Ok(())),
}
}
}
impl<
TYPES: NodeType<InstanceState = TestInstanceState>,
I: TestableNodeImplementation<TYPES>,
V: Versions,
> TestDescription<TYPES, I, V>
where
I: NodeImplementation<TYPES, AuctionResultsProvider = TestAuctionResultsProvider<TYPES>>,
{
pub fn gen_launcher(self, node_id: u64) -> TestLauncher<TYPES, I, V> {
self.gen_launcher_with_tasks(node_id, vec![])
}
#[must_use]
pub fn gen_launcher_with_tasks(
self,
node_id: u64,
additional_test_tasks: Vec<Box<dyn TestTaskStateSeed<TYPES, I, V>>>,
) -> TestLauncher<TYPES, I, V> {
let TestDescription {
num_nodes_with_stake,
num_bootstrap_nodes,
timing_data,
da_staked_committee_size,
unreliable_network,
..
} = self.clone();
let mut known_da_nodes = Vec::new();
let known_nodes_with_stake = (0..num_nodes_with_stake)
.map(|node_id_| {
let cur_validator_config: ValidatorConfig<TYPES::SignatureKey> =
ValidatorConfig::generated_from_seed_indexed(
[0u8; 32],
node_id_ as u64,
1,
node_id_ < da_staked_committee_size,
);
if node_id_ < da_staked_committee_size {
known_da_nodes.push(cur_validator_config.public_config());
}
cur_validator_config.public_config()
})
.collect();
let validator_config = ValidatorConfig::<TYPES::SignatureKey>::generated_from_seed_indexed(
[0u8; 32],
node_id,
1,
0 < da_staked_committee_size,
);
let epoch_height = if TypeId::of::<V>() == TypeId::of::<EpochsTestVersions>() {
10
} else {
0
};
let config = HotShotConfig {
start_threshold: (1, 1),
num_nodes_with_stake: NonZeroUsize::new(num_nodes_with_stake).unwrap(),
known_da_nodes,
num_bootstrap: num_bootstrap_nodes,
known_nodes_with_stake,
da_staked_committee_size,
fixed_leader_for_gpuvid: 1,
next_view_timeout: 500,
view_sync_timeout: Duration::from_millis(250),
builder_timeout: Duration::from_millis(1000),
data_request_delay: Duration::from_millis(200),
builder_urls: vec1::vec1![Url::parse("http://localhost:9999").expect("Valid URL")],
start_proposing_view: u64::MAX,
stop_proposing_view: 0,
start_voting_view: u64::MAX,
stop_voting_view: 0,
start_proposing_time: u64::MAX,
stop_proposing_time: 0,
start_voting_time: u64::MAX,
stop_voting_time: 0,
epoch_height,
};
let TimingData {
next_view_timeout,
builder_timeout,
data_request_delay,
secondary_network_delay,
view_sync_timeout,
} = timing_data;
let mod_config =
|a: &mut HotShotConfig<TYPES::SignatureKey>| {
a.next_view_timeout = next_view_timeout;
a.builder_timeout = builder_timeout;
a.data_request_delay = data_request_delay;
a.view_sync_timeout = view_sync_timeout;
};
let metadata = self.clone();
TestLauncher {
resource_generator: ResourceGenerators {
channel_generator: <I as TestableNodeImplementation<TYPES>>::gen_networks(
num_nodes_with_stake,
num_bootstrap_nodes,
da_staked_committee_size,
unreliable_network,
secondary_network_delay,
),
storage: Box::new(move |_| {
let mut storage = TestStorage::<TYPES>::default();
storage.delay_config = metadata.async_delay_config.clone();
storage
}),
config,
validator_config,
marketplace_config: Box::new(|_| MarketplaceConfig::<TYPES, I> {
auction_results_provider: TestAuctionResultsProvider::<TYPES>::default().into(),
fallback_builder_url: Url::parse("http://localhost:9999").unwrap(),
}),
},
metadata: self,
additional_test_tasks,
}
.modify_default_config(mod_config)
}
}