1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.
// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.
use std::collections::HashMap;
use libp2p::request_response::{Event, Message, OutboundRequestId, ResponseChannel};
use libp2p_identity::PeerId;
use tokio::{spawn, sync::mpsc::UnboundedSender, time::sleep};
use tracing::{debug, error, warn};
use super::exponential_backoff::ExponentialBackoff;
use crate::network::{ClientRequest, NetworkEvent};
/// Request to direct message a peert
#[derive(Debug)]
pub struct DMRequest {
/// the recv-ers peer id
pub peer_id: PeerId,
/// the data
pub data: Vec<u8>,
/// backoff since last attempted request
pub backoff: ExponentialBackoff,
/// the number of remaining retries before giving up
pub(crate) retry_count: u8,
}
/// Wrapper metadata around libp2p's request response
/// usage: direct message peer
#[derive(Debug, Default)]
pub struct DMBehaviour {
/// In progress queries
in_progress_rr: HashMap<OutboundRequestId, DMRequest>,
}
/// Lilst of direct message output events
#[derive(Debug)]
pub enum DMEvent {
/// We received as Direct Request
DirectRequest(Vec<u8>, PeerId, ResponseChannel<Vec<u8>>),
/// We received a Direct Response
DirectResponse(Vec<u8>, PeerId),
}
impl DMBehaviour {
/// handle a direct message event
pub(crate) fn handle_dm_event(
&mut self,
event: Event<Vec<u8>, Vec<u8>>,
retry_tx: Option<UnboundedSender<ClientRequest>>,
) -> Option<NetworkEvent> {
match event {
Event::InboundFailure {
peer,
request_id: _,
error,
} => {
error!("Inbound message failure from {:?}: {:?}", peer, error);
None
}
Event::OutboundFailure {
peer,
request_id,
error,
} => {
warn!("Outbound message failure to {:?}: {:?}", peer, error);
if let Some(mut req) = self.in_progress_rr.remove(&request_id) {
if req.retry_count == 0 {
return None;
}
req.retry_count -= 1;
if let Some(retry_tx) = retry_tx {
spawn(async move {
sleep(req.backoff.next_timeout(false)).await;
let _ = retry_tx.send(ClientRequest::DirectRequest {
pid: peer,
contents: req.data,
retry_count: req.retry_count,
});
});
}
}
None
}
Event::Message { message, peer, .. } => match message {
Message::Request {
request: msg,
channel,
..
} => {
debug!("Received direct request {:?}", msg);
// receiver, not initiator.
// don't track. If we are disconnected, sender will reinitiate
Some(NetworkEvent::DirectRequest(msg, peer, channel))
}
Message::Response {
request_id,
response: msg,
} => {
// success, finished.
if let Some(req) = self.in_progress_rr.remove(&request_id) {
debug!("Received direct response {:?}", msg);
Some(NetworkEvent::DirectResponse(msg, req.peer_id))
} else {
warn!("Received response for unknown request id {:?}", request_id);
None
}
}
},
e @ Event::ResponseSent { .. } => {
debug!("Response sent {:?}", e);
None
}
}
}
}
impl DMBehaviour {
/// Add a direct request for a given peer
pub fn add_direct_request(&mut self, mut req: DMRequest, request_id: OutboundRequestId) {
if req.retry_count == 0 {
return;
}
req.retry_count -= 1;
debug!("Adding direct request {:?}", req);
self.in_progress_rr.insert(request_id, req);
}
}