use std::{
collections::{BTreeMap, HashMap},
mem::ManuallyDrop,
ops::{Deref, DerefMut},
sync::Arc,
};
use anyhow::{bail, ensure, Result};
use async_lock::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
use committable::Commitment;
use tracing::{debug, error, instrument, trace};
use vec1::Vec1;
pub use crate::utils::{View, ViewInner};
use crate::{
data::{Leaf, QuorumProposal, VidDisperse, VidDisperseShare},
error::HotShotError,
event::HotShotAction,
message::{Proposal, UpgradeLock},
simple_certificate::{DaCertificate, QuorumCertificate},
traits::{
block_contents::BuilderFee,
metrics::{Counter, Gauge, Histogram, Metrics, NoMetrics},
node_implementation::{ConsensusTime, NodeType, Versions},
signature_key::SignatureKey,
BlockPayload, ValidatedState,
},
utils::{BuilderCommitment, StateAndDelta, Terminator},
vid::VidCommitment,
vote::HasViewNumber,
};
pub type CommitmentMap<T> = HashMap<Commitment<T>, T>;
pub type VidShares<TYPES> = BTreeMap<
<TYPES as NodeType>::Time,
HashMap<<TYPES as NodeType>::SignatureKey, Proposal<TYPES, VidDisperseShare<TYPES>>>,
>;
pub type LockedConsensusState<TYPES> = Arc<RwLock<Consensus<TYPES>>>;
#[derive(Clone, Debug)]
pub struct OuterConsensus<TYPES: NodeType> {
pub inner_consensus: LockedConsensusState<TYPES>,
}
impl<TYPES: NodeType> OuterConsensus<TYPES> {
pub fn new(consensus: LockedConsensusState<TYPES>) -> Self {
Self {
inner_consensus: consensus,
}
}
#[instrument(skip_all, target = "OuterConsensus")]
pub async fn read(&self) -> ConsensusReadLockGuard<'_, TYPES> {
trace!("Trying to acquire read lock on consensus");
let ret = self.inner_consensus.read().await;
trace!("Acquired read lock on consensus");
ConsensusReadLockGuard::new(ret)
}
#[instrument(skip_all, target = "OuterConsensus")]
pub async fn write(&self) -> ConsensusWriteLockGuard<'_, TYPES> {
trace!("Trying to acquire write lock on consensus");
let ret = self.inner_consensus.write().await;
trace!("Acquired write lock on consensus");
ConsensusWriteLockGuard::new(ret)
}
#[instrument(skip_all, target = "OuterConsensus")]
pub fn try_write(&self) -> Option<ConsensusWriteLockGuard<'_, TYPES>> {
trace!("Trying to acquire write lock on consensus");
let ret = self.inner_consensus.try_write();
if let Some(guard) = ret {
trace!("Acquired write lock on consensus");
Some(ConsensusWriteLockGuard::new(guard))
} else {
trace!("Failed to acquire write lock");
None
}
}
#[instrument(skip_all, target = "OuterConsensus")]
pub async fn upgradable_read(&self) -> ConsensusUpgradableReadLockGuard<'_, TYPES> {
trace!("Trying to acquire upgradable read lock on consensus");
let ret = self.inner_consensus.upgradable_read().await;
trace!("Acquired upgradable read lock on consensus");
ConsensusUpgradableReadLockGuard::new(ret)
}
#[instrument(skip_all, target = "OuterConsensus")]
pub fn try_read(&self) -> Option<ConsensusReadLockGuard<'_, TYPES>> {
trace!("Trying to acquire read lock on consensus");
let ret = self.inner_consensus.try_read();
if let Some(guard) = ret {
trace!("Acquired read lock on consensus");
Some(ConsensusReadLockGuard::new(guard))
} else {
trace!("Failed to acquire read lock");
None
}
}
}
pub struct ConsensusReadLockGuard<'a, TYPES: NodeType> {
lock_guard: RwLockReadGuard<'a, Consensus<TYPES>>,
}
impl<'a, TYPES: NodeType> ConsensusReadLockGuard<'a, TYPES> {
#[must_use]
pub fn new(lock_guard: RwLockReadGuard<'a, Consensus<TYPES>>) -> Self {
Self { lock_guard }
}
}
impl<'a, TYPES: NodeType> Deref for ConsensusReadLockGuard<'a, TYPES> {
type Target = Consensus<TYPES>;
fn deref(&self) -> &Self::Target {
&self.lock_guard
}
}
impl<'a, TYPES: NodeType> Drop for ConsensusReadLockGuard<'a, TYPES> {
#[instrument(skip_all, target = "ConsensusReadLockGuard")]
fn drop(&mut self) {
trace!("Read lock on consensus dropped");
}
}
pub struct ConsensusWriteLockGuard<'a, TYPES: NodeType> {
lock_guard: RwLockWriteGuard<'a, Consensus<TYPES>>,
}
impl<'a, TYPES: NodeType> ConsensusWriteLockGuard<'a, TYPES> {
#[must_use]
pub fn new(lock_guard: RwLockWriteGuard<'a, Consensus<TYPES>>) -> Self {
Self { lock_guard }
}
}
impl<'a, TYPES: NodeType> Deref for ConsensusWriteLockGuard<'a, TYPES> {
type Target = Consensus<TYPES>;
fn deref(&self) -> &Self::Target {
&self.lock_guard
}
}
impl<'a, TYPES: NodeType> DerefMut for ConsensusWriteLockGuard<'a, TYPES> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.lock_guard
}
}
impl<'a, TYPES: NodeType> Drop for ConsensusWriteLockGuard<'a, TYPES> {
#[instrument(skip_all, target = "ConsensusWriteLockGuard")]
fn drop(&mut self) {
debug!("Write lock on consensus dropped");
}
}
pub struct ConsensusUpgradableReadLockGuard<'a, TYPES: NodeType> {
lock_guard: ManuallyDrop<RwLockUpgradableReadGuard<'a, Consensus<TYPES>>>,
taken: bool,
}
impl<'a, TYPES: NodeType> ConsensusUpgradableReadLockGuard<'a, TYPES> {
#[must_use]
pub fn new(lock_guard: RwLockUpgradableReadGuard<'a, Consensus<TYPES>>) -> Self {
Self {
lock_guard: ManuallyDrop::new(lock_guard),
taken: false,
}
}
#[instrument(skip_all, target = "ConsensusUpgradableReadLockGuard")]
pub async fn upgrade(mut guard: Self) -> ConsensusWriteLockGuard<'a, TYPES> {
let inner_guard = unsafe { ManuallyDrop::take(&mut guard.lock_guard) };
guard.taken = true;
debug!("Trying to upgrade upgradable read lock on consensus");
let ret = RwLockUpgradableReadGuard::upgrade(inner_guard).await;
debug!("Upgraded upgradable read lock on consensus");
ConsensusWriteLockGuard::new(ret)
}
}
impl<'a, TYPES: NodeType> Deref for ConsensusUpgradableReadLockGuard<'a, TYPES> {
type Target = Consensus<TYPES>;
fn deref(&self) -> &Self::Target {
&self.lock_guard
}
}
impl<'a, TYPES: NodeType> Drop for ConsensusUpgradableReadLockGuard<'a, TYPES> {
#[instrument(skip_all, target = "ConsensusUpgradableReadLockGuard")]
fn drop(&mut self) {
if !self.taken {
unsafe { ManuallyDrop::drop(&mut self.lock_guard) }
debug!("Upgradable read lock on consensus dropped");
}
}
}
#[derive(Debug, Clone, Copy)]
struct HotShotActionViews<T: ConsensusTime> {
proposed: T,
voted: T,
da_proposed: T,
da_vote: T,
}
impl<T: ConsensusTime> Default for HotShotActionViews<T> {
fn default() -> Self {
let genesis = T::genesis();
Self {
proposed: genesis,
voted: genesis,
da_proposed: genesis,
da_vote: genesis,
}
}
}
impl<T: ConsensusTime> HotShotActionViews<T> {
fn from_view(view: T) -> Self {
Self {
proposed: view,
voted: view,
da_proposed: view,
da_vote: view,
}
}
}
#[derive(custom_debug::Debug, Clone)]
pub struct Consensus<TYPES: NodeType> {
validated_state_map: BTreeMap<TYPES::Time, View<TYPES>>,
vid_shares: VidShares<TYPES>,
saved_da_certs: HashMap<TYPES::Time, DaCertificate<TYPES>>,
cur_view: TYPES::Time,
last_proposals: BTreeMap<TYPES::Time, Proposal<TYPES, QuorumProposal<TYPES>>>,
last_decided_view: TYPES::Time,
locked_view: TYPES::Time,
saved_leaves: CommitmentMap<Leaf<TYPES>>,
last_actions: HotShotActionViews<TYPES::Time>,
saved_payloads: BTreeMap<TYPES::Time, Arc<[u8]>>,
high_qc: QuorumCertificate<TYPES>,
pub metrics: Arc<ConsensusMetricsValue>,
}
#[derive(Clone, Debug)]
pub struct ConsensusMetricsValue {
pub last_synced_block_height: Box<dyn Gauge>,
pub last_decided_view: Box<dyn Gauge>,
pub last_decided_time: Box<dyn Gauge>,
pub current_view: Box<dyn Gauge>,
pub number_of_views_since_last_decide: Box<dyn Gauge>,
pub number_of_views_per_decide_event: Box<dyn Histogram>,
pub view_duration_as_leader: Box<dyn Histogram>,
pub invalid_qc: Box<dyn Gauge>,
pub outstanding_transactions: Box<dyn Gauge>,
pub outstanding_transactions_memory_size: Box<dyn Gauge>,
pub number_of_timeouts: Box<dyn Counter>,
pub number_of_timeouts_as_leader: Box<dyn Counter>,
pub number_of_empty_blocks_proposed: Box<dyn Counter>,
}
impl ConsensusMetricsValue {
#[must_use]
pub fn new(metrics: &dyn Metrics) -> Self {
Self {
last_synced_block_height: metrics
.create_gauge(String::from("last_synced_block_height"), None),
last_decided_view: metrics.create_gauge(String::from("last_decided_view"), None),
last_decided_time: metrics.create_gauge(String::from("last_decided_time"), None),
current_view: metrics.create_gauge(String::from("current_view"), None),
number_of_views_since_last_decide: metrics
.create_gauge(String::from("number_of_views_since_last_decide"), None),
number_of_views_per_decide_event: metrics
.create_histogram(String::from("number_of_views_per_decide_event"), None),
view_duration_as_leader: metrics
.create_histogram(String::from("view_duration_as_leader"), None),
invalid_qc: metrics.create_gauge(String::from("invalid_qc"), None),
outstanding_transactions: metrics
.create_gauge(String::from("outstanding_transactions"), None),
outstanding_transactions_memory_size: metrics
.create_gauge(String::from("outstanding_transactions_memory_size"), None),
number_of_timeouts: metrics.create_counter(String::from("number_of_timeouts"), None),
number_of_timeouts_as_leader: metrics
.create_counter(String::from("number_of_timeouts_as_leader"), None),
number_of_empty_blocks_proposed: metrics
.create_counter(String::from("number_of_empty_blocks_proposed"), None),
}
}
}
impl Default for ConsensusMetricsValue {
fn default() -> Self {
Self::new(&*NoMetrics::boxed())
}
}
impl<TYPES: NodeType> Consensus<TYPES> {
#[allow(clippy::too_many_arguments)]
pub fn new(
validated_state_map: BTreeMap<TYPES::Time, View<TYPES>>,
cur_view: TYPES::Time,
locked_view: TYPES::Time,
last_decided_view: TYPES::Time,
last_actioned_view: TYPES::Time,
last_proposals: BTreeMap<TYPES::Time, Proposal<TYPES, QuorumProposal<TYPES>>>,
saved_leaves: CommitmentMap<Leaf<TYPES>>,
saved_payloads: BTreeMap<TYPES::Time, Arc<[u8]>>,
high_qc: QuorumCertificate<TYPES>,
metrics: Arc<ConsensusMetricsValue>,
) -> Self {
Consensus {
validated_state_map,
vid_shares: BTreeMap::new(),
saved_da_certs: HashMap::new(),
cur_view,
last_decided_view,
last_proposals,
last_actions: HotShotActionViews::from_view(last_actioned_view),
locked_view,
saved_leaves,
saved_payloads,
high_qc,
metrics,
}
}
pub fn cur_view(&self) -> TYPES::Time {
self.cur_view
}
pub fn last_decided_view(&self) -> TYPES::Time {
self.last_decided_view
}
pub fn locked_view(&self) -> TYPES::Time {
self.locked_view
}
pub fn high_qc(&self) -> &QuorumCertificate<TYPES> {
&self.high_qc
}
pub fn validated_state_map(&self) -> &BTreeMap<TYPES::Time, View<TYPES>> {
&self.validated_state_map
}
pub fn saved_leaves(&self) -> &CommitmentMap<Leaf<TYPES>> {
&self.saved_leaves
}
pub fn saved_payloads(&self) -> &BTreeMap<TYPES::Time, Arc<[u8]>> {
&self.saved_payloads
}
pub fn vid_shares(&self) -> &VidShares<TYPES> {
&self.vid_shares
}
pub fn saved_da_certs(&self) -> &HashMap<TYPES::Time, DaCertificate<TYPES>> {
&self.saved_da_certs
}
pub fn last_proposals(&self) -> &BTreeMap<TYPES::Time, Proposal<TYPES, QuorumProposal<TYPES>>> {
&self.last_proposals
}
pub fn update_view(&mut self, view_number: TYPES::Time) -> Result<()> {
ensure!(
view_number > self.cur_view,
"New view isn't newer than the current view."
);
self.cur_view = view_number;
Ok(())
}
pub fn update_action(&mut self, action: HotShotAction, view: TYPES::Time) -> bool {
let old_view = match action {
HotShotAction::Vote => &mut self.last_actions.voted,
HotShotAction::Propose => &mut self.last_actions.proposed,
HotShotAction::DaPropose => &mut self.last_actions.da_proposed,
HotShotAction::DaVote => {
if view > self.last_actions.da_vote {
self.last_actions.da_vote = view;
}
return true;
}
_ => return true,
};
if view > *old_view {
*old_view = view;
return true;
}
false
}
pub fn reset_actions(&mut self) {
self.last_actions = HotShotActionViews::default();
}
pub fn update_proposed_view(
&mut self,
proposal: Proposal<TYPES, QuorumProposal<TYPES>>,
) -> Result<()> {
ensure!(
proposal.data.view_number()
> self
.last_proposals
.last_key_value()
.map_or(TYPES::Time::genesis(), |(k, _)| { *k }),
"New view isn't newer than the previously proposed view."
);
self.last_proposals
.insert(proposal.data.view_number(), proposal);
Ok(())
}
pub fn update_last_decided_view(&mut self, view_number: TYPES::Time) -> Result<()> {
ensure!(
view_number > self.last_decided_view,
"New view isn't newer than the previously decided view."
);
self.last_decided_view = view_number;
Ok(())
}
pub fn update_locked_view(&mut self, view_number: TYPES::Time) -> Result<()> {
ensure!(
view_number > self.locked_view,
"New view isn't newer than the previously locked view."
);
self.locked_view = view_number;
Ok(())
}
pub fn update_validated_state_map(
&mut self,
view_number: TYPES::Time,
view: View<TYPES>,
) -> Result<()> {
if let Some(existing_view) = self.validated_state_map().get(&view_number) {
if let ViewInner::Leaf { .. } = existing_view.view_inner {
match view.view_inner {
ViewInner::Leaf { ref delta, .. } => {
ensure!(
delta.is_some(),
"Skipping the state update to not override a `Leaf` view with `None` state delta."
);
}
_ => {
bail!("Skipping the state update to not override a `Leaf` view with a non-`Leaf` view.");
}
}
}
}
self.validated_state_map.insert(view_number, view);
Ok(())
}
pub async fn update_saved_leaves<V: Versions>(
&mut self,
leaf: Leaf<TYPES>,
upgrade_lock: &UpgradeLock<TYPES, V>,
) {
self.saved_leaves
.insert(leaf.commit(upgrade_lock).await, leaf);
}
pub fn update_saved_payloads(
&mut self,
view_number: TYPES::Time,
encoded_transaction: Arc<[u8]>,
) -> Result<()> {
ensure!(
!self.saved_payloads.contains_key(&view_number),
"Payload with the same view already exists."
);
self.saved_payloads.insert(view_number, encoded_transaction);
Ok(())
}
pub fn update_high_qc(&mut self, high_qc: QuorumCertificate<TYPES>) -> Result<()> {
ensure!(
high_qc.view_number > self.high_qc.view_number,
"High QC with an equal or higher view exists."
);
debug!("Updating high QC");
self.high_qc = high_qc;
Ok(())
}
pub fn update_vid_shares(
&mut self,
view_number: TYPES::Time,
disperse: Proposal<TYPES, VidDisperseShare<TYPES>>,
) {
self.vid_shares
.entry(view_number)
.or_default()
.insert(disperse.data.recipient_key.clone(), disperse);
}
pub fn update_saved_da_certs(&mut self, view_number: TYPES::Time, cert: DaCertificate<TYPES>) {
self.saved_da_certs.insert(view_number, cert);
}
pub fn visit_leaf_ancestors<F>(
&self,
start_from: TYPES::Time,
terminator: Terminator<TYPES::Time>,
ok_when_finished: bool,
mut f: F,
) -> Result<(), HotShotError<TYPES>>
where
F: FnMut(
&Leaf<TYPES>,
Arc<<TYPES as NodeType>::ValidatedState>,
Option<Arc<<<TYPES as NodeType>::ValidatedState as ValidatedState<TYPES>>::Delta>>,
) -> bool,
{
let mut next_leaf = if let Some(view) = self.validated_state_map.get(&start_from) {
view.leaf_commitment()
.ok_or_else(|| HotShotError::InvalidState {
context: format!(
"Visited failed view {start_from:?} leaf. Expected successful leaf"
),
})?
} else {
return Err(HotShotError::InvalidState {
context: format!("View {start_from:?} leaf does not exist in state map "),
});
};
while let Some(leaf) = self.saved_leaves.get(&next_leaf) {
let view = leaf.view_number();
if let (Some(state), delta) = self.state_and_delta(view) {
if let Terminator::Exclusive(stop_before) = terminator {
if stop_before == view {
if ok_when_finished {
return Ok(());
}
break;
}
}
next_leaf = leaf.parent_commitment();
if !f(leaf, state, delta) {
return Ok(());
}
if let Terminator::Inclusive(stop_after) = terminator {
if stop_after == view {
if ok_when_finished {
return Ok(());
}
break;
}
}
} else {
return Err(HotShotError::InvalidState {
context: format!("View {view:?} state does not exist in state map "),
});
}
}
Err(HotShotError::LeafNotFound {})
}
pub fn collect_garbage(&mut self, old_anchor_view: TYPES::Time, new_anchor_view: TYPES::Time) {
let anchor_entry = self
.validated_state_map
.iter()
.next()
.expect("INCONSISTENT STATE: anchor leaf not in state map!");
if *anchor_entry.0 != old_anchor_view {
error!(
"Something about GC has failed. Older leaf exists than the previous anchor leaf."
);
}
self.saved_da_certs
.retain(|view_number, _| *view_number >= old_anchor_view);
self.validated_state_map
.range(old_anchor_view..new_anchor_view)
.filter_map(|(_view_number, view)| view.leaf_commitment())
.for_each(|leaf| {
self.saved_leaves.remove(&leaf);
});
self.validated_state_map = self.validated_state_map.split_off(&new_anchor_view);
self.saved_payloads = self.saved_payloads.split_off(&new_anchor_view);
self.vid_shares = self.vid_shares.split_off(&new_anchor_view);
self.last_proposals = self.last_proposals.split_off(&new_anchor_view);
}
#[must_use]
pub fn decided_leaf(&self) -> Leaf<TYPES> {
let decided_view_num = self.last_decided_view;
let view = self.validated_state_map.get(&decided_view_num).unwrap();
let leaf = view
.leaf_commitment()
.expect("Decided leaf not found! Consensus internally inconsistent");
self.saved_leaves.get(&leaf).unwrap().clone()
}
#[must_use]
pub fn state(&self, view_number: TYPES::Time) -> Option<&Arc<TYPES::ValidatedState>> {
match self.validated_state_map.get(&view_number) {
Some(view) => view.state(),
None => None,
}
}
#[must_use]
pub fn state_and_delta(&self, view_number: TYPES::Time) -> StateAndDelta<TYPES> {
match self.validated_state_map.get(&view_number) {
Some(view) => view.state_and_delta(),
None => (None, None),
}
}
#[must_use]
pub fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
let decided_view_num = self.last_decided_view;
self.state_and_delta(decided_view_num)
.0
.expect("Decided state not found! Consensus internally inconsistent")
}
#[instrument(skip_all, target = "Consensus", fields(view = *view))]
pub async fn calculate_and_update_vid(
consensus: OuterConsensus<TYPES>,
view: <TYPES as NodeType>::Time,
membership: Arc<TYPES::Membership>,
private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
) -> Option<()> {
let consensus = consensus.upgradable_read().await;
let txns = consensus.saved_payloads().get(&view)?;
let vid =
VidDisperse::calculate_vid_disperse(Arc::clone(txns), &membership, view, None).await;
let shares = VidDisperseShare::from_vid_disperse(vid);
let mut consensus = ConsensusUpgradableReadLockGuard::upgrade(consensus).await;
for share in shares {
if let Some(prop) = share.to_proposal(private_key) {
consensus.update_vid_shares(view, prop);
}
}
Some(())
}
}
#[derive(Eq, Hash, PartialEq, Debug, Clone)]
pub struct CommitmentAndMetadata<TYPES: NodeType> {
pub commitment: VidCommitment,
pub builder_commitment: BuilderCommitment,
pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
pub fees: Vec1<BuilderFee<TYPES>>,
pub block_view: TYPES::Time,
pub auction_result: Option<TYPES::AuctionResult>,
}