Compare commits

...

63 Commits

Author SHA1 Message Date
Simon Wicky d41e84e3cf fixed wireguard handshake 2023-09-06 11:27:14 +02:00
Simon Wicky 2c70801bcc timestamp as payload 2023-09-06 09:49:12 +02:00
Simon Wicky 13b0a236d7 tentative snow modifications 2023-09-05 13:26:02 +02:00
Simon Wicky 60c36a3c62 wireguard shenanigans 2023-09-04 15:21:04 +02:00
Simon Wicky 831b739e95 wip wireguard handshake 2023-09-04 10:24:09 +02:00
Simon Wicky 8c17017749 Merge commit '8f0c427734667f870d029850866e2a0c7a979000' into simon/noise_wg 2023-09-01 13:50:57 +02:00
Simon Wicky 74c527383d remove timeout from noise 2023-08-30 14:47:51 +02:00
durch 8f0c427734 Dummy WG implementation - cleaned up 2023-08-30 12:47:48 +02:00
Simon Wicky d7a290a9c9 correct some patterns 2023-08-29 16:54:14 +02:00
Simon Wicky 0f0f7d43c4 fix psk_position 2023-08-29 13:34:11 +02:00
Simon Wicky a3dc9ceea0 tidy noise mode 2023-08-29 13:16:48 +02:00
Simon Wicky e6f5d2b1c4 remove not working empty pub key 2023-08-29 12:05:09 +02:00
Simon Wicky 59085f8c2d noise mode as enum 2023-08-29 11:31:39 +02:00
Simon Wicky 113f6ac201 building blocks for configurable noise pattern 2023-08-28 15:13:35 +02:00
Simon Wicky 2c8544e36a prevent gateway crash if noise handshake fails 2023-08-28 15:10:36 +02:00
Simon Wicky b9a91399f8 comment about 5sec timeout 2023-08-28 13:41:35 +02:00
Simon Wicky 88f18b7a4c merge conflict changes 2023-08-28 10:35:18 +02:00
Simon Wicky d919e5199a Merge commit '22246d0d5d4f91ad474ae69e635802431ef67337' into simon/noise 2023-08-28 10:35:05 +02:00
Simon Wicky 25b9feef5f add couple of reviewer's suggestion 2023-08-23 15:44:46 +02:00
Simon Wicky db112eaa45 fix dependencies 2023-08-23 15:02:29 +02:00
Simon Wicky da4f9bb3b2 fix clippy warning 2023-08-23 15:02:18 +02:00
Simon Wicky d710242566 buffer size change 2023-08-10 13:34:09 +02:00
Simon Wicky 815920c9f1 fix import error for wasm clients 2023-08-10 13:27:43 +02:00
Simon Wicky 50a45eab5a remove epoch id caching 2023-08-09 08:59:09 +02:00
Simon Wicky 2a47420e0b cleanup 2023-08-08 16:54:24 +02:00
Simon Wicky b5a2b79eeb tentative bgufix 2023-08-08 16:13:18 +02:00
Simon Wicky d00d4bd73e add timeout to handshake 2023-08-08 10:05:38 +02:00
Simon Wicky 33ea1501d0 filter_node_versions keeps the epoch 2023-08-08 09:57:11 +02:00
Simon Wicky aa48d18753 tentative stabilisation of Noise handshake 2023-08-07 14:58:29 +02:00
Simon Wicky 4bfe533b51 allow emtpy topology to be served 2023-08-04 15:08:01 +02:00
Simon Wicky abaa64a54b add all nodes in topology 2023-08-04 14:17:38 +02:00
Simon Wicky 7ca4c3b7b1 query nym-api from the nym-api 2023-08-03 14:50:35 +02:00
Simon Wicky 0d81c7d765 epoch in client-gateway, nym-api does not compile 2023-08-03 11:00:53 +02:00
Simon Wicky 6e3a84b759 adapt client to new noise signature 2023-08-03 10:43:48 +02:00
Simon Wicky ba96744dd8 epoch in psk for nodes 2023-08-03 10:38:49 +02:00
Simon Wicky c28410f6de allow multiple messages decryption 2023-08-02 13:59:06 +02:00
Simon Wicky 203633cabf noise between client and gateway, first try 2023-07-28 14:33:01 +02:00
Simon Wicky 331a0328c7 remove absolute dependence on topology 2023-07-28 12:09:13 +02:00
Simon Wicky 9e3bb6ef24 error handling on try_read + EOF 2023-07-28 09:25:44 +02:00
Simon Wicky 4014467496 revamp poll_read for Noise Stream 2023-07-27 12:25:17 +02:00
Simon Wicky 3cd17be26f error handling 2023-07-27 10:56:35 +02:00
Simon Wicky 9d91145f0a somewhat working stream 2023-07-26 16:18:40 +02:00
Simon Wicky 83d0dfc657 bit of cleanup 2023-07-26 15:47:43 +02:00
Simon Wicky f91a22cb6a store extra bytes in storage 2023-07-26 14:49:09 +02:00
Simon Wicky 981f567131 first try at noisestream 2023-07-26 14:44:56 +02:00
durch 7dfc396f4f Each packet to its own thread 2023-07-25 17:18:06 +02:00
durch 2bf44db72f Tun arc and mutex 2023-07-25 16:46:24 +02:00
Simon Wicky c58331f9b0 change to sphinx key initiator side 2023-07-25 09:52:04 +02:00
Simon Wicky 5d26fefaaa hash psk 2023-07-25 09:23:51 +02:00
Simon Wicky dc77e3f962 swap id key for sphinx key 2023-07-24 16:54:08 +02:00
Simon Wicky eca406d9a7 more debugging... 2023-07-24 16:26:32 +02:00
Simon Wicky 139bde0176 more debugging 2023-07-24 16:19:31 +02:00
Simon Wicky a4c0be13f8 continue debugging 2023-07-24 15:16:32 +02:00
Simon Wicky f680222b91 add debug info 2023-07-24 11:48:18 +02:00
Simon Wicky 27b399331b debug info 2023-07-24 11:27:36 +02:00
Simon Wicky 3228bb5aa0 swap misplaced arguments for upgrade responder 2023-07-24 11:27:26 +02:00
Simon Wicky 8e4516a0a8 find node by ip not by full ip,port addr 2023-07-24 10:44:49 +02:00
Simon Wicky 234656aba8 draft a full noise handshake, with dummy secret 2023-07-24 10:18:22 +02:00
Simon Wicky 15f43c705a bring what's needed for secret, but epoch 2023-07-21 16:37:52 +02:00
Simon Wicky 044251d60f bring private key to the noise handshake. still does nothing though 2023-07-21 14:41:31 +02:00
Simon Wicky 5424568e81 add NoiseStream decorator, does nothing yet 2023-07-21 12:02:03 +02:00
Simon Wicky b939978c28 add topology refresher to gateways and mixnodes 2023-07-21 09:20:23 +02:00
durch ebfecba933 Wireguard POC 2023-06-27 11:45:25 +02:00
51 changed files with 3180 additions and 783 deletions
+2 -1
View File
@@ -43,4 +43,5 @@ envs/qwerty.env
.parcel-cache
**/.DS_Store
cpu-cycles/libcpucycles/build
foxyfox.env
foxyfox.env
gateway/deploy.sh
Generated
+1463 -723
View File
File diff suppressed because it is too large Load Diff
@@ -45,6 +45,9 @@ use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender,
use nym_task::{TaskClient, TaskManager};
use nym_topology::provider_trait::TopologyProvider;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use nym_validator_client::NymApiClient;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::sync::Arc;
use tap::TapFallible;
use url::Url;
@@ -289,6 +292,7 @@ where
bandwidth_controller: Option<BandwidthController<C, S::CredentialStore>>,
mixnet_message_sender: MixnetMessageSender,
ack_sender: AcknowledgementSender,
api_client: NymApiClient,
shutdown: TaskClient,
) -> Result<GatewayClient<C, S::CredentialStore>, ClientCoreError>
where
@@ -311,20 +315,26 @@ where
let gateway_address = gateway_config.gateway_listener.clone();
let gateway_id = gateway_config.gateway_id;
let gateway_sphinx = gateway_config.gateway_sphinx;
// TODO: in theory, at this point, this should be infallible
let gateway_identity = identity::PublicKey::from_base58_string(gateway_id)
.map_err(ClientCoreError::UnableToCreatePublicKeyFromGatewayId)?;
let gateway_sphinx_key = encryption::PublicKey::from_base58_string(gateway_sphinx)
.map_err(ClientCoreError::UnableToCreateSphinxKeyFromGatewayId)?;
GatewayClient::new(
gateway_address,
managed_keys.identity_keypair(),
managed_keys.encryption_keypair(),
gateway_identity,
gateway_sphinx_key,
Some(managed_keys.must_get_gateway_shared_key()),
mixnet_message_sender,
ack_sender,
config.debug.gateway_connection.gateway_response_timeout,
bandwidth_controller,
api_client,
shutdown,
)
};
@@ -343,6 +353,15 @@ where
Ok(gateway_client)
}
fn random_api_client(&self) -> nym_validator_client::NymApiClient {
let endpoints = self.config.get_nym_api_endpoints();
let nym_api = endpoints
.choose(&mut thread_rng())
.expect("The list of validator apis is empty");
nym_validator_client::NymApiClient::new(nym_api.clone())
}
fn setup_topology_provider(
custom_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
provider_from_config: config::TopologyStructure,
@@ -481,6 +500,7 @@ where
{
info!("Starting nym client");
let random_api_client = self.random_api_client();
// derive (or load) client keys and gateway configuration
let init_res = Self::initialise_keys_and_gateway(
self.setup_method,
@@ -535,6 +555,7 @@ where
bandwidth_controller,
mixnet_messages_sender,
ack_sender,
random_api_client,
task_manager.subscribe(),
)
.await?;
@@ -83,6 +83,16 @@ impl<'a> TopologyReadPermit<'a> {
Ok(topology)
}
pub fn try_get_raw_topology_ref(&'a self) -> Result<&'a NymTopology, NymTopologyError> {
// 1. Have we managed to get anything from the refresher, i.e. have the nym-api queries gone through?
let topology = self
.permit
.as_ref()
.ok_or(NymTopologyError::EmptyNetworkTopology)?;
Ok(topology)
}
}
impl<'a> From<RwLockReadGuard<'a, Option<NymTopology>>> for TopologyReadPermit<'a> {
@@ -9,9 +9,9 @@ use nym_topology::provider_trait::TopologyProvider;
use nym_topology::NymTopologyError;
use std::time::Duration;
mod accessor;
pub mod accessor;
pub mod geo_aware_provider;
pub(crate) mod nym_api_provider;
pub mod nym_api_provider;
// TODO: move it to config later
const MAX_FAILURE_COUNT: usize = 10;
@@ -9,7 +9,7 @@ use rand::prelude::SliceRandom;
use rand::thread_rng;
use url::Url;
pub(crate) struct NymApiTopologyProvider {
pub struct NymApiTopologyProvider {
validator_client: nym_validator_client::client::NymApiClient,
nym_api_urls: Vec<Url>,
@@ -18,7 +18,7 @@ pub(crate) struct NymApiTopologyProvider {
}
impl NymApiTopologyProvider {
pub(crate) fn new(mut nym_api_urls: Vec<Url>, client_version: String) -> Self {
pub fn new(mut nym_api_urls: Vec<Url>, client_version: String) -> Self {
nym_api_urls.shuffle(&mut thread_rng());
NymApiTopologyProvider {
@@ -77,13 +77,34 @@ impl NymApiTopologyProvider {
Ok(gateways) => gateways,
};
let all_mixes = match self.validator_client.get_all_mixnodes().await {
Err(err) => {
error!("failed to get all mixes - {err}");
return None;
}
Ok(epoch) => epoch,
};
let all_gateways = match self.validator_client.get_all_gateways().await {
Err(err) => {
error!("failed to get all gateways - {err}");
return None;
}
Ok(epoch) => epoch,
};
let topology = nym_topology_from_detailed(mixnodes, gateways)
.with_all_mixes(all_mixes.clone())
.with_all_gateways(all_gateways.clone())
.filter_system_version(&self.client_version);
if let Err(err) = self.check_layer_distribution(&topology) {
warn!("The current filtered active topology has extremely skewed layer distribution. It cannot be used: {err}");
self.use_next_nym_api();
None
let empty_topology = NymTopology::empty()
.with_all_mixes(all_mixes)
.with_all_gateways(all_gateways);
Some(empty_topology)
} else {
Some(topology)
}
+11 -1
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use nym_config::defaults::NymNetworkDetails;
use nym_crypto::asymmetric::identity;
use nym_crypto::asymmetric::{encryption, identity};
use nym_sphinx::params::{PacketSize, PacketType};
use serde::{Deserialize, Serialize};
use std::time::Duration;
@@ -216,6 +216,8 @@ pub struct GatewayEndpointConfig {
/// If initially omitted, a random gateway will be chosen from the available topology.
pub gateway_id: String,
pub gateway_sphinx: String,
/// Address of the gateway owner to which the client should send messages.
pub gateway_owner: String,
@@ -228,11 +230,13 @@ impl GatewayEndpointConfig {
#[cfg_attr(target_arch = "wasm32", wasm_bindgen(constructor))]
pub fn new(
gateway_id: String,
gateway_sphinx: String,
gateway_owner: String,
gateway_listener: String,
) -> GatewayEndpointConfig {
GatewayEndpointConfig {
gateway_id,
gateway_sphinx,
gateway_owner,
gateway_listener,
}
@@ -245,6 +249,11 @@ impl GatewayEndpointConfig {
identity::PublicKey::from_base58_string(&self.gateway_id)
.map_err(ClientCoreError::UnableToCreatePublicKeyFromGatewayId)
}
pub fn try_get_gateway_sphinx_key(&self) -> Result<encryption::PublicKey, ClientCoreError> {
encryption::PublicKey::from_base58_string(&self.gateway_sphinx)
.map_err(ClientCoreError::UnableToCreateSphinxKeyFromGatewayId)
}
}
impl From<nym_topology::gateway::Node> for GatewayEndpointConfig {
@@ -252,6 +261,7 @@ impl From<nym_topology::gateway::Node> for GatewayEndpointConfig {
let gateway_listener = node.clients_address();
GatewayEndpointConfig {
gateway_id: node.identity_key.to_base58_string(),
gateway_sphinx: node.sphinx_key.to_base58_string(),
gateway_owner: node.owner,
gateway_listener,
}
@@ -68,6 +68,7 @@ pub struct ConfigV1_1_20<T> {
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Eq, Serialize)]
pub struct GatewayEndpointConfigV1_1_20 {
pub gateway_id: String,
pub gateway_sphinx: String,
pub gateway_owner: String,
pub gateway_listener: String,
}
@@ -76,6 +77,7 @@ impl From<GatewayEndpointConfigV1_1_20> for GatewayEndpointConfigV1_1_20_2 {
fn from(value: GatewayEndpointConfigV1_1_20) -> Self {
GatewayEndpointConfigV1_1_20_2 {
gateway_id: value.gateway_id,
gateway_sphinx: value.gateway_sphinx,
gateway_owner: value.gateway_owner,
gateway_listener: value.gateway_listener,
}
@@ -73,6 +73,8 @@ pub struct GatewayEndpointConfigV1_1_20_2 {
/// If initially omitted, a random gateway will be chosen from the available topology.
pub gateway_id: String,
pub gateway_sphinx: String,
/// Address of the gateway owner to which the client should send messages.
pub gateway_owner: String,
@@ -84,6 +86,7 @@ impl From<GatewayEndpointConfigV1_1_20_2> for GatewayEndpointConfig {
fn from(value: GatewayEndpointConfigV1_1_20_2) -> Self {
GatewayEndpointConfig {
gateway_id: value.gateway_id,
gateway_sphinx: value.gateway_sphinx,
gateway_owner: value.gateway_owner,
gateway_listener: value.gateway_listener,
}
+4
View File
@@ -1,6 +1,7 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_crypto::asymmetric::encryption::KeyRecoveryError;
use nym_crypto::asymmetric::identity::Ed25519RecoveryError;
use nym_gateway_client::error::GatewayClientError;
use nym_topology::gateway::GatewayConversionError;
@@ -58,6 +59,9 @@ pub enum ClientCoreError {
#[error("The gateway id is invalid - {0}")]
UnableToCreatePublicKeyFromGatewayId(Ed25519RecoveryError),
#[error("The gateway sphinx is invalid - {0}")]
UnableToCreateSphinxKeyFromGatewayId(KeyRecoveryError),
#[error("The identity of the gateway is unknown - did you run init?")]
GatewayIdUnknown,
+7 -1
View File
@@ -6,9 +6,10 @@ use crate::error::ClientCoreError;
use crate::init::RegistrationResult;
use futures::{SinkExt, StreamExt};
use log::{debug, info, trace, warn};
use nym_crypto::asymmetric::identity;
use nym_crypto::asymmetric::{encryption, identity};
use nym_gateway_client::GatewayClient;
use nym_topology::{filter::VersionFilterable, gateway};
use nym_validator_client::NymApiClient;
use rand::{seq::SliceRandom, Rng};
use std::{sync::Arc, time::Duration};
use tap::TapFallible;
@@ -201,13 +202,18 @@ pub(super) fn uniformly_random_gateway<R: Rng>(
pub(super) async fn register_with_gateway(
gateway: &GatewayEndpointConfig,
our_identity: Arc<identity::KeyPair>,
our_sphinx: Arc<encryption::KeyPair>,
nym_api_client: NymApiClient,
) -> Result<RegistrationResult, ClientCoreError> {
let timeout = Duration::from_millis(1500);
let mut gateway_client = GatewayClient::new_init(
gateway.gateway_listener.clone(),
gateway.try_get_gateway_identity_key()?,
gateway.try_get_gateway_sphinx_key()?,
our_identity.clone(),
our_sphinx.clone(),
timeout,
nym_api_client,
);
gateway_client
.establish_connection()
+14 -1
View File
@@ -20,7 +20,9 @@ use nym_gateway_requests::registration::handshake::SharedKeys;
use nym_sphinx::addressing::{clients::Recipient, nodes::NodeIdentity};
use nym_topology::gateway;
use nym_validator_client::client::IdentityKey;
use nym_validator_client::NymApiClient;
use rand::rngs::OsRng;
use rand::seq::SliceRandom;
use serde::Serialize;
use std::fmt::{Debug, Display};
use std::sync::Arc;
@@ -300,6 +302,7 @@ pub async fn setup_gateway_from<K, D>(
details_store: &D,
overwrite_data: bool,
gateways: Option<&[gateway::Node]>,
nym_api_client: NymApiClient,
) -> Result<InitialisationResult, ClientCoreError>
where
K: KeyStore,
@@ -417,9 +420,12 @@ where
// get our identity key
let our_identity = managed_keys.identity_keypair();
let our_sphinx = managed_keys.encryption_keypair();
// Establish connection, authenticate and generate keys for talking with the gateway
let registration_result =
helpers::register_with_gateway(&gateway_details, our_identity).await?;
helpers::register_with_gateway(&gateway_details, our_identity, our_sphinx, nym_api_client)
.await?;
let shared_keys = registration_result.shared_keys;
let persisted_details = PersistedGatewayDetails::new(gateway_details, &shared_keys);
@@ -457,12 +463,19 @@ where
let mut rng = OsRng;
let gateways = current_gateways(&mut rng, validator_servers.unwrap_or_default()).await?;
let nym_api = validator_servers
.unwrap_or_default()
.choose(&mut rng)
.ok_or(ClientCoreError::ListOfNymApisIsEmpty)?;
let client = nym_validator_client::client::NymApiClient::new(nym_api.clone());
setup_gateway_from(
setup,
key_store,
details_store,
overwrite_data,
Some(&gateways),
client,
)
.await
}
@@ -35,6 +35,9 @@ version = "0.13"
default-features = false
# non-wasm-only dependencies
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.nym-noise]
path = "../../nymnoise"
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio]
version = "1.24.1"
features = ["macros", "rt", "net", "sync", "time"]
@@ -14,7 +14,7 @@ use nym_bandwidth_controller::BandwidthController;
use nym_coconut_interface::Credential;
use nym_credential_storage::ephemeral_storage::EphemeralStorage as EphemeralCredentialStorage;
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_crypto::asymmetric::identity;
use nym_crypto::asymmetric::{encryption, identity};
use nym_gateway_requests::authentication::encrypted_address::EncryptedAddressBytes;
use nym_gateway_requests::iv::IV;
use nym_gateway_requests::registration::handshake::{client_handshake, SharedKeys};
@@ -23,16 +23,23 @@ use nym_network_defaults::{REMAINING_BANDWIDTH_THRESHOLD, TOKENS_TO_BURN};
use nym_sphinx::forwarding::packet::MixPacket;
use nym_task::TaskClient;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use nym_validator_client::NymApiClient;
use rand::rngs::OsRng;
use std::convert::TryFrom;
use std::sync::Arc;
use std::time::Duration;
use tungstenite::protocol::Message;
#[cfg(not(target_arch = "wasm32"))]
use nym_noise::upgrade_noise_initiator;
#[cfg(not(target_arch = "wasm32"))]
use std::net::SocketAddr;
#[cfg(not(target_arch = "wasm32"))]
use tokio::net::TcpStream;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::connect_async;
use tokio_tungstenite::client_async;
#[cfg(target_arch = "wasm32")]
use wasm_utils::websocket::JSWebsocket;
@@ -48,12 +55,15 @@ pub struct GatewayClient<C, St = EphemeralCredentialStorage> {
bandwidth_remaining: i64,
gateway_address: String,
gateway_identity: identity::PublicKey,
gateway_sphinx: encryption::PublicKey,
local_identity: Arc<identity::KeyPair>,
local_sphinx: Arc<encryption::KeyPair>,
shared_key: Option<Arc<SharedKeys>>,
connection: SocketState,
packet_router: PacketRouter,
response_timeout_duration: Duration,
bandwidth_controller: Option<BandwidthController<C, St>>,
nym_api_client: NymApiClient,
// reconnection related variables
/// Specifies whether client should try to reconnect to gateway on connection failure.
@@ -74,13 +84,16 @@ impl<C, St> GatewayClient<C, St> {
pub fn new(
gateway_address: String,
local_identity: Arc<identity::KeyPair>,
local_sphinx: Arc<encryption::KeyPair>,
gateway_identity: identity::PublicKey,
gateway_sphinx: encryption::PublicKey,
// TODO: make it mandatory. if you don't want to pass it, use `new_init`
shared_key: Option<Arc<SharedKeys>>,
mixnet_message_sender: MixnetMessageSender,
ack_sender: AcknowledgementSender,
response_timeout_duration: Duration,
bandwidth_controller: Option<BandwidthController<C, St>>,
nym_api_client: NymApiClient,
shutdown: TaskClient,
) -> Self {
GatewayClient {
@@ -88,13 +101,16 @@ impl<C, St> GatewayClient<C, St> {
disabled_credentials_mode: true,
bandwidth_remaining: 0,
gateway_address,
gateway_identity,
local_identity,
local_sphinx,
gateway_identity,
gateway_sphinx,
shared_key,
connection: SocketState::NotConnected,
packet_router: PacketRouter::new(ack_sender, mixnet_message_sender, shutdown.clone()),
response_timeout_duration,
bandwidth_controller,
nym_api_client,
should_reconnect_on_failure: true,
reconnection_attempts: DEFAULT_RECONNECTION_ATTEMPTS,
reconnection_backoff: DEFAULT_RECONNECTION_BACKOFF,
@@ -162,7 +178,60 @@ impl<C, St> GatewayClient<C, St> {
#[cfg(not(target_arch = "wasm32"))]
pub async fn establish_connection(&mut self) -> Result<(), GatewayClientError> {
let ws_stream = match connect_async(&self.gateway_address).await {
let socket_addr: SocketAddr = self.gateway_address.parse().unwrap();
let connection_fut = TcpStream::connect(socket_addr);
//arbitrary TO, it's a POC
let noise_conn = match tokio::time::timeout(Duration::from_secs(5), connection_fut).await {
Ok(stream_res) => match stream_res {
Ok(stream) => {
debug!("Managed to establish connection to gateway");
let current_epoch_id = match self.nym_api_client.get_current_epoch_id().await {
Ok(epoch_id) => epoch_id,
Err(err) => {
error!("Failed to retrieve epoch Id for Noise handshake - {err}");
return Err(GatewayClientError::ConnectionNotEstablished);
}
};
let noise_stream = match upgrade_noise_initiator(
stream,
Default::default(),
None, //as a client, the gateway cannot know my pub key
&self.local_sphinx.private_key().to_bytes(),
&self.gateway_sphinx.to_bytes(),
current_epoch_id,
)
.await
{
Ok(noise_stream) => noise_stream,
Err(err) => {
error!(
"Failed to perform Noise handshake with {:?} - {err}",
self.gateway_address
);
return Err(GatewayClientError::ConnectionNotEstablished);
}
};
debug!(
"Noise initiator handshake completed for {:?}",
self.gateway_address
);
noise_stream
}
Err(err) => {
debug!("failed to establish connection to gateway (err: {})", err);
return Err(GatewayClientError::NetworkIoError(err));
}
},
Err(_) => {
debug!("failed to connect to {} within 5s", self.gateway_address);
return Err(GatewayClientError::Timeout);
}
};
let ws_address = format!("ws://{}", self.gateway_address);
let ws_stream = match client_async(ws_address, noise_conn).await {
Ok((ws_stream, _)) => ws_stream,
Err(e) => return Err(GatewayClientError::NetworkError(e)),
};
@@ -766,8 +835,11 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
pub fn new_init(
gateway_address: String,
gateway_identity: identity::PublicKey,
gateway_sphinx: encryption::PublicKey,
local_identity: Arc<identity::KeyPair>,
local_sphinx: Arc<encryption::KeyPair>,
response_timeout_duration: Duration,
nym_api_client: NymApiClient,
) -> Self {
use futures::channel::mpsc;
@@ -784,12 +856,15 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
bandwidth_remaining: 0,
gateway_address,
gateway_identity,
gateway_sphinx,
local_identity,
local_sphinx,
shared_key: None,
connection: SocketState::NotConnected,
packet_router,
response_timeout_duration,
bandwidth_controller: None,
nym_api_client,
should_reconnect_on_failure: false,
reconnection_attempts: DEFAULT_RECONNECTION_ATTEMPTS,
reconnection_backoff: DEFAULT_RECONNECTION_BACKOFF,
@@ -817,12 +892,15 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
bandwidth_remaining: self.bandwidth_remaining,
gateway_address: self.gateway_address,
gateway_identity: self.gateway_identity,
gateway_sphinx: self.gateway_sphinx,
local_sphinx: self.local_sphinx,
local_identity: self.local_identity,
shared_key: self.shared_key,
connection: self.connection,
packet_router: PacketRouter::new(ack_sender, mixnet_message_sender, shutdown.clone()),
response_timeout_duration,
bandwidth_controller,
nym_api_client: self.nym_api_client,
should_reconnect_on_failure: self.should_reconnect_on_failure,
reconnection_attempts: self.reconnection_attempts,
reconnection_backoff: self.reconnection_backoff,
@@ -19,6 +19,9 @@ pub enum GatewayClientError {
#[error("There was a network error - {0}")]
NetworkError(#[from] WsError),
#[error("There was a network error - {0}")]
NetworkIoError(#[from] io::Error),
// TODO: see if `JsValue` is a reasonable type for this
#[cfg(target_arch = "wasm32")]
#[error("There was a network error")]
@@ -14,9 +14,9 @@ use std::sync::Arc;
use tungstenite::Message;
#[cfg(not(target_arch = "wasm32"))]
use tokio::net::TcpStream;
use nym_noise::NoiseStream;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tokio_tungstenite::WebSocketStream;
#[cfg(target_arch = "wasm32")]
use wasm_bindgen_futures;
@@ -26,7 +26,7 @@ use wasm_utils::websocket::JSWebsocket;
// type alias for not having to type the whole thing every single time (and now it makes it easier
// to use different types based on compilation target)
#[cfg(not(target_arch = "wasm32"))]
type WsConn = WebSocketStream<MaybeTlsStream<TcpStream>>;
type WsConn = WebSocketStream<NoiseStream>;
#[cfg(target_arch = "wasm32")]
type WsConn = JSWebsocket;
@@ -15,3 +15,7 @@ tokio-util = { version = "0.7.4", features = ["codec"] }
# internal
nym-sphinx = { path = "../../nymsphinx" }
nym-task = { path = "../../task" }
nym-client-core = { path = "../../client-core" }
nym-noise = { path = "../../nymnoise"}
nym-crypto = { path = "../../crypto" }
nym-validator-client = { path = "../validator-client"}
+74 -9
View File
@@ -4,11 +4,15 @@
use futures::channel::mpsc;
use futures::StreamExt;
use log::*;
use nym_client_core::client::topology_control::accessor::TopologyAccessor;
use nym_crypto::asymmetric::encryption;
use nym_noise::upgrade_noise_initiator_with_topology;
use nym_sphinx::addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx::framing::codec::NymCodec;
use nym_sphinx::framing::packet::FramedNymPacket;
use nym_sphinx::params::PacketType;
use nym_sphinx::NymPacket;
use nym_validator_client::NymApiClient;
use std::collections::HashMap;
use std::io;
use std::net::SocketAddr;
@@ -59,6 +63,9 @@ pub trait SendWithoutResponse {
pub struct Client {
conn_new: HashMap<NymNodeRoutingAddress, ConnectionSender>,
config: Config,
topology_access: TopologyAccessor,
api_client: NymApiClient,
local_identity: Arc<encryption::KeyPair>,
}
struct ConnectionSender {
@@ -76,10 +83,18 @@ impl ConnectionSender {
}
impl Client {
pub fn new(config: Config) -> Client {
pub fn new(
config: Config,
topology_access: TopologyAccessor,
api_client: NymApiClient,
local_identity: Arc<encryption::KeyPair>,
) -> Client {
Client {
conn_new: HashMap::new(),
config,
topology_access,
api_client,
local_identity,
}
}
@@ -88,6 +103,10 @@ impl Client {
receiver: mpsc::Receiver<FramedNymPacket>,
connection_timeout: Duration,
current_reconnection: &AtomicU32,
topology_access: TopologyAccessor,
api_client: NymApiClient,
local_public_key: &[u8],
local_private_key: &[u8],
) {
let connection_fut = TcpStream::connect(address);
@@ -97,7 +116,41 @@ impl Client {
debug!("Managed to establish connection to {}", address);
// if we managed to connect, reset the reconnection count (whatever it might have been)
current_reconnection.store(0, Ordering::Release);
Framed::new(stream, NymCodec)
//Get the topology, because we need the keys for the handshake
let topology_ref = match topology_access.current_topology().await {
Some(topology) => topology,
None => {
error!("Cannot perform Noise handshake to {address}, due to topology error");
return;
}
};
let epoch_id = match api_client.get_current_epoch_id().await {
Ok(id) => id,
Err(err) => {
error!("Cannot perform Noise handshake to {address}, due to epoch id error - {err}");
return;
}
};
let noise_stream = match upgrade_noise_initiator_with_topology(
stream,
Default::default(),
&topology_ref,
epoch_id,
local_public_key,
local_private_key,
)
.await
{
Ok(noise_stream) => noise_stream,
Err(err) => {
error!("Failed to perform Noise handshake with {address} - {err}");
return;
}
};
debug!("Noise initiator handshake completed for {:?}", address);
Framed::new(noise_stream, NymCodec)
}
Err(err) => {
debug!(
@@ -175,6 +228,11 @@ impl Client {
// copy the value before moving into another task
let initial_connection_timeout = self.config.initial_connection_timeout;
let topology_access_clone = self.topology_access.clone();
let api_client_clone = self.api_client.clone();
let local_public_key = self.local_identity.public_key().to_bytes();
let local_private_key = self.local_identity.private_key().to_bytes();
tokio::spawn(async move {
// before executing the manager, wait for what was specified, if anything
if let Some(backoff) = backoff {
@@ -187,6 +245,10 @@ impl Client {
receiver,
initial_connection_timeout,
&current_reconnection_attempt,
topology_access_clone,
api_client_clone,
&local_public_key,
&local_private_key,
)
.await
});
@@ -255,13 +317,16 @@ mod tests {
use super::*;
fn dummy_client() -> Client {
Client::new(Config {
initial_reconnection_backoff: Duration::from_millis(10_000),
maximum_reconnection_backoff: Duration::from_millis(300_000),
initial_connection_timeout: Duration::from_millis(1_500),
maximum_connection_buffer_size: 128,
use_legacy_version: false,
})
Client::new(
Config {
initial_reconnection_backoff: Duration::from_millis(10_000),
maximum_reconnection_backoff: Duration::from_millis(300_000),
initial_connection_timeout: Duration::from_millis(1_500),
maximum_connection_buffer_size: 128,
use_legacy_version: false,
},
TopologyAccessor::new(),
)
}
#[test]
@@ -5,7 +5,11 @@ use crate::client::{Client, Config, SendWithoutResponse};
use futures::channel::mpsc;
use futures::StreamExt;
use log::*;
use nym_client_core::client::topology_control::accessor::TopologyAccessor;
use nym_crypto::asymmetric::encryption;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_validator_client::NymApiClient;
use std::sync::Arc;
use std::time::Duration;
pub type MixForwardingSender = mpsc::UnboundedSender<MixPacket>;
@@ -26,6 +30,9 @@ impl PacketForwarder {
initial_connection_timeout: Duration,
maximum_connection_buffer_size: usize,
use_legacy_version: bool,
topology_access: TopologyAccessor,
api_client: NymApiClient,
local_identity: Arc<encryption::KeyPair>,
shutdown: nym_task::TaskClient,
) -> (PacketForwarder, MixForwardingSender) {
let client_config = Config::new(
@@ -40,7 +47,12 @@ impl PacketForwarder {
(
PacketForwarder {
mixnet_client: Client::new(client_config),
mixnet_client: Client::new(
client_config,
topology_access,
api_client,
local_identity,
),
packet_receiver,
shutdown,
},
@@ -260,10 +260,24 @@ impl NymApiClient {
Ok(self.nym_api.get_mixnodes().await?)
}
pub async fn get_all_mixnodes(&self) -> Result<Vec<MixNodeDetails>, ValidatorClientError> {
Ok(self.nym_api.get_all_mixnodes().await?)
}
pub async fn get_all_gateways(&self) -> Result<Vec<GatewayBond>, ValidatorClientError> {
Ok(self.nym_api.get_all_gateways().await?)
}
pub async fn get_cached_gateways(&self) -> Result<Vec<GatewayBond>, ValidatorClientError> {
Ok(self.nym_api.get_gateways().await?)
}
pub async fn get_current_epoch_id(
&self,
) -> Result<nym_mixnet_contract_common::EpochId, ValidatorClientError> {
Ok(self.nym_api.get_current_epoch().await?.current_epoch_id())
}
pub async fn get_gateway_core_status_count(
&self,
identity: IdentityKeyRef<'_>,
@@ -14,7 +14,7 @@ use nym_api_requests::models::{
StakeSaturationResponse, UptimeResponse,
};
use nym_mixnet_contract_common::mixnode::MixNodeDetails;
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, MixId};
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, Interval, MixId};
use nym_name_service_common::response::NamesListResponse;
use nym_service_provider_directory_common::response::ServicesListResponse;
use reqwest::{Response, StatusCode};
@@ -130,11 +130,27 @@ impl Client {
}
}
pub async fn get_current_epoch(&self) -> Result<Interval, NymAPIError> {
self.query_nym_api(
&[routes::API_VERSION, routes::EPOCH, routes::CURRENT],
NO_PARAMS,
)
.await
}
pub async fn get_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
self.query_nym_api(&[routes::API_VERSION, routes::MIXNODES], NO_PARAMS)
.await
}
pub async fn get_all_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
self.query_nym_api(
&[routes::API_VERSION, routes::MIXNODES, routes::ALL],
NO_PARAMS,
)
.await
}
pub async fn get_mixnodes_detailed(&self) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
self.query_nym_api(
&[
@@ -181,6 +197,14 @@ impl Client {
.await
}
pub async fn get_all_gateways(&self) -> Result<Vec<GatewayBond>, NymAPIError> {
self.query_nym_api(
&[routes::API_VERSION, routes::GATEWAYS, routes::ALL],
NO_PARAMS,
)
.await
}
pub async fn get_active_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
self.query_nym_api(
&[routes::API_VERSION, routes::MIXNODES, routes::ACTIVE],
@@ -6,6 +6,10 @@ use nym_network_defaults::NYM_API_VERSION;
pub const API_VERSION: &str = NYM_API_VERSION;
pub const MIXNODES: &str = "mixnodes";
pub const GATEWAYS: &str = "gateways";
pub const ALL: &str = "all";
pub const EPOCH: &str = "epoch";
pub const CURRENT: &str = "current";
pub const DETAILED: &str = "detailed";
pub const DETAILED_UNFILTERED: &str = "detailed-unfiltered";
+19
View File
@@ -0,0 +1,19 @@
[package]
name = "nym-noise"
version = "0.1.0"
authors = ["Simon Wicky <simon@nymtech.net>"]
edition = "2021"
[dependencies]
snow = { git = "https://github.com/simonwicky/snow"}
futures = "0.3"
tokio = { version = "1.24.1", features = ["net","io-util","time"] }
pin-project = "1"
log = "0.4.19"
sha2 = "0.10.7"
bytes = "1.0"
thiserror = "1.0.44"
blake2 = "0.10.6"
# internal
nym-topology = { path = "../topology"}
+435
View File
@@ -0,0 +1,435 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use log::*;
use nym_topology::NymTopology;
use pin_project::pin_project;
use sha2::{Digest, Sha256};
use snow::error::Prerequisite;
use snow::Builder;
use snow::Error;
use snow::HandshakeState;
use snow::TransportState;
use std::cmp::min;
use std::collections::VecDeque;
use std::io;
use std::io::ErrorKind;
use std::num::TryFromIntError;
use std::pin::Pin;
use std::task::Poll;
use thiserror::Error;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::ReadBuf;
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpStream,
};
pub mod wireguard;
const MAXMSGLEN: usize = 65535;
const TAGLEN: usize = 16;
const HEADER_SIZE: usize = 2;
#[derive(Error, Debug)]
pub enum NoiseError {
#[error("encountered a Noise decryption error")]
DecryptionError,
#[error("encountered a Noise Protocol error - {0}")]
ProtocolError(Error),
#[error("encountered an IO error - {0}")]
IoError(#[from] io::Error),
#[error("Incorrect state")]
IncorrectStateError,
#[error("Handshake timeout")]
HandshakeTimeoutError(#[from] tokio::time::error::Elapsed),
#[error(transparent)]
IntConversionError(#[from] TryFromIntError),
}
impl From<Error> for NoiseError {
fn from(err: Error) -> Self {
match err {
Error::Decrypt => NoiseError::DecryptionError,
err => NoiseError::ProtocolError(err),
}
}
}
#[derive(Default)]
pub enum NoisePattern {
#[default]
XKpsk3,
IKpsk2,
//DEMO MODE, TO BE DELETED
NN,
XXpsk0,
XKpsk3Var,
}
impl NoisePattern {
fn as_str(&self) -> &'static str {
match self {
Self::XKpsk3 => "Noise_XKpsk3_25519_AESGCM_SHA256",
Self::IKpsk2 => "Noise_IKpsk2_25519_ChaChaPoly_BLAKE2s", //Wireguard handshake
//DEMO MODE, TO BE DELETED
Self::NN => "Noise_NN_25519_AESGCM_SHA256",
Self::XXpsk0 => "Noise_XXpsk0_25519_AESGCM_SHA256",
Self::XKpsk3Var => "Noise_XKpsk3_25519_ChaChaPoly_BLAKE2s",
}
}
fn psk_position(&self) -> u8 {
//automatic parsing, works for correct pattern, more convenient
match self.as_str().find("psk") {
Some(n) => {
let psk_index = n + 3;
let psk_char = self.as_str().chars().nth(psk_index).unwrap();
psk_char.to_string().parse().unwrap()
//if this fails, it means hardcoded pattern are wrong
}
None => 0,
}
}
}
/// Wrapper around a TcpStream
#[pin_project]
pub struct NoiseStream {
#[pin]
inner_stream: TcpStream,
handshake: Option<HandshakeState>,
noise: Option<TransportState>,
enc_storage: VecDeque<u8>,
dec_storage: VecDeque<u8>,
}
impl NoiseStream {
fn new(inner_stream: TcpStream, handshake: HandshakeState) -> NoiseStream {
NoiseStream {
inner_stream,
handshake: Some(handshake),
noise: None,
enc_storage: VecDeque::with_capacity(MAXMSGLEN + TAGLEN + HEADER_SIZE), //At least one message
dec_storage: VecDeque::with_capacity(MAXMSGLEN),
}
}
async fn perform_handshake(mut self) -> Result<Self, NoiseError> {
//Check if we are in the correct state
let Some(mut handshake) = self.handshake else {
return Err(NoiseError::IncorrectStateError);
};
self.handshake = None;
while !handshake.is_handshake_finished() {
if handshake.is_my_turn() {
self.send_handshake_msg(&mut handshake).await?;
} else {
self.recv_handshake_msg(&mut handshake).await?;
}
}
self.noise = Some(handshake.into_transport_mode()?);
Ok(self)
}
async fn send_handshake_msg(
&mut self,
handshake: &mut HandshakeState,
) -> Result<(), NoiseError> {
let mut buf = vec![0u8; MAXMSGLEN];
let len = handshake.write_message(&[], &mut buf)?;
self.inner_stream.write_u16(len.try_into()?).await?; //len is always < 2^16, so it shouldn't fail
self.inner_stream.write_all(&buf[..len]).await?;
Ok(())
}
async fn recv_handshake_msg(
&mut self,
handshake: &mut HandshakeState,
) -> Result<(), NoiseError> {
let msg_len = self.inner_stream.read_u16().await?;
let mut msg = vec![0u8; msg_len.into()];
self.inner_stream.read_exact(&mut msg[..]).await?;
let mut buf = vec![0u8; MAXMSGLEN];
handshake.read_message(&msg, &mut buf)?;
Ok(())
}
}
impl AsyncRead for NoiseStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let projected_self = self.project();
let enc_storage = projected_self.enc_storage;
let ready_to_read = projected_self.inner_stream.poll_read_ready(cx);
match ready_to_read {
Poll::Pending => {
//no new data, waking is already scheduled.
//Nothing new to decrypt, only check if we can return something from dec_storage, happens after
}
Poll::Ready(Ok(())) => {
//Read what we can into enc_storage, decrypt what we can into dec_storage
let mut tcp_buf = vec![0u8; MAXMSGLEN + HEADER_SIZE + TAGLEN];
if let Ok(tcp_len) = projected_self.inner_stream.try_read(&mut tcp_buf) {
if tcp_len == 0 && projected_self.dec_storage.is_empty() {
//EOF
return Poll::Ready(Ok(()));
}
enc_storage.extend(&tcp_buf[..tcp_len]);
//we can at least read the length
while enc_storage.len() >= HEADER_SIZE {
let msg_len = ((enc_storage[0] as usize) << 8) + (enc_storage[1] as usize);
//no more messages to read
if enc_storage.len() < HEADER_SIZE + msg_len {
break;
}
//we have a full message to decrypt
//remove size
enc_storage.pop_front();
enc_storage.pop_front();
let noise_msg = enc_storage.drain(..msg_len).collect::<Vec<u8>>();
let mut dec_msg = vec![0u8; MAXMSGLEN];
let Ok(len) = (match projected_self.noise {
Some(transport_state) => {
transport_state.read_message(&noise_msg, &mut dec_msg)
}
None => return Poll::Ready(Err(ErrorKind::Other.into())),
}) else {
return Poll::Ready(Err(ErrorKind::InvalidInput.into()));
};
projected_self.dec_storage.extend(&dec_msg[..len]);
}
}
}
//an error occured, let's return it right away
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
}
//check if we can return something
let read_len = min(buf.remaining(), projected_self.dec_storage.len());
if read_len > 0 {
buf.put_slice(
&projected_self
.dec_storage
.drain(..read_len)
.collect::<Vec<u8>>(),
);
return Poll::Ready(Ok(()));
}
//can't return anything, schedule the wakeup and return pending
if let Poll::Ready(Ok(())) = projected_self.inner_stream.poll_read_ready(cx) {
//we got data in the meantime, we can wake up immediately
cx.waker().wake_by_ref();
}
Poll::Pending
}
}
impl AsyncWrite for NoiseStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
let projected_self = self.project();
let mut noise_buf = vec![0u8; MAXMSGLEN];
let Ok(len) = (match projected_self.noise {
Some(transport_state) => transport_state.write_message(buf, &mut noise_buf),
None => return Poll::Ready(Err(ErrorKind::Other.into())),
}) else {
return Poll::Ready(Err(ErrorKind::InvalidInput.into()));
};
let to_send = [&[(len >> 8) as u8, (len & 0xff) as u8], &noise_buf[..len]].concat();
match projected_self.inner_stream.poll_write(cx, &to_send) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Ready(Ok(n)) => {
//didn't send a thing, no problem for the underlying stream
if n == 0 {
return Poll::Ready(Ok(0));
}
//we sent the whole thing, no problem for the underlying stream
//We must guarantee that the return number is <= buf.len()
if n == to_send.len() {
return Poll::Ready(Ok(n - HEADER_SIZE - TAGLEN));
}
//We didn't write the whole message, the stream will be corrupted
error!(
"Partial write on Noise Stream, it will be corrupted - {}",
n
);
Poll::Ready(Err(ErrorKind::WriteZero.into()))
}
}
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
self.project().inner_stream.poll_flush(cx)
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
self.project().inner_stream.poll_shutdown(cx)
}
}
pub async fn upgrade_noise_initiator(
conn: TcpStream,
pattern: NoisePattern,
local_public_key: Option<&[u8]>,
local_private_key: &[u8],
remote_pub_key: &[u8],
epoch: u32,
) -> Result<NoiseStream, NoiseError> {
trace!("Perform Noise Handshake, initiator side");
//In case the local key cannot be known by the remote party, e.g. in a client-gateway connection
let secret = [
local_public_key.unwrap_or(&[]),
remote_pub_key,
&epoch.to_be_bytes(),
]
.concat();
let secret_hash = Sha256::digest(secret);
let handshake = Builder::new(pattern.as_str().parse()?)
.local_private_key(local_private_key)
.remote_public_key(remote_pub_key)
.psk(pattern.psk_position(), &secret_hash)
.build_initiator()?;
let noise_stream = NoiseStream::new(conn, handshake);
noise_stream.perform_handshake().await
}
pub async fn upgrade_noise_initiator_with_topology(
conn: TcpStream,
pattern: NoisePattern,
topology: &NymTopology,
epoch: u32,
local_public_key: &[u8],
local_private_key: &[u8],
) -> Result<NoiseStream, NoiseError> {
//Get init material
let responder_addr = match conn.peer_addr() {
Ok(addr) => addr,
Err(err) => {
error!("Unable to extract peer address from connection - {err}");
return Err(Error::Prereq(Prerequisite::RemotePublicKey).into());
}
};
let remote_pub_key = match topology.find_node_key_by_mix_host(responder_addr) {
Some(pub_key) => pub_key.to_bytes(),
None => {
error!(
"Cannot find public key for node with address {:?}",
responder_addr
);
return Err(Error::Prereq(Prerequisite::RemotePublicKey).into());
}
};
upgrade_noise_initiator(
conn,
pattern,
Some(local_public_key),
local_private_key,
&remote_pub_key,
epoch,
)
.await
}
pub async fn upgrade_noise_responder(
conn: TcpStream,
pattern: NoisePattern,
local_public_key: &[u8],
local_private_key: &[u8],
remote_pub_key: Option<&[u8]>,
epoch: u32,
) -> Result<NoiseStream, NoiseError> {
trace!("Perform Noise Handshake, responder side");
//If the remote_key cannot be kwnown, e.g. in a client-gateway connection
let secret = [
remote_pub_key.unwrap_or(&[]),
local_public_key,
&epoch.to_be_bytes(),
]
.concat();
let secret_hash = Sha256::digest(secret);
let handshake = Builder::new(pattern.as_str().parse()?)
.local_private_key(local_private_key)
.psk(pattern.psk_position(), &secret_hash)
.build_responder()?;
let noise_stream = NoiseStream::new(conn, handshake);
noise_stream.perform_handshake().await
}
pub async fn upgrade_noise_responder_with_topology(
conn: TcpStream,
pattern: NoisePattern,
topology: &NymTopology,
epoch: u32,
local_public_key: &[u8],
local_private_key: &[u8],
) -> Result<NoiseStream, NoiseError> {
//Get init material
let initiator_addr = match conn.peer_addr() {
Ok(addr) => addr,
Err(err) => {
error!("Unable to extract peer address from connection - {err}");
return Err(Error::Prereq(Prerequisite::RemotePublicKey).into());
}
};
let remote_pub_key = match topology.find_node_key_by_mix_host(initiator_addr) {
Some(pub_key) => pub_key.to_bytes(),
None => {
error!(
"Cannot find public key for node with address {:?}",
initiator_addr
);
return Err(Error::Prereq(Prerequisite::RemotePublicKey).into());
}
};
upgrade_noise_responder(
conn,
pattern,
local_public_key,
local_private_key,
Some(&remote_pub_key),
epoch,
)
.await
}
+256
View File
@@ -0,0 +1,256 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use blake2::digest::FixedOutput;
use blake2::digest::KeyInit;
use blake2::Blake2s256;
use blake2::Blake2sMac;
use log::*;
use snow::Builder;
use snow::HandshakeState;
use snow::TransportState;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
use crate::*;
/// Wrapper around a UdpSocket
pub struct WireGuardStream {
inner_stream: Arc<UdpSocket>,
handshake: Option<HandshakeState>,
noise: Option<TransportState>,
peer_public_key: [u8; 32],
}
impl WireGuardStream {
fn new(
inner_stream: Arc<UdpSocket>,
handshake: HandshakeState,
peer_public_key: [u8; 32],
) -> WireGuardStream {
WireGuardStream {
inner_stream,
handshake: Some(handshake),
noise: None,
peer_public_key,
}
}
async fn perform_handshake(mut self) -> Result<Self, NoiseError> {
//Check if we are in the correct state
let Some(mut handshake) = self.handshake else {
return Err(NoiseError::IncorrectStateError);
};
self.handshake = None;
let mut id_i = [0u8; 4];
let mut address: SocketAddr = "0.0.0.0:12345".parse().unwrap();
while !handshake.is_handshake_finished() {
if handshake.is_my_turn() {
self.send_handshake_msg(&mut handshake, id_i.clone(), address)
.await?;
} else {
let res = self.recv_handshake_msg(&mut handshake).await?;
id_i = res.0;
address = res.1;
}
}
self.noise = Some(handshake.into_transport_mode()?);
Ok(self)
}
async fn send_handshake_msg(
&mut self,
handshake: &mut HandshakeState,
id_initiator: [u8; 4],
address: SocketAddr,
) -> Result<(), NoiseError> {
let mut buf = vec![0u8; MAXMSGLEN];
let len = handshake.write_message(&[], &mut buf)?;
let msg = [&2u32.to_le_bytes(), &[42u8; 4], &id_initiator, &buf[..len]].concat();
//mac1 key
let mut k_mac1 = Blake2s256::new();
k_mac1.update(b"mac1----");
k_mac1.update(self.peer_public_key);
let k_mac1_bytes: [u8; 32] = k_mac1.finalize().into();
//mac1
let mut hmac = Blake2sMac::new_from_slice(&k_mac1_bytes).unwrap();
blake2::digest::Update::update(&mut hmac, &msg);
let mac1_bytes: [u8; 16] = hmac.finalize_fixed().into();
let final_msg = [msg, mac1_bytes.to_vec(), vec![0u8; 16]].concat();
println!("Sending : {:?}", final_msg);
self.send_wg_msg(&final_msg, address).await?;
// self.inner_stream.write_u16(len.try_into()?).await?; //len is always < 2^16, so it shouldn't fail
// self.inner_stream.write_all(&buf[..len]).await?;
Ok(())
}
async fn recv_handshake_msg(
&mut self,
handshake: &mut HandshakeState,
) -> Result<([u8; 4], SocketAddr), NoiseError> {
let (msg, address) = self.recv_wg_msg().await?;
println!("Rcv: {:?}", msg);
let mut buf = vec![0u8; MAXMSGLEN];
let len = handshake.read_message(&msg[8..116], &mut buf)?;
println!("Timestamp : {:?} : {:?}", len, &buf[..len]);
Ok((msg[4..8].try_into().unwrap(), address))
}
async fn recv_wg_msg(&self) -> Result<(Vec<u8>, SocketAddr), NoiseError> {
let mut buf = [0u8; MAXMSGLEN];
let (len, address) = self.inner_stream.recv_from(&mut buf).await?;
Ok((buf[..len].to_vec(), address))
}
async fn send_wg_msg(&self, msg: &[u8], address: SocketAddr) -> Result<(), NoiseError> {
self.inner_stream.send_to(msg, address).await?;
Ok(())
}
pub async fn recv(&mut self) -> Result<Vec<u8>, NoiseError> {
let (msg, _) = self.recv_wg_msg().await?;
println!("Rcv data: {:?}", msg);
let mut buf = vec![0u8; MAXMSGLEN];
if let Some(noise) = &mut self.noise {
let len = noise.read_message(&msg[16..], &mut buf)?;
return Ok(buf[..len].to_vec());
}
Err(NoiseError::IncorrectStateError)
}
}
// pub async fn upgrade_noise_initiator(
// conn: UdpSocket,
// pattern: NoisePattern,
// local_public_key: Option<&[u8]>,
// local_private_key: &[u8],
// remote_pub_key: &[u8],
// epoch: u32,
// ) -> Result<WireGuardStream, NoiseError> {
// trace!("Perform Noise Handshake, initiator side");
// //In case the local key cannot be known by the remote party, e.g. in a client-gateway connection
// let secret = [
// local_public_key.unwrap_or(&[]),
// remote_pub_key,
// &epoch.to_be_bytes(),
// ]
// .concat();
// let secret_hash = Sha256::digest(secret);
// let handshake = Builder::new(pattern.as_str().parse()?)
// .local_private_key(local_private_key)
// .remote_public_key(remote_pub_key)
// .psk(pattern.psk_position(), &secret_hash)
// .build_initiator()?;
// let noise_stream = WireGuardStream::new(conn, handshake);
// noise_stream.perform_handshake().await
// }
// pub async fn upgrade_noise_initiator_with_topology(
// conn: UdpSocket,
// pattern: NoisePattern,
// topology: &NymTopology,
// epoch: u32,
// local_public_key: &[u8],
// local_private_key: &[u8],
// ) -> Result<WireGuardStream, NoiseError> {
// //Get init material
// let responder_addr = match conn.peer_addr() {
// Ok(addr) => addr,
// Err(err) => {
// error!("Unable to extract peer address from connection - {err}");
// return Err(Error::Prereq(Prerequisite::RemotePublicKey).into());
// }
// };
// let remote_pub_key = match topology.find_node_key_by_mix_host(responder_addr) {
// Some(pub_key) => pub_key.to_bytes(),
// None => {
// error!(
// "Cannot find public key for node with address {:?}",
// responder_addr
// );
// return Err(Error::Prereq(Prerequisite::RemotePublicKey).into());
// }
// };
// upgrade_noise_initiator(
// conn,
// pattern,
// Some(local_public_key),
// local_private_key,
// &remote_pub_key,
// epoch,
// )
// .await
// }
pub async fn upgrade_noise_responder(
conn: Arc<UdpSocket>,
local_private_key: &[u8],
peer_public_key: [u8; 32],
) -> Result<WireGuardStream, NoiseError> {
trace!("Perform Wireguard Handshake, responder side");
let pattern = NoisePattern::IKpsk2;
//If the remote_key cannot be kwnown, e.g. in a client-gateway connection
let secret = [0u8; 32];
let handshake = Builder::new(pattern.as_str().parse()?)
.local_private_key(local_private_key)
.psk(pattern.psk_position(), &secret)
.prologue(b"WireGuard v1 zx2c4 Jason@zx2c4.com")
.build_responder()?;
let noise_stream = WireGuardStream::new(conn, handshake, peer_public_key);
noise_stream.perform_handshake().await
}
// pub async fn upgrade_noise_responder_with_topology(
// conn: UdpSocket,
// pattern: NoisePattern,
// topology: &NymTopology,
// epoch: u32,
// local_public_key: &[u8],
// local_private_key: &[u8],
// ) -> Result<WireGuardStream, NoiseError> {
// //Get init material
// let initiator_addr = match conn.peer_addr() {
// Ok(addr) => addr,
// Err(err) => {
// error!("Unable to extract peer address from connection - {err}");
// return Err(Error::Prereq(Prerequisite::RemotePublicKey).into());
// }
// };
// let remote_pub_key = match topology.find_node_key_by_mix_host(initiator_addr) {
// Some(pub_key) => pub_key.to_bytes(),
// None => {
// error!(
// "Cannot find public key for node with address {:?}",
// initiator_addr
// );
// return Err(Error::Prereq(Prerequisite::RemotePublicKey).into());
// }
// };
// upgrade_noise_responder(
// conn,
// pattern,
// local_public_key,
// local_private_key,
// Some(&remote_pub_key),
// epoch,
// )
// .await
// }
+1 -1
View File
@@ -67,7 +67,7 @@ impl Node {
}
pub fn clients_address(&self) -> String {
format!("ws://{}:{}", self.host, self.clients_port)
format!("{}:{}", self.host, self.clients_port)
}
}
+74 -1
View File
@@ -3,6 +3,7 @@
use crate::filter::VersionFilterable;
use log::warn;
use nym_crypto::asymmetric::encryption;
use nym_mixnet_contract_common::mixnode::MixNodeDetails;
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, MixId};
use nym_sphinx_addressing::nodes::NodeIdentity;
@@ -71,11 +72,64 @@ pub type MixLayer = u8;
pub struct NymTopology {
mixes: BTreeMap<MixLayer, Vec<mix::Node>>,
gateways: Vec<gateway::Node>,
all_mixes: Vec<mix::Node>,
all_gateways: Vec<gateway::Node>,
}
impl NymTopology {
pub fn new(mixes: BTreeMap<MixLayer, Vec<mix::Node>>, gateways: Vec<gateway::Node>) -> Self {
NymTopology { mixes, gateways }
NymTopology {
mixes: mixes.clone(),
gateways: gateways.clone(),
all_mixes: mixes.values().flatten().cloned().collect(),
all_gateways: gateways,
}
}
pub fn empty() -> Self {
NymTopology {
mixes: BTreeMap::new(),
gateways: Vec::new(),
all_mixes: Vec::new(),
all_gateways: Vec::new(),
}
}
pub fn with_all_mixes(mut self, all_mixes: Vec<MixNodeDetails>) -> Self {
let mut mixes = Vec::new();
for bond in all_mixes
.into_iter()
.map(|details| details.bond_information)
{
let mix_id = bond.mix_id;
let mix_identity = bond.mix_node.identity_key.clone();
match bond.try_into() {
Ok(mix) => mixes.push(mix),
Err(err) => {
warn!("Mix {} / {} is malformed - {err}", mix_id, mix_identity);
continue;
}
}
}
self.all_mixes = mixes;
self
}
pub fn with_all_gateways(mut self, all_gateways: Vec<GatewayBond>) -> Self {
let mut gateways = Vec::with_capacity(all_gateways.len());
for bond in all_gateways.into_iter() {
let gate_id = bond.gateway.identity_key.clone();
match bond.try_into() {
Ok(gate) => gateways.push(gate),
Err(err) => {
warn!("Gateway {} is malformed - {err}", gate_id);
continue;
}
}
}
self.all_gateways = gateways;
self
}
pub fn from_detailed(
@@ -107,6 +161,23 @@ impl NymTopology {
None
}
pub fn find_node_key_by_mix_host(
&self,
mix_host: SocketAddr,
) -> Option<&encryption::PublicKey> {
for node in self.all_gateways.iter() {
if node.mix_host.ip() == mix_host.ip() {
return Some(&node.sphinx_key);
}
}
for node in self.all_mixes.iter() {
if node.mix_host.ip() == mix_host.ip() {
return Some(&node.sphinx_key);
}
}
None
}
pub fn find_gateway(&self, gateway_identity: IdentityKeyRef) -> Option<&gateway::Node> {
self.gateways
.iter()
@@ -314,6 +385,8 @@ impl NymTopology {
NymTopology {
mixes: self.mixes.filter_by_version(expected_mix_version),
gateways: self.gateways.clone(),
all_mixes: self.all_mixes.clone(),
all_gateways: self.all_gateways.clone(),
}
}
}
+30 -3
View File
@@ -34,16 +34,40 @@ pretty_env_logger = "0.4"
rand = "0.7"
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sqlx = { version = "0.5", features = [ "runtime-tokio-rustls", "sqlite", "macros", "migrate", ] }
sqlx = { version = "0.5", features = [
"runtime-tokio-rustls",
"sqlite",
"macros",
"migrate",
] }
subtle-encoding = { version = "0.5", features = ["bech32-preview"] }
thiserror = "1"
tokio = { version = "1.24.1", features = [ "rt-multi-thread", "net", "signal", "fs", ] }
tokio = { version = "1.24.1", features = [
"rt-multi-thread",
"net",
"signal",
"fs",
] }
tokio-stream = { version = "0.1.11", features = ["fs"] }
tokio-tungstenite = "0.14"
tokio-util = { version = "0.7.4", features = ["codec"] }
url = { version = "2.2", features = ["serde"] }
zeroize = { workspace = true }
# wireguard
# Forked it to be able to bump x25519-dalek to rc.3
base64 = "0.21"
x25519-dalek = { version = "2.0.0", features = [
"reusable_secrets",
"static_secrets",
] }
etherparse = "0.13.0"
pnet = "0.34.0"
bytes = "1.4.0"
async-recursion = "1.0.4"
smoltcp = "0.10.0"
tun-tap = "0.1.3"
# internal
nym-api-requests = { path = "../nym-api/nym-api-requests" }
nym-bin-common = { path = "../common/bin-common", features = ["output_format"] }
@@ -60,7 +84,10 @@ nym-sphinx = { path = "../common/nymsphinx" }
nym-statistics-common = { path = "../common/statistics" }
nym-task = { path = "../common/task" }
nym-types = { path = "../common/types" }
nym-validator-client = { path = "../common/client-libs/validator-client" }
nym-validator-client = { path = "../common/client-libs/validator-client"}
nym-client-core = { path = "../common/client-core" }
nym-topology = { path = "../common/topology" }
nym-noise = { path = "../common/nymnoise" }
[build-dependencies]
tokio = { version = "1.24.1", features = ["rt-multi-thread", "macros"] }
+1
View File
@@ -0,0 +1 @@
gA3NCDl+xOorR3heFVB47FlGunsZgS4RDX2M0IY73lc=
+1
View File
@@ -0,0 +1 @@
mxV/mw7WZTe+0Msa0kvJHMHERDA/cSskiZWQce+TdEs=
+1
View File
@@ -0,0 +1 @@
AEqXrLFT4qjYq3wmX0456iv94uM6nDj5ugp6Jedcflg=
+1
View File
@@ -0,0 +1 @@
WM8s8bYegwMa0TJ+xIwhk+dImk2IpDUKslDBCZPizlE=
+37
View File
@@ -33,6 +33,9 @@ const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_milli
const DEFAULT_INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(1_500);
const DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE: usize = 2000;
const DEFAULT_TOPOLOGY_REFRESH_RATE: Duration = Duration::from_secs(5 * 60); // every 5min
const DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT: Duration = Duration::from_millis(5_000);
const DEFAULT_STORED_MESSAGE_FILENAME_LENGTH: u16 = 16;
const DEFAULT_MESSAGE_RETRIEVAL_LIMIT: i64 = 100;
@@ -74,6 +77,9 @@ pub struct Config {
#[serde(default)]
pub debug: Debug,
#[serde(default)]
pub topology: Topology,
}
impl NymConfigTemplate for Config {
@@ -89,6 +95,7 @@ impl Config {
storage_paths: GatewayPaths::new_default(id.as_ref()),
logging: Default::default(),
debug: Default::default(),
topology: Default::default(),
}
}
@@ -297,3 +304,33 @@ impl Default for Debug {
}
}
}
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct Topology {
/// The uniform delay every which clients are querying the directory server
/// to try to obtain a compatible network topology to send sphinx packets through.
#[serde(with = "humantime_serde")]
pub topology_refresh_rate: Duration,
/// During topology refresh, test packets are sent through every single possible network
/// path. This timeout determines waiting period until it is decided that the packet
/// did not reach its destination.
#[serde(with = "humantime_serde")]
pub topology_resolution_timeout: Duration,
/// Specifies whether the client should not refresh the network topology after obtaining
/// the first valid instance.
/// Supersedes `topology_refresh_rate_ms`.
pub disable_refreshing: bool,
}
impl Default for Topology {
fn default() -> Self {
Topology {
topology_refresh_rate: DEFAULT_TOPOLOGY_REFRESH_RATE,
topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT,
disable_refreshing: false,
}
}
}
+1
View File
@@ -69,6 +69,7 @@ impl From<ConfigV1_1_20> for Config {
},
logging: value.logging.into(),
debug: value.debug.into(),
topology: Default::default(),
}
}
}
@@ -6,8 +6,10 @@ use crate::node::client_handling::websocket::connection_handler::coconut::Coconu
use crate::node::client_handling::websocket::connection_handler::FreshHandler;
use crate::node::storage::Storage;
use log::*;
use nym_crypto::asymmetric::identity;
use nym_crypto::asymmetric::{encryption, identity};
use nym_mixnet_client::forwarder::MixForwardingSender;
use nym_noise::upgrade_noise_responder;
use nym_validator_client::NymApiClient;
use rand::rngs::OsRng;
use std::net::SocketAddr;
use std::process;
@@ -17,22 +19,28 @@ use tokio::task::JoinHandle;
pub(crate) struct Listener {
address: SocketAddr,
local_identity: Arc<identity::KeyPair>,
local_sphinx: Arc<encryption::KeyPair>,
only_coconut_credentials: bool,
pub(crate) coconut_verifier: Arc<CoconutVerifier>,
nym_api_client: NymApiClient,
}
impl Listener {
pub(crate) fn new(
address: SocketAddr,
local_identity: Arc<identity::KeyPair>,
local_sphinx: Arc<encryption::KeyPair>,
only_coconut_credentials: bool,
coconut_verifier: Arc<CoconutVerifier>,
nym_api_client: NymApiClient,
) -> Self {
Listener {
address,
local_identity,
local_sphinx,
only_coconut_credentials,
coconut_verifier,
nym_api_client,
}
}
@@ -68,9 +76,33 @@ impl Listener {
trace!("received a socket connection from {remote_addr}");
// TODO: I think we *REALLY* need a mechanism for having a maximum number of connected
// clients or spawned tokio tasks -> perhaps a worker system?
let current_epoch_id = match self.nym_api_client.get_current_epoch_id().await {
Ok(epoch_id) => epoch_id,
Err(err) => {
error!("Failed to retrieve epoch Id for Noise handshake - {err}");
continue;
}
};
let noise_stream = match upgrade_noise_responder(
socket,
Default::default(),
&self.local_sphinx.public_key().to_bytes(),
&self.local_sphinx.private_key().to_bytes(),
None, //connection from client, no remote pub key
current_epoch_id,
)
.await
{
Ok(noise_stream) => noise_stream,
Err(err) => {
error!("Failed to perform Noise handshake with {remote_addr} - {err}");
continue;
}
};
let handle = FreshHandler::new(
OsRng,
socket,
noise_stream,
self.only_coconut_credentials,
outbound_mix_sender.clone(),
Arc::clone(&self.local_identity),
@@ -8,15 +8,20 @@ use crate::node::storage::error::StorageError;
use crate::node::storage::Storage;
use futures::StreamExt;
use log::*;
use nym_client_core::client::topology_control::accessor::TopologyAccessor;
use nym_crypto::asymmetric::encryption;
use nym_mixnet_client::forwarder::MixForwardingSender;
use nym_mixnode_common::packet_processor::processor::ProcessedFinalHop;
use nym_noise::upgrade_noise_responder_with_topology;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::framing::codec::NymCodec;
use nym_sphinx::framing::packet::FramedNymPacket;
use nym_sphinx::DestinationAddressBytes;
use nym_task::TaskClient;
use nym_validator_client::NymApiClient;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
@@ -31,6 +36,9 @@ pub(crate) struct ConnectionHandler<St: Storage> {
active_clients_store: ActiveClientsStore,
storage: St,
ack_sender: MixForwardingSender,
topology_access: TopologyAccessor,
api_client: NymApiClient,
local_identity: Arc<encryption::KeyPair>,
}
impl<St: Storage + Clone> Clone for ConnectionHandler<St> {
@@ -49,6 +57,9 @@ impl<St: Storage + Clone> Clone for ConnectionHandler<St> {
active_clients_store: self.active_clients_store.clone(),
storage: self.storage.clone(),
ack_sender: self.ack_sender.clone(),
topology_access: self.topology_access.clone(),
api_client: self.api_client.clone(),
local_identity: self.local_identity.clone(),
}
}
}
@@ -59,6 +70,9 @@ impl<St: Storage> ConnectionHandler<St> {
storage: St,
ack_sender: MixForwardingSender,
active_clients_store: ActiveClientsStore,
topology_access: TopologyAccessor,
api_client: NymApiClient,
local_identity: Arc<encryption::KeyPair>,
) -> Self {
ConnectionHandler {
packet_processor,
@@ -66,6 +80,9 @@ impl<St: Storage> ConnectionHandler<St> {
storage,
active_clients_store,
ack_sender,
topology_access,
api_client,
local_identity,
}
}
@@ -182,7 +199,41 @@ impl<St: Storage> ConnectionHandler<St> {
) {
debug!("Starting connection handler for {:?}", remote);
shutdown.mark_as_success();
let mut framed_conn = Framed::new(conn, NymCodec);
let topology_ref = match self.topology_access.current_topology().await {
Some(topology) => topology,
None => {
error!("Cannot perform Noise handshake to {remote}, due to topology error");
return;
}
};
let epoch_id = match self.api_client.get_current_epoch_id().await {
Ok(id) => id,
Err(err) => {
error!("Cannot perform Noise handshake to {remote}, due to epoch id error - {err}");
return;
}
};
let noise_stream = match upgrade_noise_responder_with_topology(
conn,
Default::default(),
&topology_ref,
epoch_id,
&self.local_identity.public_key().to_bytes(),
&self.local_identity.private_key().to_bytes(),
)
.await
{
Ok(noise_stream) => noise_stream,
Err(err) => {
error!("Failed to perform Noise handshake with {remote} - {err}");
return;
}
};
debug!("Noise responder handshake completed for {:?}", remote);
let mut framed_conn = Framed::new(noise_stream, NymCodec);
while !shutdown.is_shutdown() {
tokio::select! {
biased;
@@ -212,9 +263,6 @@ impl<St: Storage> ConnectionHandler<St> {
}
}
info!(
"Closing connection from {:?}",
framed_conn.into_inner().peer_addr()
);
info!("Closing connection from {:?}", remote);
}
}
+87 -3
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use self::storage::PersistentStorage;
use crate::config::Config;
use crate::config::{Config, Topology};
use crate::error::GatewayError;
use crate::node::client_handling::active_clients::ActiveClientsStore;
use crate::node::client_handling::websocket;
@@ -10,24 +10,33 @@ use crate::node::client_handling::websocket::connection_handler::coconut::Coconu
use crate::node::mixnet_handling::receiver::connection_handler::ConnectionHandler;
use crate::node::statistics::collector::GatewayStatisticsCollector;
use crate::node::storage::Storage;
use crate::node::wireguard::wireguard;
use log::*;
use nym_bin_common::output_format::OutputFormat;
use nym_client_core::client::topology_control::accessor::TopologyAccessor;
use nym_client_core::client::topology_control::nym_api_provider::NymApiTopologyProvider;
use nym_client_core::client::topology_control::TopologyRefresher;
use nym_client_core::client::topology_control::TopologyRefresherConfig;
use nym_crypto::asymmetric::{encryption, identity};
use nym_mixnet_client::forwarder::{MixForwardingSender, PacketForwarder};
use nym_network_defaults::NymNetworkDetails;
use nym_statistics_common::collector::StatisticsSender;
use nym_task::{TaskClient, TaskManager};
use nym_topology::provider_trait::TopologyProvider;
use nym_validator_client::NymApiClient;
use nym_validator_client::{nyxd, DirectSigningHttpRpcNyxdClient};
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::error::Error;
use std::net::SocketAddr;
use std::sync::Arc;
use url::Url;
pub(crate) mod client_handling;
pub(crate) mod mixnet_handling;
pub(crate) mod statistics;
pub(crate) mod storage;
pub(crate) mod wireguard;
/// Wire up and create Gateway instance
pub(crate) async fn create_gateway(config: Config) -> Gateway<PersistentStorage> {
@@ -124,6 +133,8 @@ impl<St> Gateway<St> {
&self,
ack_sender: MixForwardingSender,
active_clients_store: ActiveClientsStore,
topology_access: TopologyAccessor,
api_client: NymApiClient,
shutdown: TaskClient,
) where
St: Storage + Clone + 'static,
@@ -138,6 +149,9 @@ impl<St> Gateway<St> {
self.storage.clone(),
ack_sender,
active_clients_store,
topology_access,
api_client,
Arc::clone(&self.sphinx_keypair),
);
let listening_address = SocketAddr::new(
@@ -167,8 +181,10 @@ impl<St> Gateway<St> {
websocket::Listener::new(
listening_address,
Arc::clone(&self.identity_keypair),
Arc::clone(&self.sphinx_keypair),
self.config.gateway.only_coconut_credentials,
coconut_verifier,
self.random_api_client(),
)
.start(
forwarding_channel,
@@ -178,7 +194,12 @@ impl<St> Gateway<St> {
);
}
fn start_packet_forwarder(&self, shutdown: TaskClient) -> MixForwardingSender {
fn start_packet_forwarder(
&self,
topology_access: TopologyAccessor,
api_client: NymApiClient,
shutdown: TaskClient,
) -> MixForwardingSender {
info!("Starting mix packet forwarder...");
let (mut packet_forwarder, packet_sender) = PacketForwarder::new(
@@ -187,6 +208,9 @@ impl<St> Gateway<St> {
self.config.debug.initial_connection_timeout,
self.config.debug.maximum_connection_buffer_size,
self.config.debug.use_legacy_framed_packet_version,
topology_access,
api_client,
Arc::clone(&self.sphinx_keypair),
shutdown,
);
@@ -230,6 +254,47 @@ impl<St> Gateway<St> {
.expect("Could not connect with mnemonic")
}
fn setup_topology_provider(nym_api_urls: Vec<Url>) -> Box<dyn TopologyProvider + Send + Sync> {
// if no custom provider was ... provided ..., create one using nym-api
Box::new(NymApiTopologyProvider::new(
nym_api_urls,
env!("CARGO_PKG_VERSION").to_string(),
))
}
// future responsible for periodically polling directory server and updating
// the current global view of topology
async fn start_topology_refresher(
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
topology_config: Topology,
topology_accessor: TopologyAccessor,
mut shutdown: TaskClient,
) {
let topology_refresher_config =
TopologyRefresherConfig::new(topology_config.topology_refresh_rate);
let mut topology_refresher = TopologyRefresher::new(
topology_refresher_config,
topology_accessor,
topology_provider,
);
// before returning, block entire runtime to refresh the current network view so that any
// components depending on topology would see a non-empty view
info!("Obtaining initial network topology");
topology_refresher.try_refresh().await;
if topology_config.disable_refreshing {
// if we're not spawning the refresher, don't cause shutdown immediately
info!("The topology refesher is not going to be started");
shutdown.mark_as_success();
} else {
// don't spawn the refresher if we don't want to be refreshing the topology.
// only use the initial values obtained
info!("Starting topology refresher...");
topology_refresher.start_with_shutdown(shutdown);
}
}
async fn check_if_bonded(&self) -> Result<bool, GatewayError> {
// TODO: if anything, this should be getting data directly from the contract
// as opposed to the validator API
@@ -263,13 +328,30 @@ impl<St> Gateway<St> {
let nyxd_client = self.random_nyxd_client();
CoconutVerifier::new(nyxd_client)
};
let random_api_client = self.random_api_client();
let topology_provider = Self::setup_topology_provider(self.config.get_nym_api_endpoints());
let shared_topology_access = TopologyAccessor::new();
let mix_forwarding_channel = self.start_packet_forwarder(shutdown.subscribe());
Self::start_topology_refresher(
topology_provider,
self.config.topology,
shared_topology_access.clone(),
shutdown.subscribe(),
)
.await;
let mix_forwarding_channel = self.start_packet_forwarder(
shared_topology_access.clone(),
random_api_client.clone(),
shutdown.subscribe(),
);
let active_clients_store = ActiveClientsStore::new();
self.start_mix_socket_listener(
mix_forwarding_channel.clone(),
active_clients_store.clone(),
shared_topology_access,
random_api_client,
shutdown.subscribe(),
);
@@ -293,6 +375,8 @@ impl<St> Gateway<St> {
Arc::new(coconut_verifier),
);
tokio::spawn(wireguard());
info!("Finished nym gateway startup procedure - it should now be able to receive mix and client traffic!");
self.wait_for_interrupt(shutdown).await
+84
View File
@@ -0,0 +1,84 @@
use base64::engine::general_purpose;
use base64::Engine as _;
use log::{error, info};
use nym_noise::wireguard::upgrade_noise_responder;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::net::UdpSocket;
use tokio::sync::broadcast;
use x25519_dalek::{PublicKey, StaticSecret};
pub async fn wireguard() {
let wg_address = "0.0.0.0:51820";
let sock = Arc::new(UdpSocket::bind(wg_address).await.unwrap());
info!("wg listening on {wg_address}");
// Secret key ofthe gateway, we'll need a way to generate this from the IdentityKey, might be enough to do some base58 -> base64 conversion
let secret_bytes: [u8; 32] = general_purpose::STANDARD
.decode("+EWK0GFOIhSOuAo6vFqTgnm14lJiIBWt0KXnZ06/pkU=")
.unwrap()
.try_into()
.unwrap();
// Hardcoded peer public key, we'll need a way to register those, private key for that one is `aMUcuAgTiFCHQ/fHqEQRvpLWBxh8sKA7f7lSyWymrGE=`
// Wireguard configuration that works with this setup is below, this needs to be put into the wireguard client of choice.
// Working in this case means that they go through the handshake, and client
// starts sending data packets to the gateway.
//
// [Interface]
// PrivateKey = 6OhouAaOtkcrCPDX5UZHAwXmagYX8x/Y1vTO4mWst0M=
// Address = 10.8.0.0/24
// DNS = 1.1.1.1
//
// [Peer]
// PublicKey = 2Ie0Cp1tQnejZfKdHGmpIkWS/9MQJV6sWtP4QJLREl4=
// AllowedIPs = 0.0.0.0/0
// Endpoint = 127.0.0.1:51820
let peer_public_bytes: [u8; 32] = general_purpose::STANDARD
.decode("MzfycYCQl1KR6LSViZCrp6Imx/MfXHH11U+Nrwxr5Dw=")
.unwrap()
.try_into()
.unwrap();
let peer_public = PublicKey::from(peer_public_bytes);
let secret = StaticSecret::try_from(secret_bytes).unwrap();
let public = PublicKey::from(&secret);
info!(
"wg public key: {}",
general_purpose::STANDARD.encode(public)
);
// let mut buf = [0; 1024];
// let mut peers = HashSet::new();
// let (bus_tx, _) = broadcast::channel(128);
let mut wireguard_stream =
upgrade_noise_responder(sock.clone(), &secret_bytes, peer_public_bytes.clone())
.await
.unwrap();
println!("Handshake completed");
while let Ok(msg) = wireguard_stream.recv().await {
println!("msg : {:?}", msg);
}
// while let Ok((len, addr)) = sock.recv_from(&mut buf).await {
// info!("Received {} bytes from {}", len, addr);
// if peers.contains(&addr) {
// bus_tx
// .send(Event::WgPacket(buf[..len].to_vec().into()))
// .map_err(|e| error!("{e}"))
// .unwrap();
// } else {
// info!("New peer with endpoint {addr}");
// let tun =
// WireGuardTunnel::new(peer_public, Arc::clone(&sock), addr, bus_tx.clone()).await;
// peers.insert(addr);
// tokio::spawn(tun.spin_off());
// bus_tx
// .send(Event::WgPacket(buf[..len].to_vec().into()))
// .map_err(|e| error!("{e}"))
// .unwrap();
// }
// }
panic!("Not OK");
}
+2
View File
@@ -56,9 +56,11 @@ nym-pemstore = { path = "../common/pemstore", version = "0.3.0" }
nym-task = { path = "../common/task" }
nym-types = { path = "../common/types" }
nym-topology = { path = "../common/topology" }
nym-client-core = { path = "../common/client-core/" }
nym-validator-client = { path = "../common/client-libs/validator-client" }
nym-bin-common = { path = "../common/bin-common", features = ["output_format"] }
cpu-cycles = { path = "../cpu-cycles", optional = true }
nym-noise = { path = "../common/nymnoise" }
[dev-dependencies]
tokio = { version = "1.21.2", features = [
+37
View File
@@ -44,6 +44,9 @@ const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_milli
const DEFAULT_INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(1_500);
const DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE: usize = 2000;
const DEFAULT_TOPOLOGY_REFRESH_RATE: Duration = Duration::from_secs(5 * 60); // every 5min
const DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT: Duration = Duration::from_millis(5_000);
/// Derive default path to mixnodes's config directory.
/// It should get resolved to `$HOME/.nym/mixnodes/<id>/config`
pub fn default_config_directory<P: AsRef<Path>>(id: P) -> PathBuf {
@@ -85,6 +88,9 @@ pub struct Config {
#[serde(default)]
pub debug: Debug,
#[serde(default)]
pub topology: Topology,
}
impl NymConfigTemplate for Config {
@@ -101,6 +107,7 @@ impl Config {
verloc: Default::default(),
logging: Default::default(),
debug: Default::default(),
topology: Default::default(),
}
}
@@ -287,3 +294,33 @@ impl Default for Debug {
}
}
}
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct Topology {
/// The uniform delay every which clients are querying the directory server
/// to try to obtain a compatible network topology to send sphinx packets through.
#[serde(with = "humantime_serde")]
pub topology_refresh_rate: Duration,
/// During topology refresh, test packets are sent through every single possible network
/// path. This timeout determines waiting period until it is decided that the packet
/// did not reach its destination.
#[serde(with = "humantime_serde")]
pub topology_resolution_timeout: Duration,
/// Specifies whether the client should not refresh the network topology after obtaining
/// the first valid instance.
/// Supersedes `topology_refresh_rate_ms`.
pub disable_refreshing: bool,
}
impl Default for Topology {
fn default() -> Self {
Topology {
topology_refresh_rate: DEFAULT_TOPOLOGY_REFRESH_RATE,
topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT,
disable_refreshing: false,
}
}
}
+1
View File
@@ -94,6 +94,7 @@ impl From<ConfigV1_1_21> for Config {
verloc: value.verloc.into(),
logging: value.logging.into(),
debug: value.debug.into(),
topology: Default::default(),
}
}
}
@@ -7,12 +7,17 @@ use crate::node::listener::connection_handler::packet_processing::{
use crate::node::packet_delayforwarder::PacketDelayForwardSender;
use crate::node::TaskClient;
use futures::StreamExt;
use nym_client_core::client::topology_control::accessor::TopologyAccessor;
use nym_crypto::asymmetric::encryption;
use nym_mixnode_common::measure;
use nym_noise::upgrade_noise_responder_with_topology;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::framing::codec::NymCodec;
use nym_sphinx::framing::packet::FramedNymPacket;
use nym_sphinx::Delay as SphinxDelay;
use nym_validator_client::NymApiClient;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::time::Instant;
use tokio_util::codec::Framed;
@@ -25,16 +30,25 @@ pub(crate) mod packet_processing;
pub(crate) struct ConnectionHandler {
packet_processor: PacketProcessor,
delay_forwarding_channel: PacketDelayForwardSender,
topology_access: TopologyAccessor,
api_client: NymApiClient,
local_identity: Arc<encryption::KeyPair>,
}
impl ConnectionHandler {
pub(crate) fn new(
packet_processor: PacketProcessor,
delay_forwarding_channel: PacketDelayForwardSender,
topology_access: TopologyAccessor,
api_client: NymApiClient,
local_identity: Arc<encryption::KeyPair>,
) -> Self {
ConnectionHandler {
packet_processor,
delay_forwarding_channel,
topology_access,
api_client,
local_identity,
}
}
@@ -85,8 +99,43 @@ impl ConnectionHandler {
mut shutdown: TaskClient,
) {
debug!("Starting connection handler for {:?}", remote);
shutdown.mark_as_success();
let mut framed_conn = Framed::new(conn, NymCodec);
let topology_ref = match self.topology_access.current_topology().await {
Some(topology) => topology,
None => {
error!("Cannot perform Noise handshake to {remote}, due to topology error");
return;
}
};
let epoch_id = match self.api_client.get_current_epoch_id().await {
Ok(id) => id,
Err(err) => {
error!("Cannot perform Noise handshake to {remote}, due to epoch id error - {err}");
return;
}
};
let noise_stream = match upgrade_noise_responder_with_topology(
conn,
Default::default(),
&topology_ref,
epoch_id,
&self.local_identity.public_key().to_bytes(),
&self.local_identity.private_key().to_bytes(),
)
.await
{
Ok(noise_stream) => noise_stream,
Err(err) => {
error!("Failed to perform Noise handshake with {remote} - {err}");
return;
}
};
debug!("Noise responder handshake completed for {:?}", remote);
let mut framed_conn = Framed::new(noise_stream, NymCodec);
while !shutdown.is_shutdown() {
tokio::select! {
biased;
@@ -118,10 +167,7 @@ impl ConnectionHandler {
}
}
info!(
"Closing connection from {:?}",
framed_conn.into_inner().peer_addr()
);
info!("Closing connection from {:?}", remote);
log::trace!("ConnectionHandler: Exiting");
}
}
+87 -5
View File
@@ -1,7 +1,7 @@
// Copyright 2020-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::config::Config;
use crate::config::{Config, Topology};
use crate::node::http::{
description::description,
hardware::hardware,
@@ -17,14 +17,21 @@ use crate::node::node_statistics::SharedNodeStats;
use crate::node::packet_delayforwarder::{DelayForwarder, PacketDelayForwardSender};
use nym_bin_common::output_format::OutputFormat;
use nym_bin_common::version_checker::parse_version;
use nym_client_core::client::topology_control::accessor::TopologyAccessor;
use nym_client_core::client::topology_control::nym_api_provider::NymApiTopologyProvider;
use nym_client_core::client::topology_control::TopologyRefresher;
use nym_client_core::client::topology_control::TopologyRefresherConfig;
use nym_crypto::asymmetric::{encryption, identity};
use nym_mixnode_common::verloc::{self, AtomicVerlocResult, VerlocMeasurer};
use nym_task::{TaskClient, TaskManager};
use nym_topology::provider_trait::TopologyProvider;
use nym_validator_client::NymApiClient;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::net::SocketAddr;
use std::process;
use std::sync::Arc;
use url::Url;
#[cfg(feature = "cpucycles")]
use tracing::{error, info, warn};
@@ -146,6 +153,8 @@ impl MixNode {
&self,
node_stats_update_sender: node_statistics::UpdateSender,
delay_forwarding_channel: PacketDelayForwardSender,
topology_access: TopologyAccessor,
api_client: NymApiClient,
shutdown: TaskClient,
) {
info!("Starting socket listener...");
@@ -153,7 +162,13 @@ impl MixNode {
let packet_processor =
PacketProcessor::new(self.sphinx_keypair.private_key(), node_stats_update_sender);
let connection_handler = ConnectionHandler::new(packet_processor, delay_forwarding_channel);
let connection_handler = ConnectionHandler::new(
packet_processor,
delay_forwarding_channel,
topology_access,
api_client,
Arc::clone(&self.sphinx_keypair),
);
let listening_address = SocketAddr::new(
self.config.mixnode.listening_address,
@@ -166,6 +181,8 @@ impl MixNode {
fn start_packet_delay_forwarder(
&mut self,
node_stats_update_sender: node_statistics::UpdateSender,
topology_access: TopologyAccessor,
api_client: NymApiClient,
shutdown: TaskClient,
) -> PacketDelayForwardSender {
info!("Starting packet delay-forwarder...");
@@ -179,7 +196,12 @@ impl MixNode {
);
let mut packet_forwarder = DelayForwarder::new(
nym_mixnet_client::Client::new(client_config),
nym_mixnet_client::Client::new(
client_config,
topology_access,
api_client,
Arc::clone(&self.sphinx_keypair),
),
node_stats_update_sender,
shutdown,
);
@@ -230,6 +252,47 @@ impl MixNode {
atomic_verloc_results
}
fn setup_topology_provider(nym_api_urls: Vec<Url>) -> Box<dyn TopologyProvider + Send + Sync> {
// if no custom provider was ... provided ..., create one using nym-api
Box::new(NymApiTopologyProvider::new(
nym_api_urls,
env!("CARGO_PKG_VERSION").to_string(),
))
}
// future responsible for periodically polling directory server and updating
// the current global view of topology
async fn start_topology_refresher(
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
topology_config: Topology,
topology_accessor: TopologyAccessor,
mut shutdown: TaskClient,
) {
let topology_refresher_config =
TopologyRefresherConfig::new(topology_config.topology_refresh_rate);
let mut topology_refresher = TopologyRefresher::new(
topology_refresher_config,
topology_accessor,
topology_provider,
);
// before returning, block entire runtime to refresh the current network view so that any
// components depending on topology would see a non-empty view
info!("Obtaining initial network topology");
topology_refresher.try_refresh().await;
if topology_config.disable_refreshing {
// if we're not spawning the refresher, don't cause shutdown immediately
info!("The topology refesher is not going to be started");
shutdown.mark_as_success();
} else {
// don't spawn the refresher if we don't want to be refreshing the topology.
// only use the initial values obtained
info!("Starting topology refresher...");
topology_refresher.start_with_shutdown(shutdown);
}
}
fn random_api_client(&self) -> nym_validator_client::NymApiClient {
let endpoints = self.config.get_nym_api_endpoints();
let nym_api = endpoints
@@ -276,11 +339,30 @@ impl MixNode {
let (node_stats_pointer, node_stats_update_sender) =
self.start_node_stats_controller(shutdown.subscribe());
let delay_forwarding_channel = self
.start_packet_delay_forwarder(node_stats_update_sender.clone(), shutdown.subscribe());
let topology_provider = Self::setup_topology_provider(self.config.get_nym_api_endpoints());
let shared_topology_access = TopologyAccessor::new();
Self::start_topology_refresher(
topology_provider,
self.config.topology,
shared_topology_access.clone(),
shutdown.subscribe(),
)
.await;
let random_api_client = self.random_api_client();
let delay_forwarding_channel = self.start_packet_delay_forwarder(
node_stats_update_sender.clone(),
shared_topology_access.clone(),
random_api_client.clone(),
shutdown.subscribe(),
);
self.start_socket_listener(
node_stats_update_sender,
delay_forwarding_channel,
shared_topology_access,
random_api_client,
shutdown.subscribe(),
);
let atomic_verloc_results = self.start_verloc_measurements(shutdown.subscribe());
+10 -1
View File
@@ -24,6 +24,7 @@ use nym_bin_common::logging::setup_logging;
use nym_contract_cache::cache::NymContractCache;
use nym_sphinx::receiver::SphinxMessageReceiver;
use nym_task::TaskManager;
use nym_validator_client::NymApiClient;
use rand::rngs::OsRng;
use std::error::Error;
use support::{http, nyxd};
@@ -157,12 +158,20 @@ async fn start_nym_api_tasks(
// if network monitor is enabled, the storage MUST BE available
let storage = maybe_storage.unwrap();
let url = format!(
"http://{}:{}",
rocket.config().address,
rocket.config().port
)
.parse()
.unwrap();
let nym_api_client = NymApiClient::new(url);
network_monitor::start::<SphinxMessageReceiver>(
&config.network_monitor,
nym_contract_cache_state,
storage,
nyxd_client.clone(),
nym_api_client,
&shutdown,
)
.await;
+20 -1
View File
@@ -22,6 +22,7 @@ use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::params::PacketType;
use nym_sphinx::receiver::MessageReceiver;
use nym_task::TaskManager;
use nym_validator_client::NymApiClient;
use std::sync::Arc;
pub(crate) mod gateways_reader;
@@ -36,10 +37,12 @@ pub(crate) fn setup<'a>(
nym_contract_cache_state: &NymContractCache,
storage: &NymApiStorage,
nyxd_client: nyxd::Client,
nym_api_client: NymApiClient,
) -> NetworkMonitorBuilder<'a> {
NetworkMonitorBuilder::new(
config,
nyxd_client,
nym_api_client,
storage.to_owned(),
nym_contract_cache_state.to_owned(),
)
@@ -48,6 +51,7 @@ pub(crate) fn setup<'a>(
pub(crate) struct NetworkMonitorBuilder<'a> {
config: &'a config::NetworkMonitor,
nyxd_client: nyxd::Client,
nym_api_client: NymApiClient,
node_status_storage: NymApiStorage,
validator_cache: NymContractCache,
}
@@ -56,12 +60,14 @@ impl<'a> NetworkMonitorBuilder<'a> {
pub(crate) fn new(
config: &'a config::NetworkMonitor,
nyxd_client: nyxd::Client,
nym_api_client: NymApiClient,
node_status_storage: NymApiStorage,
validator_cache: NymContractCache,
) -> Self {
NetworkMonitorBuilder {
config,
nyxd_client,
nym_api_client,
node_status_storage,
validator_cache,
}
@@ -105,8 +111,10 @@ impl<'a> NetworkMonitorBuilder<'a> {
self.config,
gateway_status_update_sender,
Arc::clone(&identity_keypair),
Arc::clone(&encryption_keypair),
self.config.debug.gateway_sending_rate,
bandwidth_controller,
self.nym_api_client,
self.config.debug.disabled_credentials_mode,
);
@@ -177,18 +185,22 @@ fn new_packet_sender(
config: &config::NetworkMonitor,
gateways_status_updater: GatewayClientUpdateSender,
local_identity: Arc<identity::KeyPair>,
local_sphinx: Arc<encryption::KeyPair>,
max_sending_rate: usize,
bandwidth_controller: BandwidthController<nyxd::Client, PersistentStorage>,
nym_api_client: nym_validator_client::NymApiClient,
disabled_credentials_mode: bool,
) -> PacketSender {
PacketSender::new(
gateways_status_updater,
local_identity,
local_sphinx,
config.debug.gateway_response_timeout,
config.debug.gateway_connection_timeout,
config.debug.max_concurrent_gateway_clients,
max_sending_rate,
bandwidth_controller,
nym_api_client,
disabled_credentials_mode,
)
}
@@ -221,9 +233,16 @@ pub(crate) async fn start<R: MessageReceiver + Send + 'static>(
nym_contract_cache_state: &NymContractCache,
storage: &NymApiStorage,
nyxd_client: nyxd::Client,
nym_api_client: NymApiClient,
shutdown: &TaskManager,
) {
let monitor_builder = setup(config, nym_contract_cache_state, storage, nyxd_client);
let monitor_builder = setup(
config,
nym_contract_cache_state,
storage,
nyxd_client,
nym_api_client,
);
info!("Starting network monitor...");
let runnables: NetworkMonitorRunnables<R> = monitor_builder.build().await;
runnables.spawn_tasks(shutdown);
@@ -310,6 +310,7 @@ impl PacketPreparer {
GatewayPackets::new(
route.gateway_clients_address(),
route.gateway_identity(),
route.gateway_sphinx(),
mix_packets,
)
}
@@ -387,6 +388,7 @@ impl PacketPreparer {
let route_ext = test_route.test_message_ext(test_nonce);
let gateway_address = test_route.gateway_clients_address();
let gateway_identity = test_route.gateway_identity();
let gateway_sphinx = test_route.gateway_sphinx();
let mut mix_tester = self.ephemeral_mix_tester(test_route);
@@ -408,7 +410,9 @@ impl PacketPreparer {
let gateway_packets = all_gateway_packets
.entry(gateway_identity.to_bytes())
.or_insert_with(|| GatewayPackets::empty(gateway_address, gateway_identity));
.or_insert_with(|| {
GatewayPackets::empty(gateway_address, gateway_identity, gateway_sphinx)
});
gateway_packets.push_packets(mix_packets);
// and generate test packets for gateways (note the variable recipient)
@@ -436,7 +440,9 @@ impl PacketPreparer {
// or create a new one
let gateway_packets = all_gateway_packets
.entry(gateway_identity.to_bytes())
.or_insert_with(|| GatewayPackets::empty(gateway_address, gateway_identity));
.or_insert_with(|| {
GatewayPackets::empty(gateway_address, gateway_identity, gateway_sphinx)
});
gateway_packets.push_packets(gateway_mix_packets);
}
}
+37 -3
View File
@@ -15,11 +15,13 @@ use log::{debug, info, trace, warn};
use nym_bandwidth_controller::BandwidthController;
use nym_config::defaults::REMAINING_BANDWIDTH_THRESHOLD;
use nym_credential_storage::persistent_storage::PersistentStorage;
use nym_crypto::asymmetric::encryption;
use nym_crypto::asymmetric::identity::{self, PUBLIC_KEY_LENGTH};
use nym_gateway_client::error::GatewayClientError;
use nym_gateway_client::{AcknowledgementReceiver, GatewayClient, MixnetMessageReceiver};
use nym_sphinx::forwarding::packet::MixPacket;
use nym_task::TaskClient;
use nym_validator_client::NymApiClient;
use pin_project::pin_project;
use std::mem;
use std::num::NonZeroUsize;
@@ -38,6 +40,8 @@ pub(crate) struct GatewayPackets {
/// Public key of the target gateway.
pub(crate) pub_key: identity::PublicKey,
pub(crate) encryption_key: encryption::PublicKey,
/// All the packets that are going to get sent to the gateway.
pub(crate) packets: Vec<MixPacket>,
}
@@ -46,19 +50,26 @@ impl GatewayPackets {
pub(crate) fn new(
clients_address: String,
pub_key: identity::PublicKey,
encryption_key: encryption::PublicKey,
packets: Vec<MixPacket>,
) -> Self {
GatewayPackets {
clients_address,
pub_key,
encryption_key,
packets,
}
}
pub(crate) fn empty(clients_address: String, pub_key: identity::PublicKey) -> Self {
pub(crate) fn empty(
clients_address: String,
pub_key: identity::PublicKey,
encryption_key: encryption::PublicKey,
) -> Self {
GatewayPackets {
clients_address,
pub_key,
encryption_key,
packets: Vec::new(),
}
}
@@ -79,6 +90,7 @@ impl GatewayPackets {
struct FreshGatewayClientData {
gateways_status_updater: GatewayClientUpdateSender,
local_identity: Arc<identity::KeyPair>,
local_sphinx: Arc<encryption::KeyPair>,
gateway_response_timeout: Duration,
bandwidth_controller: BandwidthController<nyxd::Client, PersistentStorage>,
disabled_credentials_mode: bool,
@@ -122,6 +134,7 @@ pub(crate) struct PacketSender {
active_gateway_clients: ActiveGatewayClients,
fresh_gateway_client_data: Arc<FreshGatewayClientData>,
nym_api_client: NymApiClient,
gateway_connection_timeout: Duration,
max_concurrent_clients: usize,
max_sending_rate: usize,
@@ -134,11 +147,13 @@ impl PacketSender {
pub(crate) fn new(
gateways_status_updater: GatewayClientUpdateSender,
local_identity: Arc<identity::KeyPair>,
local_sphinx: Arc<encryption::KeyPair>,
gateway_response_timeout: Duration,
gateway_connection_timeout: Duration,
max_concurrent_clients: usize,
max_sending_rate: usize,
bandwidth_controller: BandwidthController<nyxd::Client, PersistentStorage>,
nym_api_client: NymApiClient,
disabled_credentials_mode: bool,
) -> Self {
PacketSender {
@@ -146,10 +161,12 @@ impl PacketSender {
fresh_gateway_client_data: Arc::new(FreshGatewayClientData {
gateways_status_updater,
local_identity,
local_sphinx,
gateway_response_timeout,
bandwidth_controller,
disabled_credentials_mode,
}),
nym_api_client,
gateway_connection_timeout,
max_concurrent_clients,
max_sending_rate,
@@ -171,7 +188,9 @@ impl PacketSender {
fn new_gateway_client_handle(
address: String,
identity: identity::PublicKey,
sphinx: encryption::PublicKey,
fresh_gateway_client_data: &FreshGatewayClientData,
nym_api_client: NymApiClient,
) -> (
GatewayClientHandle,
(MixnetMessageReceiver, AcknowledgementReceiver),
@@ -187,12 +206,15 @@ impl PacketSender {
let mut gateway_client = GatewayClient::new(
address,
Arc::clone(&fresh_gateway_client_data.local_identity),
Arc::clone(&fresh_gateway_client_data.local_sphinx),
identity,
sphinx,
None,
message_sender,
ack_sender,
fresh_gateway_client_data.gateway_response_timeout,
Some(fresh_gateway_client_data.bandwidth_controller.clone()),
nym_api_client,
nym_task::TaskClient::dummy(),
);
@@ -267,14 +289,21 @@ impl PacketSender {
async fn create_new_gateway_client_handle_and_authenticate(
address: String,
identity: identity::PublicKey,
sphinx: encryption::PublicKey,
fresh_gateway_client_data: &FreshGatewayClientData,
nym_api_client: NymApiClient,
gateway_connection_timeout: Duration,
) -> Option<(
GatewayClientHandle,
(MixnetMessageReceiver, AcknowledgementReceiver),
)> {
let (new_client, (message_receiver, ack_receiver)) =
Self::new_gateway_client_handle(address, identity, fresh_gateway_client_data);
let (new_client, (message_receiver, ack_receiver)) = Self::new_gateway_client_handle(
address,
identity,
sphinx,
fresh_gateway_client_data,
nym_api_client,
);
// Put this in timeout in case the gateway has incorrectly set their ulimit and our connection
// gets stuck in their TCP queue and just hangs on our end but does not terminate
@@ -332,6 +361,7 @@ impl PacketSender {
gateway_connection_timeout: Duration,
packets: GatewayPackets,
fresh_gateway_client_data: Arc<FreshGatewayClientData>,
nym_api_client: NymApiClient,
client: Option<GatewayClientHandle>,
max_sending_rate: usize,
) -> Option<GatewayClientHandle> {
@@ -351,7 +381,9 @@ impl PacketSender {
Self::create_new_gateway_client_handle_and_authenticate(
packets.clients_address,
packets.pub_key,
packets.encryption_key,
&fresh_gateway_client_data,
nym_api_client,
gateway_connection_timeout,
)
.await?;
@@ -479,6 +511,7 @@ impl PacketSender {
// we're not interacting with right now)
drop(guard);
let nym_api_client = &self.nym_api_client;
// can't chain it all nicely together as there's no adapter method defined on Stream directly
// for ForEachConcurrentClientUse
let used_clients = ForEachConcurrentClientUse::new(
@@ -489,6 +522,7 @@ impl PacketSender {
gateway_connection_timeout,
packets,
fresh_data,
nym_api_client.clone(),
client,
max_sending_rate,
)
@@ -3,7 +3,7 @@
use crate::network_monitor::test_packet::NymApiTestMessageExt;
use crate::network_monitor::ROUTE_TESTING_TEST_NONCE;
use nym_crypto::asymmetric::identity;
use nym_crypto::asymmetric::{encryption, identity};
use nym_topology::{gateway, mix, NymTopology};
use std::fmt::{Debug, Formatter};
@@ -63,6 +63,10 @@ impl TestRoute {
self.gateway().identity_key
}
pub(crate) fn gateway_sphinx(&self) -> encryption::PublicKey {
self.gateway().sphinx_key
}
pub(crate) fn topology(&self) -> &NymTopology {
&self.nodes
}
+2
View File
@@ -17,8 +17,10 @@ pub(crate) mod routes;
pub(crate) fn nym_contract_cache_routes(settings: &OpenApiSettings) -> (Vec<Route>, OpenApi) {
openapi_get_routes_spec![
settings: routes::get_mixnodes,
routes::get_mixnodes_all,
routes::get_mixnodes_detailed,
routes::get_gateways,
routes::get_gateways_all,
routes::get_active_set,
routes::get_active_set_detailed,
routes::get_rewarded_set,
+12
View File
@@ -25,6 +25,12 @@ pub async fn get_mixnodes(cache: &State<NymContractCache>) -> Json<Vec<MixNodeDe
Json(cache.mixnodes_filtered().await)
}
#[openapi(tag = "contract-cache")]
#[get("/mixnodes/all")]
pub async fn get_mixnodes_all(cache: &State<NymContractCache>) -> Json<Vec<MixNodeDetails>> {
Json(cache.mixnodes_all().await)
}
// DEPRECATED: this endpoint now lives in `node_status_api`. Once all consumers are updated,
// replace this with
// ```
@@ -46,6 +52,12 @@ pub async fn get_gateways(cache: &State<NymContractCache>) -> Json<Vec<GatewayBo
Json(cache.gateways_filtered().await)
}
#[openapi(tag = "contract-cache")]
#[get("/gateways/all")]
pub async fn get_gateways_all(cache: &State<NymContractCache>) -> Json<Vec<GatewayBond>> {
Json(cache.gateways_all().await)
}
#[openapi(tag = "contract-cache")]
#[get("/mixnodes/rewarded")]
pub async fn get_rewarded_set(cache: &State<NymContractCache>) -> Json<Vec<MixNodeDetails>> {
@@ -0,0 +1,5 @@
{
"name": "@nymproject/nym-client-wasm",
"version": "1.0.0",
"sideEffects": false
}