use std::{marker::PhantomData, sync::Arc, time::Duration};
use anyhow::{ensure, Context, Result};
use async_broadcast::{Receiver, Sender};
use async_compatibility_layer::art::{async_sleep, async_spawn};
use async_lock::RwLock;
use hotshot_task::{
dependency::{Dependency, EventDependency},
dependency_task::HandleDepOutput,
};
use hotshot_types::{
consensus::{CommitmentAndMetadata, OuterConsensus},
data::{Leaf, QuorumProposal, VidDisperse, ViewChangeEvidence},
message::Proposal,
simple_certificate::UpgradeCertificate,
traits::{
block_contents::BlockHeader, node_implementation::NodeType, signature_key::SignatureKey,
},
};
use tracing::{debug, error, instrument};
use vbs::version::StaticVersionType;
use crate::{
events::HotShotEvent,
helpers::{broadcast_event, fetch_proposal, parent_leaf_and_state},
quorum_proposal::{UpgradeLock, Versions},
};
/// Kinds of dependency a proposal can wait on.
///
/// NOTE(review): this enum is not referenced anywhere else in this section, so the
/// per-variant notes below are inferred from the variant names — confirm them against
/// the code that constructs the dependencies.
///
/// `Eq` is derived alongside `PartialEq`: the enum has no fields, so equality is total.
#[derive(PartialEq, Eq, Debug)]
pub(crate) enum ProposalDependency {
    /// Presumably the payload commitment and metadata of the block to propose.
    PayloadAndMetadata,
    /// Presumably a quorum certificate.
    Qc,
    /// Presumably a view sync certificate.
    ViewSyncCert,
    /// Presumably a timeout certificate.
    TimeoutCert,
    /// Presumably a quorum proposal.
    Proposal,
    /// Presumably a VID share.
    VidShare,
}
/// State carried by the proposal dependency task: everything `handle_dep_result` and
/// `publish_proposal` need to build, sign, and broadcast a quorum proposal.
pub struct ProposalDependencyHandle<TYPES: NodeType, V: Versions> {
/// Only recorded in the tracing span of `publish_proposal` in this section.
/// Presumably the latest view this node has proposed for — confirm against the caller.
pub latest_proposed_view: TYPES::Time,
/// The view the proposal is built for; written into the `QuorumProposal`.
pub view_number: TYPES::Time,
/// Event channel sender; used to broadcast `QuorumProposalSend` and handed to the
/// `parent_leaf_and_state` / `fetch_proposal` helpers.
pub sender: Sender<Arc<HotShotEvent<TYPES>>>,
/// Event channel receiver; cloned to wait for `ValidatedStateUpdated` and handed to
/// the `parent_leaf_and_state` / `fetch_proposal` helpers.
pub receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
/// Instance state passed to the block header constructors.
pub instance_state: Arc<TYPES::InstanceState>,
/// Membership passed to the `parent_leaf_and_state` / `fetch_proposal` helpers.
pub quorum_membership: Arc<TYPES::Membership>,
/// This node's public key; sent along with the proposal in `QuorumProposalSend`.
pub public_key: TYPES::SignatureKey,
/// This node's private key; used to sign the proposed leaf's commitment.
pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
/// Delay, in milliseconds, slept right before the proposal is broadcast.
pub round_start_delay: u64,
/// Shared consensus state; read for the high QC and the validated state map.
pub consensus: OuterConsensus<TYPES>,
/// Upgrade certificate offered to `publish_proposal`, which uses it only when the
/// parent leaf carries no upgrade certificate of its own.
pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
/// Used to resolve the protocol version for the view and to commit leaves; also
/// holds the decided upgrade certificate passed to `publish_proposal`.
pub upgrade_lock: UpgradeLock<TYPES, V>,
/// Identifier recorded in the tracing span of `publish_proposal`.
pub id: u64,
}
impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
    /// Builds, signs, and broadcasts the quorum proposal for `self.view_number`.
    ///
    /// The steps run in this order:
    /// 1. Look up the parent leaf and its state with `parent_leaf_and_state`.
    /// 2. Pick the upgrade certificate to attach: the parent leaf's certificate if it
    ///    has one, otherwise `formed_upgrade_certificate`. The pick is discarded when
    ///    `is_relevant` returns an error for this view.
    /// 3. Keep `view_change_evidence` only if it is valid for this view.
    /// 4. Build the block header: `new_legacy` for versions below
    ///    `V::Marketplace::VERSION`, `new_marketplace` otherwise.
    /// 5. Assemble the proposal on top of the current high QC, check that the resulting
    ///    leaf's parent commitment equals the parent leaf's commitment, and sign the
    ///    proposed leaf's commitment.
    /// 6. Sleep for `round_start_delay` milliseconds, then broadcast
    ///    `HotShotEvent::QuorumProposalSend`.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - `parent_leaf_and_state` fails,
    /// - `commitment_and_metadata.block_view` differs from `self.view_number`,
    /// - the version for this view cannot be resolved,
    /// - the block header cannot be constructed,
    /// - the proposed leaf's parent commitment does not match the parent leaf, or
    /// - signing the proposed leaf's commitment fails.
    #[instrument(skip_all, target = "ProposalDependencyHandle", fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
    async fn publish_proposal(
        &self,
        commitment_and_metadata: CommitmentAndMetadata<TYPES>,
        vid_share: Proposal<TYPES, VidDisperse<TYPES>>,
        view_change_evidence: Option<ViewChangeEvidence<TYPES>>,
        formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
        decided_upgrade_certificate: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
    ) -> Result<()> {
        let (parent_leaf, state) = parent_leaf_and_state(
            self.view_number,
            &self.sender,
            &self.receiver,
            Arc::clone(&self.quorum_membership),
            self.public_key.clone(),
            self.private_key.clone(),
            OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
            &self.upgrade_lock,
        )
        .await?;

        // Priority: the parent's certificate first, then the one we formed ourselves.
        let mut upgrade_certificate = parent_leaf
            .upgrade_certificate()
            .or(formed_upgrade_certificate);

        // Whichever certificate was picked is dropped if it is not relevant for this view.
        if let Some(cert) = upgrade_certificate.clone() {
            if cert
                .is_relevant(self.view_number, Arc::clone(&decided_upgrade_certificate))
                .await
                .is_err()
            {
                upgrade_certificate = None;
            }
        }

        // The evidence is owned and not needed afterwards, so filter it by value
        // instead of borrowing and cloning.
        let proposal_certificate =
            view_change_evidence.filter(|cert| cert.is_valid_for_view(&self.view_number));

        ensure!(
            commitment_and_metadata.block_view == self.view_number,
            "Cannot propose because our VID payload commitment and metadata is for an older view."
        );

        let version = self.upgrade_lock.version(self.view_number).await?;

        let block_header = if version < V::Marketplace::VERSION {
            TYPES::BlockHeader::new_legacy(
                state.as_ref(),
                self.instance_state.as_ref(),
                &parent_leaf,
                commitment_and_metadata.commitment,
                commitment_and_metadata.builder_commitment,
                commitment_and_metadata.metadata,
                // The legacy constructor receives only the first fee.
                commitment_and_metadata.fees.first().clone(),
                vid_share.data.common.clone(),
                version,
            )
            .await
            .context("Failed to construct legacy block header")?
        } else {
            TYPES::BlockHeader::new_marketplace(
                state.as_ref(),
                self.instance_state.as_ref(),
                &parent_leaf,
                commitment_and_metadata.commitment,
                commitment_and_metadata.builder_commitment,
                commitment_and_metadata.metadata,
                commitment_and_metadata.fees.to_vec(),
                vid_share.data.common.clone(),
                commitment_and_metadata.auction_result,
                version,
            )
            .await
            .context("Failed to construct marketplace block header")?
        };

        let proposal = QuorumProposal {
            block_header,
            view_number: self.view_number,
            justify_qc: self.consensus.read().await.high_qc().clone(),
            upgrade_certificate,
            proposal_certificate,
        };

        let proposed_leaf = Leaf::from_quorum_proposal(&proposal);

        ensure!(
            proposed_leaf.parent_commitment() == parent_leaf.commit(&self.upgrade_lock).await,
            "Proposed leaf parent does not equal high qc"
        );

        let signature = TYPES::SignatureKey::sign(
            &self.private_key,
            proposed_leaf.commit(&self.upgrade_lock).await.as_ref(),
        )
        .context("Failed to compute proposed_leaf.commit()")?;

        let message = Proposal {
            data: proposal,
            signature,
            _pd: PhantomData,
        };

        debug!(
            "Sending proposal for view {:?}",
            proposed_leaf.view_number(),
        );

        async_sleep(Duration::from_millis(self.round_start_delay)).await;

        // `message` is not used after this point, so it is moved into the event
        // rather than cloned.
        broadcast_event(
            Arc::new(HotShotEvent::QuorumProposalSend(
                message,
                self.public_key.clone(),
            )),
            &self.sender,
        )
        .await;

        Ok(())
    }
}
impl<TYPES: NodeType, V: Versions> HandleDepOutput for ProposalDependencyHandle<TYPES, V> {
    /// Events delivered by the completed dependencies, nested three levels deep.
    type Output = Vec<Vec<Vec<Arc<HotShotEvent<TYPES>>>>>;

    /// Turns the events gathered by the dependency task into a published proposal.
    ///
    /// 1. If the validated state map has no entry for the high QC's view, spawn
    ///    `fetch_proposal` for that view and wait for a matching
    ///    `ValidatedStateUpdated` event.
    /// 2. Scan `res` for the payload commitment and metadata, a timeout certificate,
    ///    a view sync finalize certificate, and the VID share. When an event kind
    ///    occurs more than once, the last occurrence wins.
    /// 3. Log an error and return if the commitment/metadata or the VID share is missing.
    /// 4. Call `publish_proposal`, preferring the view sync certificate over the
    ///    timeout certificate as view change evidence. A failure is logged, not returned.
    #[allow(clippy::no_effect_underscore_binding, clippy::too_many_lines)]
    async fn handle_dep_result(self, res: Self::Output) {
        // Take one read lock for both lookups so they see the same snapshot; the
        // guard is dropped at the end of this block, before any further await.
        let (high_qc_view_number, high_qc_state_known) = {
            let consensus_reader = self.consensus.read().await;
            let view = consensus_reader.high_qc().view_number;
            (
                view,
                consensus_reader.validated_state_map().contains_key(&view),
            )
        };

        if !high_qc_state_known {
            let membership = Arc::clone(&self.quorum_membership);
            let event_sender = self.sender.clone();
            let event_receiver = self.receiver.clone();
            let sender_public_key = self.public_key.clone();
            let sender_private_key = self.private_key.clone();
            let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
            let upgrade_lock = self.upgrade_lock.clone();

            // The fetch runs in its own task; its result is not awaited here.
            async_spawn(async move {
                fetch_proposal(
                    high_qc_view_number,
                    event_sender,
                    event_receiver,
                    membership,
                    consensus,
                    sender_public_key,
                    sender_private_key,
                    &upgrade_lock,
                )
                .await
            });

            // Wait until the validated state for the high QC's view is announced.
            EventDependency::new(
                self.receiver.clone(),
                Box::new(move |event| {
                    let event = event.as_ref();
                    if let HotShotEvent::ValidatedStateUpdated(view_number, _) = event {
                        *view_number == high_qc_view_number
                    } else {
                        false
                    }
                }),
            )
            .completed()
            .await;
        }

        let mut commit_and_metadata: Option<CommitmentAndMetadata<TYPES>> = None;
        let mut timeout_certificate = None;
        let mut view_sync_finalize_cert = None;
        let mut vid_share = None;

        for event in res.iter().flatten().flatten() {
            match event.as_ref() {
                HotShotEvent::SendPayloadCommitmentAndMetadata(
                    payload_commitment,
                    builder_commitment,
                    metadata,
                    view,
                    fees,
                    auction_result,
                ) => {
                    commit_and_metadata = Some(CommitmentAndMetadata {
                        commitment: *payload_commitment,
                        builder_commitment: builder_commitment.clone(),
                        metadata: metadata.clone(),
                        fees: fees.clone(),
                        block_view: *view,
                        auction_result: auction_result.clone(),
                    });
                }
                HotShotEvent::QcFormed(cert) => {
                    // Only the `Right` (timeout) side is kept; the `Left` side is
                    // intentionally ignored here.
                    if let either::Right(timeout) = cert {
                        timeout_certificate = Some(timeout.clone());
                    }
                }
                HotShotEvent::ViewSyncFinalizeCertificate2Recv(cert) => {
                    view_sync_finalize_cert = Some(cert.clone());
                }
                HotShotEvent::VidDisperseSend(share, _) => {
                    vid_share = Some(share.clone());
                }
                _ => {}
            }
        }

        let Some(commit_and_metadata) = commit_and_metadata else {
            error!(
                "Somehow completed the proposal dependency task without a commitment and metadata"
            );
            return;
        };

        let Some(vid_share) = vid_share else {
            error!("Somehow completed the proposal dependency task without a VID share");
            return;
        };

        // A view sync certificate takes precedence over a timeout certificate.
        let proposal_cert = view_sync_finalize_cert
            .map(ViewChangeEvidence::ViewSync)
            .or_else(|| timeout_certificate.map(ViewChangeEvidence::Timeout));

        if let Err(e) = self
            .publish_proposal(
                commit_and_metadata,
                vid_share,
                proposal_cert,
                self.formed_upgrade_certificate.clone(),
                Arc::clone(&self.upgrade_lock.decided_upgrade_certificate),
            )
            .await
        {
            error!("Failed to publish proposal; error = {e}");
        }
    }
}