use std::{sync::Arc, time::Duration};
use async_broadcast::Receiver;
use async_compatibility_layer::art::{async_sleep, async_spawn};
#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
use futures::{FutureExt, StreamExt};
use hotshot_task::dependency::{Dependency, EventDependency};
use hotshot_types::{
consensus::{Consensus, LockedConsensusState, OuterConsensus},
data::VidDisperseShare,
message::{
DaConsensusMessage, DataMessage, GeneralConsensusMessage, Message, MessageKind, Proposal,
SequencingMessage,
},
request_response::{NetworkMsgResponseChannel, RequestReceiver},
traits::{
election::Membership,
network::{DataRequest, RequestKind, ResponseMessage},
node_implementation::NodeType,
signature_key::SignatureKey,
},
};
use sha2::{Digest, Sha256};
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
use tracing::instrument;
use crate::events::HotShotEvent;
/// Pause inserted between the first and the second attempt to calculate a VID
/// share when the first attempt yields nothing.
///
/// NOTE(review): the name suggests the pause gives transactions time to arrive
/// before retrying; confirm against `Consensus::calculate_and_update_vid`.
const TXNS_TIMEOUT: Duration = Duration::from_millis(100);
/// State for the task that answers data requests arriving from the network.
///
/// The task reads raw requests from `receiver`, checks the sender, looks up
/// (or calculates) the requested data in `consensus`, and sends a serialized
/// response back on the channel that accompanied the request.
pub struct NetworkResponseState<TYPES: NodeType> {
/// Shared, lock-protected consensus state; read for VID shares and the
/// last proposals, and handed to the VID calculation.
consensus: LockedConsensusState<TYPES>,
/// Stream of incoming requests; each item is a pair of raw request bytes
/// and the channel on which the response is sent.
receiver: RequestReceiver,
/// Membership used to check that a sender has stake; also passed to the
/// VID calculation.
quorum: Arc<TYPES::Membership>,
/// Key placed in the `sender` field of every outgoing message.
pub_key: TYPES::SignatureKey,
/// Private key handed to the VID calculation.
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
/// Identifier recorded in the tracing span of instrumented methods.
// NOTE(review): presumably the node id — confirm with the caller.
id: u64,
}
impl<TYPES: NodeType> NetworkResponseState<TYPES> {
pub fn new(
consensus: LockedConsensusState<TYPES>,
receiver: RequestReceiver,
quorum: Arc<TYPES::Membership>,
pub_key: TYPES::SignatureKey,
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
id: u64,
) -> Self {
Self {
consensus,
receiver,
quorum,
pub_key,
private_key,
id,
}
}
/// Serves requests until the shutdown dependency completes or the request
/// stream ends.
///
/// Requests are handled one at a time: the next request is only awaited
/// after the previous one has been fully handled.
async fn run_loop(mut self, shutdown: EventDependency<Arc<HotShotEvent<TYPES>>>) {
    // `select!` needs a fused, pinned future that can be polled repeatedly.
    let mut shutdown_signal = Box::pin(shutdown.completed().fuse());
    loop {
        // A shutdown is mapped onto `None`, the same value an exhausted
        // request stream produces, so both cases end the loop below.
        let next_request = futures::select! {
            incoming = self.receiver.next() => incoming,
            _ = shutdown_signal => None,
        };
        let Some((payload, reply_chan)) = next_request else {
            return;
        };
        self.handle_message(payload, reply_chan).await;
    }
}
async fn handle_message(&self, raw_req: Vec<u8>, chan: NetworkMsgResponseChannel<Vec<u8>>) {
let req: Message<TYPES> = match bincode::deserialize(&raw_req) {
Ok(deserialized) => deserialized,
Err(e) => {
tracing::error!("Failed to deserialize message! Error: {e}");
return;
}
};
let sender = req.sender.clone();
match req.kind {
MessageKind::Data(DataMessage::RequestData(request)) => {
if !self.valid_sender(&sender) || !valid_signature::<TYPES>(&request, &sender) {
let serialized_msg = match bincode::serialize(
&self.make_msg(ResponseMessage::Denied),
) {
Ok(serialized) => serialized,
Err(e) => {
tracing::error!("Failed to serialize outgoing message: this should never happen. Error: {e}");
return;
}
};
let _ = chan.sender.send(serialized_msg);
return;
}
let response = self.handle_request(request).await;
let serialized_response = match bincode::serialize(&response) {
Ok(serialized) => serialized,
Err(e) => {
tracing::error!("Failed to serialize outgoing message: this should never happen. Error: {e}");
return;
}
};
let _ = chan.sender.send(serialized_response);
}
msg => tracing::error!(
"Received message that wasn't a DataRequest in the request task. Message: {:?}",
msg
),
}
}
/// Returns the VID share stored for `key` in `view`, calculating the
/// shares first when none is stored yet.
///
/// When no share for `key` is present, the VID calculation is run; if it
/// yields nothing, the task sleeps for `TXNS_TIMEOUT` and runs it once
/// more. `None` is returned when the second attempt also yields nothing,
/// or when the share is still absent after the calculation.
#[instrument(skip_all, target = "NetworkResponseState", fields(id = self.id))]
async fn get_or_calc_vid_share(
    &self,
    view: TYPES::Time,
    key: &TYPES::SignatureKey,
) -> Option<Proposal<TYPES, VidDisperseShare<TYPES>>> {
    let already_stored = self
        .consensus
        .read()
        .await
        .vid_shares()
        .get(&view)
        .is_some_and(|shares| shares.contains_key(key));

    if !already_stored {
        let first_attempt_failed = Consensus::calculate_and_update_vid(
            OuterConsensus::new(Arc::clone(&self.consensus)),
            view,
            Arc::clone(&self.quorum),
            &self.private_key,
        )
        .await
        .is_none();

        if first_attempt_failed {
            // Wait a little, then retry once; give up if it fails again.
            async_sleep(TXNS_TIMEOUT).await;
            Consensus::calculate_and_update_vid(
                OuterConsensus::new(Arc::clone(&self.consensus)),
                view,
                Arc::clone(&self.quorum),
                &self.private_key,
            )
            .await?;
        }
    }

    // Both paths finish with the same lookup under a fresh read lock.
    self.consensus
        .read()
        .await
        .vid_shares()
        .get(&view)?
        .get(key)
        .cloned()
}
/// Builds the reply message for an already authorized data request.
///
/// * `RequestKind::Vid` — the VID share for the given view and key, or
///   `NotFound` when none can be obtained.
/// * `RequestKind::DaProposal` — always `NotFound`.
/// * `RequestKind::Proposal` — the stored proposal for the view, or
///   `NotFound`.
async fn handle_request(&self, req: DataRequest<TYPES>) -> Message<TYPES> {
    let response = match req.request {
        RequestKind::Vid(view, pub_key) => {
            match self.get_or_calc_vid_share(view, &pub_key).await {
                Some(share) => ResponseMessage::Found(SequencingMessage::Da(
                    DaConsensusMessage::VidDisperseMsg(share),
                )),
                None => ResponseMessage::NotFound,
            }
        }
        RequestKind::DaProposal(_view) => ResponseMessage::NotFound,
        RequestKind::Proposal(view) => self.respond_with_proposal(view).await,
    };
    self.make_msg(response)
}
/// Wraps `msg` in a data-response `Message` whose sender is this node's
/// public key.
fn make_msg(&self, msg: ResponseMessage<TYPES>) -> Message<TYPES> {
    let sender = self.pub_key.clone();
    let kind = MessageKind::Data(DataMessage::DataResponse(msg));
    Message { sender, kind }
}
/// Returns `true` when the quorum membership reports that `sender` has
/// stake.
fn valid_sender(&self, sender: &TYPES::SignatureKey) -> bool {
    let membership: &TYPES::Membership = &self.quorum;
    membership.has_stake(sender)
}
/// Looks up the proposal stored for `view` in the consensus state.
///
/// Returns `Found` carrying a clone of the proposal, or `NotFound` when
/// the consensus state holds no proposal for that view.
async fn respond_with_proposal(&self, view: TYPES::Time) -> ResponseMessage<TYPES> {
    self.consensus
        .read()
        .await
        .last_proposals()
        .get(&view)
        .cloned()
        .map_or(ResponseMessage::NotFound, |proposal| {
            ResponseMessage::Found(SequencingMessage::General(
                GeneralConsensusMessage::Proposal(proposal),
            ))
        })
}
}
/// Checks that `req.signature` is `sender`'s signature over the request.
///
/// The signed payload is the SHA-256 digest of the bincode serialization
/// of `req.request`. Returns `false` when the request cannot be
/// serialized.
fn valid_signature<TYPES: NodeType>(
    req: &DataRequest<TYPES>,
    sender: &TYPES::SignatureKey,
) -> bool {
    match bincode::serialize(&req.request) {
        Ok(encoded) => {
            let digest = Sha256::digest(encoded);
            sender.validate(&req.signature, &digest)
        }
        Err(_) => false,
    }
}
/// Spawns the response loop for `task_state` and returns its join handle.
///
/// The loop is given a dependency that completes once a
/// `HotShotEvent::Shutdown` event is seen on `event_stream`.
pub fn run_response_task<TYPES: NodeType>(
    task_state: NetworkResponseState<TYPES>,
    event_stream: Receiver<Arc<HotShotEvent<TYPES>>>,
) -> JoinHandle<()> {
    let shutdown_dependency = EventDependency::new(
        event_stream,
        Box::new(|event| matches!(event.as_ref(), HotShotEvent::Shutdown)),
    );
    let response_loop = task_state.run_loop(shutdown_dependency);
    async_spawn(response_loop)
}