use std::{marker::PhantomData, sync::Arc};
use async_broadcast::{Receiver, Sender};
use async_lock::RwLock;
use async_trait::async_trait;
use hotshot_task::task::TaskState;
use hotshot_types::{
consensus::OuterConsensus,
data::{PackedBundle, VidDisperse, VidDisperseShare2},
message::Proposal,
traits::{
block_contents::BlockHeader,
election::Membership,
node_implementation::{ConsensusTime, NodeImplementation, NodeType},
signature_key::SignatureKey,
BlockPayload,
},
utils::epoch_from_block_number,
};
use tracing::{debug, error, info, instrument};
use utils::anytrace::Result;
use crate::{
events::{HotShotEvent, HotShotTaskCompleted},
helpers::broadcast_event,
};
pub struct VidTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
pub cur_view: TYPES::View,
pub cur_epoch: TYPES::Epoch,
pub consensus: OuterConsensus<TYPES>,
pub network: Arc<I::Network>,
pub membership: Arc<RwLock<TYPES::Membership>>,
pub public_key: TYPES::SignatureKey,
pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
pub id: u64,
pub epoch_height: u64,
}
impl<TYPES: NodeType, I: NodeImplementation<TYPES>> VidTaskState<TYPES, I> {
#[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = *self.cur_epoch), name = "VID Main Task", level = "error", target = "VidTaskState")]
pub async fn handle(
&mut self,
event: Arc<HotShotEvent<TYPES>>,
event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
) -> Option<HotShotTaskCompleted> {
match event.as_ref() {
HotShotEvent::BlockRecv(packed_bundle) => {
let PackedBundle::<TYPES> {
encoded_transactions,
metadata,
view_number,
sequencing_fees,
vid_precompute,
auction_result,
..
} = packed_bundle;
let payload =
<TYPES as NodeType>::BlockPayload::from_bytes(encoded_transactions, metadata);
let builder_commitment = payload.builder_commitment(metadata);
let epoch = self.cur_epoch;
if self
.membership
.read()
.await
.leader(*view_number, epoch)
.ok()?
!= self.public_key
{
tracing::debug!(
"We are not the leader in the current epoch. Do not send the VID dispersal."
);
return None;
}
let vid_disperse = VidDisperse::calculate_vid_disperse(
Arc::clone(encoded_transactions),
&Arc::clone(&self.membership),
*view_number,
epoch,
epoch,
vid_precompute.clone(),
)
.await;
let payload_commitment = vid_disperse.payload_commitment;
let shares = VidDisperseShare2::from_vid_disperse(vid_disperse.clone());
let mut consensus_writer = self.consensus.write().await;
for share in shares {
if let Some(disperse) = share.to_proposal(&self.private_key) {
consensus_writer.update_vid_shares(*view_number, disperse);
}
}
drop(consensus_writer);
broadcast_event(
Arc::new(HotShotEvent::SendPayloadCommitmentAndMetadata(
payload_commitment,
builder_commitment,
metadata.clone(),
*view_number,
sequencing_fees.clone(),
auction_result.clone(),
)),
&event_stream,
)
.await;
let view_number = *view_number;
let Ok(signature) = TYPES::SignatureKey::sign(
&self.private_key,
vid_disperse.payload_commitment.as_ref(),
) else {
error!("VID: failed to sign dispersal payload");
return None;
};
debug!(
"publishing VID disperse for view {} and epoch {}",
*view_number, *epoch
);
broadcast_event(
Arc::new(HotShotEvent::VidDisperseSend(
Proposal {
signature,
data: vid_disperse.clone(),
_pd: PhantomData,
},
self.public_key.clone(),
)),
&event_stream,
)
.await;
}
HotShotEvent::ViewChange(view, epoch) => {
if *epoch > self.cur_epoch {
self.cur_epoch = *epoch;
}
let view = *view;
if (*view != 0 || *self.cur_view > 0) && *self.cur_view >= *view {
return None;
}
if *view - *self.cur_view > 1 {
info!("View changed by more than 1 going to view {:?}", view);
}
self.cur_view = view;
return None;
}
HotShotEvent::QuorumProposalSend(proposal, _) => {
let proposed_block_number = proposal.data.block_header.block_number();
if self.epoch_height == 0 || proposed_block_number % self.epoch_height != 0 {
return None;
}
let proposal_view_number = proposal.data.view_number;
let sender_epoch = TYPES::Epoch::new(epoch_from_block_number(
proposed_block_number,
self.epoch_height,
));
let target_epoch = TYPES::Epoch::new(
epoch_from_block_number(proposed_block_number, self.epoch_height) + 1,
);
let consensus_reader = self.consensus.read().await;
let Some(txns) = consensus_reader.saved_payloads().get(&proposal_view_number)
else {
tracing::warn!(
"We need to calculate VID for the nodes in the next epoch \
but we don't have the transactions"
);
return None;
};
let txns = Arc::clone(txns);
drop(consensus_reader);
let next_epoch_vid_disperse = VidDisperse::calculate_vid_disperse(
txns,
&Arc::clone(&self.membership),
proposal_view_number,
target_epoch,
sender_epoch,
None,
)
.await;
let Ok(next_epoch_signature) = TYPES::SignatureKey::sign(
&self.private_key,
next_epoch_vid_disperse.payload_commitment.as_ref(),
) else {
error!("VID: failed to sign dispersal payload for the next epoch");
return None;
};
debug!(
"publishing VID disperse for view {} and epoch {}",
*proposal_view_number, *target_epoch
);
broadcast_event(
Arc::new(HotShotEvent::VidDisperseSend(
Proposal {
signature: next_epoch_signature,
data: next_epoch_vid_disperse.clone(),
_pd: PhantomData,
},
self.public_key.clone(),
)),
&event_stream,
)
.await;
}
HotShotEvent::Shutdown => {
return Some(HotShotTaskCompleted);
}
_ => {}
}
None
}
}
#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for VidTaskState<TYPES, I> {
type Event = HotShotEvent<TYPES>;
async fn handle_event(
&mut self,
event: Arc<Self::Event>,
sender: &Sender<Arc<Self::Event>>,
_receiver: &Receiver<Arc<Self::Event>>,
) -> Result<()> {
self.handle(event, sender.clone()).await;
Ok(())
}
fn cancel_subtasks(&mut self) {}
}