// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.
// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.
/// networking behaviours wrapping libp2p's behaviours
pub mod behaviours;
/// defines the swarm and network definition (internal)
mod def;
/// functionality of a libp2p network node
mod node;
/// Alternative Libp2p transport implementations
pub mod transport;
/// Forked `cbor` codec with altered request/response sizes
pub mod cbor;
use std::{collections::HashSet, fmt::Debug, sync::Arc};
use async_lock::RwLock;
use futures::channel::oneshot::Sender;
use hotshot_types::traits::{network::NetworkError, node_implementation::NodeType};
use libp2p::{
build_multiaddr,
core::{muxing::StreamMuxerBox, transport::Boxed},
dns::tokio::Transport as DnsTransport,
gossipsub::Event as GossipEvent,
identify::Event as IdentifyEvent,
identity::Keypair,
quic,
request_response::ResponseChannel,
Multiaddr, Transport,
};
use libp2p_identity::PeerId;
use quic::tokio::Transport as QuicTransport;
use tracing::instrument;
use transport::StakeTableAuthentication;
pub use self::{
def::NetworkDef,
node::{
spawn_network_node, GossipConfig, NetworkNode, NetworkNodeConfig, NetworkNodeConfigBuilder,
NetworkNodeConfigBuilderError, NetworkNodeHandle, NetworkNodeReceiver,
RequestResponseConfig, DEFAULT_REPLICATION_FACTOR,
},
};
/// Actions the client can send to the swarm task for processing
#[derive(Debug)]
pub enum ClientRequest {
    /// Start the bootstrap process to kademlia
    BeginBootstrap,
    /// kill the swarm
    Shutdown,
    /// broadcast a serialized message on the given topic
    GossipMsg(String, Vec<u8>),
    /// subscribe to a topic; the optional channel is notified on completion
    Subscribe(String, Option<Sender<()>>),
    /// unsubscribe from a topic; the optional channel is notified on completion
    Unsubscribe(String, Option<Sender<()>>),
    /// client request to send a direct serialized message
    DirectRequest {
        /// peer id of the recipient
        pid: PeerId,
        /// serialized message contents
        contents: Vec<u8>,
        /// number of times to retry sending on failure
        retry_count: u8,
    },
    /// client request to send a direct reply to a previously received message
    DirectResponse(ResponseChannel<Vec<u8>>, Vec<u8>),
    /// prune (disconnect) a peer
    Prune(PeerId),
    /// add vec of known peers or addresses
    AddKnownPeers(Vec<(PeerId, Multiaddr)>),
    /// Ignore peers. Only here for debugging purposes.
    /// Allows us to have nodes that are never pruned
    IgnorePeers(Vec<PeerId>),
    /// Put(Key, Value) into DHT
    /// relay success back on channel
    PutDHT {
        /// Key to publish under
        key: Vec<u8>,
        /// Value to publish
        value: Vec<u8>,
        /// Channel to notify caller of result of publishing
        notify: Sender<()>,
    },
    /// Get(Key, Chan): look up a key in the DHT
    GetDHT {
        /// Key to search for
        key: Vec<u8>,
        /// Channel to notify caller of value (or failure to find value)
        notify: Sender<Vec<u8>>,
        /// number of retries to make before giving up
        retry_count: u8,
    },
    /// Request the number of connected peers
    GetConnectedPeerNum(Sender<usize>),
    /// Request the set of connected peers
    GetConnectedPeers(Sender<HashSet<PeerId>>),
    /// Print the routing table to stderr, debugging only
    GetRoutingTable(Sender<()>),
    /// Look up the address of a peer; the channel is notified when the lookup finishes
    LookupPeer(PeerId, Sender<()>),
}
/// Events generated by the swarm that we wish
/// to relay to the client
#[derive(Debug)]
pub enum NetworkEvent {
    /// Received a broadcast (gossip) message
    GossipMsg(Vec<u8>),
    /// Received a direct message from a node; the channel can be used to send a reply
    DirectRequest(Vec<u8>, PeerId, ResponseChannel<Vec<u8>>),
    /// Received a direct response from a node (that hopefully was initiated by this node)
    DirectResponse(Vec<u8>, PeerId),
    /// Report that kademlia has successfully bootstrapped into the network
    IsBootstrapped,
    /// The number of connected peers has possibly changed
    ConnectedPeersUpdate(usize),
}
#[derive(Debug)]
/// Internal representation of the network events.
/// Only used for event processing before relaying to the client.
pub enum NetworkEventInternal {
    /// a DHT (kademlia) event
    DHTEvent(libp2p::kad::Event),
    /// an identify event. Is boxed because this event is much larger than the other ones so we want
    /// to store it on the heap.
    IdentifyEvent(Box<IdentifyEvent>),
    /// a gossip event
    GossipEvent(Box<GossipEvent>),
    /// a direct message (request/response) event
    DMEvent(libp2p::request_response::Event<Vec<u8>, Vec<u8>>),
    /// an AutoNAT (NAT-status detection) event
    AutonatEvent(libp2p::autonat::Event),
}
/// Bind all interfaces (0.0.0.0) on port `port`, using UDP/QUIC-v1.
/// NOTE we may want something more general in the future.
#[must_use]
pub fn gen_multiaddr(port: u16) -> Multiaddr {
    build_multiaddr!(Ip4([0, 0, 0, 0]), Udp(port), QuicV1)
}
/// `BoxedTransport` is a type alias for a boxed transport whose output is a tuple of a `PeerId` and a `StreamMuxerBox`.
///
/// This type is used to represent a transport in the libp2p network framework. The `PeerId` is a unique identifier for each peer in the network, and the `StreamMuxerBox` is a type of multiplexer that can handle multiple substreams over a single connection.
type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>;
/// Generates an authenticated transport checked against the stake table.
/// If the stake table or authentication message is not provided, the transport will
/// not participate in stake table authentication.
///
/// The resulting transport stack is: QUIC (with an extended handshake timeout)
/// -> stake-table authentication -> DNS resolution, boxed with its muxer type erased.
///
/// # Errors
/// If we could not create a DNS transport
#[instrument(skip(identity))]
pub async fn gen_transport<T: NodeType>(
    identity: Keypair,
    stake_table: Option<Arc<RwLock<T::Membership>>>,
    auth_message: Option<Vec<u8>>,
) -> Result<BoxedTransport, NetworkError> {
    // Create the initial `Quic` transport with a generous handshake timeout
    // so slow peers are not dropped prematurely
    let transport = {
        let mut config = quic::Config::new(&identity);
        config.handshake_timeout = std::time::Duration::from_secs(20);
        QuicTransport::new(config)
    };

    // Require authentication against the stake table (a no-op if no stake
    // table or auth message was supplied)
    let transport: StakeTableAuthentication<_, T, _> =
        StakeTableAuthentication::new(transport, stake_table, auth_message);

    // Support DNS resolution using the system's resolver configuration
    // (was previously wrapped in a redundant double-nested block)
    let transport = DnsTransport::system(transport)
        .map_err(|e| NetworkError::ConfigError(format!("failed to build DNS transport: {e}")))?;

    // Erase the concrete muxer type by wrapping each connection in a `StreamMuxerBox`
    Ok(transport
        .map(|(peer_id, connection), _| (peer_id, StreamMuxerBox::new(connection)))
        .boxed())
}