use std::{collections::BTreeMap, sync::Arc};
use anyhow::Result;
use async_broadcast::{Receiver, Sender};
use async_compatibility_layer::art::async_spawn;
use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
use async_trait::async_trait;
use futures::future::join_all;
use hotshot_task::task::TaskState;
use hotshot_types::{
consensus::{CommitmentAndMetadata, OuterConsensus},
data::{QuorumProposal, VidDisperseShare, ViewChangeEvidence},
event::{Event, EventType},
message::{Proposal, UpgradeLock},
simple_certificate::{QuorumCertificate, TimeoutCertificate, UpgradeCertificate},
simple_vote::{QuorumVote, TimeoutData, TimeoutVote},
traits::{
election::Membership,
node_implementation::{NodeImplementation, NodeType, Versions},
signature_key::SignatureKey,
storage::Storage,
},
vid::vid_scheme,
vote::{Certificate, HasViewNumber},
};
use jf_vid::VidScheme;
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
use tracing::{debug, error, info, instrument, warn};
use crate::{
consensus::handlers::{
handle_quorum_proposal_recv, handle_quorum_proposal_validated, publish_proposal_if_able,
update_state_and_vote_if_able, VoteInfo,
},
events::HotShotEvent,
helpers::{broadcast_event, cancel_task, update_view, DONT_SEND_VIEW_CHANGE_EVENT},
vote_collection::{handle_vote, VoteCollectorsMap},
};
pub(crate) mod handlers;
/// State owned by the consensus task.
///
/// Holds this node's keys, handles to shared consensus/storage state, the membership
/// views used for leader and committee checks, and bookkeeping for per-view spawned tasks.
pub struct ConsensusTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
/// This node's public key; compared against `Membership::leader(..)` results to decide
/// whether this node is the leader for a view.
pub public_key: TYPES::SignatureKey,
/// Private key for `public_key`'s key type; used to sign timeout votes and handed to the
/// proposal and vote helpers.
pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
/// Handle to the shared consensus state. This task takes both read and write locks on it.
pub consensus: OuterConsensus<TYPES>,
/// Shared instance state, forwarded to the proposal and vote helpers.
pub instance_state: Arc<TYPES::InstanceState>,
/// Timeout value forwarded to `update_view` on a view change.
/// NOTE(review): units are not visible in this file — confirm against the config.
pub timeout: u64,
/// Delay value forwarded to `publish_proposal_if_able`.
/// NOTE(review): units are not visible in this file — confirm against the config.
pub round_start_delay: u64,
/// The view this task currently considers itself to be in; updated through `update_view`.
pub cur_view: TYPES::Time,
/// Updated by `update_view` together with `cur_view`.
/// NOTE(review): presumably the timestamp at which `cur_view` started — confirm.
pub cur_view_time: i64,
/// Latest payload commitment and metadata received via `SendPayloadCommitmentAndMetadata`.
/// Cleared once it is older than the current view or once a proposal for its view
/// (or a later one) has been sent.
pub payload_commitment_and_metadata: Option<CommitmentAndMetadata<TYPES>>,
/// Handle to the network implementation. Not used by the code in this file.
pub network: Arc<I::Network>,
/// Membership used for the leader check on incoming timeout votes and for the stake
/// check before this node sends its own timeout vote.
pub timeout_membership: Arc<TYPES::Membership>,
/// Membership used for leader checks on proposals and quorum votes, and for the node
/// count given to the VID scheme.
pub quorum_membership: Arc<TYPES::Membership>,
/// Membership whose committee members may sign VID disperse shares; also forwarded to
/// the vote task.
pub da_membership: Arc<TYPES::Membership>,
/// Collector state passed to `handle_vote` for incoming `QuorumVote`s.
pub vote_collectors: VoteCollectorsMap<TYPES, QuorumVote<TYPES>, QuorumCertificate<TYPES>, V>,
/// Collector state passed to `handle_vote` for incoming `TimeoutVote`s.
pub timeout_vote_collectors:
VoteCollectorsMap<TYPES, TimeoutVote<TYPES>, TimeoutCertificate<TYPES>, V>,
/// Task handle passed mutably to `update_view` on a view change.
/// NOTE(review): presumably the currently running view-timeout task — confirm.
pub timeout_task: JoinHandle<()>,
/// Handles of tasks spawned by this state, keyed by the view they were spawned for, so
/// that tasks belonging to old views can be cancelled (see `cancel_tasks`).
pub spawned_tasks: BTreeMap<TYPES::Time, Vec<JoinHandle<()>>>,
/// Upgrade certificate recorded on `UpgradeCertificateFormed` (only when its `decide_by`
/// is at least three views past `cur_view`); forwarded to `publish_proposal_if_able`.
pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
/// View-change evidence (a timeout certificate or a view-sync finalize certificate) kept
/// for the next proposal; cleared when it is no longer valid for the relevant view.
pub proposal_cert: Option<ViewChangeEvidence<TYPES>>,
/// Channel on which `Event`s (e.g. view timeouts) are broadcast to external listeners.
pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
/// The proposal most recently returned by `handle_quorum_proposal_recv`; cleared after a
/// quorum vote for its view (or a later one) has been sent.
pub current_proposal: Option<QuorumProposal<TYPES>>,
/// Numeric identifier of this node; recorded in tracing spans and passed to helpers.
pub id: u64,
/// Shared storage handle; written to when a newly formed QC is persisted as the high QC.
pub storage: Arc<RwLock<I::Storage>>,
/// Upgrade lock passed to vote creation, certificate validation and vote collection; its
/// `decided_upgrade_certificate` is read on every view change.
pub upgrade_lock: UpgradeLock<TYPES, V>,
}
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusTaskState<TYPES, I, V> {
/// Cancel every spawned task registered for a view strictly below `view`.
///
/// Entries keyed at `view` or later stay in `spawned_tasks`. The stale handles are
/// turned into cancellation futures via `cancel_task` and awaited together.
pub async fn cancel_tasks(&mut self, view: TYPES::Time) {
    // `split_off` keeps keys `< view` in the original map and returns keys `>= view`.
    let retained = self.spawned_tasks.split_off(&view);
    // Swap the retained entries back in; what comes out is exactly the stale portion.
    let stale = std::mem::replace(&mut self.spawned_tasks, retained);

    // Values are visited in ascending view order, matching a repeated `pop_first`.
    let cancellations: Vec<_> = stale
        .into_values()
        .flatten()
        .map(cancel_task)
        .collect();

    join_all(cancellations).await;
}
/// Validate a VID disperse share: first its signature, then the share itself.
///
/// The signature over the payload commitment is accepted when it validates against any
/// of the following, tried in this order:
/// * the `quorum_membership` leader for the share's view,
/// * this node's own `public_key`,
/// * any member returned by `da_membership.committee_members(view)`.
///
/// Returns `true` only if one of those keys matched and `verify_share` succeeds.
fn validate_disperse(&self, disperse: &Proposal<TYPES, VidDisperseShare<TYPES>>) -> bool {
    let view = disperse.data.view_number();
    let payload_commitment = disperse.data.payload_commitment;
    let signature = &disperse.signature;

    // Short-circuits left to right, so the DA committee is only scanned when neither
    // the leader key nor our own key matches.
    let signed_by_accepted_key = self
        .quorum_membership
        .leader(view)
        .validate(signature, payload_commitment.as_ref())
        || self
            .public_key
            .validate(signature, payload_commitment.as_ref())
        || self
            .da_membership
            .committee_members(view)
            .into_iter()
            .any(|da_member| da_member.validate(signature, payload_commitment.as_ref()));

    if !signed_by_accepted_key {
        return false;
    }

    // `verify_share` yields a nested `Result`; both layers must be `Ok`.
    let verification = vid_scheme(self.quorum_membership.total_nodes()).verify_share(
        &disperse.data.share,
        &disperse.data.common,
        &payload_commitment,
    );
    matches!(verification, Ok(Ok(())))
}
/// Try to publish a proposal for `view` and track the resulting task.
///
/// Snapshots the state the proposal helper needs, calls `publish_proposal_if_able`, and
/// on success records the returned task handle in `spawned_tasks` under `view`.
///
/// # Errors
/// Propagates any error returned by `publish_proposal_if_able`; nothing is recorded in
/// `spawned_tasks` in that case.
#[instrument(skip_all, target = "ConsensusTaskState", fields(id = self.id, view = *self.cur_view))]
async fn publish_proposal(
    &mut self,
    view: TYPES::Time,
    event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
    event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
) -> Result<()> {
    // Owned copies / shared handles for everything the helper takes by value.
    let quorum_membership = Arc::clone(&self.quorum_membership);
    let public_key = self.public_key.clone();
    let private_key = self.private_key.clone();
    let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
    let round_start_delay = self.round_start_delay;
    let formed_upgrade_certificate = self.formed_upgrade_certificate.clone();
    let upgrade_lock = self.upgrade_lock.clone();
    let commitment_and_metadata = self.payload_commitment_and_metadata.clone();
    let proposal_cert = self.proposal_cert.clone();
    let instance_state = Arc::clone(&self.instance_state);
    let node_id = self.id;

    let proposal_task = publish_proposal_if_able(
        view,
        event_sender,
        event_receiver,
        quorum_membership,
        public_key,
        private_key,
        consensus,
        round_start_delay,
        formed_upgrade_certificate,
        upgrade_lock,
        commitment_and_metadata,
        proposal_cert,
        instance_state,
        node_id,
    )
    .await?;

    let tasks_for_view = self.spawned_tasks.entry(view).or_default();
    tasks_for_view.push(proposal_task);
    Ok(())
}
/// Spawn a task that updates state and votes on the current proposal, if it is for `view`.
///
/// Does nothing when there is no `current_proposal` or when its view number differs from
/// `view`. Otherwise the spawned task runs `update_state_and_vote_if_able` and its handle
/// is stored in `spawned_tasks` under `view`.
#[instrument(skip_all, fields(id = self.id, view = *self.cur_view), target = "ConsensusTaskState")]
async fn spawn_vote_task(
    &mut self,
    view: TYPES::Time,
    event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
    event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
) {
    // Only a proposal for exactly this view is eligible for a vote.
    let proposal = match &self.current_proposal {
        Some(current) if current.view_number() == view => current.clone(),
        _ => return,
    };

    // Everything the task needs is moved in by value, so snapshot it up front.
    let public_key = self.public_key.clone();
    let private_key = self.private_key.clone();
    let upgrade_lock = self.upgrade_lock.clone();
    let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
    let storage = Arc::clone(&self.storage);
    let quorum_membership = Arc::clone(&self.quorum_membership);
    let instance_state = Arc::clone(&self.instance_state);
    let node_id = self.id;
    let vote_info = VoteInfo {
        private_key: private_key.clone(),
        upgrade_lock: upgrade_lock.clone(),
        da_membership: Arc::clone(&self.da_membership),
        event_sender,
        event_receiver,
    };

    let vote_task = async_spawn(async move {
        update_state_and_vote_if_able::<TYPES, I, V>(
            view,
            proposal,
            public_key,
            private_key,
            consensus,
            storage,
            quorum_membership,
            instance_state,
            vote_info,
            node_id,
            &upgrade_lock,
        )
        .await;
    });

    self.spawned_tasks.entry(view).or_default().push(vote_task);
}
/// Dispatch a single `HotShotEvent` to the matching piece of consensus logic.
///
/// `event_sender` / `event_receiver` are the two ends of the internal event channel; they
/// are cloned into helpers and spawned tasks as needed. Events that match none of the arms
/// below are ignored. Failures inside an arm are logged rather than returned.
#[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "Consensus replica task", level = "error", target = "ConsensusTaskState")]
pub async fn handle(
&mut self,
event: Arc<HotShotEvent<TYPES>>,
event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
) {
match event.as_ref() {
// A proposal arrived: let the handler process it and, if it hands a proposal back,
// remember it as the current proposal and try to vote on it.
HotShotEvent::QuorumProposalRecv(proposal, sender) => {
debug!("proposal recv view: {:?}", proposal.data.view_number());
match handle_quorum_proposal_recv(
proposal,
sender,
event_sender.clone(),
event_receiver.clone(),
self,
)
.await
{
Ok(Some(current_proposal)) => {
let view = current_proposal.view_number();
self.current_proposal = Some(current_proposal);
self.spawn_vote_task(view, event_sender, event_receiver)
.await;
}
Ok(None) => {}
Err(e) => debug!("Failed to propose {e:#}"),
}
}
// A proposal passed validation: delegate entirely to the handler and log failures.
HotShotEvent::QuorumProposalValidated(proposal, _) => {
debug!("proposal validated view: {:?}", proposal.view_number());
if let Err(e) = handle_quorum_proposal_validated(
proposal,
event_sender.clone(),
event_receiver.clone(),
self,
)
.await
{
warn!("Failed to handle QuorumProposalValidated event {e:#}");
}
}
// Quorum votes are only collected by the leader of the view after the vote's view.
HotShotEvent::QuorumVoteRecv(ref vote) => {
debug!("Received quorum vote: {:?}", vote.view_number());
if self.quorum_membership.leader(vote.view_number() + 1) != self.public_key {
error!(
"We are not the leader for view {} are we the leader for view + 1? {}",
*vote.view_number() + 1,
self.quorum_membership.leader(vote.view_number() + 2) == self.public_key
);
return;
}
handle_vote(
&mut self.vote_collectors,
vote,
self.public_key.clone(),
&self.quorum_membership,
self.id,
&event,
&event_sender,
&self.upgrade_lock,
)
.await;
}
// Timeout votes are only collected by the leader (per `timeout_membership`) of the
// view after the vote's view.
HotShotEvent::TimeoutVoteRecv(ref vote) => {
if self.timeout_membership.leader(vote.view_number() + 1) != self.public_key {
error!(
"We are not the leader for view {} are we the leader for view + 1? {}",
*vote.view_number() + 1,
self.timeout_membership.leader(vote.view_number() + 2) == self.public_key
);
return;
}
// NOTE(review): the leader check above uses `timeout_membership`, while the
// collector is given `quorum_membership` — confirm this is intentional.
handle_vote(
&mut self.timeout_vote_collectors,
vote,
self.public_key.clone(),
&self.quorum_membership,
self.id,
&event,
&event_sender,
&self.upgrade_lock,
)
.await;
}
// A certificate was formed locally. `Right` carries a timeout certificate, `Left` a
// quorum certificate; either way we try to propose for the following view.
HotShotEvent::QcFormed(cert) => match cert {
either::Right(qc) => {
// Keep the timeout certificate as view-change evidence for the proposal.
self.proposal_cert = Some(ViewChangeEvidence::Timeout(qc.clone()));
debug!(
"Attempting to publish proposal after forming a TC for view {}",
*qc.view_number
);
if let Err(e) = self
.publish_proposal(qc.view_number + 1, event_sender, event_receiver)
.await
{
debug!("Failed to propose; error = {e:?}");
};
}
either::Left(qc) => {
// Persist the new QC as high QC in storage first, then in the in-memory
// consensus state. Neither failure stops the proposal attempt below.
if let Err(e) = self.storage.write().await.update_high_qc(qc.clone()).await {
error!("Failed to store High QC of QC we formed. Error: {:?}", e);
}
if let Err(e) = self.consensus.write().await.update_high_qc(qc.clone()) {
tracing::trace!("{e:?}");
}
debug!(
"Attempting to publish proposal after forming a QC for view {}",
*qc.view_number
);
if let Err(e) = self
.publish_proposal(qc.view_number + 1, event_sender, event_receiver)
.await
{
debug!("Failed to propose; error = {e:?}");
};
}
},
// An upgrade certificate was formed. This arm is compiled out when the
// `dependency-tasks` feature is enabled.
#[cfg(not(feature = "dependency-tasks"))]
HotShotEvent::UpgradeCertificateFormed(cert) => {
debug!(
"Upgrade certificate received for view {}!",
*cert.view_number
);
// Only keep the certificate if its `decide_by` view is at least three views
// past the current one.
if cert.data.decide_by >= self.cur_view + 3 {
debug!("Updating current formed_upgrade_certificate");
self.formed_upgrade_certificate = Some(cert.clone());
}
}
// A DA certificate arrived: save it, then vote if the current proposal is for the
// same view.
HotShotEvent::DaCertificateRecv(cert) => {
debug!("DAC Received for view {}!", *cert.view_number);
let view = cert.view_number;
self.consensus
.write()
.await
.update_saved_da_certs(view, cert.clone());
// `spawn_vote_task` repeats this same proposal/view check internally.
let Some(proposal) = self.current_proposal.clone() else {
return;
};
if proposal.view_number() != view {
return;
}
self.spawn_vote_task(view, event_sender, event_receiver)
.await;
}
// A VID share arrived: drop it if too old or invalid, otherwise store it and, if it
// is addressed to us and matches the current proposal's view, try to vote.
HotShotEvent::VidShareRecv(disperse) => {
let view = disperse.data.view_number();
debug!(
"VID disperse received for view: {:?} in consensus task",
view
);
// Shares from the immediately preceding view are still accepted. The comparison
// adds on the left instead of subtracting on the right.
// NOTE(review): presumably to avoid underflow at the earliest view — confirm.
if view + 1 < self.cur_view {
info!("Throwing away VID disperse data that is more than one view older");
return;
}
debug!("VID disperse data is not more than one view older.");
if !self.validate_disperse(disperse) {
warn!("Failed to validated the VID dispersal/share sig.");
return;
}
self.consensus
.write()
.await
.update_vid_shares(view, disperse.clone());
// Shares meant for other nodes are stored above but never trigger a vote here.
if disperse.data.recipient_key != self.public_key {
return;
}
let Some(proposal) = self.current_proposal.clone() else {
return;
};
if proposal.view_number() != view {
return;
}
self.spawn_vote_task(view, event_sender.clone(), event_receiver.clone())
.await;
}
// The view changed: log a decided upgrade taking effect, drop stale payload data,
// then update the view-related state through `update_view`.
HotShotEvent::ViewChange(new_view) => {
let new_view = *new_view;
tracing::trace!("View Change event for view {} in consensus task", *new_view);
// Captured before `update_view` mutates `cur_view`.
let old_view_number = self.cur_view;
if let Some(cert) = self
.upgrade_lock
.decided_upgrade_certificate
.read()
.await
.clone()
{
if new_view == cert.data.new_version_first_view {
error!(
"Version upgraded based on a decided upgrade cert: {:?}",
cert
);
}
}
// Payload data for a view older than the one we are leaving is discarded.
if let Some(commitment_and_metadata) = &self.payload_commitment_and_metadata {
if commitment_and_metadata.block_view < old_view_number {
self.payload_commitment_and_metadata = None;
}
}
// The last argument tells `update_view` whether this node was the leader of the
// view being left.
if let Err(e) = update_view::<TYPES>(
new_view,
&event_sender,
self.timeout,
OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
&mut self.cur_view,
&mut self.cur_view_time,
&mut self.timeout_task,
&self.output_event_stream,
DONT_SEND_VIEW_CHANGE_EVENT,
self.quorum_membership.leader(old_view_number) == self.public_key,
)
.await
{
tracing::trace!("Failed to update view; error = {e}");
return;
}
}
// A view timed out: sign and broadcast a timeout vote, emit timeout events to the
// output stream, and bump the timeout metrics.
HotShotEvent::Timeout(view) => {
let view = *view;
// Ignore timeouts for views that are not ahead of the current view.
if self.cur_view >= view {
return;
}
// Nodes without stake in the timeout membership do not vote.
if !self.timeout_membership.has_stake(&self.public_key) {
debug!(
"We were not chosen for consensus committee on {:?}",
self.cur_view
);
return;
}
let Ok(vote) = TimeoutVote::create_signed_vote(
TimeoutData { view },
view,
&self.public_key,
&self.private_key,
&self.upgrade_lock,
)
.await
else {
error!("Failed to sign TimeoutData!");
return;
};
broadcast_event(Arc::new(HotShotEvent::TimeoutVoteSend(vote)), &event_sender).await;
broadcast_event(
Event {
view_number: view,
event: EventType::ViewTimeout { view_number: view },
},
&self.output_event_stream,
)
.await;
debug!(
"We did not receive evidence for view {} in time, sending timeout vote for that view!",
*view
);
broadcast_event(
Event {
view_number: view,
event: EventType::ReplicaViewTimeout { view_number: view },
},
&self.output_event_stream,
)
.await;
// Metrics: count every timeout, and separately those where we were the leader.
let consensus = self.consensus.read().await;
consensus.metrics.number_of_timeouts.add(1);
if self.quorum_membership.leader(view) == self.public_key {
consensus.metrics.number_of_timeouts_as_leader.add(1);
}
}
// Payload commitment and metadata for `view` are available: store them and try to
// propose, either off the high QC or off stored view-change evidence.
HotShotEvent::SendPayloadCommitmentAndMetadata(
payload_commitment,
builder_commitment,
metadata,
view,
fees,
auction_result,
) => {
let view = *view;
debug!(
"got commit and meta {:?}, view {:?}",
payload_commitment, view
);
self.payload_commitment_and_metadata = Some(CommitmentAndMetadata {
commitment: *payload_commitment,
builder_commitment: builder_commitment.clone(),
metadata: metadata.clone(),
fees: fees.clone(),
block_view: view,
auction_result: auction_result.clone(),
});
// Propose directly when we lead `view` and the high QC is for the view before it.
if self.quorum_membership.leader(view) == self.public_key
&& self.consensus.read().await.high_qc().view_number() + 1 == view
{
if let Err(e) = self
.publish_proposal(view, event_sender.clone(), event_receiver.clone())
.await
{
error!("Failed to propose; error = {e:?}");
};
}
// Otherwise (or additionally) try to propose using stored view-change evidence,
// discarding it first if it is not valid for `view`.
if let Some(cert) = &self.proposal_cert {
if !cert.is_valid_for_view(&view) {
self.proposal_cert = None;
info!("Failed to propose off SendPayloadCommitmentAndMetadata because we had view change evidence, but it was not current.");
return;
}
match cert {
// A timeout certificate entitles the leader of the following view.
ViewChangeEvidence::Timeout(tc) => {
if self.quorum_membership.leader(tc.view_number() + 1)
== self.public_key
{
if let Err(e) = self
.publish_proposal(view, event_sender, event_receiver)
.await
{
debug!("Failed to propose; error = {e:?}");
};
}
}
// A view-sync certificate entitles the leader of its own view.
ViewChangeEvidence::ViewSync(vsc) => {
if self.quorum_membership.leader(vsc.view_number()) == self.public_key {
if let Err(e) = self
.publish_proposal(view, event_sender, event_receiver)
.await
{
debug!("Failed to propose; error = {e:?}");
};
}
}
}
}
}
// A view-sync finalize certificate arrived: validate it and, if we lead its view,
// keep it as view-change evidence and try to propose.
HotShotEvent::ViewSyncFinalizeCertificate2Recv(certificate) => {
if !certificate
.is_valid_cert(self.quorum_membership.as_ref(), &self.upgrade_lock)
.await
{
error!(
"View Sync Finalize certificate {:?} was invalid",
certificate.date()
);
return;
}
let view = certificate.view_number;
if self.quorum_membership.leader(view) == self.public_key {
self.proposal_cert = Some(ViewChangeEvidence::ViewSync(certificate.clone()));
debug!(
"Attempting to publish proposal after forming a View Sync Finalized Cert for view {}",
*certificate.view_number
);
if let Err(e) = self
.publish_proposal(view, event_sender, event_receiver)
.await
{
debug!("Failed to propose; error = {e:?}");
};
}
}
// We sent a quorum vote: if we lead the next view and the high QC already covers
// the current proposal's view, try to propose; then clear the proposal if the vote
// was for its view or a later one.
HotShotEvent::QuorumVoteSend(vote) => {
let Some(proposal) = self.current_proposal.clone() else {
return;
};
let new_view = proposal.view_number() + 1;
let should_propose = self.quorum_membership.leader(new_view) == self.public_key
&& self.consensus.read().await.high_qc().view_number == proposal.view_number();
if should_propose {
debug!(
"Attempting to publish proposal after voting; now in view: {}",
*new_view
);
if let Err(e) = self
.publish_proposal(new_view, event_sender.clone(), event_receiver.clone())
.await
{
debug!("failed to propose e = {:?}", e);
}
}
if proposal.view_number() <= vote.view_number() {
self.current_proposal = None;
}
}
// We sent a proposal: drop payload data and view-change evidence that the sent
// proposal's view has made obsolete.
HotShotEvent::QuorumProposalSend(proposal, _) => {
if self
.payload_commitment_and_metadata
.as_ref()
.is_some_and(|p| p.block_view <= proposal.data.view_number())
{
self.payload_commitment_and_metadata = None;
}
if let Some(cert) = &self.proposal_cert {
// The view the evidence applies to: the view after a timeout certificate's
// view, or a view-sync certificate's own view.
let view = match cert {
ViewChangeEvidence::Timeout(tc) => tc.view_number() + 1,
ViewChangeEvidence::ViewSync(vsc) => vsc.view_number(),
};
if view < proposal.data.view_number() {
self.proposal_cert = None;
}
}
}
// All other events are irrelevant to this task.
_ => {}
}
}
}
/// Wires `ConsensusTaskState` into the task framework: events are forwarded to
/// [`ConsensusTaskState::handle`], and shutdown stops every tracked spawned task.
#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
    for ConsensusTaskState<TYPES, I, V>
{
    type Event = HotShotEvent<TYPES>;

    /// Forward `event` to `handle` with owned clones of both channel ends.
    ///
    /// Always returns `Ok(())`; `handle` does not return errors to this caller.
    async fn handle_event(
        &mut self,
        event: Arc<Self::Event>,
        sender: &Sender<Arc<Self::Event>>,
        receiver: &Receiver<Arc<Self::Event>>,
    ) -> Result<()> {
        let event_sender = sender.clone();
        let event_receiver = receiver.clone();
        self.handle(event, event_sender, event_receiver).await;
        Ok(())
    }

    /// Drain `spawned_tasks` in ascending view order and stop every task handle in it.
    async fn cancel_subtasks(&mut self) {
        // `pop_first` returns `None` exactly when the map is empty, which ends the loop.
        while let Some((_, handles)) = self.spawned_tasks.pop_first() {
            for handle in handles {
                // The two executors expose different ways of stopping a task.
                #[cfg(async_executor_impl = "async-std")]
                handle.cancel().await;
                #[cfg(async_executor_impl = "tokio")]
                handle.abort();
            }
        }
    }
}