use std::{
fmt::{self, Debug},
marker::PhantomData,
sync::Arc,
};
use anyhow::{bail, ensure, Context, Result};
use async_lock::RwLock;
use cdn_proto::util::mnemonic;
use derivative::Derivative;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use vbs::{
version::{StaticVersionType, Version},
BinarySerializer, Serializer,
};
use crate::{
data::{DaProposal, Leaf, QuorumProposal, UpgradeProposal, VidDisperseShare},
request_response::ProposalRequestPayload,
simple_certificate::{
DaCertificate, UpgradeCertificate, ViewSyncCommitCertificate2,
ViewSyncFinalizeCertificate2, ViewSyncPreCommitCertificate2,
},
simple_vote::{
DaVote, QuorumVote, TimeoutVote, UpgradeVote, ViewSyncCommitVote, ViewSyncFinalizeVote,
ViewSyncPreCommitVote,
},
traits::{
election::Membership,
network::{DataRequest, ResponseMessage, ViewMessage},
node_implementation::{ConsensusTime, NodeType, Versions},
signature_key::SignatureKey,
},
vote::HasViewNumber,
};
/// A single network message: a payload together with the key of the node that sent it.
#[derive(Serialize, Deserialize, Clone, Derivative, PartialEq, Eq, Hash)]
// The empty bound strings replace the trait bounds serde would otherwise infer for `TYPES`.
#[serde(bound(deserialize = "", serialize = ""))]
pub struct Message<TYPES: NodeType> {
/// Signature key of the sender.
pub sender: TYPES::SignatureKey,
/// The payload of the message.
pub kind: MessageKind<TYPES>,
}
impl<TYPES: NodeType> fmt::Debug for Message<TYPES> {
    /// Formats the message as a `Message { sender, kind }` struct.
    ///
    /// The `sender` field shows the output of `mnemonic` applied to the sender key rather
    /// than the key's own `Debug` representation; `kind` uses its regular `Debug` output.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let sender_mnemonic = mnemonic(&self.sender);
        let mut builder = formatter.debug_struct("Message");
        builder.field("sender", &sender_mnemonic);
        builder.field("kind", &self.kind);
        builder.finish()
    }
}
impl<TYPES: NodeType> HasViewNumber<TYPES> for Message<TYPES> {
fn view_number(&self) -> TYPES::View {
self.kind.view_number()
}
}
/// Newtype around a `Vec` of [`Message`]s.
#[derive(Clone, Debug)]
pub struct Messages<TYPES: NodeType>(pub Vec<Message<TYPES>>);
/// Coarse classification of a message, as reported by the `purpose()` methods in this file.
#[derive(PartialEq, Copy, Clone)]
pub enum MessagePurpose {
/// A quorum proposal or a DA proposal.
Proposal,
/// A `ProposalRequested` or `LeaderProposalAvailable` message.
LatestProposal,
/// Not returned by any `purpose()` implementation visible in this file.
LatestViewSyncCertificate,
/// A quorum vote, a timeout vote, or a DA vote.
Vote,
/// A view sync pre-commit, commit, or finalize vote.
ViewSyncVote,
/// A view sync pre-commit, commit, or finalize certificate.
ViewSyncCertificate,
/// A DA certificate.
DaCertificate,
/// Not returned by any `purpose()` implementation visible in this file.
Internal,
/// Any [`MessageKind::Data`] message.
Data,
/// A VID disperse share message.
VidDisperse,
/// An upgrade proposal.
UpgradeProposal,
/// An upgrade vote.
UpgradeVote,
/// Any [`MessageKind::External`] message.
External,
}
/// The payload of a [`Message`].
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Hash, Eq)]
// The empty bound strings replace the trait bounds serde would otherwise infer for `TYPES`.
#[serde(bound(deserialize = "", serialize = ""))]
pub enum MessageKind<TYPES: NodeType> {
/// A consensus message.
Consensus(SequencingMessage<TYPES>),
/// A data message.
Data(DataMessage<TYPES>),
/// Raw bytes; nothing in this file inspects their contents.
External(Vec<u8>),
}
/// The recipients of a message, identified by signature key.
pub enum RecipientList<K: SignatureKey> {
/// No explicit recipient key; presumably every node (TODO confirm against the network layer).
Broadcast,
/// A single recipient.
Direct(K),
/// Several recipients.
Many(Vec<K>),
}
impl<TYPES: NodeType> MessageKind<TYPES> {
pub fn from_consensus_message(m: SequencingMessage<TYPES>) -> Self {
Self::Consensus(m)
}
}
impl<TYPES: NodeType> From<DataMessage<TYPES>> for MessageKind<TYPES> {
fn from(m: DataMessage<TYPES>) -> Self {
Self::Data(m)
}
}
impl<TYPES: NodeType> ViewMessage<TYPES> for MessageKind<TYPES> {
    /// Returns the view number associated with this payload.
    ///
    /// Consensus messages and found data responses report the view of the wrapped message;
    /// transaction submissions and data requests report the view they carry. Payloads with
    /// no view of their own (not-found or denied responses, and external messages) report
    /// `TYPES::View::new(1)`.
    fn view_number(&self) -> TYPES::View {
        match self {
            Self::Consensus(consensus) => consensus.view_number(),
            Self::Data(data) => match data {
                DataMessage::SubmitTransaction(_, view) => *view,
                DataMessage::RequestData(request) => request.view,
                DataMessage::DataResponse(ResponseMessage::Found(found)) => found.view_number(),
                DataMessage::DataResponse(
                    ResponseMessage::NotFound | ResponseMessage::Denied,
                ) => TYPES::View::new(1),
            },
            Self::External(_) => TYPES::View::new(1),
        }
    }

    /// Returns the coarse category of this payload.
    ///
    /// Consensus messages defer to the wrapped message; data and external payloads map to
    /// `MessagePurpose::Data` and `MessagePurpose::External` respectively.
    fn purpose(&self) -> MessagePurpose {
        match self {
            Self::Consensus(consensus) => consensus.purpose(),
            Self::Data(_) => MessagePurpose::Data,
            Self::External(_) => MessagePurpose::External,
        }
    }
}
/// Consensus messages carried by [`SequencingMessage::General`].
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
// The empty bound strings replace the trait bounds serde would otherwise infer for `TYPES`.
#[serde(bound(deserialize = "", serialize = ""))]
pub enum GeneralConsensusMessage<TYPES: NodeType> {
/// A signed quorum proposal.
Proposal(Proposal<TYPES, QuorumProposal<TYPES>>),
/// A quorum vote.
Vote(QuorumVote<TYPES>),
/// A view sync pre-commit vote.
ViewSyncPreCommitVote(ViewSyncPreCommitVote<TYPES>),
/// A view sync commit vote.
ViewSyncCommitVote(ViewSyncCommitVote<TYPES>),
/// A view sync finalize vote.
ViewSyncFinalizeVote(ViewSyncFinalizeVote<TYPES>),
/// A view sync pre-commit certificate.
ViewSyncPreCommitCertificate(ViewSyncPreCommitCertificate2<TYPES>),
/// A view sync commit certificate.
ViewSyncCommitCertificate(ViewSyncCommitCertificate2<TYPES>),
/// A view sync finalize certificate.
ViewSyncFinalizeCertificate(ViewSyncFinalizeCertificate2<TYPES>),
/// A timeout vote.
TimeoutVote(TimeoutVote<TYPES>),
/// A signed upgrade proposal.
UpgradeProposal(Proposal<TYPES, UpgradeProposal<TYPES>>),
/// An upgrade vote.
UpgradeVote(UpgradeVote<TYPES>),
/// A proposal request payload together with a signature value
/// (presumably the requester's signature over the payload; TODO confirm).
ProposalRequested(
ProposalRequestPayload<TYPES>,
<TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
),
/// A signed quorum proposal; classified with `ProposalRequested` by `purpose()`
/// (presumably the answer to such a request; TODO confirm).
LeaderProposalAvailable(Proposal<TYPES, QuorumProposal<TYPES>>),
}
/// Consensus messages carried by [`SequencingMessage::Da`].
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Hash, Eq)]
// The empty bound strings replace the trait bounds serde would otherwise infer for `TYPES`.
#[serde(bound(deserialize = "", serialize = ""))]
pub enum DaConsensusMessage<TYPES: NodeType> {
/// A signed DA proposal.
DaProposal(Proposal<TYPES, DaProposal<TYPES>>),
/// A DA vote.
DaVote(DaVote<TYPES>),
/// A DA certificate.
DaCertificate(DaCertificate<TYPES>),
/// A signed VID disperse share.
VidDisperseMsg(Proposal<TYPES, VidDisperseShare<TYPES>>),
}
/// A consensus message, split into the general and the DA message families.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash)]
// The empty bound strings replace the trait bounds serde would otherwise infer for `TYPES`.
#[serde(bound(deserialize = "", serialize = ""))]
pub enum SequencingMessage<TYPES: NodeType> {
/// A message from the general consensus family.
General(GeneralConsensusMessage<TYPES>),
/// A message from the DA family.
Da(DaConsensusMessage<TYPES>),
}
impl<TYPES: NodeType> SequencingMessage<TYPES> {
    /// Returns the view number this message belongs to.
    ///
    /// Messages wrapping a [`Proposal`] report the view of the proposal's `data`; votes and
    /// view sync certificates report their own view; proposal requests and DA certificates
    /// expose the view as a plain field.
    fn view_number(&self) -> TYPES::View {
        match self {
            Self::General(general) => match general {
                // Both variants wrap the same signed quorum proposal type.
                GeneralConsensusMessage::Proposal(proposal)
                | GeneralConsensusMessage::LeaderProposalAvailable(proposal) => {
                    proposal.data.view_number()
                }
                GeneralConsensusMessage::ProposalRequested(request, _) => request.view_number,
                GeneralConsensusMessage::Vote(vote) => vote.view_number(),
                GeneralConsensusMessage::TimeoutVote(vote) => vote.view_number(),
                GeneralConsensusMessage::ViewSyncPreCommitVote(vote) => vote.view_number(),
                GeneralConsensusMessage::ViewSyncCommitVote(vote) => vote.view_number(),
                GeneralConsensusMessage::ViewSyncFinalizeVote(vote) => vote.view_number(),
                GeneralConsensusMessage::ViewSyncPreCommitCertificate(certificate) => {
                    certificate.view_number()
                }
                GeneralConsensusMessage::ViewSyncCommitCertificate(certificate) => {
                    certificate.view_number()
                }
                GeneralConsensusMessage::ViewSyncFinalizeCertificate(certificate) => {
                    certificate.view_number()
                }
                GeneralConsensusMessage::UpgradeProposal(proposal) => proposal.data.view_number(),
                GeneralConsensusMessage::UpgradeVote(vote) => vote.view_number(),
            },
            Self::Da(da) => match da {
                DaConsensusMessage::DaProposal(proposal) => proposal.data.view_number(),
                DaConsensusMessage::DaVote(vote) => vote.view_number(),
                DaConsensusMessage::DaCertificate(certificate) => certificate.view_number,
                DaConsensusMessage::VidDisperseMsg(share) => share.data.view_number(),
            },
        }
    }

    /// Returns the coarse category of this message.
    ///
    /// Both matches are exhaustive, so adding a variant to either message enum forces this
    /// mapping to be revisited. The function cannot panic, which is why it carries no
    /// `clippy::panic` suppression.
    fn purpose(&self) -> MessagePurpose {
        match self {
            Self::General(general) => match general {
                GeneralConsensusMessage::Proposal(_) => MessagePurpose::Proposal,
                GeneralConsensusMessage::ProposalRequested(_, _)
                | GeneralConsensusMessage::LeaderProposalAvailable(_) => {
                    MessagePurpose::LatestProposal
                }
                GeneralConsensusMessage::Vote(_) | GeneralConsensusMessage::TimeoutVote(_) => {
                    MessagePurpose::Vote
                }
                GeneralConsensusMessage::ViewSyncPreCommitVote(_)
                | GeneralConsensusMessage::ViewSyncCommitVote(_)
                | GeneralConsensusMessage::ViewSyncFinalizeVote(_) => MessagePurpose::ViewSyncVote,
                GeneralConsensusMessage::ViewSyncPreCommitCertificate(_)
                | GeneralConsensusMessage::ViewSyncCommitCertificate(_)
                | GeneralConsensusMessage::ViewSyncFinalizeCertificate(_) => {
                    MessagePurpose::ViewSyncCertificate
                }
                GeneralConsensusMessage::UpgradeProposal(_) => MessagePurpose::UpgradeProposal,
                GeneralConsensusMessage::UpgradeVote(_) => MessagePurpose::UpgradeVote,
            },
            Self::Da(da) => match da {
                // DA proposals and votes share the generic proposal/vote categories.
                DaConsensusMessage::DaProposal(_) => MessagePurpose::Proposal,
                DaConsensusMessage::DaVote(_) => MessagePurpose::Vote,
                DaConsensusMessage::DaCertificate(_) => MessagePurpose::DaCertificate,
                DaConsensusMessage::VidDisperseMsg(_) => MessagePurpose::VidDisperse,
            },
        }
    }
}
/// Payload of [`MessageKind::Data`].
#[derive(Serialize, Deserialize, Derivative, Clone, Debug, PartialEq, Eq, Hash)]
#[serde(bound(deserialize = ""))]
#[allow(clippy::large_enum_variant)]
pub enum DataMessage<TYPES: NodeType> {
/// A transaction together with a view number; `MessageKind::view_number` reports that view
/// for this variant.
SubmitTransaction(TYPES::Transaction, TYPES::View),
/// A data request.
RequestData(DataRequest<TYPES>),
/// A response message (found, not found, or denied, per `MessageKind::view_number`).
DataResponse(ResponseMessage<TYPES>),
}
/// A piece of proposal data bundled with a signature.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
#[serde(bound(deserialize = ""))]
pub struct Proposal<TYPES: NodeType, PROPOSAL: HasViewNumber<TYPES> + DeserializeOwned> {
/// The proposed data.
pub data: PROPOSAL,
/// Signature accompanying `data`. For quorum proposals, `validate_signature` checks it
/// against the key of the view's leader.
pub signature: <TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
/// Zero-sized marker for the `TYPES` parameter.
pub _pd: PhantomData<TYPES>,
}
impl<TYPES: NodeType> Proposal<TYPES, QuorumProposal<TYPES>> {
    /// Checks that this proposal was signed by the leader of its view.
    ///
    /// The leader key is looked up in `quorum_membership` for the proposal's view and the
    /// given `epoch`. The signature is validated against the commitment of the leaf built
    /// from the proposal data, computed with `upgrade_lock`.
    ///
    /// # Errors
    ///
    /// Returns an error if the leader's key does not validate the signature.
    pub async fn validate_signature<V: Versions>(
        &self,
        quorum_membership: &TYPES::Membership,
        epoch: TYPES::Epoch,
        upgrade_lock: &UpgradeLock<TYPES, V>,
    ) -> Result<()> {
        let leader_key = quorum_membership.leader(self.data.view_number(), epoch);

        // The signature covers the commitment of the leaf, not the raw proposal.
        let leaf = Leaf::from_quorum_proposal(&self.data);
        let leaf_commitment = leaf.commit(upgrade_lock).await;

        let signature_is_valid = leader_key.validate(&self.signature, leaf_commitment.as_ref());
        ensure!(signature_is_valid, "Proposal signature is invalid.");

        Ok(())
    }
}
/// Handle to the decided upgrade certificate, used in this file to pick the protocol version
/// that applies to a given view.
#[derive(Clone, Debug)]
pub struct UpgradeLock<TYPES: NodeType, V: Versions> {
/// The upgrade certificate, if any, behind an async read-write lock. The `Arc` means clones
/// of this struct share the same certificate slot.
pub decided_upgrade_certificate: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
/// Zero-sized marker for the `V` parameter.
pub _pd: PhantomData<V>,
}
impl<TYPES: NodeType, V: Versions> UpgradeLock<TYPES, V> {
    /// Creates a lock that holds no upgrade certificate.
    // NOTE(review): no `Default` impl for this type is visible here, hence the suppression.
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        Self::from_certificate(&None)
    }

    /// Creates a lock seeded with a clone of `certificate`.
    pub fn from_certificate(certificate: &Option<UpgradeCertificate<TYPES>>) -> Self {
        Self {
            decided_upgrade_certificate: Arc::new(RwLock::new(certificate.clone())),
            _pd: PhantomData,
        }
    }

    /// Returns the protocol version that applies to `view`.
    ///
    /// Without a certificate, or for views before the certificate's
    /// `new_version_first_view`, this is `V::Base::VERSION`. From that view onwards it is
    /// `V::Upgrade::VERSION`.
    ///
    /// # Errors
    ///
    /// Returns an error if the upgrade applies to `view` but the certificate's `new_version`
    /// is not `V::Upgrade::VERSION`.
    pub async fn version(&self, view: TYPES::View) -> Result<Version> {
        let certificate_guard = self.decided_upgrade_certificate.read().await;

        let version = match &*certificate_guard {
            Some(cert) if view >= cert.data.new_version_first_view => {
                ensure!(
                    cert.data.new_version == V::Upgrade::VERSION,
                    "The network has upgraded to a new version that we do not support!"
                );
                V::Upgrade::VERSION
            }
            Some(_) | None => V::Base::VERSION,
        };

        Ok(version)
    }

    /// Returns the protocol version that applies to `view`, without the support check done
    /// by [`Self::version`].
    ///
    /// With a certificate, the answer is read from the certificate itself: `new_version` from
    /// `new_version_first_view` onwards, `old_version` before it. Neither value is compared
    /// with the versions in `V`. Without a certificate the answer is `V::Base::VERSION`.
    pub async fn version_infallible(&self, view: TYPES::View) -> Version {
        let certificate_guard = self.decided_upgrade_certificate.read().await;

        match &*certificate_guard {
            Some(cert) if view >= cert.data.new_version_first_view => cert.data.new_version,
            Some(cert) => cert.data.old_version,
            None => V::Base::VERSION,
        }
    }

    /// Serializes `message` with the serializer for the version that applies to its view.
    ///
    /// # Errors
    ///
    /// Returns an error if [`Self::version`] fails, if the resulting version is neither the
    /// base nor the upgrade version, or if the serializer itself fails.
    pub async fn serialize<M: HasViewNumber<TYPES> + Serialize>(
        &self,
        message: &M,
    ) -> Result<Vec<u8>> {
        let version = self.version(message.view_number()).await?;

        // Associated constants cannot appear in patterns, so the version is compared by value.
        let serialized_message = if version == V::Base::VERSION {
            Serializer::<V::Base>::serialize(&message)
        } else if version == V::Upgrade::VERSION {
            Serializer::<V::Upgrade>::serialize(&message)
        } else {
            bail!("Attempted to serialize with version {}, which is incompatible. This should be impossible.", version);
        };

        serialized_message.context("Failed to serialize message!")
    }

    /// Deserializes `message` and checks that its version is the one expected for its view.
    ///
    /// The version stated in the bytes selects the serializer. The expected version can only
    /// be computed after decoding, because the view number comes from the decoded message.
    ///
    /// # Errors
    ///
    /// Returns an error if the version cannot be read, if it is neither the base nor the
    /// upgrade version, if decoding fails, if [`Self::version`] fails, or if the stated
    /// version differs from the version expected for the message's view.
    pub async fn deserialize<M: HasViewNumber<TYPES> + for<'a> Deserialize<'a>>(
        &self,
        message: &[u8],
    ) -> Result<M> {
        let actual_version = Version::deserialize(message)
            .context("Failed to read message version!")?
            .0;

        // Associated constants cannot appear in patterns, so the version is compared by value.
        let decoded = if actual_version == V::Base::VERSION {
            Serializer::<V::Base>::deserialize(message)
        } else if actual_version == V::Upgrade::VERSION {
            Serializer::<V::Upgrade>::deserialize(message)
        } else {
            bail!("Cannot deserialize message with stated version {}", actual_version);
        };
        let deserialized_message: M = decoded.context("Failed to deserialize message!")?;

        let view = deserialized_message.view_number();
        let expected_version = self.version(view).await?;

        ensure!(
            actual_version == expected_version,
            "Message has invalid version number for its view. Expected: {expected_version}, Actual: {actual_version}, View: {view:?}"
        );

        Ok(deserialized_message)
    }
}