Compare commits

...

1 Commits

Author SHA1 Message Date
durch 0877b6edae Add gossip to mixnodes 2023-03-17 11:49:52 +01:00
10 changed files with 176 additions and 31 deletions
Generated
+19
View File
@@ -505,6 +505,22 @@ dependencies = [
"zeroize",
]
[[package]]
name = "chitchat"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fec1bff4fabc59f0b7408478183a796ab0a9bcb0534bb413f3cf66f44ce2aca1"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"rand 0.8.5",
"serde",
"tokio",
"tokio-stream",
"tracing",
]
[[package]]
name = "chrono"
version = "0.4.23"
@@ -2911,6 +2927,7 @@ name = "mixnode-common"
version = "0.1.0"
dependencies = [
"bytes",
"chitchat",
"futures",
"humantime-serde",
"log",
@@ -2926,6 +2943,7 @@ dependencies = [
"nym-task",
"rand 0.8.5",
"serde",
"serde_json",
"thiserror",
"tokio",
"tokio-util",
@@ -3491,6 +3509,7 @@ dependencies = [
"anyhow",
"atty",
"bs58",
"chitchat",
"clap 4.1.4",
"colored",
"cupid",
+13 -3
View File
@@ -13,12 +13,20 @@ humantime-serde = "1.0"
log = { workspace = true }
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.24.1", features = ["time", "macros", "rt", "net", "io-util"] }
tokio = { version = "1.24.1", features = [
"time",
"macros",
"rt",
"net",
"io-util",
] }
tokio-util = { version = "0.7.4", features = ["codec"] }
url = "2.2"
thiserror = "1.0.37"
chitchat = "0.5"
serde_json = "1"
nym-crypto = { path = "../crypto" }
nym-crypto = { path = "../crypto" }
nym-network-defaults = { path = "../network-defaults" }
nym-sphinx-acknowledgements = { path = "../nymsphinx/acknowledgements" }
nym-sphinx-addressing = { path = "../nymsphinx/addressing" }
@@ -27,5 +35,7 @@ nym-sphinx-framing = { path = "../nymsphinx/framing" }
nym-sphinx-params = { path = "../nymsphinx/params" }
nym-sphinx-types = { path = "../nymsphinx/types" }
nym-task = { path = "../task" }
validator-client = { path = "../client-libs/validator-client", features = ["nyxd-client"]}
validator-client = { path = "../client-libs/validator-client", features = [
"nyxd-client",
] }
nym-bin-common = { path = "../bin-common" }
+14 -1
View File
@@ -3,6 +3,7 @@
use crate::verloc::listener::PacketListener;
use crate::verloc::sender::{PacketSender, TestedNode};
use chitchat::Chitchat;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use log::*;
@@ -16,6 +17,7 @@ use std::net::SocketAddr;
use std::net::ToSocketAddrs;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use url::Url;
@@ -293,7 +295,7 @@ impl VerlocMeasurer {
MeasurementOutcome::Done
}
pub async fn run(&mut self) {
pub async fn run(&mut self, chitchat_handle: Arc<Mutex<Chitchat>>) {
self.start_listening();
while !self.shutdown_listener.is_shutdown() {
@@ -345,6 +347,17 @@ impl VerlocMeasurer {
})
.collect::<Vec<_>>();
let mut chitchat_guard = chitchat_handle.lock().await;
let cc_state = chitchat_guard.self_node_state();
let tested_nodes_cc = tested_nodes
.iter()
.map(|node| node.identity.to_string())
.collect::<Vec<String>>();
cc_state.set(
"tested_nodes",
serde_json::to_value(tested_nodes_cc).expect("Could not serialize"),
);
// on start of each run remove old results
self.results.reset_results(tested_nodes.len()).await;
+1
View File
@@ -377,6 +377,7 @@ pub const DEFAULT_CLIENT_LISTENING_PORT: u16 = 9000;
// 'MIXNODE'
pub const DEFAULT_VERLOC_LISTENING_PORT: u16 = 1790;
pub const DEFAULT_HTTP_API_LISTENING_PORT: u16 = 8000;
pub const DEFAULT_GOSSIP_PORT: u16 = 10000;
// 'CLIENT'
pub const DEFAULT_WEBSOCKET_LISTENING_PORT: u16 = 1977;
+21 -13
View File
@@ -29,32 +29,40 @@ log = { workspace = true }
pretty_env_logger = "0.4.0"
rand = "0.7.3"
rocket = { version = "0.5.0-rc.2", features = ["json"] }
serde = { version="1.0", features = ["derive"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sysinfo = "0.27.7"
tokio = { version="1.21.2", features = ["rt-multi-thread", "net", "signal"] }
tokio-util = { version="0.7.3", features = ["codec"] }
tokio = { version = "1.21.2", features = ["rt-multi-thread", "net", "signal"] }
tokio-util = { version = "0.7.3", features = ["codec"] }
toml = "0.5.8"
url = { version = "2.2", features = ["serde"] }
atty = "0.2"
## internal
nym-config = { path="../common/config" }
nym-crypto = { path="../common/crypto" }
nym-config = { path = "../common/config" }
nym-crypto = { path = "../common/crypto" }
nym-contracts-common = { path = "../common/cosmwasm-smart-contracts/contracts-common" }
mixnet-client = { path="../common/client-libs/mixnet-client" }
mixnode-common = { path="../common/mixnode-common" }
nym-nonexhaustive-delayqueue = { path="../common/nonexhaustive-delayqueue" }
nym-sphinx = { path="../common/nymsphinx" }
mixnet-client = { path = "../common/client-libs/mixnet-client" }
mixnode-common = { path = "../common/mixnode-common" }
nym-nonexhaustive-delayqueue = { path = "../common/nonexhaustive-delayqueue" }
nym-sphinx = { path = "../common/nymsphinx" }
nym-pemstore = { path = "../common/pemstore", version = "0.2.0" }
nym-task = { path = "../common/task" }
nym-types = { path = "../common/types" }
nym-topology = { path="../common/topology" }
validator-client = { path="../common/client-libs/validator-client" }
nym-bin-common = { path="../common/bin-common" }
nym-topology = { path = "../common/topology" }
validator-client = { path = "../common/client-libs/validator-client" }
nym-bin-common = { path = "../common/bin-common" }
# chichat
chitchat = "0.5"
[dev-dependencies]
tokio = { version="1.21.2", features = ["rt-multi-thread", "net", "signal", "test-util"] }
tokio = { version = "1.21.2", features = [
"rt-multi-thread",
"net",
"signal",
"test-util",
] }
nym-sphinx-types = { path = "../common/nymsphinx/types" }
nym-sphinx-params = { path = "../common/nymsphinx/params" }
+1 -1
View File
@@ -109,5 +109,5 @@ pub(crate) async fn execute(args: &Run, output: OutputFormat) {
Select the correct version and install it to your machine. You will need to provide the following: \n ");
mixnode.print_node_details(output);
mixnode.run().await
mixnode.run().await.expect("Could not run mixnode")
}
+11 -1
View File
@@ -4,7 +4,8 @@
use crate::config::template::config_template;
use nym_config::defaults::mainnet::NYM_API;
use nym_config::defaults::{
DEFAULT_HTTP_API_LISTENING_PORT, DEFAULT_MIX_LISTENING_PORT, DEFAULT_VERLOC_LISTENING_PORT,
DEFAULT_GOSSIP_PORT, DEFAULT_HTTP_API_LISTENING_PORT, DEFAULT_MIX_LISTENING_PORT,
DEFAULT_VERLOC_LISTENING_PORT,
};
use nym_config::NymConfig;
use serde::{Deserialize, Deserializer, Serialize};
@@ -258,6 +259,10 @@ impl Config {
self.mixnode.http_api_port
}
pub fn get_gossip_port(&self) -> u16 {
self.mixnode.gossip_port
}
pub fn get_packet_forwarding_initial_backoff(&self) -> Duration {
self.debug.packet_forwarding_initial_backoff
}
@@ -348,6 +353,10 @@ struct MixNode {
#[serde(default = "default_http_api_port")]
http_api_port: u16,
/// Port used for the gossip protocol
#[serde(default = "default_http_api_port")]
gossip_port: u16,
/// Path to file containing private identity key.
#[serde(default = "missing_string_value")]
private_identity_key_file: PathBuf,
@@ -402,6 +411,7 @@ impl Default for MixNode {
mix_port: DEFAULT_MIX_LISTENING_PORT,
verloc_port: DEFAULT_VERLOC_LISTENING_PORT,
http_api_port: DEFAULT_HTTP_API_LISTENING_PORT,
gossip_port: DEFAULT_GOSSIP_PORT,
private_identity_key_file: Default::default(),
public_identity_key_file: Default::default(),
private_sphinx_key_file: Default::default(),
+28
View File
@@ -0,0 +1,28 @@
use std::sync::Arc;
use chitchat::{Chitchat, ClusterStateSnapshot, NodeId};
use rocket::serde::json::Json;
use rocket::State;
use serde::Serialize;
use tokio::sync::Mutex;
#[derive(Serialize)]
pub struct ClusterState {
cluster_id: String,
cluster_state: ClusterStateSnapshot,
live_nodes: Vec<NodeId>,
dead_nodes: Vec<NodeId>,
}
/// Returns a description of the node and why someone might want to delegate stake to it.
#[get("/state")]
pub(crate) async fn state(chitchat: &State<Arc<Mutex<Chitchat>>>) -> Json<ClusterState> {
let chitchat_guard = chitchat.lock().await;
let cluster_state = ClusterState {
cluster_id: chitchat_guard.cluster_id().to_string(),
cluster_state: chitchat_guard.state_snapshot(),
live_nodes: chitchat_guard.live_nodes().cloned().collect::<Vec<_>>(),
dead_nodes: chitchat_guard.dead_nodes().cloned().collect::<Vec<_>>(),
};
Json(cluster_state)
}
+1
View File
@@ -1,4 +1,5 @@
pub(crate) mod description;
pub(crate) mod gossip;
pub(crate) mod hardware;
pub(crate) mod stats;
pub(crate) mod verloc;
+67 -12
View File
@@ -5,6 +5,7 @@ use crate::config::persistence::pathfinder::MixNodePathfinder;
use crate::config::Config;
use crate::node::http::{
description::description,
gossip,
hardware::hardware,
not_found,
stats::stats,
@@ -17,6 +18,10 @@ use crate::node::node_description::NodeDescription;
use crate::node::node_statistics::SharedNodeStats;
use crate::node::packet_delayforwarder::{DelayForwarder, PacketDelayForwardSender};
use crate::OutputFormat;
use chitchat::transport::UdpTransport;
use chitchat::{
spawn_chitchat, Chitchat, ChitchatConfig, ChitchatHandle, FailureDetectorConfig, NodeId,
};
use log::{error, info, warn};
use mixnode_common::verloc::{self, AtomicVerlocResult, VerlocMeasurer};
use nym_bin_common::version_checker::parse_version;
@@ -28,6 +33,8 @@ use rand::thread_rng;
use std::net::SocketAddr;
use std::process;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
mod http;
mod listener;
@@ -110,6 +117,7 @@ impl MixNode {
&self,
atomic_verloc_result: AtomicVerlocResult,
node_stats_pointer: SharedNodeStats,
chitchat: Arc<Mutex<Chitchat>>,
) {
info!("Starting HTTP API on http://localhost:8000");
@@ -125,11 +133,15 @@ impl MixNode {
tokio::spawn(async move {
rocket::build()
.configure(config)
.mount("/", routes![verlocRoute, description, stats, hardware])
.mount(
"/",
routes![verlocRoute, description, stats, hardware, gossip::state],
)
.register("/", catchers![not_found])
.manage(verloc_state)
.manage(descriptor)
.manage(node_stats_pointer)
.manage(chitchat)
.launch()
.await
});
@@ -199,7 +211,11 @@ impl MixNode {
packet_sender
}
fn start_verloc_measurements(&self, shutdown: TaskClient) -> AtomicVerlocResult {
fn start_verloc_measurements(
&self,
chitchat: Arc<Mutex<Chitchat>>,
shutdown: TaskClient,
) -> AtomicVerlocResult {
info!("Starting the round-trip-time measurer...");
// this is a sanity check to make sure we didn't mess up with the minimum version at some point
@@ -235,7 +251,7 @@ impl MixNode {
let mut verloc_measurer =
VerlocMeasurer::new(config, Arc::clone(&self.identity_keypair), shutdown);
let atomic_verloc_results = verloc_measurer.get_verloc_results_pointer();
tokio::spawn(async move { verloc_measurer.run().await });
tokio::spawn(async move { verloc_measurer.run(chitchat).await });
atomic_verloc_results
}
@@ -277,18 +293,53 @@ impl MixNode {
log::info!("Stopping nym mixnode");
}
pub async fn run(&mut self) {
fn gossip_node_id(&self) -> NodeId {
NodeId::new(
self.identity_keypair.public_key().to_string(),
(
self.config.get_listening_address(),
self.config.get_gossip_port(),
)
.into(),
)
}
fn gossip_config(&self) -> ChitchatConfig {
ChitchatConfig {
node_id: self.gossip_node_id(),
cluster_id: "hive_mind".to_string(),
gossip_interval: Duration::from_millis(500),
listen_addr: (
self.config.get_listening_address(),
self.config.get_gossip_port(),
)
.into(),
// We'd probably wanna get this from the nym_api, or have a few known good nodes, the ones we run probably
seed_nodes: vec![],
failure_detector_config: FailureDetectorConfig::default(),
is_ready_predicate: None,
}
}
pub async fn init_chitchat(&self) -> Result<ChitchatHandle, String> {
let chitchat_handler = spawn_chitchat(self.gossip_config(), Vec::new(), &UdpTransport)
.await
.map_err(|_| "Could not spawn chitchat")?;
Ok(chitchat_handler)
}
pub async fn run(&mut self) -> Result<(), String> {
info!("Starting nym mixnode");
if let Some(duplicate_node_key) = self.check_if_same_ip_node_exists().await {
if duplicate_node_key == self.identity_keypair.public_key().to_base58_string() {
warn!("You seem to have bonded your mixnode before starting it - that's highly unrecommended as in the future it might result in slashing");
} else {
log::error!(
"Our announce-host is identical to an existing node's announce-host! (its key is {:?})",
duplicate_node_key
);
return;
let err = format!("Our announce-host is identical to an existing node's announce-host! (its key is {:?})",
duplicate_node_key);
log::error!("{err}");
return Err(err);
}
}
@@ -303,14 +354,18 @@ impl MixNode {
delay_forwarding_channel,
shutdown.subscribe(),
);
let atomic_verloc_results = self.start_verloc_measurements(shutdown.subscribe());
let chitchat_handler = self.init_chitchat().await?;
let chitchat = chitchat_handler.chitchat();
let atomic_verloc_results =
self.start_verloc_measurements(chitchat.clone(), shutdown.subscribe());
// Rocket handles shutdown on it's own, but its shutdown handling should be incorporated
// with that of the rest of the tasks.
// Currently it's runtime is forcefully terminated once the mixnode exits.
self.start_http_api(atomic_verloc_results, node_stats_pointer);
self.start_http_api(atomic_verloc_results, node_stats_pointer, chitchat);
info!("Finished nym mixnode startup procedure - it should now be able to receive mix traffic!");
self.wait_for_interrupt(shutdown).await
self.wait_for_interrupt(shutdown).await;
Ok(())
}
}