Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ee951213e9 | |||
| 1ac262ec90 | |||
| 7c1fca8ce4 |
@@ -0,0 +1,7 @@
|
||||
.git
|
||||
.github
|
||||
.gitignore
|
||||
**/node_modules
|
||||
**/target
|
||||
dist
|
||||
documentation
|
||||
+4
-1
@@ -48,4 +48,7 @@ foxyfox.env
|
||||
|
||||
.next
|
||||
ppa-private-key.b64
|
||||
ppa-private-key.asc
|
||||
ppa-private-key.asc
|
||||
nym-network-monitor/topology.json
|
||||
nym-network-monitor/__pycache__
|
||||
nym-network-monitor/*.key
|
||||
Generated
+58
@@ -1858,6 +1858,7 @@ dependencies = [
|
||||
"lock_api",
|
||||
"once_cell",
|
||||
"parking_lot_core 0.9.10",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2397,6 +2398,12 @@ dependencies = [
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fixedbitset"
|
||||
version = "0.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
|
||||
|
||||
[[package]]
|
||||
name = "flate2"
|
||||
version = "1.0.30"
|
||||
@@ -4253,6 +4260,7 @@ dependencies = [
|
||||
"nym-sphinx",
|
||||
"nym-task",
|
||||
"nym-topology",
|
||||
"nym-types",
|
||||
"nym-validator-client",
|
||||
"nym-vesting-contract-common",
|
||||
"okapi",
|
||||
@@ -4579,6 +4587,7 @@ dependencies = [
|
||||
"nym-topology",
|
||||
"nym-validator-client",
|
||||
"rand 0.8.5",
|
||||
"rand_chacha 0.3.1",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2 0.10.8",
|
||||
@@ -5410,6 +5419,36 @@ dependencies = [
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nym-network-monitor"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"axum 0.7.5",
|
||||
"clap 4.5.7",
|
||||
"dashmap",
|
||||
"futures",
|
||||
"log",
|
||||
"nym-bin-common",
|
||||
"nym-crypto",
|
||||
"nym-network-defaults",
|
||||
"nym-sdk",
|
||||
"nym-sphinx",
|
||||
"nym-topology",
|
||||
"nym-types",
|
||||
"nym-validator-client",
|
||||
"petgraph",
|
||||
"rand 0.8.5",
|
||||
"rand_chacha 0.3.1",
|
||||
"reqwest 0.12.4",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"utoipa",
|
||||
"utoipa-swagger-ui",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nym-network-requester"
|
||||
version = "1.1.40"
|
||||
@@ -5575,6 +5614,7 @@ dependencies = [
|
||||
"nym-task",
|
||||
"nym-topology",
|
||||
"rand 0.8.5",
|
||||
"rand_chacha 0.3.1",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror",
|
||||
@@ -5858,6 +5898,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"log",
|
||||
"nym-crypto",
|
||||
"nym-metrics",
|
||||
"nym-mixnet-contract-common",
|
||||
"nym-sphinx-acknowledgements",
|
||||
"nym-sphinx-addressing",
|
||||
@@ -5871,6 +5912,7 @@ dependencies = [
|
||||
"nym-sphinx-types",
|
||||
"nym-topology",
|
||||
"rand 0.8.5",
|
||||
"rand_chacha 0.3.1",
|
||||
"rand_distr",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
@@ -5927,12 +5969,17 @@ dependencies = [
|
||||
name = "nym-sphinx-chunking"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"dashmap",
|
||||
"log",
|
||||
"nym-crypto",
|
||||
"nym-metrics",
|
||||
"nym-sphinx-addressing",
|
||||
"nym-sphinx-params",
|
||||
"nym-sphinx-types",
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
"thiserror",
|
||||
"utoipa",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -6046,6 +6093,7 @@ dependencies = [
|
||||
"nym-sphinx-routing",
|
||||
"nym-sphinx-types",
|
||||
"rand 0.8.5",
|
||||
"reqwest 0.12.4",
|
||||
"semver 0.11.0",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -6682,6 +6730,16 @@ dependencies = [
|
||||
"sha2 0.10.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "petgraph"
|
||||
version = "0.6.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db"
|
||||
dependencies = [
|
||||
"fixedbitset",
|
||||
"indexmap 2.2.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pin-project"
|
||||
version = "1.1.5"
|
||||
|
||||
+2
-1
@@ -14,7 +14,6 @@ panic = "abort"
|
||||
opt-level = 3
|
||||
|
||||
[workspace]
|
||||
|
||||
resolver = "2"
|
||||
members = [
|
||||
"clients/native",
|
||||
@@ -106,6 +105,7 @@ members = [
|
||||
"service-providers/common",
|
||||
"service-providers/ip-packet-router",
|
||||
"service-providers/network-requester",
|
||||
"nym-network-monitor",
|
||||
"nym-api",
|
||||
"nym-browser-extension/storage",
|
||||
"nym-api/nym-api-requests",
|
||||
@@ -159,6 +159,7 @@ homepage = "https://nymtech.net"
|
||||
documentation = "https://nymtech.net"
|
||||
edition = "2021"
|
||||
license = "Apache-2.0"
|
||||
rust-version = "1.80"
|
||||
|
||||
[workspace.dependencies]
|
||||
addr = "0.15.6"
|
||||
|
||||
@@ -19,6 +19,7 @@ futures = { workspace = true }
|
||||
humantime-serde = { workspace = true }
|
||||
log = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
rand_chacha = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
sha2 = { workspace = true }
|
||||
|
||||
@@ -458,7 +458,7 @@ impl PacketStatisticsControl {
|
||||
|
||||
fn report_rates(&self) {
|
||||
if let Some((_, rates)) = self.rates.back() {
|
||||
log::info!("{}", rates.summary());
|
||||
log::debug!("{}", rates.summary());
|
||||
log::debug!("{}", rates.detailed_summary());
|
||||
}
|
||||
}
|
||||
@@ -486,7 +486,7 @@ impl PacketStatisticsControl {
|
||||
// Check what the number of retransmissions was during the recording window
|
||||
if let Some((_, start_stats)) = self.history.front() {
|
||||
let delta = self.stats.clone() - start_stats.clone();
|
||||
log::info!(
|
||||
log::debug!(
|
||||
"mix packet retransmissions/real mix packets: {}/{}",
|
||||
delta.retransmissions_queued,
|
||||
delta.real_packets_queued,
|
||||
|
||||
@@ -453,6 +453,7 @@ where
|
||||
|
||||
let mut pending_acks = Vec::with_capacity(fragments.len());
|
||||
let mut real_messages = Vec::with_capacity(fragments.len());
|
||||
debug!("Splitting message into {} fragments", fragments.len());
|
||||
for fragment in fragments {
|
||||
// we need to clone it because we need to keep it in memory in case we had to retransmit
|
||||
// it. And then we'd need to recreate entire ACK again.
|
||||
|
||||
@@ -90,4 +90,6 @@ default = ["http-client"]
|
||||
http-client = ["cosmrs/rpc"]
|
||||
generate-ts = []
|
||||
contract-testing = ["nym-mixnet-contract-common/contract-testing"]
|
||||
|
||||
# Features below are added to make clippy happy, it seems like they're unused we should remove them
|
||||
tendermint-rpc-http-client = ["tendermint-rpc/http-client"]
|
||||
tendermint-rpc-websocket-client = ["tendermint-rpc/websocket-client"]
|
||||
|
||||
@@ -49,5 +49,7 @@ pub const COMPUTE_REWARD_ESTIMATION: &str = "compute-reward-estimation";
|
||||
pub const AVG_UPTIME: &str = "avg_uptime";
|
||||
pub const STAKE_SATURATION: &str = "stake-saturation";
|
||||
pub const INCLUSION_CHANCE: &str = "inclusion-probability";
|
||||
pub const SUBMIT_GATEWAY: &str = "submit-gateway-monitoring-results";
|
||||
pub const SUBMIT_NODE: &str = "submit-node-monitoring-results";
|
||||
|
||||
pub const SERVICE_PROVIDERS: &str = "services";
|
||||
|
||||
@@ -300,8 +300,8 @@ where
|
||||
}
|
||||
|
||||
#[cfg(any(
|
||||
feature = "tendermint-rpc/http-client",
|
||||
feature = "tendermint-rpc/websocket-client"
|
||||
feature = "tendermint-rpc-http-client",
|
||||
feature = "tendermint-rpc-websocket-client"
|
||||
))]
|
||||
async fn wait_until_healthy<T>(&self, timeout: T) -> Result<(), Error>
|
||||
where
|
||||
|
||||
@@ -820,8 +820,8 @@ where
|
||||
}
|
||||
|
||||
#[cfg(any(
|
||||
feature = "tendermint-rpc/http-client",
|
||||
feature = "tendermint-rpc/websocket-client"
|
||||
feature = "tendermint-rpc-http-client",
|
||||
feature = "tendermint-rpc-websocket-client"
|
||||
))]
|
||||
async fn wait_until_healthy<T>(&self, timeout: T) -> Result<(), Error>
|
||||
where
|
||||
|
||||
@@ -300,8 +300,8 @@ pub trait TendermintRpcClient {
|
||||
}
|
||||
|
||||
#[cfg(any(
|
||||
feature = "tendermint-rpc/http-client",
|
||||
feature = "tendermint-rpc/websocket-client"
|
||||
feature = "tendermint-rpc-http-client",
|
||||
feature = "tendermint-rpc-websocket-client"
|
||||
))]
|
||||
/// Poll the `/health` endpoint until it returns a successful result or
|
||||
/// the given `timeout` has elapsed.
|
||||
@@ -518,8 +518,8 @@ mod non_wasm {
|
||||
}
|
||||
|
||||
#[cfg(any(
|
||||
feature = "tendermint-rpc/http-client",
|
||||
feature = "tendermint-rpc/websocket-client"
|
||||
feature = "tendermint-rpc-http-client",
|
||||
feature = "tendermint-rpc-websocket-client"
|
||||
))]
|
||||
async fn wait_until_healthy<T>(&self, timeout: T) -> Result<(), Error>
|
||||
where
|
||||
|
||||
@@ -22,7 +22,7 @@ use tracing::{debug, error};
|
||||
pub mod bandwidth;
|
||||
pub mod error;
|
||||
mod inboxes;
|
||||
pub(crate) mod models;
|
||||
pub mod models;
|
||||
mod shared_keys;
|
||||
mod tickets;
|
||||
#[cfg(feature = "wireguard")]
|
||||
|
||||
@@ -9,11 +9,12 @@ license.workspace = true
|
||||
[dependencies]
|
||||
futures = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
rand_chacha = { workspace = true }
|
||||
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["macros"]}
|
||||
tokio = { workspace = true, features = ["macros"] }
|
||||
|
||||
nym-crypto = { path = "../crypto", features = ["asymmetric"] }
|
||||
nym-task = { path = "../task" }
|
||||
|
||||
@@ -294,4 +294,8 @@ impl<R: CryptoRng + Rng> FragmentPreparer for NodeTester<R> {
|
||||
fn average_ack_delay(&self) -> Duration {
|
||||
self.average_ack_delay
|
||||
}
|
||||
|
||||
fn nonce(&self) -> i32 {
|
||||
1
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ repository = { workspace = true }
|
||||
log = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
rand_distr = { workspace = true }
|
||||
rand_chacha = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
|
||||
nym-sphinx-acknowledgements = { path = "acknowledgements" }
|
||||
@@ -27,10 +28,13 @@ nym-sphinx-types = { path = "types" }
|
||||
# to separate crate?
|
||||
nym-crypto = { path = "../crypto", version = "0.4.0" }
|
||||
nym-topology = { path = "../topology" }
|
||||
nym-metrics = { path = "../nym-metrics" }
|
||||
|
||||
[dev-dependencies]
|
||||
nym-mixnet-contract-common = { path = "../cosmwasm-smart-contracts/mixnet-contract" }
|
||||
nym-crypto = { path = "../crypto", version = "0.4.0", features = ["asymmetric"] }
|
||||
nym-crypto = { path = "../crypto", version = "0.4.0", features = [
|
||||
"asymmetric",
|
||||
] }
|
||||
|
||||
# do not include this when compiling into wasm as it somehow when combined together with reqwest, it will require
|
||||
# net2 via tokio-util -> tokio -> mio -> net2
|
||||
@@ -43,5 +47,13 @@ features = ["sync"]
|
||||
|
||||
[features]
|
||||
default = ["sphinx"]
|
||||
sphinx = ["nym-crypto/sphinx", "nym-sphinx-params/sphinx", "nym-sphinx-types/sphinx"]
|
||||
outfox = ["nym-crypto/outfox", "nym-sphinx-params/outfox", "nym-sphinx-types/outfox"]
|
||||
sphinx = [
|
||||
"nym-crypto/sphinx",
|
||||
"nym-sphinx-params/sphinx",
|
||||
"nym-sphinx-types/sphinx",
|
||||
]
|
||||
outfox = [
|
||||
"nym-crypto/outfox",
|
||||
"nym-sphinx-params/outfox",
|
||||
"nym-sphinx-types/outfox",
|
||||
]
|
||||
|
||||
@@ -13,7 +13,14 @@ repository = { workspace = true }
|
||||
log = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
dashmap = { workspace = true, features = ["serde"] }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
utoipa = { workspace = true }
|
||||
|
||||
nym-sphinx-addressing = { path = "../addressing" }
|
||||
nym-sphinx-params = { path = "../params" }
|
||||
nym-sphinx-types = { path = "../types" }
|
||||
nym-metrics = { path = "../../nym-metrics" }
|
||||
nym-crypto = { path = "../../crypto", version = "0.4.0", features = [
|
||||
"asymmetric",
|
||||
] }
|
||||
|
||||
@@ -3,6 +3,8 @@
|
||||
|
||||
use crate::ChunkingError;
|
||||
use nym_sphinx_params::{SerializedFragmentIdentifier, FRAG_ID_LEN};
|
||||
use serde::Serialize;
|
||||
use utoipa::ToSchema;
|
||||
|
||||
use std::fmt::{self, Debug, Formatter};
|
||||
|
||||
@@ -58,7 +60,7 @@ pub const COVER_FRAG_ID: FragmentIdentifier = FragmentIdentifier {
|
||||
/// and u8 position of the `Fragment` in the set.
|
||||
// TODO: this should really be redesigned, especially how cover and reply messages are really
|
||||
// "abusing" this. They should work with it natively instead.
|
||||
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, Ord, PartialOrd)]
|
||||
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, Ord, PartialOrd, Serialize)]
|
||||
pub struct FragmentIdentifier {
|
||||
set_id: i32,
|
||||
fragment_position: u8,
|
||||
@@ -75,6 +77,10 @@ impl fmt::Display for FragmentIdentifier {
|
||||
}
|
||||
|
||||
impl FragmentIdentifier {
|
||||
pub fn set_id(&self) -> i32 {
|
||||
self.set_id
|
||||
}
|
||||
|
||||
pub fn to_bytes(self) -> SerializedFragmentIdentifier {
|
||||
debug_assert_eq!(FRAG_ID_LEN, 5);
|
||||
|
||||
@@ -125,6 +131,10 @@ impl Debug for Fragment {
|
||||
}
|
||||
|
||||
impl Fragment {
|
||||
pub fn header(&self) -> FragmentHeader {
|
||||
self.header.clone()
|
||||
}
|
||||
|
||||
/// Tries to encapsulate provided payload slice and metadata into a `Fragment`.
|
||||
/// It can fail if payload would not fully fit in a single `Fragment` or some of the metadata
|
||||
/// is malformed or self-contradictory, for example if current_fragment > total_fragments.
|
||||
@@ -216,6 +226,10 @@ impl Fragment {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn seed(&self) -> i32 {
|
||||
self.header().seed()
|
||||
}
|
||||
|
||||
/// Gets the size of payload contained in this `Fragment`.
|
||||
pub fn payload_size(&self) -> usize {
|
||||
self.payload.len()
|
||||
@@ -297,8 +311,8 @@ impl Fragment {
|
||||
/// there is 7 bytes of overhead inside each sphinx packet sent
|
||||
/// and for the longest messages, without upper bound, there is usually also only 7 bytes
|
||||
/// of overhead apart from first and last fragments in each set that instead have 10 bytes of overhead.
|
||||
#[derive(PartialEq, Clone, Debug)]
|
||||
pub(crate) struct FragmentHeader {
|
||||
#[derive(PartialEq, Clone, Debug, Serialize, ToSchema)]
|
||||
pub struct FragmentHeader {
|
||||
/// ID associated with `FragmentSet` to which this particular `Fragment` belongs.
|
||||
/// Its value is restricted to (0, i32::MAX].
|
||||
/// Note that it *excludes* 0, but *includes* i32::MAX.
|
||||
@@ -324,6 +338,20 @@ pub(crate) struct FragmentHeader {
|
||||
}
|
||||
|
||||
impl FragmentHeader {
|
||||
pub fn seed(&self) -> i32 {
|
||||
let mut seed = self.id;
|
||||
seed = seed.wrapping_mul(self.total_fragments as i32);
|
||||
seed = seed.wrapping_mul(self.current_fragment as i32);
|
||||
seed
|
||||
}
|
||||
|
||||
pub fn total_fragments(&self) -> u8 {
|
||||
self.total_fragments
|
||||
}
|
||||
|
||||
pub fn current_fragment(&self) -> u8 {
|
||||
self.current_fragment
|
||||
}
|
||||
/// Tries to create a new `FragmentHeader` using provided metadata. Bunch of logical
|
||||
/// checks are performed to see if the data is not self-contradictory,
|
||||
/// for example if current_fragment > total_fragments.
|
||||
|
||||
@@ -1,9 +1,16 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::sync::LazyLock;
|
||||
|
||||
use crate::fragment::{linked_fragment_payload_max_len, unlinked_fragment_payload_max_len};
|
||||
use dashmap::DashMap;
|
||||
use fragment::{Fragment, FragmentHeader};
|
||||
use nym_crypto::asymmetric::ed25519::PublicKey;
|
||||
use serde::Serialize;
|
||||
pub use set::split_into_sets;
|
||||
use thiserror::Error;
|
||||
use utoipa::ToSchema;
|
||||
|
||||
pub const MIN_PADDING_OVERHEAD: usize = 1;
|
||||
|
||||
@@ -22,6 +29,118 @@ pub mod fragment;
|
||||
pub mod reconstruction;
|
||||
pub mod set;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FragmentMixParams {
|
||||
destination: PublicKey,
|
||||
hops: u8,
|
||||
}
|
||||
|
||||
impl FragmentMixParams {
|
||||
pub fn destination(&self) -> &PublicKey {
|
||||
&self.destination
|
||||
}
|
||||
|
||||
pub fn hops(&self) -> u8 {
|
||||
self.hops
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, ToSchema)]
|
||||
pub struct SentFragment {
|
||||
header: FragmentHeader,
|
||||
at: u64,
|
||||
client_nonce: i32,
|
||||
#[serde(skip)]
|
||||
mixnet_params: FragmentMixParams,
|
||||
}
|
||||
|
||||
impl SentFragment {
|
||||
fn new(
|
||||
header: FragmentHeader,
|
||||
at: u64,
|
||||
client_nonce: i32,
|
||||
destination: PublicKey,
|
||||
hops: u8,
|
||||
) -> Self {
|
||||
let mixnet_params = FragmentMixParams { destination, hops };
|
||||
SentFragment {
|
||||
header,
|
||||
at,
|
||||
client_nonce,
|
||||
mixnet_params,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn header(&self) -> FragmentHeader {
|
||||
self.header.clone()
|
||||
}
|
||||
|
||||
pub fn at(&self) -> u64 {
|
||||
self.at
|
||||
}
|
||||
|
||||
pub fn client_nonce(&self) -> i32 {
|
||||
self.client_nonce
|
||||
}
|
||||
|
||||
pub fn seed(&self) -> i32 {
|
||||
self.header().seed().wrapping_mul(self.client_nonce())
|
||||
}
|
||||
|
||||
pub fn mixnet_params(&self) -> FragmentMixParams {
|
||||
self.mixnet_params.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, ToSchema)]
|
||||
pub struct ReceivedFragment {
|
||||
header: FragmentHeader,
|
||||
at: u64,
|
||||
}
|
||||
|
||||
impl ReceivedFragment {
|
||||
fn new(header: FragmentHeader, at: u64) -> Self {
|
||||
ReceivedFragment { header, at }
|
||||
}
|
||||
|
||||
pub fn header(&self) -> FragmentHeader {
|
||||
self.header.clone()
|
||||
}
|
||||
|
||||
pub fn at(&self) -> u64 {
|
||||
self.at
|
||||
}
|
||||
}
|
||||
|
||||
pub static FRAGMENTS_RECEIVED: LazyLock<DashMap<i32, Vec<ReceivedFragment>>> =
|
||||
LazyLock::new(DashMap::new);
|
||||
|
||||
pub static FRAGMENTS_SENT: LazyLock<DashMap<i32, Vec<SentFragment>>> = LazyLock::new(DashMap::new);
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! now {
|
||||
() => {
|
||||
match std::time::SystemTime::now().duration_since(std::time::SystemTime::UNIX_EPOCH) {
|
||||
Ok(n) => n.as_secs(),
|
||||
Err(_) => 0,
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
pub fn fragment_received(fragment: &Fragment) {
|
||||
let id = fragment.fragment_identifier().set_id();
|
||||
let mut entry = FRAGMENTS_RECEIVED.entry(id).or_default();
|
||||
let r = ReceivedFragment::new(fragment.header(), now!());
|
||||
entry.push(r);
|
||||
}
|
||||
|
||||
pub fn fragment_sent(fragment: &Fragment, client_nonce: i32, destination: PublicKey, hops: u8) {
|
||||
let id = fragment.fragment_identifier().set_id();
|
||||
let mut entry = FRAGMENTS_SENT.entry(id).or_default();
|
||||
let s = SentFragment::new(fragment.header(), now!(), client_nonce, destination, hops);
|
||||
entry.push(s);
|
||||
}
|
||||
|
||||
/// The idea behind the process of chunking is to incur as little data overhead as possible due
|
||||
/// to very computationally costly sphinx encapsulation procedure.
|
||||
///
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
use crate::fragment::Fragment;
|
||||
use crate::ChunkingError;
|
||||
use crate::{fragment_received, ChunkingError};
|
||||
use log::*;
|
||||
use std::collections::HashMap;
|
||||
|
||||
@@ -66,6 +66,12 @@ impl ReconstructionBuffer {
|
||||
// if the set is complete.
|
||||
debug_assert!(self.is_complete);
|
||||
|
||||
debug!(
|
||||
"Got {} fragments for set id {}",
|
||||
self.fragments.len(),
|
||||
self.fragments[0].as_ref().unwrap().id()
|
||||
);
|
||||
|
||||
self.fragments
|
||||
.into_iter()
|
||||
.map(|fragment| fragment.unwrap().extract_payload())
|
||||
@@ -104,6 +110,8 @@ impl ReconstructionBuffer {
|
||||
}
|
||||
});
|
||||
|
||||
fragment_received(&fragment);
|
||||
|
||||
let fragment_index = fragment.current_fragment() as usize - 1;
|
||||
if self.fragments[fragment_index].is_some() {
|
||||
// TODO: what to do in that case? give up on the message? overwrite it? panic?
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
use crate::message::{NymMessage, ACK_OVERHEAD, OUTFOX_ACK_OVERHEAD};
|
||||
use crate::NymPayloadBuilder;
|
||||
use log::debug;
|
||||
use nym_crypto::asymmetric::encryption;
|
||||
use nym_crypto::Digest;
|
||||
use nym_sphinx_acknowledgements::surb_ack::SurbAck;
|
||||
@@ -11,12 +12,14 @@ use nym_sphinx_addressing::clients::Recipient;
|
||||
use nym_sphinx_addressing::nodes::NymNodeRoutingAddress;
|
||||
use nym_sphinx_anonymous_replies::reply_surb::ReplySurb;
|
||||
use nym_sphinx_chunking::fragment::{Fragment, FragmentIdentifier};
|
||||
use nym_sphinx_chunking::fragment_sent;
|
||||
use nym_sphinx_forwarding::packet::MixPacket;
|
||||
use nym_sphinx_params::packet_sizes::PacketSize;
|
||||
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm, DEFAULT_NUM_MIX_HOPS};
|
||||
use nym_sphinx_types::{Delay, NymPacket};
|
||||
use nym_topology::{NymTopology, NymTopologyError};
|
||||
use rand::{CryptoRng, Rng};
|
||||
use rand::{CryptoRng, Rng, SeedableRng};
|
||||
use rand_chacha::ChaCha20Rng;
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -49,6 +52,7 @@ pub trait FragmentPreparer {
|
||||
type Rng: CryptoRng + Rng;
|
||||
|
||||
fn rng(&mut self) -> &mut Self::Rng;
|
||||
fn nonce(&self) -> i32;
|
||||
fn num_mix_hops(&self) -> u8;
|
||||
fn average_packet_delay(&self) -> Duration;
|
||||
fn average_ack_delay(&self) -> Duration;
|
||||
@@ -192,9 +196,18 @@ pub trait FragmentPreparer {
|
||||
packet_type: PacketType,
|
||||
mix_hops: Option<u8>,
|
||||
) -> Result<PreparedFragment, NymTopologyError> {
|
||||
debug!("Preparing chunk for sending");
|
||||
// each plain or repliable packet (i.e. not a reply) attaches an ephemeral public key so that the recipient
|
||||
// could perform diffie-hellman with its own keys followed by a kdf to re-derive
|
||||
// the packet encryption key
|
||||
|
||||
let seed = fragment.seed().wrapping_mul(self.nonce());
|
||||
let mut rng = ChaCha20Rng::seed_from_u64(seed as u64);
|
||||
|
||||
let destination = packet_recipient.gateway();
|
||||
let hops = mix_hops.unwrap_or(self.num_mix_hops());
|
||||
fragment_sent(&fragment, self.nonce(), *destination, hops);
|
||||
|
||||
let non_reply_overhead = encryption::PUBLIC_KEY_SIZE;
|
||||
let expected_plaintext = match packet_type {
|
||||
PacketType::Outfox => {
|
||||
@@ -228,10 +241,8 @@ pub trait FragmentPreparer {
|
||||
};
|
||||
|
||||
// generate pseudorandom route for the packet
|
||||
let hops = mix_hops.unwrap_or(self.num_mix_hops());
|
||||
log::trace!("Preparing chunk for sending with {} mix hops", hops);
|
||||
let route =
|
||||
topology.random_route_to_gateway(self.rng(), hops, packet_recipient.gateway())?;
|
||||
let route = topology.random_route_to_gateway(&mut rng, hops, destination)?;
|
||||
let destination = packet_recipient.as_sphinx_destination();
|
||||
|
||||
// including set of delays
|
||||
@@ -313,6 +324,8 @@ pub struct MessagePreparer<R> {
|
||||
/// Number of mix hops each packet ('real' message, ack, reply) is expected to take.
|
||||
/// Note that it does not include gateway hops.
|
||||
num_mix_hops: u8,
|
||||
|
||||
nonce: i32,
|
||||
}
|
||||
|
||||
impl<R> MessagePreparer<R>
|
||||
@@ -325,12 +338,15 @@ where
|
||||
average_packet_delay: Duration,
|
||||
average_ack_delay: Duration,
|
||||
) -> Self {
|
||||
let mut rng = rng;
|
||||
let nonce = rng.gen();
|
||||
MessagePreparer {
|
||||
rng,
|
||||
sender_address,
|
||||
average_packet_delay,
|
||||
average_ack_delay,
|
||||
num_mix_hops: DEFAULT_NUM_MIX_HOPS,
|
||||
nonce,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -454,6 +470,10 @@ impl<R: CryptoRng + Rng> FragmentPreparer for MessagePreparer<R> {
|
||||
fn average_ack_delay(&self) -> Duration {
|
||||
self.average_ack_delay
|
||||
}
|
||||
|
||||
fn nonce(&self) -> i32 {
|
||||
self.nonce
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
@@ -60,7 +60,7 @@ pub(super) async fn run_outbound(
|
||||
|
||||
loop {
|
||||
select! {
|
||||
connection_message = &mut mix_receiver.next() => {
|
||||
connection_message = mix_receiver.next() => {
|
||||
if let Some(connection_message) = connection_message {
|
||||
if deal_with_message(connection_message, &mut writer, &local_destination_address, &remote_source_address, connection_id).await {
|
||||
break;
|
||||
|
||||
@@ -5,7 +5,6 @@ edition = { workspace = true }
|
||||
authors = { workspace = true }
|
||||
license = { workspace = true }
|
||||
repository = { workspace = true }
|
||||
readme = { workspace = true }
|
||||
homepage = { workspace = true }
|
||||
documentation = { workspace = true }
|
||||
|
||||
@@ -15,9 +14,10 @@ documentation = { workspace = true }
|
||||
bs58 = { workspace = true }
|
||||
log = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
thiserror = { workspace = true }
|
||||
async-trait = { workspace = true, optional = true }
|
||||
semver = "0.11"
|
||||
semver = { version = "0.11" }
|
||||
|
||||
# 'serializable' feature
|
||||
serde = { workspace = true, features = ["derive"], optional = true }
|
||||
@@ -28,20 +28,22 @@ tsify = { workspace = true, features = ["js"], optional = true }
|
||||
wasm-bindgen = { workspace = true, optional = true }
|
||||
|
||||
## internal
|
||||
nym-bin-common = { path = "../bin-common" }
|
||||
nym-config = { path = "../config" }
|
||||
nym-crypto = { path = "../crypto", features = ["sphinx", "outfox"] }
|
||||
nym-mixnet-contract-common = { path = "../cosmwasm-smart-contracts/mixnet-contract" }
|
||||
nym-sphinx-addressing = { path = "../nymsphinx/addressing" }
|
||||
nym-sphinx-types = { path = "../nymsphinx/types", features = ["sphinx", "outfox"] }
|
||||
nym-sphinx-types = { path = "../nymsphinx/types", features = [
|
||||
"sphinx",
|
||||
"outfox",
|
||||
] }
|
||||
nym-sphinx-routing = { path = "../nymsphinx/routing" }
|
||||
nym-bin-common = { path = "../bin-common" }
|
||||
|
||||
|
||||
# I'm not sure how to feel about pulling in this dependency here...
|
||||
nym-api-requests = { path = "../../nym-api/nym-api-requests" }
|
||||
|
||||
|
||||
# 'serializable' feature
|
||||
nym-config = { path = "../config", optional = true }
|
||||
|
||||
# 'wasm-serde-types' feature
|
||||
wasm-utils = { path = "../wasm/utils", default-features = false, optional = true }
|
||||
|
||||
@@ -49,4 +51,5 @@ wasm-utils = { path = "../wasm/utils", default-features = false, optional = true
|
||||
default = ["provider-trait"]
|
||||
provider-trait = ["async-trait"]
|
||||
wasm-serde-types = ["tsify", "wasm-bindgen", "wasm-utils"]
|
||||
serializable = ["serde", "nym-config", "serde_json"]
|
||||
serializable = ["serde", "serde_json"]
|
||||
outfox = []
|
||||
|
||||
@@ -51,4 +51,16 @@ pub enum NymTopologyError {
|
||||
|
||||
#[error("{0}")]
|
||||
PacketError(#[from] NymPacketError),
|
||||
|
||||
#[error("{0}")]
|
||||
ReqwestError(#[from] reqwest::Error),
|
||||
|
||||
#[error("{0}")]
|
||||
MixnodeConversionError(#[from] crate::mix::MixnodeConversionError),
|
||||
|
||||
#[error("{0}")]
|
||||
GatewayConversionError(#[from] crate::gateway::GatewayConversionError),
|
||||
|
||||
#[error("{0}")]
|
||||
VarError(#[from] std::env::VarError),
|
||||
}
|
||||
|
||||
@@ -6,7 +6,9 @@
|
||||
|
||||
use crate::filter::VersionFilterable;
|
||||
pub use error::NymTopologyError;
|
||||
use log::{debug, warn};
|
||||
use log::{debug, info, warn};
|
||||
use mix::Node;
|
||||
use nym_config::defaults::var_names::NYM_API;
|
||||
use nym_mixnet_contract_common::mixnode::MixNodeDetails;
|
||||
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, MixId};
|
||||
use nym_sphinx_addressing::nodes::NodeIdentity;
|
||||
@@ -116,13 +118,40 @@ impl Display for NetworkAddress {
|
||||
|
||||
pub type MixLayer = u8;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct NymTopology {
|
||||
mixes: BTreeMap<MixLayer, Vec<mix::Node>>,
|
||||
gateways: Vec<gateway::Node>,
|
||||
}
|
||||
|
||||
impl NymTopology {
|
||||
pub async fn new_from_env() -> Result<Self, NymTopologyError> {
|
||||
let api_url = std::env::var(NYM_API)?;
|
||||
|
||||
info!("Generating topology from {}", api_url);
|
||||
|
||||
let mixnodes = reqwest::get(&format!("{}/v1/mixnodes", api_url))
|
||||
.await?
|
||||
.json::<Vec<MixNodeDetails>>()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|details| details.bond_information)
|
||||
.map(mix::Node::try_from)
|
||||
.filter(Result::is_ok)
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
let gateways = reqwest::get(&format!("{}/v1/gateways", api_url))
|
||||
.await?
|
||||
.json::<Vec<GatewayBond>>()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(gateway::Node::try_from)
|
||||
.filter(Result::is_ok)
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
let topology = NymTopology::new_unordered(mixnodes, gateways);
|
||||
Ok(topology)
|
||||
}
|
||||
|
||||
pub fn new(mixes: BTreeMap<MixLayer, Vec<mix::Node>>, gateways: Vec<gateway::Node>) -> Self {
|
||||
NymTopology { mixes, gateways }
|
||||
}
|
||||
@@ -270,7 +299,7 @@ impl NymTopology {
|
||||
&self,
|
||||
rng: &mut R,
|
||||
num_mix_hops: u8,
|
||||
) -> Result<Vec<SphinxNode>, NymTopologyError>
|
||||
) -> Result<Vec<Node>, NymTopologyError>
|
||||
where
|
||||
R: Rng + CryptoRng + ?Sized,
|
||||
{
|
||||
@@ -295,12 +324,32 @@ impl NymTopology {
|
||||
let random_mix = layer_mixes
|
||||
.choose(rng)
|
||||
.ok_or(NymTopologyError::EmptyMixLayer { layer })?;
|
||||
route.push(random_mix.into());
|
||||
route.push(random_mix.clone());
|
||||
}
|
||||
|
||||
Ok(route)
|
||||
}
|
||||
|
||||
pub fn random_path_to_gateway<R>(
|
||||
&self,
|
||||
rng: &mut R,
|
||||
num_mix_hops: u8,
|
||||
gateway_identity: &NodeIdentity,
|
||||
) -> Result<(Vec<mix::Node>, gateway::Node), NymTopologyError>
|
||||
where
|
||||
R: Rng + CryptoRng + ?Sized,
|
||||
{
|
||||
let gateway = self.get_gateway(gateway_identity).ok_or(
|
||||
NymTopologyError::NonExistentGatewayError {
|
||||
identity_key: gateway_identity.to_base58_string(),
|
||||
},
|
||||
)?;
|
||||
|
||||
let path = self.random_mix_route(rng, num_mix_hops)?;
|
||||
|
||||
Ok((path, gateway.clone()))
|
||||
}
|
||||
|
||||
/// Tries to create a route to the specified gateway, such that it goes through mixnode on layer 1,
|
||||
/// mixnode on layer2, .... mixnode on layer n and finally the target gateway
|
||||
pub fn random_route_to_gateway<R>(
|
||||
@@ -321,6 +370,7 @@ impl NymTopology {
|
||||
Ok(self
|
||||
.random_mix_route(rng, num_mix_hops)?
|
||||
.into_iter()
|
||||
.map(|node| SphinxNode::from(&node))
|
||||
.chain(std::iter::once(gateway.into()))
|
||||
.collect())
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ version = "1.0.0"
|
||||
description = "Nym common types"
|
||||
authors.workspace = true
|
||||
edition = "2021"
|
||||
rust-version = "1.58"
|
||||
rust-version.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -11,6 +11,7 @@ pub mod gas;
|
||||
pub mod gateway;
|
||||
pub mod helpers;
|
||||
pub mod mixnode;
|
||||
pub mod monitoring;
|
||||
pub mod pending_events;
|
||||
pub mod transaction;
|
||||
pub mod vesting;
|
||||
|
||||
@@ -0,0 +1,131 @@
|
||||
use std::{collections::HashSet, sync::LazyLock, time::SystemTime};
|
||||
|
||||
use nym_crypto::asymmetric::identity::{PrivateKey, PublicKey, Signature};
|
||||
use nym_mixnet_contract_common::MixId;
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
static NETWORK_MONITORS: LazyLock<HashSet<String>> = LazyLock::new(|| {
|
||||
let mut nm = HashSet::new();
|
||||
nm.insert("5VsPyLbsBCq9PAMWmjKkToteVAKNabNqex6QwDf5fWzt".to_string());
|
||||
nm
|
||||
});
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
|
||||
pub struct NodeResult {
|
||||
pub node_id: MixId,
|
||||
pub identity: String,
|
||||
pub reliability: u8,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
|
||||
pub struct MixnodeResult {
|
||||
pub mix_id: MixId,
|
||||
pub identity: String,
|
||||
pub owner: String,
|
||||
pub reliability: u8,
|
||||
}
|
||||
|
||||
impl MixnodeResult {
|
||||
pub fn new(mix_id: MixId, identity: String, owner: String, reliability: u8) -> Self {
|
||||
MixnodeResult {
|
||||
mix_id,
|
||||
identity,
|
||||
owner,
|
||||
reliability,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
|
||||
pub struct GatewayResult {
|
||||
pub identity: String,
|
||||
pub owner: String,
|
||||
pub reliability: u8,
|
||||
pub mix_id: MixId,
|
||||
}
|
||||
|
||||
impl GatewayResult {
|
||||
pub fn new(identity: String, owner: String, reliability: u8) -> Self {
|
||||
GatewayResult {
|
||||
identity,
|
||||
owner,
|
||||
reliability,
|
||||
mix_id: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, JsonSchema)]
|
||||
#[serde(untagged)]
|
||||
pub enum MonitorResults {
|
||||
Mixnode(Vec<MixnodeResult>),
|
||||
Gateway(Vec<GatewayResult>),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, JsonSchema)]
|
||||
pub struct MonitorMessage {
|
||||
results: Vec<NodeResult>,
|
||||
signature: String,
|
||||
signer: String,
|
||||
timestamp: i64,
|
||||
}
|
||||
|
||||
impl MonitorMessage {
|
||||
fn message_to_sign(results: &[NodeResult], timestamp: i64) -> Vec<u8> {
|
||||
let mut msg = match serde_json::to_vec(results) {
|
||||
Ok(msg) => msg,
|
||||
Err(_) => Vec::new(),
|
||||
};
|
||||
msg.extend_from_slice(×tamp.to_le_bytes());
|
||||
msg
|
||||
}
|
||||
|
||||
pub fn timely(&self) -> bool {
|
||||
let now = SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.expect("Time went backwards")
|
||||
.as_secs() as i64;
|
||||
|
||||
now - self.timestamp < 5
|
||||
}
|
||||
|
||||
pub fn new(results: Vec<NodeResult>, private_key: &PrivateKey) -> Self {
|
||||
let timestamp = SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.expect("Time went backwards")
|
||||
.as_secs() as i64;
|
||||
|
||||
let msg = Self::message_to_sign(&results, timestamp);
|
||||
let signature = private_key.sign(&msg);
|
||||
let public_key = private_key.public_key();
|
||||
|
||||
MonitorMessage {
|
||||
results,
|
||||
signature: signature.to_base58_string(),
|
||||
signer: public_key.to_base58_string(),
|
||||
timestamp,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_allowed(&self) -> bool {
|
||||
NETWORK_MONITORS.contains(&self.signer)
|
||||
}
|
||||
|
||||
pub fn results(&self) -> &[NodeResult] {
|
||||
&self.results
|
||||
}
|
||||
|
||||
pub fn verify(&self) -> bool {
|
||||
let msg = Self::message_to_sign(&self.results, self.timestamp);
|
||||
|
||||
let signature = match Signature::from_base58_string(&self.signature) {
|
||||
Ok(sig) => sig,
|
||||
Err(_) => return false,
|
||||
};
|
||||
|
||||
PublicKey::from_base58_string(&self.signer)
|
||||
.map(|pk| pk.verify(msg, &signature).is_ok())
|
||||
.unwrap_or(false)
|
||||
}
|
||||
}
|
||||
@@ -30,13 +30,9 @@ workspace = true
|
||||
optional = true
|
||||
|
||||
[features]
|
||||
default = ["sleep", "console_error_panic_hook"]
|
||||
default = ["sleep"]
|
||||
sleep = ["web-sys", "web-sys/Window"]
|
||||
websocket = [
|
||||
"getrandom",
|
||||
"tungstenite",
|
||||
"gloo-net"
|
||||
]
|
||||
websocket = ["getrandom", "tungstenite", "gloo-net"]
|
||||
crypto = [
|
||||
"web-sys",
|
||||
"web-sys/Crypto",
|
||||
|
||||
@@ -84,6 +84,7 @@ pub struct WireguardData {
|
||||
#[cfg(target_os = "linux")]
|
||||
pub async fn start_wireguard<St: nym_gateway_storage::Storage + 'static>(
|
||||
storage: St,
|
||||
all_peers: Vec<nym_gateway_storage::models::WireguardPeer>,
|
||||
task_client: nym_task::TaskClient,
|
||||
wireguard_data: WireguardData,
|
||||
control_tx: UnboundedSender<peer_controller::PeerControlResponse>,
|
||||
@@ -95,7 +96,7 @@ pub async fn start_wireguard<St: nym_gateway_storage::Storage + 'static>(
|
||||
|
||||
let mut peers = vec![];
|
||||
let mut suspended_peers = vec![];
|
||||
for storage_peer in storage.get_all_wireguard_peers().await? {
|
||||
for storage_peer in all_peers {
|
||||
let suspended = storage_peer.suspended;
|
||||
let peer = Peer::try_from(storage_peer)?;
|
||||
if suspended {
|
||||
|
||||
@@ -0,0 +1,27 @@
|
||||
CONFIGURED=true
|
||||
|
||||
NETWORK_NAME=mainnet
|
||||
|
||||
RUST_LOG=info
|
||||
RUST_BACKTRACE=1
|
||||
|
||||
BECH32_PREFIX=n
|
||||
MIX_DENOM=unym
|
||||
MIX_DENOM_DISPLAY=nym
|
||||
STAKE_DENOM=unyx
|
||||
STAKE_DENOM_DISPLAY=nyx
|
||||
DENOMS_EXPONENT=6
|
||||
|
||||
MIXNET_CONTRACT_ADDRESS=n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr
|
||||
VESTING_CONTRACT_ADDRESS=n1nc5tatafv6eyq7llkr2gv50ff9e22mnf70qgjlv737ktmt4eswrq73f2nw
|
||||
GROUP_CONTRACT_ADDRESS=n1e2zq4886zzewpvpucmlw8v9p7zv692f6yck4zjzxh699dkcmlrfqk2knsr
|
||||
MULTISIG_CONTRACT_ADDRESS=n1txayqfz5g9qww3rlflpg025xd26m9payz96u54x4fe3s2ktz39xqk67gzx
|
||||
COCONUT_DKG_CONTRACT_ADDRESS=n19604yflqggs9mk2z26mqygq43q2kr3n932egxx630svywd5mpxjsztfpvx
|
||||
|
||||
REWARDING_VALIDATOR_ADDRESS=n10yyd98e2tuwu0f7ypz9dy3hhjw7v772q6287gy
|
||||
STATISTICS_SERVICE_DOMAIN_ADDRESS="https://mainnet-stats.nymte.ch:8090"
|
||||
NYXD="https://rpc.nymtech.net"
|
||||
NYM_API="http://127.0.0.1:8000"
|
||||
NYXD_WS="wss://rpc.nymtech.net/websocket"
|
||||
EXPLORER_API="https://explorer.nymtech.net/api/"
|
||||
NYM_VPN_API="https://nymvpn.com/api"
|
||||
+4
-5
@@ -13,11 +13,10 @@ DENOMS_EXPONENT=6
|
||||
REWARDING_VALIDATOR_ADDRESS=n1pefc2utwpy5w78p2kqdsfmpjxfwmn9d39k5mqa
|
||||
MIXNET_CONTRACT_ADDRESS=n1xr3rq8yvd7qplsw5yx90ftsr2zdhg4e9z60h5duusgxpv72hud3sjkxkav
|
||||
VESTING_CONTRACT_ADDRESS=n1unyuj8qnmygvzuex3dwmg9yzt9alhvyeat0uu0jedg2wj33efl5qackslz
|
||||
ECASH_CONTRACT_ADDRESS=n1ljlwey4xdj0zs7zueepc48nkr033fca6fjgvurfvttqegm8dvsrswsul70
|
||||
GROUP_CONTRACT_ADDRESS=n10v3rjnq4cjyccfykyams68ztce337gksuu6f0lvtl4meuwvkewaqru4uav
|
||||
MULTISIG_CONTRACT_ADDRESS=n1cemnu8as0ls45v3caunpesl8jlsfw2ff9rlwnltlecp7zrxct4dsqc2y42
|
||||
COCONUT_DKG_CONTRACT_ADDRESS=n1zx96qgd88vqlzcxkpwzks7kqs5ctrx36xtzfc58p7q6c4ng9anlqzc4nh8
|
||||
|
||||
GROUP_CONTRACT_ADDRESS=n1ewmwz97xm0h8rdk8sw7h9mwn866qkx9hl9zlmagqfkhuzvwk5hhq844ue9
|
||||
MULTISIG_CONTRACT_ADDRESS=n1tz0setr8vkh9udp8xyxgpqc89ns27k4d0jx2h942hr0ax63yjhmqz6xct8
|
||||
COCONUT_DKG_CONTRACT_ADDRESS=n1v3n2ly2dp3a9ng3ff6rh26yfkn0pc5hed7w2shc5u9ca5c865utqj5elvh
|
||||
ECASH_CONTRACT_ADDRESS=n1v3vydvs2ued84yv3khqwtgldmgwn0elljsdh08dr5s2j9x4rc5fs9jlwz9
|
||||
|
||||
STATISTICS_SERVICE_DOMAIN_ADDRESS="http://0.0.0.0"
|
||||
EXPLORER_API=https://sandbox-explorer.nymtech.net/api
|
||||
|
||||
@@ -204,8 +204,8 @@ pub enum GatewayError {
|
||||
WireguardInterfaceError(#[from] defguard_wireguard_rs::error::WireguardInterfaceError),
|
||||
|
||||
#[cfg(all(feature = "wireguard", target_os = "linux"))]
|
||||
#[error("wireguard not set")]
|
||||
WireguardNotSet,
|
||||
#[error("internal wireguard error {0}")]
|
||||
InternalWireguardError(String),
|
||||
|
||||
#[error("failed to start authenticator: {source}")]
|
||||
AuthenticatorStartError {
|
||||
|
||||
+21
-1
@@ -267,12 +267,29 @@ impl<St> Gateway<St> {
|
||||
forwarding_channel,
|
||||
router_tx,
|
||||
);
|
||||
let all_peers = self.storage.get_all_wireguard_peers().await?;
|
||||
let used_private_network_ips = all_peers
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(|wireguard_peer| {
|
||||
defguard_wireguard_rs::host::Peer::try_from(wireguard_peer).map(|mut peer| {
|
||||
peer.allowed_ips
|
||||
.pop()
|
||||
.ok_or(Box::new(GatewayError::InternalWireguardError(format!(
|
||||
"no private IP set for peer {}",
|
||||
peer.public_key
|
||||
))))
|
||||
.map(|p| p.ip)
|
||||
})
|
||||
})
|
||||
.collect::<Result<Result<Vec<_>, _>, _>>()??;
|
||||
|
||||
if let Some(wireguard_data) = self.wireguard_data.take() {
|
||||
let (on_start_tx, on_start_rx) = oneshot::channel();
|
||||
let mut authenticator_server = nym_authenticator::Authenticator::new(
|
||||
opts.config.clone(),
|
||||
wireguard_data.inner.clone(),
|
||||
used_private_network_ips,
|
||||
peer_response_rx,
|
||||
)
|
||||
.with_custom_gateway_transceiver(Box::new(transceiver))
|
||||
@@ -306,6 +323,7 @@ impl<St> Gateway<St> {
|
||||
|
||||
let wg_api = nym_wireguard::start_wireguard(
|
||||
self.storage.clone(),
|
||||
all_peers,
|
||||
shutdown,
|
||||
wireguard_data,
|
||||
peer_response_tx,
|
||||
@@ -317,7 +335,9 @@ impl<St> Gateway<St> {
|
||||
handle: LocalEmbeddedClientHandle::new(start_data.address, auth_mix_sender),
|
||||
})
|
||||
} else {
|
||||
Err(Box::new(GatewayError::WireguardNotSet))
|
||||
Err(Box::new(GatewayError::InternalWireguardError(
|
||||
"wireguard not set".to_string(),
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+14
-4
@@ -28,10 +28,13 @@ dirs = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
itertools = { workspace = true }
|
||||
humantime-serde = { workspace = true }
|
||||
k256 = { workspace = true, features = ["ecdsa-core"] } # needed for the Verifier trait; pull whatever version is used by other dependencies
|
||||
k256 = { workspace = true, features = [
|
||||
"ecdsa-core",
|
||||
] } # needed for the Verifier trait; pull whatever version is used by other dependencies
|
||||
log = { workspace = true }
|
||||
pin-project = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
rand_chacha = { workspace = true }
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
rocket = { workspace = true, features = ["json"] }
|
||||
rocket_cors = { workspace = true }
|
||||
@@ -40,7 +43,12 @@ serde_json = { workspace = true }
|
||||
tap = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
time = { workspace = true, features = ["serde-human-readable", "parsing"] }
|
||||
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal", "time"] }
|
||||
tokio = { workspace = true, features = [
|
||||
"rt-multi-thread",
|
||||
"macros",
|
||||
"signal",
|
||||
"time",
|
||||
] }
|
||||
tokio-stream = { workspace = true }
|
||||
url = { workspace = true }
|
||||
|
||||
@@ -82,7 +90,9 @@ nym-credentials-interface = { path = "../common/credentials-interface" }
|
||||
#nym-ephemera-common = { path = "../common/cosmwasm-smart-contracts/ephemera" }
|
||||
nym-config = { path = "../common/config" }
|
||||
cosmwasm-std = { workspace = true }
|
||||
nym-credential-storage = { path = "../common/credential-storage", features = ["persistent-storage"] }
|
||||
nym-credential-storage = { path = "../common/credential-storage", features = [
|
||||
"persistent-storage",
|
||||
] }
|
||||
nym-credentials = { path = "../common/credentials" }
|
||||
nym-crypto = { path = "../common/crypto" }
|
||||
cw2 = { workspace = true }
|
||||
@@ -105,6 +115,7 @@ nym-validator-client = { path = "../common/client-libs/validator-client" }
|
||||
nym-bin-common = { path = "../common/bin-common", features = ["output_format"] }
|
||||
nym-node-tester-utils = { path = "../common/node-tester-utils" }
|
||||
nym-node-requests = { path = "../nym-node/nym-node-requests" }
|
||||
nym-types = { path = "../common/types" }
|
||||
|
||||
[features]
|
||||
no-reward = []
|
||||
@@ -125,4 +136,3 @@ cw3 = { workspace = true }
|
||||
cw-utils = { workspace = true }
|
||||
rand_chacha = { workspace = true }
|
||||
sha2 = "0.9"
|
||||
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
CREATE TABLE mixnode_details_v2
|
||||
(
|
||||
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
|
||||
mix_id INTEGER NOT NULL UNIQUE,
|
||||
owner VARCHAR NOT NULL,
|
||||
identity_key VARCHAR NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE mixnode_status_v2
|
||||
(
|
||||
mixnode_details_id INTEGER NOT NULL,
|
||||
reliability INTEGER NOT NULL,
|
||||
timestamp INTEGER NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE gateway_details_v2
|
||||
(
|
||||
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
|
||||
owner VARCHAR NOT NULL,
|
||||
identity VARCHAR NOT NULL UNIQUE
|
||||
);
|
||||
|
||||
CREATE TABLE gateway_status_v2
|
||||
(
|
||||
gateway_details_id INTEGER NOT NULL,
|
||||
reliability INTEGER NOT NULL,
|
||||
timestamp INTEGER NOT NULL
|
||||
);
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
DROP TABLE IF EXISTS mixnode_details_v2;
|
||||
|
||||
CREATE TABLE mixnode_details_v2
|
||||
(
|
||||
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
|
||||
node_id INTEGER NOT NULL UNIQUE,
|
||||
identity_key VARCHAR NOT NULL
|
||||
);
|
||||
|
||||
DROP TABLE IF EXISTS gateway_details_v2;
|
||||
|
||||
CREATE TABLE gateway_details_v2
|
||||
(
|
||||
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
|
||||
node_id INTEGER NOT NULL UNIQUE,
|
||||
identity VARCHAR NOT NULL UNIQUE
|
||||
);
|
||||
|
||||
|
||||
@@ -0,0 +1,10 @@
|
||||
DROP TABLE IF EXISTS gateway_details_v2;
|
||||
|
||||
CREATE TABLE gateway_details_v2
|
||||
(
|
||||
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
|
||||
node_id INTEGER NOT NULL,
|
||||
identity VARCHAR NOT NULL UNIQUE
|
||||
);
|
||||
|
||||
|
||||
@@ -56,6 +56,7 @@ impl RewardedSetUpdater {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::doc_lazy_continuation)]
|
||||
// This is where the epoch gets advanced, and all epoch related transactions originate
|
||||
/// Upon each epoch having finished the following actions are executed by this nym-api:
|
||||
/// 1. it computes the rewards for each node using the ephemera channel for the epoch that
|
||||
|
||||
@@ -4,8 +4,8 @@
|
||||
use crate::network_monitor::monitor::preparer::InvalidNode;
|
||||
use crate::network_monitor::test_packet::NodeTestMessage;
|
||||
use crate::network_monitor::test_route::TestRoute;
|
||||
use nym_mixnet_contract_common::MixId;
|
||||
use nym_node_tester_utils::node::{NodeType, TestableNode};
|
||||
use nym_types::monitoring::{GatewayResult, MixnodeResult};
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::{Display, Formatter};
|
||||
|
||||
@@ -20,42 +20,6 @@ const UNRELIABLE_THRESHOLD: u8 = 1; // 1 - 60
|
||||
// from the average result, remove this data and recalculate scores.
|
||||
// const ALLOWED_RELIABILITY_DEVIATION: f32 = 5.0;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct MixnodeResult {
|
||||
pub(crate) mix_id: MixId,
|
||||
pub(crate) identity: String,
|
||||
pub(crate) owner: String,
|
||||
pub(crate) reliability: u8,
|
||||
}
|
||||
|
||||
impl MixnodeResult {
|
||||
pub(crate) fn new(mix_id: MixId, identity: String, owner: String, reliability: u8) -> Self {
|
||||
MixnodeResult {
|
||||
mix_id,
|
||||
identity,
|
||||
owner,
|
||||
reliability,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct GatewayResult {
|
||||
pub(crate) identity: String,
|
||||
pub(crate) owner: String,
|
||||
pub(crate) reliability: u8,
|
||||
}
|
||||
|
||||
impl GatewayResult {
|
||||
pub(crate) fn new(identity: String, owner: String, reliability: u8) -> Self {
|
||||
GatewayResult {
|
||||
identity,
|
||||
owner,
|
||||
reliability,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct RouteResult {
|
||||
pub(crate) route: TestRoute,
|
||||
|
||||
@@ -54,6 +54,8 @@ pub(crate) fn node_status_routes(
|
||||
routes::get_gateways_detailed_unfiltered,
|
||||
routes::unstable::mixnode_test_results,
|
||||
routes::unstable::gateway_test_results,
|
||||
routes::submit_gateway_monitoring_results,
|
||||
routes::submit_node_monitoring_results,
|
||||
]
|
||||
} else {
|
||||
// in the minimal variant we would not have access to endpoints relying on existence
|
||||
|
||||
@@ -10,6 +10,7 @@ use nym_api_requests::models::{
|
||||
UptimeResponse,
|
||||
};
|
||||
use nym_mixnet_contract_common::MixId;
|
||||
use nym_types::monitoring::MonitorMessage;
|
||||
use rocket::serde::json::Json;
|
||||
use rocket::State;
|
||||
use rocket_okapi::openapi;
|
||||
@@ -29,6 +30,92 @@ use crate::node_status_api::models::ErrorResponse;
|
||||
use crate::storage::NymApiStorage;
|
||||
use crate::NymContractCache;
|
||||
|
||||
#[openapi(tag = "status")]
|
||||
#[post("/submit-gateway-monitoring-results", data = "<message>")]
|
||||
pub(crate) async fn submit_gateway_monitoring_results(
|
||||
message: Json<MonitorMessage>,
|
||||
storage: &State<NymApiStorage>,
|
||||
) -> Result<(), ErrorResponse> {
|
||||
if !message.from_allowed() {
|
||||
return Err(ErrorResponse::new(
|
||||
"Monitor not registered to submit results".to_string(),
|
||||
rocket::http::Status::Forbidden,
|
||||
));
|
||||
}
|
||||
|
||||
if !message.timely() {
|
||||
return Err(ErrorResponse::new(
|
||||
"Message is too old".to_string(),
|
||||
rocket::http::Status::BadRequest,
|
||||
));
|
||||
}
|
||||
|
||||
if !message.verify() {
|
||||
return Err(ErrorResponse::new(
|
||||
"Invalid signature".to_string(),
|
||||
rocket::http::Status::BadRequest,
|
||||
));
|
||||
}
|
||||
|
||||
match storage
|
||||
.manager
|
||||
.submit_gateway_statuses_v2(message.results())
|
||||
.await
|
||||
{
|
||||
Ok(_) => Ok(()),
|
||||
Err(err) => {
|
||||
error!("failed to submit gateway monitoring results: {}", err);
|
||||
Err(ErrorResponse::new(
|
||||
"failed to submit gateway monitoring results".to_string(),
|
||||
rocket::http::Status::InternalServerError,
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[openapi(tag = "status")]
|
||||
#[post("/submit-node-monitoring-results", data = "<message>")]
|
||||
pub(crate) async fn submit_node_monitoring_results(
|
||||
message: Json<MonitorMessage>,
|
||||
storage: &State<NymApiStorage>,
|
||||
) -> Result<(), ErrorResponse> {
|
||||
if !message.from_allowed() {
|
||||
return Err(ErrorResponse::new(
|
||||
"Monitor not registered to submit results".to_string(),
|
||||
rocket::http::Status::Forbidden,
|
||||
));
|
||||
}
|
||||
|
||||
if !message.timely() {
|
||||
return Err(ErrorResponse::new(
|
||||
"Message is too old".to_string(),
|
||||
rocket::http::Status::BadRequest,
|
||||
));
|
||||
}
|
||||
|
||||
if !message.verify() {
|
||||
return Err(ErrorResponse::new(
|
||||
"Invalid signature".to_string(),
|
||||
rocket::http::Status::BadRequest,
|
||||
));
|
||||
}
|
||||
|
||||
match storage
|
||||
.manager
|
||||
.submit_mixnode_statuses_v2(message.results())
|
||||
.await
|
||||
{
|
||||
Ok(_) => Ok(()),
|
||||
Err(err) => {
|
||||
error!("failed to submit node monitoring results: {}", err);
|
||||
Err(ErrorResponse::new(
|
||||
"failed to submit node monitoring results".to_string(),
|
||||
rocket::http::Status::InternalServerError,
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[openapi(tag = "status")]
|
||||
#[get("/gateway/<identity>/report")]
|
||||
pub(crate) async fn gateway_report(
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
use crate::network_monitor::monitor::summary_producer::{GatewayResult, MixnodeResult};
|
||||
use crate::node_status_api::models::{HistoricalUptime, Uptime};
|
||||
use crate::node_status_api::utils::{ActiveGatewayStatuses, ActiveMixnodeStatuses};
|
||||
use crate::support::storage::models::{
|
||||
@@ -8,6 +7,8 @@ use crate::support::storage::models::{
|
||||
TestedGatewayStatus, TestedMixnodeStatus, TestingRoute,
|
||||
};
|
||||
use nym_mixnet_contract_common::{EpochId, IdentityKey, MixId};
|
||||
use nym_types::monitoring::{GatewayResult, MixnodeResult, NodeResult};
|
||||
use time::OffsetDateTime;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct StorageManager {
|
||||
@@ -502,6 +503,47 @@ impl StorageManager {
|
||||
tx.commit().await
|
||||
}
|
||||
|
||||
pub(crate) async fn submit_mixnode_statuses_v2(
|
||||
&self,
|
||||
mixnode_results: &[NodeResult],
|
||||
) -> Result<(), sqlx::Error> {
|
||||
info!("Inserting {} mixnode statuses", mixnode_results.len());
|
||||
|
||||
let timestamp = OffsetDateTime::now_utc().unix_timestamp();
|
||||
// insert it all in a transaction to make sure all nodes are updated at the same time
|
||||
// (plus it's a nice guard against new nodes)
|
||||
let mut tx = self.connection_pool.begin().await?;
|
||||
for mixnode_result in mixnode_results {
|
||||
let mixnode_id = sqlx::query!(
|
||||
r#"
|
||||
INSERT OR IGNORE INTO mixnode_details_v2(node_id, identity_key) VALUES (?, ?);
|
||||
SELECT id FROM mixnode_details_v2 WHERE node_id = ?;
|
||||
"#,
|
||||
mixnode_result.node_id,
|
||||
mixnode_result.identity,
|
||||
mixnode_result.node_id,
|
||||
)
|
||||
.fetch_one(&mut tx)
|
||||
.await?
|
||||
.id;
|
||||
|
||||
// insert the actual status
|
||||
sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO mixnode_status_v2 (mixnode_details_id, reliability, timestamp) VALUES (?, ?, ?);
|
||||
"#,
|
||||
mixnode_id,
|
||||
mixnode_result.reliability,
|
||||
timestamp
|
||||
)
|
||||
.execute(&mut tx)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// finally commit the transaction
|
||||
tx.commit().await
|
||||
}
|
||||
|
||||
/// Tries to submit gateway [`NodeResult`] from the network monitor to the database.
|
||||
///
|
||||
/// # Arguments
|
||||
@@ -551,6 +593,51 @@ impl StorageManager {
|
||||
tx.commit().await
|
||||
}
|
||||
|
||||
pub(crate) async fn submit_gateway_statuses_v2(
|
||||
&self,
|
||||
gateway_results: &[NodeResult],
|
||||
) -> Result<(), sqlx::Error> {
|
||||
info!("Inserting {} gateway statuses", gateway_results.len());
|
||||
|
||||
let timestamp = OffsetDateTime::now_utc().unix_timestamp();
|
||||
// insert it all in a transaction to make sure all nodes are updated at the same time
|
||||
// (plus it's a nice guard against new nodes)
|
||||
let mut tx = self.connection_pool.begin().await?;
|
||||
|
||||
for gateway_result in gateway_results {
|
||||
// if gateway info doesn't exist, insert it and get its id
|
||||
|
||||
// same ID "problem" as described for mixnode insertion
|
||||
let gateway_id = sqlx::query!(
|
||||
r#"
|
||||
INSERT OR IGNORE INTO gateway_details_v2(identity, node_id) VALUES (?, ?);
|
||||
SELECT id FROM gateway_details_v2 WHERE identity = ?;
|
||||
"#,
|
||||
gateway_result.identity,
|
||||
gateway_result.node_id,
|
||||
gateway_result.identity,
|
||||
)
|
||||
.fetch_one(&mut tx)
|
||||
.await?
|
||||
.id;
|
||||
|
||||
// insert the actual status
|
||||
sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO gateway_status_v2 (gateway_details_id, reliability, timestamp) VALUES (?, ?, ?);
|
||||
"#,
|
||||
gateway_id,
|
||||
gateway_result.reliability,
|
||||
timestamp
|
||||
)
|
||||
.execute(&mut tx)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// finally commit the transaction
|
||||
tx.commit().await
|
||||
}
|
||||
|
||||
/// Saves the information about which nodes were used as core nodes during this particular
|
||||
/// network monitor test run.
|
||||
///
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::network_monitor::monitor::summary_producer::{GatewayResult, MixnodeResult};
|
||||
use crate::network_monitor::test_route::TestRoute;
|
||||
use crate::node_status_api::models::{
|
||||
GatewayStatusReport, GatewayUptimeHistory, MixnodeStatusReport, MixnodeUptimeHistory,
|
||||
@@ -14,6 +13,7 @@ use crate::support::storage::models::{
|
||||
GatewayDetails, MixnodeDetails, TestedGatewayStatus, TestedMixnodeStatus,
|
||||
};
|
||||
use nym_mixnet_contract_common::MixId;
|
||||
use nym_types::monitoring::{GatewayResult, MixnodeResult};
|
||||
use rocket::fairing::AdHoc;
|
||||
use sqlx::ConnectOptions;
|
||||
use std::path::Path;
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
FROM rust:latest AS builder
|
||||
|
||||
COPY ./ /usr/src/nym
|
||||
WORKDIR /usr/src/nym/nym-network-monitor
|
||||
RUN cargo build --release
|
||||
|
||||
FROM locustio/locust
|
||||
EXPOSE 8089
|
||||
COPY --from=builder /usr/src/nym/target/release/nym-network-monitor /bin/nym-network-monitor
|
||||
COPY --from=builder /usr/src/nym/nym-network-monitor/locustfile.py locustfile.py
|
||||
COPY --from=builder /usr/src/nym/nym-network-monitor/entrypoint.sh entrypoint.sh
|
||||
COPY --from=builder /usr/src/nym/envs/mainnet.env mainnet.env
|
||||
|
||||
ENTRYPOINT ["./entrypoint.sh"]
|
||||
@@ -0,0 +1,39 @@
|
||||
[package]
|
||||
name = "nym-network-monitor"
|
||||
version = "0.1.0"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
documentation.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
anyhow = { workspace = true }
|
||||
axum = { workspace = true, features = ["json"] }
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
dashmap = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
log = { workspace = true }
|
||||
petgraph = "0.6.5"
|
||||
rand = { workspace = true }
|
||||
rand_chacha = { workspace = true }
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
tokio = { workspace = true, features = ["macros", "time"] }
|
||||
tokio-util = { workspace = true }
|
||||
utoipa = { workspace = true, features = ["axum_extras"] }
|
||||
utoipa-swagger-ui = { workspace = true, features = ["axum"] }
|
||||
|
||||
# internal
|
||||
nym-bin-common = { path = "../common/bin-common" }
|
||||
nym-crypto = { path = "../common/crypto" }
|
||||
nym-network-defaults = { path = "../common/network-defaults" }
|
||||
nym-sdk = { path = "../sdk/rust/nym-sdk" }
|
||||
nym-sphinx = { path = "../common/nymsphinx" }
|
||||
nym-topology = { path = "../common/topology" }
|
||||
nym-types = { path = "../common/types" }
|
||||
nym-validator-client = { path = "../common/client-libs/validator-client" }
|
||||
@@ -0,0 +1,47 @@
|
||||
# Nym Network Monitor
|
||||
|
||||
Monitors the Nym network by sending itself packages across the mixnet.
|
||||
|
||||
Network monitor is running two tokio tasks, one manages mixnet clients and another manages monitoring itself. Monitor is designed to be driven externally, via an HTTP api. This means that it does not do any monitoring unless driven by something like [`locust`](https://locust.io/). This allows us to tailor the load externally, potentially distributing it across multiple monitors.
|
||||
|
||||
### Client manager
|
||||
|
||||
On start network monitor will spawn `C` clients, with 10 being the default. Random client is dropped every `T`, defaults to 60 seconds, and a new one is created. Clients chose a random gateway to connect to the mixnet. Meaning that on average all gateways will be tested in `NUMBER_OF_GATEWAYS/N*T`, assuming at least one request per client per T.
|
||||
|
||||
### Network monitor API
|
||||
|
||||
Swagger UI is available at `/v1/ui/`, ie `http://localhost:8080/v1/ui/`
|
||||
|
||||
### Driving the monitor with Locust
|
||||
|
||||
+ Head over to https://locust.io/ and get `locust`
|
||||
+ Start everything
|
||||
```bash
|
||||
# Start the network monitor
|
||||
cargo run --release
|
||||
|
||||
# Start locus in a separate terminal
|
||||
python -m locust -H http://127.0.0.1:8080 --processes 4
|
||||
```
|
||||
+ Head over to http://127.0.0.1:8089/ and start a testing run
|
||||
|
||||
## Usage
|
||||
|
||||
```bash
|
||||
Usage: nym-network-monitor [OPTIONS]
|
||||
|
||||
Options:
|
||||
-C, --clients <N_CLIENTS> Number of clients to spawn [default: 10]
|
||||
-T, --client-lifetime <CLIENT_LIFETIME> Lifetime of each client in seconds [default: 60]
|
||||
--port <PORT> Port to listen on [default: 8080]
|
||||
--host <HOST> Host to listen on [default: 127.0.0.1]
|
||||
-t, --topology <TOPOLOGY> Path to the topology file
|
||||
-e, --env <ENV> Path to the environment file
|
||||
-m, --mixnet-timeout <MIXNET_TIMEOUT> [default: 10]
|
||||
--generate-key-pair
|
||||
--private-key <PRIVATE_KEY>
|
||||
-h, --help Print help
|
||||
-V, --version Print version
|
||||
```
|
||||
|
||||
|
||||
Executable
+21
@@ -0,0 +1,21 @@
|
||||
#!/bin/bash
|
||||
# Takes timeout in seconds as the first argument, defaults to 60
|
||||
# Takes number of users as the second argument, defaults to 10
|
||||
|
||||
set -ex
|
||||
|
||||
users=${2:-10}
|
||||
timeout=${1:-600}
|
||||
|
||||
RUST_LOG=info nym-network-monitor --env mainnet.env --host 127.0.0.1 --port 8080 &
|
||||
nnm_pid=$!
|
||||
|
||||
sleep 10
|
||||
|
||||
python -m locust -H http://127.0.0.1:8080 --processes 4 --autostart --autoquit 60 -u "$users" -t "$timeout"s &
|
||||
locust_pid=$!
|
||||
|
||||
wait $locust_pid
|
||||
kill -2 $nnm_pid
|
||||
|
||||
exit $?
|
||||
@@ -0,0 +1,7 @@
|
||||
from locust import HttpUser, task
|
||||
|
||||
|
||||
class SendMsg(HttpUser):
|
||||
@task
|
||||
def hello_world(self):
|
||||
self.client.post("/v1/send")
|
||||
@@ -0,0 +1,382 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use anyhow::Result;
|
||||
use futures::{stream::FuturesUnordered, StreamExt};
|
||||
use log::{debug, info};
|
||||
use nym_sphinx::chunking::{SentFragment, FRAGMENTS_RECEIVED, FRAGMENTS_SENT};
|
||||
use nym_topology::{gateway, mix, NymTopology};
|
||||
use nym_types::monitoring::{MonitorMessage, NodeResult};
|
||||
use nym_validator_client::nym_api::routes::{API_VERSION, STATUS, SUBMIT_GATEWAY, SUBMIT_NODE};
|
||||
use rand::SeedableRng;
|
||||
use rand_chacha::ChaCha8Rng;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utoipa::ToSchema;
|
||||
|
||||
use crate::{NYM_API_URL, PRIVATE_KEY, TOPOLOGY};
|
||||
|
||||
struct HydratedRoute {
|
||||
mix_nodes: Vec<mix::Node>,
|
||||
gateway_node: gateway::Node,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Default, ToSchema)]
|
||||
struct GatewayStats(u32, u32, Option<String>);
|
||||
|
||||
impl GatewayStats {
|
||||
fn new(sent: u32, recv: u32, owner: Option<String>) -> Self {
|
||||
GatewayStats(sent, recv, owner)
|
||||
}
|
||||
|
||||
fn success(&self) -> u32 {
|
||||
self.0
|
||||
}
|
||||
|
||||
fn failed(&self) -> u32 {
|
||||
self.1
|
||||
}
|
||||
|
||||
fn reliability(&self) -> f64 {
|
||||
self.success() as f64 / (self.success() + self.failed()) as f64
|
||||
}
|
||||
|
||||
fn incr_success(&mut self) {
|
||||
self.0 += 1;
|
||||
}
|
||||
|
||||
fn incr_failure(&mut self) {
|
||||
self.1 += 1;
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Default, ToSchema)]
|
||||
pub struct NetworkAccount {
|
||||
complete_fragment_sets: HashSet<i32>,
|
||||
incomplete_fragment_sets: HashSet<i32>,
|
||||
missing_fragments: HashMap<i32, Vec<u8>>,
|
||||
complete_routes: Vec<Vec<u32>>,
|
||||
gateway_stats: HashMap<String, GatewayStats>,
|
||||
incomplete_routes: Vec<Vec<u32>>,
|
||||
#[serde(skip)]
|
||||
topology: NymTopology,
|
||||
tested_nodes: HashSet<u32>,
|
||||
#[serde(skip)]
|
||||
mix_details: HashMap<u32, mix::Node>,
|
||||
#[serde(skip)]
|
||||
gateway_details: HashMap<String, gateway::Node>,
|
||||
}
|
||||
|
||||
impl NetworkAccount {
|
||||
pub fn tested_nodes(&self) -> &HashSet<u32> {
|
||||
&self.tested_nodes
|
||||
}
|
||||
|
||||
pub fn node_stats(&self, id: u32) -> NodeStats {
|
||||
let complete_routes = self.complete_for_id(id);
|
||||
let incomplete_routes = self.incomplete_for_id(id);
|
||||
let node = self
|
||||
.mix_details
|
||||
.get(&id)
|
||||
.expect("Has to be in here, since we've put it in!");
|
||||
NodeStats::new(
|
||||
id,
|
||||
complete_routes,
|
||||
incomplete_routes,
|
||||
node.identity_key.to_base58_string(),
|
||||
node.owner.clone(),
|
||||
)
|
||||
}
|
||||
|
||||
fn complete_for_id(&self, id: u32) -> usize {
|
||||
self.complete_routes()
|
||||
.iter()
|
||||
.filter(|r| r.contains(&id))
|
||||
.count()
|
||||
}
|
||||
|
||||
fn incomplete_for_id(&self, id: u32) -> usize {
|
||||
self.incomplete_routes()
|
||||
.iter()
|
||||
.filter(|r| r.contains(&id))
|
||||
.count()
|
||||
}
|
||||
|
||||
pub fn complete_routes(&self) -> &Vec<Vec<u32>> {
|
||||
&self.complete_routes
|
||||
}
|
||||
|
||||
pub fn incomplete_routes(&self) -> &Vec<Vec<u32>> {
|
||||
&self.incomplete_routes
|
||||
}
|
||||
|
||||
pub fn finalize() -> Result<Self> {
|
||||
let mut account = NetworkAccount::new();
|
||||
account.find_missing_fragments();
|
||||
account.hydrate_all_fragments()?;
|
||||
Ok(account)
|
||||
}
|
||||
|
||||
pub fn empty_buffers() {
|
||||
FRAGMENTS_SENT.clear();
|
||||
FRAGMENTS_RECEIVED.clear();
|
||||
}
|
||||
|
||||
fn new() -> Self {
|
||||
let topology = TOPOLOGY.get().expect("Topology not set yet!").clone();
|
||||
let mut account = NetworkAccount {
|
||||
topology,
|
||||
..Default::default()
|
||||
};
|
||||
for fragment_set in FRAGMENTS_SENT.iter() {
|
||||
let sent_fragments = fragment_set
|
||||
.value()
|
||||
.first()
|
||||
.map(|f| f.header().total_fragments())
|
||||
.unwrap_or(0);
|
||||
|
||||
debug!(
|
||||
"SENT Fragment set {} has {} fragments",
|
||||
fragment_set.key(),
|
||||
sent_fragments
|
||||
);
|
||||
|
||||
let recv = FRAGMENTS_RECEIVED.get(fragment_set.key());
|
||||
let recv_fragments = recv.as_ref().map(|r| r.value().len()).unwrap_or(0);
|
||||
debug!(
|
||||
"RECV Fragment set {} has {} fragments",
|
||||
fragment_set.key(),
|
||||
recv_fragments
|
||||
);
|
||||
|
||||
// Due to retransmission we can recieve a fragment multiple times
|
||||
if sent_fragments as usize <= recv_fragments {
|
||||
account.push_complete(*fragment_set.key());
|
||||
} else {
|
||||
account.push_incomplete(*fragment_set.key());
|
||||
}
|
||||
}
|
||||
account
|
||||
}
|
||||
|
||||
fn hydrate_route(&self, fragment: SentFragment) -> anyhow::Result<HydratedRoute> {
|
||||
let mut rng = ChaCha8Rng::seed_from_u64(fragment.seed() as u64);
|
||||
let (nodes, gw) = self.topology.random_path_to_gateway(
|
||||
&mut rng,
|
||||
fragment.mixnet_params().hops(),
|
||||
fragment.mixnet_params().destination(),
|
||||
)?;
|
||||
Ok(HydratedRoute {
|
||||
mix_nodes: nodes,
|
||||
gateway_node: gw,
|
||||
})
|
||||
}
|
||||
|
||||
fn hydrate_all_fragments(&mut self) -> Result<()> {
|
||||
for fragment_set in FRAGMENTS_SENT.iter() {
|
||||
let fragment_set_id = fragment_set.key();
|
||||
for fragment in fragment_set.value() {
|
||||
let route = self.hydrate_route(fragment.clone())?;
|
||||
let mix_ids = route
|
||||
.mix_nodes
|
||||
.iter()
|
||||
.map(|n| n.mix_id)
|
||||
.collect::<Vec<u32>>();
|
||||
self.tested_nodes.extend(&mix_ids);
|
||||
self.mix_details
|
||||
.extend(route.mix_nodes.iter().map(|n| (n.mix_id, n.clone())));
|
||||
let gateway_stats_entry = self
|
||||
.gateway_stats
|
||||
.entry(route.gateway_node.identity_key.to_base58_string())
|
||||
.or_insert(GatewayStats::new(0, 0, route.gateway_node.owner.clone()));
|
||||
self.gateway_details.insert(
|
||||
route.gateway_node.identity_key.to_base58_string(),
|
||||
route.gateway_node,
|
||||
);
|
||||
if self.complete_fragment_sets.contains(fragment_set_id) {
|
||||
self.complete_routes.push(mix_ids);
|
||||
gateway_stats_entry.incr_success();
|
||||
} else {
|
||||
self.incomplete_routes.push(mix_ids);
|
||||
gateway_stats_entry.incr_failure();
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn find_missing_fragments(&mut self) {
|
||||
let mut missing_fragments_map = HashMap::new();
|
||||
for fragment_set_id in &self.incomplete_fragment_sets {
|
||||
if let Some(fragment_ref) = FRAGMENTS_RECEIVED.get(fragment_set_id) {
|
||||
if let Some(ref_fragment) = fragment_ref.value().first() {
|
||||
let ref_header = ref_fragment.header();
|
||||
let ref_id_set = (0..ref_header.total_fragments()).collect::<HashSet<u8>>();
|
||||
let recieved_set = fragment_ref
|
||||
.value()
|
||||
.iter()
|
||||
.map(|f| f.header().current_fragment())
|
||||
.collect::<HashSet<u8>>();
|
||||
let missing_fragments = ref_id_set
|
||||
.difference(&recieved_set)
|
||||
.cloned()
|
||||
.collect::<Vec<u8>>();
|
||||
missing_fragments_map.insert(*fragment_set_id, missing_fragments);
|
||||
}
|
||||
};
|
||||
}
|
||||
self.missing_fragments = missing_fragments_map;
|
||||
}
|
||||
|
||||
fn push_complete(&mut self, id: i32) {
|
||||
self.complete_fragment_sets.insert(id);
|
||||
}
|
||||
|
||||
fn push_incomplete(&mut self, id: i32) {
|
||||
self.incomplete_fragment_sets.insert(id);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug, Default, ToSchema)]
|
||||
pub struct NetworkAccountStats {
|
||||
complete_fragment_sets: usize,
|
||||
incomplete_fragment_sets: usize,
|
||||
missing_fragments: usize,
|
||||
complete_routes: usize,
|
||||
incomplete_routes: usize,
|
||||
tested_nodes: usize,
|
||||
}
|
||||
|
||||
impl From<NetworkAccount> for NetworkAccountStats {
|
||||
fn from(account: NetworkAccount) -> Self {
|
||||
NetworkAccountStats {
|
||||
complete_fragment_sets: account.complete_fragment_sets.len(),
|
||||
incomplete_fragment_sets: account.incomplete_fragment_sets.len(),
|
||||
missing_fragments: account.missing_fragments.values().map(|v| v.len()).sum(),
|
||||
complete_routes: account.complete_routes.len(),
|
||||
incomplete_routes: account.incomplete_routes.len(),
|
||||
tested_nodes: account.tested_nodes.len(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug, ToSchema)]
|
||||
pub struct NodeStats {
|
||||
mix_id: u32,
|
||||
complete_routes: usize,
|
||||
incomplete_routes: usize,
|
||||
reliability: f64,
|
||||
identity: String,
|
||||
owner: Option<String>,
|
||||
}
|
||||
|
||||
impl NodeStats {
|
||||
pub fn new(
|
||||
mix_id: u32,
|
||||
complete_routes: usize,
|
||||
incomplete_routes: usize,
|
||||
identity: String,
|
||||
owner: Option<String>,
|
||||
) -> Self {
|
||||
NodeStats {
|
||||
mix_id,
|
||||
complete_routes,
|
||||
incomplete_routes,
|
||||
reliability: complete_routes as f64 / (complete_routes + incomplete_routes) as f64,
|
||||
identity,
|
||||
owner,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn reliability(&self) -> f64 {
|
||||
self.reliability
|
||||
}
|
||||
|
||||
pub fn into_node_results(self) -> NodeResult {
|
||||
NodeResult {
|
||||
node_id: self.mix_id,
|
||||
identity: self.identity,
|
||||
reliability: (self.reliability * 100.) as u8,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn all_node_stats() -> anyhow::Result<Vec<NodeStats>> {
|
||||
let account = NetworkAccount::finalize()?;
|
||||
Ok(account
|
||||
.tested_nodes()
|
||||
.iter()
|
||||
.map(|id| account.node_stats(*id))
|
||||
.collect::<Vec<NodeStats>>())
|
||||
}
|
||||
|
||||
pub async fn monitor_gateway_results() -> anyhow::Result<Vec<NodeResult>> {
|
||||
let account = NetworkAccount::finalize()?;
|
||||
Ok(account
|
||||
.gateway_stats
|
||||
.iter()
|
||||
.map(into_gateway_result)
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub async fn monitor_mixnode_results() -> anyhow::Result<Vec<NodeResult>> {
|
||||
let stats = all_node_stats().await?;
|
||||
Ok(stats
|
||||
.into_iter()
|
||||
.map(NodeStats::into_node_results)
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub async fn submit_metrics() -> anyhow::Result<()> {
|
||||
let node_stats = monitor_mixnode_results().await?;
|
||||
let gateway_stats = monitor_gateway_results().await?;
|
||||
|
||||
info!("Submitting metrics to {}", *NYM_API_URL);
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
let node_submit_url = format!("{}/{API_VERSION}/{STATUS}/{SUBMIT_NODE}", &*NYM_API_URL);
|
||||
let gateway_submit_url = format!("{}/{API_VERSION}/{STATUS}/{SUBMIT_GATEWAY}", &*NYM_API_URL);
|
||||
|
||||
info!("Submitting {} mixnode measurements", node_stats.len());
|
||||
|
||||
node_stats
|
||||
.chunks(10)
|
||||
.map(|chunk| {
|
||||
let monitor_message =
|
||||
MonitorMessage::new(chunk.to_vec(), PRIVATE_KEY.get().expect("We've set this!"));
|
||||
client.post(&node_submit_url).json(&monitor_message).send()
|
||||
})
|
||||
.collect::<FuturesUnordered<_>>()
|
||||
.collect::<Vec<Result<_, _>>>()
|
||||
.await
|
||||
.into_iter()
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
info!("Submitting {} gateway measurements", gateway_stats.len());
|
||||
|
||||
gateway_stats
|
||||
.chunks(10)
|
||||
.map(|chunk| {
|
||||
let monitor_message =
|
||||
MonitorMessage::new(chunk.to_vec(), PRIVATE_KEY.get().expect("We've set this!"));
|
||||
client
|
||||
.post(&gateway_submit_url)
|
||||
.json(&monitor_message)
|
||||
.send()
|
||||
})
|
||||
.collect::<FuturesUnordered<_>>()
|
||||
.collect::<Vec<Result<_, _>>>()
|
||||
.await
|
||||
.into_iter()
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
NetworkAccount::empty_buffers();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn into_gateway_result((key, stats): (&String, &GatewayStats)) -> NodeResult {
|
||||
NodeResult {
|
||||
identity: key.clone(),
|
||||
reliability: (stats.reliability() * 100.) as u8,
|
||||
node_id: 0,
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,295 @@
|
||||
use axum::{
|
||||
extract::{Path, State},
|
||||
http::StatusCode,
|
||||
Json,
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use log::{debug, error, warn};
|
||||
use nym_sdk::mixnet::MixnetMessageSender;
|
||||
use nym_sphinx::chunking::{ReceivedFragment, SentFragment, FRAGMENTS_RECEIVED, FRAGMENTS_SENT};
|
||||
use petgraph::{dot::Dot, Graph};
|
||||
use rand::{distributions::Alphanumeric, seq::SliceRandom, Rng};
|
||||
use serde::Serialize;
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::time::timeout;
|
||||
use utoipa::ToSchema;
|
||||
|
||||
use crate::{
|
||||
accounting::{all_node_stats, NetworkAccount, NetworkAccountStats, NodeStats},
|
||||
http::AppState,
|
||||
MIXNET_TIMEOUT,
|
||||
};
|
||||
|
||||
#[derive(ToSchema, Serialize)]
|
||||
pub struct FragmentsSent(HashMap<i32, Vec<SentFragment>>);
|
||||
|
||||
#[derive(ToSchema, Serialize)]
|
||||
pub struct FragmentsReceived(HashMap<i32, Vec<ReceivedFragment>>);
|
||||
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/v1/stats",
|
||||
responses(
|
||||
(status = 200, description = "Returns statistics collected since startup", body = NetworkAccountStats),
|
||||
)
|
||||
)]
|
||||
pub async fn stats_handler() -> Result<Json<NetworkAccountStats>, StatusCode> {
|
||||
let account = NetworkAccount::finalize().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
Ok(Json(account.into()))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/v1/node_stats/{mix_id}",
|
||||
responses(
|
||||
(status = 200, description = "Returns statistics for a given mix_id, collected since startup", body = NodeStats),
|
||||
)
|
||||
)]
|
||||
pub async fn node_stats_handler(Path(mix_id): Path<u32>) -> Result<Json<NodeStats>, StatusCode> {
|
||||
let account = NetworkAccount::finalize().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
Ok(Json(account.node_stats(mix_id)))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/v1/node_stats",
|
||||
responses(
|
||||
(status = 200, description = "Returns statistics for all nodes, collected since startup, sorted by reliability", body = Vec<NodeStats>),
|
||||
)
|
||||
)]
|
||||
pub async fn all_nodes_stats_handler() -> Result<Json<Vec<NodeStats>>, StatusCode> {
|
||||
let mut stats = all_node_stats()
|
||||
.await
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
stats.sort_by(|a, b| a.reliability().partial_cmp(&b.reliability()).unwrap());
|
||||
Ok(Json(stats))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/v1/accounting",
|
||||
responses(
|
||||
(status = 200, description = "Returns raw aggregated data collected since startup", body = NetworkAccount),
|
||||
)
|
||||
)]
|
||||
pub async fn accounting_handler() -> Result<Json<NetworkAccount>, StatusCode> {
|
||||
NetworkAccount::finalize()
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
|
||||
.map(Json)
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/v1/dot/{mix_id}",
|
||||
responses(
|
||||
(status = 200, description = "Returns Subgraph for a given *mix_id* in `dot` format", body = String),
|
||||
)
|
||||
)]
|
||||
pub async fn mix_dot_handler(Path(mix_id): Path<u32>) -> Result<String, StatusCode> {
|
||||
generate_dot(Some(mix_id))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/v1/dot",
|
||||
responses(
|
||||
(status = 200, description = "Returns entire tested network graph in `dot` format", body = String),
|
||||
)
|
||||
)]
|
||||
pub async fn graph_handler() -> Result<String, StatusCode> {
|
||||
generate_dot(None)
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/v1/sent",
|
||||
responses(
|
||||
(status = 200, description = "Returns a map of all fragments sent by the network monitor", body = FragmentsSent),
|
||||
)
|
||||
)]
|
||||
pub async fn sent_handler() -> Json<FragmentsSent> {
|
||||
Json(FragmentsSent(
|
||||
(*FRAGMENTS_SENT)
|
||||
.clone()
|
||||
.into_iter()
|
||||
.collect::<HashMap<_, _>>(),
|
||||
))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/v1/received",
|
||||
responses(
|
||||
(status = 200, description = "Returns a map of all fragments received by the network monitor", body = FragmentsReceived),
|
||||
)
|
||||
)]
|
||||
pub async fn recv_handler() -> Json<FragmentsReceived> {
|
||||
Json(FragmentsReceived(
|
||||
(*FRAGMENTS_RECEIVED)
|
||||
.clone()
|
||||
.into_iter()
|
||||
.collect::<HashMap<_, _>>(),
|
||||
))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
post,
|
||||
path = "/v1/send",
|
||||
responses(
|
||||
(status = 200, description = "Sends a message to itself through the mixnet", body = String),
|
||||
)
|
||||
)]
|
||||
pub async fn send_handler(State(state): State<AppState>) -> Result<String, StatusCode> {
|
||||
send_receive_mixnet(state).await
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/v1/mermaid",
|
||||
responses(
|
||||
(status = 200, description = "Returns entire tested network graph in `mermaid` format", body = String),
|
||||
)
|
||||
)]
|
||||
pub async fn mermaid_handler() -> Result<String, StatusCode> {
|
||||
let account = NetworkAccount::finalize().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
let mut mermaid = String::new();
|
||||
mermaid.push_str("flowchart LR;\n");
|
||||
for route in account.complete_routes() {
|
||||
mermaid.push_str(
|
||||
route
|
||||
.iter()
|
||||
.map(|n| n.to_string())
|
||||
.collect::<Vec<String>>()
|
||||
.join("-->")
|
||||
.as_str(),
|
||||
);
|
||||
mermaid.push('\n')
|
||||
}
|
||||
for route in account.incomplete_routes() {
|
||||
mermaid.push_str(
|
||||
route
|
||||
.iter()
|
||||
.map(|n| n.to_string())
|
||||
.collect::<Vec<String>>()
|
||||
.join("-- ❌ -->")
|
||||
.as_str(),
|
||||
);
|
||||
mermaid.push('\n')
|
||||
}
|
||||
Ok(mermaid)
|
||||
}
|
||||
|
||||
async fn send_receive_mixnet(state: AppState) -> Result<String, StatusCode> {
|
||||
let msg: String = rand::thread_rng()
|
||||
.sample_iter(&Alphanumeric)
|
||||
.take(32)
|
||||
.map(char::from)
|
||||
.collect();
|
||||
let sent_msg = msg.clone();
|
||||
|
||||
let client = {
|
||||
let mut clients = state.clients().write().await;
|
||||
if let Some(client) = clients.make_contiguous().choose(&mut rand::thread_rng()) {
|
||||
Arc::clone(client)
|
||||
} else {
|
||||
error!("No clients currently available");
|
||||
return Err(StatusCode::INTERNAL_SERVER_ERROR);
|
||||
}
|
||||
};
|
||||
|
||||
let recv = Arc::clone(&client);
|
||||
let sender = Arc::clone(&client);
|
||||
|
||||
let recv_handle = tokio::spawn(async move {
|
||||
match timeout(
|
||||
Duration::from_secs(*MIXNET_TIMEOUT.get().expect("Set at the begining")),
|
||||
recv.write().await.next(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(received)) => {
|
||||
debug!("Received: {}", String::from_utf8_lossy(&received.message));
|
||||
}
|
||||
Ok(None) => debug!("No message received"),
|
||||
Err(e) => warn!("Failed to receive message: {e}"),
|
||||
}
|
||||
});
|
||||
|
||||
let send_handle = tokio::spawn(async move {
|
||||
let mixnet_sender = sender.read().await.split_sender();
|
||||
let our_address = *sender.read().await.nym_address();
|
||||
match timeout(
|
||||
Duration::from_secs(5),
|
||||
mixnet_sender.send_plain_message(our_address, &msg),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => debug!("Sent message: {msg}"),
|
||||
Err(e) => warn!("Failed to send message: {e}"),
|
||||
};
|
||||
});
|
||||
|
||||
let results = futures::future::join_all(vec![send_handle, recv_handle]).await;
|
||||
for result in results {
|
||||
match result {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("Failed to send/receive message: {e}");
|
||||
return Err(StatusCode::INTERNAL_SERVER_ERROR);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(sent_msg)
|
||||
}
|
||||
|
||||
fn generate_dot(mix_id: Option<u32>) -> Result<String, StatusCode> {
|
||||
let account = NetworkAccount::finalize().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
let mut nodes = HashSet::new();
|
||||
let mut edges: Vec<(u32, u32)> = vec![];
|
||||
let mut broken_edges: Vec<(u32, u32)> = vec![];
|
||||
|
||||
let mix_id = mix_id.unwrap_or(0);
|
||||
|
||||
for route in account.complete_routes().iter() {
|
||||
if mix_id == 0 || route.contains(&mix_id) {
|
||||
for window in route.windows(2) {
|
||||
nodes.insert(window[0]);
|
||||
nodes.insert(window[1]);
|
||||
edges.push((window[0], window[1]));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for route in account.incomplete_routes().iter() {
|
||||
if mix_id == 0 || route.contains(&mix_id) {
|
||||
for window in route.windows(2) {
|
||||
nodes.insert(window[0]);
|
||||
nodes.insert(window[1]);
|
||||
broken_edges.push((window[0], window[1]));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut graph = Graph::new();
|
||||
|
||||
let node_indices: HashMap<u32, _> = nodes
|
||||
.iter()
|
||||
.map(|node| (*node, graph.add_node(*node)))
|
||||
.collect();
|
||||
|
||||
for (from, to) in edges {
|
||||
graph.add_edge(node_indices[&from], node_indices[&to], "");
|
||||
}
|
||||
|
||||
for (from, to) in broken_edges {
|
||||
graph.add_edge(node_indices[&from], node_indices[&to], "❌");
|
||||
}
|
||||
|
||||
let dot = Dot::new(&graph);
|
||||
Ok(dot.to_string())
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
use crate::accounting::{NetworkAccount, NetworkAccountStats, NodeStats};
|
||||
use crate::handlers::{
|
||||
accounting_handler, all_nodes_stats_handler, graph_handler, mermaid_handler, mix_dot_handler,
|
||||
node_stats_handler, recv_handler, send_handler, sent_handler, stats_handler, FragmentsReceived,
|
||||
FragmentsSent,
|
||||
};
|
||||
use axum::routing::{get, post};
|
||||
use axum::Router;
|
||||
use log::info;
|
||||
use nym_sphinx::chunking::fragment::FragmentHeader;
|
||||
use nym_sphinx::chunking::{ReceivedFragment, SentFragment};
|
||||
use std::net::SocketAddr;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utoipa::OpenApi;
|
||||
use utoipa_swagger_ui::SwaggerUi;
|
||||
|
||||
use crate::ClientsWrapper;
|
||||
|
||||
pub struct HttpServer {
|
||||
listener: SocketAddr,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
#[derive(OpenApi)]
|
||||
#[openapi(
|
||||
paths(
|
||||
crate::handlers::accounting_handler,
|
||||
crate::handlers::graph_handler,
|
||||
crate::handlers::mermaid_handler,
|
||||
crate::handlers::mix_dot_handler,
|
||||
crate::handlers::node_stats_handler,
|
||||
crate::handlers::recv_handler,
|
||||
crate::handlers::send_handler,
|
||||
crate::handlers::sent_handler,
|
||||
crate::handlers::all_nodes_stats_handler,
|
||||
),
|
||||
components(schemas(
|
||||
FragmentHeader,
|
||||
FragmentsReceived,
|
||||
FragmentsSent,
|
||||
NetworkAccount,
|
||||
NetworkAccountStats,
|
||||
NodeStats,
|
||||
ReceivedFragment,
|
||||
SentFragment,
|
||||
))
|
||||
)]
|
||||
struct ApiDoc;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AppState {
|
||||
clients: ClientsWrapper,
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
pub fn clients(&self) -> &ClientsWrapper {
|
||||
&self.clients
|
||||
}
|
||||
}
|
||||
|
||||
impl HttpServer {
|
||||
pub fn new(listener: SocketAddr, cancel: CancellationToken) -> Self {
|
||||
HttpServer { listener, cancel }
|
||||
}
|
||||
|
||||
pub async fn run(self, clients: ClientsWrapper) -> anyhow::Result<()> {
|
||||
let n_clients = clients.read().await.len();
|
||||
let state = AppState { clients };
|
||||
let app = Router::new()
|
||||
.route("/v1/send", post(send_handler).with_state(state))
|
||||
.merge(SwaggerUi::new("/v1/ui").url("/v1/docs/openapi.json", ApiDoc::openapi()))
|
||||
.route("/v1/accounting", get(accounting_handler))
|
||||
.route("/v1/sent", get(sent_handler))
|
||||
.route("/v1/dot/:mix_id", get(mix_dot_handler))
|
||||
.route("/v1/dot", get(graph_handler))
|
||||
.route("/v1/mermaid", get(mermaid_handler))
|
||||
.route("/v1/stats", get(stats_handler))
|
||||
.route("/v1/node_stats/:mix_id", get(node_stats_handler))
|
||||
.route("/v1/node_stats", get(all_nodes_stats_handler))
|
||||
.route("/v1/received", get(recv_handler));
|
||||
let listener = tokio::net::TcpListener::bind(self.listener).await?;
|
||||
|
||||
let server_future =
|
||||
axum::serve(listener, app).with_graceful_shutdown(self.cancel.cancelled_owned());
|
||||
|
||||
info!("##########################################################################################");
|
||||
info!("######################### HTTP server running, with {} clients ############################################", n_clients);
|
||||
info!("##########################################################################################");
|
||||
|
||||
server_future.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,214 @@
|
||||
use crate::http::HttpServer;
|
||||
use accounting::submit_metrics;
|
||||
use anyhow::Result;
|
||||
use clap::Parser;
|
||||
use log::{info, warn};
|
||||
use nym_crypto::asymmetric::ed25519::PrivateKey;
|
||||
use nym_network_defaults::setup_env;
|
||||
use nym_network_defaults::var_names::NYM_API;
|
||||
use nym_sdk::mixnet::{self, MixnetClient};
|
||||
use nym_topology::{HardcodedTopologyProvider, NymTopology};
|
||||
use std::fs::File;
|
||||
use std::io::Write;
|
||||
use std::sync::LazyLock;
|
||||
use std::time::Duration;
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
net::{IpAddr, Ipv4Addr, SocketAddr},
|
||||
str::FromStr,
|
||||
sync::Arc,
|
||||
};
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio::{signal::ctrl_c, sync::RwLock};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
static NYM_API_URL: LazyLock<String> = LazyLock::new(|| {
|
||||
std::env::var(NYM_API).unwrap_or_else(|_| panic!("{} env var not set", NYM_API))
|
||||
});
|
||||
|
||||
static MIXNET_TIMEOUT: OnceCell<u64> = OnceCell::const_new();
|
||||
static TOPOLOGY: OnceCell<NymTopology> = OnceCell::const_new();
|
||||
static PRIVATE_KEY: OnceCell<PrivateKey> = OnceCell::const_new();
|
||||
|
||||
mod accounting;
|
||||
mod handlers;
|
||||
mod http;
|
||||
|
||||
/// Simple program to greet a person
|
||||
pub type ClientsWrapper = Arc<RwLock<VecDeque<Arc<RwLock<MixnetClient>>>>>;
|
||||
|
||||
async fn make_clients(
|
||||
clients: ClientsWrapper,
|
||||
n_clients: usize,
|
||||
lifetime: u64,
|
||||
topology: NymTopology,
|
||||
) {
|
||||
loop {
|
||||
let spawned_clients = clients.read().await.len();
|
||||
info!("Currently spawned clients: {}", spawned_clients);
|
||||
// If we have enough clients, sleep for a minute and remove the oldest one
|
||||
if spawned_clients >= n_clients {
|
||||
info!("New client will be spawned in {} seconds", lifetime);
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(lifetime)).await;
|
||||
info!("Removing oldest client");
|
||||
if let Some(dropped_client) = clients.write().await.pop_front() {
|
||||
loop {
|
||||
if Arc::strong_count(&dropped_client) == 1 {
|
||||
if let Some(client) = Arc::into_inner(dropped_client) {
|
||||
client.into_inner().disconnect().await;
|
||||
} else {
|
||||
warn!("Failed to drop client, client had more then one strong ref")
|
||||
}
|
||||
break;
|
||||
}
|
||||
info!("Client still in use, waiting 2 seconds");
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("Spawning new client");
|
||||
let client = match make_client(topology.clone()).await {
|
||||
Ok(client) => client,
|
||||
Err(err) => {
|
||||
warn!("{}, moving on", err);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
clients
|
||||
.write()
|
||||
.await
|
||||
.push_back(Arc::new(RwLock::new(client)));
|
||||
}
|
||||
}
|
||||
|
||||
async fn make_client(topology: NymTopology) -> Result<MixnetClient> {
|
||||
let net = mixnet::NymNetworkDetails::new_from_env();
|
||||
let topology_provider = Box::new(HardcodedTopologyProvider::new(topology));
|
||||
let mixnet_client = mixnet::MixnetClientBuilder::new_ephemeral()
|
||||
.network_details(net)
|
||||
.custom_topology_provider(topology_provider)
|
||||
// .enable_credentials_mode()
|
||||
.build()?;
|
||||
|
||||
let client = mixnet_client.connect_to_mixnet().await?;
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(version, about, long_about = None)]
|
||||
struct Args {
|
||||
/// Number of clients to spawn
|
||||
#[arg(short = 'C', long = "clients", default_value_t = 10)]
|
||||
n_clients: usize,
|
||||
|
||||
/// Lifetime of each client in seconds
|
||||
#[arg(short = 'T', long, default_value_t = 60)]
|
||||
client_lifetime: u64,
|
||||
|
||||
/// Port to listen on
|
||||
#[arg(long, default_value_t = 8080)]
|
||||
port: u16,
|
||||
|
||||
/// Host to listen on
|
||||
#[arg(long, default_value = "127.0.0.1")]
|
||||
host: String,
|
||||
|
||||
/// Path to the topology file
|
||||
#[arg(short, long, default_value = None)]
|
||||
topology: Option<String>,
|
||||
|
||||
/// Path to the environment file
|
||||
#[arg(short, long, default_value = None)]
|
||||
env: Option<String>,
|
||||
|
||||
#[arg(short, long, default_value_t = 10)]
|
||||
mixnet_timeout: u64,
|
||||
|
||||
#[arg(long, default_value_t = false)]
|
||||
generate_key_pair: bool,
|
||||
|
||||
#[arg(long)]
|
||||
private_key: String,
|
||||
}
|
||||
|
||||
fn generate_key_pair() -> Result<()> {
|
||||
let mut rng = rand::thread_rng();
|
||||
let keypair = nym_crypto::asymmetric::identity::KeyPair::new(&mut rng);
|
||||
|
||||
let mut public_key_file = File::create("network-monitor-public")?;
|
||||
public_key_file.write_all(keypair.public_key().to_base58_string().as_bytes())?;
|
||||
|
||||
let mut private_key_file = File::create("network-monitor-private")?;
|
||||
private_key_file.write_all(keypair.private_key().to_base58_string().as_bytes())?;
|
||||
|
||||
info!("Generated keypair, public key to 'network-monitor-public', and private key to 'network-monitor-private', public key should be whitelisted with the nym-api");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
nym_bin_common::logging::setup_logging();
|
||||
|
||||
let args = Args::parse();
|
||||
|
||||
setup_env(args.env); // Defaults to mainnet if empty
|
||||
|
||||
let cancel_token = CancellationToken::new();
|
||||
let server_cancel_token = cancel_token.clone();
|
||||
let clients = Arc::new(RwLock::new(VecDeque::with_capacity(args.n_clients)));
|
||||
|
||||
if args.generate_key_pair {
|
||||
generate_key_pair()?;
|
||||
std::process::exit(0);
|
||||
}
|
||||
|
||||
let pk = PrivateKey::from_base58_string(&args.private_key)?;
|
||||
PRIVATE_KEY.set(pk).ok();
|
||||
|
||||
TOPOLOGY
|
||||
.set(if let Some(topology_file) = args.topology {
|
||||
NymTopology::new_from_file(topology_file)?
|
||||
} else {
|
||||
NymTopology::new_from_env().await?
|
||||
})
|
||||
.ok();
|
||||
|
||||
MIXNET_TIMEOUT.set(args.mixnet_timeout).ok();
|
||||
|
||||
let spawn_clients = Arc::clone(&clients);
|
||||
tokio::spawn(make_clients(
|
||||
spawn_clients,
|
||||
args.n_clients,
|
||||
args.client_lifetime,
|
||||
TOPOLOGY.get().expect("Topology not set yet!").clone(),
|
||||
));
|
||||
|
||||
let server_handle = tokio::spawn(async move {
|
||||
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_str(&args.host)?), args.port);
|
||||
let server = HttpServer::new(socket, server_cancel_token);
|
||||
server.run(clients).await
|
||||
});
|
||||
|
||||
info!("Waiting for message (ctrl-c to exit)");
|
||||
|
||||
loop {
|
||||
match tokio::time::timeout(Duration::from_secs(600), ctrl_c()).await {
|
||||
Ok(_) => {
|
||||
info!("Received kill signal, shutting down, submitting final batch of metrics");
|
||||
submit_metrics().await?;
|
||||
break;
|
||||
}
|
||||
Err(_) => {
|
||||
info!("Submitting metrics, cleaning metric buffers");
|
||||
submit_metrics().await?;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
cancel_token.cancel();
|
||||
|
||||
server_handle.await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -625,7 +625,8 @@ where
|
||||
///
|
||||
/// - If the client is already registered with a gateway, use that gateway.
|
||||
/// - If no gateway is registered, but there is an existing configuration and key, use that.
|
||||
/// - If no gateway is registered, and there is no pre-existing configuration or key, try to register a new gateway.
|
||||
/// - If no gateway is registered, and there is no pre-existing configuration or key, try to
|
||||
/// register a new gateway.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
@@ -705,7 +706,8 @@ where
|
||||
///
|
||||
/// - If the client is already registered with a gateway, use that gateway.
|
||||
/// - If no gateway is registered, but there is an existing configuration and key, use that.
|
||||
/// - If no gateway is registered, and there is no pre-existing configuration or key, try to register a new gateway.
|
||||
/// - If no gateway is registered, and there is no pre-existing configuration or key, try to
|
||||
/// register a new gateway.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
@@ -727,7 +729,8 @@ where
|
||||
let (mut started_client, nym_address) = self.connect_to_mixnet_common().await?;
|
||||
let client_input = started_client.client_input.register_producer();
|
||||
let mut client_output = started_client.client_output.register_consumer();
|
||||
let client_state = started_client.client_state;
|
||||
let client_state: nym_client_core::client::base_client::ClientState =
|
||||
started_client.client_state;
|
||||
|
||||
let identity_keys = started_client.identity_keys.clone();
|
||||
let reconstructed_receiver = client_output.register_receiver()?;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::path::Path;
|
||||
use std::{net::IpAddr, path::Path, time::SystemTime};
|
||||
|
||||
use futures::channel::oneshot;
|
||||
use ipnetwork::IpNetwork;
|
||||
@@ -31,6 +31,7 @@ pub struct Authenticator {
|
||||
custom_topology_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
|
||||
custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send + Sync>>,
|
||||
wireguard_gateway_data: WireguardGatewayData,
|
||||
used_private_network_ips: Vec<IpAddr>,
|
||||
response_rx: UnboundedReceiver<PeerControlResponse>,
|
||||
shutdown: Option<TaskClient>,
|
||||
on_start: Option<oneshot::Sender<OnStartData>>,
|
||||
@@ -40,6 +41,7 @@ impl Authenticator {
|
||||
pub fn new(
|
||||
config: Config,
|
||||
wireguard_gateway_data: WireguardGatewayData,
|
||||
used_private_network_ips: Vec<IpAddr>,
|
||||
response_rx: UnboundedReceiver<PeerControlResponse>,
|
||||
) -> Self {
|
||||
Self {
|
||||
@@ -48,6 +50,7 @@ impl Authenticator {
|
||||
custom_topology_provider: None,
|
||||
custom_gateway_transceiver: None,
|
||||
wireguard_gateway_data,
|
||||
used_private_network_ips,
|
||||
response_rx,
|
||||
shutdown: None,
|
||||
on_start: None,
|
||||
@@ -128,13 +131,26 @@ impl Authenticator {
|
||||
|
||||
let self_address = *mixnet_client.nym_address();
|
||||
|
||||
let used_private_network_ips =
|
||||
std::collections::BTreeSet::from_iter(self.used_private_network_ips.iter());
|
||||
let private_ip_network = IpNetwork::new(
|
||||
self.config.authenticator.private_ip,
|
||||
self.config.authenticator.private_network_prefix,
|
||||
)?;
|
||||
let now = SystemTime::now();
|
||||
let free_private_network_ips = private_ip_network
|
||||
.iter()
|
||||
.map(|ip| {
|
||||
if used_private_network_ips.contains(&ip) {
|
||||
(ip, Some(now))
|
||||
} else {
|
||||
(ip, None)
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
let mixnet_listener = crate::mixnet_listener::MixnetListener::new(
|
||||
self.config,
|
||||
private_ip_network,
|
||||
free_private_network_ips,
|
||||
self.wireguard_gateway_data,
|
||||
self.response_rx,
|
||||
mixnet_client,
|
||||
|
||||
@@ -55,7 +55,7 @@ pub(crate) async fn execute(args: &Run) -> Result<(), AuthenticatorError> {
|
||||
handler.run().await;
|
||||
});
|
||||
let mut server =
|
||||
nym_authenticator::Authenticator::new(config, wireguard_gateway_data, response_rx);
|
||||
nym_authenticator::Authenticator::new(config, wireguard_gateway_data, vec![], response_rx);
|
||||
if let Some(custom_mixnet) = &args.common_args.custom_mixnet {
|
||||
server = server.with_stored_topology(custom_mixnet)?
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ use std::{
|
||||
|
||||
use crate::{error::AuthenticatorError, peer_manager::PeerManager};
|
||||
use futures::StreamExt;
|
||||
use ipnetwork::IpNetwork;
|
||||
use nym_authenticator_requests::v1::{
|
||||
self,
|
||||
request::{AuthenticatorRequest, AuthenticatorRequestData},
|
||||
@@ -67,7 +66,7 @@ pub(crate) struct MixnetListener {
|
||||
impl MixnetListener {
|
||||
pub fn new(
|
||||
config: Config,
|
||||
private_ip_network: IpNetwork,
|
||||
free_private_network_ips: PrivateIPs,
|
||||
wireguard_gateway_data: WireguardGatewayData,
|
||||
response_rx: UnboundedReceiver<PeerControlResponse>,
|
||||
mixnet_client: nym_sdk::mixnet::MixnetClient,
|
||||
@@ -75,7 +74,6 @@ impl MixnetListener {
|
||||
) -> Self {
|
||||
let timeout_check_interval =
|
||||
IntervalStream::new(tokio::time::interval(DEFAULT_REGISTRATION_TIMEOUT_CHECK));
|
||||
let free_private_network_ips = private_ip_network.iter().map(|ip| (ip, None)).collect();
|
||||
MixnetListener {
|
||||
config,
|
||||
mixnet_client,
|
||||
|
||||
@@ -176,7 +176,7 @@ impl DaemonLauncher {
|
||||
info!("it finished with the following exit status: {exit_status}");
|
||||
return Ok(false)
|
||||
}
|
||||
event = &mut self.upgrade_plan_watcher.next() => {
|
||||
event = self.upgrade_plan_watcher.next() => {
|
||||
let Some(event) = event else {
|
||||
// this is a critical failure since the file watcher task should NEVER terminate by itself
|
||||
error!("CRITICAL FAILURE: the upgrade plan watcher channel got closed");
|
||||
|
||||
Reference in New Issue
Block a user