#![allow(unused_imports)]
use std::{collections::BTreeMap, sync::Arc};
use async_broadcast::{broadcast, Receiver, Sender};
use async_lock::RwLock;
use async_trait::async_trait;
use either::Either;
use futures::future::{err, join_all};
use hotshot_task::task::{Task, TaskState};
use hotshot_types::{
consensus::{Consensus, OuterConsensus},
data::{EpochNumber, Leaf, ViewChangeEvidence},
event::Event,
message::UpgradeLock,
simple_certificate::UpgradeCertificate,
traits::{
node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
signature_key::SignatureKey,
},
vote::{Certificate, HasViewNumber},
};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, instrument, warn};
use utils::anytrace::{bail, Result};
use vbs::version::Version;
use self::handlers::handle_quorum_proposal_recv;
use crate::{
events::{HotShotEvent, ProposalMissing},
helpers::{broadcast_event, fetch_proposal, parent_leaf_and_state},
};
mod handlers;
/// State owned by the task that reacts to received quorum proposals.
///
/// `handle` consumes two kinds of event: `QuorumProposalRecv`, which is validated
/// through `handle_quorum_proposal_recv` using a `ValidationInfo` built from the
/// fields below, and `ViewChange`, which advances `cur_view` / `cur_epoch` and
/// aborts tasks registered for views that are no longer needed.
pub struct QuorumProposalRecvTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
/// Public signature key; cloned into every `ValidationInfo` built by `handle`.
pub public_key: TYPES::SignatureKey,
/// Private key matching `TYPES::SignatureKey`; likewise cloned into `ValidationInfo`.
pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
/// Handle to the consensus state; `handle` reads its current view to reject stale proposals.
pub consensus: OuterConsensus<TYPES>,
/// Latest view this task has advanced to; only moved forward, by `ViewChange` events.
pub cur_view: TYPES::View,
/// Latest epoch seen; raised when a `ViewChange` event carries a higher epoch.
pub cur_epoch: TYPES::Epoch,
/// Shared, lock-protected membership data, handed on to proposal validation.
pub membership: Arc<RwLock<TYPES::Membership>>,
/// Timeout setting. It is not read by the code in this file.
/// NOTE(review): the unit is not visible here — confirm where this state is constructed.
pub timeout: u64,
/// Broadcast sender for `Event<TYPES>` values; a clone is handed to proposal validation.
pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
/// Shared, lock-protected handle to `I::Storage`, handed on to proposal validation.
pub storage: Arc<RwLock<I::Storage>>,
/// Join handles of spawned tasks, keyed by view, so that `cancel_tasks` and
/// `cancel_subtasks` can abort them. Nothing in this file inserts into the map.
pub spawned_tasks: BTreeMap<TYPES::View, Vec<JoinHandle<()>>>,
/// Numeric identifier, recorded as the `id` field of the tracing span around `handle`.
pub id: u64,
/// Upgrade lock; cloned into every `ValidationInfo` built by `handle`.
pub upgrade_lock: UpgradeLock<TYPES, V>,
/// Epoch height setting, copied into `ValidationInfo`.
/// NOTE(review): presumably the number of blocks per epoch — confirm against the config.
pub epoch_height: u64,
}
/// Bundle of everything `handle_quorum_proposal_recv` is given to validate a proposal.
///
/// `QuorumProposalRecvTaskState::handle` fills it with clones of its own fields for
/// each `QuorumProposalRecv` event and passes it by value, so validation does not
/// borrow the task state.
pub(crate) struct ValidationInfo<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
/// Numeric identifier copied from the task state's `id`.
pub id: u64,
/// Public signature key copied from the task state.
pub(crate) public_key: TYPES::SignatureKey,
/// Private key copied from the task state.
pub(crate) private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
/// Clone of the task's handle to the consensus state.
pub(crate) consensus: OuterConsensus<TYPES>,
/// Shared, lock-protected membership data (same `Arc` as the task state's).
pub membership: Arc<RwLock<TYPES::Membership>>,
/// Clone of the task's broadcast sender for `Event<TYPES>` values.
pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
/// Shared, lock-protected handle to `I::Storage` (same `Arc` as the task state's).
pub(crate) storage: Arc<RwLock<I::Storage>>,
/// Clone of the task's upgrade lock.
pub(crate) upgrade_lock: UpgradeLock<TYPES, V>,
/// Epoch height setting copied from the task state.
/// NOTE(review): presumably the number of blocks per epoch — confirm against the config.
pub epoch_height: u64,
}
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
    QuorumProposalRecvTaskState<TYPES, I, V>
{
    /// Aborts every spawned task registered under a view strictly below `view`.
    ///
    /// Entries keyed by `view` or any later view stay in `spawned_tasks` untouched.
    pub fn cancel_tasks(&mut self, view: TYPES::View) {
        // `split_off` returns the entries with key >= `view`; swapping them back in
        // leaves this function owning only the older entries.
        let retained = self.spawned_tasks.split_off(&view);
        let stale = std::mem::replace(&mut self.spawned_tasks, retained);
        // Walk the stale entries in ascending view order and abort each handle.
        for handle in stale.into_values().flatten() {
            handle.abort();
        }
    }

    /// Processes a single event for this task.
    ///
    /// * `QuorumProposalRecv` — the proposal is discarded (with an error log) when its
    ///   view is below either the consensus state's current view or this task's
    ///   `cur_view`; otherwise it is passed to `handle_quorum_proposal_recv` together
    ///   with a fresh `ValidationInfo`. A validation failure is only logged at debug
    ///   level.
    /// * `ViewChange` — raises `cur_epoch` if the event's epoch is higher, and, when
    ///   the view is newer than `cur_view`, records it and cancels tasks older than
    ///   the view immediately before it.
    /// * Any other event is ignored.
    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = *self.cur_epoch), name = "Consensus replica task", level = "error")]
    #[allow(unused_variables)]
    pub async fn handle(
        &mut self,
        event: Arc<HotShotEvent<TYPES>>,
        event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
    ) {
        match event.as_ref() {
            HotShotEvent::QuorumProposalRecv(proposal, sender) => {
                // The read guard is a temporary of this statement, so it is released
                // before any further work happens.
                let is_stale = self.consensus.read().await.cur_view()
                    > proposal.data.view_number()
                    || self.cur_view > proposal.data.view_number();
                if is_stale {
                    tracing::error!("Throwing away old proposal");
                    return;
                }

                // Snapshot the pieces of task state that validation needs.
                let validation_info = ValidationInfo::<TYPES, I, V> {
                    id: self.id,
                    public_key: self.public_key.clone(),
                    private_key: self.private_key.clone(),
                    consensus: self.consensus.clone(),
                    membership: Arc::clone(&self.membership),
                    output_event_stream: self.output_event_stream.clone(),
                    storage: Arc::clone(&self.storage),
                    upgrade_lock: self.upgrade_lock.clone(),
                    epoch_height: self.epoch_height,
                };

                let outcome = handle_quorum_proposal_recv(
                    proposal,
                    sender,
                    &event_sender,
                    &event_receiver,
                    validation_info,
                )
                .await;
                if let Err(e) = outcome {
                    debug!(?e, "Failed to validate the proposal");
                }
            }
            HotShotEvent::ViewChange(view, epoch) => {
                // The epoch is updated independently of whether the view advances.
                if *epoch > self.cur_epoch {
                    self.cur_epoch = *epoch;
                }

                // Views that do not move us forward need no further work.
                if self.cur_view >= *view {
                    return;
                }
                self.cur_view = *view;

                // Keep tasks from the view just before the new one; abort anything older.
                let oldest_view_to_keep = TYPES::View::new(view.saturating_sub(1));
                self.cancel_tasks(oldest_view_to_keep);
            }
            _ => {}
        }
    }
}
#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
    for QuorumProposalRecvTaskState<TYPES, I, V>
{
    type Event = HotShotEvent<TYPES>;

    /// Forwards `event` to `handle`, giving it its own clones of the channel ends.
    ///
    /// Always returns `Ok(())`; `handle` itself reports nothing back.
    async fn handle_event(
        &mut self,
        event: Arc<Self::Event>,
        sender: &Sender<Arc<Self::Event>>,
        receiver: &Receiver<Arc<Self::Event>>,
    ) -> Result<()> {
        // `handle` takes the channel ends by value, hence the clones.
        let event_sender = sender.clone();
        let event_receiver = receiver.clone();
        self.handle(event, event_sender, event_receiver).await;
        Ok(())
    }

    /// Aborts every spawned task, for every view, and leaves `spawned_tasks` empty.
    fn cancel_subtasks(&mut self) {
        // Take ownership of the whole map, leaving an empty one in its place.
        let drained = std::mem::take(&mut self.spawned_tasks);
        // Abort in ascending view order.
        for handle in drained.into_values().flatten() {
            handle.abort();
        }
    }
}