use std::{
marker::PhantomData,
sync::Arc,
time::{Duration, Instant},
};
use anyhow::{ensure, Context, Result};
use async_broadcast::{Receiver, Sender};
use async_lock::RwLock;
use committable::Committable;
use either::Either;
use hotshot_task::{
dependency::{Dependency, EventDependency},
dependency_task::HandleDepOutput,
};
use hotshot_types::{
consensus::{CommitmentAndMetadata, OuterConsensus},
data::{Leaf2, QuorumProposal2, QuorumProposalWrapper, VidDisperse, ViewChangeEvidence2},
message::Proposal,
simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate},
traits::{
block_contents::BlockHeader, election::Membership, node_implementation::NodeType,
signature_key::SignatureKey,
},
utils::{is_last_block_in_epoch, option_epoch_from_block_number},
vote::{Certificate, HasViewNumber},
};
use tracing::instrument;
use utils::anytrace::*;
use vbs::version::StaticVersionType;
use crate::{
events::HotShotEvent,
helpers::{broadcast_event, parent_leaf_and_state},
quorum_proposal::{UpgradeLock, Versions},
};
/// Labels for the kinds of input a proposal dependency task can be waiting on.
///
/// NOTE(review): none of the variants are referenced in this part of the file, so the
/// per-variant notes below are inferred from the variant names and the events handled in
/// `handle_dep_result`; confirm them against the code that builds the dependency task.
#[derive(PartialEq, Debug)]
pub(crate) enum ProposalDependency {
/// Presumably the payload commitment and metadata
/// (cf. `HotShotEvent::SendPayloadCommitmentAndMetadata`) — TODO confirm.
PayloadAndMetadata,
/// Presumably a formed quorum certificate — TODO confirm.
Qc,
/// Presumably a view sync finalize certificate — TODO confirm.
ViewSyncCert,
/// Presumably a timeout certificate — TODO confirm.
TimeoutCert,
/// Presumably a received quorum proposal — TODO confirm.
Proposal,
/// Presumably the VID share/disperse for the block — TODO confirm.
VidShare,
}
/// State carried by the proposal dependency task: everything needed to build, sign and
/// broadcast a quorum proposal for one view once the task's dependencies have completed.
pub struct ProposalDependencyHandle<TYPES: NodeType, V: Versions> {
/// Recorded as `latest_proposed_view` in the tracing span of `publish_proposal`.
/// NOTE(review): presumably the latest view that has already been proposed for — confirm.
pub latest_proposed_view: TYPES::View,
/// The view being proposed for; becomes the `view_number` of the emitted proposal.
pub view_number: TYPES::View,
/// Event channel sender; the `QuorumProposalSend` event is broadcast on it.
pub sender: Sender<Arc<HotShotEvent<TYPES>>>,
/// Event channel receiver; cloned when listening for `HighQcRecv` and
/// `NextEpochQc2Formed` events.
pub receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
/// Instance state handed to the block header constructors.
pub instance_state: Arc<TYPES::InstanceState>,
/// Membership used to look up stake tables, success thresholds and the view leader.
pub membership: Arc<RwLock<TYPES::Membership>>,
/// This node's public key; compared against the leader's key and attached to the
/// broadcast proposal event.
pub public_key: TYPES::SignatureKey,
/// This node's private key; used to sign the commitment of the proposed leaf.
pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
/// Shared consensus state; read for the high QC, the next-epoch high QC and DRB results.
pub consensus: OuterConsensus<TYPES>,
/// Timeout in milliseconds; the QC waits in this file are capped at half of this value,
/// measured from `view_start_time`.
/// NOTE(review): presumably the configured view timeout — confirm.
pub timeout: u64,
/// Upgrade certificate passed to `publish_proposal`, where it is used when the parent
/// leaf carries none.
pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
/// Used to resolve the protocol version for a view; also holds the decided upgrade
/// certificate that is passed to `publish_proposal`.
pub upgrade_lock: UpgradeLock<TYPES, V>,
/// Identifier recorded as `id` in the tracing span of `publish_proposal`.
pub id: u64,
/// Instant from which the half-timeout waiting budget is measured.
pub view_start_time: Instant,
/// Highest QC known to this task; `wait_for_highest_qc` replaces it when a valid QC
/// with a higher view number is received.
pub highest_qc: QuorumCertificate2<TYPES>,
/// Passed to `option_epoch_from_block_number` and `is_last_block_in_epoch`.
/// NOTE(review): presumably the number of blocks per epoch — confirm.
pub epoch_height: u64,
}
impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
/// Reads events from `rx` until a `HighQcRecv` event carrying a valid QC arrives.
///
/// Each candidate QC is validated with `is_valid_cert` against the stake table and
/// success threshold that the membership reports for the QC's epoch. Events of any
/// other kind, and QCs that fail validation, are skipped.
///
/// Returns `Some(qc)` for the first valid QC, or `None` as soon as `recv_direct`
/// returns an error.
async fn wait_for_qc_event(
    &self,
    rx: &mut Receiver<Arc<HotShotEvent<TYPES>>>,
) -> Option<QuorumCertificate2<TYPES>> {
    loop {
        let Ok(event) = rx.recv_direct().await else {
            return None;
        };
        let HotShotEvent::HighQcRecv(qc, _sender) = event.as_ref() else {
            continue;
        };

        // Scope the read guard so it is released before the certificate check is awaited.
        let (stake_table, success_threshold) = {
            let membership = self.membership.read().await;
            (
                membership.stake_table(qc.data.epoch),
                membership.success_threshold(qc.data.epoch),
            )
        };

        if qc
            .is_valid_cert(stake_table, success_threshold, &self.upgrade_lock)
            .await
            .is_ok()
        {
            return Some(qc.clone());
        }
    }
}
/// Listens for `HighQcRecv` events until half of `self.timeout` (milliseconds) has
/// elapsed since `self.view_start_time`, keeping in `self.highest_qc` the valid QC with
/// the highest view number seen.
///
/// Returns immediately, leaving `self.highest_qc` untouched, when the version for
/// `self.view_number` is known and is older than `V::Epochs::VERSION`.
async fn wait_for_highest_qc(&mut self) {
    // Routine progress message, not a failure, so it is logged at debug level.
    tracing::debug!("waiting for QC");
    if self
        .upgrade_lock
        .version(self.view_number)
        .await
        .is_ok_and(|version| version < V::Epochs::VERSION)
    {
        return;
    }

    let wait_duration = Duration::from_millis(self.timeout / 2);

    // Clone the receiver once, outside the loop. A clone starts at the same queue
    // position as `self.receiver`, which is never advanced here; cloning on every
    // iteration would hand back the same first QC event again and again, so later
    // (possibly higher) QCs would never be observed.
    let mut rx = self.receiver.clone();

    while self.view_start_time.elapsed() < wait_duration {
        let Some(time_spent) = Instant::now().checked_duration_since(self.view_start_time)
        else {
            return;
        };
        let Some(time_left) = wait_duration.checked_sub(time_spent) else {
            return;
        };
        // An elapsed timer means the waiting budget is used up.
        let Ok(maybe_qc) =
            tokio::time::timeout(time_left, self.wait_for_qc_event(&mut rx)).await
        else {
            return;
        };
        let Some(qc) = maybe_qc else {
            continue;
        };
        if qc.view_number() > self.highest_qc.view_number() {
            self.highest_qc = qc;
        }
    }
}
/// Looks for a next-epoch QC whose `leaf_commit` matches that of `high_qc`.
///
/// Returns `None` when the version for `self.view_number` is older than
/// `V::Epochs::VERSION`. Otherwise the lookup proceeds as follows:
/// 1. Use the next-epoch high QC stored in consensus if its leaf commitment matches.
/// 2. Otherwise wait for a matching `NextEpochQc2Formed` event, for at most what is
///    left of half of `self.timeout` (milliseconds) measured from
///    `self.view_start_time`; `None` is returned if that budget is already spent.
/// 3. If the wait yields no event, check the QC stored in consensus one more time.
async fn get_next_epoch_qc(
    &self,
    high_qc: &QuorumCertificate2<TYPES>,
) -> Option<NextEpochQuorumCertificate2<TYPES>> {
    tracing::debug!("getting the next epoch QC");
    let version = self.upgrade_lock.version_infallible(self.view_number).await;
    if version < V::Epochs::VERSION {
        return None;
    }

    // Fast path: consensus may already hold the matching certificate.
    if let Some(stored_qc) = self.consensus.read().await.next_epoch_high_qc() {
        if stored_qc.data.leaf_commit == high_qc.data.leaf_commit {
            return Some(stored_qc.clone());
        }
    }

    // Work out how much of the waiting budget remains; give up if none does.
    let budget = Duration::from_millis(self.timeout / 2);
    let elapsed = Instant::now().checked_duration_since(self.view_start_time)?;
    let remaining = budget.checked_sub(elapsed)?;

    let target_qc = high_qc.clone();
    let matching_event = EventDependency::new(
        self.receiver.clone(),
        Box::new(move |event| {
            matches!(
                event.as_ref(),
                HotShotEvent::NextEpochQc2Formed(Either::Left(qc))
                    if qc.data.leaf_commit == target_qc.data.leaf_commit
            )
        }),
    );
    let outcome = tokio::time::timeout(remaining, matching_event.completed()).await;

    if let Ok(Some(event)) = outcome {
        return match event.as_ref() {
            HotShotEvent::NextEpochQc2Formed(Either::Left(qc)) => Some(qc.clone()),
            _ => None,
        };
    }

    // No event was obtained; the certificate may have been stored in consensus meanwhile.
    if let Some(stored_qc) = self.consensus.read().await.next_epoch_high_qc() {
        if stored_qc.data.leaf_commit == high_qc.data.leaf_commit {
            return Some(stored_qc.clone());
        }
    }
    None
}
#[instrument(skip_all, fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
async fn publish_proposal(
&self,
commitment_and_metadata: CommitmentAndMetadata<TYPES>,
vid_share: Proposal<TYPES, VidDisperse<TYPES>>,
view_change_evidence: Option<ViewChangeEvidence2<TYPES>>,
formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
decided_upgrade_certificate: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
parent_qc: QuorumCertificate2<TYPES>,
) -> Result<()> {
let (parent_leaf, state) = parent_leaf_and_state(
&self.sender,
&self.receiver,
Arc::clone(&self.membership),
self.public_key.clone(),
self.private_key.clone(),
OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
&self.upgrade_lock,
parent_qc.view_number(),
self.epoch_height,
)
.await?;
let mut upgrade_certificate = parent_leaf
.upgrade_certificate()
.or(formed_upgrade_certificate);
if let Some(cert) = upgrade_certificate.clone() {
if cert
.is_relevant(self.view_number, Arc::clone(&decided_upgrade_certificate))
.await
.is_err()
{
upgrade_certificate = None;
}
}
let proposal_certificate = view_change_evidence
.as_ref()
.filter(|cert| cert.is_valid_for_view(&self.view_number))
.cloned();
ensure!(
commitment_and_metadata.block_view == self.view_number,
"Cannot propose because our VID payload commitment and metadata is for an older view."
);
let version = self.upgrade_lock.version(self.view_number).await?;
let builder_commitment = commitment_and_metadata.builder_commitment.clone();
let metadata = commitment_and_metadata.metadata.clone();
let block_header = if version >= V::Epochs::VERSION
&& self.consensus.read().await.is_qc_forming_eqc(&parent_qc)
{
tracing::info!("Reached end of epoch. Proposing the same block again to form an eQC.");
let block_header = parent_leaf.block_header().clone();
tracing::debug!(
"Proposing block no. {} to form the eQC.",
block_header.block_number()
);
block_header
} else if version < V::Marketplace::VERSION {
TYPES::BlockHeader::new_legacy(
state.as_ref(),
self.instance_state.as_ref(),
&parent_leaf,
commitment_and_metadata.commitment,
builder_commitment,
metadata,
commitment_and_metadata.fees.first().clone(),
vid_share.data.vid_common_ref().clone(),
version,
)
.await
.wrap()
.context(warn!("Failed to construct legacy block header"))?
} else {
TYPES::BlockHeader::new_marketplace(
state.as_ref(),
self.instance_state.as_ref(),
&parent_leaf,
commitment_and_metadata.commitment,
commitment_and_metadata.builder_commitment,
commitment_and_metadata.metadata,
commitment_and_metadata.fees.to_vec(),
*self.view_number,
vid_share.data.vid_common_ref().clone(),
commitment_and_metadata.auction_result,
version,
)
.await
.wrap()
.context(warn!("Failed to construct marketplace block header"))?
};
let epoch = option_epoch_from_block_number::<TYPES>(
version >= V::Epochs::VERSION,
block_header.block_number(),
self.epoch_height,
);
if self
.membership
.read()
.await
.leader(self.view_number, epoch)?
!= self.public_key
{
tracing::debug!(
"We are not the leader in the epoch for which we are about to propose. Do not send the quorum proposal."
);
return Ok(());
}
let next_epoch_qc = if self
.consensus
.read()
.await
.is_leaf_for_last_block(parent_qc.data.leaf_commit)
{
self.get_next_epoch_qc(&parent_qc).await
} else {
None
};
let next_drb_result =
if is_last_block_in_epoch(block_header.block_number(), self.epoch_height) {
if let Some(epoch_val) = &epoch {
self.consensus
.read()
.await
.drb_seeds_and_results
.results
.get(epoch_val)
.copied()
} else {
None
}
} else {
None
};
let proposal = QuorumProposalWrapper {
proposal: QuorumProposal2 {
block_header,
view_number: self.view_number,
epoch,
justify_qc: parent_qc,
next_epoch_justify_qc: next_epoch_qc,
upgrade_certificate,
view_change_evidence: proposal_certificate,
next_drb_result,
},
};
let proposed_leaf = Leaf2::from_quorum_proposal(&proposal);
ensure!(
proposed_leaf.parent_commitment() == parent_leaf.commit(),
"Proposed leaf parent does not equal high qc"
);
let signature =
TYPES::SignatureKey::sign(&self.private_key, proposed_leaf.commit().as_ref())
.wrap()
.context(error!("Failed to compute proposed_leaf.commit()"))?;
let message = Proposal {
data: proposal,
signature,
_pd: PhantomData,
};
tracing::debug!(
"Sending proposal for view {:?}",
proposed_leaf.view_number(),
);
broadcast_event(
Arc::new(HotShotEvent::QuorumProposalSend(
message.clone(),
self.public_key.clone(),
)),
&self.sender,
)
.await;
Ok(())
}
}
impl<TYPES: NodeType, V: Versions> HandleDepOutput for ProposalDependencyHandle<TYPES, V> {
    /// Events collected by the dependency task, nested three levels deep.
    type Output = Vec<Vec<Vec<Arc<HotShotEvent<TYPES>>>>>;

    /// Gathers the proposal inputs from the completed dependency events and publishes
    /// the proposal.
    ///
    /// All events in `res` are scanned; for each kind of input the last matching event
    /// wins. The payload commitment/metadata and the VID share are required: if either
    /// is missing an error is logged and nothing is proposed. The parent QC is chosen
    /// as follows:
    /// - the QC from a `Qc2Formed` event, if one was seen;
    /// - otherwise, before `V::Epochs::VERSION`, the high QC stored in consensus;
    /// - otherwise `self.highest_qc` after `wait_for_highest_qc` has run.
    ///
    /// A view sync finalize certificate takes precedence over a timeout certificate as
    /// view change evidence. Failures from `publish_proposal` are logged, not returned.
    #[allow(clippy::no_effect_underscore_binding, clippy::too_many_lines)]
    async fn handle_dep_result(mut self, res: Self::Output) {
        let mut commit_and_metadata: Option<CommitmentAndMetadata<TYPES>> = None;
        let mut timeout_certificate = None;
        let mut view_sync_finalize_cert = None;
        let mut vid_share = None;
        let mut parent_qc = None;

        for event in res.iter().flatten().flatten() {
            match event.as_ref() {
                HotShotEvent::SendPayloadCommitmentAndMetadata(
                    payload_commitment,
                    builder_commitment,
                    metadata,
                    view,
                    fees,
                    auction_result,
                ) => {
                    commit_and_metadata = Some(CommitmentAndMetadata {
                        commitment: *payload_commitment,
                        builder_commitment: builder_commitment.clone(),
                        metadata: metadata.clone(),
                        fees: fees.clone(),
                        block_view: *view,
                        auction_result: auction_result.clone(),
                    });
                }
                HotShotEvent::Qc2Formed(cert) => match cert {
                    either::Right(timeout) => {
                        timeout_certificate = Some(timeout.clone());
                    }
                    either::Left(qc) => {
                        parent_qc = Some(qc.clone());
                    }
                },
                HotShotEvent::ViewSyncFinalizeCertificateRecv(cert) => {
                    view_sync_finalize_cert = Some(cert.clone());
                }
                HotShotEvent::VidDisperseSend(share, _) => {
                    vid_share = Some(share.clone());
                }
                _ => {}
            }
        }

        let Ok(version) = self.upgrade_lock.version(self.view_number).await else {
            tracing::error!(
                "Failed to get version for view {:?}, not proposing",
                self.view_number
            );
            return;
        };

        // Validate the required inputs before the (potentially long) wait for a high
        // QC below: without them no proposal can be published, so waiting is pointless.
        let Some(commit_and_metadata) = commit_and_metadata else {
            tracing::error!(
                "Somehow completed the proposal dependency task without a commitment and metadata"
            );
            return;
        };
        let Some(vid_share) = vid_share else {
            tracing::error!("Somehow completed the proposal dependency task without a VID share");
            return;
        };

        let parent_qc = if let Some(qc) = parent_qc {
            qc
        } else if version < V::Epochs::VERSION {
            self.consensus.read().await.high_qc().clone()
        } else {
            self.wait_for_highest_qc().await;
            self.highest_qc.clone()
        };

        let proposal_cert = if let Some(view_sync_cert) = view_sync_finalize_cert {
            Some(ViewChangeEvidence2::ViewSync(view_sync_cert))
        } else {
            timeout_certificate.map(ViewChangeEvidence2::Timeout)
        };

        if let Err(e) = self
            .publish_proposal(
                commit_and_metadata,
                vid_share,
                proposal_cert,
                self.formed_upgrade_certificate.clone(),
                Arc::clone(&self.upgrade_lock.decided_upgrade_certificate),
                parent_qc,
            )
            .await
        {
            tracing::error!("Failed to publish proposal; error = {e:#}");
        }
    }
}