// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.
// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.
//! Provides an event-streaming handle for a [`SystemContext`] running in the background
use std::sync::Arc;
use anyhow::{anyhow, Context, Ok, Result};
use async_broadcast::{InactiveReceiver, Receiver, Sender};
use async_lock::RwLock;
use committable::{Commitment, Committable};
use futures::Stream;
use hotshot_task::{
dependency::{Dependency, EventDependency},
task::{ConsensusTaskRegistry, NetworkTaskRegistry, Task, TaskState},
};
use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
use hotshot_types::{
consensus::Consensus,
data::{Leaf2, QuorumProposal2},
error::HotShotError,
message::{Message, MessageKind, Proposal, RecipientList},
request_response::ProposalRequestPayload,
traits::{
consensus_api::ConsensusApi,
election::Membership,
network::{BroadcastDelay, ConnectedNetwork, Topic},
node_implementation::NodeType,
signature_key::SignatureKey,
},
vote::HasViewNumber,
};
use tracing::instrument;
use crate::{traits::NodeImplementation, types::Event, SystemContext, Versions};
/// Event streaming handle for a [`SystemContext`] instance running in the background
///
/// This type provides the means to message and interact with a background [`SystemContext`] instance,
/// allowing the ability to receive [`Event`]s from it, send transactions to it, and interact with
/// the underlying storage.
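///
/// A hypothetical interaction sketch (assumes `handle` came from a running
/// [`SystemContext`] and `tx` is a valid application transaction):
///
/// ```ignore
/// use futures::StreamExt;
///
/// let mut events = handle.event_stream();
/// handle.submit_transaction(tx).await?;
/// while let Some(event) = events.next().await {
///     // React to decides, view changes, errors, etc.
/// }
/// ```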
pub struct SystemContextHandle<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
/// The [sender](Sender) and [receiver](Receiver)
/// that allow the application to communicate with HotShot.
pub(crate) output_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),
/// Access to the internal event stream, in case we need to, say, shut something down
#[allow(clippy::type_complexity)]
pub(crate) internal_event_stream: (
Sender<Arc<HotShotEvent<TYPES>>>,
InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
),
/// Registry for controlling consensus tasks
pub(crate) consensus_registry: ConsensusTaskRegistry<HotShotEvent<TYPES>>,
/// Registry for controlling network tasks
pub(crate) network_registry: NetworkTaskRegistry,
/// Internal reference to the underlying [`SystemContext`]
pub hotshot: Arc<SystemContext<TYPES, I, V>>,
/// Reference to the internal storage for consensus data.
pub(crate) storage: Arc<RwLock<I::Storage>>,
/// Networks used by the instance of hotshot
pub network: Arc<I::Network>,
/// Memberships used by consensus
pub memberships: Arc<RwLock<TYPES::Membership>>,
/// Number of blocks in an epoch; zero means there are no epochs
pub epoch_height: u64,
}
impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
SystemContextHandle<TYPES, I, V>
{
/// Adds a hotshot consensus-related task to the `SystemContextHandle`.
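///
/// A sketch (assuming a hypothetical `MyTaskState` that implements
/// `TaskState<Event = HotShotEvent<TYPES>>`):
///
/// ```ignore
/// handle.add_task(MyTaskState::default()); // hypothetical task state
/// ```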
pub fn add_task<S: TaskState<Event = HotShotEvent<TYPES>> + 'static>(&mut self, task_state: S) {
let task = Task::new(
task_state,
self.internal_event_stream.0.clone(),
self.internal_event_stream.1.activate_cloned(),
);
self.consensus_registry.run_task(task);
}
/// Obtains a stream of [`Event`]s to expose to the user
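///
/// A sketch of filtering for decide events (hedged: the variant fields follow
/// `hotshot_types::event::EventType`):
///
/// ```ignore
/// use futures::StreamExt;
/// use hotshot_types::event::EventType;
///
/// let mut events = handle.event_stream();
/// while let Some(event) = events.next().await {
///     if let EventType::Decide { leaf_chain, .. } = event.event {
///         // Process the newly decided leaf chain.
///     }
/// }
/// ```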
pub fn event_stream(&self) -> impl Stream<Item = Event<TYPES>> {
self.output_event_stream.1.activate_cloned()
}
/// Message other participants with a serialized message from the application
/// Receivers of this message will get an `Event::ExternalMessageReceived` via
/// the event stream.
///
/// # Errors
/// Errors if serializing the request fails, or the request fails to be sent
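///
/// A sketch of broadcasting an application-level message to all nodes
/// (the payload bytes are a hypothetical example):
///
/// ```ignore
/// use hotshot_types::message::RecipientList;
///
/// handle
///     .send_external_message(b"application ping".to_vec(), RecipientList::Broadcast)
///     .await?;
/// ```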
pub async fn send_external_message(
&self,
msg: Vec<u8>,
recipients: RecipientList<TYPES::SignatureKey>,
) -> Result<()> {
let message = Message {
sender: self.public_key().clone(),
kind: MessageKind::External(msg),
};
let serialized_message = self.hotshot.upgrade_lock.serialize(&message).await?;
match recipients {
RecipientList::Broadcast => {
self.network
.broadcast_message(serialized_message, Topic::Global, BroadcastDelay::None)
.await?;
}
RecipientList::Direct(recipient) => {
self.network
.direct_message(serialized_message, recipient)
.await?;
}
RecipientList::Many(recipients) => {
self.network
.da_broadcast_message(serialized_message, recipients, BroadcastDelay::None)
.await?;
}
}
Ok(())
}
/// Request a proposal from all other nodes. Will block until some node
/// returns a valid proposal with the requested commitment. If no node has the
/// proposal, this will block forever.
///
/// # Errors
/// Errors if signing the request for proposal fails
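///
/// Note that signing happens eagerly while the network round trip is deferred,
/// so the returned future must itself be awaited (a sketch, assuming `view`
/// and `leaf_commitment` are in scope):
///
/// ```ignore
/// let proposal = handle.request_proposal(view, leaf_commitment)?.await?;
/// ```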
pub fn request_proposal(
&self,
view: TYPES::View,
leaf_commitment: Commitment<Leaf2<TYPES>>,
) -> Result<impl futures::Future<Output = Result<Proposal<TYPES, QuorumProposal2<TYPES>>>>>
{
// We need to be able to sign this request before submitting it to the network. Compute the
// payload first.
let signed_proposal_request = ProposalRequestPayload {
view_number: view,
key: self.public_key().clone(),
};
// Finally, compute the signature for the payload.
let signature = TYPES::SignatureKey::sign(
self.private_key(),
signed_proposal_request.commit().as_ref(),
)?;
let mem = Arc::clone(&self.memberships);
let receiver = self.internal_event_stream.1.activate_cloned();
let sender = self.internal_event_stream.0.clone();
let epoch_height = self.epoch_height;
Ok(async move {
// First, broadcast that we need a proposal
broadcast_event(
HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
&sender,
)
.await;
loop {
let hs_event = EventDependency::new(
receiver.clone(),
Box::new(move |event| {
let event = event.as_ref();
if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event {
quorum_proposal.data.view_number() == view
} else {
false
}
}),
)
.completed()
.await
.ok_or(anyhow!("Event dependency failed to get event"))?;
// The dependency resolved to an event; make sure its data is correct
if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = hs_event.as_ref()
{
// Make sure that the quorum_proposal is valid
let mem_reader = mem.read().await;
if let Err(err) = quorum_proposal.validate_signature(&mem_reader, epoch_height)
{
tracing::warn!("Invalid Proposal Received after Request. Err {:?}", err);
continue;
}
drop(mem_reader);
let proposed_leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
let commit = proposed_leaf.commit();
if commit == leaf_commitment {
return Ok(quorum_proposal.clone());
}
tracing::warn!("Proposal received from request has different commitment than expected.\nExpected = {:?}\nReceived{:?}", leaf_commitment, commit);
}
}
})
}
/// HACK so we can know the types when running tests...
/// there are two cleaner solutions:
/// - make the stream generic and in `NodeType` or `NodeImplementation`
/// - type wrapper
#[must_use]
pub fn event_stream_known_impl(&self) -> Receiver<Event<TYPES>> {
self.output_event_stream.1.activate_cloned()
}
/// HACK so we can create dependency tasks when running tests
#[must_use]
pub fn internal_event_stream_sender(&self) -> Sender<Arc<HotShotEvent<TYPES>>> {
self.internal_event_stream.0.clone()
}
/// HACK so we can know the types when running tests...
/// there are two cleaner solutions:
/// - make the stream generic and in `NodeType` or `NodeImplementation`
/// - type wrapper
///
/// NOTE: this is only used for sanity checks in our tests
#[must_use]
pub fn internal_event_stream_receiver_known_impl(&self) -> Receiver<Arc<HotShotEvent<TYPES>>> {
self.internal_event_stream.1.activate_cloned()
}
/// Get the last decided validated state of the [`SystemContext`] instance.
///
/// # Panics
/// If the internal consensus is in an inconsistent state.
pub async fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
self.hotshot.decided_state().await
}
/// Get the validated state from a given `view`.
///
/// Returns the requested state, if the [`SystemContext`] is tracking this view. Consensus
/// tracks views that have not yet been decided but could be in the future. This function may
/// return [`None`] if the requested view has already been decided (but see
/// [`decided_state`](Self::decided_state)) or if there is no path for the requested
/// view to ever be decided.
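///
/// A sketch of falling back to the decided state when the requested view is
/// no longer tracked (note the decided state may not correspond to `view`):
///
/// ```ignore
/// let state = match handle.state(view).await {
///     Some(state) => state,
///     None => handle.decided_state().await,
/// };
/// ```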
pub async fn state(&self, view: TYPES::View) -> Option<Arc<TYPES::ValidatedState>> {
self.hotshot.state(view).await
}
/// Get the last decided leaf of the [`SystemContext`] instance.
///
/// # Panics
/// If the internal consensus is in an inconsistent state.
pub async fn decided_leaf(&self) -> Leaf2<TYPES> {
self.hotshot.decided_leaf().await
}
/// Tries to get the most recent decided leaf, returning instantly
/// if we can't acquire the lock.
///
/// # Panics
/// Panics if internal consensus is in an inconsistent state.
#[must_use]
pub fn try_decided_leaf(&self) -> Option<Leaf2<TYPES>> {
self.hotshot.try_decided_leaf()
}
/// Submits a transaction to the backing [`SystemContext`] instance.
///
/// The current node broadcasts the transaction to all nodes on the network.
///
/// # Errors
///
/// Will return a [`HotShotError`] if some error occurs in the underlying
/// [`SystemContext`] instance.
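///
/// A sketch of handling a rejected transaction (hypothetical `tx`):
///
/// ```ignore
/// if let Err(err) = handle.submit_transaction(tx).await {
///     tracing::warn!("transaction rejected: {err}");
/// }
/// ```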
pub async fn submit_transaction(
&self,
tx: TYPES::Transaction,
) -> Result<(), HotShotError<TYPES>> {
self.hotshot.publish_transaction_async(tx).await
}
/// Get the underlying consensus state for this [`SystemContext`]
#[must_use]
pub fn consensus(&self) -> Arc<RwLock<Consensus<TYPES>>> {
self.hotshot.consensus()
}
/// Shut down the inner hotshot and wait until all background threads are closed.
pub async fn shut_down(&mut self) {
// this is required because `SystemContextHandle` holds an inactive receiver and
// `broadcast_direct` below can wait indefinitely
self.internal_event_stream.0.set_await_active(false);
let _ = self
.internal_event_stream
.0
.broadcast_direct(Arc::new(HotShotEvent::Shutdown))
.await
.inspect_err(|err| tracing::error!("Failed to send shutdown event: {err}"));
tracing::error!("Shutting down the network!");
self.hotshot.network.shut_down().await;
tracing::error!("Shutting down network tasks!");
self.network_registry.shutdown().await;
tracing::error!("Shutting down consensus!");
self.consensus_registry.shutdown().await;
}
/// Return the timeout for a view of the underlying `SystemContext`
#[must_use]
pub fn next_view_timeout(&self) -> u64 {
self.hotshot.next_view_timeout()
}
/// Wrapper for `HotShotConsensusApi`'s `leader` function
///
/// # Errors
/// Returns an error if the leader cannot be calculated
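///
/// A sketch of looking up the leader for this node's current view and epoch:
///
/// ```ignore
/// let leader = handle
///     .leader(handle.cur_view().await, handle.cur_epoch().await)
///     .await?;
/// ```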
#[allow(clippy::unused_async)] // async for API compatibility reasons
pub async fn leader(
&self,
view_number: TYPES::View,
epoch_number: TYPES::Epoch,
) -> Result<TYPES::SignatureKey> {
self.hotshot
.memberships
.read()
.await
.leader(view_number, epoch_number)
.context("Failed to lookup leader")
}
// Below is for testing only:
/// Wrapper to get this node's public key
#[cfg(feature = "hotshot-testing")]
#[must_use]
pub fn public_key(&self) -> TYPES::SignatureKey {
self.hotshot.public_key.clone()
}
/// Get the sender side of the external event stream for testing purposes
#[cfg(feature = "hotshot-testing")]
#[must_use]
pub fn external_channel_sender(&self) -> Sender<Event<TYPES>> {
self.output_event_stream.0.clone()
}
/// Get the sender side of the internal event stream for testing purposes
#[cfg(feature = "hotshot-testing")]
#[must_use]
pub fn internal_channel_sender(&self) -> Sender<Arc<HotShotEvent<TYPES>>> {
self.internal_event_stream.0.clone()
}
/// Wrapper to get the view number this node is on.
#[instrument(skip_all, target = "SystemContextHandle", fields(id = self.hotshot.id))]
pub async fn cur_view(&self) -> TYPES::View {
self.hotshot.consensus.read().await.cur_view()
}
/// Wrapper to get the epoch number this node is on.
#[instrument(skip_all, target = "SystemContextHandle", fields(id = self.hotshot.id))]
pub async fn cur_epoch(&self) -> TYPES::Epoch {
self.hotshot.consensus.read().await.cur_epoch()
}
/// Provides a reference to the underlying storage for this [`SystemContext`], allowing access to
/// historical data
#[must_use]
pub fn storage(&self) -> Arc<RwLock<I::Storage>> {
Arc::clone(&self.storage)
}
}