1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.
// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.
use std::{sync::Arc, time::Duration};
use async_broadcast::Receiver;
use async_compatibility_layer::art::{async_sleep, async_spawn};
use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
use hotshot::traits::TestableNodeImplementation;
use hotshot_types::traits::node_implementation::{NodeType, Versions};
use rand::thread_rng;
use snafu::Snafu;
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
use crate::{test_runner::Node, test_task::TestEvent};
// the obvious idea here is to pass in a "stream" that completes every `n` seconds
// the stream construction can definitely be fancier but that's the baseline idea
/// Data Availability task error
#[derive(Snafu, Debug)]
pub struct TxnTaskErr {}
/// state of task that decides when things are completed
pub struct TxnTask<TYPES: NodeType, I: TestableNodeImplementation<TYPES>, V: Versions> {
// TODO should this be in a rwlock? Or maybe a similar abstraction to the registry is in order
/// Handles for all nodes.
pub handles: Arc<RwLock<Vec<Node<TYPES, I, V>>>>,
/// Optional index of the next node.
pub next_node_idx: Option<usize>,
/// time to wait between txns
pub duration: Duration,
/// Receiver for the shutdown signal from the testing harness
pub shutdown_chan: Receiver<TestEvent>,
}
impl<TYPES: NodeType, I: TestableNodeImplementation<TYPES>, V: Versions> TxnTask<TYPES, I, V> {
pub fn run(mut self) -> JoinHandle<()> {
async_spawn(async move {
async_sleep(Duration::from_millis(100)).await;
loop {
async_sleep(self.duration).await;
if let Ok(TestEvent::Shutdown) = self.shutdown_chan.try_recv() {
break;
}
self.submit_tx().await;
}
})
}
async fn submit_tx(&mut self) {
if let Some(idx) = self.next_node_idx {
let handles = &self.handles.read().await;
// submit to idx handle
// increment state
self.next_node_idx = Some((idx + 1) % handles.len());
match handles.get(idx) {
None => {
tracing::error!("couldn't get node in txn task");
// should do error
unimplemented!()
}
Some(node) => {
// use rand::seq::IteratorRandom;
// we're assuming all nodes have the same leaf.
// If they don't match, this is probably fine since
// it should be caught by an assertion (and the txn will be rejected anyway)
let leaf = node.handle.decided_leaf().await;
let txn = I::leaf_create_random_transaction(&leaf, &mut thread_rng(), 0);
node.handle
.submit_transaction(txn.clone())
.await
.expect("Could not send transaction");
}
}
}
}
}
/// build the transaction task
#[derive(Clone, Debug)]
pub enum TxnTaskDescription {
/// submit transactions in a round robin style using
/// every `Duration` seconds
RoundRobinTimeBased(Duration),
/// TODO
DistributionBased, // others?
}