use std::sync::Arc;
use async_broadcast::{Receiver, Sender};
use async_lock::RwLock;
use async_trait::async_trait;
use hotshot_task::task::TaskState;
use hotshot_types::{
consensus::OuterConsensus,
event::Event,
message::UpgradeLock,
simple_certificate::{QuorumCertificate, TimeoutCertificate},
simple_vote::{QuorumVote, TimeoutVote},
traits::{
node_implementation::{NodeImplementation, NodeType, Versions},
signature_key::SignatureKey,
},
};
use tokio::task::JoinHandle;
use tracing::instrument;
use utils::anytrace::Result;
use self::handlers::{
handle_quorum_vote_recv, handle_timeout, handle_timeout_vote_recv, handle_view_change,
};
use crate::{events::HotShotEvent, vote_collection::VoteCollectorsMap};
mod handlers;
/// State carried by the consensus task.
///
/// A mutable reference to this struct is passed to every event handler invoked
/// from `handle`, and the struct implements `TaskState` so the task framework
/// can drive it. Field semantics noted as "presumably" below are inferred from
/// names only; the code that uses them lives in the `handlers` module.
pub struct ConsensusTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
/// Public signature key (presumably this node's own key -- confirm in `handlers`).
pub public_key: TYPES::SignatureKey,
/// Private key type associated with `TYPES::SignatureKey`.
pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
/// Shared handle to the `TYPES::InstanceState` value.
pub instance_state: Arc<TYPES::InstanceState>,
/// Shared handle to the node implementation's network (`I::Network`).
pub network: Arc<I::Network>,
/// Shared membership handle (by name, for timeout votes/certificates -- TODO confirm).
pub timeout_membership: Arc<TYPES::Membership>,
/// Shared membership handle (by name, for quorum votes/certificates -- TODO confirm).
pub quorum_membership: Arc<TYPES::Membership>,
/// Shared membership handle (by name, for a committee -- TODO confirm which one).
pub committee_membership: Arc<TYPES::Membership>,
/// Vote-collector map pairing `QuorumVote`s with `QuorumCertificate`s.
pub vote_collectors: VoteCollectorsMap<TYPES, QuorumVote<TYPES>, QuorumCertificate<TYPES>, V>,
/// Vote-collector map pairing `TimeoutVote`s with `TimeoutCertificate`s.
pub timeout_vote_collectors:
VoteCollectorsMap<TYPES, TimeoutVote<TYPES>, TimeoutCertificate<TYPES>, V>,
/// Storage backend (`I::Storage`) behind an async read/write lock, shared via `Arc`.
pub storage: Arc<RwLock<I::Storage>>,
/// Current view; its dereferenced value is recorded as `cur_view` in the `handle` tracing span.
pub cur_view: TYPES::View,
/// Presumably a timestamp for the start of the current view; units are not visible here -- TODO confirm.
pub cur_view_time: i64,
/// Current epoch; its dereferenced value is recorded as `cur_epoch` in the `handle` tracing span.
pub cur_epoch: TYPES::Epoch,
/// Broadcast sender for outgoing `Event<TYPES>` values.
pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
/// Handle to a spawned Tokio task; aborted by `cancel_subtasks`.
pub timeout_task: JoinHandle<()>,
/// Presumably the view timeout duration; units are not visible here -- TODO confirm.
pub timeout: u64,
/// Consensus state wrapper (`OuterConsensus`).
pub consensus: OuterConsensus<TYPES>,
/// Numeric identifier; recorded as `id` in the `handle` tracing span.
pub id: u64,
/// Upgrade lock parameterized by the node types and version set.
pub upgrade_lock: UpgradeLock<TYPES, V>,
/// Presumably the number of blocks per epoch -- TODO confirm against the handlers.
pub epoch_height: u64,
}
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusTaskState<TYPES, I, V> {
    /// Dispatches one [`HotShotEvent`] to the handler for its variant.
    ///
    /// Only `QuorumVoteRecv`, `TimeoutVoteRecv`, `ViewChange` and `Timeout` are
    /// acted upon; every other variant is ignored. A handler failure is logged
    /// (at `trace` level for `ViewChange`, `debug` level otherwise) and then
    /// dropped, so this method always returns `Ok(())`.
    ///
    /// * `event`  - the shared event to process.
    /// * `sender` - broadcast sender passed by reference to the handlers.
    #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, cur_epoch = *self.cur_epoch), name = "Consensus replica task", level = "error", target = "ConsensusTaskState")]
    pub async fn handle(
        &mut self,
        event: Arc<HotShotEvent<TYPES>>,
        sender: Sender<Arc<HotShotEvent<TYPES>>>,
    ) -> Result<()> {
        // Matching through the `Arc` binds each payload by reference.
        match &*event {
            HotShotEvent::QuorumVoteRecv(vote) => {
                let outcome =
                    handle_quorum_vote_recv(vote, Arc::clone(&event), &sender, self).await;
                if let Err(e) = outcome {
                    tracing::debug!("Failed to handle QuorumVoteRecv event; error = {e}");
                }
            }
            HotShotEvent::TimeoutVoteRecv(vote) => {
                let outcome =
                    handle_timeout_vote_recv(vote, Arc::clone(&event), &sender, self).await;
                if let Err(e) = outcome {
                    tracing::debug!("Failed to handle TimeoutVoteRecv event; error = {e}");
                }
            }
            HotShotEvent::ViewChange(new_view_number, epoch_number) => {
                let (view, epoch) = (*new_view_number, *epoch_number);
                let outcome = handle_view_change(view, epoch, &sender, self).await;
                if let Err(e) = outcome {
                    tracing::trace!("Failed to handle ViewChange event; error = {e}");
                }
            }
            HotShotEvent::Timeout(view_number) => {
                let outcome = handle_timeout(*view_number, &sender, self).await;
                if let Err(e) = outcome {
                    tracing::debug!("Failed to handle Timeout event; error = {e}");
                }
            }
            // Events this task does not care about.
            _ => {}
        }

        Ok(())
    }
}
#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
    for ConsensusTaskState<TYPES, I, V>
{
    type Event = HotShotEvent<TYPES>;

    /// Forwards an incoming event to [`ConsensusTaskState::handle`].
    ///
    /// The sender is cloned because `handle` takes it by value; the receiver is
    /// unused. The returned `Result` is whatever `handle` returns.
    async fn handle_event(
        &mut self,
        event: Arc<Self::Event>,
        sender: &Sender<Arc<Self::Event>>,
        _receiver: &Receiver<Arc<Self::Event>>,
    ) -> Result<()> {
        self.handle(event, sender.clone()).await
    }

    /// Aborts the task referenced by `timeout_task`.
    ///
    /// `JoinHandle::abort` only needs `&self`, so the handle is aborted in place
    /// rather than being swapped out for a freshly spawned placeholder task.
    /// That avoids scheduling a throwaway task and keeps this synchronous method
    /// free of `tokio::spawn`, which panics when no runtime context is active.
    /// The field keeps the (now aborted) handle; aborting it again later is a
    /// no-op.
    fn cancel_subtasks(&mut self) {
        self.timeout_task.abort();
    }
}