Compare commits
1 Commits
fix/doclink
...
hive-mind
| Author | SHA1 | Date | |
|---|---|---|---|
| 0877b6edae |
Generated
+19
@@ -505,6 +505,22 @@ dependencies = [
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "chitchat"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fec1bff4fabc59f0b7408478183a796ab0a9bcb0534bb413f3cf66f44ce2aca1"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "chrono"
|
||||
version = "0.4.23"
|
||||
@@ -2911,6 +2927,7 @@ name = "mixnode-common"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"chitchat",
|
||||
"futures",
|
||||
"humantime-serde",
|
||||
"log",
|
||||
@@ -2926,6 +2943,7 @@ dependencies = [
|
||||
"nym-task",
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
@@ -3491,6 +3509,7 @@ dependencies = [
|
||||
"anyhow",
|
||||
"atty",
|
||||
"bs58",
|
||||
"chitchat",
|
||||
"clap 4.1.4",
|
||||
"colored",
|
||||
"cupid",
|
||||
|
||||
@@ -13,12 +13,20 @@ humantime-serde = "1.0"
|
||||
log = { workspace = true }
|
||||
rand = "0.8"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
tokio = { version = "1.24.1", features = ["time", "macros", "rt", "net", "io-util"] }
|
||||
tokio = { version = "1.24.1", features = [
|
||||
"time",
|
||||
"macros",
|
||||
"rt",
|
||||
"net",
|
||||
"io-util",
|
||||
] }
|
||||
tokio-util = { version = "0.7.4", features = ["codec"] }
|
||||
url = "2.2"
|
||||
thiserror = "1.0.37"
|
||||
chitchat = "0.5"
|
||||
serde_json = "1"
|
||||
|
||||
nym-crypto = { path = "../crypto" }
|
||||
nym-crypto = { path = "../crypto" }
|
||||
nym-network-defaults = { path = "../network-defaults" }
|
||||
nym-sphinx-acknowledgements = { path = "../nymsphinx/acknowledgements" }
|
||||
nym-sphinx-addressing = { path = "../nymsphinx/addressing" }
|
||||
@@ -27,5 +35,7 @@ nym-sphinx-framing = { path = "../nymsphinx/framing" }
|
||||
nym-sphinx-params = { path = "../nymsphinx/params" }
|
||||
nym-sphinx-types = { path = "../nymsphinx/types" }
|
||||
nym-task = { path = "../task" }
|
||||
validator-client = { path = "../client-libs/validator-client", features = ["nyxd-client"]}
|
||||
validator-client = { path = "../client-libs/validator-client", features = [
|
||||
"nyxd-client",
|
||||
] }
|
||||
nym-bin-common = { path = "../bin-common" }
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
use crate::verloc::listener::PacketListener;
|
||||
use crate::verloc::sender::{PacketSender, TestedNode};
|
||||
use chitchat::Chitchat;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
use log::*;
|
||||
@@ -16,6 +17,7 @@ use std::net::SocketAddr;
|
||||
use std::net::ToSocketAddrs;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::sleep;
|
||||
use url::Url;
|
||||
@@ -293,7 +295,7 @@ impl VerlocMeasurer {
|
||||
MeasurementOutcome::Done
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) {
|
||||
pub async fn run(&mut self, chitchat_handle: Arc<Mutex<Chitchat>>) {
|
||||
self.start_listening();
|
||||
|
||||
while !self.shutdown_listener.is_shutdown() {
|
||||
@@ -345,6 +347,17 @@ impl VerlocMeasurer {
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut chitchat_guard = chitchat_handle.lock().await;
|
||||
let cc_state = chitchat_guard.self_node_state();
|
||||
let tested_nodes_cc = tested_nodes
|
||||
.iter()
|
||||
.map(|node| node.identity.to_string())
|
||||
.collect::<Vec<String>>();
|
||||
cc_state.set(
|
||||
"tested_nodes",
|
||||
serde_json::to_value(tested_nodes_cc).expect("Could not serialize"),
|
||||
);
|
||||
|
||||
// on start of each run remove old results
|
||||
self.results.reset_results(tested_nodes.len()).await;
|
||||
|
||||
|
||||
@@ -377,6 +377,7 @@ pub const DEFAULT_CLIENT_LISTENING_PORT: u16 = 9000;
|
||||
// 'MIXNODE'
|
||||
pub const DEFAULT_VERLOC_LISTENING_PORT: u16 = 1790;
|
||||
pub const DEFAULT_HTTP_API_LISTENING_PORT: u16 = 8000;
|
||||
pub const DEFAULT_GOSSIP_PORT: u16 = 10000;
|
||||
|
||||
// 'CLIENT'
|
||||
pub const DEFAULT_WEBSOCKET_LISTENING_PORT: u16 = 1977;
|
||||
|
||||
+21
-13
@@ -29,32 +29,40 @@ log = { workspace = true }
|
||||
pretty_env_logger = "0.4.0"
|
||||
rand = "0.7.3"
|
||||
rocket = { version = "0.5.0-rc.2", features = ["json"] }
|
||||
serde = { version="1.0", features = ["derive"] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
sysinfo = "0.27.7"
|
||||
tokio = { version="1.21.2", features = ["rt-multi-thread", "net", "signal"] }
|
||||
tokio-util = { version="0.7.3", features = ["codec"] }
|
||||
tokio = { version = "1.21.2", features = ["rt-multi-thread", "net", "signal"] }
|
||||
tokio-util = { version = "0.7.3", features = ["codec"] }
|
||||
toml = "0.5.8"
|
||||
url = { version = "2.2", features = ["serde"] }
|
||||
atty = "0.2"
|
||||
|
||||
## internal
|
||||
nym-config = { path="../common/config" }
|
||||
nym-crypto = { path="../common/crypto" }
|
||||
nym-config = { path = "../common/config" }
|
||||
nym-crypto = { path = "../common/crypto" }
|
||||
nym-contracts-common = { path = "../common/cosmwasm-smart-contracts/contracts-common" }
|
||||
mixnet-client = { path="../common/client-libs/mixnet-client" }
|
||||
mixnode-common = { path="../common/mixnode-common" }
|
||||
nym-nonexhaustive-delayqueue = { path="../common/nonexhaustive-delayqueue" }
|
||||
nym-sphinx = { path="../common/nymsphinx" }
|
||||
mixnet-client = { path = "../common/client-libs/mixnet-client" }
|
||||
mixnode-common = { path = "../common/mixnode-common" }
|
||||
nym-nonexhaustive-delayqueue = { path = "../common/nonexhaustive-delayqueue" }
|
||||
nym-sphinx = { path = "../common/nymsphinx" }
|
||||
nym-pemstore = { path = "../common/pemstore", version = "0.2.0" }
|
||||
nym-task = { path = "../common/task" }
|
||||
nym-types = { path = "../common/types" }
|
||||
nym-topology = { path="../common/topology" }
|
||||
validator-client = { path="../common/client-libs/validator-client" }
|
||||
nym-bin-common = { path="../common/bin-common" }
|
||||
nym-topology = { path = "../common/topology" }
|
||||
validator-client = { path = "../common/client-libs/validator-client" }
|
||||
nym-bin-common = { path = "../common/bin-common" }
|
||||
|
||||
# chichat
|
||||
chitchat = "0.5"
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version="1.21.2", features = ["rt-multi-thread", "net", "signal", "test-util"] }
|
||||
tokio = { version = "1.21.2", features = [
|
||||
"rt-multi-thread",
|
||||
"net",
|
||||
"signal",
|
||||
"test-util",
|
||||
] }
|
||||
|
||||
nym-sphinx-types = { path = "../common/nymsphinx/types" }
|
||||
nym-sphinx-params = { path = "../common/nymsphinx/params" }
|
||||
|
||||
@@ -109,5 +109,5 @@ pub(crate) async fn execute(args: &Run, output: OutputFormat) {
|
||||
Select the correct version and install it to your machine. You will need to provide the following: \n ");
|
||||
mixnode.print_node_details(output);
|
||||
|
||||
mixnode.run().await
|
||||
mixnode.run().await.expect("Could not run mixnode")
|
||||
}
|
||||
|
||||
@@ -4,7 +4,8 @@
|
||||
use crate::config::template::config_template;
|
||||
use nym_config::defaults::mainnet::NYM_API;
|
||||
use nym_config::defaults::{
|
||||
DEFAULT_HTTP_API_LISTENING_PORT, DEFAULT_MIX_LISTENING_PORT, DEFAULT_VERLOC_LISTENING_PORT,
|
||||
DEFAULT_GOSSIP_PORT, DEFAULT_HTTP_API_LISTENING_PORT, DEFAULT_MIX_LISTENING_PORT,
|
||||
DEFAULT_VERLOC_LISTENING_PORT,
|
||||
};
|
||||
use nym_config::NymConfig;
|
||||
use serde::{Deserialize, Deserializer, Serialize};
|
||||
@@ -258,6 +259,10 @@ impl Config {
|
||||
self.mixnode.http_api_port
|
||||
}
|
||||
|
||||
pub fn get_gossip_port(&self) -> u16 {
|
||||
self.mixnode.gossip_port
|
||||
}
|
||||
|
||||
pub fn get_packet_forwarding_initial_backoff(&self) -> Duration {
|
||||
self.debug.packet_forwarding_initial_backoff
|
||||
}
|
||||
@@ -348,6 +353,10 @@ struct MixNode {
|
||||
#[serde(default = "default_http_api_port")]
|
||||
http_api_port: u16,
|
||||
|
||||
/// Port used for the gossip protocol
|
||||
#[serde(default = "default_http_api_port")]
|
||||
gossip_port: u16,
|
||||
|
||||
/// Path to file containing private identity key.
|
||||
#[serde(default = "missing_string_value")]
|
||||
private_identity_key_file: PathBuf,
|
||||
@@ -402,6 +411,7 @@ impl Default for MixNode {
|
||||
mix_port: DEFAULT_MIX_LISTENING_PORT,
|
||||
verloc_port: DEFAULT_VERLOC_LISTENING_PORT,
|
||||
http_api_port: DEFAULT_HTTP_API_LISTENING_PORT,
|
||||
gossip_port: DEFAULT_GOSSIP_PORT,
|
||||
private_identity_key_file: Default::default(),
|
||||
public_identity_key_file: Default::default(),
|
||||
private_sphinx_key_file: Default::default(),
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use chitchat::{Chitchat, ClusterStateSnapshot, NodeId};
|
||||
use rocket::serde::json::Json;
|
||||
use rocket::State;
|
||||
use serde::Serialize;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct ClusterState {
|
||||
cluster_id: String,
|
||||
cluster_state: ClusterStateSnapshot,
|
||||
live_nodes: Vec<NodeId>,
|
||||
dead_nodes: Vec<NodeId>,
|
||||
}
|
||||
|
||||
/// Returns a description of the node and why someone might want to delegate stake to it.
|
||||
#[get("/state")]
|
||||
pub(crate) async fn state(chitchat: &State<Arc<Mutex<Chitchat>>>) -> Json<ClusterState> {
|
||||
let chitchat_guard = chitchat.lock().await;
|
||||
let cluster_state = ClusterState {
|
||||
cluster_id: chitchat_guard.cluster_id().to_string(),
|
||||
cluster_state: chitchat_guard.state_snapshot(),
|
||||
live_nodes: chitchat_guard.live_nodes().cloned().collect::<Vec<_>>(),
|
||||
dead_nodes: chitchat_guard.dead_nodes().cloned().collect::<Vec<_>>(),
|
||||
};
|
||||
Json(cluster_state)
|
||||
}
|
||||
@@ -1,4 +1,5 @@
|
||||
pub(crate) mod description;
|
||||
pub(crate) mod gossip;
|
||||
pub(crate) mod hardware;
|
||||
pub(crate) mod stats;
|
||||
pub(crate) mod verloc;
|
||||
|
||||
+67
-12
@@ -5,6 +5,7 @@ use crate::config::persistence::pathfinder::MixNodePathfinder;
|
||||
use crate::config::Config;
|
||||
use crate::node::http::{
|
||||
description::description,
|
||||
gossip,
|
||||
hardware::hardware,
|
||||
not_found,
|
||||
stats::stats,
|
||||
@@ -17,6 +18,10 @@ use crate::node::node_description::NodeDescription;
|
||||
use crate::node::node_statistics::SharedNodeStats;
|
||||
use crate::node::packet_delayforwarder::{DelayForwarder, PacketDelayForwardSender};
|
||||
use crate::OutputFormat;
|
||||
use chitchat::transport::UdpTransport;
|
||||
use chitchat::{
|
||||
spawn_chitchat, Chitchat, ChitchatConfig, ChitchatHandle, FailureDetectorConfig, NodeId,
|
||||
};
|
||||
use log::{error, info, warn};
|
||||
use mixnode_common::verloc::{self, AtomicVerlocResult, VerlocMeasurer};
|
||||
use nym_bin_common::version_checker::parse_version;
|
||||
@@ -28,6 +33,8 @@ use rand::thread_rng;
|
||||
use std::net::SocketAddr;
|
||||
use std::process;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
mod http;
|
||||
mod listener;
|
||||
@@ -110,6 +117,7 @@ impl MixNode {
|
||||
&self,
|
||||
atomic_verloc_result: AtomicVerlocResult,
|
||||
node_stats_pointer: SharedNodeStats,
|
||||
chitchat: Arc<Mutex<Chitchat>>,
|
||||
) {
|
||||
info!("Starting HTTP API on http://localhost:8000");
|
||||
|
||||
@@ -125,11 +133,15 @@ impl MixNode {
|
||||
tokio::spawn(async move {
|
||||
rocket::build()
|
||||
.configure(config)
|
||||
.mount("/", routes![verlocRoute, description, stats, hardware])
|
||||
.mount(
|
||||
"/",
|
||||
routes![verlocRoute, description, stats, hardware, gossip::state],
|
||||
)
|
||||
.register("/", catchers![not_found])
|
||||
.manage(verloc_state)
|
||||
.manage(descriptor)
|
||||
.manage(node_stats_pointer)
|
||||
.manage(chitchat)
|
||||
.launch()
|
||||
.await
|
||||
});
|
||||
@@ -199,7 +211,11 @@ impl MixNode {
|
||||
packet_sender
|
||||
}
|
||||
|
||||
fn start_verloc_measurements(&self, shutdown: TaskClient) -> AtomicVerlocResult {
|
||||
fn start_verloc_measurements(
|
||||
&self,
|
||||
chitchat: Arc<Mutex<Chitchat>>,
|
||||
shutdown: TaskClient,
|
||||
) -> AtomicVerlocResult {
|
||||
info!("Starting the round-trip-time measurer...");
|
||||
|
||||
// this is a sanity check to make sure we didn't mess up with the minimum version at some point
|
||||
@@ -235,7 +251,7 @@ impl MixNode {
|
||||
let mut verloc_measurer =
|
||||
VerlocMeasurer::new(config, Arc::clone(&self.identity_keypair), shutdown);
|
||||
let atomic_verloc_results = verloc_measurer.get_verloc_results_pointer();
|
||||
tokio::spawn(async move { verloc_measurer.run().await });
|
||||
tokio::spawn(async move { verloc_measurer.run(chitchat).await });
|
||||
atomic_verloc_results
|
||||
}
|
||||
|
||||
@@ -277,18 +293,53 @@ impl MixNode {
|
||||
log::info!("Stopping nym mixnode");
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) {
|
||||
fn gossip_node_id(&self) -> NodeId {
|
||||
NodeId::new(
|
||||
self.identity_keypair.public_key().to_string(),
|
||||
(
|
||||
self.config.get_listening_address(),
|
||||
self.config.get_gossip_port(),
|
||||
)
|
||||
.into(),
|
||||
)
|
||||
}
|
||||
|
||||
fn gossip_config(&self) -> ChitchatConfig {
|
||||
ChitchatConfig {
|
||||
node_id: self.gossip_node_id(),
|
||||
cluster_id: "hive_mind".to_string(),
|
||||
gossip_interval: Duration::from_millis(500),
|
||||
listen_addr: (
|
||||
self.config.get_listening_address(),
|
||||
self.config.get_gossip_port(),
|
||||
)
|
||||
.into(),
|
||||
// We'd probably wanna get this from the nym_api, or have a few known good nodes, the ones we run probably
|
||||
seed_nodes: vec![],
|
||||
failure_detector_config: FailureDetectorConfig::default(),
|
||||
is_ready_predicate: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn init_chitchat(&self) -> Result<ChitchatHandle, String> {
|
||||
let chitchat_handler = spawn_chitchat(self.gossip_config(), Vec::new(), &UdpTransport)
|
||||
.await
|
||||
.map_err(|_| "Could not spawn chitchat")?;
|
||||
|
||||
Ok(chitchat_handler)
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) -> Result<(), String> {
|
||||
info!("Starting nym mixnode");
|
||||
|
||||
if let Some(duplicate_node_key) = self.check_if_same_ip_node_exists().await {
|
||||
if duplicate_node_key == self.identity_keypair.public_key().to_base58_string() {
|
||||
warn!("You seem to have bonded your mixnode before starting it - that's highly unrecommended as in the future it might result in slashing");
|
||||
} else {
|
||||
log::error!(
|
||||
"Our announce-host is identical to an existing node's announce-host! (its key is {:?})",
|
||||
duplicate_node_key
|
||||
);
|
||||
return;
|
||||
let err = format!("Our announce-host is identical to an existing node's announce-host! (its key is {:?})",
|
||||
duplicate_node_key);
|
||||
log::error!("{err}");
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -303,14 +354,18 @@ impl MixNode {
|
||||
delay_forwarding_channel,
|
||||
shutdown.subscribe(),
|
||||
);
|
||||
let atomic_verloc_results = self.start_verloc_measurements(shutdown.subscribe());
|
||||
let chitchat_handler = self.init_chitchat().await?;
|
||||
let chitchat = chitchat_handler.chitchat();
|
||||
let atomic_verloc_results =
|
||||
self.start_verloc_measurements(chitchat.clone(), shutdown.subscribe());
|
||||
|
||||
// Rocket handles shutdown on it's own, but its shutdown handling should be incorporated
|
||||
// with that of the rest of the tasks.
|
||||
// Currently it's runtime is forcefully terminated once the mixnode exits.
|
||||
self.start_http_api(atomic_verloc_results, node_stats_pointer);
|
||||
self.start_http_api(atomic_verloc_results, node_stats_pointer, chitchat);
|
||||
|
||||
info!("Finished nym mixnode startup procedure - it should now be able to receive mix traffic!");
|
||||
self.wait_for_interrupt(shutdown).await
|
||||
self.wait_for_interrupt(shutdown).await;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user