#![allow(unused_imports)]
use std::{collections::BTreeMap, sync::Arc};
use anyhow::{bail, Result};
use async_broadcast::{broadcast, Receiver, Sender};
use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
use async_trait::async_trait;
use futures::future::join_all;
use hotshot_task::task::{Task, TaskState};
use hotshot_types::{
consensus::{Consensus, OuterConsensus},
data::{Leaf, ViewChangeEvidence},
event::Event,
message::UpgradeLock,
simple_certificate::UpgradeCertificate,
traits::{
node_implementation::{NodeImplementation, NodeType, Versions},
signature_key::SignatureKey,
},
vote::HasViewNumber,
};
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
use tracing::{debug, error, info, instrument, warn};
use vbs::version::Version;
use self::handlers::handle_quorum_proposal_recv;
use crate::{
events::{HotShotEvent, ProposalMissing},
helpers::{broadcast_event, cancel_task, parent_leaf_and_state},
};
mod handlers;
pub struct QuorumProposalRecvTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
pub public_key: TYPES::SignatureKey,
pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
pub consensus: OuterConsensus<TYPES>,
pub cur_view: TYPES::Time,
pub cur_view_time: i64,
pub network: Arc<I::Network>,
pub quorum_membership: Arc<TYPES::Membership>,
pub timeout_membership: Arc<TYPES::Membership>,
pub timeout_task: JoinHandle<()>,
pub timeout: u64,
pub round_start_delay: u64,
pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
pub storage: Arc<RwLock<I::Storage>>,
pub proposal_cert: Option<ViewChangeEvidence<TYPES>>,
pub spawned_tasks: BTreeMap<TYPES::Time, Vec<JoinHandle<()>>>,
pub instance_state: Arc<TYPES::InstanceState>,
pub id: u64,
pub upgrade_lock: UpgradeLock<TYPES, V>,
}
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
QuorumProposalRecvTaskState<TYPES, I, V>
{
pub async fn cancel_tasks(&mut self, view: TYPES::Time) {
let keep = self.spawned_tasks.split_off(&view);
let mut cancel = Vec::new();
while let Some((_, tasks)) = self.spawned_tasks.pop_first() {
let mut to_cancel = tasks.into_iter().map(cancel_task).collect();
cancel.append(&mut to_cancel);
}
self.spawned_tasks = keep;
join_all(cancel).await;
}
#[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "Consensus replica task", level = "error")]
#[allow(unused_variables)]
pub async fn handle(
&mut self,
event: Arc<HotShotEvent<TYPES>>,
event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
) {
#[cfg(feature = "dependency-tasks")]
if let HotShotEvent::QuorumProposalRecv(proposal, sender) = event.as_ref() {
match handle_quorum_proposal_recv(
proposal,
sender,
&event_sender,
&event_receiver,
self,
)
.await
{
Ok(()) => {
self.cancel_tasks(proposal.data.view_number()).await;
}
Err(e) => debug!(?e, "Failed to validate the proposal"),
}
}
}
}
#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
for QuorumProposalRecvTaskState<TYPES, I, V>
{
type Event = HotShotEvent<TYPES>;
async fn handle_event(
&mut self,
event: Arc<Self::Event>,
sender: &Sender<Arc<Self::Event>>,
receiver: &Receiver<Arc<Self::Event>>,
) -> Result<()> {
self.handle(event, sender.clone(), receiver.clone()).await;
Ok(())
}
async fn cancel_subtasks(&mut self) {
while !self.spawned_tasks.is_empty() {
let Some((_, handles)) = self.spawned_tasks.pop_first() else {
break;
};
for handle in handles {
#[cfg(async_executor_impl = "async-std")]
handle.cancel().await;
#[cfg(async_executor_impl = "tokio")]
handle.abort();
}
}
}
}