use std::{collections::btree_map::Entry, sync::Arc};
use async_broadcast::{InactiveReceiver, Sender};
use async_lock::RwLock;
use chrono::Utc;
use committable::Committable;
use hotshot_types::{
consensus::OuterConsensus,
data::{Leaf2, QuorumProposalWrapper, VidDisperseShare},
drb::{compute_drb_result, DrbResult},
event::{Event, EventType},
message::{convert_proposal, Proposal, UpgradeLock},
simple_vote::{HasEpoch, QuorumData2, QuorumVote2},
traits::{
block_contents::BlockHeader,
election::Membership,
node_implementation::{ConsensusTime, NodeImplementation, NodeType},
signature_key::SignatureKey,
storage::Storage,
ValidatedState,
},
utils::{
epoch_from_block_number, is_epoch_root, is_last_block_in_epoch,
option_epoch_from_block_number,
},
vote::HasViewNumber,
};
use tokio::spawn;
use tracing::instrument;
use utils::anytrace::*;
use vbs::version::StaticVersionType;
use super::QuorumVoteTaskState;
use crate::{
events::HotShotEvent,
helpers::{
broadcast_event, decide_from_proposal, decide_from_proposal_2, fetch_proposal,
LeafChainTraversalOutcome,
},
quorum_vote::Versions,
};
async fn store_and_get_computed_drb_result<
TYPES: NodeType,
I: NodeImplementation<TYPES>,
V: Versions,
>(
epoch_number: TYPES::Epoch,
task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
) -> Result<DrbResult> {
if let Some(computed_result) = task_state
.consensus
.read()
.await
.drb_seeds_and_results
.results
.get(&epoch_number)
{
return Ok(*computed_result);
}
let (task_epoch, computation) =
(&mut task_state.drb_computation).context(warn!("DRB computation task doesn't exist."))?;
ensure!(
*task_epoch == epoch_number,
info!("DRB computation is not for the next epoch.")
);
ensure!(
computation.is_finished(),
info!("DRB computation has not yet finished.")
);
match computation.await {
Ok(result) => {
let mut consensus_writer = task_state.consensus.write().await;
consensus_writer
.drb_seeds_and_results
.results
.insert(epoch_number, result);
task_state.drb_computation = None;
Ok(result)
}
Err(e) => Err(warn!("Error in DRB calculation: {:?}.", e)),
}
}
async fn verify_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
proposal: &QuorumProposalWrapper<TYPES>,
task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
) -> Result<()> {
if task_state.epoch_height == 0
|| !is_last_block_in_epoch(
proposal.block_header().block_number(),
task_state.epoch_height,
)
{
tracing::debug!("Skipping DRB result verification");
return Ok(());
}
let epoch = option_epoch_from_block_number::<TYPES>(
task_state
.upgrade_lock
.epochs_enabled(proposal.view_number())
.await,
proposal.block_header().block_number(),
task_state.epoch_height,
);
let proposal_result = proposal
.next_drb_result()
.context(info!("Proposal is missing the DRB result."))?;
let membership_reader = task_state.membership.read().await;
if let Some(epoch_val) = epoch {
let has_stake_current_epoch =
membership_reader.has_stake(&task_state.public_key, Some(epoch_val));
drop(membership_reader);
if has_stake_current_epoch {
let computed_result =
store_and_get_computed_drb_result(epoch_val + 1, task_state).await?;
ensure!(proposal_result == computed_result, warn!("Our calculated DRB result is {:?}, which does not match the proposed DRB result of {:?}", computed_result, proposal_result));
}
Ok(())
} else {
Err(error!("Epochs are not available"))
}
}
async fn start_drb_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
proposal: &QuorumProposalWrapper<TYPES>,
task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
) {
if proposal.epoch().is_none() {
return;
}
let current_epoch_number = TYPES::Epoch::new(epoch_from_block_number(
proposal.block_header().block_number(),
task_state.epoch_height,
));
if task_state
.membership
.read()
.await
.has_stake(&task_state.public_key, Some(current_epoch_number))
{
let new_epoch_number = current_epoch_number + 1;
if let Some((task_epoch, join_handle)) = &mut task_state.drb_computation {
if join_handle.is_finished() {
match join_handle.await {
Ok(result) => {
task_state
.consensus
.write()
.await
.drb_seeds_and_results
.results
.insert(*task_epoch, result);
task_state.drb_computation = None;
}
Err(e) => {
tracing::error!("error joining DRB computation task: {e:?}");
}
}
} else if *task_epoch == new_epoch_number {
return;
} else {
join_handle.abort();
task_state.drb_computation = None;
}
}
let mut consensus_writer = task_state.consensus.write().await;
if consensus_writer
.drb_seeds_and_results
.results
.contains_key(&new_epoch_number)
{
return;
}
if let Entry::Occupied(entry) = consensus_writer
.drb_seeds_and_results
.seeds
.entry(new_epoch_number)
{
let drb_seed_input = *entry.get();
let new_drb_task = spawn(async move { compute_drb_result::<TYPES>(drb_seed_input) });
task_state.drb_computation = Some((new_epoch_number, new_drb_task));
entry.remove();
}
}
}
async fn store_drb_seed_and_result<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
decided_leaf: &Leaf2<TYPES>,
) -> Result<()> {
if task_state.epoch_height == 0 {
tracing::info!("Epoch height is 0, skipping DRB storage.");
return Ok(());
}
let decided_block_number = decided_leaf.block_header().block_number();
let current_epoch_number = TYPES::Epoch::new(epoch_from_block_number(
decided_block_number,
task_state.epoch_height,
));
if is_epoch_root(decided_block_number, task_state.epoch_height) {
let mut consensus_writer = task_state.consensus.write().await;
consensus_writer
.drb_seeds_and_results
.garbage_collect(current_epoch_number);
drop(consensus_writer);
store_and_get_computed_drb_result(current_epoch_number + 1, task_state).await?;
let Ok(drb_seed_input_vec) = bincode::serialize(&decided_leaf.justify_qc().signatures)
else {
bail!("Failed to serialize the QC signature.");
};
let Ok(drb_seed_input) = drb_seed_input_vec.try_into() else {
bail!("Failed to convert the serialized QC signature into a DRB seed input.");
};
task_state
.consensus
.write()
.await
.drb_seeds_and_results
.store_seed(current_epoch_number + 2, drb_seed_input);
}
else if is_last_block_in_epoch(decided_block_number, task_state.epoch_height) {
if let Some(result) = decided_leaf.next_drb_result {
task_state
.consensus
.write()
.await
.drb_seeds_and_results
.results
.insert(current_epoch_number + 1, result);
} else {
bail!("The last block of the epoch is decided but doesn't contain a DRB result.");
}
}
Ok(())
}
#[instrument(skip_all, fields(id = task_state.id, view = *proposal.view_number()))]
pub(crate) async fn handle_quorum_proposal_validated<
TYPES: NodeType,
I: NodeImplementation<TYPES>,
V: Versions,
>(
proposal: &QuorumProposalWrapper<TYPES>,
task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
) -> Result<()> {
let version = task_state
.upgrade_lock
.version(proposal.view_number())
.await?;
if version >= V::Epochs::VERSION {
verify_drb_result(proposal, task_state).await?;
start_drb_task(proposal, task_state).await;
}
let LeafChainTraversalOutcome {
new_locked_view_number,
new_decided_view_number,
new_decide_qc,
leaf_views,
included_txns,
decided_upgrade_cert,
} = if version >= V::Epochs::VERSION {
decide_from_proposal_2(
proposal,
OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate),
&task_state.public_key,
version >= V::Epochs::VERSION,
&task_state.membership,
)
.await
} else {
decide_from_proposal(
proposal,
OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate),
&task_state.public_key,
version >= V::Epochs::VERSION,
&task_state.membership,
)
.await
};
if let Some(cert) = decided_upgrade_cert.clone() {
let mut decided_certificate_lock = task_state
.upgrade_lock
.decided_upgrade_certificate
.write()
.await;
*decided_certificate_lock = Some(cert.clone());
drop(decided_certificate_lock);
let _ = task_state
.storage
.write()
.await
.update_decided_upgrade_certificate(Some(cert.clone()))
.await;
}
let mut consensus_writer = task_state.consensus.write().await;
if let Some(locked_view_number) = new_locked_view_number {
consensus_writer.update_locked_view(locked_view_number)?;
}
#[allow(clippy::cast_precision_loss)]
if let Some(decided_view_number) = new_decided_view_number {
let old_decided_view = consensus_writer.last_decided_view();
consensus_writer.collect_garbage(old_decided_view, decided_view_number);
consensus_writer.update_last_decided_view(decided_view_number)?;
consensus_writer
.metrics
.last_decided_time
.set(Utc::now().timestamp().try_into().unwrap());
consensus_writer.metrics.invalid_qc.set(0);
consensus_writer
.metrics
.last_decided_view
.set(usize::try_from(consensus_writer.last_decided_view().u64()).unwrap());
let cur_number_of_views_per_decide_event =
*proposal.view_number() - consensus_writer.last_decided_view().u64();
consensus_writer
.metrics
.number_of_views_per_decide_event
.add_point(cur_number_of_views_per_decide_event as f64);
tracing::debug!(
"Sending Decide for view {:?}",
consensus_writer.last_decided_view()
);
drop(consensus_writer);
broadcast_event(
Event {
view_number: decided_view_number,
event: EventType::Decide {
leaf_chain: Arc::new(leaf_views.clone()),
qc: Arc::new(new_decide_qc.unwrap()),
block_size: included_txns.map(|txns| txns.len().try_into().unwrap()),
},
},
&task_state.output_event_stream,
)
.await;
tracing::debug!("Successfully sent decide event");
if version >= V::Epochs::VERSION {
store_drb_seed_and_result(task_state, &leaf_views.last().unwrap().leaf).await?;
}
}
Ok(())
}
#[instrument(skip_all, target = "VoteDependencyHandle", fields(view = *view_number))]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn update_shared_state<
TYPES: NodeType,
I: NodeImplementation<TYPES>,
V: Versions,
>(
consensus: OuterConsensus<TYPES>,
sender: Sender<Arc<HotShotEvent<TYPES>>>,
receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
membership: Arc<RwLock<TYPES::Membership>>,
public_key: TYPES::SignatureKey,
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
upgrade_lock: UpgradeLock<TYPES, V>,
view_number: TYPES::View,
instance_state: Arc<TYPES::InstanceState>,
storage: Arc<RwLock<I::Storage>>,
proposed_leaf: &Leaf2<TYPES>,
vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
parent_view_number: Option<TYPES::View>,
epoch_height: u64,
) -> Result<()> {
let justify_qc = &proposed_leaf.justify_qc();
let consensus_reader = consensus.read().await;
let mut maybe_validated_view = parent_view_number.and_then(|view_number| {
consensus_reader
.validated_state_map()
.get(&view_number)
.cloned()
});
let mut maybe_parent = consensus_reader
.saved_leaves()
.get(&justify_qc.data.leaf_commit)
.cloned();
drop(consensus_reader);
maybe_parent = match maybe_parent {
Some(p) => Some(p),
None => {
match fetch_proposal(
justify_qc.view_number(),
sender.clone(),
receiver.activate_cloned(),
Arc::clone(&membership),
OuterConsensus::new(Arc::clone(&consensus.inner_consensus)),
public_key.clone(),
private_key.clone(),
&upgrade_lock,
epoch_height,
)
.await
.ok()
{
Some((leaf, view)) => {
maybe_validated_view = Some(view);
Some(leaf)
}
None => None,
}
}
};
let parent = maybe_parent.context(info!(
"Proposal's parent missing from storage with commitment: {:?}, proposal view {:?}",
justify_qc.data.leaf_commit,
proposed_leaf.view_number(),
))?;
let Some(validated_view) = maybe_validated_view else {
bail!(
"Failed to fetch view for parent, parent view {:?}",
parent_view_number
);
};
let (Some(parent_state), _) = validated_view.state_and_delta() else {
bail!("Parent state not found! Consensus internally inconsistent");
};
let version = upgrade_lock.version(view_number).await?;
let (validated_state, state_delta) = parent_state
.validate_and_apply_header(
&instance_state,
&parent,
&proposed_leaf.block_header().clone(),
vid_share.data.vid_common_ref().clone(),
version,
*view_number,
)
.await
.wrap()
.context(warn!("Block header doesn't extend the proposal!"))?;
let state = Arc::new(validated_state);
let delta = Arc::new(state_delta);
let mut consensus_writer = consensus.write().await;
if let Err(e) = consensus_writer.update_leaf(
proposed_leaf.clone(),
Arc::clone(&state),
Some(Arc::clone(&delta)),
) {
tracing::trace!("{e:?}");
}
let new_leaves = consensus_writer.saved_leaves().clone();
let new_state = consensus_writer.validated_state_map().clone();
drop(consensus_writer);
storage
.write()
.await
.update_undecided_state2(new_leaves, new_state)
.await
.wrap()
.context(error!("Failed to update undecided state"))?;
Ok(())
}
#[instrument(skip_all, fields(name = "Submit quorum vote", level = "error"))]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn submit_vote<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
sender: Sender<Arc<HotShotEvent<TYPES>>>,
membership: Arc<RwLock<TYPES::Membership>>,
public_key: TYPES::SignatureKey,
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
upgrade_lock: UpgradeLock<TYPES, V>,
view_number: TYPES::View,
storage: Arc<RwLock<I::Storage>>,
leaf: Leaf2<TYPES>,
vid_share: Proposal<TYPES, VidDisperseShare<TYPES>>,
extended_vote: bool,
epoch_height: u64,
) -> Result<()> {
let epoch_number = option_epoch_from_block_number::<TYPES>(
leaf.with_epoch,
leaf.block_header().block_number(),
epoch_height,
);
let membership_reader = membership.read().await;
let committee_member_in_current_epoch = membership_reader.has_stake(&public_key, epoch_number);
let committee_member_in_next_epoch = leaf.with_epoch
&& is_last_block_in_epoch(leaf.height(), epoch_height)
&& membership_reader.has_stake(&public_key, epoch_number.map(|x| x + 1));
drop(membership_reader);
ensure!(
committee_member_in_current_epoch || committee_member_in_next_epoch,
info!(
"We were not chosen for quorum committee on {:?}",
view_number
)
);
let vote = QuorumVote2::<TYPES>::create_signed_vote(
QuorumData2 {
leaf_commit: leaf.commit(),
epoch: epoch_number,
},
view_number,
&public_key,
&private_key,
&upgrade_lock,
)
.await
.wrap()
.context(error!("Failed to sign vote. This should never happen."))?;
storage
.write()
.await
.append_vid2(&convert_proposal(vid_share))
.await
.wrap()
.context(error!("Failed to store VID share"))?;
if extended_vote {
tracing::debug!("sending extended vote to everybody",);
broadcast_event(
Arc::new(HotShotEvent::ExtendedQuorumVoteSend(vote)),
&sender,
)
.await;
} else {
tracing::debug!(
"sending vote to next quorum leader {:?}",
vote.view_number() + 1
);
broadcast_event(Arc::new(HotShotEvent::QuorumVoteSend(vote)), &sender).await;
}
Ok(())
}