1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
//! This file contains the [`Sender`] and [`Receiver`] traits. These traits are **used** by the
//! [`RequestResponseProtocol`] to send and receive messages from a network or other source.
//!
//! For HotShot I've gone ahead and done a blanket implementation for a [`Sender`] for all
//! [`ConnectedNetwork`]s. The reason it's not done for the [`Receiver`] is because both
//! HS and the confirmation layer will receive messages from a single point and _then_ decide
//! what to do with them (as opposed to having some sort of filtering mechanism). So for
//! [`Receiver`] I've done a blanket implementation for channels that send [`Vec<u8>`]s.
use std::{ops::Deref, sync::Arc};
use anyhow::{Context, Result};
use async_trait::async_trait;
use hotshot_types::traits::{network::ConnectedNetwork, signature_key::SignatureKey};
use tokio::sync::mpsc;
/// A type alias for a shareable byte array
pub type Bytes = Arc<Vec<u8>>;
/// The [`Sender`] trait is used to allow the [`RequestResponseProtocol`] to send messages to a specific recipient
#[async_trait]
pub trait Sender<K: SignatureKey + 'static>: Send + Sync + 'static + Clone {
/// Send a message to the specified recipient
async fn send_message(&self, message: &Bytes, recipient: K) -> Result<()>;
}
/// The [`Receiver`] trait is used to allow the [`RequestResponseProtocol`] to receive messages from a network
/// or other source.
#[async_trait]
pub trait Receiver: Send + Sync + 'static {
/// Receive a message. Returning an error here means the receiver will _NEVER_ receive any more messages
async fn receive_message(&mut self) -> Result<Bytes>;
}
/// A blanket implementation of the [`Sender`] trait for all types that dereference to [`ConnectedNetwork`]
#[async_trait]
impl<T, K> Sender<K> for T
where
T: Deref<Target: ConnectedNetwork<K>> + Send + Sync + 'static + Clone,
K: SignatureKey + 'static,
{
async fn send_message(&self, message: &Bytes, recipient: K) -> Result<()> {
// Just send the message to the recipient
self.direct_message(message.to_vec(), recipient)
.await
.with_context(|| "failed to send message")
}
}
/// An implementation of the [`Receiver`] trait for the [`mpsc::Receiver`] type. Allows us to send messages
/// to a channel and have the protocol receive them.
#[async_trait]
impl Receiver for mpsc::Receiver<Bytes> {
async fn receive_message(&mut self) -> Result<Bytes> {
// Just receive a message from the channel
self.recv().await.ok_or(anyhow::anyhow!("channel closed"))
}
}