#[cfg(feature = "docs")]
pub mod documentation;
use committable::Committable;
use futures::future::{select, Either};
use hotshot_types::{
message::UpgradeLock,
traits::{network::BroadcastDelay, node_implementation::Versions},
};
use rand::Rng;
use url::Url;
pub mod traits;
pub mod types;
pub mod tasks;
pub mod helpers;
use std::{
collections::{BTreeMap, HashMap},
num::NonZeroUsize,
sync::Arc,
time::Duration,
};
use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
use async_lock::RwLock;
use async_trait::async_trait;
use futures::join;
use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry};
use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
pub use hotshot_types::error::HotShotError;
use hotshot_types::{
consensus::{Consensus, ConsensusMetricsValue, OuterConsensus, View, ViewInner},
constants::{EVENT_CHANNEL_SIZE, EXTERNAL_EVENT_CHANNEL_SIZE},
data::{Leaf2, QuorumProposal, QuorumProposal2},
event::{EventType, LeafInfo},
message::{convert_proposal, DataMessage, Message, MessageKind, Proposal},
simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate},
traits::{
consensus_api::ConsensusApi,
election::Membership,
network::ConnectedNetwork,
node_implementation::{ConsensusTime, NodeType},
signature_key::SignatureKey,
states::ValidatedState,
storage::Storage,
EncodeBytes,
},
utils::epoch_from_block_number,
HotShotConfig,
};
pub use rand;
use tokio::{spawn, time::sleep};
use tracing::{debug, instrument, trace};
use crate::{
tasks::{add_consensus_tasks, add_network_tasks},
traits::NodeImplementation,
types::{Event, SystemContextHandle},
};
pub const H_512: usize = 64;
pub const H_256: usize = 32;
#[derive(Clone)]
pub struct MarketplaceConfig<TYPES: NodeType, I: NodeImplementation<TYPES>> {
pub auction_results_provider: Arc<I::AuctionResultsProvider>,
pub fallback_builder_url: Url,
}
pub struct SystemContext<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
public_key: TYPES::SignatureKey,
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
pub config: HotShotConfig<TYPES::SignatureKey>,
pub network: Arc<I::Network>,
pub memberships: Arc<RwLock<TYPES::Membership>>,
metrics: Arc<ConsensusMetricsValue>,
consensus: OuterConsensus<TYPES>,
instance_state: Arc<TYPES::InstanceState>,
start_view: TYPES::View,
start_epoch: TYPES::Epoch,
output_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),
pub(crate) external_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),
anchored_leaf: Leaf2<TYPES>,
#[allow(clippy::type_complexity)]
internal_event_stream: (
Sender<Arc<HotShotEvent<TYPES>>>,
InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
),
pub id: u64,
pub storage: Arc<RwLock<I::Storage>>,
pub upgrade_lock: UpgradeLock<TYPES, V>,
pub marketplace_config: MarketplaceConfig<TYPES, I>,
}
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> Clone
for SystemContext<TYPES, I, V>
{
#![allow(deprecated)]
fn clone(&self) -> Self {
Self {
public_key: self.public_key.clone(),
private_key: self.private_key.clone(),
config: self.config.clone(),
network: Arc::clone(&self.network),
memberships: Arc::clone(&self.memberships),
metrics: Arc::clone(&self.metrics),
consensus: self.consensus.clone(),
instance_state: Arc::clone(&self.instance_state),
start_view: self.start_view,
start_epoch: self.start_epoch,
output_event_stream: self.output_event_stream.clone(),
external_event_stream: self.external_event_stream.clone(),
anchored_leaf: self.anchored_leaf.clone(),
internal_event_stream: self.internal_event_stream.clone(),
id: self.id,
storage: Arc::clone(&self.storage),
upgrade_lock: self.upgrade_lock.clone(),
marketplace_config: self.marketplace_config.clone(),
}
}
}
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
#![allow(deprecated)]
#[allow(clippy::too_many_arguments)]
pub async fn new(
public_key: TYPES::SignatureKey,
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
nonce: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: Arc<RwLock<TYPES::Membership>>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
storage: I::Storage,
marketplace_config: MarketplaceConfig<TYPES, I>,
) -> Arc<Self> {
#[allow(clippy::panic)]
match storage
.migrate_consensus(
Into::<Leaf2<TYPES>>::into,
convert_proposal::<TYPES, QuorumProposal<TYPES>, QuorumProposal2<TYPES>>,
)
.await
{
Ok(()) => {}
Err(e) => {
panic!("Failed to migrate consensus storage: {e}");
}
}
let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
let external_chan = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
Self::new_from_channels(
public_key,
private_key,
nonce,
config,
memberships,
network,
initializer,
metrics,
storage,
marketplace_config,
internal_chan,
external_chan,
)
}
#[allow(clippy::too_many_arguments, clippy::type_complexity)]
pub fn new_from_channels(
public_key: TYPES::SignatureKey,
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
nonce: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: Arc<RwLock<TYPES::Membership>>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
storage: I::Storage,
marketplace_config: MarketplaceConfig<TYPES, I>,
internal_channel: (
Sender<Arc<HotShotEvent<TYPES>>>,
Receiver<Arc<HotShotEvent<TYPES>>>,
),
external_channel: (Sender<Event<TYPES>>, Receiver<Event<TYPES>>),
) -> Arc<Self> {
debug!("Creating a new hotshot");
let consensus_metrics = Arc::new(metrics);
let anchored_leaf = initializer.inner;
let instance_state = initializer.instance_state;
let (internal_tx, mut internal_rx) = internal_channel;
let (mut external_tx, mut external_rx) = external_channel;
let upgrade_lock =
UpgradeLock::<TYPES, V>::from_certificate(&initializer.decided_upgrade_certificate);
external_rx.set_overflow(true);
internal_rx.set_overflow(true);
let validated_state = match initializer.validated_state {
Some(state) => state,
None => Arc::new(TYPES::ValidatedState::from_header(
anchored_leaf.block_header(),
)),
};
let epoch = TYPES::Epoch::new(epoch_from_block_number(
anchored_leaf.height(),
config.epoch_height,
));
let mut validated_state_map = BTreeMap::default();
validated_state_map.insert(
anchored_leaf.view_number(),
View {
view_inner: ViewInner::Leaf {
leaf: anchored_leaf.commit(),
state: Arc::clone(&validated_state),
delta: initializer.state_delta.clone(),
epoch,
},
},
);
for (view_num, inner) in initializer.undecided_state {
validated_state_map.insert(view_num, inner);
}
let mut saved_leaves = HashMap::new();
let mut saved_payloads = BTreeMap::new();
saved_leaves.insert(anchored_leaf.commit(), anchored_leaf.clone());
for leaf in initializer.undecided_leaves {
saved_leaves.insert(leaf.commit(), leaf.clone());
}
if let Some(payload) = anchored_leaf.block_payload() {
let encoded_txns = payload.encode();
saved_payloads.insert(anchored_leaf.view_number(), Arc::clone(&encoded_txns));
}
let anchored_epoch = if config.epoch_height == 0 {
TYPES::Epoch::new(0)
} else if anchored_leaf.height() % config.epoch_height == 0 {
TYPES::Epoch::new(anchored_leaf.height() / config.epoch_height)
} else {
TYPES::Epoch::new(anchored_leaf.height() / config.epoch_height + 1)
};
let consensus = Consensus::new(
validated_state_map,
anchored_leaf.view_number(),
anchored_epoch,
anchored_leaf.view_number(),
anchored_leaf.view_number(),
initializer.actioned_view,
initializer.saved_proposals,
saved_leaves,
saved_payloads,
initializer.high_qc,
initializer.next_epoch_high_qc,
Arc::clone(&consensus_metrics),
config.epoch_height,
);
let consensus = Arc::new(RwLock::new(consensus));
external_tx.set_await_active(false);
let inner: Arc<SystemContext<TYPES, I, V>> = Arc::new(SystemContext {
id: nonce,
consensus: OuterConsensus::new(consensus),
instance_state: Arc::new(instance_state),
public_key,
private_key,
config,
start_view: initializer.start_view,
start_epoch: initializer.start_epoch,
network,
memberships,
metrics: Arc::clone(&consensus_metrics),
internal_event_stream: (internal_tx, internal_rx.deactivate()),
output_event_stream: (external_tx.clone(), external_rx.clone().deactivate()),
external_event_stream: (external_tx, external_rx.deactivate()),
anchored_leaf: anchored_leaf.clone(),
storage: Arc::new(RwLock::new(storage)),
upgrade_lock,
marketplace_config,
});
inner
}
#[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
pub async fn start_consensus(&self) {
#[cfg(all(feature = "rewind", not(debug_assertions)))]
compile_error!("Cannot run rewind in production builds!");
debug!("Starting Consensus");
let consensus = self.consensus.read().await;
#[allow(clippy::panic)]
self.internal_event_stream
.0
.broadcast_direct(Arc::new(HotShotEvent::ViewChange(
self.start_view,
self.start_epoch,
)))
.await
.unwrap_or_else(|_| {
panic!(
"Genesis Broadcast failed; event = ViewChange({:?})",
self.start_view
)
});
let event_stream = self.internal_event_stream.0.clone();
let next_view_timeout = self.config.next_view_timeout;
let start_view = self.start_view;
let start_epoch = self.start_epoch;
spawn({
async move {
sleep(Duration::from_millis(next_view_timeout)).await;
broadcast_event(
Arc::new(HotShotEvent::Timeout(start_view + 1, start_epoch + 1)),
&event_stream,
)
.await;
}
});
#[allow(clippy::panic)]
self.internal_event_stream
.0
.broadcast_direct(Arc::new(HotShotEvent::Qc2Formed(either::Left(
consensus.high_qc().clone(),
))))
.await
.unwrap_or_else(|_| {
panic!(
"Genesis Broadcast failed; event = Qc2Formed(either::Left({:?}))",
consensus.high_qc()
)
});
{
if self.anchored_leaf.view_number() == TYPES::View::genesis() {
let (validated_state, state_delta) =
TYPES::ValidatedState::genesis(&self.instance_state);
let qc = Arc::new(
QuorumCertificate2::genesis::<V>(
&validated_state,
self.instance_state.as_ref(),
)
.await,
);
broadcast_event(
Event {
view_number: self.anchored_leaf.view_number(),
event: EventType::Decide {
leaf_chain: Arc::new(vec![LeafInfo::new(
self.anchored_leaf.clone(),
Arc::new(validated_state),
Some(Arc::new(state_delta)),
None,
)]),
qc,
block_size: None,
},
},
&self.external_event_stream.0,
)
.await;
}
}
}
async fn send_external_event(&self, event: Event<TYPES>) {
debug!(?event, "send_external_event");
broadcast_event(event, &self.external_event_stream.0).await;
}
#[instrument(skip(self), err, target = "SystemContext", fields(id = self.id))]
pub async fn publish_transaction_async(
&self,
transaction: TYPES::Transaction,
) -> Result<(), HotShotError<TYPES>> {
trace!("Adding transaction to our own queue");
let api = self.clone();
let consensus_reader = api.consensus.read().await;
let view_number = consensus_reader.cur_view();
let epoch = consensus_reader.cur_epoch();
drop(consensus_reader);
let message_kind: DataMessage<TYPES> =
DataMessage::SubmitTransaction(transaction.clone(), view_number);
let message = Message {
sender: api.public_key.clone(),
kind: MessageKind::from(message_kind),
};
let serialized_message = self.upgrade_lock.serialize(&message).await.map_err(|err| {
HotShotError::FailedToSerialize(format!("failed to serialize transaction: {err}"))
})?;
spawn(async move {
let memberships_da_committee_members = api
.memberships
.read()
.await
.da_committee_members(view_number, epoch)
.iter()
.cloned()
.collect();
join! {
api
.network.da_broadcast_message(
serialized_message,
memberships_da_committee_members,
BroadcastDelay::None,
),
api
.send_external_event(Event {
view_number,
event: EventType::Transactions {
transactions: vec![transaction],
},
}),
}
});
Ok(())
}
#[must_use]
pub fn consensus(&self) -> Arc<RwLock<Consensus<TYPES>>> {
Arc::clone(&self.consensus.inner_consensus)
}
pub fn instance_state(&self) -> Arc<TYPES::InstanceState> {
Arc::clone(&self.instance_state)
}
#[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
pub async fn decided_leaf(&self) -> Leaf2<TYPES> {
self.consensus.read().await.decided_leaf()
}
#[must_use]
#[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
pub fn try_decided_leaf(&self) -> Option<Leaf2<TYPES>> {
self.consensus.try_read().map(|guard| guard.decided_leaf())
}
#[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
pub async fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
Arc::clone(&self.consensus.read().await.decided_state())
}
#[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
pub async fn state(&self, view: TYPES::View) -> Option<Arc<TYPES::ValidatedState>> {
self.consensus.read().await.state(view).cloned()
}
#[allow(clippy::too_many_arguments)]
pub async fn init(
public_key: TYPES::SignatureKey,
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
node_id: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: Arc<RwLock<TYPES::Membership>>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
storage: I::Storage,
marketplace_config: MarketplaceConfig<TYPES, I>,
) -> Result<
(
SystemContextHandle<TYPES, I, V>,
Sender<Arc<HotShotEvent<TYPES>>>,
Receiver<Arc<HotShotEvent<TYPES>>>,
),
HotShotError<TYPES>,
> {
let hotshot = Self::new(
public_key,
private_key,
node_id,
config,
memberships,
network,
initializer,
metrics,
storage,
marketplace_config,
)
.await;
let handle = Arc::clone(&hotshot).run_tasks().await;
let (tx, rx) = hotshot.internal_event_stream.clone();
Ok((handle, tx, rx.activate()))
}
#[must_use]
pub fn next_view_timeout(&self) -> u64 {
self.config.next_view_timeout
}
}
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
pub async fn run_tasks(&self) -> SystemContextHandle<TYPES, I, V> {
let consensus_registry = ConsensusTaskRegistry::new();
let network_registry = NetworkTaskRegistry::new();
let output_event_stream = self.external_event_stream.clone();
let internal_event_stream = self.internal_event_stream.clone();
let mut handle = SystemContextHandle {
consensus_registry,
network_registry,
output_event_stream: output_event_stream.clone(),
internal_event_stream: internal_event_stream.clone(),
hotshot: self.clone().into(),
storage: Arc::clone(&self.storage),
network: Arc::clone(&self.network),
memberships: Arc::clone(&self.memberships),
epoch_height: self.config.epoch_height,
};
add_network_tasks::<TYPES, I, V>(&mut handle).await;
add_consensus_tasks::<TYPES, I, V>(&mut handle).await;
handle
}
}
type Channel<S> = (Sender<Arc<S>>, Receiver<Arc<S>>);
#[async_trait]
pub trait TwinsHandlerState<TYPES, I, V>
where
Self: std::fmt::Debug + Send + Sync,
TYPES: NodeType,
I: NodeImplementation<TYPES>,
V: Versions,
{
async fn send_handler(
&mut self,
event: &HotShotEvent<TYPES>,
) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>>;
async fn recv_handler(
&mut self,
event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
) -> Vec<HotShotEvent<TYPES>>;
fn fuse_channels(
&'static mut self,
left: Channel<HotShotEvent<TYPES>>,
right: Channel<HotShotEvent<TYPES>>,
) -> Channel<HotShotEvent<TYPES>> {
let send_state = Arc::new(RwLock::new(self));
let recv_state = Arc::clone(&send_state);
let (left_sender, mut left_receiver) = (left.0, left.1);
let (right_sender, mut right_receiver) = (right.0, right.1);
let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
let (network_task_sender, mut receiver_from_network): Channel<HotShotEvent<TYPES>> =
broadcast(EVENT_CHANNEL_SIZE);
let _recv_loop_handle = spawn(async move {
loop {
let msg = match select(left_receiver.recv(), right_receiver.recv()).await {
Either::Left(msg) => Either::Left(msg.0.unwrap().as_ref().clone()),
Either::Right(msg) => Either::Right(msg.0.unwrap().as_ref().clone()),
};
let mut state = recv_state.write().await;
let mut result = state.recv_handler(&msg).await;
while let Some(event) = result.pop() {
let _ = sender_to_network.broadcast(event.into()).await;
}
}
});
let _send_loop_handle = spawn(async move {
loop {
if let Ok(msg) = receiver_from_network.recv().await {
let mut state = send_state.write().await;
let mut result = state.send_handler(&msg).await;
while let Some(event) = result.pop() {
match event {
Either::Left(msg) => {
let _ = left_sender.broadcast(msg.into()).await;
}
Either::Right(msg) => {
let _ = right_sender.broadcast(msg.into()).await;
}
}
}
}
}
});
(network_task_sender, network_task_receiver)
}
#[allow(clippy::too_many_arguments)]
async fn spawn_twin_handles(
&'static mut self,
public_key: TYPES::SignatureKey,
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
nonce: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: Arc<RwLock<TYPES::Membership>>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
storage: I::Storage,
marketplace_config: MarketplaceConfig<TYPES, I>,
) -> (
SystemContextHandle<TYPES, I, V>,
SystemContextHandle<TYPES, I, V>,
) {
let epoch_height = config.epoch_height;
let left_system_context = SystemContext::new(
public_key.clone(),
private_key.clone(),
nonce,
config.clone(),
Arc::clone(&memberships),
Arc::clone(&network),
initializer.clone(),
metrics.clone(),
storage.clone(),
marketplace_config.clone(),
)
.await;
let right_system_context = SystemContext::new(
public_key,
private_key,
nonce,
config,
memberships,
network,
initializer,
metrics,
storage,
marketplace_config,
)
.await;
let left_consensus_registry = ConsensusTaskRegistry::new();
let left_network_registry = NetworkTaskRegistry::new();
let right_consensus_registry = ConsensusTaskRegistry::new();
let right_network_registry = NetworkTaskRegistry::new();
let (left_external_sender, left_external_receiver) = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
let left_external_event_stream =
(left_external_sender, left_external_receiver.deactivate());
let (right_external_sender, right_external_receiver) =
broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
let right_external_event_stream =
(right_external_sender, right_external_receiver.deactivate());
let (left_internal_sender, left_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
let left_internal_event_stream = (
left_internal_sender.clone(),
left_internal_receiver.clone().deactivate(),
);
let (right_internal_sender, right_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
let right_internal_event_stream = (
right_internal_sender.clone(),
right_internal_receiver.clone().deactivate(),
);
let mut left_handle = SystemContextHandle {
consensus_registry: left_consensus_registry,
network_registry: left_network_registry,
output_event_stream: left_external_event_stream.clone(),
internal_event_stream: left_internal_event_stream.clone(),
hotshot: Arc::clone(&left_system_context),
storage: Arc::clone(&left_system_context.storage),
network: Arc::clone(&left_system_context.network),
memberships: Arc::clone(&left_system_context.memberships),
epoch_height,
};
let mut right_handle = SystemContextHandle {
consensus_registry: right_consensus_registry,
network_registry: right_network_registry,
output_event_stream: right_external_event_stream.clone(),
internal_event_stream: right_internal_event_stream.clone(),
hotshot: Arc::clone(&right_system_context),
storage: Arc::clone(&right_system_context.storage),
network: Arc::clone(&right_system_context.network),
memberships: Arc::clone(&right_system_context.memberships),
epoch_height,
};
add_consensus_tasks::<TYPES, I, V>(&mut left_handle).await;
add_consensus_tasks::<TYPES, I, V>(&mut right_handle).await;
let fused_internal_event_stream = self.fuse_channels(
(left_internal_sender, left_internal_receiver),
(right_internal_sender, right_internal_receiver),
);
left_handle.internal_event_stream = (
fused_internal_event_stream.0,
fused_internal_event_stream.1.deactivate(),
);
add_network_tasks::<TYPES, I, V>(&mut left_handle).await;
left_handle.internal_event_stream = left_internal_event_stream.clone();
(left_handle, right_handle)
}
}
#[derive(Debug)]
pub struct RandomTwinsHandler;
#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TwinsHandlerState<TYPES, I, V>
for RandomTwinsHandler
{
async fn send_handler(
&mut self,
event: &HotShotEvent<TYPES>,
) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
let random: bool = rand::thread_rng().gen();
#[allow(clippy::match_bool)]
match random {
true => vec![Either::Left(event.clone())],
false => vec![Either::Right(event.clone())],
}
}
async fn recv_handler(
&mut self,
event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
) -> Vec<HotShotEvent<TYPES>> {
match event {
Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
}
}
}
#[derive(Debug)]
pub struct DoubleTwinsHandler;
#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TwinsHandlerState<TYPES, I, V>
for DoubleTwinsHandler
{
async fn send_handler(
&mut self,
event: &HotShotEvent<TYPES>,
) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
vec![Either::Left(event.clone()), Either::Right(event.clone())]
}
async fn recv_handler(
&mut self,
event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
) -> Vec<HotShotEvent<TYPES>> {
match event {
Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
}
}
}
#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusApi<TYPES, I>
for SystemContextHandle<TYPES, I, V>
{
fn total_nodes(&self) -> NonZeroUsize {
self.hotshot.config.num_nodes_with_stake
}
fn builder_timeout(&self) -> Duration {
self.hotshot.config.builder_timeout
}
async fn send_event(&self, event: Event<TYPES>) {
debug!(?event, "send_event");
broadcast_event(event, &self.hotshot.external_event_stream.0).await;
}
fn public_key(&self) -> &TYPES::SignatureKey {
&self.hotshot.public_key
}
fn private_key(&self) -> &<TYPES::SignatureKey as SignatureKey>::PrivateKey {
&self.hotshot.private_key
}
}
#[derive(Clone)]
pub struct HotShotInitializer<TYPES: NodeType> {
inner: Leaf2<TYPES>,
instance_state: TYPES::InstanceState,
validated_state: Option<Arc<TYPES::ValidatedState>>,
state_delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,
start_view: TYPES::View,
start_epoch: TYPES::Epoch,
actioned_view: TYPES::View,
high_qc: QuorumCertificate2<TYPES>,
next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
undecided_leaves: Vec<Leaf2<TYPES>>,
undecided_state: BTreeMap<TYPES::View, View<TYPES>>,
saved_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposal2<TYPES>>>,
}
impl<TYPES: NodeType> HotShotInitializer<TYPES> {
pub async fn from_genesis<V: Versions>(
instance_state: TYPES::InstanceState,
) -> Result<Self, HotShotError<TYPES>> {
let (validated_state, state_delta) = TYPES::ValidatedState::genesis(&instance_state);
let high_qc = QuorumCertificate2::genesis::<V>(&validated_state, &instance_state).await;
Ok(Self {
inner: Leaf2::genesis(&validated_state, &instance_state).await,
validated_state: Some(Arc::new(validated_state)),
state_delta: Some(Arc::new(state_delta)),
start_view: TYPES::View::new(0),
start_epoch: TYPES::Epoch::new(0),
actioned_view: TYPES::View::new(0),
saved_proposals: BTreeMap::new(),
high_qc,
next_epoch_high_qc: None,
decided_upgrade_certificate: None,
undecided_leaves: Vec::new(),
undecided_state: BTreeMap::new(),
instance_state,
})
}
#[allow(clippy::too_many_arguments)]
pub fn from_reload(
anchor_leaf: Leaf2<TYPES>,
instance_state: TYPES::InstanceState,
validated_state: Option<Arc<TYPES::ValidatedState>>,
start_view: TYPES::View,
start_epoch: TYPES::Epoch,
actioned_view: TYPES::View,
saved_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposal2<TYPES>>>,
high_qc: QuorumCertificate2<TYPES>,
next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
undecided_leaves: Vec<Leaf2<TYPES>>,
undecided_state: BTreeMap<TYPES::View, View<TYPES>>,
) -> Self {
Self {
inner: anchor_leaf,
instance_state,
validated_state,
state_delta: None,
start_view,
start_epoch,
actioned_view,
saved_proposals,
high_qc,
next_epoch_high_qc,
decided_upgrade_certificate,
undecided_leaves,
undecided_state,
}
}
}