// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

use std::{sync::Arc, time::Duration};

use async_broadcast::Receiver;
use async_compatibility_layer::art::{async_sleep, async_spawn};
use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
use hotshot::traits::TestableNodeImplementation;
use hotshot_types::traits::node_implementation::{NodeType, Versions};
use rand::thread_rng;
use snafu::Snafu;
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;

use crate::{test_runner::Node, test_task::TestEvent};

// The basic idea here is to pass in a "stream" that fires every `n` seconds;
// the stream construction can definitely be fancier, but that's the baseline idea.
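//
// A minimal sketch of that interval-stream idea, assuming the tokio executor
// and the `tokio-stream` crate (purely illustrative; nothing in this file
// uses it):
//
//     use tokio_stream::{wrappers::IntervalStream, StreamExt};
//
//     let mut ticks = IntervalStream::new(tokio::time::interval(duration));
//     while ticks.next().await.is_some() {
//         // submit a transaction
//     }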

/// Transaction task error
#[derive(Snafu, Debug)]
pub struct TxnTaskErr {}

/// State of the task that periodically submits transactions to the nodes under test
pub struct TxnTask<TYPES: NodeType, I: TestableNodeImplementation<TYPES>, V: Versions> {
    // TODO should this be in a rwlock? Or maybe a similar abstraction to the registry is in order
    /// Handles for all nodes.
    pub handles: Arc<RwLock<Vec<Node<TYPES, I, V>>>>,
    /// Optional index of the next node.
    pub next_node_idx: Option<usize>,
    /// time to wait between txns
    pub duration: Duration,
    /// Receiver for the shutdown signal from the testing harness
    pub shutdown_chan: Receiver<TestEvent>,
}
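
// A sketch of constructing the task by hand, assuming `handles` and
// `shutdown_rx` come from the test harness (illustrative only):
//
//     let task = TxnTask {
//         handles: Arc::clone(&handles),
//         next_node_idx: Some(0),
//         duration: Duration::from_millis(250),
//         shutdown_chan: shutdown_rx.clone(),
//     };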

impl<TYPES: NodeType, I: TestableNodeImplementation<TYPES>, V: Versions> TxnTask<TYPES, I, V> {
    /// Spawn the task loop: wait briefly for startup, then submit one
    /// transaction every `duration` until the harness signals shutdown.
    pub fn run(mut self) -> JoinHandle<()> {
        async_spawn(async move {
            async_sleep(Duration::from_millis(100)).await;
            loop {
                async_sleep(self.duration).await;
                // Non-blocking check for a shutdown signal from the harness.
                if let Ok(TestEvent::Shutdown) = self.shutdown_chan.try_recv() {
                    break;
                }
                self.submit_tx().await;
            }
        })
    }
    /// Submit a random transaction to the next node in round-robin order.
    async fn submit_tx(&mut self) {
        if let Some(idx) = self.next_node_idx {
            let handles = self.handles.read().await;
            if handles.is_empty() {
                tracing::warn!("txn task has no node handles to submit to");
                return;
            }
            // Advance the round-robin index so the next call targets the next node.
            self.next_node_idx = Some((idx + 1) % handles.len());
            match handles.get(idx) {
                None => {
                    tracing::error!("couldn't get node in txn task");
                    // TODO: surface this as a task error rather than panicking
                    unimplemented!()
                }
                Some(node) => {
                    // We assume all nodes have decided on the same leaf;
                    // if they haven't, that's probably fine, since a mismatch
                    // should be caught by an assertion (and the txn will be
                    // rejected anyway).
                    let leaf = node.handle.decided_leaf().await;
                    let txn = I::leaf_create_random_transaction(&leaf, &mut thread_rng(), 0);
                    node.handle
                        .submit_transaction(txn.clone())
                        .await
                        .expect("Could not send transaction");
                }
            }
        }
    }
}
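
// Driving the task from the harness side looks roughly like this; the
// broadcast channel stands in for the harness's real shutdown channel:
//
//     let (shutdown_tx, shutdown_rx) = async_broadcast::broadcast(1);
//     let join_handle = task.run(); // `task` built as in the sketch above
//     // ... exercise the network ...
//     shutdown_tx.broadcast(TestEvent::Shutdown).await.unwrap();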

/// Description of how to build the transaction task
#[derive(Clone, Debug)]
pub enum TxnTaskDescription {
    /// submit transactions in a round-robin style, one
    /// every `Duration`
    RoundRobinTimeBased(Duration),
    /// TODO
    DistributionBased, // others?
}
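
// A sketch of how a builder might turn a `TxnTaskDescription` into a running
// task (hypothetical dispatch, not part of this crate):
//
//     match desc {
//         TxnTaskDescription::RoundRobinTimeBased(duration) => {
//             // construct a `TxnTask` with this `duration` and call `.run()`
//         }
//         TxnTaskDescription::DistributionBased => unimplemented!(),
//     }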