1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

use std::{
    collections::{BTreeMap, HashMap},
    sync::Arc,
};

use anyhow::Result;
use async_broadcast::broadcast;
use async_lock::RwLock;
use async_trait::async_trait;
use futures::future::join_all;
use hotshot::{
    traits::TestableNodeImplementation, types::EventType, HotShotInitializer, SystemContext,
};
use hotshot_example_types::{
    auction_results_provider_types::TestAuctionResultsProvider,
    block_types::TestBlockHeader,
    state_types::{TestInstanceState, TestValidatedState},
    storage_types::TestStorage,
    testable_delay::DelayConfig,
};
use hotshot_types::{
    constants::EVENT_CHANNEL_SIZE,
    data::Leaf,
    event::Event,
    simple_certificate::QuorumCertificate,
    traits::{
        network::ConnectedNetwork,
        node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
    },
    vote::HasViewNumber,
    ValidatorConfig,
};
use snafu::Snafu;

use crate::{
    test_runner::{LateNodeContext, LateNodeContextParameters, LateStartNode, Node, TestRunner},
    test_task::{TestResult, TestTaskState},
};

/// Convenience type pairing a list of states (`Vec<S>`) with a list of blocks (`Vec<B>`).
pub type StateAndBlock<S, B> = (Vec<S>, Vec<B>);

/// Error type for the spinning task.
///
/// The struct has no fields, so it carries no detail beyond its type.
#[derive(Snafu, Debug)]
pub struct SpinningTaskErr {}

/// State of the spinning task, which starts, stops, restarts, and
/// pauses/resumes the networks of test nodes at scheduled views.
///
/// Type parameters: `TYPES` is the node type, `N` the network implementation,
/// `I` the node implementation, and `V` the protocol versions.
pub struct SpinningTask<
    TYPES: NodeType,
    N: ConnectedNetwork<TYPES::SignatureKey>,
    I: TestableNodeImplementation<TYPES>,
    V: Versions,
> {
    /// Handles to the nodes, shared behind an async read/write lock.
    /// Existing nodes are looked up by their position in this vector
    /// (`ChangeNode::idx`); late-started nodes are appended to it.
    pub(crate) handles: Arc<RwLock<Vec<Node<TYPES, I, V>>>>,
    /// Late start nodes, keyed by node id. An entry is removed when the node
    /// is spun up via `NodeAction::Up`; an entry with a `Restart` context
    /// supplies the network used by `NodeAction::RestartDown`.
    pub(crate) late_start: HashMap<u64, LateStartNode<TYPES, I, V>>,
    /// Time based changes: view -> changes to apply in that view. An entry is
    /// removed when an event for its view is first handled.
    pub(crate) changes: BTreeMap<TYPES::Time, Vec<ChangeNode>>,
    /// Most recent view seen by the spinning task; `None` is treated as
    /// "no view seen yet".
    pub(crate) latest_view: Option<TYPES::Time>,
    /// Last decided leaf that can be used as the anchor leaf to initialize the node.
    /// Replaced whenever a `Decide` event carries a leaf with a higher view.
    pub(crate) last_decided_leaf: Leaf<TYPES>,
    /// Highest qc seen in the test for restarting nodes. Replaced whenever a
    /// `QuorumProposal` event carries a justify QC with a higher view.
    pub(crate) high_qc: QuorumCertificate<TYPES>,
    /// Add specified delay to async calls; passed to `TestInstanceState::new`
    /// when a node is (re)initialized.
    pub(crate) async_delay_config: DelayConfig,
    /// Context stored for nodes to be restarted with, keyed by node index.
    /// Filled by a delayed `NodeAction::RestartDown` and consumed by
    /// `NodeAction::RestartUp`.
    pub(crate) restart_contexts: HashMap<usize, RestartContext<TYPES, N, I, V>>,
}

#[async_trait]
impl<
        TYPES: NodeType<
            InstanceState = TestInstanceState,
            ValidatedState = TestValidatedState,
            BlockHeader = TestBlockHeader,
        >,
        I: TestableNodeImplementation<TYPES>,
        N: ConnectedNetwork<TYPES::SignatureKey>,
        V: Versions,
    > TestTaskState for SpinningTask<TYPES, N, I, V>
where
    I: TestableNodeImplementation<TYPES>,
    I: NodeImplementation<
        TYPES,
        Network = N,
        Storage = TestStorage<TYPES>,
        AuctionResultsProvider = TestAuctionResultsProvider<TYPES>,
    >,
{
    /// The spinning task consumes HotShot events.
    type Event = Event<TYPES>;

    /// Handles one event and applies any node changes scheduled for its view.
    ///
    /// First, the bookkeeping used to (re)initialize nodes is refreshed:
    /// a `Decide` event may advance `last_decided_leaf`, and a
    /// `QuorumProposal` event may advance `high_qc`.
    ///
    /// Then, if `view_number` is newer than every view handled so far, the
    /// changes scheduled for that view are removed from `changes` and applied
    /// in order. Nodes that are restarting in this view are started only after
    /// all of their networks report ready.
    ///
    /// Always returns `Ok(())`.
    ///
    /// # Panics
    ///
    /// - if a `Decide` event carries an empty leaf chain;
    /// - if a node index does not fit into the node id type;
    /// - if `NodeAction::Up` targets a late-start node with a `Restart` context;
    /// - if `NodeAction::RestartDown` targets a node without a `Restart`
    ///   late-start entry;
    /// - if a restarted node's index is out of bounds of `handles`.
    async fn handle_event(&mut self, (message, _id): (Self::Event, usize)) -> Result<()> {
        let Event { view_number, event } = message;

        // Keep track of the newest decided leaf and the highest QC seen, since
        // they seed the initializer of nodes that are started late.
        if let EventType::Decide {
            leaf_chain,
            qc: _,
            block_size: _,
        } = event
        {
            let leaf = leaf_chain
                .first()
                .expect("Decide event should carry a non-empty leaf chain")
                .leaf
                .clone();
            if leaf.view_number() > self.last_decided_leaf.view_number() {
                self.last_decided_leaf = leaf;
            }
        } else if let EventType::QuorumProposal {
            proposal,
            sender: _,
        } = event
        {
            if proposal.data.justify_qc.view_number() > self.high_qc.view_number() {
                self.high_qc = proposal.data.justify_qc.clone();
            }
        }

        // Scheduled changes are applied only the first time a view is seen.
        let is_new_view = self
            .latest_view
            .map_or(true, |latest| view_number > latest);
        if !is_new_view {
            return Ok(());
        }

        // Contexts (with their node index) and networks of the nodes that must
        // be started again at the end of this call.
        let mut new_nodes = vec![];
        let mut new_networks = vec![];

        // perform operations on the nodes
        if let Some(operations) = self.changes.remove(&view_number) {
            for ChangeNode { idx, updown } in operations {
                match updown {
                    NodeAction::Up => {
                        let node_id = idx.try_into().unwrap();
                        if let Some(node) = self.late_start.remove(&node_id) {
                            tracing::error!("Node {} spinning up late", idx);
                            let context = match node.context {
                                LateNodeContext::InitializedContext(context) => context,
                                // Node not initialized. Initialize it
                                // based on the received leaf.
                                LateNodeContext::UninitializedContext(late_context_params) => {
                                    // We'll deconstruct the individual terms here.
                                    let LateNodeContextParameters {
                                        storage,
                                        memberships,
                                        config,
                                        marketplace_config,
                                    } = late_context_params;

                                    let initializer = HotShotInitializer::<TYPES>::from_reload(
                                        self.last_decided_leaf.clone(),
                                        TestInstanceState::new(self.async_delay_config.clone()),
                                        None,
                                        TYPES::Time::genesis(),
                                        TYPES::Time::genesis(),
                                        BTreeMap::new(),
                                        self.high_qc.clone(),
                                        Vec::new(),
                                        BTreeMap::new(),
                                    );
                                    // We assign node's public key and stake value rather than read from config file since it's a test
                                    let validator_config =
                                        ValidatorConfig::generated_from_seed_indexed(
                                            [0u8; 32],
                                            node_id,
                                            1,
                                            // For tests, make the node DA based on its index
                                            node_id < config.da_staked_committee_size as u64,
                                        );
                                    TestRunner::add_node_with_config(
                                        node_id,
                                        node.network.clone(),
                                        memberships,
                                        initializer,
                                        config,
                                        validator_config,
                                        storage,
                                        marketplace_config,
                                    )
                                    .await
                                }
                                LateNodeContext::Restart => {
                                    panic!("Cannot spin up a node with Restart context")
                                }
                            };

                            let handle = context.run_tasks().await;

                            // Create the node and add it to the state, so we can shut them
                            // down properly later to avoid the overflow error in the overall
                            // safety task.
                            let node = Node {
                                node_id,
                                network: node.network,
                                handle,
                            };
                            node.handle.hotshot.start_consensus().await;

                            self.handles.write().await.push(node);
                        }
                    }
                    NodeAction::Down => {
                        if let Some(node) = self.handles.write().await.get_mut(idx) {
                            tracing::error!("Node {} shutting down", idx);
                            node.handle.shut_down().await;
                        }
                    }
                    NodeAction::RestartDown(delay_views) => {
                        let node_id = idx.try_into().unwrap();
                        if let Some(node) = self.handles.write().await.get_mut(idx) {
                            tracing::error!("Node {} shutting down", idx);
                            node.handle.shut_down().await;

                            // The late-start entry only supplies the network here; the
                            // context is rebuilt below from the stopped node's own state.
                            let Some(LateStartNode {
                                network,
                                context: LateNodeContext::Restart,
                            }) = self.late_start.get(&node_id)
                            else {
                                panic!("Restarted nodes must have a Restart context");
                            };

                            let storage = node.handle.storage().clone();
                            let memberships = node.handle.memberships.clone();
                            let config = node.handle.hotshot.config.clone();
                            let marketplace_config = node.handle.hotshot.marketplace_config.clone();
                            let read_storage = storage.read().await;
                            let initializer = HotShotInitializer::<TYPES>::from_reload(
                                self.last_decided_leaf.clone(),
                                TestInstanceState::new(self.async_delay_config.clone()),
                                None,
                                read_storage.last_actioned_view().await,
                                read_storage.last_actioned_view().await,
                                read_storage.proposals_cloned().await,
                                match read_storage.high_qc_cloned().await {
                                    Some(qc) => qc,
                                    // Only build the genesis QC when storage holds no high QC.
                                    None => {
                                        QuorumCertificate::genesis::<V>(
                                            &TestValidatedState::default(),
                                            &TestInstanceState::default(),
                                        )
                                        .await
                                    }
                                },
                                Vec::new(),
                                BTreeMap::new(),
                            );
                            // We assign node's public key and stake value rather than read from config file since it's a test
                            let validator_config = ValidatorConfig::generated_from_seed_indexed(
                                [0u8; 32],
                                node_id,
                                1,
                                // For tests, make the node DA based on its index
                                node_id < config.da_staked_committee_size as u64,
                            );
                            let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
                            let context =
                                TestRunner::<TYPES, I, V, N>::add_node_with_config_and_channels(
                                    node_id,
                                    network.clone(),
                                    (*memberships).clone(),
                                    initializer,
                                    config,
                                    validator_config,
                                    (*read_storage).clone(),
                                    marketplace_config.clone(),
                                    internal_chan,
                                    (
                                        node.handle.external_channel_sender(),
                                        node.handle.event_stream_known_impl().new_receiver(),
                                    ),
                                )
                                .await;
                            if delay_views == 0 {
                                // Restart within this very call.
                                new_nodes.push((context, idx));
                                new_networks.push(network.clone());
                            } else {
                                // Park the context and schedule the matching `RestartUp`.
                                let up_view = view_number + delay_views;
                                let change = ChangeNode {
                                    idx,
                                    updown: NodeAction::RestartUp,
                                };
                                self.changes.entry(up_view).or_default().push(change);
                                let new_ctx = RestartContext {
                                    context,
                                    network: network.clone(),
                                };
                                self.restart_contexts.insert(idx, new_ctx);
                            }
                        }
                    }
                    NodeAction::RestartUp => {
                        if let Some(ctx) = self.restart_contexts.remove(&idx) {
                            new_nodes.push((ctx.context, idx));
                            new_networks.push(ctx.network.clone());
                        }
                    }
                    NodeAction::NetworkUp => {
                        // Only shared access to the node is needed, so a read lock suffices.
                        if let Some(handle) = self.handles.read().await.get(idx) {
                            tracing::error!("Node {} networks resuming", idx);
                            handle.network.resume();
                        }
                    }
                    NodeAction::NetworkDown => {
                        // Only shared access to the node is needed, so a read lock suffices.
                        if let Some(handle) = self.handles.read().await.get(idx) {
                            tracing::error!("Node {} networks pausing", idx);
                            handle.network.pause();
                        }
                    }
                }
            }
        }

        // Wait for every restarting node's network concurrently before any of
        // the nodes is started. Iteration is reversed to keep last-in-first-out order.
        join_all(new_networks.into_iter().rev().map(|net| async move {
            net.wait_for_ready().await;
        }))
        .await;

        for (context, id) in new_nodes.into_iter().rev() {
            tracing::error!("Starting node {} back up", id);
            let handle = context.run_tasks().await;

            // Create the node and add it to the state, so we can shut them
            // down properly later to avoid the overflow error in the overall
            // safety task.
            let node = Node {
                node_id: id.try_into().unwrap(),
                network: context.network.clone(),
                handle,
            };
            node.handle.hotshot.start_consensus().await;

            // The restarted node takes over the slot of the node it replaces.
            self.handles.write().await[id] = node;
        }

        // update our latest view
        self.latest_view = Some(view_number);

        Ok(())
    }

    /// Final verdict of the task: unconditionally `TestResult::Pass`.
    async fn check(&self) -> TestResult {
        TestResult::Pass
    }
}

/// What the spinning task keeps for a node that was shut down by a delayed
/// `NodeAction::RestartDown` and is waiting for its `NodeAction::RestartUp`.
#[derive(Clone)]
pub(crate) struct RestartContext<
    TYPES: NodeType,
    N: ConnectedNetwork<TYPES::SignatureKey>,
    I: TestableNodeImplementation<TYPES>,
    V: Versions,
> {
    /// System context built for the node at the time it was shut down;
    /// its tasks are run when the node is started back up.
    context: Arc<SystemContext<TYPES, I, V>>,
    /// Network of the restarting node; the spinning task waits for it to be
    /// ready before starting the node back up.
    network: Arc<N>,
}

/// Action the spinning task applies to a node: spin the node or its network
/// up or down, or restart the node.
#[derive(Clone, Debug)]
pub enum NodeAction {
    /// spin the node up
    Up,
    /// spin the node down
    Down,
    /// spin the node's network up
    NetworkUp,
    /// spin the node's network down
    NetworkDown,
    /// Take a node down to be restarted after a number of views.
    /// A value of `0` restarts the node while the same event is being handled;
    /// otherwise a `RestartUp` is scheduled that many views later.
    RestartDown(u64),
    /// Start a node up again after it's been shutdown for restart.  This
    /// should only be created following a `RestartDown`
    RestartUp,
}

/// Denotes a change in node state: which node, and what to do with it.
#[derive(Clone, Debug)]
pub struct ChangeNode {
    /// the index of the node; used both as the position in the spinning
    /// task's node handles and, converted to `u64`, as the node id
    pub idx: usize,
    /// spin the node or node's network up or down
    pub updown: NodeAction,
}

/// Description of the spinning task
/// (used to build a spinning task).
#[derive(Clone, Debug)]
pub struct SpinningTaskDescription {
    /// the changes in node status, time -> changes
    // NOTE(review): the `u64` is presumably the view number at which the
    // changes apply — confirm against the code that builds the task.
    pub node_changes: Vec<(u64, Vec<ChangeNode>)>,
}