use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use async_broadcast::{Receiver, SendError, Sender};
use async_lock::RwLock;
use committable::{Commitment, Committable};
use hotshot_task::dependency::{Dependency, EventDependency};
use hotshot_types::{
consensus::OuterConsensus,
data::{Leaf2, QuorumProposal2, ViewChangeEvidence},
event::{Event, EventType, LeafInfo},
message::{Proposal, UpgradeLock},
request_response::ProposalRequestPayload,
simple_certificate::{QuorumCertificate2, UpgradeCertificate},
simple_vote::HasEpoch,
traits::{
block_contents::BlockHeader,
election::Membership,
node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
signature_key::SignatureKey,
BlockPayload, ValidatedState,
},
utils::{epoch_from_block_number, is_last_block_in_epoch, Terminator, View, ViewInner},
vote::{Certificate, HasViewNumber},
};
use tokio::time::timeout;
use tracing::instrument;
use utils::anytrace::*;
use crate::{events::HotShotEvent, quorum_proposal_recv::ValidationInfo, request::REQUEST_TIMEOUT};
/// Requests the proposal for `view_number` from peers and waits for a validly signed response.
///
/// Signs a [`ProposalRequestPayload`] with `sender_private_key` and broadcasts it on
/// `event_sender` as `HotShotEvent::QuorumProposalRequestSend`. It then listens on
/// `event_receiver` for a `HotShotEvent::QuorumProposalResponseRecv` whose proposal is for
/// `view_number`.
///
/// Responses whose signature does not validate against `membership` are ignored and listening
/// continues. The whole wait is bounded by `REQUEST_TIMEOUT`.
///
/// The received proposal's justify QC is then checked against the stake table and success
/// threshold of the QC's own epoch. On success, the derived leaf and a validated state built from
/// the block header are passed to `update_leaf` (a failure there is only traced). The leaf is
/// returned together with a `View` describing it.
///
/// # Errors
///
/// - Signing the request fails.
/// - No validly signed response arrives before `REQUEST_TIMEOUT`, or the event dependency
///   completes without an event.
/// - The proposal's justify QC is not a valid certificate.
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn fetch_proposal<TYPES: NodeType, V: Versions>(
    view_number: TYPES::View,
    event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
    event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
    membership: Arc<RwLock<TYPES::Membership>>,
    consensus: OuterConsensus<TYPES>,
    sender_public_key: TYPES::SignatureKey,
    sender_private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
    upgrade_lock: &UpgradeLock<TYPES, V>,
    epoch_height: u64,
) -> Result<(Leaf2<TYPES>, View<TYPES>)> {
    // The request is signed over its commitment so peers can tell who is asking.
    let signed_proposal_request = ProposalRequestPayload {
        view_number,
        key: sender_public_key,
    };
    let signature = TYPES::SignatureKey::sign(
        &sender_private_key,
        signed_proposal_request.commit().as_ref(),
    )
    .wrap()
    .context(error!("Failed to sign proposal. This should never happen."))?;
    broadcast_event(
        HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
        &event_sender,
    )
    .await;
    // The `async move` block below takes ownership of what it captures, so it gets its own
    // handle; `membership` is still needed afterwards for the justify QC check.
    let mem = Arc::clone(&membership);
    // The timeout covers the entire wait, including any invalid responses that are skipped.
    let Ok(Some(proposal)) =
        timeout(REQUEST_TIMEOUT, async move {
            let mut proposal = None;
            // Keep listening until a response for this view carries a valid signature.
            while proposal.is_none() {
                // The filter only matches on view number; the signature is checked below.
                let event = EventDependency::new(
                    event_receiver.clone(),
                    Box::new(move |event| {
                        let event = event.as_ref();
                        if let HotShotEvent::QuorumProposalResponseRecv(
                            quorum_proposal,
                        ) = event
                        {
                            quorum_proposal.data.view_number() == view_number
                        } else {
                            false
                        }
                    }),
                )
                .completed()
                .await;
                if let Some(hs_event) = event.as_ref() {
                    if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) =
                        hs_event.as_ref()
                    {
                        let mem_reader = mem.read().await;
                        // Responses with an invalid signature are dropped and we keep waiting.
                        if quorum_proposal.validate_signature(&mem_reader, epoch_height).is_ok() {
                            proposal = Some(quorum_proposal.clone());
                        }
                    }
                } else {
                    // The dependency produced no event; abandon the wait (reported below).
                    return None;
                }
            }
            proposal
        })
        .await
    else {
        // Either the timeout elapsed (`Err`) or no valid proposal was obtained (`Ok(None)`).
        bail!("Request for proposal failed");
    };
    // The proposal's own view; the event filter above only accepts the requested view.
    let view_number = proposal.data.view_number();
    let justify_qc = proposal.data.justify_qc.clone();
    let justify_qc_epoch = justify_qc.data.epoch();
    let membership_reader = membership.read().await;
    let membership_stake_table = membership_reader.stake_table(justify_qc_epoch);
    let membership_success_threshold = membership_reader.success_threshold(justify_qc_epoch);
    // Release the membership lock before awaiting the certificate check.
    drop(membership_reader);
    if !justify_qc
        .is_valid_cert(
            membership_stake_table,
            membership_success_threshold,
            upgrade_lock,
        )
        .await
    {
        bail!("Invalid justify_qc in proposal for view {}", *view_number);
    }
    // Record the fetched leaf together with a validated state derived from its header.
    let mut consensus_writer = consensus.write().await;
    let leaf = Leaf2::from_quorum_proposal(&proposal.data);
    let state = Arc::new(
        <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(&proposal.data.block_header),
    );
    // A failed update is not treated as an error here; it is only traced.
    if let Err(e) = consensus_writer.update_leaf(leaf.clone(), Arc::clone(&state), None) {
        tracing::trace!("{e:?}");
    }
    let view = View {
        view_inner: ViewInner::Leaf {
            leaf: leaf.commit(),
            state,
            delta: None,
            epoch: leaf.epoch(),
        },
    };
    Ok((leaf, view))
}
/// Result of walking the leaf chain back from a proposal to find new lock/decide points.
///
/// [`Default`] yields an outcome in which every field is empty (`None` / empty `Vec`).
#[derive(Debug)]
pub struct LeafChainTraversalOutcome<TYPES: NodeType> {
    /// View number to lock on, if the traversal determined one.
    pub new_locked_view_number: Option<TYPES::View>,
    /// View number of the newly decided leaf, if a decide was reached.
    pub new_decided_view_number: Option<TYPES::View>,
    /// Quorum certificate recorded by the traversal as the new decide QC, if any.
    pub new_decide_qc: Option<QuorumCertificate2<TYPES>>,
    /// Leaves collected by the traversal, in the order they were visited.
    pub leaf_views: Vec<LeafInfo<TYPES>>,
    /// Commitments of transactions found in the collected leaves' block payloads, if any.
    pub included_txns: Option<HashSet<Commitment<<TYPES as NodeType>::Transaction>>>,
    /// Upgrade certificate decided during the traversal, if any.
    pub decided_upgrade_cert: Option<UpgradeCertificate<TYPES>>,
}

// Only `TYPES: NodeType` is required: the impl never constructs a `TYPES` value, so an extra
// `Default` bound on the node type would be redundant.
impl<TYPES: NodeType> Default for LeafChainTraversalOutcome<TYPES> {
    fn default() -> Self {
        Self {
            new_locked_view_number: None,
            new_decided_view_number: None,
            new_decide_qc: None,
            leaf_views: Vec::new(),
            included_txns: None,
            decided_upgrade_cert: None,
        }
    }
}
/// Walks the leaf chain back from `proposal` and determines what the 2-chain rule decides.
///
/// The proposal's justify QC view always becomes `new_locked_view_number`. A decide happens only
/// when the proposal's parent and grandparent leaves are in consecutive views. In that case:
/// - the grandparent is decided;
/// - the parent's justify QC becomes `new_decide_qc`;
/// - every leaf from the grandparent back to, but excluding, the last decided view is collected
///   into `leaf_views`, newest first.
///
/// Each collected leaf gets any saved block payload attached *before* it is recorded, so
/// consumers of `leaf_views` see the payload. Transaction commitments of all collected payloads
/// are accumulated into `included_txns` (left `None` if there are none).
///
/// An upgrade certificate that differs from `existing_upgrade_cert` is reported in
/// `decided_upgrade_cert`, unless its `decide_by` view precedes the decided view, in which case
/// it is ignored with a warning.
pub async fn decide_from_proposal_2<TYPES: NodeType>(
    proposal: &QuorumProposal2<TYPES>,
    consensus: OuterConsensus<TYPES>,
    existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
    public_key: &TYPES::SignatureKey,
) -> LeafChainTraversalOutcome<TYPES> {
    let mut res = LeafChainTraversalOutcome::default();
    let consensus_reader = consensus.read().await;
    let proposed_leaf = Leaf2::from_quorum_proposal(proposal);
    res.new_locked_view_number = Some(proposed_leaf.justify_qc().view_number());

    let Some(parent_info) = consensus_reader.parent_leaf_info(&proposed_leaf, public_key) else {
        return res;
    };
    let Some(grand_parent_info) = consensus_reader.parent_leaf_info(&parent_info.leaf, public_key)
    else {
        return res;
    };
    // The 2-chain rule only decides when parent and grandparent sit in consecutive views.
    if grand_parent_info.leaf.view_number() + 1 != parent_info.leaf.view_number() {
        return res;
    }
    res.new_decide_qc = Some(parent_info.leaf.justify_qc().clone());
    let decided_view_number = grand_parent_info.leaf.view_number();
    res.new_decided_view_number = Some(decided_view_number);

    let old_anchor_view = consensus_reader.last_decided_view();
    let mut current_leaf_info = Some(grand_parent_info);
    let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
    let mut txns = HashSet::new();
    // Walk back from the decided leaf until we reach the previously decided view.
    while let Some(mut info) = current_leaf_info
        .take()
        .filter(|info| info.leaf.view_number() > old_anchor_view)
    {
        if let Some(cert) = info.leaf.upgrade_certificate() {
            if info.leaf.upgrade_certificate() != *existing_upgrade_cert_reader {
                if cert.data.decide_by < decided_view_number {
                    tracing::warn!("Failed to decide an upgrade certificate in time. Ignoring.");
                } else {
                    tracing::info!("Reached decide on upgrade certificate: {:?}", cert);
                    res.decided_upgrade_cert = Some(cert.clone());
                }
            }
        }

        // Attach the payload (if we have it) before recording the leaf, so the leaf chain we
        // hand out actually includes it.
        if let Some(encoded_txns) = consensus_reader
            .saved_payloads()
            .get(&info.leaf.view_number())
        {
            let payload =
                BlockPayload::from_bytes(encoded_txns, info.leaf.block_header().metadata());
            info.leaf.fill_block_payload_unchecked(payload);
        }

        if let Some(ref payload) = info.leaf.block_payload() {
            txns.extend(payload.transaction_commitments(info.leaf.block_header().metadata()));
        }

        current_leaf_info = consensus_reader.parent_leaf_info(&info.leaf, public_key);
        res.leaf_views.push(info);
    }

    if !txns.is_empty() {
        res.included_txns = Some(txns);
    }
    res
}
/// Walks the ancestors of `proposal`'s justify QC looking for a 3-chain decide (legacy rule).
///
/// Starting at the justify QC's view and walking back toward, but excluding, the last decided
/// view, leaves are counted while their views are consecutive, beginning from the proposal's own
/// view:
/// - at chain length 2, the leaf's view becomes `new_locked_view_number` and its justify QC
///   becomes `new_decide_qc`;
/// - at chain length 3, the leaf's view becomes `new_decided_view_number`.
///
/// A gap in view numbers before a decide is found makes the visitor return `false`, which
/// presumably stops the walk; confirm against `visit_leaf_ancestors`.
///
/// Once a decide is found, the decided leaf and every further visited ancestor are handled as
/// follows:
/// - each is recorded in `leaf_views`, with any saved block payload attached and this node's VID
///   share, if held;
/// - the `last_synced_block_height` metric is set to the decided leaf's height;
/// - the transaction commitments of *all* recorded payloads are accumulated into
///   `included_txns`;
/// - an upgrade certificate that differs from `existing_upgrade_cert` is reported in
///   `decided_upgrade_cert`, unless its `decide_by` view precedes the proposal's view.
///
/// Traversal errors are logged at debug level and the partial outcome is returned.
pub async fn decide_from_proposal<TYPES: NodeType>(
    proposal: &QuorumProposal2<TYPES>,
    consensus: OuterConsensus<TYPES>,
    existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
    public_key: &TYPES::SignatureKey,
) -> LeafChainTraversalOutcome<TYPES> {
    let consensus_reader = consensus.read().await;
    let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
    let view_number = proposal.view_number();
    let parent_view_number = proposal.justify_qc.view_number();
    let old_anchor_view = consensus_reader.last_decided_view();
    let mut last_view_number_visited = view_number;
    let mut current_chain_length = 0usize;
    let mut res = LeafChainTraversalOutcome::default();
    if let Err(e) = consensus_reader.visit_leaf_ancestors(
        parent_view_number,
        Terminator::Exclusive(old_anchor_view),
        true,
        |leaf, state, delta| {
            // Until a decide is found, only extend the chain across consecutive views.
            if res.new_decided_view_number.is_none() {
                if last_view_number_visited == leaf.view_number() + 1 {
                    last_view_number_visited = leaf.view_number();
                    current_chain_length += 1;
                    if current_chain_length == 2 {
                        res.new_locked_view_number = Some(leaf.view_number());
                        res.new_decide_qc = Some(leaf.justify_qc().clone());
                    } else if current_chain_length == 3 {
                        res.new_decided_view_number = Some(leaf.view_number());
                    }
                } else {
                    return false;
                }
            }
            if let Some(new_decided_view) = res.new_decided_view_number {
                let mut leaf = leaf.clone();
                if leaf.view_number() == new_decided_view {
                    consensus_reader
                        .metrics
                        .last_synced_block_height
                        .set(usize::try_from(leaf.height()).unwrap_or(0));
                }
                if let Some(cert) = leaf.upgrade_certificate() {
                    if leaf.upgrade_certificate() != *existing_upgrade_cert_reader {
                        if cert.data.decide_by < view_number {
                            tracing::warn!(
                                "Failed to decide an upgrade certificate in time. Ignoring."
                            );
                        } else {
                            tracing::info!("Reached decide on upgrade certificate: {:?}", cert);
                            res.decided_upgrade_cert = Some(cert.clone());
                        }
                    }
                }
                if let Some(encoded_txns) =
                    consensus_reader.saved_payloads().get(&leaf.view_number())
                {
                    let payload =
                        BlockPayload::from_bytes(encoded_txns, leaf.block_header().metadata());
                    leaf.fill_block_payload_unchecked(payload);
                }
                // Accumulate across every decided leaf; overwriting here would keep only the
                // transactions of the last leaf visited.
                if let Some(ref payload) = leaf.block_payload() {
                    res.included_txns
                        .get_or_insert_with(HashSet::new)
                        .extend(payload.transaction_commitments(leaf.block_header().metadata()));
                }
                let vid_share = consensus_reader
                    .vid_shares()
                    .get(&leaf.view_number())
                    .unwrap_or(&HashMap::new())
                    .get(public_key)
                    .cloned()
                    .map(|prop| prop.data);
                res.leaf_views.push(LeafInfo::new(
                    leaf,
                    Arc::clone(&state),
                    delta.clone(),
                    vid_share,
                ));
            }
            true
        },
    ) {
        tracing::debug!("Leaf ascension failed; error={e}");
    }
    res
}
/// Returns the parent leaf stored for `parent_view_number` together with its validated state.
///
/// If the validated state map has no entry for the parent view, the proposal for that view is
/// first fetched from peers with [`fetch_proposal`], which attempts to record its leaf and
/// state. The leaf is then looked up in `saved_leaves` by the commitment found in the state map.
/// A mismatch between that commitment and the current high QC's leaf commitment is only logged.
///
/// # Errors
///
/// Fails if:
/// - the proposal fetch fails;
/// - the parent view is missing from the state map;
/// - the view has no leaf (`leaf_and_state` is `None`);
/// - the leaf is absent from `saved_leaves`.
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn parent_leaf_and_state<TYPES: NodeType, V: Versions>(
    event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
    membership: Arc<RwLock<TYPES::Membership>>,
    public_key: TYPES::SignatureKey,
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
    consensus: OuterConsensus<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES, V>,
    parent_view_number: TYPES::View,
    epoch_height: u64,
) -> Result<(Leaf2<TYPES>, Arc<<TYPES as NodeType>::ValidatedState>)> {
    // The read guard lives only inside this block, so it is released before `fetch_proposal`
    // needs the write lock.
    let parent_state_known = {
        let reader = consensus.read().await;
        reader
            .validated_state_map()
            .contains_key(&parent_view_number)
    };

    if !parent_state_known {
        let _ = fetch_proposal(
            parent_view_number,
            event_sender.clone(),
            event_receiver.clone(),
            membership,
            consensus.clone(),
            public_key.clone(),
            private_key.clone(),
            upgrade_lock,
            epoch_height,
        )
        .await
        .context(info!("Failed to fetch proposal"))?;
    }

    let reader = consensus.read().await;
    let parent_view = reader
        .validated_state_map()
        .get(&parent_view_number)
        .context(debug!("Couldn't find parent view in state map, waiting for replica to see proposal; parent_view_number: {}", *parent_view_number))?;
    let (leaf_commitment, state) = parent_view
        .leaf_and_state()
        .context(info!("Parent of high QC points to a view without a proposal; parent_view_number: {parent_view_number:?}, parent_view {parent_view:?}"))?;

    let high_qc_leaf_commit = &reader.high_qc().data().leaf_commit;
    if leaf_commitment != *high_qc_leaf_commit {
        tracing::debug!(
            "They don't equal: {:?} {:?}",
            leaf_commitment,
            high_qc_leaf_commit
        );
    }

    let parent_leaf = reader
        .saved_leaves()
        .get(&leaf_commitment)
        .context(info!("Failed to find high QC of parent"))?;
    Ok((parent_leaf.clone(), Arc::clone(state)))
}
/// Checks that a proposal is safe to vote on and, if so, announces it as validated.
///
/// Steps, in order:
/// 1. The proposed leaf must extend `parent_leaf` (its parent commitment matches).
/// 2. The proposed leaf, with a validated state built from its header, is passed to
///    `update_leaf`, and the proposal to `update_proposed_view`; failures of either are only
///    logged.
///    Note that this happens *before* the checks below, so the leaf and proposal are recorded
///    even if a later check fails.
/// 3. The proposal's upgrade certificate is validated for the proposal's epoch. The proposed
///    leaf must also pass `extends_upgrade` against the parent and the decided upgrade
///    certificate.
/// 4. Epoch safety: either the proposal is in the same epoch as `parent_leaf`, or `check_eqc`
///    accepts the pair. If the parent is the last block of its epoch, the proposal must carry a
///    `next_epoch_justify_qc`.
/// 5. Safety/liveness: liveness holds when the justify QC's view is newer than the locked view.
///    Safety holds when `visit_leaf_ancestors` from the justify QC's view, with
///    `Terminator::Inclusive(locked_view)`, returns `Ok`. This presumably means the proposal
///    descends from the locked leaf; confirm against `visit_leaf_ancestors`. At least one of the
///    two must hold. If both fail and the visit returned an error, that error is emitted as
///    `EventType::Error` on the output stream.
/// 6. On success, emits `EventType::QuorumProposal` on `output_event_stream` and
///    `HotShotEvent::QuorumProposalValidated` on `event_stream`.
///
/// # Errors
///
/// Returns an error if any check in steps 1 and 3–5 fails.
#[allow(clippy::too_many_lines)]
#[instrument(skip_all, fields(id = validation_info.id, view = *proposal.data.view_number()))]
pub async fn validate_proposal_safety_and_liveness<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    proposal: Proposal<TYPES, QuorumProposal2<TYPES>>,
    parent_leaf: Leaf2<TYPES>,
    validation_info: &ValidationInfo<TYPES, I, V>,
    event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
    sender: TYPES::SignatureKey,
) -> Result<()> {
    let view_number = proposal.data.view_number();
    let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
    ensure!(
        proposed_leaf.parent_commitment() == parent_leaf.commit(),
        "Proposed leaf does not extend the parent leaf."
    );
    let proposal_epoch =
        epoch_from_block_number(proposed_leaf.height(), validation_info.epoch_height);
    let state = Arc::new(
        <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(&proposal.data.block_header),
    );
    // Record the leaf and proposal; failures here are logged rather than rejecting the proposal.
    {
        let mut consensus_writer = validation_info.consensus.write().await;
        if let Err(e) = consensus_writer.update_leaf(proposed_leaf.clone(), state, None) {
            tracing::trace!("{e:?}");
        }
        if let Err(e) = consensus_writer.update_proposed_view(proposal.clone()) {
            tracing::debug!("Internal proposal update failed; error = {e:#}");
        };
    }
    UpgradeCertificate::validate(
        &proposal.data.upgrade_certificate,
        &validation_info.membership,
        TYPES::Epoch::new(proposal_epoch),
        &validation_info.upgrade_lock,
    )
    .await?;
    proposed_leaf
        .extends_upgrade(
            &parent_leaf,
            &validation_info.upgrade_lock.decided_upgrade_certificate,
        )
        .await?;
    let justify_qc = proposal.data.justify_qc.clone();
    {
        let consensus_reader = validation_info.consensus.read().await;
        // Despite the name, this is the epoch of `parent_leaf` (computed from its height).
        let justify_qc_epoch =
            epoch_from_block_number(parent_leaf.height(), validation_info.epoch_height);
        ensure!(
            proposal_epoch == justify_qc_epoch
                || consensus_reader.check_eqc(&proposed_leaf, &parent_leaf),
            {
                error!(
                    "Failed epoch safety check \n Proposed leaf is {:?} \n justify QC leaf is {:?}",
                    proposed_leaf.clone(),
                    parent_leaf.clone(),
                )
            }
        );
        // A proposal extending the last block of an epoch must also carry the next epoch's QC.
        if is_last_block_in_epoch(parent_leaf.height(), validation_info.epoch_height) {
            ensure!(proposal.data.next_epoch_justify_qc.is_some(),
            "Epoch transition proposal does not include the next epoch justify QC. Do not vote!");
        }
        let liveness_check = justify_qc.view_number() > consensus_reader.locked_view();
        // The visitor returns `false` once it reaches the locked view.
        let outcome = consensus_reader.visit_leaf_ancestors(
            justify_qc.view_number(),
            Terminator::Inclusive(consensus_reader.locked_view()),
            false,
            |leaf, _, _| {
                leaf.view_number() != consensus_reader.locked_view()
            },
        );
        let safety_check = outcome.is_ok();
        ensure!(safety_check || liveness_check, {
            // Surface the traversal error to the output stream before rejecting.
            if let Err(e) = outcome {
                broadcast_event(
                    Event {
                        view_number,
                        event: EventType::Error { error: Arc::new(e) },
                    },
                    &validation_info.output_event_stream,
                )
                .await;
            }
            error!("Failed safety and liveness check \n High QC is {:?} Proposal QC is {:?} Locked view is {:?}", consensus_reader.high_qc(), proposal.data.clone(), consensus_reader.locked_view())
        });
    }
    broadcast_event(
        Event {
            view_number,
            event: EventType::QuorumProposal {
                proposal: proposal.clone(),
                sender,
            },
        },
        &validation_info.output_event_stream,
    )
    .await;
    broadcast_event(
        Arc::new(HotShotEvent::QuorumProposalValidated(
            proposal.clone(),
            parent_leaf,
        )),
        &event_stream,
    )
    .await;
    Ok(())
}
/// Performs the view and certificate checks on a received proposal.
///
/// The proposal:
/// - must not be for a view older than the current view;
/// - must carry a signature that validates against the membership;
/// - if its justify QC is not for the immediately preceding view, must carry view-change
///   evidence: a timeout certificate for the preceding view, or a view-sync certificate for the
///   proposal's view, valid against the stake table of the certificate's epoch;
/// - if it has an upgrade certificate, that certificate must validate for the proposal's epoch.
///
/// The proposal's epoch is derived from its block number using the runtime
/// `validation_info.epoch_height`, the same value used for the signature check above. It
/// previously used the compile-time `TYPES::EPOCH_HEIGHT`, which could disagree with it.
///
/// # Errors
///
/// Returns an error describing the first check that fails.
pub(crate) async fn validate_proposal_view_and_certs<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    proposal: &Proposal<TYPES, QuorumProposal2<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I, V>,
) -> Result<()> {
    let view_number = proposal.data.view_number();
    ensure!(
        view_number >= validation_info.consensus.read().await.cur_view(),
        "Proposal is from an older view {:?}",
        proposal.data.clone()
    );

    let membership_reader = validation_info.membership.read().await;
    proposal.validate_signature(&membership_reader, validation_info.epoch_height)?;
    drop(membership_reader);

    // Skipping views requires evidence that the skipped view(s) were legitimately abandoned.
    if proposal.data.justify_qc.view_number() != view_number - 1 {
        let received_proposal_cert =
            proposal.data.view_change_evidence.clone().context(debug!(
                "Quorum proposal for view {} needed a timeout or view sync certificate, but did not have one",
                *view_number
            ))?;
        match received_proposal_cert {
            ViewChangeEvidence::Timeout(timeout_cert) => {
                ensure!(
                    timeout_cert.data().view == view_number - 1,
                    "Timeout certificate for view {} was not for the immediately preceding view",
                    *view_number
                );
                let timeout_cert_epoch = timeout_cert.data().epoch();
                let membership_reader = validation_info.membership.read().await;
                let membership_stake_table = membership_reader.stake_table(timeout_cert_epoch);
                let membership_success_threshold =
                    membership_reader.success_threshold(timeout_cert_epoch);
                // Release the membership lock before awaiting the certificate check.
                drop(membership_reader);
                ensure!(
                    timeout_cert
                        .is_valid_cert(
                            membership_stake_table,
                            membership_success_threshold,
                            &validation_info.upgrade_lock
                        )
                        .await,
                    "Timeout certificate for view {} was invalid",
                    *view_number
                );
            }
            ViewChangeEvidence::ViewSync(view_sync_cert) => {
                ensure!(
                    view_sync_cert.view_number == view_number,
                    "View sync cert view number {:?} does not match proposal view number {:?}",
                    view_sync_cert.view_number,
                    view_number
                );
                let view_sync_cert_epoch = view_sync_cert.data().epoch();
                let membership_reader = validation_info.membership.read().await;
                let membership_stake_table = membership_reader.stake_table(view_sync_cert_epoch);
                let membership_success_threshold =
                    membership_reader.success_threshold(view_sync_cert_epoch);
                // Release the membership lock before awaiting the certificate check.
                drop(membership_reader);
                ensure!(
                    view_sync_cert
                        .is_valid_cert(
                            membership_stake_table,
                            membership_success_threshold,
                            &validation_info.upgrade_lock
                        )
                        .await,
                    "Invalid view sync finalize cert provided"
                );
            }
        }
    }

    // Validate the upgrade certificate (if any) for the proposal's epoch, using the same
    // runtime epoch height as the rest of validation.
    {
        let epoch = TYPES::Epoch::new(epoch_from_block_number(
            proposal.data.block_header.block_number(),
            validation_info.epoch_height,
        ));
        UpgradeCertificate::validate(
            &proposal.data.upgrade_certificate,
            &validation_info.membership,
            epoch,
            &validation_info.upgrade_lock,
        )
        .await?;
    }
    Ok(())
}
/// Broadcasts `event` on `sender` via `broadcast_direct`, logging rather than propagating
/// problems.
///
/// Outcomes:
/// - `Ok(Some(overflowed))`: the queue overflowed and `overflowed`, the oldest queued event, was
///   evicted. This is logged as an error.
/// - Send failure: e.g. the channel was closed. This is logged as a warning together with the
///   undelivered event.
///
/// This function never fails.
pub async fn broadcast_event<E: Clone + std::fmt::Debug>(event: E, sender: &Sender<E>) {
    match sender.broadcast_direct(event).await {
        Ok(None) => (),
        Ok(Some(overflowed)) => {
            tracing::error!(
                "Event sender queue overflow, Oldest event removed from queue: {:?}",
                overflowed
            );
        }
        Err(SendError(e)) => {
            tracing::warn!(
                "Event: {:?}\n Sending failed, event stream probably shutdown",
                e
            );
        }
    }
}