Compare commits

...

1 Commits

Author SHA1 Message Date
durch 0877b6edae Add gossip to mixnodes 2023-03-17 11:49:52 +01:00
10 changed files with 176 additions and 31 deletions
Generated
+19
View File
@@ -505,6 +505,22 @@ dependencies = [
"zeroize", "zeroize",
] ]
[[package]]
name = "chitchat"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fec1bff4fabc59f0b7408478183a796ab0a9bcb0534bb413f3cf66f44ce2aca1"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"rand 0.8.5",
"serde",
"tokio",
"tokio-stream",
"tracing",
]
[[package]] [[package]]
name = "chrono" name = "chrono"
version = "0.4.23" version = "0.4.23"
@@ -2911,6 +2927,7 @@ name = "mixnode-common"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"bytes", "bytes",
"chitchat",
"futures", "futures",
"humantime-serde", "humantime-serde",
"log", "log",
@@ -2926,6 +2943,7 @@ dependencies = [
"nym-task", "nym-task",
"rand 0.8.5", "rand 0.8.5",
"serde", "serde",
"serde_json",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-util", "tokio-util",
@@ -3491,6 +3509,7 @@ dependencies = [
"anyhow", "anyhow",
"atty", "atty",
"bs58", "bs58",
"chitchat",
"clap 4.1.4", "clap 4.1.4",
"colored", "colored",
"cupid", "cupid",
+12 -2
View File
@@ -13,10 +13,18 @@ humantime-serde = "1.0"
log = { workspace = true } log = { workspace = true }
rand = "0.8" rand = "0.8"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.24.1", features = ["time", "macros", "rt", "net", "io-util"] } tokio = { version = "1.24.1", features = [
"time",
"macros",
"rt",
"net",
"io-util",
] }
tokio-util = { version = "0.7.4", features = ["codec"] } tokio-util = { version = "0.7.4", features = ["codec"] }
url = "2.2" url = "2.2"
thiserror = "1.0.37" thiserror = "1.0.37"
chitchat = "0.5"
serde_json = "1"
nym-crypto = { path = "../crypto" } nym-crypto = { path = "../crypto" }
nym-network-defaults = { path = "../network-defaults" } nym-network-defaults = { path = "../network-defaults" }
@@ -27,5 +35,7 @@ nym-sphinx-framing = { path = "../nymsphinx/framing" }
nym-sphinx-params = { path = "../nymsphinx/params" } nym-sphinx-params = { path = "../nymsphinx/params" }
nym-sphinx-types = { path = "../nymsphinx/types" } nym-sphinx-types = { path = "../nymsphinx/types" }
nym-task = { path = "../task" } nym-task = { path = "../task" }
validator-client = { path = "../client-libs/validator-client", features = ["nyxd-client"]} validator-client = { path = "../client-libs/validator-client", features = [
"nyxd-client",
] }
nym-bin-common = { path = "../bin-common" } nym-bin-common = { path = "../bin-common" }
+14 -1
View File
@@ -3,6 +3,7 @@
use crate::verloc::listener::PacketListener; use crate::verloc::listener::PacketListener;
use crate::verloc::sender::{PacketSender, TestedNode}; use crate::verloc::sender::{PacketSender, TestedNode};
use chitchat::Chitchat;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::StreamExt;
use log::*; use log::*;
@@ -16,6 +17,7 @@ use std::net::SocketAddr;
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::Mutex;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::sleep; use tokio::time::sleep;
use url::Url; use url::Url;
@@ -293,7 +295,7 @@ impl VerlocMeasurer {
MeasurementOutcome::Done MeasurementOutcome::Done
} }
pub async fn run(&mut self) { pub async fn run(&mut self, chitchat_handle: Arc<Mutex<Chitchat>>) {
self.start_listening(); self.start_listening();
while !self.shutdown_listener.is_shutdown() { while !self.shutdown_listener.is_shutdown() {
@@ -345,6 +347,17 @@ impl VerlocMeasurer {
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let mut chitchat_guard = chitchat_handle.lock().await;
let cc_state = chitchat_guard.self_node_state();
let tested_nodes_cc = tested_nodes
.iter()
.map(|node| node.identity.to_string())
.collect::<Vec<String>>();
cc_state.set(
"tested_nodes",
serde_json::to_value(tested_nodes_cc).expect("Could not serialize"),
);
// on start of each run remove old results // on start of each run remove old results
self.results.reset_results(tested_nodes.len()).await; self.results.reset_results(tested_nodes.len()).await;
+1
View File
@@ -377,6 +377,7 @@ pub const DEFAULT_CLIENT_LISTENING_PORT: u16 = 9000;
// 'MIXNODE' // 'MIXNODE'
pub const DEFAULT_VERLOC_LISTENING_PORT: u16 = 1790; pub const DEFAULT_VERLOC_LISTENING_PORT: u16 = 1790;
pub const DEFAULT_HTTP_API_LISTENING_PORT: u16 = 8000; pub const DEFAULT_HTTP_API_LISTENING_PORT: u16 = 8000;
pub const DEFAULT_GOSSIP_PORT: u16 = 10000;
// 'CLIENT' // 'CLIENT'
pub const DEFAULT_WEBSOCKET_LISTENING_PORT: u16 = 1977; pub const DEFAULT_WEBSOCKET_LISTENING_PORT: u16 = 1977;
+21 -13
View File
@@ -29,32 +29,40 @@ log = { workspace = true }
pretty_env_logger = "0.4.0" pretty_env_logger = "0.4.0"
rand = "0.7.3" rand = "0.7.3"
rocket = { version = "0.5.0-rc.2", features = ["json"] } rocket = { version = "0.5.0-rc.2", features = ["json"] }
serde = { version="1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
sysinfo = "0.27.7" sysinfo = "0.27.7"
tokio = { version="1.21.2", features = ["rt-multi-thread", "net", "signal"] } tokio = { version = "1.21.2", features = ["rt-multi-thread", "net", "signal"] }
tokio-util = { version="0.7.3", features = ["codec"] } tokio-util = { version = "0.7.3", features = ["codec"] }
toml = "0.5.8" toml = "0.5.8"
url = { version = "2.2", features = ["serde"] } url = { version = "2.2", features = ["serde"] }
atty = "0.2" atty = "0.2"
## internal ## internal
nym-config = { path="../common/config" } nym-config = { path = "../common/config" }
nym-crypto = { path="../common/crypto" } nym-crypto = { path = "../common/crypto" }
nym-contracts-common = { path = "../common/cosmwasm-smart-contracts/contracts-common" } nym-contracts-common = { path = "../common/cosmwasm-smart-contracts/contracts-common" }
mixnet-client = { path="../common/client-libs/mixnet-client" } mixnet-client = { path = "../common/client-libs/mixnet-client" }
mixnode-common = { path="../common/mixnode-common" } mixnode-common = { path = "../common/mixnode-common" }
nym-nonexhaustive-delayqueue = { path="../common/nonexhaustive-delayqueue" } nym-nonexhaustive-delayqueue = { path = "../common/nonexhaustive-delayqueue" }
nym-sphinx = { path="../common/nymsphinx" } nym-sphinx = { path = "../common/nymsphinx" }
nym-pemstore = { path = "../common/pemstore", version = "0.2.0" } nym-pemstore = { path = "../common/pemstore", version = "0.2.0" }
nym-task = { path = "../common/task" } nym-task = { path = "../common/task" }
nym-types = { path = "../common/types" } nym-types = { path = "../common/types" }
nym-topology = { path="../common/topology" } nym-topology = { path = "../common/topology" }
validator-client = { path="../common/client-libs/validator-client" } validator-client = { path = "../common/client-libs/validator-client" }
nym-bin-common = { path="../common/bin-common" } nym-bin-common = { path = "../common/bin-common" }
# chichat
chitchat = "0.5"
[dev-dependencies] [dev-dependencies]
tokio = { version="1.21.2", features = ["rt-multi-thread", "net", "signal", "test-util"] } tokio = { version = "1.21.2", features = [
"rt-multi-thread",
"net",
"signal",
"test-util",
] }
nym-sphinx-types = { path = "../common/nymsphinx/types" } nym-sphinx-types = { path = "../common/nymsphinx/types" }
nym-sphinx-params = { path = "../common/nymsphinx/params" } nym-sphinx-params = { path = "../common/nymsphinx/params" }
+1 -1
View File
@@ -109,5 +109,5 @@ pub(crate) async fn execute(args: &Run, output: OutputFormat) {
Select the correct version and install it to your machine. You will need to provide the following: \n "); Select the correct version and install it to your machine. You will need to provide the following: \n ");
mixnode.print_node_details(output); mixnode.print_node_details(output);
mixnode.run().await mixnode.run().await.expect("Could not run mixnode")
} }
+11 -1
View File
@@ -4,7 +4,8 @@
use crate::config::template::config_template; use crate::config::template::config_template;
use nym_config::defaults::mainnet::NYM_API; use nym_config::defaults::mainnet::NYM_API;
use nym_config::defaults::{ use nym_config::defaults::{
DEFAULT_HTTP_API_LISTENING_PORT, DEFAULT_MIX_LISTENING_PORT, DEFAULT_VERLOC_LISTENING_PORT, DEFAULT_GOSSIP_PORT, DEFAULT_HTTP_API_LISTENING_PORT, DEFAULT_MIX_LISTENING_PORT,
DEFAULT_VERLOC_LISTENING_PORT,
}; };
use nym_config::NymConfig; use nym_config::NymConfig;
use serde::{Deserialize, Deserializer, Serialize}; use serde::{Deserialize, Deserializer, Serialize};
@@ -258,6 +259,10 @@ impl Config {
self.mixnode.http_api_port self.mixnode.http_api_port
} }
pub fn get_gossip_port(&self) -> u16 {
self.mixnode.gossip_port
}
pub fn get_packet_forwarding_initial_backoff(&self) -> Duration { pub fn get_packet_forwarding_initial_backoff(&self) -> Duration {
self.debug.packet_forwarding_initial_backoff self.debug.packet_forwarding_initial_backoff
} }
@@ -348,6 +353,10 @@ struct MixNode {
#[serde(default = "default_http_api_port")] #[serde(default = "default_http_api_port")]
http_api_port: u16, http_api_port: u16,
/// Port used for the gossip protocol
#[serde(default = "default_http_api_port")]
gossip_port: u16,
/// Path to file containing private identity key. /// Path to file containing private identity key.
#[serde(default = "missing_string_value")] #[serde(default = "missing_string_value")]
private_identity_key_file: PathBuf, private_identity_key_file: PathBuf,
@@ -402,6 +411,7 @@ impl Default for MixNode {
mix_port: DEFAULT_MIX_LISTENING_PORT, mix_port: DEFAULT_MIX_LISTENING_PORT,
verloc_port: DEFAULT_VERLOC_LISTENING_PORT, verloc_port: DEFAULT_VERLOC_LISTENING_PORT,
http_api_port: DEFAULT_HTTP_API_LISTENING_PORT, http_api_port: DEFAULT_HTTP_API_LISTENING_PORT,
gossip_port: DEFAULT_GOSSIP_PORT,
private_identity_key_file: Default::default(), private_identity_key_file: Default::default(),
public_identity_key_file: Default::default(), public_identity_key_file: Default::default(),
private_sphinx_key_file: Default::default(), private_sphinx_key_file: Default::default(),
+28
View File
@@ -0,0 +1,28 @@
use std::sync::Arc;
use chitchat::{Chitchat, ClusterStateSnapshot, NodeId};
use rocket::serde::json::Json;
use rocket::State;
use serde::Serialize;
use tokio::sync::Mutex;
#[derive(Serialize)]
pub struct ClusterState {
cluster_id: String,
cluster_state: ClusterStateSnapshot,
live_nodes: Vec<NodeId>,
dead_nodes: Vec<NodeId>,
}
/// Returns a description of the node and why someone might want to delegate stake to it.
#[get("/state")]
pub(crate) async fn state(chitchat: &State<Arc<Mutex<Chitchat>>>) -> Json<ClusterState> {
let chitchat_guard = chitchat.lock().await;
let cluster_state = ClusterState {
cluster_id: chitchat_guard.cluster_id().to_string(),
cluster_state: chitchat_guard.state_snapshot(),
live_nodes: chitchat_guard.live_nodes().cloned().collect::<Vec<_>>(),
dead_nodes: chitchat_guard.dead_nodes().cloned().collect::<Vec<_>>(),
};
Json(cluster_state)
}
+1
View File
@@ -1,4 +1,5 @@
pub(crate) mod description; pub(crate) mod description;
pub(crate) mod gossip;
pub(crate) mod hardware; pub(crate) mod hardware;
pub(crate) mod stats; pub(crate) mod stats;
pub(crate) mod verloc; pub(crate) mod verloc;
+67 -12
View File
@@ -5,6 +5,7 @@ use crate::config::persistence::pathfinder::MixNodePathfinder;
use crate::config::Config; use crate::config::Config;
use crate::node::http::{ use crate::node::http::{
description::description, description::description,
gossip,
hardware::hardware, hardware::hardware,
not_found, not_found,
stats::stats, stats::stats,
@@ -17,6 +18,10 @@ use crate::node::node_description::NodeDescription;
use crate::node::node_statistics::SharedNodeStats; use crate::node::node_statistics::SharedNodeStats;
use crate::node::packet_delayforwarder::{DelayForwarder, PacketDelayForwardSender}; use crate::node::packet_delayforwarder::{DelayForwarder, PacketDelayForwardSender};
use crate::OutputFormat; use crate::OutputFormat;
use chitchat::transport::UdpTransport;
use chitchat::{
spawn_chitchat, Chitchat, ChitchatConfig, ChitchatHandle, FailureDetectorConfig, NodeId,
};
use log::{error, info, warn}; use log::{error, info, warn};
use mixnode_common::verloc::{self, AtomicVerlocResult, VerlocMeasurer}; use mixnode_common::verloc::{self, AtomicVerlocResult, VerlocMeasurer};
use nym_bin_common::version_checker::parse_version; use nym_bin_common::version_checker::parse_version;
@@ -28,6 +33,8 @@ use rand::thread_rng;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::process; use std::process;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
mod http; mod http;
mod listener; mod listener;
@@ -110,6 +117,7 @@ impl MixNode {
&self, &self,
atomic_verloc_result: AtomicVerlocResult, atomic_verloc_result: AtomicVerlocResult,
node_stats_pointer: SharedNodeStats, node_stats_pointer: SharedNodeStats,
chitchat: Arc<Mutex<Chitchat>>,
) { ) {
info!("Starting HTTP API on http://localhost:8000"); info!("Starting HTTP API on http://localhost:8000");
@@ -125,11 +133,15 @@ impl MixNode {
tokio::spawn(async move { tokio::spawn(async move {
rocket::build() rocket::build()
.configure(config) .configure(config)
.mount("/", routes![verlocRoute, description, stats, hardware]) .mount(
"/",
routes![verlocRoute, description, stats, hardware, gossip::state],
)
.register("/", catchers![not_found]) .register("/", catchers![not_found])
.manage(verloc_state) .manage(verloc_state)
.manage(descriptor) .manage(descriptor)
.manage(node_stats_pointer) .manage(node_stats_pointer)
.manage(chitchat)
.launch() .launch()
.await .await
}); });
@@ -199,7 +211,11 @@ impl MixNode {
packet_sender packet_sender
} }
fn start_verloc_measurements(&self, shutdown: TaskClient) -> AtomicVerlocResult { fn start_verloc_measurements(
&self,
chitchat: Arc<Mutex<Chitchat>>,
shutdown: TaskClient,
) -> AtomicVerlocResult {
info!("Starting the round-trip-time measurer..."); info!("Starting the round-trip-time measurer...");
// this is a sanity check to make sure we didn't mess up with the minimum version at some point // this is a sanity check to make sure we didn't mess up with the minimum version at some point
@@ -235,7 +251,7 @@ impl MixNode {
let mut verloc_measurer = let mut verloc_measurer =
VerlocMeasurer::new(config, Arc::clone(&self.identity_keypair), shutdown); VerlocMeasurer::new(config, Arc::clone(&self.identity_keypair), shutdown);
let atomic_verloc_results = verloc_measurer.get_verloc_results_pointer(); let atomic_verloc_results = verloc_measurer.get_verloc_results_pointer();
tokio::spawn(async move { verloc_measurer.run().await }); tokio::spawn(async move { verloc_measurer.run(chitchat).await });
atomic_verloc_results atomic_verloc_results
} }
@@ -277,18 +293,53 @@ impl MixNode {
log::info!("Stopping nym mixnode"); log::info!("Stopping nym mixnode");
} }
pub async fn run(&mut self) { fn gossip_node_id(&self) -> NodeId {
NodeId::new(
self.identity_keypair.public_key().to_string(),
(
self.config.get_listening_address(),
self.config.get_gossip_port(),
)
.into(),
)
}
fn gossip_config(&self) -> ChitchatConfig {
ChitchatConfig {
node_id: self.gossip_node_id(),
cluster_id: "hive_mind".to_string(),
gossip_interval: Duration::from_millis(500),
listen_addr: (
self.config.get_listening_address(),
self.config.get_gossip_port(),
)
.into(),
// We'd probably wanna get this from the nym_api, or have a few known good nodes, the ones we run probably
seed_nodes: vec![],
failure_detector_config: FailureDetectorConfig::default(),
is_ready_predicate: None,
}
}
pub async fn init_chitchat(&self) -> Result<ChitchatHandle, String> {
let chitchat_handler = spawn_chitchat(self.gossip_config(), Vec::new(), &UdpTransport)
.await
.map_err(|_| "Could not spawn chitchat")?;
Ok(chitchat_handler)
}
pub async fn run(&mut self) -> Result<(), String> {
info!("Starting nym mixnode"); info!("Starting nym mixnode");
if let Some(duplicate_node_key) = self.check_if_same_ip_node_exists().await { if let Some(duplicate_node_key) = self.check_if_same_ip_node_exists().await {
if duplicate_node_key == self.identity_keypair.public_key().to_base58_string() { if duplicate_node_key == self.identity_keypair.public_key().to_base58_string() {
warn!("You seem to have bonded your mixnode before starting it - that's highly unrecommended as in the future it might result in slashing"); warn!("You seem to have bonded your mixnode before starting it - that's highly unrecommended as in the future it might result in slashing");
} else { } else {
log::error!( let err = format!("Our announce-host is identical to an existing node's announce-host! (its key is {:?})",
"Our announce-host is identical to an existing node's announce-host! (its key is {:?})", duplicate_node_key);
duplicate_node_key log::error!("{err}");
); return Err(err);
return;
} }
} }
@@ -303,14 +354,18 @@ impl MixNode {
delay_forwarding_channel, delay_forwarding_channel,
shutdown.subscribe(), shutdown.subscribe(),
); );
let atomic_verloc_results = self.start_verloc_measurements(shutdown.subscribe()); let chitchat_handler = self.init_chitchat().await?;
let chitchat = chitchat_handler.chitchat();
let atomic_verloc_results =
self.start_verloc_measurements(chitchat.clone(), shutdown.subscribe());
// Rocket handles shutdown on it's own, but its shutdown handling should be incorporated // Rocket handles shutdown on it's own, but its shutdown handling should be incorporated
// with that of the rest of the tasks. // with that of the rest of the tasks.
// Currently it's runtime is forcefully terminated once the mixnode exits. // Currently it's runtime is forcefully terminated once the mixnode exits.
self.start_http_api(atomic_verloc_results, node_stats_pointer); self.start_http_api(atomic_verloc_results, node_stats_pointer, chitchat);
info!("Finished nym mixnode startup procedure - it should now be able to receive mix traffic!"); info!("Finished nym mixnode startup procedure - it should now be able to receive mix traffic!");
self.wait_for_interrupt(shutdown).await self.wait_for_interrupt(shutdown).await;
Ok(())
} }
} }