1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

/// Networking behaviours wrapping libp2p's behaviours.
pub mod behaviours;
/// Defines the swarm and network definition (internal).
/// Private module; `NetworkDef` is re-exported from this file.
mod def;
/// Libp2p network errors.
pub mod error;
/// Functionality of a libp2p network node.
/// Private module; its node, handle and config types are re-exported from this file.
mod node;
/// Alternative libp2p transport implementations.
pub mod transport;

use std::{collections::HashSet, fmt::Debug};

use futures::channel::oneshot::{self, Sender};
use hotshot_types::{
    request_response::{Request, Response},
    traits::signature_key::SignatureKey,
};
#[cfg(async_executor_impl = "async-std")]
use libp2p::dns::async_std::Transport as DnsTransport;
#[cfg(async_executor_impl = "tokio")]
use libp2p::dns::tokio::Transport as DnsTransport;
use libp2p::{
    build_multiaddr,
    core::{muxing::StreamMuxerBox, transport::Boxed},
    gossipsub::Event as GossipEvent,
    identify::Event as IdentifyEvent,
    identity::Keypair,
    quic,
    request_response::ResponseChannel,
    Multiaddr, Transport,
};
use libp2p_identity::PeerId;
#[cfg(async_executor_impl = "async-std")]
use quic::async_std::Transport as QuicTransport;
#[cfg(async_executor_impl = "tokio")]
use quic::tokio::Transport as QuicTransport;
use tracing::instrument;
use transport::StakeTableAuthentication;

pub use self::{
    def::NetworkDef,
    error::NetworkError,
    node::{
        network_node_handle_error, spawn_network_node, GossipConfig, NetworkNode,
        NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeConfigBuilderError,
        NetworkNodeHandle, NetworkNodeHandleError, NetworkNodeReceiver, DEFAULT_REPLICATION_FACTOR,
    },
};
// Fail the build with a readable message when neither `async_executor_impl = "async-std"`
// nor `async_executor_impl = "tokio"` is set: the executor-specific `DnsTransport` and
// `QuicTransport` aliases imported above only exist under one of those two cfgs.
#[cfg(not(any(async_executor_impl = "async-std", async_executor_impl = "tokio")))]
compile_error! {"Either config option \"async-std\" or \"tokio\" must be enabled for this crate."}

/// Actions to send from the client to the swarm.
///
/// Several variants carry a `futures::channel::oneshot::Sender` (imported here as
/// `Sender`) so that a result or acknowledgement can be reported back to the caller.
#[derive(Debug)]
pub enum ClientRequest {
    /// Start the bootstrap process to kademlia
    BeginBootstrap,
    /// Kill the swarm
    Shutdown,
    /// Broadcast a serialized message.
    ///
    /// Fields are presumably `(topic, serialized message)` — TODO confirm against the handler.
    GossipMsg(String, Vec<u8>),
    /// Subscribe to a topic (the `String`).
    ///
    /// The optional sender is presumably signalled once the subscription has been
    /// processed — TODO confirm against the handler.
    Subscribe(String, Option<Sender<()>>),
    /// Unsubscribe from a topic (the `String`).
    ///
    /// The optional sender is presumably signalled once the unsubscription has been
    /// processed — TODO confirm against the handler.
    Unsubscribe(String, Option<Sender<()>>),
    /// Client request to send a direct serialized message
    DirectRequest {
        /// Peer id of the recipient
        pid: PeerId,
        /// Serialized message contents
        contents: Vec<u8>,
        /// Number of retries
        retry_count: u8,
    },
    /// Client request to send a direct reply to a message.
    ///
    /// Carries the libp2p response channel to reply on and the serialized reply bytes.
    DirectResponse(ResponseChannel<Vec<u8>>, Vec<u8>),
    /// Request for data from another peer
    DataRequest {
        /// Request sent on wire
        request: Request,
        /// Peer to try sending the request to
        peer: PeerId,
        /// Channel used to hand the outcome back to the client. It carries an
        /// `Option<Response>`; presumably `None` means no response was obtained —
        /// TODO confirm against the handler.
        chan: oneshot::Sender<Option<Response>>,
    },
    /// Respond with some data to another peer
    DataResponse {
        /// Data to send back
        response: Response,
        /// libp2p response channel on which to send `response`
        chan: ResponseChannel<Response>,
    },
    /// Prune a peer
    Prune(PeerId),
    /// Add a vec of known peers, each given as a `(PeerId, Multiaddr)` pair
    AddKnownPeers(Vec<(PeerId, Multiaddr)>),
    /// Ignore peers. Only here for debugging purposes.
    /// Allows us to have nodes that are never pruned
    IgnorePeers(Vec<PeerId>),
    /// Put(Key, Value) into DHT
    /// relay success back on channel
    PutDHT {
        /// Key to publish under
        key: Vec<u8>,
        /// Value to publish
        value: Vec<u8>,
        /// Channel to notify caller of result of publishing
        notify: Sender<()>,
    },
    /// Get(Key, Chan): look up a key in the DHT
    GetDHT {
        /// Key to search for
        key: Vec<u8>,
        /// Channel to notify caller of value (or failure to find value)
        notify: Sender<Vec<u8>>,
        /// Number of retries to make
        retry_count: u8,
    },
    /// Request the number of connected peers; the count is sent back on the channel
    GetConnectedPeerNum(Sender<usize>),
    /// Request the set of connected peers; the set is sent back on the channel
    GetConnectedPeers(Sender<HashSet<PeerId>>),
    /// Print the routing table to stderr, debugging only.
    /// The channel carries no data (`()`), so it can only act as a completion signal.
    GetRoutingTable(Sender<()>),
    /// Get address of peer.
    /// The channel carries no data (`()`), so the address itself is not returned through it.
    LookupPeer(PeerId, Sender<()>),
}

/// Events generated by the swarm that we wish
/// to relay to the client
#[derive(Debug)]
pub enum NetworkEvent {
    /// Received a broadcast; carries the raw message bytes
    GossipMsg(Vec<u8>),
    /// Received a direct message from a node.
    ///
    /// Carries the message bytes, a peer id (presumably the sender — TODO confirm)
    /// and the libp2p response channel on which a reply can be sent.
    DirectRequest(Vec<u8>, PeerId, ResponseChannel<Vec<u8>>),
    /// Received a direct response from a node (that hopefully was initiated by this node).
    ///
    /// Carries the response bytes and a peer id (presumably the responder — TODO confirm).
    DirectResponse(Vec<u8>, PeerId),
    /// A peer is asking us for data.
    ///
    /// Carries the request and the libp2p response channel to answer on.
    ResponseRequested(Request, ResponseChannel<Response>),
    /// Report that kademlia has successfully bootstrapped into the network
    IsBootstrapped,
    /// The number of connected peers has possibly changed.
    /// The `usize` is presumably the new peer count — TODO confirm.
    ConnectedPeersUpdate(usize),
}

#[derive(Debug)]
/// Internal representation of the network events;
/// only used for event processing before relaying to client.
///
/// Each variant wraps the event type of one libp2p protocol.
pub enum NetworkEventInternal {
    /// A DHT (kademlia) event
    DHTEvent(libp2p::kad::Event),
    /// An identify event. Is boxed because this event is much larger than the other ones so we want
    /// to store it on the heap.
    IdentifyEvent(Box<IdentifyEvent>),
    /// A gossip (gossipsub) event. Also stored boxed.
    GossipEvent(Box<GossipEvent>),
    /// A direct message event: request-response with raw bytes as both request and response
    DMEvent(libp2p::request_response::Event<Vec<u8>, Vec<u8>>),
    /// A request response event: request-response with typed `Request` / `Response` payloads
    RequestResponseEvent(libp2p::request_response::Event<Request, Response>),
    /// An autonat event
    AutonatEvent(libp2p::autonat::Event),
}

/// Build the multiaddr that binds all IPv4 interfaces (`0.0.0.0`) on UDP port `port`
/// using QUIC v1.
///
/// NOTE: we may want something more general in the future.
#[must_use]
pub fn gen_multiaddr(port: u16) -> Multiaddr {
    /// The unspecified IPv4 address, i.e. every local interface.
    const ALL_INTERFACES: [u8; 4] = [0, 0, 0, 0];

    build_multiaddr!(Ip4(ALL_INTERFACES), Udp(port), QuicV1)
}

/// `BoxedTransport` is a type alias for a boxed libp2p transport
/// (`libp2p::core::transport::Boxed`) parameterised by the tuple `(PeerId, StreamMuxerBox)`.
///
/// This is the type returned by `gen_transport`, which produces each tuple by pairing a
/// peer id with the connection wrapped in `StreamMuxerBox::new`. The `PeerId` is a unique
/// identifier for each peer in the network, and the `StreamMuxerBox` is a type of
/// multiplexer that can handle multiple substreams over a single connection.
type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>;

/// Generates an authenticated transport checked against the stake table.
/// If the stake table or authentication message is not provided, the transport will
/// not participate in stake table authentication.
///
/// # Errors
/// If we could not create a DNS transport
#[instrument(skip(identity))]
pub async fn gen_transport<K: SignatureKey + 'static>(
    identity: Keypair,
    stake_table: Option<HashSet<K>>,
    auth_message: Option<Vec<u8>>,
) -> Result<BoxedTransport, NetworkError> {
    // Create the initial `Quic` transport
    let transport = {
        let mut config = quic::Config::new(&identity);
        config.handshake_timeout = std::time::Duration::from_secs(20);
        QuicTransport::new(config)
    };

    // Require authentication against the stake table
    let transport = StakeTableAuthentication::new(transport, stake_table, auth_message);

    // Support DNS resolution
    let transport = {
        #[cfg(async_executor_impl = "async-std")]
        {
            DnsTransport::system(transport).await
        }

        #[cfg(async_executor_impl = "tokio")]
        {
            DnsTransport::system(transport)
        }
    }
    .map_err(|e| NetworkError::TransportLaunch { source: e })?;

    Ok(transport
        .map(|(peer_id, connection), _| (peer_id, StreamMuxerBox::new(connection)))
        .boxed())
}