use std::{
fmt::{self, Debug},
marker::PhantomData,
sync::Arc,
};
use async_lock::RwLock;
use committable::Committable;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use utils::anytrace::*;
use vbs::{
version::{StaticVersionType, Version},
BinarySerializer, Serializer,
};
use crate::{
data::{
DaProposal, DaProposal2, Leaf, Leaf2, QuorumProposal, QuorumProposal2, UpgradeProposal,
VidDisperseShare, VidDisperseShare2,
},
request_response::ProposalRequestPayload,
simple_certificate::{
DaCertificate, DaCertificate2, QuorumCertificate2, UpgradeCertificate,
ViewSyncCommitCertificate, ViewSyncCommitCertificate2, ViewSyncFinalizeCertificate,
ViewSyncFinalizeCertificate2, ViewSyncPreCommitCertificate, ViewSyncPreCommitCertificate2,
},
simple_vote::{
DaVote, DaVote2, QuorumVote, QuorumVote2, TimeoutVote, TimeoutVote2, UpgradeVote,
ViewSyncCommitVote, ViewSyncCommitVote2, ViewSyncFinalizeVote, ViewSyncFinalizeVote2,
ViewSyncPreCommitVote, ViewSyncPreCommitVote2,
},
traits::{
block_contents::BlockHeader,
election::Membership,
network::{DataRequest, ResponseMessage, ViewMessage},
node_implementation::{ConsensusTime, NodeType, Versions},
signature_key::SignatureKey,
},
utils::{epoch_from_block_number, mnemonic},
vote::HasViewNumber,
};
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Hash)]
#[serde(bound(deserialize = "", serialize = ""))]
pub struct Message<TYPES: NodeType> {
pub sender: TYPES::SignatureKey,
pub kind: MessageKind<TYPES>,
}
impl<TYPES: NodeType> fmt::Debug for Message<TYPES> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Message")
.field("sender", &mnemonic(&self.sender))
.field("kind", &self.kind)
.finish()
}
}
impl<TYPES: NodeType> HasViewNumber<TYPES> for Message<TYPES> {
fn view_number(&self) -> TYPES::View {
self.kind.view_number()
}
}
#[derive(Clone, Debug)]
pub struct Messages<TYPES: NodeType>(pub Vec<Message<TYPES>>);
#[derive(PartialEq, Copy, Clone)]
pub enum MessagePurpose {
Proposal,
LatestProposal,
LatestViewSyncCertificate,
Vote,
ViewSyncVote,
ViewSyncCertificate,
DaCertificate,
Internal,
Data,
VidDisperse,
UpgradeProposal,
UpgradeVote,
External,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Hash, Eq)]
#[serde(bound(deserialize = "", serialize = ""))]
pub enum MessageKind<TYPES: NodeType> {
Consensus(SequencingMessage<TYPES>),
Data(DataMessage<TYPES>),
External(Vec<u8>),
}
pub enum RecipientList<K: SignatureKey> {
Broadcast,
Direct(K),
Many(Vec<K>),
}
impl<TYPES: NodeType> MessageKind<TYPES> {
pub fn from_consensus_message(m: SequencingMessage<TYPES>) -> Self {
Self::Consensus(m)
}
}
impl<TYPES: NodeType> From<DataMessage<TYPES>> for MessageKind<TYPES> {
fn from(m: DataMessage<TYPES>) -> Self {
Self::Data(m)
}
}
impl<TYPES: NodeType> ViewMessage<TYPES> for MessageKind<TYPES> {
fn view_number(&self) -> TYPES::View {
match &self {
MessageKind::Consensus(message) => message.view_number(),
MessageKind::Data(DataMessage::SubmitTransaction(_, v)) => *v,
MessageKind::Data(DataMessage::RequestData(msg)) => msg.view,
MessageKind::Data(DataMessage::DataResponse(msg)) => match msg {
ResponseMessage::Found(m) => m.view_number(),
ResponseMessage::NotFound | ResponseMessage::Denied => TYPES::View::new(1),
},
MessageKind::External(_) => TYPES::View::new(1),
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
#[serde(bound(deserialize = "", serialize = ""))]
pub enum GeneralConsensusMessage<TYPES: NodeType> {
Proposal(Proposal<TYPES, QuorumProposal<TYPES>>),
Vote(QuorumVote<TYPES>),
ViewSyncPreCommitVote(ViewSyncPreCommitVote<TYPES>),
ViewSyncCommitVote(ViewSyncCommitVote<TYPES>),
ViewSyncFinalizeVote(ViewSyncFinalizeVote<TYPES>),
ViewSyncPreCommitCertificate(ViewSyncPreCommitCertificate<TYPES>),
ViewSyncCommitCertificate(ViewSyncCommitCertificate<TYPES>),
ViewSyncFinalizeCertificate(ViewSyncFinalizeCertificate<TYPES>),
TimeoutVote(TimeoutVote<TYPES>),
UpgradeProposal(Proposal<TYPES, UpgradeProposal<TYPES>>),
UpgradeVote(UpgradeVote<TYPES>),
Proposal2(Proposal<TYPES, QuorumProposal2<TYPES>>),
Vote2(QuorumVote2<TYPES>),
ProposalRequested(
ProposalRequestPayload<TYPES>,
<TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
),
ProposalResponse(Proposal<TYPES, QuorumProposal<TYPES>>),
ProposalResponse2(Proposal<TYPES, QuorumProposal2<TYPES>>),
HighQc(QuorumCertificate2<TYPES>),
ViewSyncPreCommitVote2(ViewSyncPreCommitVote2<TYPES>),
ViewSyncCommitVote2(ViewSyncCommitVote2<TYPES>),
ViewSyncFinalizeVote2(ViewSyncFinalizeVote2<TYPES>),
ViewSyncPreCommitCertificate2(ViewSyncPreCommitCertificate2<TYPES>),
ViewSyncCommitCertificate2(ViewSyncCommitCertificate2<TYPES>),
ViewSyncFinalizeCertificate2(ViewSyncFinalizeCertificate2<TYPES>),
TimeoutVote2(TimeoutVote2<TYPES>),
}
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Hash, Eq)]
#[serde(bound(deserialize = "", serialize = ""))]
pub enum DaConsensusMessage<TYPES: NodeType> {
DaProposal(Proposal<TYPES, DaProposal<TYPES>>),
DaVote(DaVote<TYPES>),
DaCertificate(DaCertificate<TYPES>),
VidDisperseMsg(Proposal<TYPES, VidDisperseShare<TYPES>>),
DaProposal2(Proposal<TYPES, DaProposal2<TYPES>>),
DaVote2(DaVote2<TYPES>),
DaCertificate2(DaCertificate2<TYPES>),
VidDisperseMsg2(Proposal<TYPES, VidDisperseShare2<TYPES>>),
}
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash)]
#[serde(bound(deserialize = "", serialize = ""))]
pub enum SequencingMessage<TYPES: NodeType> {
General(GeneralConsensusMessage<TYPES>),
Da(DaConsensusMessage<TYPES>),
}
impl<TYPES: NodeType> SequencingMessage<TYPES> {
fn view_number(&self) -> TYPES::View {
match &self {
SequencingMessage::General(general_message) => {
match general_message {
GeneralConsensusMessage::Proposal(p) => {
p.data.view_number()
}
GeneralConsensusMessage::Proposal2(p) => {
p.data.view_number()
}
GeneralConsensusMessage::ProposalRequested(req, _) => req.view_number,
GeneralConsensusMessage::ProposalResponse(proposal) => {
proposal.data.view_number()
}
GeneralConsensusMessage::ProposalResponse2(proposal) => {
proposal.data.view_number()
}
GeneralConsensusMessage::Vote(vote_message) => vote_message.view_number(),
GeneralConsensusMessage::Vote2(vote_message) => vote_message.view_number(),
GeneralConsensusMessage::TimeoutVote(message) => message.view_number(),
GeneralConsensusMessage::ViewSyncPreCommitVote(message) => {
message.view_number()
}
GeneralConsensusMessage::ViewSyncCommitVote(message) => message.view_number(),
GeneralConsensusMessage::ViewSyncFinalizeVote(message) => message.view_number(),
GeneralConsensusMessage::ViewSyncPreCommitCertificate(message) => {
message.view_number()
}
GeneralConsensusMessage::ViewSyncCommitCertificate(message) => {
message.view_number()
}
GeneralConsensusMessage::ViewSyncFinalizeCertificate(message) => {
message.view_number()
}
GeneralConsensusMessage::TimeoutVote2(message) => message.view_number(),
GeneralConsensusMessage::ViewSyncPreCommitVote2(message) => {
message.view_number()
}
GeneralConsensusMessage::ViewSyncCommitVote2(message) => message.view_number(),
GeneralConsensusMessage::ViewSyncFinalizeVote2(message) => {
message.view_number()
}
GeneralConsensusMessage::ViewSyncPreCommitCertificate2(message) => {
message.view_number()
}
GeneralConsensusMessage::ViewSyncCommitCertificate2(message) => {
message.view_number()
}
GeneralConsensusMessage::ViewSyncFinalizeCertificate2(message) => {
message.view_number()
}
GeneralConsensusMessage::UpgradeProposal(message) => message.data.view_number(),
GeneralConsensusMessage::UpgradeVote(message) => message.view_number(),
GeneralConsensusMessage::HighQc(qc) => qc.view_number(),
}
}
SequencingMessage::Da(da_message) => {
match da_message {
DaConsensusMessage::DaProposal(p) => {
p.data.view_number()
}
DaConsensusMessage::DaVote(vote_message) => vote_message.view_number(),
DaConsensusMessage::DaCertificate(cert) => cert.view_number,
DaConsensusMessage::VidDisperseMsg(disperse) => disperse.data.view_number(),
DaConsensusMessage::VidDisperseMsg2(disperse) => disperse.data.view_number(),
DaConsensusMessage::DaProposal2(p) => {
p.data.view_number()
}
DaConsensusMessage::DaVote2(vote_message) => vote_message.view_number(),
DaConsensusMessage::DaCertificate2(cert) => cert.view_number,
}
}
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
#[serde(bound(deserialize = ""))]
#[allow(clippy::large_enum_variant)]
pub enum DataMessage<TYPES: NodeType> {
SubmitTransaction(TYPES::Transaction, TYPES::View),
RequestData(DataRequest<TYPES>),
DataResponse(ResponseMessage<TYPES>),
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
#[serde(bound(deserialize = ""))]
pub struct Proposal<TYPES: NodeType, PROPOSAL: HasViewNumber<TYPES> + DeserializeOwned> {
pub data: PROPOSAL,
pub signature: <TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
pub _pd: PhantomData<TYPES>,
}
pub fn convert_proposal<TYPES, PROPOSAL, PROPOSAL2>(
proposal: Proposal<TYPES, PROPOSAL>,
) -> Proposal<TYPES, PROPOSAL2>
where
TYPES: NodeType,
PROPOSAL: HasViewNumber<TYPES> + DeserializeOwned,
PROPOSAL2: HasViewNumber<TYPES> + DeserializeOwned + From<PROPOSAL>,
{
Proposal {
data: proposal.data.into(),
signature: proposal.signature,
_pd: proposal._pd,
}
}
impl<TYPES> Proposal<TYPES, QuorumProposal<TYPES>>
where
TYPES: NodeType,
{
pub async fn validate_signature<V: Versions>(
&self,
membership: &TYPES::Membership,
epoch_height: u64,
upgrade_lock: &UpgradeLock<TYPES, V>,
) -> Result<()> {
let view_number = self.data.view_number();
let proposal_epoch = TYPES::Epoch::new(epoch_from_block_number(
self.data.block_header.block_number(),
epoch_height,
));
let view_leader_key = membership.leader(view_number, proposal_epoch)?;
let proposed_leaf = Leaf::from_quorum_proposal(&self.data);
ensure!(
view_leader_key.validate(
&self.signature,
proposed_leaf.commit(upgrade_lock).await.as_ref()
),
"Proposal signature is invalid."
);
Ok(())
}
}
impl<TYPES> Proposal<TYPES, QuorumProposal2<TYPES>>
where
TYPES: NodeType,
{
pub fn validate_signature(
&self,
membership: &TYPES::Membership,
epoch_height: u64,
) -> Result<()> {
let view_number = self.data.view_number();
let proposal_epoch = TYPES::Epoch::new(epoch_from_block_number(
self.data.block_header.block_number(),
epoch_height,
));
let view_leader_key = membership.leader(view_number, proposal_epoch)?;
let proposed_leaf = Leaf2::from_quorum_proposal(&self.data);
ensure!(
view_leader_key.validate(&self.signature, proposed_leaf.commit().as_ref()),
"Proposal signature is invalid."
);
Ok(())
}
}
#[derive(Clone, Debug)]
pub struct UpgradeLock<TYPES: NodeType, V: Versions> {
pub decided_upgrade_certificate: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
pub _pd: PhantomData<V>,
}
impl<TYPES: NodeType, V: Versions> UpgradeLock<TYPES, V> {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
Self {
decided_upgrade_certificate: Arc::new(RwLock::new(None)),
_pd: PhantomData::<V>,
}
}
#[allow(clippy::new_without_default)]
pub fn from_certificate(certificate: &Option<UpgradeCertificate<TYPES>>) -> Self {
Self {
decided_upgrade_certificate: Arc::new(RwLock::new(certificate.clone())),
_pd: PhantomData::<V>,
}
}
pub async fn version(&self, view: TYPES::View) -> Result<Version> {
let upgrade_certificate = self.decided_upgrade_certificate.read().await;
let version = match *upgrade_certificate {
Some(ref cert) => {
if view >= cert.data.new_version_first_view {
if cert.data.new_version == V::Upgrade::VERSION {
V::Upgrade::VERSION
} else {
bail!("The network has upgraded to a new version that we do not support!");
}
} else {
V::Base::VERSION
}
}
None => V::Base::VERSION,
};
Ok(version)
}
pub async fn version_infallible(&self, view: TYPES::View) -> Version {
let upgrade_certificate = self.decided_upgrade_certificate.read().await;
match *upgrade_certificate {
Some(ref cert) => {
if view >= cert.data.new_version_first_view {
cert.data.new_version
} else {
cert.data.old_version
}
}
None => V::Base::VERSION,
}
}
pub async fn serialize<M: HasViewNumber<TYPES> + Serialize>(
&self,
message: &M,
) -> Result<Vec<u8>> {
let view = message.view_number();
let version = self.version(view).await?;
let serialized_message = match version {
v if v == V::Base::VERSION => Serializer::<V::Base>::serialize(&message),
v if v == V::Upgrade::VERSION => Serializer::<V::Upgrade>::serialize(&message),
v => {
bail!("Attempted to serialize with version {}, which is incompatible. This should be impossible.", v);
}
};
serialized_message
.wrap()
.context(info!("Failed to serialize message!"))
}
pub async fn deserialize<M: HasViewNumber<TYPES> + for<'a> Deserialize<'a>>(
&self,
message: &[u8],
) -> Result<M> {
let actual_version = Version::deserialize(message)
.wrap()
.context(info!("Failed to read message version!"))?
.0;
let deserialized_message: M = match actual_version {
v if v == V::Base::VERSION => Serializer::<V::Base>::deserialize(message),
v if v == V::Upgrade::VERSION => Serializer::<V::Upgrade>::deserialize(message),
v => {
bail!("Cannot deserialize message with stated version {}", v);
}
}
.wrap()
.context(info!("Failed to deserialize message!"))?;
let view = deserialized_message.view_number();
let expected_version = self.version(view).await?;
ensure!(
actual_version == expected_version,
"Message has invalid version number for its view. Expected: {expected_version}, Actual: {actual_version}, View: {view:?}"
);
Ok(deserialized_message)
}
}