1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

use hotshot_types::traits::signature_key::SignatureKey;
use libp2p::{
    autonat,
    gossipsub::{Behaviour as GossipBehaviour, Event as GossipEvent, IdentTopic},
    identify::{Behaviour as IdentifyBehaviour, Event as IdentifyEvent},
    kad::store::MemoryStore,
    request_response::{OutboundRequestId, ResponseChannel},
    Multiaddr,
};
use libp2p_identity::PeerId;
use libp2p_swarm_derive::NetworkBehaviour;
use tracing::{debug, error};

use super::{
    behaviours::dht::store::{file_backed::FileBackedStore, validated::ValidatedStore},
    cbor, NetworkEventInternal,
};

/// Overarching network behaviour performing:
/// - network topology discovery
/// - direct messaging
/// - p2p broadcast
/// - connection management
///
/// The struct bundles several libp2p behaviours into a single derived
/// `NetworkBehaviour`. The `to_swarm = "NetworkEventInternal"` attribute names
/// `NetworkEventInternal` as the event type handed to the swarm; the `From`
/// implementations in this file convert each sub-behaviour's event into it.
///
/// The type parameter `K` is the signature key type passed to the
/// `ValidatedStore` that wraps the DHT record store.
///
/// Every field carries `#[debug(skip)]`, so the derived `Debug` output does not
/// print the sub-behaviours themselves.
#[derive(NetworkBehaviour, derive_more::Debug)]
#[behaviour(to_swarm = "NetworkEventInternal")]
pub struct NetworkDef<K: SignatureKey + 'static> {
    /// purpose: broadcasting messages to many peers
    /// NOTE gossipsub works ONLY for sharing messages right now
    /// in the future it may be able to do peer discovery and routing
    /// <https://github.com/libp2p/rust-libp2p/issues/2398>
    #[debug(skip)]
    gossipsub: GossipBehaviour,

    /// The DHT store. We use a `FileBackedStore` to occasionally save the DHT to
    /// a file on disk and a `ValidatedStore` to validate the records stored.
    /// Addresses registered through `add_address` are added to this behaviour.
    #[debug(skip)]
    pub dht: libp2p::kad::Behaviour<FileBackedStore<ValidatedStore<MemoryStore, K>>>,

    /// purpose: identifying the addresses from an outside POV
    #[debug(skip)]
    identify: IdentifyBehaviour,

    /// purpose: directly messaging a peer. Both the request and the response
    /// payloads are raw byte vectors (`Vec<u8>`).
    #[debug(skip)]
    pub direct_message: cbor::Behaviour<Vec<u8>, Vec<u8>>,

    /// Auto NAT behaviour to determine if we are publicly reachable and
    /// by which address
    #[debug(skip)]
    pub autonat: libp2p::autonat::Behaviour,
}

impl<K: SignatureKey + 'static> NetworkDef<K> {
    /// Assemble a `NetworkDef` from already-constructed sub-behaviours.
    ///
    /// Each argument is moved into the field of the same name. No further
    /// configuration of the behaviours happens here.
    ///
    /// # Arguments
    /// * `gossipsub` - behaviour used for broadcasting
    /// * `dht` - Kademlia behaviour over the file-backed, validated store
    /// * `identify` - identify behaviour
    /// * `direct_message` - request/response behaviour for direct messages
    /// * `autonat` - AutoNAT behaviour
    #[must_use]
    pub fn new(
        gossipsub: GossipBehaviour,
        dht: libp2p::kad::Behaviour<FileBackedStore<ValidatedStore<MemoryStore, K>>>,
        identify: IdentifyBehaviour,
        direct_message: super::cbor::Behaviour<Vec<u8>, Vec<u8>>,
        autonat: autonat::Behaviour,
    ) -> Self {
        Self {
            gossipsub,
            dht,
            identify,
            direct_message,
            autonat,
        }
    }
}

/// Address functions
impl<K: SignatureKey + 'static> NetworkDef<K> {
    /// Register `address` as a known address for `peer_id`.
    ///
    /// The address is handed to the DHT behaviour only; whatever that call
    /// returns is discarded.
    pub fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr) {
        // Why only the DHT: the other behaviours rely on the DHT for routing,
        // and the DHT is always enabled, so registering the address there is
        // sufficient. Were the DHT optional, the address would also have to be
        // added to the direct message behaviour by hand.
        self.dht.add_address(peer_id, address);
    }
}

/// Gossip functions
impl<K: SignatureKey + 'static> NetworkDef<K> {
    /// Publish a given gossip
    pub fn publish_gossip(&mut self, topic: IdentTopic, contents: Vec<u8>) {
        if let Err(e) = self.gossipsub.publish(topic, contents) {
            tracing::warn!("Failed to publish gossip message. Error: {:?}", e);
        }
    }
    /// Subscribe to a given topic
    pub fn subscribe_gossip(&mut self, t: &str) {
        if let Err(e) = self.gossipsub.subscribe(&IdentTopic::new(t)) {
            error!("Failed to subscribe to topic {:?}. Error: {:?}", t, e);
        }
    }

    /// Unsubscribe from a given topic
    pub fn unsubscribe_gossip(&mut self, t: &str) {
        if let Err(e) = self.gossipsub.unsubscribe(&IdentTopic::new(t)) {
            error!("Failed to unsubscribe from topic {:?}. Error: {:?}", t, e);
        }
    }
}

/// Request/response functions
impl<K: SignatureKey + 'static> NetworkDef<K> {
    /// Add a direct request for a given peer
    pub fn add_direct_request(&mut self, peer_id: PeerId, data: Vec<u8>) -> OutboundRequestId {
        self.direct_message.send_request(&peer_id, data)
    }

    /// Add a direct response for a channel
    pub fn add_direct_response(&mut self, chan: ResponseChannel<Vec<u8>>, msg: Vec<u8>) {
        let _ = self.direct_message.send_response(chan, msg);
    }
}

impl From<GossipEvent> for NetworkEventInternal {
    fn from(event: GossipEvent) -> Self {
        Self::GossipEvent(Box::new(event))
    }
}

impl From<libp2p::kad::Event> for NetworkEventInternal {
    fn from(event: libp2p::kad::Event) -> Self {
        Self::DHTEvent(event)
    }
}

impl From<IdentifyEvent> for NetworkEventInternal {
    fn from(event: IdentifyEvent) -> Self {
        Self::IdentifyEvent(Box::new(event))
    }
}
impl From<libp2p::request_response::Event<Vec<u8>, Vec<u8>>> for NetworkEventInternal {
    fn from(value: libp2p::request_response::Event<Vec<u8>, Vec<u8>>) -> Self {
        Self::DMEvent(value)
    }
}

impl From<libp2p::autonat::Event> for NetworkEventInternal {
    fn from(event: libp2p::autonat::Event) -> Self {
        Self::AutonatEvent(event)
    }
}