use std::{
collections::{BTreeMap, HashMap},
mem::ManuallyDrop,
ops::{Deref, DerefMut},
sync::Arc,
};
use async_lock::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
use committable::{Commitment, Committable};
use tracing::instrument;
use utils::anytrace::*;
use vec1::Vec1;
pub use crate::utils::{View, ViewInner};
use crate::{
data::{Leaf2, QuorumProposalWrapper, VidDisperse, VidDisperseShare2},
drb::DrbSeedsAndResults,
error::HotShotError,
event::{HotShotAction, LeafInfo},
message::Proposal,
simple_certificate::{DaCertificate2, NextEpochQuorumCertificate2, QuorumCertificate2},
traits::{
block_contents::BuilderFee,
metrics::{Counter, Gauge, Histogram, Metrics, NoMetrics},
node_implementation::{ConsensusTime, NodeType},
signature_key::SignatureKey,
BlockPayload, ValidatedState,
},
utils::{
epoch_from_block_number, is_last_block_in_epoch, option_epoch_from_block_number,
BuilderCommitment, LeafCommitment, StateAndDelta, Terminator,
},
vid::VidCommitment,
vote::{Certificate, HasViewNumber},
};
pub type CommitmentMap<T> = HashMap<Commitment<T>, T>;
pub type VidShares<TYPES> = BTreeMap<
<TYPES as NodeType>::View,
HashMap<<TYPES as NodeType>::SignatureKey, Proposal<TYPES, VidDisperseShare2<TYPES>>>,
>;
pub type LockedConsensusState<TYPES> = Arc<RwLock<Consensus<TYPES>>>;
#[derive(Clone, Debug)]
pub struct OuterConsensus<TYPES: NodeType> {
pub inner_consensus: LockedConsensusState<TYPES>,
}
impl<TYPES: NodeType> OuterConsensus<TYPES> {
pub fn new(consensus: LockedConsensusState<TYPES>) -> Self {
Self {
inner_consensus: consensus,
}
}
#[instrument(skip_all, target = "OuterConsensus")]
pub async fn read(&self) -> ConsensusReadLockGuard<'_, TYPES> {
tracing::trace!("Trying to acquire read lock on consensus");
let ret = self.inner_consensus.read().await;
tracing::trace!("Acquired read lock on consensus");
ConsensusReadLockGuard::new(ret)
}
#[instrument(skip_all, target = "OuterConsensus")]
pub async fn write(&self) -> ConsensusWriteLockGuard<'_, TYPES> {
tracing::trace!("Trying to acquire write lock on consensus");
let ret = self.inner_consensus.write().await;
tracing::trace!("Acquired write lock on consensus");
ConsensusWriteLockGuard::new(ret)
}
#[instrument(skip_all, target = "OuterConsensus")]
pub fn try_write(&self) -> Option<ConsensusWriteLockGuard<'_, TYPES>> {
tracing::trace!("Trying to acquire write lock on consensus");
let ret = self.inner_consensus.try_write();
if let Some(guard) = ret {
tracing::trace!("Acquired write lock on consensus");
Some(ConsensusWriteLockGuard::new(guard))
} else {
tracing::trace!("Failed to acquire write lock");
None
}
}
#[instrument(skip_all, target = "OuterConsensus")]
pub async fn upgradable_read(&self) -> ConsensusUpgradableReadLockGuard<'_, TYPES> {
tracing::trace!("Trying to acquire upgradable read lock on consensus");
let ret = self.inner_consensus.upgradable_read().await;
tracing::trace!("Acquired upgradable read lock on consensus");
ConsensusUpgradableReadLockGuard::new(ret)
}
#[instrument(skip_all, target = "OuterConsensus")]
pub fn try_read(&self) -> Option<ConsensusReadLockGuard<'_, TYPES>> {
tracing::trace!("Trying to acquire read lock on consensus");
let ret = self.inner_consensus.try_read();
if let Some(guard) = ret {
tracing::trace!("Acquired read lock on consensus");
Some(ConsensusReadLockGuard::new(guard))
} else {
tracing::trace!("Failed to acquire read lock");
None
}
}
}
pub struct ConsensusReadLockGuard<'a, TYPES: NodeType> {
lock_guard: RwLockReadGuard<'a, Consensus<TYPES>>,
}
impl<'a, TYPES: NodeType> ConsensusReadLockGuard<'a, TYPES> {
#[must_use]
pub fn new(lock_guard: RwLockReadGuard<'a, Consensus<TYPES>>) -> Self {
Self { lock_guard }
}
}
impl<'a, TYPES: NodeType> Deref for ConsensusReadLockGuard<'a, TYPES> {
type Target = Consensus<TYPES>;
fn deref(&self) -> &Self::Target {
&self.lock_guard
}
}
impl<'a, TYPES: NodeType> Drop for ConsensusReadLockGuard<'a, TYPES> {
#[instrument(skip_all, target = "ConsensusReadLockGuard")]
fn drop(&mut self) {
tracing::trace!("Read lock on consensus dropped");
}
}
pub struct ConsensusWriteLockGuard<'a, TYPES: NodeType> {
lock_guard: RwLockWriteGuard<'a, Consensus<TYPES>>,
}
impl<'a, TYPES: NodeType> ConsensusWriteLockGuard<'a, TYPES> {
#[must_use]
pub fn new(lock_guard: RwLockWriteGuard<'a, Consensus<TYPES>>) -> Self {
Self { lock_guard }
}
}
impl<'a, TYPES: NodeType> Deref for ConsensusWriteLockGuard<'a, TYPES> {
type Target = Consensus<TYPES>;
fn deref(&self) -> &Self::Target {
&self.lock_guard
}
}
impl<'a, TYPES: NodeType> DerefMut for ConsensusWriteLockGuard<'a, TYPES> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.lock_guard
}
}
impl<'a, TYPES: NodeType> Drop for ConsensusWriteLockGuard<'a, TYPES> {
#[instrument(skip_all, target = "ConsensusWriteLockGuard")]
fn drop(&mut self) {
tracing::debug!("Write lock on consensus dropped");
}
}
pub struct ConsensusUpgradableReadLockGuard<'a, TYPES: NodeType> {
lock_guard: ManuallyDrop<RwLockUpgradableReadGuard<'a, Consensus<TYPES>>>,
taken: bool,
}
impl<'a, TYPES: NodeType> ConsensusUpgradableReadLockGuard<'a, TYPES> {
#[must_use]
pub fn new(lock_guard: RwLockUpgradableReadGuard<'a, Consensus<TYPES>>) -> Self {
Self {
lock_guard: ManuallyDrop::new(lock_guard),
taken: false,
}
}
#[instrument(skip_all, target = "ConsensusUpgradableReadLockGuard")]
pub async fn upgrade(mut guard: Self) -> ConsensusWriteLockGuard<'a, TYPES> {
let inner_guard = unsafe { ManuallyDrop::take(&mut guard.lock_guard) };
guard.taken = true;
tracing::debug!("Trying to upgrade upgradable read lock on consensus");
let ret = RwLockUpgradableReadGuard::upgrade(inner_guard).await;
tracing::debug!("Upgraded upgradable read lock on consensus");
ConsensusWriteLockGuard::new(ret)
}
}
impl<'a, TYPES: NodeType> Deref for ConsensusUpgradableReadLockGuard<'a, TYPES> {
type Target = Consensus<TYPES>;
fn deref(&self) -> &Self::Target {
&self.lock_guard
}
}
impl<'a, TYPES: NodeType> Drop for ConsensusUpgradableReadLockGuard<'a, TYPES> {
#[instrument(skip_all, target = "ConsensusUpgradableReadLockGuard")]
fn drop(&mut self) {
if !self.taken {
unsafe { ManuallyDrop::drop(&mut self.lock_guard) }
tracing::debug!("Upgradable read lock on consensus dropped");
}
}
}
#[derive(Debug, Clone, Copy)]
struct HotShotActionViews<T: ConsensusTime> {
proposed: T,
voted: T,
da_proposed: T,
da_vote: T,
}
impl<T: ConsensusTime> Default for HotShotActionViews<T> {
fn default() -> Self {
let genesis = T::genesis();
Self {
proposed: genesis,
voted: genesis,
da_proposed: genesis,
da_vote: genesis,
}
}
}
impl<T: ConsensusTime> HotShotActionViews<T> {
fn from_view(view: T) -> Self {
Self {
proposed: view,
voted: view,
da_proposed: view,
da_vote: view,
}
}
}
#[derive(derive_more::Debug, Clone)]
pub struct Consensus<TYPES: NodeType> {
validated_state_map: BTreeMap<TYPES::View, View<TYPES>>,
vid_shares: VidShares<TYPES>,
saved_da_certs: HashMap<TYPES::View, DaCertificate2<TYPES>>,
cur_view: TYPES::View,
cur_epoch: Option<TYPES::Epoch>,
last_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
last_decided_view: TYPES::View,
locked_view: TYPES::View,
saved_leaves: CommitmentMap<Leaf2<TYPES>>,
last_actions: HotShotActionViews<TYPES::View>,
saved_payloads: BTreeMap<TYPES::View, Arc<TYPES::BlockPayload>>,
high_qc: QuorumCertificate2<TYPES>,
next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
pub metrics: Arc<ConsensusMetricsValue>,
pub epoch_height: u64,
pub drb_seeds_and_results: DrbSeedsAndResults<TYPES>,
}
#[derive(Clone, Debug)]
pub struct ConsensusMetricsValue {
pub last_synced_block_height: Box<dyn Gauge>,
pub last_decided_view: Box<dyn Gauge>,
pub last_voted_view: Box<dyn Gauge>,
pub last_decided_time: Box<dyn Gauge>,
pub current_view: Box<dyn Gauge>,
pub number_of_views_since_last_decide: Box<dyn Gauge>,
pub number_of_views_per_decide_event: Box<dyn Histogram>,
pub view_duration_as_leader: Box<dyn Histogram>,
pub invalid_qc: Box<dyn Gauge>,
pub outstanding_transactions: Box<dyn Gauge>,
pub outstanding_transactions_memory_size: Box<dyn Gauge>,
pub number_of_timeouts: Box<dyn Counter>,
pub number_of_timeouts_as_leader: Box<dyn Counter>,
pub number_of_empty_blocks_proposed: Box<dyn Counter>,
pub internal_event_queue_len: Box<dyn Gauge>,
}
impl ConsensusMetricsValue {
#[must_use]
pub fn new(metrics: &dyn Metrics) -> Self {
Self {
last_synced_block_height: metrics
.create_gauge(String::from("last_synced_block_height"), None),
last_decided_view: metrics.create_gauge(String::from("last_decided_view"), None),
last_voted_view: metrics.create_gauge(String::from("last_voted_view"), None),
last_decided_time: metrics.create_gauge(String::from("last_decided_time"), None),
current_view: metrics.create_gauge(String::from("current_view"), None),
number_of_views_since_last_decide: metrics
.create_gauge(String::from("number_of_views_since_last_decide"), None),
number_of_views_per_decide_event: metrics
.create_histogram(String::from("number_of_views_per_decide_event"), None),
view_duration_as_leader: metrics
.create_histogram(String::from("view_duration_as_leader"), None),
invalid_qc: metrics.create_gauge(String::from("invalid_qc"), None),
outstanding_transactions: metrics
.create_gauge(String::from("outstanding_transactions"), None),
outstanding_transactions_memory_size: metrics
.create_gauge(String::from("outstanding_transactions_memory_size"), None),
number_of_timeouts: metrics.create_counter(String::from("number_of_timeouts"), None),
number_of_timeouts_as_leader: metrics
.create_counter(String::from("number_of_timeouts_as_leader"), None),
number_of_empty_blocks_proposed: metrics
.create_counter(String::from("number_of_empty_blocks_proposed"), None),
internal_event_queue_len: metrics
.create_gauge(String::from("internal_event_queue_len"), None),
}
}
}
impl Default for ConsensusMetricsValue {
fn default() -> Self {
Self::new(&*NoMetrics::boxed())
}
}
impl<TYPES: NodeType> Consensus<TYPES> {
#[allow(clippy::too_many_arguments)]
pub fn new(
validated_state_map: BTreeMap<TYPES::View, View<TYPES>>,
vid_shares: Option<VidShares<TYPES>>,
cur_view: TYPES::View,
cur_epoch: Option<TYPES::Epoch>,
locked_view: TYPES::View,
last_decided_view: TYPES::View,
last_actioned_view: TYPES::View,
last_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
saved_leaves: CommitmentMap<Leaf2<TYPES>>,
saved_payloads: BTreeMap<TYPES::View, Arc<TYPES::BlockPayload>>,
high_qc: QuorumCertificate2<TYPES>,
next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
metrics: Arc<ConsensusMetricsValue>,
epoch_height: u64,
) -> Self {
Consensus {
validated_state_map,
vid_shares: vid_shares.unwrap_or_default(),
saved_da_certs: HashMap::new(),
cur_view,
cur_epoch,
last_decided_view,
last_proposals,
last_actions: HotShotActionViews::from_view(last_actioned_view),
locked_view,
saved_leaves,
saved_payloads,
high_qc,
next_epoch_high_qc,
metrics,
epoch_height,
drb_seeds_and_results: DrbSeedsAndResults::new(),
}
}
pub fn cur_view(&self) -> TYPES::View {
self.cur_view
}
pub fn cur_epoch(&self) -> Option<TYPES::Epoch> {
self.cur_epoch
}
pub fn last_decided_view(&self) -> TYPES::View {
self.last_decided_view
}
pub fn locked_view(&self) -> TYPES::View {
self.locked_view
}
pub fn high_qc(&self) -> &QuorumCertificate2<TYPES> {
&self.high_qc
}
pub fn next_epoch_high_qc(&self) -> Option<&NextEpochQuorumCertificate2<TYPES>> {
self.next_epoch_high_qc.as_ref()
}
pub fn validated_state_map(&self) -> &BTreeMap<TYPES::View, View<TYPES>> {
&self.validated_state_map
}
pub fn saved_leaves(&self) -> &CommitmentMap<Leaf2<TYPES>> {
&self.saved_leaves
}
pub fn saved_payloads(&self) -> &BTreeMap<TYPES::View, Arc<TYPES::BlockPayload>> {
&self.saved_payloads
}
pub fn vid_shares(&self) -> &VidShares<TYPES> {
&self.vid_shares
}
pub fn saved_da_certs(&self) -> &HashMap<TYPES::View, DaCertificate2<TYPES>> {
&self.saved_da_certs
}
pub fn last_proposals(
&self,
) -> &BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>> {
&self.last_proposals
}
pub fn update_view(&mut self, view_number: TYPES::View) -> Result<()> {
ensure!(
view_number > self.cur_view,
debug!("New view isn't newer than the current view.")
);
self.cur_view = view_number;
Ok(())
}
pub fn parent_leaf_info(
&self,
leaf: &Leaf2<TYPES>,
public_key: &TYPES::SignatureKey,
) -> Option<LeafInfo<TYPES>> {
let parent_view_number = leaf.justify_qc().view_number();
let parent_leaf = self
.saved_leaves
.get(&leaf.justify_qc().data().leaf_commit)?;
let parent_state_and_delta = self.state_and_delta(parent_view_number);
let (Some(state), delta) = parent_state_and_delta else {
return None;
};
let parent_vid = self
.vid_shares()
.get(&parent_view_number)
.and_then(|inner_map| inner_map.get(public_key).cloned())
.map(|prop| prop.data);
Some(LeafInfo {
leaf: parent_leaf.clone(),
state,
delta,
vid_share: parent_vid,
})
}
pub fn update_epoch(&mut self, epoch_number: TYPES::Epoch) -> Result<()> {
ensure!(
self.cur_epoch.is_none() || Some(epoch_number) > self.cur_epoch,
debug!("New epoch isn't newer than the current epoch.")
);
tracing::trace!(
"Updating epoch from {:?} to {}",
self.cur_epoch,
epoch_number
);
self.cur_epoch = Some(epoch_number);
Ok(())
}
pub fn update_action(&mut self, action: HotShotAction, view: TYPES::View) -> bool {
let old_view = match action {
HotShotAction::Vote => &mut self.last_actions.voted,
HotShotAction::Propose => &mut self.last_actions.proposed,
HotShotAction::DaPropose => &mut self.last_actions.da_proposed,
HotShotAction::DaVote => {
if view > self.last_actions.da_vote {
self.last_actions.da_vote = view;
}
return true;
}
_ => return true,
};
if view > *old_view {
*old_view = view;
return true;
}
false
}
pub fn reset_actions(&mut self) {
self.last_actions = HotShotActionViews::default();
}
pub fn update_proposed_view(
&mut self,
proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
) -> Result<()> {
ensure!(
proposal.data.view_number()
> self
.last_proposals
.last_key_value()
.map_or(TYPES::View::genesis(), |(k, _)| { *k }),
debug!("New view isn't newer than the previously proposed view.")
);
self.last_proposals
.insert(proposal.data.view_number(), proposal);
Ok(())
}
pub fn update_last_decided_view(&mut self, view_number: TYPES::View) -> Result<()> {
ensure!(
view_number > self.last_decided_view,
debug!("New view isn't newer than the previously decided view.")
);
self.last_decided_view = view_number;
Ok(())
}
pub fn update_locked_view(&mut self, view_number: TYPES::View) -> Result<()> {
ensure!(
view_number > self.locked_view,
debug!("New view isn't newer than the previously locked view.")
);
self.locked_view = view_number;
Ok(())
}
pub fn update_da_view(
&mut self,
view_number: TYPES::View,
epoch: Option<TYPES::Epoch>,
payload_commitment: VidCommitment,
) -> Result<()> {
let view = View {
view_inner: ViewInner::Da {
payload_commitment,
epoch,
},
};
self.update_validated_state_map(view_number, view)
}
pub fn update_leaf(
&mut self,
leaf: Leaf2<TYPES>,
state: Arc<TYPES::ValidatedState>,
delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,
) -> Result<()> {
let view_number = leaf.view_number();
let epoch = option_epoch_from_block_number::<TYPES>(
leaf.with_epoch,
leaf.height(),
self.epoch_height,
);
let view = View {
view_inner: ViewInner::Leaf {
leaf: leaf.commit(),
state,
delta,
epoch,
},
};
self.update_validated_state_map(view_number, view)?;
self.update_saved_leaves(leaf);
Ok(())
}
fn update_validated_state_map(
&mut self,
view_number: TYPES::View,
new_view: View<TYPES>,
) -> Result<()> {
if let Some(existing_view) = self.validated_state_map().get(&view_number) {
if let ViewInner::Leaf {
delta: ref existing_delta,
..
} = existing_view.view_inner
{
if let ViewInner::Leaf {
delta: ref new_delta,
..
} = new_view.view_inner
{
ensure!(
new_delta.is_some() || existing_delta.is_none(),
debug!("Skipping the state update to not override a `Leaf` view with `Some` state delta.")
);
} else {
bail!("Skipping the state update to not override a `Leaf` view with a non-`Leaf` view.");
}
}
}
self.validated_state_map.insert(view_number, new_view);
Ok(())
}
fn update_saved_leaves(&mut self, leaf: Leaf2<TYPES>) {
self.saved_leaves.insert(leaf.commit(), leaf);
}
pub fn update_saved_payloads(
&mut self,
view_number: TYPES::View,
payload: Arc<TYPES::BlockPayload>,
) -> Result<()> {
ensure!(
!self.saved_payloads.contains_key(&view_number),
"Payload with the same view already exists."
);
self.saved_payloads.insert(view_number, payload);
Ok(())
}
pub fn update_high_qc(&mut self, high_qc: QuorumCertificate2<TYPES>) -> Result<()> {
ensure!(
high_qc.view_number > self.high_qc.view_number || high_qc == self.high_qc,
debug!("High QC with an equal or higher view exists.")
);
tracing::debug!("Updating high QC");
self.high_qc = high_qc;
Ok(())
}
pub fn update_next_epoch_high_qc(
&mut self,
high_qc: NextEpochQuorumCertificate2<TYPES>,
) -> Result<()> {
if let Some(next_epoch_high_qc) = self.next_epoch_high_qc() {
ensure!(
high_qc.view_number > next_epoch_high_qc.view_number
|| high_qc == *next_epoch_high_qc,
debug!("Next epoch high QC with an equal or higher view exists.")
);
}
tracing::debug!("Updating next epoch high QC");
self.next_epoch_high_qc = Some(high_qc);
Ok(())
}
pub fn update_vid_shares(
&mut self,
view_number: TYPES::View,
disperse: Proposal<TYPES, VidDisperseShare2<TYPES>>,
) {
self.vid_shares
.entry(view_number)
.or_default()
.insert(disperse.data.recipient_key.clone(), disperse);
}
pub fn update_saved_da_certs(&mut self, view_number: TYPES::View, cert: DaCertificate2<TYPES>) {
self.saved_da_certs.insert(view_number, cert);
}
pub fn visit_leaf_ancestors<F>(
&self,
start_from: TYPES::View,
terminator: Terminator<TYPES::View>,
ok_when_finished: bool,
mut f: F,
) -> std::result::Result<(), HotShotError<TYPES>>
where
F: FnMut(
&Leaf2<TYPES>,
Arc<<TYPES as NodeType>::ValidatedState>,
Option<Arc<<<TYPES as NodeType>::ValidatedState as ValidatedState<TYPES>>::Delta>>,
) -> bool,
{
let mut next_leaf = if let Some(view) = self.validated_state_map.get(&start_from) {
view.leaf_commitment().ok_or_else(|| {
HotShotError::InvalidState(format!(
"Visited failed view {start_from:?} leaf. Expected successful leaf"
))
})?
} else {
return Err(HotShotError::InvalidState(format!(
"View {start_from:?} leaf does not exist in state map "
)));
};
while let Some(leaf) = self.saved_leaves.get(&next_leaf) {
let view = leaf.view_number();
if let (Some(state), delta) = self.state_and_delta(view) {
if let Terminator::Exclusive(stop_before) = terminator {
if stop_before == view {
if ok_when_finished {
return Ok(());
}
break;
}
}
next_leaf = leaf.parent_commitment();
if !f(leaf, state, delta) {
return Ok(());
}
if let Terminator::Inclusive(stop_after) = terminator {
if stop_after == view {
if ok_when_finished {
return Ok(());
}
break;
}
}
} else {
return Err(HotShotError::InvalidState(format!(
"View {view:?} state does not exist in state map"
)));
}
}
Err(HotShotError::MissingLeaf(next_leaf))
}
pub fn collect_garbage(&mut self, old_anchor_view: TYPES::View, new_anchor_view: TYPES::View) {
if new_anchor_view <= old_anchor_view {
return;
}
let gc_view = TYPES::View::new(new_anchor_view.saturating_sub(1));
let anchor_entry = self
.validated_state_map
.iter()
.next()
.expect("INCONSISTENT STATE: anchor leaf not in state map!");
if **anchor_entry.0 != old_anchor_view.saturating_sub(1) {
tracing::error!(
"Something about GC has failed. Older leaf exists than the previous anchor leaf."
);
}
self.saved_da_certs
.retain(|view_number, _| *view_number >= old_anchor_view);
self.validated_state_map
.range(old_anchor_view..gc_view)
.filter_map(|(_view_number, view)| view.leaf_commitment())
.for_each(|leaf| {
self.saved_leaves.remove(&leaf);
});
self.validated_state_map = self.validated_state_map.split_off(&gc_view);
self.saved_payloads = self.saved_payloads.split_off(&gc_view);
self.vid_shares = self.vid_shares.split_off(&gc_view);
self.last_proposals = self.last_proposals.split_off(&gc_view);
}
#[must_use]
pub fn decided_leaf(&self) -> Leaf2<TYPES> {
let decided_view_num = self.last_decided_view;
let view = self.validated_state_map.get(&decided_view_num).unwrap();
let leaf = view
.leaf_commitment()
.expect("Decided leaf not found! Consensus internally inconsistent");
self.saved_leaves.get(&leaf).unwrap().clone()
}
#[must_use]
pub fn state(&self, view_number: TYPES::View) -> Option<&Arc<TYPES::ValidatedState>> {
match self.validated_state_map.get(&view_number) {
Some(view) => view.state(),
None => None,
}
}
#[must_use]
pub fn state_and_delta(&self, view_number: TYPES::View) -> StateAndDelta<TYPES> {
match self.validated_state_map.get(&view_number) {
Some(view) => view.state_and_delta(),
None => (None, None),
}
}
#[must_use]
pub fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
let decided_view_num = self.last_decided_view;
self.state_and_delta(decided_view_num)
.0
.expect("Decided state not found! Consensus internally inconsistent")
}
#[instrument(skip_all, target = "Consensus", fields(view = *view))]
pub async fn calculate_and_update_vid(
consensus: OuterConsensus<TYPES>,
view: <TYPES as NodeType>::View,
membership: Arc<RwLock<TYPES::Membership>>,
private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
) -> Option<()> {
let payload = Arc::clone(consensus.read().await.saved_payloads().get(&view)?);
let epoch = consensus
.read()
.await
.validated_state_map()
.get(&view)?
.view_inner
.epoch()?;
let vid =
VidDisperse::calculate_vid_disperse(payload.as_ref(), &membership, view, epoch, epoch)
.await
.ok()?;
let shares = VidDisperseShare2::from_vid_disperse(vid);
let mut consensus_writer = consensus.write().await;
for share in shares {
if let Some(prop) = share.to_proposal(private_key) {
consensus_writer.update_vid_shares(view, prop);
}
}
Some(())
}
pub fn is_qc_forming_eqc(&self, qc: &QuorumCertificate2<TYPES>) -> bool {
let high_qc_leaf_commit = qc.data.leaf_commit;
let is_high_qc_extended = self.is_leaf_extended(high_qc_leaf_commit);
if is_high_qc_extended {
tracing::debug!("We have formed an eQC!");
}
self.is_leaf_for_last_block(high_qc_leaf_commit) && !is_high_qc_extended
}
pub fn is_high_qc_forming_eqc(&self) -> bool {
self.is_qc_forming_eqc(self.high_qc())
}
pub fn is_leaf_forming_eqc(&self, leaf_commit: LeafCommitment<TYPES>) -> bool {
self.is_leaf_for_last_block(leaf_commit) && !self.is_leaf_extended(leaf_commit)
}
pub fn is_leaf_extended(&self, leaf_commit: LeafCommitment<TYPES>) -> bool {
if !self.is_leaf_for_last_block(leaf_commit) {
tracing::trace!("The given leaf is not for the last block in the epoch.");
return false;
}
let Some(leaf) = self.saved_leaves.get(&leaf_commit) else {
tracing::trace!("We don't have a leaf corresponding to the leaf commit");
return false;
};
let leaf_view = leaf.view_number();
let leaf_block_number = leaf.height();
let mut last_visited_view_number = leaf_view;
let mut is_leaf_extended = true;
if let Err(e) = self.visit_leaf_ancestors(
leaf_view,
Terminator::Inclusive(leaf_view - 2),
true,
|leaf, _, _| {
tracing::trace!(
"last_visited_view_number = {}, leaf.view_number = {}",
*last_visited_view_number,
*leaf.view_number()
);
if leaf.view_number() == leaf_view {
return true;
}
if last_visited_view_number - 1 != leaf.view_number() {
tracing::trace!("The chain is broken. Non consecutive views.");
is_leaf_extended = false;
return false;
}
if leaf_block_number != leaf.height() {
tracing::trace!("The chain is broken. Block numbers do not match.");
is_leaf_extended = false;
return false;
}
last_visited_view_number = leaf.view_number();
true
},
) {
is_leaf_extended = false;
tracing::debug!("Leaf ascension failed; error={e}");
}
tracing::trace!("Can the given leaf form an eQC? {}", is_leaf_extended);
is_leaf_extended
}
pub fn is_leaf_for_last_block(&self, leaf_commit: LeafCommitment<TYPES>) -> bool {
let Some(leaf) = self.saved_leaves.get(&leaf_commit) else {
tracing::trace!("We don't have a leaf corresponding to the leaf commit");
return false;
};
let block_height = leaf.height();
is_last_block_in_epoch(block_height, self.epoch_height)
}
pub fn is_high_qc_for_last_block(&self) -> bool {
let Some(leaf) = self.saved_leaves.get(&self.high_qc().data.leaf_commit) else {
tracing::trace!("We don't have a leaf corresponding to the high QC");
return false;
};
let block_height = leaf.height();
is_last_block_in_epoch(block_height, self.epoch_height)
}
pub fn check_eqc(&self, proposed_leaf: &Leaf2<TYPES>, parent_leaf: &Leaf2<TYPES>) -> bool {
if parent_leaf.view_number() == TYPES::View::genesis() {
return true;
}
let new_epoch = epoch_from_block_number(proposed_leaf.height(), self.epoch_height);
let old_epoch = epoch_from_block_number(parent_leaf.height(), self.epoch_height);
new_epoch - 1 == old_epoch && self.is_leaf_extended(parent_leaf.commit())
}
}
#[derive(Eq, Hash, PartialEq, Debug, Clone)]
pub struct CommitmentAndMetadata<TYPES: NodeType> {
pub commitment: VidCommitment,
pub builder_commitment: BuilderCommitment,
pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
pub fees: Vec1<BuilderFee<TYPES>>,
pub block_view: TYPES::View,
pub auction_result: Option<TYPES::AuctionResult>,
}