Compare commits

...

28 Commits

Author SHA1 Message Date
Simon Wicky 4b3dcb6eca Feature Noise XKpsk3 : Fix localnet (#4465)
* add custom mixnet option for gateways

* add custom mixnet option for mixnode

* change topology construction as to not miss described_nodes

* adapt localnet scripts to new topology

* add snake_case alias to described_nodes
2024-03-19 15:49:29 +01:00
Simon Wicky 5a9c391ba2 change license 2024-03-13 11:35:47 +01:00
Simon Wicky 011be0d31b remove outdated comments 2024-03-13 11:16:55 +01:00
Simon Wicky 4d273ea613 change to psk structure and port checking to allow same ip 2024-03-12 12:18:25 +01:00
Simon Wicky 6b1657adae apply jstuczyn suggestion 2024-03-12 12:18:25 +01:00
Simon Wicky 3846d3c9b5 comment change according to review 2024-03-12 12:18:25 +01:00
Simon Wicky 2e12c20327 update cargo lock 2024-03-12 12:18:25 +01:00
Simon Wicky a0e3610681 change key retrieval to use bond info 2024-03-12 12:18:25 +01:00
Simon Wicky 30a1f5582a change mix_host in Node to a vec 2024-03-12 12:18:25 +01:00
Simon Wicky 7ec35fe010 Merge commit '0e56d8c2f733f37ae5cdeace779b58083b12baeb' into simon/noise_nodes 2024-03-11 09:32:09 +01:00
Simon Wicky 49588e0abc Merge branch 'develop' into simon/noise_nodes 2024-03-04 17:08:25 +01:00
Simon Wicky e57fcf4a0e Merge branch 'develop' into simon/noise_nodes 2024-02-14 09:34:52 +01:00
Simon Wicky e2fa1ae986 apply octlol's type suggestion 2024-02-14 09:20:52 +01:00
Simon Wicky 293cf2fd3f apply some of octlol's suggestion 2024-02-14 09:20:52 +01:00
Simon Wicky 06f1271a25 enable noise support in self described API 2024-02-14 09:20:52 +01:00
Simon Wicky e4425f9136 add noise connection in nodes 2024-02-14 09:20:52 +01:00
Simon Wicky 54feb9ea0a add main noise logic 2024-02-14 09:20:52 +01:00
Simon Wicky a16b21ce3c cargo fmt 2024-02-14 09:20:52 +01:00
Simon Wicky 6924f29d32 changes on nym-connect and wasm client 2024-02-14 09:20:52 +01:00
Simon Wicky fbbd634a39 apply octlol suggestion 2024-02-14 09:20:52 +01:00
Simon Wicky 8c29fba5ef move topology control into its own crate 2024-02-14 09:20:52 +01:00
Simon Wicky 0a826b8146 add topology access to nodes 2024-02-14 09:20:52 +01:00
Simon Wicky 580457aeb1 add epoch in validator client 2024-02-14 09:20:52 +01:00
Simon Wicky d4a2be8a9e augment topology with described nodes 2024-02-14 09:20:52 +01:00
Simon Wicky 7363cdab68 apply octlol unwrap or default suggestion 2024-02-06 09:57:54 +01:00
Simon Wicky 59087413c7 adapt noise support to PR chain merge order 2024-01-30 11:04:19 +01:00
Simon Wicky 0d841e5c75 augment Nym-API described with Noise Info 2024-01-30 09:38:41 +01:00
Simon Wicky 786ff31ce5 augment self described API with NoiseInformation field 2024-01-30 09:37:56 +01:00
69 changed files with 1875 additions and 199 deletions
Generated
+63 -8
View File
@@ -5215,6 +5215,7 @@ dependencies = [
"nym-sphinx",
"nym-task",
"nym-topology",
"nym-topology-control",
"nym-validator-client",
"rand 0.7.3",
"reqwest",
@@ -5538,10 +5539,13 @@ dependencies = [
"nym-network-defaults",
"nym-network-requester",
"nym-node",
"nym-noise",
"nym-pemstore",
"nym-sphinx",
"nym-statistics-common",
"nym-task",
"nym-topology",
"nym-topology-control",
"nym-types",
"nym-validator-client",
"nym-wireguard",
@@ -5723,10 +5727,16 @@ version = "0.1.0"
dependencies = [
"futures",
"log",
"nym-crypto",
"nym-noise",
"nym-sphinx",
"nym-task",
"nym-topology-control",
"nym-validator-client",
"rand 0.7.3",
"tokio",
"tokio-util",
"url",
]
[[package]]
@@ -5773,6 +5783,7 @@ dependencies = [
"nym-mixnet-client",
"nym-mixnode-common",
"nym-node",
"nym-noise",
"nym-nonexhaustive-delayqueue",
"nym-pemstore",
"nym-sphinx",
@@ -5780,6 +5791,7 @@ dependencies = [
"nym-sphinx-types",
"nym-task",
"nym-topology",
"nym-topology-control",
"nym-types",
"nym-validator-client",
"opentelemetry",
@@ -6030,6 +6042,24 @@ dependencies = [
"wasmtimer",
]
[[package]]
name = "nym-noise"
version = "0.1.0"
dependencies = [
"bytes",
"futures",
"log",
"nym-crypto",
"nym-topology",
"pin-project",
"semver 0.11.0",
"sha2 0.10.8",
"snow",
"thiserror",
"tokio",
"tokio-util",
]
[[package]]
name = "nym-nonexhaustive-delayqueue"
version = "0.1.0"
@@ -6122,6 +6152,7 @@ dependencies = [
"nym-sphinx",
"nym-task",
"nym-topology",
"nym-topology-control",
"nym-validator-client",
"parking_lot 0.12.1",
"pretty_env_logger",
@@ -6187,6 +6218,7 @@ dependencies = [
"nym-socks5-client-core",
"nym-sphinx",
"nym-topology",
"nym-topology-control",
"rand 0.7.3",
"serde",
"serde_json",
@@ -6499,6 +6531,29 @@ dependencies = [
"wasm-utils",
]
[[package]]
name = "nym-topology-control"
version = "0.1.0"
dependencies = [
"async-trait",
"futures",
"gloo-timers",
"log",
"nym-explorer-client",
"nym-network-defaults",
"nym-sphinx",
"nym-task",
"nym-topology",
"nym-validator-client",
"rand 0.7.3",
"serde",
"tap",
"tokio",
"tokio-stream",
"url",
"wasmtimer",
]
[[package]]
name = "nym-tun"
version = "0.1.0"
@@ -6814,9 +6869,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.98"
version = "0.9.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1665caf8ab2dc9aef43d1c0023bd904633a6a05cb30b0ad59bec2ae986e57a7"
checksum = "db4d56a4c0478783083cfafcc42493dd4a981d41669da64b4572a2a089b51b1d"
dependencies = [
"cc",
"libc",
@@ -7564,9 +7619,9 @@ dependencies = [
[[package]]
name = "psl"
version = "2.1.14"
version = "2.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "383703acfc34f7a00724846c14dc5ea4407c59e5aedcbbb18a1c0c1a23fe5013"
checksum = "fa35143bed048dcb22457ef82f8ba3008b842e9158e2cfcc904f5a4e2571cd4c"
dependencies = [
"psl-types",
]
@@ -8808,9 +8863,9 @@ dependencies = [
[[package]]
name = "serde_with"
version = "3.4.0"
version = "3.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64cd236ccc1b7a29e7e2739f27c0b2dd199804abc4290e32f59f3b68d6405c23"
checksum = "f5c9fdb6b00a489875b22efd4b78fe2b363b72265cc5f6eb2e2b9ee270e6140c"
dependencies = [
"base64 0.21.4",
"chrono",
@@ -8825,9 +8880,9 @@ dependencies = [
[[package]]
name = "serde_with_macros"
version = "3.4.0"
version = "3.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93634eb5f75a2323b16de4748022ac4297f9e76b6dced2be287a099f41b5e788"
checksum = "dbff351eb4b33600a2e138dfa0b10b65a238ea8ff8fb2387c422c5022a3e8298"
dependencies = [
"darling 0.20.3",
"proc-macro2",
+1
View File
@@ -34,6 +34,7 @@ nym-sphinx = { path = "../../common/nymsphinx" }
nym-ordered-buffer = { path = "../../common/socks5/ordered-buffer" }
nym-pemstore = { path = "../../common/pemstore" }
nym-topology = { path = "../../common/topology" }
nym-topology-control = { path = "../../common/topology-control" }
nym-socks5-client-core = { path = "../../common/socks5-client-core" }
nym-id = { path = "../../common/nym-id" }
+2 -2
View File
@@ -16,11 +16,11 @@ use nym_client_core::client::base_client::storage::gateway_details::{
OnDiskGatewayDetails, PersistedGatewayDetails,
};
use nym_client_core::client::key_manager::persistence::OnDiskKeys;
use nym_client_core::client::topology_control::geo_aware_provider::CountryGroup;
use nym_client_core::config::{GatewayEndpointConfig, GroupBy, TopologyStructure};
use nym_client_core::config::{GatewayEndpointConfig, TopologyStructure};
use nym_client_core::error::ClientCoreError;
use nym_config::OptionalSet;
use nym_sphinx::params::{PacketSize, PacketType};
use nym_topology_control::geo_aware_provider::{CountryGroup, GroupBy};
use std::error::Error;
use std::net::IpAddr;
use std::sync::OnceLock;
+1 -1
View File
@@ -12,9 +12,9 @@ use log::*;
use nym_bin_common::version_checker::is_minor_version_compatible;
use nym_client_core::cli_helpers::client_run::CommonClientRunArgs;
use nym_client_core::client::base_client::storage::OnDiskPersistent;
use nym_client_core::client::topology_control::geo_aware_provider::CountryGroup;
use nym_socks5_client_core::NymClient;
use nym_sphinx::addressing::clients::Recipient;
use nym_topology_control::geo_aware_provider::CountryGroup;
use std::net::IpAddr;
#[derive(Args, Clone)]
+1
View File
@@ -42,6 +42,7 @@ nym-nonexhaustive-delayqueue = { path = "../nonexhaustive-delayqueue" }
nym-sphinx = { path = "../nymsphinx" }
nym-pemstore = { path = "../pemstore" }
nym-topology = { path = "../topology", features = ["serializable"] }
nym-topology-control = { path = "../topology-control" }
nym-validator-client = { path = "../client-libs/validator-client", default-features = false }
nym-task = { path = "../task" }
nym-credential-storage = { path = "../credential-storage" }
@@ -3,7 +3,6 @@
use super::packet_statistics_control::PacketStatisticsReporter;
use super::received_buffer::ReceivedBufferMessage;
use super::topology_control::geo_aware_provider::GeoAwareTopologyProvider;
use crate::client::base_client::storage::gateway_details::GatewayDetailsStore;
use crate::client::base_client::storage::MixnetClientStorage;
use crate::client::cover_traffic_stream::LoopCoverTrafficStream;
@@ -22,10 +21,6 @@ use crate::client::replies::reply_controller::{ReplyControllerReceiver, ReplyCon
use crate::client::replies::reply_storage::{
CombinedReplyStorage, PersistentReplyStorage, ReplyStorageBackend, SentReplyKeys,
};
use crate::client::topology_control::nym_api_provider::NymApiTopologyProvider;
use crate::client::topology_control::{
TopologyAccessor, TopologyRefresher, TopologyRefresherConfig,
};
use crate::config::{Config, DebugConfig};
use crate::error::ClientCoreError;
use crate::init::{
@@ -50,6 +45,9 @@ use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender,
use nym_task::{TaskClient, TaskHandle};
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::HardcodedTopologyProvider;
use nym_topology_control::geo_aware_provider::GeoAwareTopologyProvider;
use nym_topology_control::nym_api_provider::NymApiTopologyProvider;
use nym_topology_control::{TopologyAccessor, TopologyRefresher, TopologyRefresherConfig};
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use std::fmt::Debug;
use std::os::raw::c_int as RawFd;
@@ -3,7 +3,6 @@
use crate::client::mix_traffic::BatchMixMessageSender;
use crate::client::packet_statistics_control::{PacketStatisticsEvent, PacketStatisticsReporter};
use crate::client::topology_control::TopologyAccessor;
use crate::{config, spawn_future};
use futures::task::{Context, Poll};
use futures::{Future, Stream, StreamExt};
@@ -13,6 +12,7 @@ use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::cover::generate_loop_cover_packet;
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::utils::sample_poisson_duration;
use nym_topology_control::TopologyAccessor;
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
-1
View File
@@ -11,5 +11,4 @@ pub(crate) mod packet_statistics_control;
pub mod real_messages_control;
pub mod received_buffer;
pub mod replies;
pub mod topology_control;
pub(crate) mod transmission_buffer;
@@ -7,7 +7,6 @@ use crate::client::real_messages_control::real_traffic_stream::{
};
use crate::client::real_messages_control::{AckActionSender, Action};
use crate::client::replies::reply_storage::{ReceivedReplySurbsMap, SentReplyKeys, UsedSenderTags};
use crate::client::topology_control::{TopologyAccessor, TopologyReadPermit};
use log::{debug, error, info, trace, warn};
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
@@ -20,6 +19,7 @@ use nym_sphinx::preparer::{MessagePreparer, PreparedFragment};
use nym_sphinx::Delay;
use nym_task::connections::TransmissionLane;
use nym_topology::{NymTopology, NymTopologyError};
use nym_topology_control::{TopologyAccessor, TopologyReadPermit};
use rand::{CryptoRng, Rng};
use std::collections::HashMap;
use std::sync::Arc;
@@ -17,7 +17,6 @@ use crate::{
client::{
inbound_messages::InputMessageReceiver, mix_traffic::BatchMixMessageSender,
real_messages_control::acknowledgement_control::AcknowledgementControllerConnectors,
topology_control::TopologyAccessor,
},
spawn_future,
};
@@ -28,6 +27,7 @@ use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::params::PacketType;
use nym_task::connections::{ConnectionCommandReceiver, LaneQueueLengths};
use nym_topology_control::TopologyAccessor;
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::sync::Arc;
@@ -5,7 +5,6 @@ use self::sending_delay_controller::SendingDelayController;
use crate::client::mix_traffic::BatchMixMessageSender;
use crate::client::packet_statistics_control::{PacketStatisticsEvent, PacketStatisticsReporter};
use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender;
use crate::client::topology_control::TopologyAccessor;
use crate::client::transmission_buffer::TransmissionBuffer;
use crate::config;
use futures::task::{Context, Poll};
@@ -22,6 +21,7 @@ use nym_sphinx::utils::sample_poisson_duration;
use nym_task::connections::{
ConnectionCommand, ConnectionCommandReceiver, ConnectionId, LaneQueueLengths, TransmissionLane,
};
use nym_topology_control::TopologyAccessor;
use rand::{CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
+3 -21
View File
@@ -1,14 +1,12 @@
// Copyright 2021-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{client::topology_control::geo_aware_provider::CountryGroup, error::ClientCoreError};
use crate::error::ClientCoreError;
use nym_config::defaults::NymNetworkDetails;
use nym_crypto::asymmetric::identity;
use nym_gateway_client::client::GatewayConfig;
use nym_sphinx::{
addressing::clients::Recipient,
params::{PacketSize, PacketType},
};
use nym_sphinx::params::{PacketSize, PacketType};
use nym_topology_control::geo_aware_provider::GroupBy;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use url::Url;
@@ -541,22 +539,6 @@ pub enum TopologyStructure {
GeoAware(GroupBy),
}
#[allow(clippy::large_enum_variant)]
#[derive(Copy, Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum GroupBy {
CountryGroup(CountryGroup),
NymAddress(Recipient),
}
impl std::fmt::Display for GroupBy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
GroupBy::CountryGroup(group) => write!(f, "group: {}", group),
GroupBy::NymAddress(address) => write!(f, "address: {}", address),
}
}
}
impl Default for Topology {
fn default() -> Self {
Topology {
@@ -1,15 +1,15 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::topology_control::geo_aware_provider::CountryGroup;
use crate::config::{
Acknowledgements, Client, Config, CoverTraffic, DebugConfig, GatewayConnection, GroupBy,
ReplySurbs, Topology, TopologyStructure, Traffic,
Acknowledgements, Client, Config, CoverTraffic, DebugConfig, GatewayConnection, ReplySurbs,
Topology, TopologyStructure, Traffic,
};
use nym_sphinx::{
addressing::clients::Recipient,
params::{PacketSize, PacketType},
};
use nym_topology_control::geo_aware_provider::{CountryGroup, GroupBy};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use url::Url;
@@ -16,3 +16,11 @@ tokio-util = { workspace = true, features = ["codec"] }
# internal
nym-sphinx = { path = "../../nymsphinx" }
nym-task = { path = "../../task" }
nym-topology-control = { path = "../../topology-control" }
nym-noise = { path = "../../nymnoise"}
nym-crypto = { path = "../../crypto" }
nym-validator-client = { path = "../validator-client"}
[dev-dependencies]
url = { workspace = true }
rand = "0.7.3"
+87 -22
View File
@@ -4,11 +4,15 @@
use futures::channel::mpsc;
use futures::StreamExt;
use log::*;
use nym_crypto::asymmetric::encryption;
use nym_noise::upgrade_noise_initiator_with_topology;
use nym_sphinx::addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx::framing::codec::NymCodec;
use nym_sphinx::framing::packet::FramedNymPacket;
use nym_sphinx::params::PacketType;
use nym_sphinx::NymPacket;
use nym_topology_control::accessor::TopologyAccessor;
use nym_validator_client::NymApiClient;
use std::collections::HashMap;
use std::io;
use std::net::SocketAddr;
@@ -59,6 +63,9 @@ pub trait SendWithoutResponse {
pub struct Client {
conn_new: HashMap<NymNodeRoutingAddress, ConnectionSender>,
config: Config,
topology_access: TopologyAccessor,
api_client: NymApiClient,
local_identity: Arc<encryption::KeyPair>,
}
struct ConnectionSender {
@@ -76,10 +83,18 @@ impl ConnectionSender {
}
impl Client {
pub fn new(config: Config) -> Client {
pub fn new(
config: Config,
topology_access: TopologyAccessor,
api_client: NymApiClient,
local_identity: Arc<encryption::KeyPair>,
) -> Client {
Client {
conn_new: HashMap::new(),
config,
topology_access,
api_client,
local_identity,
}
}
@@ -88,25 +103,60 @@ impl Client {
receiver: mpsc::Receiver<FramedNymPacket>,
connection_timeout: Duration,
current_reconnection: &AtomicU32,
topology_access: TopologyAccessor,
api_client: NymApiClient,
local_identity: Arc<encryption::KeyPair>,
) {
let connection_fut = TcpStream::connect(address);
let conn = match tokio::time::timeout(connection_timeout, connection_fut).await {
Ok(stream_res) => match stream_res {
Ok(stream) => {
debug!("Managed to establish connection to {}", address);
// if we managed to connect, reset the reconnection count (whatever it might have been)
current_reconnection.store(0, Ordering::Release);
Framed::new(stream, NymCodec)
Ok(stream_res) => {
match stream_res {
Ok(stream) => {
debug!("Managed to establish connection to {}", address);
// if we managed to connect, reset the reconnection count (whatever it might have been)
current_reconnection.store(0, Ordering::Release);
//Get the topology, because we need the keys for the handshake
let Some(topology) = topology_access.current_topology().await else {
error!("Cannot perform Noise handshake to {address}, due to topology error");
return;
};
let epoch_id = match api_client.get_current_epoch_id().await {
Ok(id) => id,
Err(err) => {
error!("Cannot perform Noise handshake to {address}, due to epoch id error - {err}");
return;
}
};
let noise_stream = match upgrade_noise_initiator_with_topology(
stream,
Default::default(),
&topology,
epoch_id,
local_identity.private_key(),
)
.await
{
Ok(noise_stream) => noise_stream,
Err(err) => {
error!("Failed to perform Noise handshake with {address} - {err}");
return;
}
};
debug!("Noise initiator handshake completed for {:?}", address);
Framed::new(noise_stream, NymCodec)
}
Err(err) => {
debug!(
"failed to establish connection to {} (err: {})",
address, err
);
return;
}
}
Err(err) => {
debug!(
"failed to establish connection to {} (err: {})",
address, err
);
return;
}
},
}
Err(_) => {
debug!(
"failed to connect to {} within {:?}",
@@ -175,6 +225,10 @@ impl Client {
// copy the value before moving into another task
let initial_connection_timeout = self.config.initial_connection_timeout;
let topology_access_clone = self.topology_access.clone();
let api_client_clone = self.api_client.clone();
let local_id_key = self.local_identity.clone();
tokio::spawn(async move {
// before executing the manager, wait for what was specified, if anything
if let Some(backoff) = backoff {
@@ -187,6 +241,9 @@ impl Client {
receiver,
initial_connection_timeout,
&current_reconnection_attempt,
topology_access_clone,
api_client_clone,
local_id_key,
)
.await
});
@@ -253,15 +310,23 @@ impl SendWithoutResponse for Client {
#[cfg(test)]
mod tests {
use super::*;
use rand::rngs::OsRng;
use url::Url;
fn dummy_client() -> Client {
Client::new(Config {
initial_reconnection_backoff: Duration::from_millis(10_000),
maximum_reconnection_backoff: Duration::from_millis(300_000),
initial_connection_timeout: Duration::from_millis(1_500),
maximum_connection_buffer_size: 128,
use_legacy_version: false,
})
let mut rng = OsRng;
Client::new(
Config {
initial_reconnection_backoff: Duration::from_millis(10_000),
maximum_reconnection_backoff: Duration::from_millis(300_000),
initial_connection_timeout: Duration::from_millis(1_500),
maximum_connection_buffer_size: 128,
use_legacy_version: false,
},
TopologyAccessor::new(),
NymApiClient::new(Url::parse("http://dummy.url").unwrap()),
Arc::new(encryption::KeyPair::new(&mut rng)),
)
}
#[test]
@@ -5,8 +5,11 @@ use crate::client::{Client, Config, SendWithoutResponse};
use futures::channel::mpsc;
use futures::StreamExt;
use log::*;
use nym_crypto::asymmetric::encryption;
use nym_sphinx::forwarding::packet::MixPacket;
use std::time::Duration;
use nym_topology_control::accessor::TopologyAccessor;
use nym_validator_client::NymApiClient;
use std::sync::Arc;
pub type MixForwardingSender = mpsc::UnboundedSender<MixPacket>;
type MixForwardingReceiver = mpsc::UnboundedReceiver<MixPacket>;
@@ -21,26 +24,22 @@ pub struct PacketForwarder {
impl PacketForwarder {
pub fn new(
initial_reconnection_backoff: Duration,
maximum_reconnection_backoff: Duration,
initial_connection_timeout: Duration,
maximum_connection_buffer_size: usize,
use_legacy_version: bool,
client_config: Config,
topology_access: TopologyAccessor,
api_client: NymApiClient,
local_identity: Arc<encryption::KeyPair>,
shutdown: nym_task::TaskClient,
) -> (PacketForwarder, MixForwardingSender) {
let client_config = Config::new(
initial_reconnection_backoff,
maximum_reconnection_backoff,
initial_connection_timeout,
maximum_connection_buffer_size,
use_legacy_version,
);
let (packet_sender, packet_receiver) = mpsc::unbounded();
(
PacketForwarder {
mixnet_client: Client::new(client_config),
mixnet_client: Client::new(
client_config,
topology_access,
api_client,
local_identity,
),
packet_receiver,
shutdown,
},
@@ -13,7 +13,7 @@ use nym_api_requests::coconut::{
BlindSignRequestBody, BlindedSignatureResponse, FreePassRequest, VerifyCredentialBody,
VerifyCredentialResponse,
};
use nym_api_requests::models::{DescribedGateway, MixNodeBondAnnotated};
use nym_api_requests::models::{DescribedGateway, DescribedNymNode, MixNodeBondAnnotated};
use nym_api_requests::models::{
GatewayCoreStatusResponse, MixnodeCoreStatusResponse, MixnodeStatusResponse,
RewardEstimationResponse, StakeSaturationResponse,
@@ -291,6 +291,18 @@ impl NymApiClient {
Ok(self.nym_api.get_gateways_described().await?)
}
pub async fn get_cached_described_nodes(
&self,
) -> Result<Vec<DescribedNymNode>, ValidatorClientError> {
Ok(self.nym_api.get_nym_nodes_described().await?)
}
pub async fn get_current_epoch_id(
&self,
) -> Result<nym_mixnet_contract_common::EpochId, ValidatorClientError> {
Ok(self.nym_api.get_current_epoch().await?.current_epoch_id())
}
pub async fn get_gateway_core_status_count(
&self,
identity: IdentityKeyRef<'_>,
@@ -15,16 +15,16 @@ pub use nym_api_requests::{
VerifyCredentialBody, VerifyCredentialResponse,
},
models::{
ComputeRewardEstParam, DescribedGateway, GatewayBondAnnotated, GatewayCoreStatusResponse,
GatewayStatusReportResponse, GatewayUptimeHistoryResponse, InclusionProbabilityResponse,
MixNodeBondAnnotated, MixnodeCoreStatusResponse, MixnodeStatusReportResponse,
MixnodeStatusResponse, MixnodeUptimeHistoryResponse, RewardEstimationResponse,
StakeSaturationResponse, UptimeResponse,
ComputeRewardEstParam, DescribedGateway, DescribedNymNode, GatewayBondAnnotated,
GatewayCoreStatusResponse, GatewayStatusReportResponse, GatewayUptimeHistoryResponse,
InclusionProbabilityResponse, MixNodeBondAnnotated, MixnodeCoreStatusResponse,
MixnodeStatusReportResponse, MixnodeStatusResponse, MixnodeUptimeHistoryResponse,
RewardEstimationResponse, StakeSaturationResponse, UptimeResponse,
},
};
pub use nym_coconut_dkg_common::types::EpochId;
use nym_mixnet_contract_common::mixnode::MixNodeDetails;
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, MixId};
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, Interval, MixId};
use nym_name_service_common::response::NamesListResponse;
use nym_service_provider_directory_common::response::ServicesListResponse;
@@ -97,6 +97,14 @@ pub trait NymApiClientExt: ApiClient {
.await
}
async fn get_nym_nodes_described(&self) -> Result<Vec<DescribedNymNode>, NymAPIError> {
self.get_json(
&[routes::API_VERSION, routes::NYM_NODES, routes::DESCRIBED],
NO_PARAMS,
)
.await
}
async fn get_active_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
self.get_json(
&[routes::API_VERSION, routes::MIXNODES, routes::ACTIVE],
@@ -144,6 +152,14 @@ pub trait NymApiClientExt: ApiClient {
.await
}
async fn get_current_epoch(&self) -> Result<Interval, NymAPIError> {
self.get_json(
&[routes::API_VERSION, routes::EPOCH, routes::CURRENT],
NO_PARAMS,
)
.await
}
async fn get_gateway_report(
&self,
identity: IdentityKeyRef<'_>,
@@ -6,8 +6,12 @@ use nym_network_defaults::NYM_API_VERSION;
pub const API_VERSION: &str = NYM_API_VERSION;
pub const MIXNODES: &str = "mixnodes";
pub const GATEWAYS: &str = "gateways";
pub const NYM_NODES: &str = "nym-nodes";
pub const DESCRIBED: &str = "described";
pub const EPOCH: &str = "epoch";
pub const CURRENT: &str = "current";
pub const DETAILED: &str = "detailed";
pub const DETAILED_UNFILTERED: &str = "detailed-unfiltered";
pub const ACTIVE: &str = "active";
+22
View File
@@ -0,0 +1,22 @@
[package]
name = "nym-noise"
version = "0.1.0"
authors = ["Simon Wicky <simon@nymtech.net>"]
edition = "2021"
license.workspace = true
[dependencies]
snow = "0.9.2"
futures = "0.3"
tokio = { version = "1.24.1", features = ["net","io-util","time"] }
tokio-util = { workspace = true, features = ["codec"] }
pin-project = "1"
log = "0.4.19"
sha2 = "0.10.7"
bytes = "1.0"
thiserror = "1.0.44"
semver = "0.11"
# internal
nym-topology = { path = "../topology"}
nym-crypto = { path = "../crypto" }
+72
View File
@@ -0,0 +1,72 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use std::io;
use pin_project::pin_project;
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpStream,
};
use crate::stream::NoiseStream;
#[pin_project(project = ConnectionProj)]
pub enum Connection {
Tcp(#[pin] TcpStream),
Noise(#[pin] NoiseStream),
}
impl Connection {
pub fn peer_addr(&self) -> Result<std::net::SocketAddr, io::Error> {
match self {
Self::Noise(stream) => stream.peer_addr(),
Self::Tcp(stream) => stream.peer_addr(),
}
}
}
impl AsyncRead for Connection {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<io::Result<()>> {
match self.project() {
ConnectionProj::Noise(stream) => stream.poll_read(cx, buf),
ConnectionProj::Tcp(stream) => stream.poll_read(cx, buf),
}
}
}
impl AsyncWrite for Connection {
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, io::Error>> {
match self.project() {
ConnectionProj::Noise(stream) => stream.poll_write(cx, buf),
ConnectionProj::Tcp(stream) => stream.poll_write(cx, buf),
}
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), io::Error>> {
match self.project() {
ConnectionProj::Noise(stream) => stream.poll_flush(cx),
ConnectionProj::Tcp(stream) => stream.poll_flush(cx),
}
}
fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), io::Error>> {
match self.project() {
ConnectionProj::Noise(stream) => stream.poll_shutdown(cx),
ConnectionProj::Tcp(stream) => stream.poll_shutdown(cx),
}
}
}
+43
View File
@@ -0,0 +1,43 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use snow::Error;
use std::io;
use std::num::TryFromIntError;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum NoiseError {
#[error("encountered a Noise decryption error")]
DecryptionError,
#[error("encountered a Noise Protocol error - {0}")]
ProtocolError(Error),
#[error("encountered an IO error - {0}")]
IoError(#[from] io::Error),
#[error("Incorrect state")]
IncorrectStateError,
#[error("Handshake timeout")]
HandshakeTimeoutError(#[from] tokio::time::error::Elapsed),
#[error("Handshake did not complete")]
HandshakeError,
#[error(transparent)]
IntConversionError(#[from] TryFromIntError),
#[error("unable to extract public key - {0}")]
EncryptionKeyConversionError(#[from] nym_crypto::asymmetric::encryption::KeyRecoveryError),
}
impl From<Error> for NoiseError {
fn from(err: Error) -> Self {
match err {
Error::Decrypt => NoiseError::DecryptionError,
err => NoiseError::ProtocolError(err),
}
}
}
+148
View File
@@ -0,0 +1,148 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::connection::Connection;
use crate::error::NoiseError;
use crate::stream::{NoisePattern, NoiseStream};
use log::*;
use nym_crypto::asymmetric::encryption;
use nym_topology::NymTopology;
use sha2::{Digest, Sha256};
use snow::{error::Prerequisite, Builder, Error};
use tokio::net::TcpStream;
pub mod connection;
pub mod error;
pub mod stream;
const NOISE_PSK_PREFIX: &[u8] = b"NYMTECH_NOISE_dQw4w9WgXcQ";
pub async fn upgrade_noise_initiator(
conn: TcpStream,
pattern: NoisePattern,
local_private_key: &encryption::PrivateKey,
remote_pub_key: &encryption::PublicKey,
epoch: u32,
) -> Result<Connection, NoiseError> {
trace!("Perform Noise Handshake, initiator side");
let secret = [
NOISE_PSK_PREFIX.to_vec(),
remote_pub_key.to_bytes().to_vec(),
epoch.to_be_bytes().to_vec(),
]
.concat();
let secret_hash = Sha256::digest(secret);
let handshake = Builder::new(pattern.as_str().parse()?)
.local_private_key(&local_private_key.to_bytes())
.remote_public_key(&remote_pub_key.to_bytes())
.psk(pattern.psk_position(), &secret_hash)
.build_initiator()?;
let noise_stream = NoiseStream::new(conn, handshake);
Ok(Connection::Noise(noise_stream.perform_handshake().await?))
}
pub async fn upgrade_noise_initiator_with_topology(
conn: TcpStream,
pattern: NoisePattern,
topology: &NymTopology,
epoch: u32,
local_private_key: &encryption::PrivateKey,
) -> Result<Connection, NoiseError> {
//Get init material
let responder_addr = conn.peer_addr().map_err(|err| {
error!("Unable to extract peer address from connection - {err}");
Error::Prereq(Prerequisite::RemotePublicKey)
})?;
let remote_pub_key = match topology.find_node_key_by_mix_host(responder_addr, true) {
Ok(Some(key)) => encryption::PublicKey::from_base58_string(key)?,
Ok(None) => {
warn!(
"{:?} can't speak Noise yet, falling back to TCP",
responder_addr
);
return Ok(Connection::Tcp(conn));
}
Err(_) => {
error!(
"Cannot find public key for node with address {:?}",
responder_addr
); //Do we still pursue a TCP connection or not?
return Err(Error::Prereq(Prerequisite::RemotePublicKey).into());
}
};
upgrade_noise_initiator(conn, pattern, local_private_key, &remote_pub_key, epoch).await
}
pub async fn upgrade_noise_responder(
conn: TcpStream,
pattern: NoisePattern,
local_public_key: &encryption::PublicKey,
local_private_key: &encryption::PrivateKey,
epoch: u32,
) -> Result<Connection, NoiseError> {
trace!("Perform Noise Handshake, responder side");
let secret = [
NOISE_PSK_PREFIX.to_vec(),
local_public_key.to_bytes().to_vec(),
epoch.to_be_bytes().to_vec(),
]
.concat();
let secret_hash = Sha256::digest(secret);
let handshake = Builder::new(pattern.as_str().parse()?)
.local_private_key(&local_private_key.to_bytes())
.psk(pattern.psk_position(), &secret_hash)
.build_responder()?;
let noise_stream = NoiseStream::new(conn, handshake);
Ok(Connection::Noise(noise_stream.perform_handshake().await?))
}
pub async fn upgrade_noise_responder_with_topology(
conn: TcpStream,
pattern: NoisePattern,
topology: &NymTopology,
epoch: u32,
local_public_key: &encryption::PublicKey,
local_private_key: &encryption::PrivateKey,
) -> Result<Connection, NoiseError> {
//Get init material
let initiator_addr = match conn.peer_addr() {
Ok(addr) => addr,
Err(err) => {
error!("Unable to extract peer address from connection - {err}");
return Err(Error::Prereq(Prerequisite::RemotePublicKey).into());
}
};
match topology.find_node_key_by_mix_host(initiator_addr, false) {
Ok(Some(_)) => {
//Existing node supporting Noise
upgrade_noise_responder(conn, pattern, local_public_key, local_private_key, epoch).await
}
Ok(None) => {
//Existing node not supporting Noise yet
warn!(
"{:?} can't speak Noise yet, falling back to TCP",
initiator_addr
);
Ok(Connection::Tcp(conn))
}
Err(_) => {
//Non existing node
error!(
"Cannot find public key for node with address {:?}",
initiator_addr
); //Do we still pursue a TCP connection with that node or not?
Err(Error::Prereq(Prerequisite::RemotePublicKey).into())
}
}
}
+220
View File
@@ -0,0 +1,220 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::error::NoiseError;
use bytes::BytesMut;
use futures::{Sink, SinkExt, Stream, StreamExt};
use pin_project::pin_project;
use snow::{HandshakeState, TransportState};
use std::cmp::min;
use std::collections::VecDeque;
use std::io;
use std::pin::Pin;
use std::task::Poll;
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::TcpStream,
};
use tokio_util::codec::{Framed, LengthDelimitedCodec};
const MAXMSGLEN: usize = 65535;
const TAGLEN: usize = 16;
#[derive(Default)]
pub enum NoisePattern {
#[default]
XKpsk3,
IKpsk2,
}
impl NoisePattern {
pub(crate) fn as_str(&self) -> &'static str {
match self {
Self::XKpsk3 => "Noise_XKpsk3_25519_AESGCM_SHA256",
Self::IKpsk2 => "Noise_IKpsk2_25519_ChaChaPoly_BLAKE2s", //Wireguard handshake (not exactly though)
}
}
pub(crate) fn psk_position(&self) -> u8 {
//automatic parsing, works for correct pattern, more convenient
match self.as_str().find("psk") {
Some(n) => {
let psk_index = n + 3;
let psk_char = self.as_str().chars().nth(psk_index).unwrap();
psk_char.to_string().parse().unwrap()
//if this fails, it means hardcoded pattern are wrong
}
None => 0,
}
}
}
/// Wrapper around a TcpStream
#[pin_project]
pub struct NoiseStream {
#[pin]
inner_stream: Framed<TcpStream, LengthDelimitedCodec>,
handshake: Option<HandshakeState>,
noise: Option<TransportState>,
dec_buffer: VecDeque<u8>,
}
impl NoiseStream {
pub(crate) fn new(inner_stream: TcpStream, handshake: HandshakeState) -> NoiseStream {
NoiseStream {
inner_stream: LengthDelimitedCodec::builder()
.length_field_type::<u16>()
.new_framed(inner_stream),
handshake: Some(handshake),
noise: None,
dec_buffer: VecDeque::with_capacity(MAXMSGLEN),
}
}
pub(crate) async fn perform_handshake(mut self) -> Result<Self, NoiseError> {
//Check if we are in the correct state
let Some(mut handshake) = self.handshake else {
return Err(NoiseError::IncorrectStateError);
};
self.handshake = None;
while !handshake.is_handshake_finished() {
if handshake.is_my_turn() {
self.send_handshake_msg(&mut handshake).await?;
} else {
self.recv_handshake_msg(&mut handshake).await?;
}
}
self.noise = Some(handshake.into_transport_mode()?);
Ok(self)
}
async fn send_handshake_msg(
&mut self,
handshake: &mut HandshakeState,
) -> Result<(), NoiseError> {
let mut buf = BytesMut::zeroed(MAXMSGLEN + TAGLEN);
let len = handshake.write_message(&[], &mut buf)?;
buf.truncate(len);
self.inner_stream.send(buf.into()).await?;
Ok(())
}
async fn recv_handshake_msg(
&mut self,
handshake: &mut HandshakeState,
) -> Result<(), NoiseError> {
match self.inner_stream.next().await {
Some(Ok(msg)) => {
let mut buf = vec![0u8; MAXMSGLEN];
handshake.read_message(&msg, &mut buf)?;
Ok(())
}
Some(Err(err)) => Err(NoiseError::IoError(err)),
None => Err(NoiseError::HandshakeError),
}
}
pub fn peer_addr(&self) -> Result<std::net::SocketAddr, io::Error> {
self.inner_stream.get_ref().peer_addr()
}
}
impl AsyncRead for NoiseStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let projected_self = self.project();
match projected_self.inner_stream.poll_next(cx) {
Poll::Pending => {
//no new data, waking is already scheduled.
//Nothing new to decrypt, only check if we can return something from dec_storage, happens after
}
Poll::Ready(Some(Ok(noise_msg))) => {
//We have a new moise msg
let mut dec_msg = vec![0u8; MAXMSGLEN];
let len = match projected_self.noise {
Some(transport_state) => {
match transport_state.read_message(&noise_msg, &mut dec_msg) {
Ok(len) => len,
Err(_) => return Poll::Ready(Err(io::ErrorKind::InvalidInput.into())),
}
}
None => return Poll::Ready(Err(io::ErrorKind::Other.into())),
};
projected_self.dec_buffer.extend(&dec_msg[..len]);
}
Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
//Stream is done, return Ok with nothing in buf
Poll::Ready(None) => return Poll::Ready(Ok(())),
}
//check and return what we can
let read_len = min(buf.remaining(), projected_self.dec_buffer.len());
if read_len > 0 {
buf.put_slice(
&projected_self
.dec_buffer
.drain(..read_len)
.collect::<Vec<u8>>(),
);
return Poll::Ready(Ok(()));
}
//If we end up here, it must mean the previous poll_next was pending as well, otherwise something was returned. Hence waking is already scheduled
Poll::Pending
}
}
impl AsyncWrite for NoiseStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
let mut projected_self = self.project();
match projected_self.inner_stream.as_mut().poll_ready(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Ready(Ok(())) => {
let mut noise_buf = BytesMut::zeroed(MAXMSGLEN + TAGLEN);
let Ok(len) = (match projected_self.noise {
Some(transport_state) => transport_state.write_message(buf, &mut noise_buf),
None => return Poll::Ready(Err(io::ErrorKind::Other.into())),
}) else {
return Poll::Ready(Err(io::ErrorKind::InvalidInput.into()));
};
noise_buf.truncate(len);
match projected_self.inner_stream.start_send(noise_buf.into()) {
Ok(()) => Poll::Ready(Ok(buf.len())),
Err(e) => Poll::Ready(Err(e)),
}
}
}
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
self.project().inner_stream.poll_flush(cx)
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
self.project().inner_stream.poll_close(cx)
}
}
+4 -4
View File
@@ -237,7 +237,7 @@ mod message_receiver {
mix_id: 123,
owner: "foomp1".to_string(),
host: "10.20.30.40".parse().unwrap(),
mix_host: "10.20.30.40:1789".parse().unwrap(),
mix_hosts: vec!["10.20.30.40:1789".parse().unwrap()],
identity_key: identity::PublicKey::from_base58_string(
"3ebjp1Fb9hdcS1AR6AZihgeJiMHkB5jjJUsvqNnfQwU7",
)
@@ -257,7 +257,7 @@ mod message_receiver {
mix_id: 234,
owner: "foomp2".to_string(),
host: "11.21.31.41".parse().unwrap(),
mix_host: "11.21.31.41:1789".parse().unwrap(),
mix_hosts: vec!["11.21.31.41:1789".parse().unwrap()],
identity_key: identity::PublicKey::from_base58_string(
"D6YaMzLSY7mANtSQRKXsmMZpqgqiVkeiagKM4V4oFPFr",
)
@@ -277,7 +277,7 @@ mod message_receiver {
mix_id: 456,
owner: "foomp3".to_string(),
host: "12.22.32.42".parse().unwrap(),
mix_host: "12.22.32.42:1789".parse().unwrap(),
mix_hosts: vec!["12.22.32.42:1789".parse().unwrap()],
identity_key: identity::PublicKey::from_base58_string(
"GkWDysw4AjESv1KiAiVn7JzzCMJeksxNSXVfr1PpX8wD",
)
@@ -297,7 +297,7 @@ mod message_receiver {
vec![gateway::Node {
owner: "foomp4".to_string(),
host: "1.2.3.4".parse().unwrap(),
mix_host: "1.2.3.4:1789".parse().unwrap(),
mix_hosts: vec!["1.2.3.4:1789".parse().unwrap()],
clients_ws_port: 9000,
clients_wss_port: None,
identity_key: identity::PublicKey::from_base58_string(
+42
View File
@@ -0,0 +1,42 @@
[package]
name = "nym-topology-control"
version = "0.1.0"
edition.workspace = true
authors.workspace = true
license.workspace = true
repository.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
rand = { version = "0.7.3", features = ["wasm-bindgen"] }
serde = { workspace = true, features = ["derive"] }
tap = "1.0.1"
url = { workspace = true, features = ["serde"] }
tokio = { workspace = true, features = ["macros"]}
# internal
nym-explorer-client = { path = "../../explorer-api/explorer-client" }
nym-sphinx = { path = "../nymsphinx" }
nym-topology = { path = "../topology", features = ["serializable"] }
nym-validator-client = { path = "../client-libs/validator-client", default-features = false }
nym-task = { path = "../task" }
nym-network-defaults = { path = "../network-defaults" }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-stream]
version = "0.1.11"
features = ["time"]
[target."cfg(target_arch = \"wasm32\")".dependencies.wasmtimer]
workspace = true
features = ["tokio"]
[target."cfg(target_arch = \"wasm32\")".dependencies.gloo-timers]
version = "0.2.4"
features = ["futures"]
@@ -49,7 +49,7 @@ impl<'a> Deref for TopologyReadPermit<'a> {
impl<'a> TopologyReadPermit<'a> {
/// Using provided topology read permit, tries to get an immutable reference to the underlying
/// topology. For obvious reasons the lifetime of the topology reference is bound to the permit.
pub(crate) fn try_get_valid_topology_ref(
pub fn try_get_valid_topology_ref(
&'a self,
ack_recipient: &Recipient,
packet_recipient: Option<&Recipient>,
@@ -83,6 +83,16 @@ impl<'a> TopologyReadPermit<'a> {
Ok(topology)
}
pub fn try_get_raw_topology_ref(&'a self) -> Result<&'a NymTopology, NymTopologyError> {
// 1. Have we managed to get anything from the refresher, i.e. have the nym-api queries gone through?
let topology = self
.permit
.as_ref()
.ok_or(NymTopologyError::EmptyNetworkTopology)?;
Ok(topology)
}
}
impl<'a> From<RwLockReadGuard<'a, Option<NymTopology>>> for TopologyReadPermit<'a> {
@@ -3,6 +3,7 @@ use std::{collections::HashMap, fmt};
use log::{debug, error, info};
use nym_explorer_client::{ExplorerClient, PrettyDetailedMixNodeBond};
use nym_network_defaults::var_names::EXPLORER_API;
use nym_sphinx::addressing::clients::Recipient;
use nym_topology::{
nym_topology_from_detailed,
provider_trait::{async_trait, TopologyProvider},
@@ -14,8 +15,6 @@ use serde::{Deserialize, Serialize};
use tap::TapOptional;
use url::Url;
use crate::config::GroupBy;
const MIN_NODES_PER_LAYER: usize = 1;
fn create_explorer_client() -> Option<ExplorerClient> {
@@ -38,6 +37,22 @@ fn create_explorer_client() -> Option<ExplorerClient> {
Some(client)
}
#[allow(clippy::large_enum_variant)]
#[derive(Copy, Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum GroupBy {
CountryGroup(CountryGroup),
NymAddress(Recipient),
}
impl std::fmt::Display for GroupBy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
GroupBy::CountryGroup(group) => write!(f, "group: {}", group),
GroupBy::NymAddress(address) => write!(f, "address: {}", address),
}
}
}
#[derive(Copy, Clone, Hash, PartialEq, Eq, Serialize, Deserialize, Debug)]
pub enum CountryGroup {
Europe,
@@ -279,6 +294,14 @@ impl GeoAwareTopologyProvider {
Ok(gateways) => gateways,
};
let nodes_described = match self.validator_client.get_cached_described_nodes().await {
Err(err) => {
error!("failed to get described nodes - {err}");
return None;
}
Ok(epoch) => epoch,
};
// Also fetch mixnodes cached by explorer-api, with the purpose of getting their
// geolocation.
debug!("Fetching mixnodes from explorer-api...");
@@ -338,7 +361,7 @@ impl GeoAwareTopologyProvider {
.filter(|m| filtered_mixnode_ids.contains(&m.mix_id()))
.collect::<Vec<_>>();
let topology = nym_topology_from_detailed(mixnodes, gateways)
let topology = nym_topology_from_detailed(mixnodes, gateways, nodes_described)
.filter_system_version(&self.client_version);
// TODO: return real error type
@@ -1,11 +1,11 @@
// Copyright 2021-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::spawn_future;
pub(crate) use accessor::{TopologyAccessor, TopologyReadPermit};
pub use accessor::{TopologyAccessor, TopologyReadPermit};
use futures::StreamExt;
use log::*;
use nym_sphinx::addressing::nodes::NodeIdentity;
use nym_task::spawn;
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::NymTopologyError;
use std::time::Duration;
@@ -16,9 +16,9 @@ use tokio::time::sleep;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::sleep;
mod accessor;
pub mod accessor;
pub mod geo_aware_provider;
pub(crate) mod nym_api_provider;
pub mod nym_api_provider;
// TODO: move it to config later
const MAX_FAILURE_COUNT: usize = 10;
@@ -142,7 +142,7 @@ impl TopologyRefresher {
}
pub fn start_with_shutdown(mut self, mut shutdown: nym_task::TaskClient) {
spawn_future(async move {
spawn(async move {
debug!("Started TopologyRefresher with graceful shutdown support");
#[cfg(not(target_arch = "wasm32"))]
@@ -9,7 +9,7 @@ use rand::prelude::SliceRandom;
use rand::thread_rng;
use url::Url;
pub(crate) struct NymApiTopologyProvider {
pub struct NymApiTopologyProvider {
validator_client: nym_validator_client::client::NymApiClient,
nym_api_urls: Vec<Url>,
@@ -18,7 +18,7 @@ pub(crate) struct NymApiTopologyProvider {
}
impl NymApiTopologyProvider {
pub(crate) fn new(mut nym_api_urls: Vec<Url>, client_version: String) -> Self {
pub fn new(mut nym_api_urls: Vec<Url>, client_version: String) -> Self {
nym_api_urls.shuffle(&mut thread_rng());
NymApiTopologyProvider {
@@ -77,13 +77,22 @@ impl NymApiTopologyProvider {
Ok(gateways) => gateways,
};
let topology = nym_topology_from_detailed(mixnodes, gateways)
let nodes_described = match self.validator_client.get_cached_described_nodes().await {
Err(err) => {
error!("failed to get described nodes - {err}");
return None;
}
Ok(epoch) => epoch,
};
let topology = nym_topology_from_detailed(mixnodes, gateways, nodes_described.clone())
.filter_system_version(&self.client_version);
if let Err(err) = self.check_layer_distribution(&topology) {
warn!("The current filtered active topology has extremely skewed layer distribution. It cannot be used: {err}");
self.use_next_nym_api();
None
let empty_topology = NymTopology::empty().with_described_nodes(nodes_described);
Some(empty_topology)
} else {
Some(topology)
}
+18 -16
View File
@@ -46,9 +46,9 @@ pub enum GatewayConversionError {
pub struct Node {
pub owner: String,
pub host: NetworkAddress,
// we're keeping this as separate resolved field since we do not want to be resolving the potential
// hostname every time we want to construct a path via this node
pub mix_host: SocketAddr,
// we're keeping all resolved IPs as a separate field since we do not want to be resolving the potential
// hostname every time we want to construct a path via this node. When we need one, we default to the first one
pub mix_hosts: Vec<SocketAddr>,
// #[serde(alias = "clients_port")]
pub clients_ws_port: u16,
@@ -66,7 +66,7 @@ impl std::fmt::Debug for Node {
f.debug_struct("gateway::Node")
.field("host", &self.host)
.field("owner", &self.owner)
.field("mix_host", &self.mix_host)
.field("mix_hosts", &self.mix_hosts)
.field("clients_ws_port", &self.clients_ws_port)
.field("clients_wss_port", &self.clients_wss_port)
.field("identity_key", &self.identity_key.to_base58_string())
@@ -88,13 +88,12 @@ impl Node {
pub fn extract_mix_host(
host: &NetworkAddress,
mix_port: u16,
) -> Result<SocketAddr, GatewayConversionError> {
Ok(host.to_socket_addrs(mix_port).map_err(|err| {
GatewayConversionError::InvalidAddress {
) -> Result<Vec<SocketAddr>, GatewayConversionError> {
host.to_socket_addrs(mix_port)
.map_err(|err| GatewayConversionError::InvalidAddress {
value: host.to_string(),
source: err,
}
})?[0])
})
}
pub fn identity(&self) -> &NodeIdentity {
@@ -135,7 +134,7 @@ impl filter::Versioned for Node {
impl<'a> From<&'a Node> for SphinxNode {
fn from(node: &'a Node) -> Self {
let node_address_bytes = NymNodeRoutingAddress::from(node.mix_host)
let node_address_bytes = NymNodeRoutingAddress::from(node.mix_hosts[0])
.try_into()
.unwrap();
@@ -151,12 +150,12 @@ impl<'a> TryFrom<&'a GatewayBond> for Node {
// try to completely resolve the host in the mix situation to avoid doing it every
// single time we want to construct a path
let mix_host = Self::extract_mix_host(&host, bond.gateway.mix_port)?;
let mix_hosts = Self::extract_mix_host(&host, bond.gateway.mix_port)?;
Ok(Node {
owner: bond.owner.as_str().to_owned(),
host,
mix_host,
mix_hosts,
clients_ws_port: bond.gateway.clients_port,
clients_wss_port: None,
identity_key: identity::PublicKey::from_base58_string(&bond.gateway.identity_key)?,
@@ -196,14 +195,17 @@ impl<'a> TryFrom<&'a DescribedGateway> for Node {
// get ip from the self-reported values so we wouldn't need to do any hostname resolution
// (which doesn't really work in wasm)
let mix_host = SocketAddr::new(ips[0], value.bond.gateway.mix_port);
let mix_hosts = ips
.iter()
.map(|ip| SocketAddr::new(*ip, value.bond.gateway.mix_port))
.collect();
Ok(Node {
owner: value.bond.owner.as_str().to_owned(),
host,
mix_host,
clients_ws_port: self_described.mixnet_websockets.ws_port,
clients_wss_port: self_described.mixnet_websockets.wss_port,
mix_hosts,
clients_ws_port: self_described.mixnet_websockets.unwrap().ws_port, //SW gateway have that field
clients_wss_port: self_described.mixnet_websockets.unwrap().wss_port, //SW gateway have that field
identity_key: identity::PublicKey::from_base58_string(
&self_described.host_information.keys.ed25519,
)?,
+86 -10
View File
@@ -19,7 +19,7 @@ use std::str::FromStr;
#[cfg(feature = "serializable")]
use ::serde::{Deserialize, Deserializer, Serialize, Serializer};
use nym_api_requests::models::DescribedGateway;
use nym_api_requests::models::{DescribedGateway, DescribedNymNode};
pub mod error;
pub mod filter;
@@ -115,14 +115,40 @@ pub type MixLayer = u8;
pub struct NymTopology {
mixes: BTreeMap<MixLayer, Vec<mix::Node>>,
gateways: Vec<gateway::Node>,
described_nodes: Vec<DescribedNymNode>,
}
impl NymTopology {
pub fn new(mixes: BTreeMap<MixLayer, Vec<mix::Node>>, gateways: Vec<gateway::Node>) -> Self {
NymTopology { mixes, gateways }
pub fn new(
mixes: BTreeMap<MixLayer, Vec<mix::Node>>,
gateways: Vec<gateway::Node>,
described_nodes: Vec<DescribedNymNode>,
) -> Self {
NymTopology {
mixes: mixes.clone(),
gateways: gateways.clone(),
described_nodes: described_nodes.clone(),
}
}
pub fn new_unordered(unordered_mixes: Vec<mix::Node>, gateways: Vec<gateway::Node>) -> Self {
pub fn empty() -> Self {
NymTopology {
mixes: BTreeMap::new(),
gateways: Vec::new(),
described_nodes: Vec::new(),
}
}
pub fn with_described_nodes(mut self, described_nodes: Vec<DescribedNymNode>) -> Self {
self.described_nodes = described_nodes;
self
}
pub fn new_unordered(
unordered_mixes: Vec<mix::Node>,
gateways: Vec<gateway::Node>,
described_nodes: Vec<DescribedNymNode>,
) -> Self {
let mut mixes = BTreeMap::new();
for node in unordered_mixes.into_iter() {
let layer = node.layer as MixLayer;
@@ -130,7 +156,7 @@ impl NymTopology {
layer_entry.push(node)
}
NymTopology { mixes, gateways }
NymTopology::new(mixes, gateways, described_nodes)
}
#[cfg(feature = "serializable")]
@@ -142,8 +168,9 @@ impl NymTopology {
pub fn from_detailed(
mix_details: Vec<MixNodeDetails>,
gateway_bonds: Vec<GatewayBond>,
described_nodes: Vec<DescribedNymNode>,
) -> Self {
nym_topology_from_detailed(mix_details, gateway_bonds)
nym_topology_from_detailed(mix_details, gateway_bonds, described_nodes)
}
pub fn find_mix(&self, mix_id: MixId) -> Option<&mix::Node> {
@@ -168,6 +195,53 @@ impl NymTopology {
None
}
pub fn find_node_key_by_mix_host(
&self,
mix_host: SocketAddr,
check_port: bool,
) -> Result<Option<String>, NymTopologyError> {
for node in self.described_nodes.iter() {
let (sphinx_key, socket_addresses, description) = match node {
DescribedNymNode::Gateway(g) => {
let sphinx_key = &g.bond.gateway.sphinx_key;
let gateway_node: Option<gateway::Node> = (&g.bond).try_into().ok();
let mix_hosts = gateway_node.map(|node| node.mix_hosts);
let description = &g.self_described;
(sphinx_key, mix_hosts, description)
}
DescribedNymNode::Mixnode(m) => {
let sphinx_key = &m.bond.mix_node.sphinx_key;
let mix_node: Option<mix::Node> = (&m.bond).try_into().ok();
let mix_hosts = mix_node.map(|node| node.mix_hosts);
let description = &m.self_described;
(sphinx_key, mix_hosts, description)
}
};
if let Some(sock_addr) = socket_addresses {
let existing_node = if check_port {
//Initiator side, we know the port should be correct as well
sock_addr.contains(&mix_host)
} else {
//responder side, we don't know the port.
//SW This can lead to some troubles if two nodes shares the same IP and one support Noise but not the other. This in only for the progressive update though
let ip_addresses = sock_addr.iter().map(|addr| addr.ip()).collect::<Vec<_>>();
ip_addresses.contains(&mix_host.ip())
};
if existing_node {
//we have our node
if let Some(d) = description {
if d.noise_information.supported {
return Ok(Some(sphinx_key.to_string()));
}
}
return Ok(None);
}
}
}
//didn't find that node
Err(NymTopologyError::NoMixnodesAvailable)
}
pub fn find_gateway(&self, gateway_identity: IdentityKeyRef) -> Option<&gateway::Node> {
self.gateways
.iter()
@@ -379,6 +453,7 @@ impl NymTopology {
NymTopology {
mixes: self.mixes.filter_by_version(expected_mix_version),
gateways: self.gateways.clone(),
described_nodes: self.described_nodes.clone(),
}
}
}
@@ -426,6 +501,7 @@ impl IntoGatewayNode for DescribedGateway {
pub fn nym_topology_from_detailed<G>(
mix_details: Vec<MixNodeDetails>,
gateway_bonds: Vec<G>,
described_nodes: Vec<DescribedNymNode>,
) -> NymTopology
where
G: IntoGatewayNode,
@@ -469,7 +545,7 @@ where
}
}
NymTopology::new(mixes, gateways)
NymTopology::new(mixes, gateways, described_nodes)
}
#[cfg(test)]
@@ -489,7 +565,7 @@ mod converting_mixes_to_vec {
mix_id: 42,
owner: "N/A".to_string(),
host: "3.3.3.3".parse().unwrap(),
mix_host: "3.3.3.3:1789".parse().unwrap(),
mix_hosts: vec!["3.3.3.3:1789".parse().unwrap()],
identity_key: identity::PublicKey::from_base58_string(
"3ebjp1Fb9hdcS1AR6AZihgeJiMHkB5jjJUsvqNnfQwU7",
)
@@ -516,7 +592,7 @@ mod converting_mixes_to_vec {
mixes.insert(1, vec![node1, node2]);
mixes.insert(2, vec![node3]);
let topology = NymTopology::new(mixes, vec![]);
let topology = NymTopology::new(mixes, vec![], vec![]);
let mixvec = topology.mixes_as_vec();
assert!(mixvec.iter().any(|node| node.owner == "N/A"));
}
@@ -528,7 +604,7 @@ mod converting_mixes_to_vec {
#[test]
fn returns_an_empty_vec() {
let topology = NymTopology::new(BTreeMap::new(), vec![]);
let topology = NymTopology::new(BTreeMap::new(), vec![], vec![]);
let mixvec = topology.mixes_as_vec();
assert!(mixvec.is_empty());
}
+11 -12
View File
@@ -34,9 +34,9 @@ pub struct Node {
pub mix_id: MixId,
pub owner: String,
pub host: NetworkAddress,
// we're keeping this as separate resolved field since we do not want to be resolving the potential
// hostname every time we want to construct a path via this node
pub mix_host: SocketAddr,
// we're keeping all resolved IPs as a separate field since we do not want to be resolving the potential
// hostname every time we want to construct a path via this node. When we need one, we default to the first one
pub mix_hosts: Vec<SocketAddr>,
pub identity_key: identity::PublicKey,
pub sphinx_key: encryption::PublicKey, // TODO: or nymsphinx::PublicKey? both are x25519
pub layer: Layer,
@@ -49,7 +49,7 @@ impl std::fmt::Debug for Node {
.field("mix_id", &self.mix_id)
.field("owner", &self.owner)
.field("host", &self.host)
.field("mix_host", &self.mix_host)
.field("mix_hosts", &self.mix_hosts)
.field("identity_key", &self.identity_key.to_base58_string())
.field("sphinx_key", &self.sphinx_key.to_base58_string())
.field("layer", &self.layer)
@@ -70,13 +70,12 @@ impl Node {
pub fn extract_mix_host(
host: &NetworkAddress,
mix_port: u16,
) -> Result<SocketAddr, MixnodeConversionError> {
Ok(host.to_socket_addrs(mix_port).map_err(|err| {
MixnodeConversionError::InvalidAddress {
) -> Result<Vec<SocketAddr>, MixnodeConversionError> {
host.to_socket_addrs(mix_port)
.map_err(|err| MixnodeConversionError::InvalidAddress {
value: host.to_string(),
source: err,
}
})?[0])
})
}
}
@@ -89,7 +88,7 @@ impl filter::Versioned for Node {
impl<'a> From<&'a Node> for SphinxNode {
fn from(node: &'a Node) -> Self {
let node_address_bytes = NymNodeRoutingAddress::from(node.mix_host)
let node_address_bytes = NymNodeRoutingAddress::from(node.mix_hosts[0])
.try_into()
.unwrap();
@@ -105,13 +104,13 @@ impl<'a> TryFrom<&'a MixNodeBond> for Node {
// try to completely resolve the host in the mix situation to avoid doing it every
// single time we want to construct a path
let mix_host = Self::extract_mix_host(&host, bond.mix_node.mix_port)?;
let mix_hosts = Self::extract_mix_host(&host, bond.mix_node.mix_port)?;
Ok(Node {
mix_id: bond.mix_id,
owner: bond.owner.as_str().to_owned(),
host,
mix_host,
mix_hosts,
identity_key: identity::PublicKey::from_base58_string(&bond.mix_node.identity_key)?,
sphinx_key: encryption::PublicKey::from_base58_string(&bond.mix_node.sphinx_key)?,
layer: bond.layer,
+26 -11
View File
@@ -4,6 +4,7 @@
use crate::gateway::GatewayConversionError;
use crate::mix::MixnodeConversionError;
use crate::{gateway, mix, MixLayer, NymTopology};
use nym_api_requests::models::DescribedNymNode;
use nym_config::defaults::{DEFAULT_CLIENT_LISTENING_PORT, DEFAULT_MIX_LISTENING_PORT};
use nym_crypto::asymmetric::{encryption, identity};
use serde::{Deserialize, Serialize};
@@ -53,6 +54,12 @@ impl From<SerializableTopologyError> for JsValue {
pub struct SerializableNymTopology {
pub mixnodes: BTreeMap<MixLayer, Vec<SerializableMixNode>>,
pub gateways: Vec<SerializableGateway>,
//SW NOTE : make this an option to keep backwards compatibility. Noise with fallback needs that to work though
// Once fallback is removed, we only need a list of unfiltered nodes that can be constructed from mixnodes and gateways
// depending on the usecase of this struct
#[serde(alias = "described_nodes")]
pub described_nodes: Option<Vec<DescribedNymNode>>, //DescribedNymNode is already Serialize and Deserialize
}
impl TryFrom<SerializableNymTopology> for NymTopology {
@@ -76,7 +83,11 @@ impl TryFrom<SerializableNymTopology> for NymTopology {
.map(TryInto::try_into)
.collect::<Result<_, _>>()?;
Ok(NymTopology::new(converted_mixes, gateways))
Ok(NymTopology::new(
converted_mixes,
gateways,
value.described_nodes.unwrap_or_default(),
))
}
}
@@ -89,6 +100,7 @@ impl From<NymTopology> for SerializableNymTopology {
.map(|(&l, nodes)| (l, nodes.iter().map(Into::into).collect()))
.collect(),
gateways: value.gateways().iter().map(Into::into).collect(),
described_nodes: Some(value.described_nodes),
}
}
}
@@ -135,13 +147,13 @@ impl TryFrom<SerializableMixNode> for mix::Node {
// try to completely resolve the host in the mix situation to avoid doing it every
// single time we want to construct a path
let mix_host = mix::Node::extract_mix_host(&host, mix_port)?;
let mix_hosts = mix::Node::extract_mix_host(&host, mix_port)?;
Ok(mix::Node {
mix_id: value.mix_id,
owner: value.owner,
host,
mix_host,
mix_hosts,
identity_key: identity::PublicKey::from_base58_string(&value.identity_key)
.map_err(MixnodeConversionError::from)?,
sphinx_key: encryption::PublicKey::from_base58_string(&value.sphinx_key)
@@ -159,7 +171,7 @@ impl<'a> From<&'a mix::Node> for SerializableMixNode {
mix_id: value.mix_id,
owner: value.owner.clone(),
host: value.host.to_string(),
mix_port: Some(value.mix_host.port()),
mix_port: Some(value.mix_hosts[0].port()),
identity_key: value.identity_key.to_base58_string(),
sphinx_key: value.sphinx_key.to_base58_string(),
layer: value.layer.into(),
@@ -181,8 +193,8 @@ pub struct SerializableGateway {
// optional ip address in the case of host being a hostname that can't be resolved
// (thank you wasm)
#[cfg_attr(feature = "wasm-serde-types", tsify(optional))]
#[serde(alias = "explicit_ip")]
pub explicit_ip: Option<IpAddr>,
#[serde(alias = "explicit_ips")]
pub explicit_ips: Option<Vec<IpAddr>>,
#[cfg_attr(feature = "wasm-serde-types", tsify(optional))]
#[serde(alias = "mix_port")]
@@ -221,8 +233,11 @@ impl TryFrom<SerializableGateway> for gateway::Node {
// try to completely resolve the host in the mix situation to avoid doing it every
// single time we want to construct a path
let mix_host = if let Some(explicit_ip) = value.explicit_ip {
SocketAddr::new(explicit_ip, mix_port)
let mix_hosts = if let Some(explicit_ips) = value.explicit_ips {
explicit_ips
.iter()
.map(|explicit_ip| SocketAddr::new(*explicit_ip, mix_port))
.collect()
} else {
gateway::Node::extract_mix_host(&host, mix_port)?
};
@@ -230,7 +245,7 @@ impl TryFrom<SerializableGateway> for gateway::Node {
Ok(gateway::Node {
owner: value.owner,
host,
mix_host,
mix_hosts,
clients_ws_port,
clients_wss_port: value.clients_wss_port,
identity_key: identity::PublicKey::from_base58_string(&value.identity_key)
@@ -247,8 +262,8 @@ impl<'a> From<&'a gateway::Node> for SerializableGateway {
SerializableGateway {
owner: value.owner.clone(),
host: value.host.to_string(),
explicit_ip: Some(value.mix_host.ip()),
mix_port: Some(value.mix_host.port()),
explicit_ips: Some(value.mix_hosts.iter().map(|addr| addr.ip()).collect()),
mix_port: Some(value.mix_hosts[0].port()),
clients_ws_port: Some(value.clients_ws_port),
clients_wss_port: value.clients_wss_port,
identity_key: value.identity_key.to_base58_string(),
+3
View File
@@ -78,6 +78,9 @@ nym-task = { path = "../common/task" }
nym-types = { path = "../common/types" }
nym-validator-client = { path = "../common/client-libs/validator-client" }
nym-ip-packet-router = { path = "../service-providers/ip-packet-router" }
nym-topology-control = { path = "../common/topology-control" }
nym-topology = { path = "../common/topology" }
nym-noise = { path = "../common/nymnoise" }
nym-wireguard = { path = "../common/wireguard", optional = true }
defguard_wireguard_rs = { git = "https://github.com/neacsu/wireguard-rs.git", rev = "c2cd0c1119f699f4bc43f5e6ffd6fc242caa42ed", optional = true }
+11 -7
View File
@@ -58,6 +58,10 @@ pub struct Run {
// the alias here is included for backwards compatibility (1.1.4 and before)
nym_apis: Option<Vec<url::Url>>,
/// Path to .json file containing custom network specification.
#[arg(long, group = "network", hide = true)]
custom_mixnet: Option<PathBuf>,
/// Comma separated list of endpoints of the validator
#[arg(
long,
@@ -127,11 +131,6 @@ pub struct Run {
)]
medium_toggle: bool,
/// Path to .json file containing custom network specification.
/// Only usable when local network requester is enabled.
#[arg(long, group = "network", hide = true)]
custom_mixnet: Option<PathBuf>,
/// Specifies whether this network requester will run using the default ExitPolicy
/// as opposed to the allow list.
/// Note: this setting will become the default in the future releases.
@@ -249,12 +248,17 @@ pub async fn execute(args: Run) -> anyhow::Result<()> {
}
let node_details = node_details(&config)?;
let gateway =
crate::node::create_gateway(config, Some(nr_opts), Some(ip_opts), custom_mixnet).await?;
let mut gateway =
crate::node::create_gateway(config, Some(nr_opts), Some(ip_opts), custom_mixnet.clone())
.await?;
eprintln!(
"\nTo bond your gateway you will need to install the Nym wallet, go to https://nymtech.net/get-involved and select the Download button.\n\
Select the correct version and install it to your machine. You will need to provide some of the following: \n ");
output.to_stdout(&node_details);
if let Some(custom_mixnet) = custom_mixnet {
gateway = gateway.with_stored_topology(custom_mixnet)?;
}
gateway.run().await
}
+37
View File
@@ -38,6 +38,9 @@ const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_milli
const DEFAULT_INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(1_500);
const DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE: usize = 2000;
const DEFAULT_TOPOLOGY_REFRESH_RATE: Duration = Duration::from_secs(5 * 60); // every 5min
const DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT: Duration = Duration::from_millis(5_000);
const DEFAULT_STORED_MESSAGE_FILENAME_LENGTH: u16 = 16;
const DEFAULT_MESSAGE_RETRIEVAL_LIMIT: i64 = 100;
@@ -109,6 +112,9 @@ pub struct Config {
#[serde(default)]
pub debug: Debug,
#[serde(default)]
pub topology: Topology,
}
impl NymConfigTemplate for Config {
@@ -135,6 +141,7 @@ impl Config {
ip_packet_router: Default::default(),
logging: Default::default(),
debug: Default::default(),
topology: Default::default(),
}
}
@@ -442,3 +449,33 @@ impl Default for Debug {
}
}
}
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct Topology {
/// The uniform delay every which clients are querying the directory server
/// to try to obtain a compatible network topology to send sphinx packets through.
#[serde(with = "humantime_serde")]
pub topology_refresh_rate: Duration,
/// During topology refresh, test packets are sent through every single possible network
/// path. This timeout determines waiting period until it is decided that the packet
/// did not reach its destination.
#[serde(with = "humantime_serde")]
pub topology_resolution_timeout: Duration,
/// Specifies whether the client should not refresh the network topology after obtaining
/// the first valid instance.
/// Supersedes `topology_refresh_rate_ms`.
pub disable_refreshing: bool,
}
impl Default for Topology {
fn default() -> Self {
Topology {
topology_refresh_rate: DEFAULT_TOPOLOGY_REFRESH_RATE,
topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT,
disable_refreshing: false,
}
}
}
+3
View File
@@ -165,6 +165,9 @@ impl From<ConfigV1_1_31> for Config {
message_retrieval_limit: value.debug.message_retrieval_limit,
use_legacy_framed_packet_version: value.debug.use_legacy_framed_packet_version,
},
// \/ ADDED
topology: Default::default(),
// /\ ADDED
}
}
}
+9
View File
@@ -39,6 +39,15 @@ pub(crate) enum GatewayError {
source: io::Error,
},
#[error(
"failed to load custom topology using path '{}'. detailed message: {source}", file_path.display()
)]
CustomTopologyLoadFailure {
file_path: PathBuf,
#[source]
source: std::io::Error,
},
#[error(
"failed to load config file for network requester (gateway-id: '{id}') using path '{}'. detailed message: {source}", path.display()
)]
+2
View File
@@ -12,6 +12,7 @@ use nym_network_requester::RequestFilter;
use nym_node::error::NymNodeError;
use nym_node::http::api::api_requests;
use nym_node::http::api::api_requests::v1::network_requester::exit_policy::models::UsedExitPolicy;
use nym_node::http::api::api_requests::v1::node::models::NoiseInformation;
use nym_node::http::api::api_requests::SignedHostInformation;
use nym_node::http::router::WireguardAppState;
use nym_node::wireguard::types::GatewayClientRegistry;
@@ -260,6 +261,7 @@ impl<'a> HttpApiBuilder<'a> {
self.sphinx_keypair.public_key(),
self.identity_keypair,
)?,
NoiseInformation { supported: true }, //Now we can enable that
)
.with_gateway(load_gateway_details(self.gateway_config)?)
.with_landing_page_assets(self.gateway_config.http.landing_page_assets_path.as_ref());
@@ -9,15 +9,20 @@ use crate::node::storage::Storage;
use futures::channel::mpsc::SendError;
use futures::StreamExt;
use log::*;
use nym_crypto::asymmetric::encryption;
use nym_mixnet_client::forwarder::MixForwardingSender;
use nym_mixnode_common::packet_processor::processor::ProcessedFinalHop;
use nym_noise::upgrade_noise_responder_with_topology;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::framing::codec::NymCodec;
use nym_sphinx::framing::packet::FramedNymPacket;
use nym_sphinx::DestinationAddressBytes;
use nym_task::TaskClient;
use nym_topology_control::accessor::TopologyAccessor;
use nym_validator_client::NymApiClient;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
@@ -40,6 +45,9 @@ pub(crate) struct ConnectionHandler<St: Storage> {
active_clients_store: ActiveClientsStore,
storage: St,
ack_sender: MixForwardingSender,
topology_access: TopologyAccessor,
api_client: NymApiClient,
local_identity: Arc<encryption::KeyPair>,
}
impl<St: Storage + Clone> Clone for ConnectionHandler<St> {
@@ -58,6 +66,9 @@ impl<St: Storage + Clone> Clone for ConnectionHandler<St> {
active_clients_store: self.active_clients_store.clone(),
storage: self.storage.clone(),
ack_sender: self.ack_sender.clone(),
topology_access: self.topology_access.clone(),
api_client: self.api_client.clone(),
local_identity: self.local_identity.clone(),
}
}
}
@@ -68,6 +79,9 @@ impl<St: Storage> ConnectionHandler<St> {
storage: St,
ack_sender: MixForwardingSender,
active_clients_store: ActiveClientsStore,
topology_access: TopologyAccessor,
api_client: NymApiClient,
local_identity: Arc<encryption::KeyPair>,
) -> Self {
ConnectionHandler {
packet_processor,
@@ -75,6 +89,9 @@ impl<St: Storage> ConnectionHandler<St> {
storage,
active_clients_store,
ack_sender,
topology_access,
api_client,
local_identity,
}
}
@@ -205,7 +222,38 @@ impl<St: Storage> ConnectionHandler<St> {
) {
debug!("Starting connection handler for {:?}", remote);
shutdown.mark_as_success();
let mut framed_conn = Framed::new(conn, NymCodec);
let Some(topology) = self.topology_access.current_topology().await else {
error!("Cannot perform Noise handshake to {remote}, due to topology error");
return;
};
let epoch_id = match self.api_client.get_current_epoch_id().await {
Ok(id) => id,
Err(err) => {
error!("Cannot perform Noise handshake to {remote}, due to epoch id error - {err}");
return;
}
};
let noise_stream = match upgrade_noise_responder_with_topology(
conn,
Default::default(),
&topology,
epoch_id,
self.local_identity.public_key(),
self.local_identity.private_key(),
)
.await
{
Ok(noise_stream) => noise_stream,
Err(err) => {
error!("Failed to perform Noise handshake with {remote} - {err}");
return;
}
};
debug!("Noise responder handshake completed for {:?}", remote);
let mut framed_conn = Framed::new(noise_stream, NymCodec);
while !shutdown.is_shutdown() {
tokio::select! {
biased;
+112 -7
View File
@@ -7,7 +7,7 @@ use crate::commands::helpers::{
override_ip_packet_router_config, override_network_requester_config,
OverrideIpPacketRouterConfig, OverrideNetworkRequesterConfig,
};
use crate::config::Config;
use crate::config::{Config, Topology};
use crate::error::GatewayError;
use crate::http::HttpApiBuilder;
use crate::node::client_handling::active_clients::ActiveClientsStore;
@@ -31,13 +31,21 @@ use nym_network_requester::{LocalGateway, NRServiceProviderBuilder, RequestFilte
use nym_node::wireguard::types::GatewayClientRegistry;
use nym_statistics_common::collector::StatisticsSender;
use nym_task::{TaskClient, TaskManager};
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::HardcodedTopologyProvider;
use nym_topology_control::accessor::TopologyAccessor;
use nym_topology_control::nym_api_provider::NymApiTopologyProvider;
use nym_topology_control::TopologyRefresher;
use nym_topology_control::TopologyRefresherConfig;
use nym_validator_client::NymApiClient;
use nym_validator_client::{nyxd, DirectSigningHttpRpcNyxdClient};
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::error::Error;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use url::Url;
pub(crate) mod client_handling;
pub(crate) mod helpers;
@@ -131,6 +139,7 @@ pub(crate) struct Gateway<St = PersistentStorage> {
storage: St,
client_registry: Arc<GatewayClientRegistry>,
custom_topology_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
}
impl<St> Gateway<St> {
@@ -149,6 +158,7 @@ impl<St> Gateway<St> {
network_requester_opts,
ip_packet_router_opts,
client_registry: Arc::new(DashMap::new()),
custom_topology_provider: None,
})
}
@@ -169,13 +179,28 @@ impl<St> Gateway<St> {
sphinx_keypair: Arc::new(sphinx_keypair),
storage,
client_registry: Arc::new(DashMap::new()),
custom_topology_provider: None,
}
}
pub fn with_stored_topology<P: AsRef<Path>>(mut self, file: P) -> Result<Self, GatewayError> {
self.custom_topology_provider = Some(Box::new(
HardcodedTopologyProvider::new_from_file(&file).map_err(|source| {
GatewayError::CustomTopologyLoadFailure {
file_path: file.as_ref().to_path_buf(),
source,
}
})?,
));
Ok(self)
}
fn start_mix_socket_listener(
&self,
ack_sender: MixForwardingSender,
active_clients_store: ActiveClientsStore,
topology_access: TopologyAccessor,
api_client: NymApiClient,
shutdown: TaskClient,
) where
St: Storage + Clone + 'static,
@@ -190,6 +215,9 @@ impl<St> Gateway<St> {
self.storage.clone(),
ack_sender,
active_clients_store,
topology_access,
api_client,
Arc::clone(&self.sphinx_keypair),
);
let listening_address = SocketAddr::new(
@@ -243,15 +271,26 @@ impl<St> Gateway<St> {
);
}
fn start_packet_forwarder(&self, shutdown: TaskClient) -> MixForwardingSender {
fn start_packet_forwarder(
&self,
topology_access: TopologyAccessor,
api_client: NymApiClient,
shutdown: TaskClient,
) -> MixForwardingSender {
info!("Starting mix packet forwarder...");
let (mut packet_forwarder, packet_sender) = PacketForwarder::new(
let forwarder_config = nym_mixnet_client::client::Config::new(
self.config.debug.packet_forwarding_initial_backoff,
self.config.debug.packet_forwarding_maximum_backoff,
self.config.debug.initial_connection_timeout,
self.config.debug.maximum_connection_buffer_size,
self.config.debug.use_legacy_framed_packet_version,
);
let (mut packet_forwarder, packet_sender) = PacketForwarder::new(
forwarder_config,
topology_access,
api_client,
Arc::clone(&self.sphinx_keypair),
shutdown,
);
@@ -425,6 +464,50 @@ impl<St> Gateway<St> {
.map_err(Into::into)
}
fn setup_topology_provider(
custom_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
nym_api_urls: Vec<Url>,
) -> Box<dyn TopologyProvider + Send + Sync> {
// if no custom provider was ... provided ..., create one using nym-api
custom_provider.unwrap_or(Box::new(NymApiTopologyProvider::new(
nym_api_urls,
env!("CARGO_PKG_VERSION").to_string(),
)))
}
// future responsible for periodically polling directory server and updating
// the current global view of topology
async fn start_topology_refresher(
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
topology_config: Topology,
topology_accessor: TopologyAccessor,
mut shutdown: TaskClient,
) {
let topology_refresher_config =
TopologyRefresherConfig::new(topology_config.topology_refresh_rate);
let mut topology_refresher = TopologyRefresher::new(
topology_refresher_config,
topology_accessor,
topology_provider,
);
// before returning, block entire runtime to refresh the current network view so that any
// components depending on topology would see a non-empty view
info!("Obtaining initial network topology");
topology_refresher.try_refresh().await;
if topology_config.disable_refreshing {
// if we're not spawning the refresher, don't cause shutdown immediately
info!("The topology refesher is not going to be started");
shutdown.mark_as_success();
} else {
// don't spawn the refresher if we don't want to be refreshing the topology.
// only use the initial values obtained
info!("Starting topology refresher...");
topology_refresher.start_with_shutdown(shutdown);
}
}
async fn check_if_bonded(&self) -> Result<bool, GatewayError> {
// TODO: if anything, this should be getting data directly from the contract
// as opposed to the validator API
@@ -442,7 +525,7 @@ impl<St> Gateway<St> {
}))
}
pub async fn run(self) -> anyhow::Result<()>
pub async fn run(mut self) -> anyhow::Result<()>
where
St: Storage + Clone + 'static,
{
@@ -459,13 +542,35 @@ impl<St> Gateway<St> {
CoconutVerifier::new(nyxd_client).await
}?;
let mix_forwarding_channel =
self.start_packet_forwarder(shutdown.subscribe().named("PacketForwarder"));
let topology_provider = Self::setup_topology_provider(
self.custom_topology_provider.take(),
self.config.get_nym_api_endpoints(),
);
let shared_topology_access = TopologyAccessor::new();
Self::start_topology_refresher(
topology_provider,
self.config.topology,
shared_topology_access.clone(),
shutdown.subscribe(),
)
.await;
let random_api_client = self.random_api_client()?;
let mix_forwarding_channel = self.start_packet_forwarder(
shared_topology_access.clone(),
random_api_client.clone(),
shutdown.subscribe().named("PacketForwarder"),
);
let active_clients_store = ActiveClientsStore::new();
self.start_mix_socket_listener(
mix_forwarding_channel.clone(),
active_clients_store.clone(),
shared_topology_access,
random_api_client,
shutdown.subscribe().named("mixnet_handling::Listener"),
);
+2
View File
@@ -58,9 +58,11 @@ nym-pemstore = { path = "../common/pemstore", version = "0.3.0" }
nym-task = { path = "../common/task" }
nym-types = { path = "../common/types" }
nym-topology = { path = "../common/topology" }
nym-topology-control = { path = "../common/topology-control" }
nym-validator-client = { path = "../common/client-libs/validator-client" }
nym-bin-common = { path = "../common/bin-common", features = ["output_format"] }
cpu-cycles = { path = "../cpu-cycles", optional = true }
nym-noise = { path = "../common/nymnoise" }
[dev-dependencies]
tokio = { workspace = true, features = [
+9
View File
@@ -11,6 +11,7 @@ use nym_bin_common::output_format::OutputFormat;
use nym_config::helpers::SPECIAL_ADDRESSES;
use nym_validator_client::nyxd;
use std::net::IpAddr;
use std::path::PathBuf;
#[derive(Args, Clone)]
pub(crate) struct Run {
/// Id of the nym-mixnode we want to run
@@ -42,6 +43,10 @@ pub(crate) struct Run {
#[clap(long, alias = "validators", value_delimiter = ',')]
nym_apis: Option<Vec<url::Url>>,
/// Path to .json file containing custom network specification.
#[arg(long, group = "network", hide = true)]
pub custom_mixnet: Option<PathBuf>,
#[clap(short, long, default_value_t = OutputFormat::default())]
output: OutputFormat,
}
@@ -91,6 +96,10 @@ pub(crate) async fn execute(args: &Run) -> anyhow::Result<()> {
Select the correct version and install it to your machine. You will need to provide the following: \n ");
mixnode.print_node_details(args.output);
if let Some(custom_mixnet) = &args.custom_mixnet {
mixnode = mixnode.with_stored_topology(custom_mixnet)?;
}
mixnode.run().await?;
Ok(())
}
+37
View File
@@ -47,6 +47,9 @@ const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_milli
const DEFAULT_INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(1_500);
const DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE: usize = 2000;
const DEFAULT_TOPOLOGY_REFRESH_RATE: Duration = Duration::from_secs(5 * 60); // every 5min
const DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT: Duration = Duration::from_millis(5_000);
/// Derive default path to mixnodes's config directory.
/// It should get resolved to `$HOME/.nym/mixnodes/<id>/config`
pub fn default_config_directory<P: AsRef<Path>>(id: P) -> PathBuf {
@@ -107,6 +110,9 @@ pub struct Config {
#[serde(default)]
pub debug: Debug,
#[serde(default)]
pub topology: Topology,
}
impl NymConfigTemplate for Config {
@@ -132,6 +138,7 @@ impl Config {
verloc: Default::default(),
logging: Default::default(),
debug: Default::default(),
topology: Default::default(),
}
}
@@ -339,3 +346,33 @@ impl Default for Debug {
}
}
}
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct Topology {
/// The uniform delay every which clients are querying the directory server
/// to try to obtain a compatible network topology to send sphinx packets through.
#[serde(with = "humantime_serde")]
pub topology_refresh_rate: Duration,
/// During topology refresh, test packets are sent through every single possible network
/// path. This timeout determines waiting period until it is decided that the packet
/// did not reach its destination.
#[serde(with = "humantime_serde")]
pub topology_resolution_timeout: Duration,
/// Specifies whether the client should not refresh the network topology after obtaining
/// the first valid instance.
/// Supersedes `topology_refresh_rate_ms`.
pub disable_refreshing: bool,
}
impl Default for Topology {
fn default() -> Self {
Topology {
topology_refresh_rate: DEFAULT_TOPOLOGY_REFRESH_RATE,
topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT,
disable_refreshing: false,
}
}
}
+3
View File
@@ -109,6 +109,9 @@ impl From<ConfigV1_1_32> for Config {
verloc: value.verloc.into(),
logging: value.logging,
debug: value.debug.into(),
// \/ ADDED
topology: Default::default(),
// /\ ADDED
}
}
}
+9
View File
@@ -34,6 +34,15 @@ pub enum MixnodeError {
source: io::Error,
},
#[error(
"failed to load custom topology using path '{}'. detailed message: {source}", file_path.display()
)]
CustomTopologyLoadFailure {
file_path: PathBuf,
#[source]
source: std::io::Error,
},
#[error(
"failed to save config file for id {id} using path '{}'. detailed message: {source}", path.display()
)]
+2
View File
@@ -11,6 +11,7 @@ use nym_bin_common::bin_info_owned;
use nym_crypto::asymmetric::{encryption, identity};
use nym_node::error::NymNodeError;
use nym_node::http::api::api_requests;
use nym_node::http::api::api_requests::v1::node::models::NoiseInformation;
use nym_node::http::api::api_requests::SignedHostInformation;
use nym_task::TaskClient;
@@ -93,6 +94,7 @@ impl<'a> HttpApiBuilder<'a> {
self.sphinx_keypair.public_key(),
self.identity_keypair,
)?,
NoiseInformation { supported: true }, //Now we can enable that
)
.with_mixnode(load_mixnode_details(self.mixnode_config)?)
.with_landing_page_assets(self.mixnode_config.http.landing_page_assets_path.as_ref());
@@ -9,12 +9,17 @@ use crate::node::TaskClient;
use futures::StreamExt;
use log::debug;
use log::{error, info, warn};
use nym_crypto::asymmetric::encryption;
use nym_mixnode_common::measure;
use nym_noise::upgrade_noise_responder_with_topology;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::framing::codec::NymCodec;
use nym_sphinx::framing::packet::FramedNymPacket;
use nym_sphinx::Delay as SphinxDelay;
use nym_topology_control::accessor::TopologyAccessor;
use nym_validator_client::NymApiClient;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::time::Instant;
use tokio_util::codec::Framed;
@@ -28,16 +33,25 @@ pub(crate) mod packet_processing;
pub(crate) struct ConnectionHandler {
packet_processor: PacketProcessor,
delay_forwarding_channel: PacketDelayForwardSender,
topology_access: TopologyAccessor,
api_client: NymApiClient,
local_identity: Arc<encryption::KeyPair>,
}
impl ConnectionHandler {
pub(crate) fn new(
packet_processor: PacketProcessor,
delay_forwarding_channel: PacketDelayForwardSender,
topology_access: TopologyAccessor,
api_client: NymApiClient,
local_identity: Arc<encryption::KeyPair>,
) -> Self {
ConnectionHandler {
packet_processor,
delay_forwarding_channel,
topology_access,
api_client,
local_identity,
}
}
@@ -88,8 +102,40 @@ impl ConnectionHandler {
mut shutdown: TaskClient,
) {
debug!("Starting connection handler for {:?}", remote);
shutdown.mark_as_success();
let mut framed_conn = Framed::new(conn, NymCodec);
let Some(topology) = self.topology_access.current_topology().await else {
error!("Cannot perform Noise handshake to {remote}, due to topology error");
return;
};
let epoch_id = match self.api_client.get_current_epoch_id().await {
Ok(id) => id,
Err(err) => {
error!("Cannot perform Noise handshake to {remote}, due to epoch id error - {err}");
return;
}
};
let noise_stream = match upgrade_noise_responder_with_topology(
conn,
Default::default(),
&topology,
epoch_id,
self.local_identity.public_key(),
self.local_identity.private_key(),
)
.await
{
Ok(noise_stream) => noise_stream,
Err(err) => {
error!("Failed to perform Noise handshake with {remote} - {err}");
return;
}
};
debug!("Noise responder handshake completed for {:?}", remote);
let mut framed_conn = Framed::new(noise_stream, NymCodec);
while !shutdown.is_shutdown() {
tokio::select! {
biased;
@@ -121,10 +167,7 @@ impl ConnectionHandler {
}
}
info!(
"Closing connection from {:?}",
framed_conn.into_inner().peer_addr()
);
info!("Closing connection from {:?}", remote);
log::trace!("ConnectionHandler: Exiting");
}
}
+106 -3
View File
@@ -1,7 +1,7 @@
// Copyright 2020-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::config::Config;
use crate::config::{Config, Topology};
use crate::error::MixnodeError;
use crate::node::helpers::{load_identity_keys, load_sphinx_keys};
use crate::node::http::legacy::verloc::VerlocState;
@@ -18,11 +18,20 @@ use nym_bin_common::version_checker::parse_version;
use nym_crypto::asymmetric::{encryption, identity};
use nym_mixnode_common::verloc::{self, AtomicVerlocResult, VerlocMeasurer};
use nym_task::{TaskClient, TaskManager};
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::HardcodedTopologyProvider;
use nym_topology_control::accessor::TopologyAccessor;
use nym_topology_control::nym_api_provider::NymApiTopologyProvider;
use nym_topology_control::TopologyRefresher;
use nym_topology_control::TopologyRefresherConfig;
use nym_validator_client::NymApiClient;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::net::SocketAddr;
use std::path::Path;
use std::process;
use std::sync::Arc;
use url::Url;
pub(crate) mod helpers;
mod http;
@@ -37,6 +46,7 @@ pub struct MixNode {
descriptor: NodeDescription,
identity_keypair: Arc<identity::KeyPair>,
sphinx_keypair: Arc<encryption::KeyPair>,
custom_topology_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
}
impl MixNode {
@@ -46,6 +56,7 @@ impl MixNode {
identity_keypair: Arc::new(load_identity_keys(&config)?),
sphinx_keypair: Arc::new(load_sphinx_keys(&config)?),
config,
custom_topology_provider: None,
})
}
@@ -68,6 +79,18 @@ impl MixNode {
println!("{}", output.format(&node_details));
}
pub fn with_stored_topology<P: AsRef<Path>>(mut self, file: P) -> Result<Self, MixnodeError> {
self.custom_topology_provider = Some(Box::new(
HardcodedTopologyProvider::new_from_file(&file).map_err(|source| {
MixnodeError::CustomTopologyLoadFailure {
file_path: file.as_ref().to_path_buf(),
source,
}
})?,
));
Ok(self)
}
fn start_http_api(
&self,
atomic_verloc_result: AtomicVerlocResult,
@@ -101,6 +124,8 @@ impl MixNode {
&self,
node_stats_update_sender: node_statistics::UpdateSender,
delay_forwarding_channel: PacketDelayForwardSender,
topology_access: TopologyAccessor,
api_client: NymApiClient,
shutdown: TaskClient,
) {
info!("Starting socket listener...");
@@ -108,7 +133,13 @@ impl MixNode {
let packet_processor =
PacketProcessor::new(self.sphinx_keypair.private_key(), node_stats_update_sender);
let connection_handler = ConnectionHandler::new(packet_processor, delay_forwarding_channel);
let connection_handler = ConnectionHandler::new(
packet_processor,
delay_forwarding_channel,
topology_access,
api_client,
Arc::clone(&self.sphinx_keypair),
);
let listening_address = SocketAddr::new(
self.config.mixnode.listening_address,
@@ -121,6 +152,8 @@ impl MixNode {
fn start_packet_delay_forwarder(
&mut self,
node_stats_update_sender: node_statistics::UpdateSender,
topology_access: TopologyAccessor,
api_client: NymApiClient,
shutdown: TaskClient,
) -> PacketDelayForwardSender {
info!("Starting packet delay-forwarder...");
@@ -134,7 +167,12 @@ impl MixNode {
);
let mut packet_forwarder = DelayForwarder::new(
nym_mixnet_client::Client::new(client_config),
nym_mixnet_client::Client::new(
client_config,
topology_access,
api_client,
Arc::clone(&self.sphinx_keypair),
),
node_stats_update_sender,
shutdown,
);
@@ -185,6 +223,50 @@ impl MixNode {
atomic_verloc_results
}
fn setup_topology_provider(
custom_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
nym_api_urls: Vec<Url>,
) -> Box<dyn TopologyProvider + Send + Sync> {
// if no custom provider was ... provided ..., create one using nym-api
custom_provider.unwrap_or(Box::new(NymApiTopologyProvider::new(
nym_api_urls,
env!("CARGO_PKG_VERSION").to_string(),
)))
}
// future responsible for periodically polling directory server and updating
// the current global view of topology
async fn start_topology_refresher(
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
topology_config: Topology,
topology_accessor: TopologyAccessor,
mut shutdown: TaskClient,
) {
let topology_refresher_config =
TopologyRefresherConfig::new(topology_config.topology_refresh_rate);
let mut topology_refresher = TopologyRefresher::new(
topology_refresher_config,
topology_accessor,
topology_provider,
);
// before returning, block entire runtime to refresh the current network view so that any
// components depending on topology would see a non-empty view
info!("Obtaining initial network topology");
topology_refresher.try_refresh().await;
if topology_config.disable_refreshing {
// if we're not spawning the refresher, don't cause shutdown immediately
info!("The topology refesher is not going to be started");
shutdown.mark_as_success();
} else {
// don't spawn the refresher if we don't want to be refreshing the topology.
// only use the initial values obtained
info!("Starting topology refresher...");
topology_refresher.start_with_shutdown(shutdown);
}
}
fn random_api_client(&self) -> nym_validator_client::NymApiClient {
let endpoints = self.config.get_nym_api_endpoints();
let nym_api = endpoints
@@ -231,13 +313,34 @@ impl MixNode {
let (node_stats_pointer, node_stats_update_sender) = self
.start_node_stats_controller(shutdown.subscribe().named("node_statistics::Controller"));
let topology_provider = Self::setup_topology_provider(
self.custom_topology_provider.take(),
self.config.get_nym_api_endpoints(),
);
let shared_topology_access = TopologyAccessor::new();
Self::start_topology_refresher(
topology_provider,
self.config.topology,
shared_topology_access.clone(),
shutdown.subscribe().named("TopologyRefresher"),
)
.await;
let random_api_client = self.random_api_client();
let delay_forwarding_channel = self.start_packet_delay_forwarder(
node_stats_update_sender.clone(),
shared_topology_access.clone(),
random_api_client.clone(),
shutdown.subscribe().named("DelayForwarder"),
);
self.start_socket_listener(
node_stats_update_sender,
delay_forwarding_channel,
shared_topology_access,
random_api_client,
shutdown.subscribe().named("Listener"),
);
let atomic_verloc_results =
+40 -3
View File
@@ -7,10 +7,12 @@ use nym_mixnet_contract_common::mixnode::MixNodeDetails;
use nym_mixnet_contract_common::reward_params::{Performance, RewardingParams};
use nym_mixnet_contract_common::rewarding::RewardEstimate;
use nym_mixnet_contract_common::{
GatewayBond, IdentityKey, Interval, MixId, MixNode, Percent, RewardedSetNodeStatus,
GatewayBond, IdentityKey, Interval, MixId, MixNode, MixNodeBond, Percent, RewardedSetNodeStatus,
};
use nym_node_requests::api::v1::gateway::models::WebSockets;
use nym_node_requests::api::v1::node::models::{BinaryBuildInformationOwned, HostInformation};
use nym_node_requests::api::v1::node::models::{
BinaryBuildInformationOwned, HostInformation, NoiseInformation,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
@@ -370,7 +372,10 @@ pub struct NymNodeDescription {
pub ip_packet_router: Option<IpPacketRouterDetails>,
// for now we only care about their ws/wss situation, nothing more
pub mixnet_websockets: WebSockets,
#[serde(default)]
pub mixnet_websockets: Option<WebSockets>,
pub noise_information: NoiseInformation,
}
#[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema)]
@@ -388,6 +393,38 @@ impl From<GatewayBond> for DescribedGateway {
}
}
#[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct DescribedMixnode {
pub bond: MixNodeBond,
pub self_described: Option<NymNodeDescription>,
}
impl From<MixNodeBond> for DescribedMixnode {
fn from(bond: MixNodeBond) -> Self {
DescribedMixnode {
bond,
self_described: None,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub enum DescribedNymNode {
Gateway(DescribedGateway),
Mixnode(DescribedMixnode),
}
impl From<GatewayBond> for DescribedNymNode {
fn from(value: GatewayBond) -> Self {
Self::Gateway(value.into())
}
}
impl From<MixNodeDetails> for DescribedNymNode {
fn from(value: MixNodeDetails) -> Self {
Self::Mixnode(value.bond_information.into())
}
}
#[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct NetworkRequesterDetails {
/// address of the embedded network requester
@@ -0,0 +1,93 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use nym_api_requests::models::NymNodeDescription;
use nym_config::defaults::DEFAULT_HTTP_API_LISTENING_PORT;
use nym_contracts_common::IdentityKey;
use nym_mixnet_contract_common::MixNode;
use nym_node_requests::api::client::NymNodeApiClientExt;
use super::NodeDescribeCacheError;
//this is a copy of try_get_client but for mixnode, to be deleted after smoosh probably
async fn try_get_client(
mixnode: &MixNode,
) -> Result<nym_node_requests::api::Client, NodeDescribeCacheError> {
let mixnode_host = &mixnode.host;
// first try the standard port in case the operator didn't put the node behind the proxy,
// then default https (443)
// finally default http (80)
let addresses_to_try = vec![
format!("http://{mixnode_host}:{DEFAULT_HTTP_API_LISTENING_PORT}"),
format!("https://{mixnode_host}"),
format!("http://{mixnode_host}"),
];
for address in addresses_to_try {
// if provided host was malformed, no point in continuing
let client = match nym_node_requests::api::Client::new_url(address, None) {
Ok(client) => client,
Err(err) => {
return Err(NodeDescribeCacheError::MalformedHost {
host: mixnode_host.clone(),
gateway: mixnode.identity_key.clone(),
source: err,
});
}
};
if let Ok(health) = client.get_health().await {
if health.status.is_up() {
return Ok(client);
}
}
}
Err(NodeDescribeCacheError::NoHttpPortsAvailable {
host: mixnode_host.clone(),
gateway: mixnode.identity_key.clone(),
})
}
pub(crate) async fn get_mixnode_description(
mixnode: MixNode,
) -> Result<(IdentityKey, NymNodeDescription), NodeDescribeCacheError> {
let client = try_get_client(&mixnode).await?;
let host_info =
client
.get_host_information()
.await
.map_err(|err| NodeDescribeCacheError::ApiFailure {
gateway: mixnode.identity_key.clone(),
source: err,
})?;
if !host_info.verify_host_information() {
return Err(NodeDescribeCacheError::MissignedHostInformation {
gateway: mixnode.identity_key,
});
}
let build_info =
client
.get_build_information()
.await
.map_err(|err| NodeDescribeCacheError::ApiFailure {
gateway: mixnode.identity_key.clone(),
source: err,
})?;
let noise_info = client.get_noise_information().await.unwrap_or_default();
let description = NymNodeDescription {
host_information: host_info.data,
build_information: build_info,
network_requester: None,
ip_packet_router: None,
mixnet_websockets: None,
noise_information: noise_info,
};
Ok((mixnode.identity_key, description))
}
+31 -3
View File
@@ -7,6 +7,7 @@ use crate::support::caching::refresher::{CacheItemProvider, CacheRefresher};
use crate::support::config;
use crate::support::config::DEFAULT_NODE_DESCRIBE_BATCH_SIZE;
use futures::{stream, StreamExt};
use mixnode::get_mixnode_description;
use nym_api_requests::models::{
IpPacketRouterDetails, NetworkRequesterDetails, NymNodeDescription,
};
@@ -17,6 +18,8 @@ use nym_node_requests::api::client::{NymNodeApiClientError, NymNodeApiClientExt}
use std::collections::HashMap;
use thiserror::Error;
pub mod mixnode;
// type alias for ease of use
pub type DescribedNodes = HashMap<IdentityKey, NymNodeDescription>;
@@ -180,12 +183,15 @@ async fn get_gateway_description(
None
};
let noise_info = client.get_noise_information().await.unwrap_or_default();
let description = NymNodeDescription {
host_information: host_info.data,
build_information: build_info,
network_requester,
ip_packet_router,
mixnet_websockets: websockets,
mixnet_websockets: Some(websockets),
noise_information: noise_info,
};
Ok((gateway.identity_key, description))
@@ -202,16 +208,17 @@ impl CacheItemProvider for NodeDescriptionProvider {
async fn try_refresh(&self) -> Result<Self::Item, Self::Error> {
let gateways = self.contract_cache.gateways_all().await;
let mixnodes = self.contract_cache.mixnodes_all().await;
// let guard = self.network_gateways.get().await?;
// let gateways = &*guard;
if gateways.is_empty() {
if gateways.is_empty() && mixnodes.is_empty() {
return Ok(HashMap::new());
}
// TODO: somehow bypass the 'higher-ranked lifetime error' and remove that redundant clone
let websockets = stream::iter(
let mut websockets = stream::iter(
gateways
// .deref()
// .clone()
@@ -232,6 +239,27 @@ impl CacheItemProvider for NodeDescriptionProvider {
.collect::<HashMap<_, _>>()
.await;
let mixnodes_websockets = stream::iter(
mixnodes
.into_iter()
.map(|detail| detail.bond_information.mix_node)
.map(get_mixnode_description),
)
.buffer_unordered(self.batch_size)
.filter_map(|res| async move {
match res {
Ok((identity, description)) => Some((identity, description)),
Err(err) => {
debug!("{err}");
None
}
}
})
.collect::<HashMap<_, _>>()
.await;
websockets.extend(mixnodes_websockets);
Ok(websockets)
}
}
+2 -1
View File
@@ -11,6 +11,7 @@ pub(crate) mod routes;
/// Merges the routes with http information and returns it to Rocket for serving
pub(crate) fn nym_node_routes(settings: &OpenApiSettings) -> (Vec<Route>, OpenApi) {
openapi_get_routes_spec![
settings: routes::get_gateways_described
settings: routes::get_gateways_described,
routes::get_nym_nodes_described,
]
}
+57 -1
View File
@@ -4,7 +4,7 @@
use crate::node_describe_cache::DescribedNodes;
use crate::nym_contract_cache::cache::NymContractCache;
use crate::support::caching::cache::SharedCache;
use nym_api_requests::models::DescribedGateway;
use nym_api_requests::models::{DescribedGateway, DescribedMixnode, DescribedNymNode};
use rocket::serde::json::Json;
use rocket::State;
use rocket_okapi::openapi;
@@ -42,3 +42,59 @@ pub async fn get_gateways_described(
.collect(),
)
}
#[openapi(tag = "Nym Nodes")]
#[get("/nym-nodes/described")]
pub async fn get_nym_nodes_described(
contract_cache: &State<NymContractCache>,
describe_cache: &State<SharedCache<DescribedNodes>>,
) -> Json<Vec<DescribedNymNode>> {
let gateways = contract_cache.gateways_all().await;
let mixnodes = contract_cache.mixnodes_all().await;
if gateways.is_empty() && mixnodes.is_empty() {
return Json(Vec::new());
}
// if the self describe cache is unavailable, well, don't attach describe data
let Ok(self_descriptions) = describe_cache.get().await else {
return Json(
gateways
.into_iter()
.map(Into::into)
.chain(mixnodes.into_iter().map(Into::into))
.collect(),
);
};
// TODO: this is extremely inefficient, I'm merely copying existing stuff
// it shouldn't be too much of a problem until we go ahead with directory v3 / the smoosh 2: electric smoosharoo,
// but at that point (I hope) the whole caching situation should get refactored
let gateways_described: Vec<DescribedNymNode> = gateways
.into_iter()
.map(|bond| {
DescribedNymNode::Gateway(DescribedGateway {
self_described: self_descriptions.deref().get(bond.identity()).cloned(),
bond,
})
})
.collect();
let mixnodes_described: Vec<DescribedNymNode> = mixnodes
.into_iter()
.map(|detail| {
DescribedNymNode::Mixnode(DescribedMixnode {
self_described: self_descriptions
.deref()
.get(detail.bond_information.identity())
.cloned(),
bond: detail.bond_information,
})
})
.collect();
Json(
gateways_described
.into_iter()
.chain(mixnodes_described.into_iter())
.collect(),
)
}
+25
View File
@@ -3769,6 +3769,7 @@ dependencies = [
"nym-sphinx",
"nym-task",
"nym-topology",
"nym-topology-control",
"nym-validator-client",
"rand 0.7.3",
"reqwest",
@@ -3872,6 +3873,7 @@ dependencies = [
"nym-sphinx",
"nym-task",
"nym-topology",
"nym-topology-control",
"nym-validator-client",
"pretty_env_logger",
"rand 0.7.3",
@@ -4486,6 +4488,29 @@ dependencies = [
"thiserror",
]
[[package]]
name = "nym-topology-control"
version = "0.1.0"
dependencies = [
"async-trait",
"futures",
"gloo-timers",
"log",
"nym-explorer-client",
"nym-network-defaults",
"nym-sphinx",
"nym-task",
"nym-topology",
"nym-validator-client",
"rand 0.7.3",
"serde",
"tap",
"tokio",
"tokio-stream",
"url",
"wasmtimer",
]
[[package]]
name = "nym-validator-client"
version = "0.1.0"
+1
View File
@@ -61,6 +61,7 @@ nym-socks5-client-core = { path = "../../../common/socks5-client-core" }
nym-sphinx = { path = "../../../common/nymsphinx" }
nym-task = { path = "../../../common/task" }
nym-topology = { path = "../../../common/topology" }
nym-topology-control = { path = "../../../common/topology-control" }
nym-validator-client = { path = "../../../common/client-libs/validator-client" }
[dev-dependencies]
+2 -1
View File
@@ -4,12 +4,13 @@ use nym_client_core::{
client::base_client::storage::{
gateway_details::GatewayDetailsStore, MixnetClientStorage, OnDiskPersistent,
},
config::{GroupBy, TopologyStructure},
config::TopologyStructure,
error::ClientCoreStatusMessage,
};
use nym_socks5_client_core::{NymClient as Socks5NymClient, Socks5ControlMessageSender};
use nym_sphinx::params::PacketSize;
use nym_task::manager::TaskStatus;
use nym_topology_control::geo_aware_provider::GroupBy;
use std::sync::Arc;
use tap::TapFallible;
use tokio::sync::RwLock;
+6 -1
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::api::v1::gateway::models::WebSockets;
use crate::api::v1::node::models::SignedHostInformation;
use crate::api::v1::node::models::{NoiseInformation, SignedHostInformation};
use crate::api::ErrorResponse;
use crate::routes;
use async_trait::async_trait;
@@ -60,6 +60,11 @@ pub trait NymNodeApiClientExt: ApiClient {
.await
}
async fn get_noise_information(&self) -> Result<NoiseInformation, NymNodeApiClientError> {
self.get_json_from(routes::api::v1::noise_info_absolute())
.await
}
async fn post_gateway_register_client(
&self,
client_message: &ClientMessage,
@@ -42,3 +42,9 @@ pub struct HostKeys {
/// Currently it corresponds to either mixnode's or gateway's key.
pub x25519: String,
}
#[derive(Default, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct NoiseInformation {
pub supported: bool,
}
+2
View File
@@ -28,6 +28,7 @@ pub mod routes {
pub const ROLES: &str = "/roles";
pub const BUILD_INFO: &str = "/build-information";
pub const HOST_INFO: &str = "/host-information";
pub const NOISE_INFO: &str = "/noise";
pub const HEALTH: &str = "/health";
pub const GATEWAY: &str = "/gateway";
@@ -45,6 +46,7 @@ pub mod routes {
absolute_route!(mixnode_absolute, v1_absolute(), MIXNODE);
absolute_route!(network_requester_absolute, v1_absolute(), NETWORK_REQUESTER);
absolute_route!(ip_packet_router_absolute, v1_absolute(), IP_PACKET_ROUTER);
absolute_route!(noise_info_absolute, v1_absolute(), NOISE_INFO);
absolute_route!(swagger_absolute, v1_absolute(), SWAGGER);
pub mod gateway {
@@ -3,6 +3,7 @@
use crate::http::api::v1::node::build_information::build_information;
use crate::http::api::v1::node::host_information::host_information;
use crate::http::api::v1::node::noise_information::noise_information;
use crate::http::api::v1::node::roles::roles;
use axum::routing::get;
use axum::Router;
@@ -11,12 +12,14 @@ use nym_node_requests::routes::api::v1;
pub mod build_information;
pub mod host_information;
pub mod noise_information;
pub mod roles;
#[derive(Debug, Clone)]
pub struct Config {
pub build_information: models::BinaryBuildInformationOwned,
pub host_information: models::SignedHostInformation,
pub noise_information: models::NoiseInformation,
pub roles: models::NodeRoles,
}
@@ -43,4 +46,11 @@ pub(super) fn routes<S: Send + Sync + 'static + Clone>(config: Config) -> Router
move |query| host_information(host_info, query)
}),
)
.route(
v1::NOISE_INFO,
get({
let noise_info = config.noise_information;
move |query| noise_information(noise_info, query)
}),
)
}
@@ -0,0 +1,30 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::http::api::{FormattedResponse, OutputParams};
use axum::extract::Query;
use nym_node_requests::api::v1::node::models::NoiseInformation;
/// Returns host information of this node.
#[utoipa::path(
get,
path = "/noise",
context_path = "/api/v1",
tag = "Node",
responses(
(status = 200, content(
("application/json" = NoiseInformation),
("application/yaml" = NoiseInformation)
))
),
params(OutputParams)
)]
pub(crate) async fn noise_information(
host_information: NoiseInformation,
Query(output): Query<OutputParams>,
) -> NoiseInformationResponse {
let output = output.output.unwrap_or_default();
output.to_response(host_information)
}
pub type NoiseInformationResponse = FormattedResponse<NoiseInformation>;
+3 -1
View File
@@ -12,7 +12,7 @@ use nym_node_requests::api::v1::ip_packet_router::models::IpPacketRouter;
use nym_node_requests::api::v1::mixnode::models::Mixnode;
use nym_node_requests::api::v1::network_requester::exit_policy::models::UsedExitPolicy;
use nym_node_requests::api::v1::network_requester::models::NetworkRequester;
use nym_node_requests::api::v1::node::models;
use nym_node_requests::api::v1::node::models::{self, NoiseInformation};
use nym_node_requests::api::SignedHostInformation;
use nym_node_requests::routes;
use std::net::SocketAddr;
@@ -33,6 +33,7 @@ impl Config {
pub fn new(
build_information: models::BinaryBuildInformationOwned,
host_information: SignedHostInformation,
noise_information: NoiseInformation,
) -> Self {
Config {
landing: Default::default(),
@@ -41,6 +42,7 @@ impl Config {
node: api::v1::node::Config {
build_information,
host_information,
noise_information,
roles: Default::default(),
},
gateway: Default::default(),
+137 -1
View File
@@ -14,6 +14,18 @@ def add_mixnode(base_network, base_dir, mix_id):
base_network["mixnodes"][str(mix_id)][0]["layer"] = mix_id
base_network["mixnodes"][str(mix_id)][0]["mix_id"] = mix_id
base_network["mixnodes"][str(mix_id)][0]["owner"] = "whatever"
#described_node
template = mixnode_template()
template["Mixnode"]["bond"]["mix_node"]["identity_key"] = mix_data["identity_key"]
template["Mixnode"]["bond"]["mix_node"]["sphinx_key"] = mix_data["sphinx_key"]
template["Mixnode"]["bond"]["mix_node"]["mix_port"] = mix_data["mix_port"]
template["Mixnode"]["bond"]["mix_node"]["host"] = mix_data["bind_address"]
template["Mixnode"]["bond"]["layer"] = mix_id
template["Mixnode"]["bond"]["mix_id"] = mix_id
template["Mixnode"]["self_described"]["host_information"]["keys"]["ed25519"] = mix_data["identity_key"]
template["Mixnode"]["self_described"]["host_information"]["keys"]["x25519"] = mix_data["sphinx_key"]
base_network["described_nodes"][mix_id] = template
return base_network
@@ -27,6 +39,17 @@ def add_gateway(base_network, base_dir):
# base_network["gateways"][0]["version"] = gateway_data["version"]
base_network["gateways"][0]["host"] = gateway_data["bind_address"]
base_network["gateways"][0]["owner"] = "whatever"
#described_node
template = gateway_template()
template["Gateway"]["bond"]["gateway"]["identity_key"] = gateway_data["identity_key"]
template["Gateway"]["bond"]["gateway"]["sphinx_key"] = gateway_data["sphinx_key"]
template["Gateway"]["bond"]["gateway"]["mix_port"] = gateway_data["mix_port"]
template["Gateway"]["bond"]["gateway"]["clients_port"] = gateway_data["clients_port"]
template["Gateway"]["bond"]["gateway"]["host"] = gateway_data["bind_address"]
template["Gateway"]["self_described"]["host_information"]["keys"]["ed25519"] = gateway_data["identity_key"]
template["Gateway"]["self_described"]["host_information"]["keys"]["x25519"] = gateway_data["sphinx_key"]
base_network["described_nodes"][0] = template
return base_network
@@ -37,7 +60,8 @@ def main(args):
"2": [{}],
"3": [{}],
},
"gateways": [{}]
"gateways": [{}],
"described_nodes":[{}, {}, {}, {}]
}
base_dir = args[0]
@@ -50,5 +74,117 @@ def main(args):
json.dump(base_network, out, indent=2)
def gateway_template():
return {"Gateway": {
"bond": {
"pledge_amount": {
"denom": "unym",
"amount": "0"
},
"owner": "whatever",
"block_height": 0,
"gateway": {
"host": "TO_BE_FILLED",
"mix_port": "TO_BE_FILLED",
"clients_port": "TO_BE_FILLED",
"location": "whatever",
"sphinx_key": "TO_BE_FILLED",
"identity_key": "TO_BE_FILLED",
"version": "whatever",
},
"proxy": None,
},
"self_described": {
"host_information": {
"ip_address": [
"0.0.0.0"
],
"hostname": None,
"keys": {
"ed25519": "TO_BE_FILLED",
"x25519": "TO_BE_FILLED"
}
},
"build_information": {
"binary_name": "whatever",
"build_timestamp": "whatever",
"build_version": "whatever",
"commit_sha": "whatever",
"commit_timestamp": "whatever",
"commit_branch": "whatever",
"rustc_version": "whatever",
"rustc_channel": "whatever",
"cargo_profile": "whatever"
},
"network_requester": {
"address": "none",
"uses_exit_policy": True
},
"mixnet_websockets": {
"ws_port": 9000,
"wss_port": None
},
"noise_information": {
"supported": True
}
}
}}
def mixnode_template():
return {
"Mixnode": {
"bond": {
"mix_id": "TO_BE_FILLED",
"owner": "whatever",
"original_pledge": {
"denom": "unym",
"amount": "0"
},
"layer": "TO_BE_FILLED",
"mix_node": {
"host": "TO_BE_FILLED",
"mix_port": "TO_BE_FILLED",
"verloc_port": 1790,
"http_api_port": 8000,
"sphinx_key": "TO_BE_FILLED",
"identity_key": "TO_BE_FILLED",
"version": "whatever"
},
"proxy": None,
"bonding_height": 0,
"is_unbonding": False
},
"self_described": {
"host_information": {
"ip_address": [
"0.0.0.0"
],
"hostname": None,
"keys": {
"ed25519": "TO_BE_FILLED",
"x25519": "TO_BE_FILLED"
}
},
"build_information": {
"binary_name": "whatever",
"build_timestamp": "whatever",
"build_version": "whatever",
"commit_sha": "whatever",
"commit_timestamp": "whatever",
"commit_branch": "whatever",
"rustc_version": "whatever",
"rustc_channel": "whatever",
"cargo_profile": "whatever"
},
"network_requester": None,
"ip_packet_router": None,
"mixnet_websockets": None,
"noise_information": {
"supported": True
}
}
}
}
if __name__ == '__main__':
main(sys.argv[1:])
+4 -4
View File
@@ -42,10 +42,10 @@ echo "the full network file is located at $networkfile"
echo "starting the mixnet..."
tmux start-server
tmux new-session -d -s localnet -n Mixnet -d "/usr/bin/env sh -c \" cargo run --release --bin nym-mixnode -- run --id mix1-$suffix \""
tmux split-window -t localnet:0 "/usr/bin/env sh -c \" cargo run --release --bin nym-mixnode -- run --id mix2-$suffix \""
tmux split-window -t localnet:0 "/usr/bin/env sh -c \" cargo run --release --bin nym-mixnode -- run --id mix3-$suffix \""
tmux split-window -t localnet:0 "/usr/bin/env sh -c \" cargo run --release --bin nym-gateway -- run --id gateway-$suffix --local \""
tmux new-session -d -s localnet -n Mixnet -d "/usr/bin/env sh -c \" cargo run --release --bin nym-mixnode -- run --id mix1-$suffix --custom-mixnet \"$networkfile\" \""
tmux split-window -t localnet:0 "/usr/bin/env sh -c \" cargo run --release --bin nym-mixnode -- run --id mix2-$suffix --custom-mixnet \"$networkfile\" \""
tmux split-window -t localnet:0 "/usr/bin/env sh -c \" cargo run --release --bin nym-mixnode -- run --id mix3-$suffix --custom-mixnet \"$networkfile\" \""
tmux split-window -t localnet:0 "/usr/bin/env sh -c \" cargo run --release --bin nym-gateway -- run --id gateway-$suffix --local --custom-mixnet \"$networkfile\" \""
while ! nc -z localhost 9000; do
echo "waiting for nym-gateway to launch on port 9000..."
+1
View File
@@ -20,6 +20,7 @@ nym-network-defaults = { path = "../../../common/network-defaults" }
nym-sphinx = { path = "../../../common/nymsphinx" }
nym-task = { path = "../../../common/task" }
nym-topology = { path = "../../../common/topology" }
nym-topology-control = { path = "../../../common/topology-control" }
nym-socks5-client-core = { path = "../../../common/socks5-client-core" }
nym-validator-client = { path = "../../../common/client-libs/validator-client", features = ["http-client"] }
nym-socks5-requests = { path = "../../../common/socks5/requests" }
@@ -23,7 +23,7 @@ async fn main() {
mix_id: 63,
owner: "n1k52k5n45cqt5qpjh8tcwmgqm0wkt355yy0g5vu".to_string(),
host: "172.105.92.48".parse().unwrap(),
mix_host: "172.105.92.48:1789".parse().unwrap(),
mix_hosts: vec!["172.105.92.48:1789".parse().unwrap()],
identity_key: "GLdR2NRVZBiCoCbv4fNqt9wUJZAnNjGXHkx3TjVAUzrK"
.parse()
.unwrap(),
@@ -40,7 +40,7 @@ async fn main() {
mix_id: 23,
owner: "n1fzv4jc7fanl9s0qj02ge2ezk3kts545kjtek47".to_string(),
host: "178.79.143.65".parse().unwrap(),
mix_host: "178.79.143.65:1789".parse().unwrap(),
mix_hosts: vec!["178.79.143.65:1789".parse().unwrap()],
identity_key: "4Yr4qmEHd9sgsuQ83191FR2hD88RfsbMmB4tzhhZWriz"
.parse()
.unwrap(),
@@ -57,7 +57,7 @@ async fn main() {
mix_id: 66,
owner: "n1ae2pjd7q9p0dea65pqkvcm4x9s264v4fktpyru".to_string(),
host: "139.162.247.97".parse().unwrap(),
mix_host: "139.162.247.97:1789".parse().unwrap(),
mix_hosts: vec!["139.162.247.97:1789".parse().unwrap()],
identity_key: "66UngapebhJRni3Nj52EW1qcNsWYiuonjkWJzHFsmyYY"
.parse()
.unwrap(),
+4 -2
View File
@@ -54,9 +54,8 @@ pub use nym_client_core::{
fs_backend::Backend as ReplyStorage, CombinedReplyStorage, Empty as EmptyReplyStorage,
ReplyStorageBackend,
},
topology_control::geo_aware_provider::{CountryGroup, GeoAwareTopologyProvider},
},
config::{GatewayEndpointConfig, GroupBy},
config::GatewayEndpointConfig,
};
pub use nym_credential_storage::{
ephemeral_storage::EphemeralStorage as EphemeralCredentialStorage,
@@ -73,6 +72,9 @@ pub use nym_sphinx::{
receiver::ReconstructedMessage,
};
pub use nym_topology::{provider_trait::TopologyProvider, NymTopology};
pub use nym_topology_control::geo_aware_provider::{
CountryGroup, GeoAwareTopologyProvider, GroupBy,
};
pub use paths::StoragePaths;
pub use socks5_client::Socks5MixnetClient;
pub use traits::MixnetMessageSender;