use std::{
collections::{btree_map::Entry, BTreeMap, HashMap},
fmt::Debug,
marker::PhantomData,
sync::Arc,
};
use async_broadcast::Sender;
use async_trait::async_trait;
use either::Either::{self, Left, Right};
use hotshot_types::{
message::UpgradeLock,
simple_certificate::{
DaCertificate, QuorumCertificate, TimeoutCertificate, UpgradeCertificate,
ViewSyncCommitCertificate2, ViewSyncFinalizeCertificate2, ViewSyncPreCommitCertificate2,
},
simple_vote::{
DaVote, QuorumVote, TimeoutVote, UpgradeVote, ViewSyncCommitVote, ViewSyncFinalizeVote,
ViewSyncPreCommitVote,
},
traits::{
election::Membership,
node_implementation::{NodeType, Versions},
},
vote::{Certificate, HasViewNumber, Vote, VoteAccumulator},
};
use tracing::{debug, error};
use crate::{
events::{HotShotEvent, HotShotTaskCompleted},
helpers::broadcast_event,
};
pub type VoteCollectorsMap<TYPES, VOTE, CERT, V> =
BTreeMap<<TYPES as NodeType>::Time, VoteCollectionTaskState<TYPES, VOTE, CERT, V>>;
pub struct VoteCollectionTaskState<
TYPES: NodeType,
VOTE: Vote<TYPES>,
CERT: Certificate<TYPES, Voteable = VOTE::Commitment> + Debug,
V: Versions,
> {
pub public_key: TYPES::SignatureKey,
pub membership: Arc<TYPES::Membership>,
pub accumulator: Option<VoteAccumulator<TYPES, VOTE, CERT, V>>,
pub view: TYPES::Time,
pub id: u64,
}
pub trait AggregatableVote<
TYPES: NodeType,
VOTE: Vote<TYPES>,
CERT: Certificate<TYPES, Voteable = VOTE::Commitment>,
>
{
fn leader(&self, membership: &TYPES::Membership) -> TYPES::SignatureKey;
fn make_cert_event(certificate: CERT, key: &TYPES::SignatureKey) -> HotShotEvent<TYPES>;
}
impl<
TYPES: NodeType,
VOTE: Vote<TYPES> + AggregatableVote<TYPES, VOTE, CERT>,
CERT: Certificate<TYPES, Voteable = VOTE::Commitment> + Debug,
V: Versions,
> VoteCollectionTaskState<TYPES, VOTE, CERT, V>
{
#[allow(clippy::question_mark)]
pub async fn accumulate_vote(
&mut self,
vote: &VOTE,
event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
) -> Option<HotShotTaskCompleted> {
if vote.leader(&self.membership) != self.public_key {
error!("Received vote for a view in which we were not the leader.");
return None;
}
if vote.view_number() != self.view {
error!(
"Vote view does not match! vote view is {} current view is {}",
*vote.view_number(),
*self.view
);
return None;
}
let accumulator = self.accumulator.as_mut()?;
match accumulator.accumulate(vote, &self.membership).await {
Either::Left(()) => None,
Either::Right(cert) => {
debug!("Certificate Formed! {:?}", cert);
broadcast_event(
Arc::new(VOTE::make_cert_event(cert, &self.public_key)),
event_stream,
)
.await;
self.accumulator = None;
Some(HotShotTaskCompleted)
}
}
}
}
#[async_trait]
pub trait HandleVoteEvent<TYPES, VOTE, CERT>
where
TYPES: NodeType,
VOTE: Vote<TYPES> + AggregatableVote<TYPES, VOTE, CERT>,
CERT: Certificate<TYPES, Voteable = VOTE::Commitment> + Debug,
{
async fn handle_vote_event(
&mut self,
event: Arc<HotShotEvent<TYPES>>,
sender: &Sender<Arc<HotShotEvent<TYPES>>>,
) -> Option<HotShotTaskCompleted>;
fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool;
}
pub struct AccumulatorInfo<TYPES: NodeType> {
pub public_key: TYPES::SignatureKey,
pub membership: Arc<TYPES::Membership>,
pub view: TYPES::Time,
pub id: u64,
}
pub async fn create_vote_accumulator<TYPES, VOTE, CERT, V>(
info: &AccumulatorInfo<TYPES>,
event: Arc<HotShotEvent<TYPES>>,
sender: &Sender<Arc<HotShotEvent<TYPES>>>,
upgrade_lock: UpgradeLock<TYPES, V>,
) -> Option<VoteCollectionTaskState<TYPES, VOTE, CERT, V>>
where
TYPES: NodeType,
VOTE: Vote<TYPES>
+ AggregatableVote<TYPES, VOTE, CERT>
+ std::marker::Send
+ std::marker::Sync
+ 'static,
CERT: Certificate<TYPES, Voteable = VOTE::Commitment>
+ Debug
+ std::marker::Send
+ std::marker::Sync
+ 'static,
V: Versions,
VoteCollectionTaskState<TYPES, VOTE, CERT, V>: HandleVoteEvent<TYPES, VOTE, CERT>,
{
let new_accumulator = VoteAccumulator {
vote_outcomes: HashMap::new(),
signers: HashMap::new(),
phantom: PhantomData,
upgrade_lock,
};
let mut state = VoteCollectionTaskState::<TYPES, VOTE, CERT, V> {
membership: Arc::clone(&info.membership),
public_key: info.public_key.clone(),
accumulator: Some(new_accumulator),
view: info.view,
id: info.id,
};
let result = state.handle_vote_event(Arc::clone(&event), sender).await;
if result == Some(HotShotTaskCompleted) {
return None;
}
Some(state)
}
#[allow(clippy::too_many_arguments)]
pub async fn handle_vote<
TYPES: NodeType,
VOTE: Vote<TYPES> + AggregatableVote<TYPES, VOTE, CERT> + Send + Sync + 'static,
CERT: Certificate<TYPES, Voteable = VOTE::Commitment> + Debug + Send + Sync + 'static,
V: Versions,
>(
collectors: &mut VoteCollectorsMap<TYPES, VOTE, CERT, V>,
vote: &VOTE,
public_key: TYPES::SignatureKey,
membership: &Arc<TYPES::Membership>,
id: u64,
event: &Arc<HotShotEvent<TYPES>>,
event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
upgrade_lock: &UpgradeLock<TYPES, V>,
) where
VoteCollectionTaskState<TYPES, VOTE, CERT, V>: HandleVoteEvent<TYPES, VOTE, CERT>,
{
match collectors.entry(vote.view_number()) {
Entry::Vacant(entry) => {
debug!("Starting vote handle for view {:?}", vote.view_number());
let info = AccumulatorInfo {
public_key,
membership: Arc::clone(membership),
view: vote.view_number(),
id,
};
if let Some(collector) = create_vote_accumulator(
&info,
Arc::clone(event),
event_stream,
upgrade_lock.clone(),
)
.await
{
entry.insert(collector);
};
}
Entry::Occupied(mut entry) => {
let result = entry
.get_mut()
.handle_vote_event(Arc::clone(event), event_stream)
.await;
if result == Some(HotShotTaskCompleted) {
entry.remove();
*collectors = collectors.split_off(&vote.view_number());
}
}
}
}
type QuorumVoteState<TYPES, V> =
VoteCollectionTaskState<TYPES, QuorumVote<TYPES>, QuorumCertificate<TYPES>, V>;
type DaVoteState<TYPES, V> = VoteCollectionTaskState<TYPES, DaVote<TYPES>, DaCertificate<TYPES>, V>;
type TimeoutVoteState<TYPES, V> =
VoteCollectionTaskState<TYPES, TimeoutVote<TYPES>, TimeoutCertificate<TYPES>, V>;
type UpgradeVoteState<TYPES, V> =
VoteCollectionTaskState<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>, V>;
type ViewSyncPreCommitState<TYPES, V> = VoteCollectionTaskState<
TYPES,
ViewSyncPreCommitVote<TYPES>,
ViewSyncPreCommitCertificate2<TYPES>,
V,
>;
type ViewSyncCommitVoteState<TYPES, V> =
VoteCollectionTaskState<TYPES, ViewSyncCommitVote<TYPES>, ViewSyncCommitCertificate2<TYPES>, V>;
type ViewSyncFinalizeVoteState<TYPES, V> = VoteCollectionTaskState<
TYPES,
ViewSyncFinalizeVote<TYPES>,
ViewSyncFinalizeCertificate2<TYPES>,
V,
>;
impl<TYPES: NodeType> AggregatableVote<TYPES, QuorumVote<TYPES>, QuorumCertificate<TYPES>>
for QuorumVote<TYPES>
{
fn leader(&self, membership: &TYPES::Membership) -> TYPES::SignatureKey {
membership.leader(self.view_number() + 1)
}
fn make_cert_event(
certificate: QuorumCertificate<TYPES>,
_key: &TYPES::SignatureKey,
) -> HotShotEvent<TYPES> {
HotShotEvent::QcFormed(Left(certificate))
}
}
impl<TYPES: NodeType> AggregatableVote<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>
for UpgradeVote<TYPES>
{
fn leader(&self, membership: &TYPES::Membership) -> TYPES::SignatureKey {
membership.leader(self.view_number())
}
fn make_cert_event(
certificate: UpgradeCertificate<TYPES>,
_key: &TYPES::SignatureKey,
) -> HotShotEvent<TYPES> {
HotShotEvent::UpgradeCertificateFormed(certificate)
}
}
impl<TYPES: NodeType> AggregatableVote<TYPES, DaVote<TYPES>, DaCertificate<TYPES>>
for DaVote<TYPES>
{
fn leader(&self, membership: &TYPES::Membership) -> TYPES::SignatureKey {
membership.leader(self.view_number())
}
fn make_cert_event(
certificate: DaCertificate<TYPES>,
key: &TYPES::SignatureKey,
) -> HotShotEvent<TYPES> {
HotShotEvent::DacSend(certificate, key.clone())
}
}
impl<TYPES: NodeType> AggregatableVote<TYPES, TimeoutVote<TYPES>, TimeoutCertificate<TYPES>>
for TimeoutVote<TYPES>
{
fn leader(&self, membership: &TYPES::Membership) -> TYPES::SignatureKey {
membership.leader(self.view_number() + 1)
}
fn make_cert_event(
certificate: TimeoutCertificate<TYPES>,
_key: &TYPES::SignatureKey,
) -> HotShotEvent<TYPES> {
HotShotEvent::QcFormed(Right(certificate))
}
}
impl<TYPES: NodeType>
AggregatableVote<TYPES, ViewSyncCommitVote<TYPES>, ViewSyncCommitCertificate2<TYPES>>
for ViewSyncCommitVote<TYPES>
{
fn leader(&self, membership: &TYPES::Membership) -> TYPES::SignatureKey {
membership.leader(self.date().round + self.date().relay)
}
fn make_cert_event(
certificate: ViewSyncCommitCertificate2<TYPES>,
key: &TYPES::SignatureKey,
) -> HotShotEvent<TYPES> {
HotShotEvent::ViewSyncCommitCertificate2Send(certificate, key.clone())
}
}
impl<TYPES: NodeType>
AggregatableVote<TYPES, ViewSyncPreCommitVote<TYPES>, ViewSyncPreCommitCertificate2<TYPES>>
for ViewSyncPreCommitVote<TYPES>
{
fn leader(&self, membership: &TYPES::Membership) -> TYPES::SignatureKey {
membership.leader(self.date().round + self.date().relay)
}
fn make_cert_event(
certificate: ViewSyncPreCommitCertificate2<TYPES>,
key: &TYPES::SignatureKey,
) -> HotShotEvent<TYPES> {
HotShotEvent::ViewSyncPreCommitCertificate2Send(certificate, key.clone())
}
}
impl<TYPES: NodeType>
AggregatableVote<TYPES, ViewSyncFinalizeVote<TYPES>, ViewSyncFinalizeCertificate2<TYPES>>
for ViewSyncFinalizeVote<TYPES>
{
fn leader(&self, membership: &TYPES::Membership) -> TYPES::SignatureKey {
membership.leader(self.date().round + self.date().relay)
}
fn make_cert_event(
certificate: ViewSyncFinalizeCertificate2<TYPES>,
key: &TYPES::SignatureKey,
) -> HotShotEvent<TYPES> {
HotShotEvent::ViewSyncFinalizeCertificate2Send(certificate, key.clone())
}
}
#[async_trait]
impl<TYPES: NodeType, V: Versions>
HandleVoteEvent<TYPES, QuorumVote<TYPES>, QuorumCertificate<TYPES>>
for QuorumVoteState<TYPES, V>
{
async fn handle_vote_event(
&mut self,
event: Arc<HotShotEvent<TYPES>>,
sender: &Sender<Arc<HotShotEvent<TYPES>>>,
) -> Option<HotShotTaskCompleted> {
match event.as_ref() {
HotShotEvent::QuorumVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
_ => None,
}
}
fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
matches!(event.as_ref(), HotShotEvent::QuorumVoteRecv(_))
}
}
#[async_trait]
impl<TYPES: NodeType, V: Versions>
HandleVoteEvent<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>
for UpgradeVoteState<TYPES, V>
{
async fn handle_vote_event(
&mut self,
event: Arc<HotShotEvent<TYPES>>,
sender: &Sender<Arc<HotShotEvent<TYPES>>>,
) -> Option<HotShotTaskCompleted> {
match event.as_ref() {
HotShotEvent::UpgradeVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
_ => None,
}
}
fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
matches!(event.as_ref(), HotShotEvent::UpgradeVoteRecv(_))
}
}
#[async_trait]
impl<TYPES: NodeType, V: Versions> HandleVoteEvent<TYPES, DaVote<TYPES>, DaCertificate<TYPES>>
for DaVoteState<TYPES, V>
{
async fn handle_vote_event(
&mut self,
event: Arc<HotShotEvent<TYPES>>,
sender: &Sender<Arc<HotShotEvent<TYPES>>>,
) -> Option<HotShotTaskCompleted> {
match event.as_ref() {
HotShotEvent::DaVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
_ => None,
}
}
fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
matches!(event.as_ref(), HotShotEvent::DaVoteRecv(_))
}
}
#[async_trait]
impl<TYPES: NodeType, V: Versions>
HandleVoteEvent<TYPES, TimeoutVote<TYPES>, TimeoutCertificate<TYPES>>
for TimeoutVoteState<TYPES, V>
{
async fn handle_vote_event(
&mut self,
event: Arc<HotShotEvent<TYPES>>,
sender: &Sender<Arc<HotShotEvent<TYPES>>>,
) -> Option<HotShotTaskCompleted> {
match event.as_ref() {
HotShotEvent::TimeoutVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
_ => None,
}
}
fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
matches!(event.as_ref(), HotShotEvent::TimeoutVoteRecv(_))
}
}
#[async_trait]
impl<TYPES: NodeType, V: Versions>
HandleVoteEvent<TYPES, ViewSyncPreCommitVote<TYPES>, ViewSyncPreCommitCertificate2<TYPES>>
for ViewSyncPreCommitState<TYPES, V>
{
async fn handle_vote_event(
&mut self,
event: Arc<HotShotEvent<TYPES>>,
sender: &Sender<Arc<HotShotEvent<TYPES>>>,
) -> Option<HotShotTaskCompleted> {
match event.as_ref() {
HotShotEvent::ViewSyncPreCommitVoteRecv(vote) => {
self.accumulate_vote(vote, sender).await
}
_ => None,
}
}
fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
matches!(event.as_ref(), HotShotEvent::ViewSyncPreCommitVoteRecv(_))
}
}
#[async_trait]
impl<TYPES: NodeType, V: Versions>
HandleVoteEvent<TYPES, ViewSyncCommitVote<TYPES>, ViewSyncCommitCertificate2<TYPES>>
for ViewSyncCommitVoteState<TYPES, V>
{
async fn handle_vote_event(
&mut self,
event: Arc<HotShotEvent<TYPES>>,
sender: &Sender<Arc<HotShotEvent<TYPES>>>,
) -> Option<HotShotTaskCompleted> {
match event.as_ref() {
HotShotEvent::ViewSyncCommitVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
_ => None,
}
}
fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
matches!(event.as_ref(), HotShotEvent::ViewSyncCommitVoteRecv(_))
}
}
#[async_trait]
impl<TYPES: NodeType, V: Versions>
HandleVoteEvent<TYPES, ViewSyncFinalizeVote<TYPES>, ViewSyncFinalizeCertificate2<TYPES>>
for ViewSyncFinalizeVoteState<TYPES, V>
{
async fn handle_vote_event(
&mut self,
event: Arc<HotShotEvent<TYPES>>,
sender: &Sender<Arc<HotShotEvent<TYPES>>>,
) -> Option<HotShotTaskCompleted> {
match event.as_ref() {
HotShotEvent::ViewSyncFinalizeVoteRecv(vote) => {
self.accumulate_vote(vote, sender).await
}
_ => None,
}
}
fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
matches!(event.as_ref(), HotShotEvent::ViewSyncFinalizeVoteRecv(_))
}
}