use std::sync::Arc;
use anyhow::Result;
use async_broadcast::{Receiver, Sender};
use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
use async_trait::async_trait;
use hotshot_task::task::TaskState;
use hotshot_types::{
consensus::OuterConsensus,
event::Event,
message::UpgradeLock,
simple_certificate::{QuorumCertificate, TimeoutCertificate},
simple_vote::{QuorumVote, TimeoutVote},
traits::{
node_implementation::{NodeImplementation, NodeType, Versions},
signature_key::SignatureKey,
},
};
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
use tracing::instrument;
use self::handlers::{
handle_quorum_vote_recv, handle_timeout, handle_timeout_vote_recv, handle_view_change,
};
use crate::{events::HotShotEvent, vote_collection::VoteCollectorsMap};
mod handlers;
pub struct Consensus2TaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
pub public_key: TYPES::SignatureKey,
pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
pub instance_state: Arc<TYPES::InstanceState>,
pub network: Arc<I::Network>,
pub timeout_membership: Arc<TYPES::Membership>,
pub quorum_membership: Arc<TYPES::Membership>,
pub committee_membership: Arc<TYPES::Membership>,
pub vote_collectors: VoteCollectorsMap<TYPES, QuorumVote<TYPES>, QuorumCertificate<TYPES>, V>,
pub timeout_vote_collectors:
VoteCollectorsMap<TYPES, TimeoutVote<TYPES>, TimeoutCertificate<TYPES>, V>,
pub storage: Arc<RwLock<I::Storage>>,
pub cur_view: TYPES::Time,
pub cur_view_time: i64,
pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
pub timeout_task: JoinHandle<()>,
pub timeout: u64,
pub consensus: OuterConsensus<TYPES>,
pub last_decided_view: TYPES::Time,
pub id: u64,
pub upgrade_lock: UpgradeLock<TYPES, V>,
}
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> Consensus2TaskState<TYPES, I, V> {
#[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, last_decided_view = *self.last_decided_view), name = "Consensus replica task", level = "error", target = "Consensus2TaskState")]
pub async fn handle(
&mut self,
event: Arc<HotShotEvent<TYPES>>,
sender: Sender<Arc<HotShotEvent<TYPES>>>,
) {
match event.as_ref() {
HotShotEvent::QuorumVoteRecv(ref vote) => {
if let Err(e) =
handle_quorum_vote_recv(vote, Arc::clone(&event), &sender, self).await
{
tracing::debug!("Failed to handle QuorumVoteRecv event; error = {e}");
}
}
HotShotEvent::TimeoutVoteRecv(ref vote) => {
if let Err(e) =
handle_timeout_vote_recv(vote, Arc::clone(&event), &sender, self).await
{
tracing::debug!("Failed to handle TimeoutVoteRecv event; error = {e}");
}
}
HotShotEvent::ViewChange(new_view_number) => {
if let Err(e) = handle_view_change(*new_view_number, &sender, self).await {
tracing::trace!("Failed to handle ViewChange event; error = {e}");
}
}
HotShotEvent::Timeout(view_number) => {
if let Err(e) = handle_timeout(*view_number, &sender, self).await {
tracing::debug!("Failed to handle Timeout event; error = {e}");
}
}
HotShotEvent::LastDecidedViewUpdated(view_number) => {
if *view_number < self.last_decided_view {
tracing::debug!("New decided view is not newer than ours");
} else {
self.last_decided_view = *view_number;
if let Err(e) = self
.consensus
.write()
.await
.update_last_decided_view(*view_number)
{
tracing::trace!("{e:?}");
}
}
}
_ => {}
}
}
}
#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
for Consensus2TaskState<TYPES, I, V>
{
type Event = HotShotEvent<TYPES>;
async fn handle_event(
&mut self,
event: Arc<Self::Event>,
sender: &Sender<Arc<Self::Event>>,
_receiver: &Receiver<Arc<Self::Event>>,
) -> Result<()> {
self.handle(event, sender.clone()).await;
Ok(())
}
async fn cancel_subtasks(&mut self) {}
}