Compare commits
44 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 88f18b7a4c | |||
| d919e5199a | |||
| 25b9feef5f | |||
| db112eaa45 | |||
| da4f9bb3b2 | |||
| d710242566 | |||
| 815920c9f1 | |||
| 50a45eab5a | |||
| 2a47420e0b | |||
| b5a2b79eeb | |||
| d00d4bd73e | |||
| 33ea1501d0 | |||
| aa48d18753 | |||
| 4bfe533b51 | |||
| abaa64a54b | |||
| 7ca4c3b7b1 | |||
| 0d81c7d765 | |||
| 6e3a84b759 | |||
| ba96744dd8 | |||
| c28410f6de | |||
| 203633cabf | |||
| 331a0328c7 | |||
| 9e3bb6ef24 | |||
| 4014467496 | |||
| 3cd17be26f | |||
| 9d91145f0a | |||
| 83d0dfc657 | |||
| f91a22cb6a | |||
| 981f567131 | |||
| c58331f9b0 | |||
| 5d26fefaaa | |||
| dc77e3f962 | |||
| eca406d9a7 | |||
| 139bde0176 | |||
| a4c0be13f8 | |||
| f680222b91 | |||
| 27b399331b | |||
| 3228bb5aa0 | |||
| 8e4516a0a8 | |||
| 234656aba8 | |||
| 15f43c705a | |||
| 044251d60f | |||
| 5424568e81 | |||
| b939978c28 |
Generated
+337
-301
File diff suppressed because it is too large
Load Diff
@@ -45,6 +45,9 @@ use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender,
|
||||
use nym_task::{TaskClient, TaskManager};
|
||||
use nym_topology::provider_trait::TopologyProvider;
|
||||
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
|
||||
use nym_validator_client::NymApiClient;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use std::sync::Arc;
|
||||
use tap::TapFallible;
|
||||
use url::Url;
|
||||
@@ -289,6 +292,7 @@ where
|
||||
bandwidth_controller: Option<BandwidthController<C, S::CredentialStore>>,
|
||||
mixnet_message_sender: MixnetMessageSender,
|
||||
ack_sender: AcknowledgementSender,
|
||||
api_client: NymApiClient,
|
||||
shutdown: TaskClient,
|
||||
) -> Result<GatewayClient<C, S::CredentialStore>, ClientCoreError>
|
||||
where
|
||||
@@ -311,20 +315,26 @@ where
|
||||
|
||||
let gateway_address = gateway_config.gateway_listener.clone();
|
||||
let gateway_id = gateway_config.gateway_id;
|
||||
let gateway_sphinx = gateway_config.gateway_sphinx;
|
||||
|
||||
// TODO: in theory, at this point, this should be infallible
|
||||
let gateway_identity = identity::PublicKey::from_base58_string(gateway_id)
|
||||
.map_err(ClientCoreError::UnableToCreatePublicKeyFromGatewayId)?;
|
||||
let gateway_sphinx_key = encryption::PublicKey::from_base58_string(gateway_sphinx)
|
||||
.map_err(ClientCoreError::UnableToCreateSphinxKeyFromGatewayId)?;
|
||||
|
||||
GatewayClient::new(
|
||||
gateway_address,
|
||||
managed_keys.identity_keypair(),
|
||||
managed_keys.encryption_keypair(),
|
||||
gateway_identity,
|
||||
gateway_sphinx_key,
|
||||
Some(managed_keys.must_get_gateway_shared_key()),
|
||||
mixnet_message_sender,
|
||||
ack_sender,
|
||||
config.debug.gateway_connection.gateway_response_timeout,
|
||||
bandwidth_controller,
|
||||
api_client,
|
||||
shutdown,
|
||||
)
|
||||
};
|
||||
@@ -343,6 +353,15 @@ where
|
||||
Ok(gateway_client)
|
||||
}
|
||||
|
||||
fn random_api_client(&self) -> nym_validator_client::NymApiClient {
|
||||
let endpoints = self.config.get_nym_api_endpoints();
|
||||
let nym_api = endpoints
|
||||
.choose(&mut thread_rng())
|
||||
.expect("The list of validator apis is empty");
|
||||
|
||||
nym_validator_client::NymApiClient::new(nym_api.clone())
|
||||
}
|
||||
|
||||
fn setup_topology_provider(
|
||||
custom_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
|
||||
provider_from_config: config::TopologyStructure,
|
||||
@@ -481,6 +500,7 @@ where
|
||||
{
|
||||
info!("Starting nym client");
|
||||
|
||||
let random_api_client = self.random_api_client();
|
||||
// derive (or load) client keys and gateway configuration
|
||||
let init_res = Self::initialise_keys_and_gateway(
|
||||
self.setup_method,
|
||||
@@ -535,6 +555,7 @@ where
|
||||
bandwidth_controller,
|
||||
mixnet_messages_sender,
|
||||
ack_sender,
|
||||
random_api_client,
|
||||
task_manager.subscribe(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -83,6 +83,16 @@ impl<'a> TopologyReadPermit<'a> {
|
||||
|
||||
Ok(topology)
|
||||
}
|
||||
|
||||
pub fn try_get_raw_topology_ref(&'a self) -> Result<&'a NymTopology, NymTopologyError> {
|
||||
// 1. Have we managed to get anything from the refresher, i.e. have the nym-api queries gone through?
|
||||
let topology = self
|
||||
.permit
|
||||
.as_ref()
|
||||
.ok_or(NymTopologyError::EmptyNetworkTopology)?;
|
||||
|
||||
Ok(topology)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<RwLockReadGuard<'a, Option<NymTopology>>> for TopologyReadPermit<'a> {
|
||||
|
||||
@@ -9,9 +9,9 @@ use nym_topology::provider_trait::TopologyProvider;
|
||||
use nym_topology::NymTopologyError;
|
||||
use std::time::Duration;
|
||||
|
||||
mod accessor;
|
||||
pub mod accessor;
|
||||
pub mod geo_aware_provider;
|
||||
pub(crate) mod nym_api_provider;
|
||||
pub mod nym_api_provider;
|
||||
|
||||
// TODO: move it to config later
|
||||
const MAX_FAILURE_COUNT: usize = 10;
|
||||
|
||||
@@ -9,7 +9,7 @@ use rand::prelude::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use url::Url;
|
||||
|
||||
pub(crate) struct NymApiTopologyProvider {
|
||||
pub struct NymApiTopologyProvider {
|
||||
validator_client: nym_validator_client::client::NymApiClient,
|
||||
nym_api_urls: Vec<Url>,
|
||||
|
||||
@@ -18,7 +18,7 @@ pub(crate) struct NymApiTopologyProvider {
|
||||
}
|
||||
|
||||
impl NymApiTopologyProvider {
|
||||
pub(crate) fn new(mut nym_api_urls: Vec<Url>, client_version: String) -> Self {
|
||||
pub fn new(mut nym_api_urls: Vec<Url>, client_version: String) -> Self {
|
||||
nym_api_urls.shuffle(&mut thread_rng());
|
||||
|
||||
NymApiTopologyProvider {
|
||||
@@ -77,13 +77,34 @@ impl NymApiTopologyProvider {
|
||||
Ok(gateways) => gateways,
|
||||
};
|
||||
|
||||
let all_mixes = match self.validator_client.get_all_mixnodes().await {
|
||||
Err(err) => {
|
||||
error!("failed to get all mixes - {err}");
|
||||
return None;
|
||||
}
|
||||
Ok(epoch) => epoch,
|
||||
};
|
||||
|
||||
let all_gateways = match self.validator_client.get_all_gateways().await {
|
||||
Err(err) => {
|
||||
error!("failed to get all gateways - {err}");
|
||||
return None;
|
||||
}
|
||||
Ok(epoch) => epoch,
|
||||
};
|
||||
|
||||
let topology = nym_topology_from_detailed(mixnodes, gateways)
|
||||
.with_all_mixes(all_mixes.clone())
|
||||
.with_all_gateways(all_gateways.clone())
|
||||
.filter_system_version(&self.client_version);
|
||||
|
||||
if let Err(err) = self.check_layer_distribution(&topology) {
|
||||
warn!("The current filtered active topology has extremely skewed layer distribution. It cannot be used: {err}");
|
||||
self.use_next_nym_api();
|
||||
None
|
||||
let empty_topology = NymTopology::empty()
|
||||
.with_all_mixes(all_mixes)
|
||||
.with_all_gateways(all_gateways);
|
||||
Some(empty_topology)
|
||||
} else {
|
||||
Some(topology)
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use nym_config::defaults::NymNetworkDetails;
|
||||
use nym_crypto::asymmetric::identity;
|
||||
use nym_crypto::asymmetric::{encryption, identity};
|
||||
use nym_sphinx::params::{PacketSize, PacketType};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::time::Duration;
|
||||
@@ -216,6 +216,8 @@ pub struct GatewayEndpointConfig {
|
||||
/// If initially omitted, a random gateway will be chosen from the available topology.
|
||||
pub gateway_id: String,
|
||||
|
||||
pub gateway_sphinx: String,
|
||||
|
||||
/// Address of the gateway owner to which the client should send messages.
|
||||
pub gateway_owner: String,
|
||||
|
||||
@@ -228,11 +230,13 @@ impl GatewayEndpointConfig {
|
||||
#[cfg_attr(target_arch = "wasm32", wasm_bindgen(constructor))]
|
||||
pub fn new(
|
||||
gateway_id: String,
|
||||
gateway_sphinx: String,
|
||||
gateway_owner: String,
|
||||
gateway_listener: String,
|
||||
) -> GatewayEndpointConfig {
|
||||
GatewayEndpointConfig {
|
||||
gateway_id,
|
||||
gateway_sphinx,
|
||||
gateway_owner,
|
||||
gateway_listener,
|
||||
}
|
||||
@@ -245,6 +249,11 @@ impl GatewayEndpointConfig {
|
||||
identity::PublicKey::from_base58_string(&self.gateway_id)
|
||||
.map_err(ClientCoreError::UnableToCreatePublicKeyFromGatewayId)
|
||||
}
|
||||
|
||||
pub fn try_get_gateway_sphinx_key(&self) -> Result<encryption::PublicKey, ClientCoreError> {
|
||||
encryption::PublicKey::from_base58_string(&self.gateway_sphinx)
|
||||
.map_err(ClientCoreError::UnableToCreateSphinxKeyFromGatewayId)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<nym_topology::gateway::Node> for GatewayEndpointConfig {
|
||||
@@ -252,6 +261,7 @@ impl From<nym_topology::gateway::Node> for GatewayEndpointConfig {
|
||||
let gateway_listener = node.clients_address();
|
||||
GatewayEndpointConfig {
|
||||
gateway_id: node.identity_key.to_base58_string(),
|
||||
gateway_sphinx: node.sphinx_key.to_base58_string(),
|
||||
gateway_owner: node.owner,
|
||||
gateway_listener,
|
||||
}
|
||||
|
||||
@@ -68,6 +68,7 @@ pub struct ConfigV1_1_20<T> {
|
||||
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Eq, Serialize)]
|
||||
pub struct GatewayEndpointConfigV1_1_20 {
|
||||
pub gateway_id: String,
|
||||
pub gateway_sphinx: String,
|
||||
pub gateway_owner: String,
|
||||
pub gateway_listener: String,
|
||||
}
|
||||
@@ -76,6 +77,7 @@ impl From<GatewayEndpointConfigV1_1_20> for GatewayEndpointConfigV1_1_20_2 {
|
||||
fn from(value: GatewayEndpointConfigV1_1_20) -> Self {
|
||||
GatewayEndpointConfigV1_1_20_2 {
|
||||
gateway_id: value.gateway_id,
|
||||
gateway_sphinx: value.gateway_sphinx,
|
||||
gateway_owner: value.gateway_owner,
|
||||
gateway_listener: value.gateway_listener,
|
||||
}
|
||||
|
||||
@@ -73,6 +73,8 @@ pub struct GatewayEndpointConfigV1_1_20_2 {
|
||||
/// If initially omitted, a random gateway will be chosen from the available topology.
|
||||
pub gateway_id: String,
|
||||
|
||||
pub gateway_sphinx: String,
|
||||
|
||||
/// Address of the gateway owner to which the client should send messages.
|
||||
pub gateway_owner: String,
|
||||
|
||||
@@ -84,6 +86,7 @@ impl From<GatewayEndpointConfigV1_1_20_2> for GatewayEndpointConfig {
|
||||
fn from(value: GatewayEndpointConfigV1_1_20_2) -> Self {
|
||||
GatewayEndpointConfig {
|
||||
gateway_id: value.gateway_id,
|
||||
gateway_sphinx: value.gateway_sphinx,
|
||||
gateway_owner: value.gateway_owner,
|
||||
gateway_listener: value.gateway_listener,
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use nym_crypto::asymmetric::encryption::KeyRecoveryError;
|
||||
use nym_crypto::asymmetric::identity::Ed25519RecoveryError;
|
||||
use nym_gateway_client::error::GatewayClientError;
|
||||
use nym_topology::gateway::GatewayConversionError;
|
||||
@@ -58,6 +59,9 @@ pub enum ClientCoreError {
|
||||
#[error("The gateway id is invalid - {0}")]
|
||||
UnableToCreatePublicKeyFromGatewayId(Ed25519RecoveryError),
|
||||
|
||||
#[error("The gateway sphinx is invalid - {0}")]
|
||||
UnableToCreateSphinxKeyFromGatewayId(KeyRecoveryError),
|
||||
|
||||
#[error("The identity of the gateway is unknown - did you run init?")]
|
||||
GatewayIdUnknown,
|
||||
|
||||
|
||||
@@ -6,9 +6,10 @@ use crate::error::ClientCoreError;
|
||||
use crate::init::RegistrationResult;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use log::{debug, info, trace, warn};
|
||||
use nym_crypto::asymmetric::identity;
|
||||
use nym_crypto::asymmetric::{encryption, identity};
|
||||
use nym_gateway_client::GatewayClient;
|
||||
use nym_topology::{filter::VersionFilterable, gateway};
|
||||
use nym_validator_client::NymApiClient;
|
||||
use rand::{seq::SliceRandom, Rng};
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tap::TapFallible;
|
||||
@@ -201,13 +202,18 @@ pub(super) fn uniformly_random_gateway<R: Rng>(
|
||||
pub(super) async fn register_with_gateway(
|
||||
gateway: &GatewayEndpointConfig,
|
||||
our_identity: Arc<identity::KeyPair>,
|
||||
our_sphinx: Arc<encryption::KeyPair>,
|
||||
nym_api_client: NymApiClient,
|
||||
) -> Result<RegistrationResult, ClientCoreError> {
|
||||
let timeout = Duration::from_millis(1500);
|
||||
let mut gateway_client = GatewayClient::new_init(
|
||||
gateway.gateway_listener.clone(),
|
||||
gateway.try_get_gateway_identity_key()?,
|
||||
gateway.try_get_gateway_sphinx_key()?,
|
||||
our_identity.clone(),
|
||||
our_sphinx.clone(),
|
||||
timeout,
|
||||
nym_api_client,
|
||||
);
|
||||
gateway_client
|
||||
.establish_connection()
|
||||
|
||||
@@ -20,7 +20,9 @@ use nym_gateway_requests::registration::handshake::SharedKeys;
|
||||
use nym_sphinx::addressing::{clients::Recipient, nodes::NodeIdentity};
|
||||
use nym_topology::gateway;
|
||||
use nym_validator_client::client::IdentityKey;
|
||||
use nym_validator_client::NymApiClient;
|
||||
use rand::rngs::OsRng;
|
||||
use rand::seq::SliceRandom;
|
||||
use serde::Serialize;
|
||||
use std::fmt::{Debug, Display};
|
||||
use std::sync::Arc;
|
||||
@@ -300,6 +302,7 @@ pub async fn setup_gateway_from<K, D>(
|
||||
details_store: &D,
|
||||
overwrite_data: bool,
|
||||
gateways: Option<&[gateway::Node]>,
|
||||
nym_api_client: NymApiClient,
|
||||
) -> Result<InitialisationResult, ClientCoreError>
|
||||
where
|
||||
K: KeyStore,
|
||||
@@ -417,9 +420,12 @@ where
|
||||
// get our identity key
|
||||
let our_identity = managed_keys.identity_keypair();
|
||||
|
||||
let our_sphinx = managed_keys.encryption_keypair();
|
||||
|
||||
// Establish connection, authenticate and generate keys for talking with the gateway
|
||||
let registration_result =
|
||||
helpers::register_with_gateway(&gateway_details, our_identity).await?;
|
||||
helpers::register_with_gateway(&gateway_details, our_identity, our_sphinx, nym_api_client)
|
||||
.await?;
|
||||
let shared_keys = registration_result.shared_keys;
|
||||
|
||||
let persisted_details = PersistedGatewayDetails::new(gateway_details, &shared_keys);
|
||||
@@ -457,12 +463,19 @@ where
|
||||
let mut rng = OsRng;
|
||||
let gateways = current_gateways(&mut rng, validator_servers.unwrap_or_default()).await?;
|
||||
|
||||
let nym_api = validator_servers
|
||||
.unwrap_or_default()
|
||||
.choose(&mut rng)
|
||||
.ok_or(ClientCoreError::ListOfNymApisIsEmpty)?;
|
||||
let client = nym_validator_client::client::NymApiClient::new(nym_api.clone());
|
||||
|
||||
setup_gateway_from(
|
||||
setup,
|
||||
key_store,
|
||||
details_store,
|
||||
overwrite_data,
|
||||
Some(&gateways),
|
||||
client,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -35,6 +35,9 @@ version = "0.13"
|
||||
default-features = false
|
||||
|
||||
# non-wasm-only dependencies
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.nym-noise]
|
||||
path = "../../nymnoise"
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio]
|
||||
version = "1.24.1"
|
||||
features = ["macros", "rt", "net", "sync", "time"]
|
||||
|
||||
@@ -14,7 +14,7 @@ use nym_bandwidth_controller::BandwidthController;
|
||||
use nym_coconut_interface::Credential;
|
||||
use nym_credential_storage::ephemeral_storage::EphemeralStorage as EphemeralCredentialStorage;
|
||||
use nym_credential_storage::storage::Storage as CredentialStorage;
|
||||
use nym_crypto::asymmetric::identity;
|
||||
use nym_crypto::asymmetric::{encryption, identity};
|
||||
use nym_gateway_requests::authentication::encrypted_address::EncryptedAddressBytes;
|
||||
use nym_gateway_requests::iv::IV;
|
||||
use nym_gateway_requests::registration::handshake::{client_handshake, SharedKeys};
|
||||
@@ -23,16 +23,23 @@ use nym_network_defaults::{REMAINING_BANDWIDTH_THRESHOLD, TOKENS_TO_BURN};
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use nym_task::TaskClient;
|
||||
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
|
||||
use nym_validator_client::NymApiClient;
|
||||
use rand::rngs::OsRng;
|
||||
use std::convert::TryFrom;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tungstenite::protocol::Message;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use nym_noise::upgrade_noise_initiator;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use std::net::SocketAddr;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio::net::TcpStream;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio::time::sleep;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio_tungstenite::connect_async;
|
||||
use tokio_tungstenite::client_async;
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
use wasm_utils::websocket::JSWebsocket;
|
||||
@@ -48,12 +55,15 @@ pub struct GatewayClient<C, St = EphemeralCredentialStorage> {
|
||||
bandwidth_remaining: i64,
|
||||
gateway_address: String,
|
||||
gateway_identity: identity::PublicKey,
|
||||
gateway_sphinx: encryption::PublicKey,
|
||||
local_identity: Arc<identity::KeyPair>,
|
||||
local_sphinx: Arc<encryption::KeyPair>,
|
||||
shared_key: Option<Arc<SharedKeys>>,
|
||||
connection: SocketState,
|
||||
packet_router: PacketRouter,
|
||||
response_timeout_duration: Duration,
|
||||
bandwidth_controller: Option<BandwidthController<C, St>>,
|
||||
nym_api_client: NymApiClient,
|
||||
|
||||
// reconnection related variables
|
||||
/// Specifies whether client should try to reconnect to gateway on connection failure.
|
||||
@@ -74,13 +84,16 @@ impl<C, St> GatewayClient<C, St> {
|
||||
pub fn new(
|
||||
gateway_address: String,
|
||||
local_identity: Arc<identity::KeyPair>,
|
||||
local_sphinx: Arc<encryption::KeyPair>,
|
||||
gateway_identity: identity::PublicKey,
|
||||
gateway_sphinx: encryption::PublicKey,
|
||||
// TODO: make it mandatory. if you don't want to pass it, use `new_init`
|
||||
shared_key: Option<Arc<SharedKeys>>,
|
||||
mixnet_message_sender: MixnetMessageSender,
|
||||
ack_sender: AcknowledgementSender,
|
||||
response_timeout_duration: Duration,
|
||||
bandwidth_controller: Option<BandwidthController<C, St>>,
|
||||
nym_api_client: NymApiClient,
|
||||
shutdown: TaskClient,
|
||||
) -> Self {
|
||||
GatewayClient {
|
||||
@@ -88,13 +101,16 @@ impl<C, St> GatewayClient<C, St> {
|
||||
disabled_credentials_mode: true,
|
||||
bandwidth_remaining: 0,
|
||||
gateway_address,
|
||||
gateway_identity,
|
||||
local_identity,
|
||||
local_sphinx,
|
||||
gateway_identity,
|
||||
gateway_sphinx,
|
||||
shared_key,
|
||||
connection: SocketState::NotConnected,
|
||||
packet_router: PacketRouter::new(ack_sender, mixnet_message_sender, shutdown.clone()),
|
||||
response_timeout_duration,
|
||||
bandwidth_controller,
|
||||
nym_api_client,
|
||||
should_reconnect_on_failure: true,
|
||||
reconnection_attempts: DEFAULT_RECONNECTION_ATTEMPTS,
|
||||
reconnection_backoff: DEFAULT_RECONNECTION_BACKOFF,
|
||||
@@ -162,7 +178,59 @@ impl<C, St> GatewayClient<C, St> {
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub async fn establish_connection(&mut self) -> Result<(), GatewayClientError> {
|
||||
let ws_stream = match connect_async(&self.gateway_address).await {
|
||||
let socket_addr: SocketAddr = self.gateway_address.parse().unwrap();
|
||||
let connection_fut = TcpStream::connect(socket_addr);
|
||||
|
||||
//arbitrary TO, it's a POC
|
||||
let noise_conn = match tokio::time::timeout(Duration::from_secs(5), connection_fut).await {
|
||||
Ok(stream_res) => match stream_res {
|
||||
Ok(stream) => {
|
||||
debug!("Managed to establish connection to gateway");
|
||||
let current_epoch_id = match self.nym_api_client.get_current_epoch_id().await {
|
||||
Ok(epoch_id) => epoch_id,
|
||||
Err(err) => {
|
||||
error!("Failed to retrieve epoch Id for Noise handshake - {err}");
|
||||
return Err(GatewayClientError::ConnectionNotEstablished);
|
||||
}
|
||||
};
|
||||
|
||||
let noise_stream = match upgrade_noise_initiator(
|
||||
stream,
|
||||
None, //as a client, the gateway cannot know my pub key
|
||||
&self.local_sphinx.private_key().to_bytes(),
|
||||
&self.gateway_sphinx.to_bytes(),
|
||||
current_epoch_id,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(noise_stream) => noise_stream,
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Failed to perform Noise handshake with {:?} - {err}",
|
||||
self.gateway_address
|
||||
);
|
||||
return Err(GatewayClientError::ConnectionNotEstablished);
|
||||
}
|
||||
};
|
||||
debug!(
|
||||
"Noise initiator handshake completed for {:?}",
|
||||
self.gateway_address
|
||||
);
|
||||
noise_stream
|
||||
}
|
||||
Err(err) => {
|
||||
debug!("failed to establish connection to gateway (err: {})", err);
|
||||
return Err(GatewayClientError::NetworkIoError(err));
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
debug!("failed to connect to {} within 5s", self.gateway_address);
|
||||
return Err(GatewayClientError::Timeout);
|
||||
}
|
||||
};
|
||||
let ws_address = format!("ws://{}", self.gateway_address);
|
||||
|
||||
let ws_stream = match client_async(ws_address, noise_conn).await {
|
||||
Ok((ws_stream, _)) => ws_stream,
|
||||
Err(e) => return Err(GatewayClientError::NetworkError(e)),
|
||||
};
|
||||
@@ -766,8 +834,11 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
|
||||
pub fn new_init(
|
||||
gateway_address: String,
|
||||
gateway_identity: identity::PublicKey,
|
||||
gateway_sphinx: encryption::PublicKey,
|
||||
local_identity: Arc<identity::KeyPair>,
|
||||
local_sphinx: Arc<encryption::KeyPair>,
|
||||
response_timeout_duration: Duration,
|
||||
nym_api_client: NymApiClient,
|
||||
) -> Self {
|
||||
use futures::channel::mpsc;
|
||||
|
||||
@@ -784,12 +855,15 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
|
||||
bandwidth_remaining: 0,
|
||||
gateway_address,
|
||||
gateway_identity,
|
||||
gateway_sphinx,
|
||||
local_identity,
|
||||
local_sphinx,
|
||||
shared_key: None,
|
||||
connection: SocketState::NotConnected,
|
||||
packet_router,
|
||||
response_timeout_duration,
|
||||
bandwidth_controller: None,
|
||||
nym_api_client,
|
||||
should_reconnect_on_failure: false,
|
||||
reconnection_attempts: DEFAULT_RECONNECTION_ATTEMPTS,
|
||||
reconnection_backoff: DEFAULT_RECONNECTION_BACKOFF,
|
||||
@@ -817,12 +891,15 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
|
||||
bandwidth_remaining: self.bandwidth_remaining,
|
||||
gateway_address: self.gateway_address,
|
||||
gateway_identity: self.gateway_identity,
|
||||
gateway_sphinx: self.gateway_sphinx,
|
||||
local_sphinx: self.local_sphinx,
|
||||
local_identity: self.local_identity,
|
||||
shared_key: self.shared_key,
|
||||
connection: self.connection,
|
||||
packet_router: PacketRouter::new(ack_sender, mixnet_message_sender, shutdown.clone()),
|
||||
response_timeout_duration,
|
||||
bandwidth_controller,
|
||||
nym_api_client: self.nym_api_client,
|
||||
should_reconnect_on_failure: self.should_reconnect_on_failure,
|
||||
reconnection_attempts: self.reconnection_attempts,
|
||||
reconnection_backoff: self.reconnection_backoff,
|
||||
|
||||
@@ -19,6 +19,9 @@ pub enum GatewayClientError {
|
||||
#[error("There was a network error - {0}")]
|
||||
NetworkError(#[from] WsError),
|
||||
|
||||
#[error("There was a network error - {0}")]
|
||||
NetworkIoError(#[from] io::Error),
|
||||
|
||||
// TODO: see if `JsValue` is a reasonable type for this
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
#[error("There was a network error")]
|
||||
|
||||
@@ -14,9 +14,9 @@ use std::sync::Arc;
|
||||
use tungstenite::Message;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio::net::TcpStream;
|
||||
use nym_noise::NoiseStream;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
use wasm_bindgen_futures;
|
||||
@@ -26,7 +26,7 @@ use wasm_utils::websocket::JSWebsocket;
|
||||
// type alias for not having to type the whole thing every single time (and now it makes it easier
|
||||
// to use different types based on compilation target)
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
type WsConn = WebSocketStream<MaybeTlsStream<TcpStream>>;
|
||||
type WsConn = WebSocketStream<NoiseStream>;
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
type WsConn = JSWebsocket;
|
||||
|
||||
@@ -15,3 +15,7 @@ tokio-util = { version = "0.7.4", features = ["codec"] }
|
||||
# internal
|
||||
nym-sphinx = { path = "../../nymsphinx" }
|
||||
nym-task = { path = "../../task" }
|
||||
nym-client-core = { path = "../../client-core" }
|
||||
nym-noise = { path = "../../nymnoise"}
|
||||
nym-crypto = { path = "../../crypto" }
|
||||
nym-validator-client = { path = "../validator-client"}
|
||||
|
||||
@@ -4,11 +4,15 @@
|
||||
use futures::channel::mpsc;
|
||||
use futures::StreamExt;
|
||||
use log::*;
|
||||
use nym_client_core::client::topology_control::accessor::TopologyAccessor;
|
||||
use nym_crypto::asymmetric::encryption;
|
||||
use nym_noise::upgrade_noise_initiator_with_topology;
|
||||
use nym_sphinx::addressing::nodes::NymNodeRoutingAddress;
|
||||
use nym_sphinx::framing::codec::NymCodec;
|
||||
use nym_sphinx::framing::packet::FramedNymPacket;
|
||||
use nym_sphinx::params::PacketType;
|
||||
use nym_sphinx::NymPacket;
|
||||
use nym_validator_client::NymApiClient;
|
||||
use std::collections::HashMap;
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
@@ -59,6 +63,9 @@ pub trait SendWithoutResponse {
|
||||
pub struct Client {
|
||||
conn_new: HashMap<NymNodeRoutingAddress, ConnectionSender>,
|
||||
config: Config,
|
||||
topology_access: TopologyAccessor,
|
||||
api_client: NymApiClient,
|
||||
local_identity: Arc<encryption::KeyPair>,
|
||||
}
|
||||
|
||||
struct ConnectionSender {
|
||||
@@ -76,10 +83,18 @@ impl ConnectionSender {
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn new(config: Config) -> Client {
|
||||
pub fn new(
|
||||
config: Config,
|
||||
topology_access: TopologyAccessor,
|
||||
api_client: NymApiClient,
|
||||
local_identity: Arc<encryption::KeyPair>,
|
||||
) -> Client {
|
||||
Client {
|
||||
conn_new: HashMap::new(),
|
||||
config,
|
||||
topology_access,
|
||||
api_client,
|
||||
local_identity,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,6 +103,10 @@ impl Client {
|
||||
receiver: mpsc::Receiver<FramedNymPacket>,
|
||||
connection_timeout: Duration,
|
||||
current_reconnection: &AtomicU32,
|
||||
topology_access: TopologyAccessor,
|
||||
api_client: NymApiClient,
|
||||
local_public_key: &[u8],
|
||||
local_private_key: &[u8],
|
||||
) {
|
||||
let connection_fut = TcpStream::connect(address);
|
||||
|
||||
@@ -97,7 +116,40 @@ impl Client {
|
||||
debug!("Managed to establish connection to {}", address);
|
||||
// if we managed to connect, reset the reconnection count (whatever it might have been)
|
||||
current_reconnection.store(0, Ordering::Release);
|
||||
Framed::new(stream, NymCodec)
|
||||
//Get the topology, because we need the keys for the handshake
|
||||
let topology_ref = match topology_access.current_topology().await {
|
||||
Some(topology) => topology,
|
||||
None => {
|
||||
error!("Cannot perform Noise handshake to {address}, due to topology error");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let epoch_id = match api_client.get_current_epoch_id().await {
|
||||
Ok(id) => id,
|
||||
Err(err) => {
|
||||
error!("Cannot perform Noise handshake to {address}, due to epoch id error - {err}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let noise_stream = match upgrade_noise_initiator_with_topology(
|
||||
stream,
|
||||
&topology_ref,
|
||||
epoch_id,
|
||||
local_public_key,
|
||||
local_private_key,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(noise_stream) => noise_stream,
|
||||
Err(err) => {
|
||||
error!("Failed to perform Noise handshake with {address} - {err}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
debug!("Noise initiator handshake completed for {:?}", address);
|
||||
Framed::new(noise_stream, NymCodec)
|
||||
}
|
||||
Err(err) => {
|
||||
debug!(
|
||||
@@ -175,6 +227,11 @@ impl Client {
|
||||
// copy the value before moving into another task
|
||||
let initial_connection_timeout = self.config.initial_connection_timeout;
|
||||
|
||||
let topology_access_clone = self.topology_access.clone();
|
||||
let api_client_clone = self.api_client.clone();
|
||||
let local_public_key = self.local_identity.public_key().to_bytes();
|
||||
let local_private_key = self.local_identity.private_key().to_bytes();
|
||||
|
||||
tokio::spawn(async move {
|
||||
// before executing the manager, wait for what was specified, if anything
|
||||
if let Some(backoff) = backoff {
|
||||
@@ -187,6 +244,10 @@ impl Client {
|
||||
receiver,
|
||||
initial_connection_timeout,
|
||||
¤t_reconnection_attempt,
|
||||
topology_access_clone,
|
||||
api_client_clone,
|
||||
&local_public_key,
|
||||
&local_private_key,
|
||||
)
|
||||
.await
|
||||
});
|
||||
@@ -255,13 +316,16 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
fn dummy_client() -> Client {
|
||||
Client::new(Config {
|
||||
initial_reconnection_backoff: Duration::from_millis(10_000),
|
||||
maximum_reconnection_backoff: Duration::from_millis(300_000),
|
||||
initial_connection_timeout: Duration::from_millis(1_500),
|
||||
maximum_connection_buffer_size: 128,
|
||||
use_legacy_version: false,
|
||||
})
|
||||
Client::new(
|
||||
Config {
|
||||
initial_reconnection_backoff: Duration::from_millis(10_000),
|
||||
maximum_reconnection_backoff: Duration::from_millis(300_000),
|
||||
initial_connection_timeout: Duration::from_millis(1_500),
|
||||
maximum_connection_buffer_size: 128,
|
||||
use_legacy_version: false,
|
||||
},
|
||||
TopologyAccessor::new(),
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -5,7 +5,11 @@ use crate::client::{Client, Config, SendWithoutResponse};
|
||||
use futures::channel::mpsc;
|
||||
use futures::StreamExt;
|
||||
use log::*;
|
||||
use nym_client_core::client::topology_control::accessor::TopologyAccessor;
|
||||
use nym_crypto::asymmetric::encryption;
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use nym_validator_client::NymApiClient;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
pub type MixForwardingSender = mpsc::UnboundedSender<MixPacket>;
|
||||
@@ -26,6 +30,9 @@ impl PacketForwarder {
|
||||
initial_connection_timeout: Duration,
|
||||
maximum_connection_buffer_size: usize,
|
||||
use_legacy_version: bool,
|
||||
topology_access: TopologyAccessor,
|
||||
api_client: NymApiClient,
|
||||
local_identity: Arc<encryption::KeyPair>,
|
||||
shutdown: nym_task::TaskClient,
|
||||
) -> (PacketForwarder, MixForwardingSender) {
|
||||
let client_config = Config::new(
|
||||
@@ -40,7 +47,12 @@ impl PacketForwarder {
|
||||
|
||||
(
|
||||
PacketForwarder {
|
||||
mixnet_client: Client::new(client_config),
|
||||
mixnet_client: Client::new(
|
||||
client_config,
|
||||
topology_access,
|
||||
api_client,
|
||||
local_identity,
|
||||
),
|
||||
packet_receiver,
|
||||
shutdown,
|
||||
},
|
||||
|
||||
@@ -260,10 +260,24 @@ impl NymApiClient {
|
||||
Ok(self.nym_api.get_mixnodes().await?)
|
||||
}
|
||||
|
||||
pub async fn get_all_mixnodes(&self) -> Result<Vec<MixNodeDetails>, ValidatorClientError> {
|
||||
Ok(self.nym_api.get_all_mixnodes().await?)
|
||||
}
|
||||
|
||||
pub async fn get_all_gateways(&self) -> Result<Vec<GatewayBond>, ValidatorClientError> {
|
||||
Ok(self.nym_api.get_all_gateways().await?)
|
||||
}
|
||||
|
||||
pub async fn get_cached_gateways(&self) -> Result<Vec<GatewayBond>, ValidatorClientError> {
|
||||
Ok(self.nym_api.get_gateways().await?)
|
||||
}
|
||||
|
||||
pub async fn get_current_epoch_id(
|
||||
&self,
|
||||
) -> Result<nym_mixnet_contract_common::EpochId, ValidatorClientError> {
|
||||
Ok(self.nym_api.get_current_epoch().await?.current_epoch_id())
|
||||
}
|
||||
|
||||
pub async fn get_gateway_core_status_count(
|
||||
&self,
|
||||
identity: IdentityKeyRef<'_>,
|
||||
|
||||
@@ -14,7 +14,7 @@ use nym_api_requests::models::{
|
||||
StakeSaturationResponse, UptimeResponse,
|
||||
};
|
||||
use nym_mixnet_contract_common::mixnode::MixNodeDetails;
|
||||
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, MixId};
|
||||
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, Interval, MixId};
|
||||
use nym_name_service_common::response::NamesListResponse;
|
||||
use nym_service_provider_directory_common::response::ServicesListResponse;
|
||||
use reqwest::{Response, StatusCode};
|
||||
@@ -130,11 +130,27 @@ impl Client {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_current_epoch(&self) -> Result<Interval, NymAPIError> {
|
||||
self.query_nym_api(
|
||||
&[routes::API_VERSION, routes::EPOCH, routes::CURRENT],
|
||||
NO_PARAMS,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
|
||||
self.query_nym_api(&[routes::API_VERSION, routes::MIXNODES], NO_PARAMS)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_all_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
|
||||
self.query_nym_api(
|
||||
&[routes::API_VERSION, routes::MIXNODES, routes::ALL],
|
||||
NO_PARAMS,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_mixnodes_detailed(&self) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
|
||||
self.query_nym_api(
|
||||
&[
|
||||
@@ -181,6 +197,14 @@ impl Client {
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_all_gateways(&self) -> Result<Vec<GatewayBond>, NymAPIError> {
|
||||
self.query_nym_api(
|
||||
&[routes::API_VERSION, routes::GATEWAYS, routes::ALL],
|
||||
NO_PARAMS,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_active_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
|
||||
self.query_nym_api(
|
||||
&[routes::API_VERSION, routes::MIXNODES, routes::ACTIVE],
|
||||
|
||||
@@ -6,6 +6,10 @@ use nym_network_defaults::NYM_API_VERSION;
|
||||
pub const API_VERSION: &str = NYM_API_VERSION;
|
||||
pub const MIXNODES: &str = "mixnodes";
|
||||
pub const GATEWAYS: &str = "gateways";
|
||||
pub const ALL: &str = "all";
|
||||
|
||||
pub const EPOCH: &str = "epoch";
|
||||
pub const CURRENT: &str = "current";
|
||||
|
||||
pub const DETAILED: &str = "detailed";
|
||||
pub const DETAILED_UNFILTERED: &str = "detailed-unfiltered";
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
[package]
|
||||
name = "nym-noise"
|
||||
version = "0.1.0"
|
||||
authors = ["Simon Wicky <simon@nymtech.net>"]
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
snow = "0.9.2"
|
||||
futures = "0.3"
|
||||
tokio = { version = "1.24.1", features = ["net","io-util","time"] }
|
||||
pin-project = "1"
|
||||
log = "0.4.19"
|
||||
sha2 = "0.10.7"
|
||||
bytes = "1.0"
|
||||
thiserror = "1.0.44"
|
||||
|
||||
# internal
|
||||
nym-topology = { path = "../topology"}
|
||||
@@ -0,0 +1,405 @@
|
||||
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use log::*;
|
||||
use nym_topology::NymTopology;
|
||||
use pin_project::pin_project;
|
||||
use sha2::{Digest, Sha256};
|
||||
use snow::error::Prerequisite;
|
||||
use snow::Builder;
|
||||
use snow::Error;
|
||||
use snow::HandshakeState;
|
||||
use snow::TransportState;
|
||||
use std::cmp::min;
|
||||
use std::collections::VecDeque;
|
||||
use std::io;
|
||||
use std::io::ErrorKind;
|
||||
use std::num::TryFromIntError;
|
||||
use std::pin::Pin;
|
||||
use std::task::Poll;
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::ReadBuf;
|
||||
use tokio::{
|
||||
io::{AsyncRead, AsyncWrite},
|
||||
net::TcpStream,
|
||||
};
|
||||
|
||||
const NOISE_HS_PATTERN: &str = "Noise_XKpsk3_25519_AESGCM_SHA256";
|
||||
const MAXMSGLEN: usize = 65535;
|
||||
const TAGLEN: usize = 16;
|
||||
const HEADER_SIZE: usize = 2;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum NoiseError {
|
||||
#[error("encountered a Noise decryption error")]
|
||||
DecryptionError,
|
||||
|
||||
#[error("encountered a Noise Protocol error - {0}")]
|
||||
ProtocolError(Error),
|
||||
#[error("encountered an IO error - {0}")]
|
||||
IoError(#[from] io::Error),
|
||||
|
||||
#[error("Incorrect state")]
|
||||
IncorrectStateError,
|
||||
|
||||
#[error("Handshake timeout")]
|
||||
HandshakeTimeoutError(#[from] tokio::time::error::Elapsed),
|
||||
|
||||
#[error(transparent)]
|
||||
IntConversionError(#[from] TryFromIntError),
|
||||
}
|
||||
|
||||
impl From<Error> for NoiseError {
|
||||
fn from(err: Error) -> Self {
|
||||
match err {
|
||||
Error::Decrypt => NoiseError::DecryptionError,
|
||||
err => NoiseError::ProtocolError(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper around a TcpStream
|
||||
#[pin_project]
|
||||
pub struct NoiseStream {
|
||||
#[pin]
|
||||
inner_stream: TcpStream,
|
||||
handshake: Option<HandshakeState>,
|
||||
noise: Option<TransportState>,
|
||||
enc_storage: VecDeque<u8>,
|
||||
dec_storage: VecDeque<u8>,
|
||||
}
|
||||
|
||||
impl NoiseStream {
|
||||
fn new(inner_stream: TcpStream, handshake: HandshakeState) -> NoiseStream {
|
||||
NoiseStream {
|
||||
inner_stream,
|
||||
handshake: Some(handshake),
|
||||
noise: None,
|
||||
enc_storage: VecDeque::with_capacity(MAXMSGLEN + TAGLEN + HEADER_SIZE), //At least one message
|
||||
dec_storage: VecDeque::with_capacity(MAXMSGLEN),
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_handshake_msg(&mut self) -> Result<(), NoiseError> {
|
||||
let mut buf = vec![0u8; MAXMSGLEN];
|
||||
let len = match &mut self.handshake {
|
||||
Some(handshake_state) => handshake_state.write_message(&[], &mut buf)?,
|
||||
None => return Err(NoiseError::IncorrectStateError),
|
||||
};
|
||||
|
||||
self.inner_stream.write_u16(len.try_into()?).await?; //len is always < 2^16, so it shouldn't fail
|
||||
self.inner_stream.write_all(&buf[..len]).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn recv_handshake_msg(&mut self) -> Result<(), NoiseError> {
|
||||
let msg_len = self.inner_stream.read_u16().await?;
|
||||
let mut msg = vec![0u8; msg_len.into()];
|
||||
self.inner_stream.read_exact(&mut msg[..]).await?;
|
||||
|
||||
let mut buf = vec![0u8; MAXMSGLEN];
|
||||
match &mut self.handshake {
|
||||
Some(handshake_state) => handshake_state.read_message(&msg, &mut buf)?,
|
||||
None => return Err(NoiseError::IncorrectStateError),
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn into_transport_mode(mut self) -> Result<Self, NoiseError> {
|
||||
let Some(handshake) = self.handshake else {return Err(NoiseError::IncorrectStateError)};
|
||||
self.handshake = None;
|
||||
self.noise = Some(handshake.into_transport_mode()?);
|
||||
Ok(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncRead for NoiseStream {
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
buf: &mut ReadBuf<'_>,
|
||||
) -> Poll<std::io::Result<()>> {
|
||||
let projected_self = self.project();
|
||||
let enc_storage = projected_self.enc_storage;
|
||||
let ready_to_read = projected_self.inner_stream.poll_read_ready(cx);
|
||||
|
||||
match ready_to_read {
|
||||
Poll::Pending => {
|
||||
//no new data, waking is already scheduled.
|
||||
//Nothing new to decrypt, only check if we can return something from dec_storage, happens after
|
||||
}
|
||||
|
||||
Poll::Ready(Ok(())) => {
|
||||
//Read what we can into enc_storage, decrypt what we can into dec_storage
|
||||
let mut tcp_buf = vec![0u8; MAXMSGLEN + HEADER_SIZE + TAGLEN];
|
||||
if let Ok(tcp_len) = projected_self.inner_stream.try_read(&mut tcp_buf) {
|
||||
if tcp_len == 0 && projected_self.dec_storage.is_empty() {
|
||||
//EOF
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
enc_storage.extend(&tcp_buf[..tcp_len]);
|
||||
//we can at least read the length
|
||||
while enc_storage.len() >= HEADER_SIZE {
|
||||
let msg_len = ((enc_storage[0] as usize) << 8) + (enc_storage[1] as usize);
|
||||
|
||||
//no more messages to read
|
||||
if enc_storage.len() < HEADER_SIZE + msg_len {
|
||||
break;
|
||||
}
|
||||
//we have a full message to decrypt
|
||||
//remove size
|
||||
enc_storage.pop_front();
|
||||
enc_storage.pop_front();
|
||||
|
||||
let noise_msg = enc_storage.drain(..msg_len).collect::<Vec<u8>>();
|
||||
let mut dec_msg = vec![0u8; MAXMSGLEN];
|
||||
|
||||
let Ok(len) = (match projected_self.noise {
|
||||
Some(transport_state) => transport_state.read_message(&noise_msg, &mut dec_msg),
|
||||
None => return Poll::Ready(Err(ErrorKind::Other.into())),
|
||||
}) else {
|
||||
return Poll::Ready(Err(ErrorKind::InvalidInput.into()));
|
||||
};
|
||||
projected_self.dec_storage.extend(&dec_msg[..len]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//an error occured, let's return it right away
|
||||
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
|
||||
}
|
||||
|
||||
//check if we can return something
|
||||
let read_len = min(buf.remaining(), projected_self.dec_storage.len());
|
||||
if read_len > 0 {
|
||||
buf.put_slice(
|
||||
&projected_self
|
||||
.dec_storage
|
||||
.drain(..read_len)
|
||||
.collect::<Vec<u8>>(),
|
||||
);
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
|
||||
//can't return anything, schedule the wakeup and return pending
|
||||
if let Poll::Ready(Ok(())) = projected_self.inner_stream.poll_read_ready(cx) {
|
||||
//we got data in the meantime, we can wake up immediately
|
||||
cx.waker().wake_by_ref();
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
impl AsyncWrite for NoiseStream {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<Result<usize, std::io::Error>> {
|
||||
let projected_self = self.project();
|
||||
let mut noise_buf = vec![0u8; MAXMSGLEN];
|
||||
|
||||
let Ok(len) = (match projected_self.noise {
|
||||
Some(transport_state) => transport_state.write_message(buf, &mut noise_buf),
|
||||
None => return Poll::Ready(Err(ErrorKind::Other.into())),
|
||||
}) else {
|
||||
return Poll::Ready(Err(ErrorKind::InvalidInput.into()));
|
||||
};
|
||||
let to_send = [&[(len >> 8) as u8, (len & 0xff) as u8], &noise_buf[..len]].concat();
|
||||
|
||||
match projected_self.inner_stream.poll_write(cx, &to_send) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
|
||||
Poll::Ready(Ok(n)) => {
|
||||
//didn't send a thing, no problem for the underlying stream
|
||||
if n == 0 {
|
||||
return Poll::Ready(Ok(0));
|
||||
}
|
||||
//we sent the whole thing, no problem for the underlying stream
|
||||
//We must guarantee that the return number is <= buf.len()
|
||||
if n == to_send.len() {
|
||||
return Poll::Ready(Ok(n - HEADER_SIZE - TAGLEN));
|
||||
}
|
||||
//We didn't write the whole message, the stream will be corrupted
|
||||
error!(
|
||||
"Partial write on Noise Stream, it will be corrupted - {}",
|
||||
n
|
||||
);
|
||||
Poll::Ready(Err(ErrorKind::WriteZero.into()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_flush(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Result<(), std::io::Error>> {
|
||||
self.project().inner_stream.poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_shutdown(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Result<(), std::io::Error>> {
|
||||
self.project().inner_stream.poll_shutdown(cx)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn upgrade_noise_initiator(
|
||||
conn: TcpStream,
|
||||
local_public_key: Option<&[u8]>,
|
||||
local_private_key: &[u8],
|
||||
remote_pub_key: &[u8],
|
||||
epoch: u32,
|
||||
) -> Result<NoiseStream, NoiseError> {
|
||||
trace!("Perform Noise Handshake, initiator side");
|
||||
|
||||
//In case the local key cannot be known by the remote party, e.g. in a client-gateway connection
|
||||
let secret = [
|
||||
local_public_key.unwrap_or(&[]),
|
||||
remote_pub_key,
|
||||
&epoch.to_be_bytes(),
|
||||
]
|
||||
.concat();
|
||||
let secret_hash = Sha256::digest(secret);
|
||||
|
||||
let builder = Builder::new(NOISE_HS_PATTERN.parse().unwrap()); //This cannot fail, hardcoded pattern must be correct
|
||||
let handshake = builder
|
||||
.local_private_key(local_private_key)
|
||||
.remote_public_key(remote_pub_key)
|
||||
.psk(3, &secret_hash)
|
||||
.build_initiator()?;
|
||||
|
||||
let mut noise_stream = NoiseStream::new(conn, handshake);
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(5), async {
|
||||
// -> e, es
|
||||
noise_stream.send_handshake_msg().await?;
|
||||
|
||||
// <- e, ee
|
||||
noise_stream.recv_handshake_msg().await?;
|
||||
|
||||
// -> s, se, psk
|
||||
|
||||
noise_stream.send_handshake_msg().await
|
||||
})
|
||||
.await??;
|
||||
|
||||
noise_stream.into_transport_mode()
|
||||
}
|
||||
|
||||
pub async fn upgrade_noise_initiator_with_topology(
|
||||
conn: TcpStream,
|
||||
topology: &NymTopology,
|
||||
epoch: u32,
|
||||
local_public_key: &[u8],
|
||||
local_private_key: &[u8],
|
||||
) -> Result<NoiseStream, NoiseError> {
|
||||
//Get init material
|
||||
let responder_addr = match conn.peer_addr() {
|
||||
Ok(addr) => addr,
|
||||
Err(err) => {
|
||||
error!("Unable to extract peer address from connection - {err}");
|
||||
return Err(Error::Prereq(Prerequisite::RemotePublicKey).into());
|
||||
}
|
||||
};
|
||||
let remote_pub_key = match topology.find_node_key_by_mix_host(responder_addr) {
|
||||
Some(pub_key) => pub_key.to_bytes(),
|
||||
None => {
|
||||
error!(
|
||||
"Cannot find public key for node with address {:?}",
|
||||
responder_addr
|
||||
);
|
||||
return Err(Error::Prereq(Prerequisite::RemotePublicKey).into());
|
||||
}
|
||||
};
|
||||
|
||||
upgrade_noise_initiator(
|
||||
conn,
|
||||
Some(local_public_key),
|
||||
local_private_key,
|
||||
&remote_pub_key,
|
||||
epoch,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn upgrade_noise_responder(
|
||||
conn: TcpStream,
|
||||
local_public_key: &[u8],
|
||||
local_private_key: &[u8],
|
||||
remote_pub_key: Option<&[u8]>,
|
||||
epoch: u32,
|
||||
) -> Result<NoiseStream, NoiseError> {
|
||||
trace!("Perform Noise Handshake, responder side");
|
||||
|
||||
//If the remote_key cannot be kwnown, e.g. in a client-gateway connection
|
||||
let secret = [
|
||||
remote_pub_key.unwrap_or(&[]),
|
||||
local_public_key,
|
||||
&epoch.to_be_bytes(),
|
||||
]
|
||||
.concat();
|
||||
let secret_hash = Sha256::digest(secret);
|
||||
|
||||
let builder = Builder::new(NOISE_HS_PATTERN.parse().unwrap()); //This cannot fail, hardcoded pattern must be correct
|
||||
let handshake = builder
|
||||
.local_private_key(local_private_key)
|
||||
.psk(3, &secret_hash)
|
||||
.build_responder()?;
|
||||
|
||||
let mut noise_stream = NoiseStream::new(conn, handshake);
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(5), async {
|
||||
//Actual Handshake
|
||||
// <- e, es
|
||||
noise_stream.recv_handshake_msg().await?;
|
||||
|
||||
// -> e, ee
|
||||
noise_stream.send_handshake_msg().await?;
|
||||
|
||||
// <- s, se, psk
|
||||
noise_stream.recv_handshake_msg().await
|
||||
})
|
||||
.await??;
|
||||
|
||||
noise_stream.into_transport_mode()
|
||||
}
|
||||
|
||||
pub async fn upgrade_noise_responder_with_topology(
|
||||
conn: TcpStream,
|
||||
topology: &NymTopology,
|
||||
epoch: u32,
|
||||
local_public_key: &[u8],
|
||||
local_private_key: &[u8],
|
||||
) -> Result<NoiseStream, NoiseError> {
|
||||
//Get init material
|
||||
let initiator_addr = match conn.peer_addr() {
|
||||
Ok(addr) => addr,
|
||||
Err(err) => {
|
||||
error!("Unable to extract peer address from connection - {err}");
|
||||
return Err(Error::Prereq(Prerequisite::RemotePublicKey).into());
|
||||
}
|
||||
};
|
||||
let remote_pub_key = match topology.find_node_key_by_mix_host(initiator_addr) {
|
||||
Some(pub_key) => pub_key.to_bytes(),
|
||||
None => {
|
||||
error!(
|
||||
"Cannot find public key for node with address {:?}",
|
||||
initiator_addr
|
||||
);
|
||||
return Err(Error::Prereq(Prerequisite::RemotePublicKey).into());
|
||||
}
|
||||
};
|
||||
|
||||
upgrade_noise_responder(
|
||||
conn,
|
||||
local_public_key,
|
||||
local_private_key,
|
||||
Some(&remote_pub_key),
|
||||
epoch,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -67,7 +67,7 @@ impl Node {
|
||||
}
|
||||
|
||||
pub fn clients_address(&self) -> String {
|
||||
format!("ws://{}:{}", self.host, self.clients_port)
|
||||
format!("{}:{}", self.host, self.clients_port)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
use crate::filter::VersionFilterable;
|
||||
use log::warn;
|
||||
use nym_crypto::asymmetric::encryption;
|
||||
use nym_mixnet_contract_common::mixnode::MixNodeDetails;
|
||||
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, MixId};
|
||||
use nym_sphinx_addressing::nodes::NodeIdentity;
|
||||
@@ -71,11 +72,64 @@ pub type MixLayer = u8;
|
||||
pub struct NymTopology {
|
||||
mixes: BTreeMap<MixLayer, Vec<mix::Node>>,
|
||||
gateways: Vec<gateway::Node>,
|
||||
all_mixes: Vec<mix::Node>,
|
||||
all_gateways: Vec<gateway::Node>,
|
||||
}
|
||||
|
||||
impl NymTopology {
|
||||
pub fn new(mixes: BTreeMap<MixLayer, Vec<mix::Node>>, gateways: Vec<gateway::Node>) -> Self {
|
||||
NymTopology { mixes, gateways }
|
||||
NymTopology {
|
||||
mixes: mixes.clone(),
|
||||
gateways: gateways.clone(),
|
||||
all_mixes: mixes.values().flatten().cloned().collect(),
|
||||
all_gateways: gateways,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn empty() -> Self {
|
||||
NymTopology {
|
||||
mixes: BTreeMap::new(),
|
||||
gateways: Vec::new(),
|
||||
all_mixes: Vec::new(),
|
||||
all_gateways: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_all_mixes(mut self, all_mixes: Vec<MixNodeDetails>) -> Self {
|
||||
let mut mixes = Vec::new();
|
||||
for bond in all_mixes
|
||||
.into_iter()
|
||||
.map(|details| details.bond_information)
|
||||
{
|
||||
let mix_id = bond.mix_id;
|
||||
let mix_identity = bond.mix_node.identity_key.clone();
|
||||
|
||||
match bond.try_into() {
|
||||
Ok(mix) => mixes.push(mix),
|
||||
Err(err) => {
|
||||
warn!("Mix {} / {} is malformed - {err}", mix_id, mix_identity);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
self.all_mixes = mixes;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_all_gateways(mut self, all_gateways: Vec<GatewayBond>) -> Self {
|
||||
let mut gateways = Vec::with_capacity(all_gateways.len());
|
||||
for bond in all_gateways.into_iter() {
|
||||
let gate_id = bond.gateway.identity_key.clone();
|
||||
match bond.try_into() {
|
||||
Ok(gate) => gateways.push(gate),
|
||||
Err(err) => {
|
||||
warn!("Gateway {} is malformed - {err}", gate_id);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
self.all_gateways = gateways;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn from_detailed(
|
||||
@@ -107,6 +161,23 @@ impl NymTopology {
|
||||
None
|
||||
}
|
||||
|
||||
pub fn find_node_key_by_mix_host(
|
||||
&self,
|
||||
mix_host: SocketAddr,
|
||||
) -> Option<&encryption::PublicKey> {
|
||||
for node in self.all_gateways.iter() {
|
||||
if node.mix_host.ip() == mix_host.ip() {
|
||||
return Some(&node.sphinx_key);
|
||||
}
|
||||
}
|
||||
for node in self.all_mixes.iter() {
|
||||
if node.mix_host.ip() == mix_host.ip() {
|
||||
return Some(&node.sphinx_key);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub fn find_gateway(&self, gateway_identity: IdentityKeyRef) -> Option<&gateway::Node> {
|
||||
self.gateways
|
||||
.iter()
|
||||
@@ -314,6 +385,8 @@ impl NymTopology {
|
||||
NymTopology {
|
||||
mixes: self.mixes.filter_by_version(expected_mix_version),
|
||||
gateways: self.gateways.clone(),
|
||||
all_mixes: self.all_mixes.clone(),
|
||||
all_gateways: self.all_gateways.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+4
-1
@@ -60,7 +60,10 @@ nym-sphinx = { path = "../common/nymsphinx" }
|
||||
nym-statistics-common = { path = "../common/statistics" }
|
||||
nym-task = { path = "../common/task" }
|
||||
nym-types = { path = "../common/types" }
|
||||
nym-validator-client = { path = "../common/client-libs/validator-client" }
|
||||
nym-validator-client = { path = "../common/client-libs/validator-client"}
|
||||
nym-client-core = { path = "../common/client-core" }
|
||||
nym-topology = { path = "../common/topology" }
|
||||
nym-noise = { path = "../common/nymnoise" }
|
||||
|
||||
[build-dependencies]
|
||||
tokio = { version = "1.24.1", features = ["rt-multi-thread", "macros"] }
|
||||
|
||||
@@ -33,6 +33,9 @@ const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_milli
|
||||
const DEFAULT_INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(1_500);
|
||||
const DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE: usize = 2000;
|
||||
|
||||
const DEFAULT_TOPOLOGY_REFRESH_RATE: Duration = Duration::from_secs(5 * 60); // every 5min
|
||||
const DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT: Duration = Duration::from_millis(5_000);
|
||||
|
||||
const DEFAULT_STORED_MESSAGE_FILENAME_LENGTH: u16 = 16;
|
||||
const DEFAULT_MESSAGE_RETRIEVAL_LIMIT: i64 = 100;
|
||||
|
||||
@@ -74,6 +77,9 @@ pub struct Config {
|
||||
|
||||
#[serde(default)]
|
||||
pub debug: Debug,
|
||||
|
||||
#[serde(default)]
|
||||
pub topology: Topology,
|
||||
}
|
||||
|
||||
impl NymConfigTemplate for Config {
|
||||
@@ -89,6 +95,7 @@ impl Config {
|
||||
storage_paths: GatewayPaths::new_default(id.as_ref()),
|
||||
logging: Default::default(),
|
||||
debug: Default::default(),
|
||||
topology: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -297,3 +304,33 @@ impl Default for Debug {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
|
||||
#[serde(default, deny_unknown_fields)]
|
||||
pub struct Topology {
|
||||
/// The uniform delay every which clients are querying the directory server
|
||||
/// to try to obtain a compatible network topology to send sphinx packets through.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub topology_refresh_rate: Duration,
|
||||
|
||||
/// During topology refresh, test packets are sent through every single possible network
|
||||
/// path. This timeout determines waiting period until it is decided that the packet
|
||||
/// did not reach its destination.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub topology_resolution_timeout: Duration,
|
||||
|
||||
/// Specifies whether the client should not refresh the network topology after obtaining
|
||||
/// the first valid instance.
|
||||
/// Supersedes `topology_refresh_rate_ms`.
|
||||
pub disable_refreshing: bool,
|
||||
}
|
||||
|
||||
impl Default for Topology {
|
||||
fn default() -> Self {
|
||||
Topology {
|
||||
topology_refresh_rate: DEFAULT_TOPOLOGY_REFRESH_RATE,
|
||||
topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT,
|
||||
disable_refreshing: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -69,6 +69,7 @@ impl From<ConfigV1_1_20> for Config {
|
||||
},
|
||||
logging: value.logging.into(),
|
||||
debug: value.debug.into(),
|
||||
topology: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,8 +6,10 @@ use crate::node::client_handling::websocket::connection_handler::coconut::Coconu
|
||||
use crate::node::client_handling::websocket::connection_handler::FreshHandler;
|
||||
use crate::node::storage::Storage;
|
||||
use log::*;
|
||||
use nym_crypto::asymmetric::identity;
|
||||
use nym_crypto::asymmetric::{encryption, identity};
|
||||
use nym_mixnet_client::forwarder::MixForwardingSender;
|
||||
use nym_noise::upgrade_noise_responder;
|
||||
use nym_validator_client::NymApiClient;
|
||||
use rand::rngs::OsRng;
|
||||
use std::net::SocketAddr;
|
||||
use std::process;
|
||||
@@ -17,22 +19,28 @@ use tokio::task::JoinHandle;
|
||||
pub(crate) struct Listener {
|
||||
address: SocketAddr,
|
||||
local_identity: Arc<identity::KeyPair>,
|
||||
local_sphinx: Arc<encryption::KeyPair>,
|
||||
only_coconut_credentials: bool,
|
||||
pub(crate) coconut_verifier: Arc<CoconutVerifier>,
|
||||
nym_api_client: NymApiClient,
|
||||
}
|
||||
|
||||
impl Listener {
|
||||
pub(crate) fn new(
|
||||
address: SocketAddr,
|
||||
local_identity: Arc<identity::KeyPair>,
|
||||
local_sphinx: Arc<encryption::KeyPair>,
|
||||
only_coconut_credentials: bool,
|
||||
coconut_verifier: Arc<CoconutVerifier>,
|
||||
nym_api_client: NymApiClient,
|
||||
) -> Self {
|
||||
Listener {
|
||||
address,
|
||||
local_identity,
|
||||
local_sphinx,
|
||||
only_coconut_credentials,
|
||||
coconut_verifier,
|
||||
nym_api_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,9 +76,32 @@ impl Listener {
|
||||
trace!("received a socket connection from {remote_addr}");
|
||||
// TODO: I think we *REALLY* need a mechanism for having a maximum number of connected
|
||||
// clients or spawned tokio tasks -> perhaps a worker system?
|
||||
|
||||
let current_epoch_id = match self.nym_api_client.get_current_epoch_id().await {
|
||||
Ok(epoch_id) => epoch_id,
|
||||
Err(err) => {
|
||||
error!("Failed to retrieve epoch Id for Noise handshake - {err}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
let noise_stream = match upgrade_noise_responder(
|
||||
socket,
|
||||
&self.local_sphinx.public_key().to_bytes(),
|
||||
&self.local_sphinx.private_key().to_bytes(),
|
||||
None, //connection from client, no remote pub key
|
||||
current_epoch_id,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(noise_stream) => noise_stream,
|
||||
Err(err) => {
|
||||
error!("Failed to perform Noise handshake with {remote_addr} - {err}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
let handle = FreshHandler::new(
|
||||
OsRng,
|
||||
socket,
|
||||
noise_stream,
|
||||
self.only_coconut_credentials,
|
||||
outbound_mix_sender.clone(),
|
||||
Arc::clone(&self.local_identity),
|
||||
|
||||
@@ -8,15 +8,20 @@ use crate::node::storage::error::StorageError;
|
||||
use crate::node::storage::Storage;
|
||||
use futures::StreamExt;
|
||||
use log::*;
|
||||
use nym_client_core::client::topology_control::accessor::TopologyAccessor;
|
||||
use nym_crypto::asymmetric::encryption;
|
||||
use nym_mixnet_client::forwarder::MixForwardingSender;
|
||||
use nym_mixnode_common::packet_processor::processor::ProcessedFinalHop;
|
||||
use nym_noise::upgrade_noise_responder_with_topology;
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use nym_sphinx::framing::codec::NymCodec;
|
||||
use nym_sphinx::framing::packet::FramedNymPacket;
|
||||
use nym_sphinx::DestinationAddressBytes;
|
||||
use nym_task::TaskClient;
|
||||
use nym_validator_client::NymApiClient;
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_util::codec::Framed;
|
||||
|
||||
@@ -31,6 +36,9 @@ pub(crate) struct ConnectionHandler<St: Storage> {
|
||||
active_clients_store: ActiveClientsStore,
|
||||
storage: St,
|
||||
ack_sender: MixForwardingSender,
|
||||
topology_access: TopologyAccessor,
|
||||
api_client: NymApiClient,
|
||||
local_identity: Arc<encryption::KeyPair>,
|
||||
}
|
||||
|
||||
impl<St: Storage + Clone> Clone for ConnectionHandler<St> {
|
||||
@@ -49,6 +57,9 @@ impl<St: Storage + Clone> Clone for ConnectionHandler<St> {
|
||||
active_clients_store: self.active_clients_store.clone(),
|
||||
storage: self.storage.clone(),
|
||||
ack_sender: self.ack_sender.clone(),
|
||||
topology_access: self.topology_access.clone(),
|
||||
api_client: self.api_client.clone(),
|
||||
local_identity: self.local_identity.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -59,6 +70,9 @@ impl<St: Storage> ConnectionHandler<St> {
|
||||
storage: St,
|
||||
ack_sender: MixForwardingSender,
|
||||
active_clients_store: ActiveClientsStore,
|
||||
topology_access: TopologyAccessor,
|
||||
api_client: NymApiClient,
|
||||
local_identity: Arc<encryption::KeyPair>,
|
||||
) -> Self {
|
||||
ConnectionHandler {
|
||||
packet_processor,
|
||||
@@ -66,6 +80,9 @@ impl<St: Storage> ConnectionHandler<St> {
|
||||
storage,
|
||||
active_clients_store,
|
||||
ack_sender,
|
||||
topology_access,
|
||||
api_client,
|
||||
local_identity,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -182,7 +199,40 @@ impl<St: Storage> ConnectionHandler<St> {
|
||||
) {
|
||||
debug!("Starting connection handler for {:?}", remote);
|
||||
shutdown.mark_as_success();
|
||||
let mut framed_conn = Framed::new(conn, NymCodec);
|
||||
|
||||
let topology_ref = match self.topology_access.current_topology().await {
|
||||
Some(topology) => topology,
|
||||
None => {
|
||||
error!("Cannot perform Noise handshake to {remote}, due to topology error");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let epoch_id = match self.api_client.get_current_epoch_id().await {
|
||||
Ok(id) => id,
|
||||
Err(err) => {
|
||||
error!("Cannot perform Noise handshake to {remote}, due to epoch id error - {err}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let noise_stream = match upgrade_noise_responder_with_topology(
|
||||
conn,
|
||||
&topology_ref,
|
||||
epoch_id,
|
||||
&self.local_identity.public_key().to_bytes(),
|
||||
&self.local_identity.private_key().to_bytes(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(noise_stream) => noise_stream,
|
||||
Err(err) => {
|
||||
error!("Failed to perform Noise handshake with {remote} - {err}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
debug!("Noise responder handshake completed for {:?}", remote);
|
||||
let mut framed_conn = Framed::new(noise_stream, NymCodec);
|
||||
while !shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
@@ -212,9 +262,6 @@ impl<St: Storage> ConnectionHandler<St> {
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
"Closing connection from {:?}",
|
||||
framed_conn.into_inner().peer_addr()
|
||||
);
|
||||
info!("Closing connection from {:?}", remote);
|
||||
}
|
||||
}
|
||||
|
||||
+83
-3
@@ -2,7 +2,7 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use self::storage::PersistentStorage;
|
||||
use crate::config::Config;
|
||||
use crate::config::{Config, Topology};
|
||||
use crate::error::GatewayError;
|
||||
use crate::node::client_handling::active_clients::ActiveClientsStore;
|
||||
use crate::node::client_handling::websocket;
|
||||
@@ -12,17 +12,24 @@ use crate::node::statistics::collector::GatewayStatisticsCollector;
|
||||
use crate::node::storage::Storage;
|
||||
use log::*;
|
||||
use nym_bin_common::output_format::OutputFormat;
|
||||
use nym_client_core::client::topology_control::accessor::TopologyAccessor;
|
||||
use nym_client_core::client::topology_control::nym_api_provider::NymApiTopologyProvider;
|
||||
use nym_client_core::client::topology_control::TopologyRefresher;
|
||||
use nym_client_core::client::topology_control::TopologyRefresherConfig;
|
||||
use nym_crypto::asymmetric::{encryption, identity};
|
||||
use nym_mixnet_client::forwarder::{MixForwardingSender, PacketForwarder};
|
||||
use nym_network_defaults::NymNetworkDetails;
|
||||
use nym_statistics_common::collector::StatisticsSender;
|
||||
use nym_task::{TaskClient, TaskManager};
|
||||
use nym_topology::provider_trait::TopologyProvider;
|
||||
use nym_validator_client::NymApiClient;
|
||||
use nym_validator_client::{nyxd, DirectSigningHttpRpcNyxdClient};
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use std::error::Error;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use url::Url;
|
||||
|
||||
pub(crate) mod client_handling;
|
||||
pub(crate) mod mixnet_handling;
|
||||
@@ -124,6 +131,8 @@ impl<St> Gateway<St> {
|
||||
&self,
|
||||
ack_sender: MixForwardingSender,
|
||||
active_clients_store: ActiveClientsStore,
|
||||
topology_access: TopologyAccessor,
|
||||
api_client: NymApiClient,
|
||||
shutdown: TaskClient,
|
||||
) where
|
||||
St: Storage + Clone + 'static,
|
||||
@@ -138,6 +147,9 @@ impl<St> Gateway<St> {
|
||||
self.storage.clone(),
|
||||
ack_sender,
|
||||
active_clients_store,
|
||||
topology_access,
|
||||
api_client,
|
||||
Arc::clone(&self.sphinx_keypair),
|
||||
);
|
||||
|
||||
let listening_address = SocketAddr::new(
|
||||
@@ -167,8 +179,10 @@ impl<St> Gateway<St> {
|
||||
websocket::Listener::new(
|
||||
listening_address,
|
||||
Arc::clone(&self.identity_keypair),
|
||||
Arc::clone(&self.sphinx_keypair),
|
||||
self.config.gateway.only_coconut_credentials,
|
||||
coconut_verifier,
|
||||
self.random_api_client(),
|
||||
)
|
||||
.start(
|
||||
forwarding_channel,
|
||||
@@ -178,7 +192,12 @@ impl<St> Gateway<St> {
|
||||
);
|
||||
}
|
||||
|
||||
fn start_packet_forwarder(&self, shutdown: TaskClient) -> MixForwardingSender {
|
||||
fn start_packet_forwarder(
|
||||
&self,
|
||||
topology_access: TopologyAccessor,
|
||||
api_client: NymApiClient,
|
||||
shutdown: TaskClient,
|
||||
) -> MixForwardingSender {
|
||||
info!("Starting mix packet forwarder...");
|
||||
|
||||
let (mut packet_forwarder, packet_sender) = PacketForwarder::new(
|
||||
@@ -187,6 +206,9 @@ impl<St> Gateway<St> {
|
||||
self.config.debug.initial_connection_timeout,
|
||||
self.config.debug.maximum_connection_buffer_size,
|
||||
self.config.debug.use_legacy_framed_packet_version,
|
||||
topology_access,
|
||||
api_client,
|
||||
Arc::clone(&self.sphinx_keypair),
|
||||
shutdown,
|
||||
);
|
||||
|
||||
@@ -230,6 +252,47 @@ impl<St> Gateway<St> {
|
||||
.expect("Could not connect with mnemonic")
|
||||
}
|
||||
|
||||
fn setup_topology_provider(nym_api_urls: Vec<Url>) -> Box<dyn TopologyProvider + Send + Sync> {
|
||||
// if no custom provider was ... provided ..., create one using nym-api
|
||||
Box::new(NymApiTopologyProvider::new(
|
||||
nym_api_urls,
|
||||
env!("CARGO_PKG_VERSION").to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
// future responsible for periodically polling directory server and updating
|
||||
// the current global view of topology
|
||||
async fn start_topology_refresher(
|
||||
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
|
||||
topology_config: Topology,
|
||||
topology_accessor: TopologyAccessor,
|
||||
mut shutdown: TaskClient,
|
||||
) {
|
||||
let topology_refresher_config =
|
||||
TopologyRefresherConfig::new(topology_config.topology_refresh_rate);
|
||||
|
||||
let mut topology_refresher = TopologyRefresher::new(
|
||||
topology_refresher_config,
|
||||
topology_accessor,
|
||||
topology_provider,
|
||||
);
|
||||
// before returning, block entire runtime to refresh the current network view so that any
|
||||
// components depending on topology would see a non-empty view
|
||||
info!("Obtaining initial network topology");
|
||||
topology_refresher.try_refresh().await;
|
||||
|
||||
if topology_config.disable_refreshing {
|
||||
// if we're not spawning the refresher, don't cause shutdown immediately
|
||||
info!("The topology refesher is not going to be started");
|
||||
shutdown.mark_as_success();
|
||||
} else {
|
||||
// don't spawn the refresher if we don't want to be refreshing the topology.
|
||||
// only use the initial values obtained
|
||||
info!("Starting topology refresher...");
|
||||
topology_refresher.start_with_shutdown(shutdown);
|
||||
}
|
||||
}
|
||||
|
||||
async fn check_if_bonded(&self) -> Result<bool, GatewayError> {
|
||||
// TODO: if anything, this should be getting data directly from the contract
|
||||
// as opposed to the validator API
|
||||
@@ -263,13 +326,30 @@ impl<St> Gateway<St> {
|
||||
let nyxd_client = self.random_nyxd_client();
|
||||
CoconutVerifier::new(nyxd_client)
|
||||
};
|
||||
let random_api_client = self.random_api_client();
|
||||
let topology_provider = Self::setup_topology_provider(self.config.get_nym_api_endpoints());
|
||||
let shared_topology_access = TopologyAccessor::new();
|
||||
|
||||
let mix_forwarding_channel = self.start_packet_forwarder(shutdown.subscribe());
|
||||
Self::start_topology_refresher(
|
||||
topology_provider,
|
||||
self.config.topology,
|
||||
shared_topology_access.clone(),
|
||||
shutdown.subscribe(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let mix_forwarding_channel = self.start_packet_forwarder(
|
||||
shared_topology_access.clone(),
|
||||
random_api_client.clone(),
|
||||
shutdown.subscribe(),
|
||||
);
|
||||
|
||||
let active_clients_store = ActiveClientsStore::new();
|
||||
self.start_mix_socket_listener(
|
||||
mix_forwarding_channel.clone(),
|
||||
active_clients_store.clone(),
|
||||
shared_topology_access,
|
||||
random_api_client,
|
||||
shutdown.subscribe(),
|
||||
);
|
||||
|
||||
|
||||
@@ -56,9 +56,11 @@ nym-pemstore = { path = "../common/pemstore", version = "0.3.0" }
|
||||
nym-task = { path = "../common/task" }
|
||||
nym-types = { path = "../common/types" }
|
||||
nym-topology = { path = "../common/topology" }
|
||||
nym-client-core = { path = "../common/client-core/" }
|
||||
nym-validator-client = { path = "../common/client-libs/validator-client" }
|
||||
nym-bin-common = { path = "../common/bin-common", features = ["output_format"] }
|
||||
cpu-cycles = { path = "../cpu-cycles", optional = true }
|
||||
nym-noise = { path = "../common/nymnoise" }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "1.21.2", features = [
|
||||
|
||||
@@ -44,6 +44,9 @@ const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_milli
|
||||
const DEFAULT_INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(1_500);
|
||||
const DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE: usize = 2000;
|
||||
|
||||
const DEFAULT_TOPOLOGY_REFRESH_RATE: Duration = Duration::from_secs(5 * 60); // every 5min
|
||||
const DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT: Duration = Duration::from_millis(5_000);
|
||||
|
||||
/// Derive default path to mixnodes's config directory.
|
||||
/// It should get resolved to `$HOME/.nym/mixnodes/<id>/config`
|
||||
pub fn default_config_directory<P: AsRef<Path>>(id: P) -> PathBuf {
|
||||
@@ -85,6 +88,9 @@ pub struct Config {
|
||||
|
||||
#[serde(default)]
|
||||
pub debug: Debug,
|
||||
|
||||
#[serde(default)]
|
||||
pub topology: Topology,
|
||||
}
|
||||
|
||||
impl NymConfigTemplate for Config {
|
||||
@@ -101,6 +107,7 @@ impl Config {
|
||||
verloc: Default::default(),
|
||||
logging: Default::default(),
|
||||
debug: Default::default(),
|
||||
topology: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -287,3 +294,33 @@ impl Default for Debug {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
|
||||
#[serde(default, deny_unknown_fields)]
|
||||
pub struct Topology {
|
||||
/// The uniform delay every which clients are querying the directory server
|
||||
/// to try to obtain a compatible network topology to send sphinx packets through.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub topology_refresh_rate: Duration,
|
||||
|
||||
/// During topology refresh, test packets are sent through every single possible network
|
||||
/// path. This timeout determines waiting period until it is decided that the packet
|
||||
/// did not reach its destination.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub topology_resolution_timeout: Duration,
|
||||
|
||||
/// Specifies whether the client should not refresh the network topology after obtaining
|
||||
/// the first valid instance.
|
||||
/// Supersedes `topology_refresh_rate_ms`.
|
||||
pub disable_refreshing: bool,
|
||||
}
|
||||
|
||||
impl Default for Topology {
|
||||
fn default() -> Self {
|
||||
Topology {
|
||||
topology_refresh_rate: DEFAULT_TOPOLOGY_REFRESH_RATE,
|
||||
topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT,
|
||||
disable_refreshing: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -94,6 +94,7 @@ impl From<ConfigV1_1_21> for Config {
|
||||
verloc: value.verloc.into(),
|
||||
logging: value.logging.into(),
|
||||
debug: value.debug.into(),
|
||||
topology: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,12 +7,17 @@ use crate::node::listener::connection_handler::packet_processing::{
|
||||
use crate::node::packet_delayforwarder::PacketDelayForwardSender;
|
||||
use crate::node::TaskClient;
|
||||
use futures::StreamExt;
|
||||
use nym_client_core::client::topology_control::accessor::TopologyAccessor;
|
||||
use nym_crypto::asymmetric::encryption;
|
||||
use nym_mixnode_common::measure;
|
||||
use nym_noise::upgrade_noise_responder_with_topology;
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use nym_sphinx::framing::codec::NymCodec;
|
||||
use nym_sphinx::framing::packet::FramedNymPacket;
|
||||
use nym_sphinx::Delay as SphinxDelay;
|
||||
use nym_validator_client::NymApiClient;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::codec::Framed;
|
||||
@@ -25,16 +30,25 @@ pub(crate) mod packet_processing;
|
||||
pub(crate) struct ConnectionHandler {
|
||||
packet_processor: PacketProcessor,
|
||||
delay_forwarding_channel: PacketDelayForwardSender,
|
||||
topology_access: TopologyAccessor,
|
||||
api_client: NymApiClient,
|
||||
local_identity: Arc<encryption::KeyPair>,
|
||||
}
|
||||
|
||||
impl ConnectionHandler {
|
||||
pub(crate) fn new(
|
||||
packet_processor: PacketProcessor,
|
||||
delay_forwarding_channel: PacketDelayForwardSender,
|
||||
topology_access: TopologyAccessor,
|
||||
api_client: NymApiClient,
|
||||
local_identity: Arc<encryption::KeyPair>,
|
||||
) -> Self {
|
||||
ConnectionHandler {
|
||||
packet_processor,
|
||||
delay_forwarding_channel,
|
||||
topology_access,
|
||||
api_client,
|
||||
local_identity,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -85,8 +99,42 @@ impl ConnectionHandler {
|
||||
mut shutdown: TaskClient,
|
||||
) {
|
||||
debug!("Starting connection handler for {:?}", remote);
|
||||
|
||||
shutdown.mark_as_success();
|
||||
let mut framed_conn = Framed::new(conn, NymCodec);
|
||||
|
||||
let topology_ref = match self.topology_access.current_topology().await {
|
||||
Some(topology) => topology,
|
||||
None => {
|
||||
error!("Cannot perform Noise handshake to {remote}, due to topology error");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let epoch_id = match self.api_client.get_current_epoch_id().await {
|
||||
Ok(id) => id,
|
||||
Err(err) => {
|
||||
error!("Cannot perform Noise handshake to {remote}, due to epoch id error - {err}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let noise_stream = match upgrade_noise_responder_with_topology(
|
||||
conn,
|
||||
&topology_ref,
|
||||
epoch_id,
|
||||
&self.local_identity.public_key().to_bytes(),
|
||||
&self.local_identity.private_key().to_bytes(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(noise_stream) => noise_stream,
|
||||
Err(err) => {
|
||||
error!("Failed to perform Noise handshake with {remote} - {err}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
debug!("Noise responder handshake completed for {:?}", remote);
|
||||
let mut framed_conn = Framed::new(noise_stream, NymCodec);
|
||||
while !shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
@@ -118,10 +166,7 @@ impl ConnectionHandler {
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
"Closing connection from {:?}",
|
||||
framed_conn.into_inner().peer_addr()
|
||||
);
|
||||
info!("Closing connection from {:?}", remote);
|
||||
log::trace!("ConnectionHandler: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
+87
-5
@@ -1,7 +1,7 @@
|
||||
// Copyright 2020-2023 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::config::{Config, Topology};
|
||||
use crate::node::http::{
|
||||
description::description,
|
||||
hardware::hardware,
|
||||
@@ -17,14 +17,21 @@ use crate::node::node_statistics::SharedNodeStats;
|
||||
use crate::node::packet_delayforwarder::{DelayForwarder, PacketDelayForwardSender};
|
||||
use nym_bin_common::output_format::OutputFormat;
|
||||
use nym_bin_common::version_checker::parse_version;
|
||||
use nym_client_core::client::topology_control::accessor::TopologyAccessor;
|
||||
use nym_client_core::client::topology_control::nym_api_provider::NymApiTopologyProvider;
|
||||
use nym_client_core::client::topology_control::TopologyRefresher;
|
||||
use nym_client_core::client::topology_control::TopologyRefresherConfig;
|
||||
use nym_crypto::asymmetric::{encryption, identity};
|
||||
use nym_mixnode_common::verloc::{self, AtomicVerlocResult, VerlocMeasurer};
|
||||
use nym_task::{TaskClient, TaskManager};
|
||||
use nym_topology::provider_trait::TopologyProvider;
|
||||
use nym_validator_client::NymApiClient;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use std::net::SocketAddr;
|
||||
use std::process;
|
||||
use std::sync::Arc;
|
||||
use url::Url;
|
||||
|
||||
#[cfg(feature = "cpucycles")]
|
||||
use tracing::{error, info, warn};
|
||||
@@ -146,6 +153,8 @@ impl MixNode {
|
||||
&self,
|
||||
node_stats_update_sender: node_statistics::UpdateSender,
|
||||
delay_forwarding_channel: PacketDelayForwardSender,
|
||||
topology_access: TopologyAccessor,
|
||||
api_client: NymApiClient,
|
||||
shutdown: TaskClient,
|
||||
) {
|
||||
info!("Starting socket listener...");
|
||||
@@ -153,7 +162,13 @@ impl MixNode {
|
||||
let packet_processor =
|
||||
PacketProcessor::new(self.sphinx_keypair.private_key(), node_stats_update_sender);
|
||||
|
||||
let connection_handler = ConnectionHandler::new(packet_processor, delay_forwarding_channel);
|
||||
let connection_handler = ConnectionHandler::new(
|
||||
packet_processor,
|
||||
delay_forwarding_channel,
|
||||
topology_access,
|
||||
api_client,
|
||||
Arc::clone(&self.sphinx_keypair),
|
||||
);
|
||||
|
||||
let listening_address = SocketAddr::new(
|
||||
self.config.mixnode.listening_address,
|
||||
@@ -166,6 +181,8 @@ impl MixNode {
|
||||
fn start_packet_delay_forwarder(
|
||||
&mut self,
|
||||
node_stats_update_sender: node_statistics::UpdateSender,
|
||||
topology_access: TopologyAccessor,
|
||||
api_client: NymApiClient,
|
||||
shutdown: TaskClient,
|
||||
) -> PacketDelayForwardSender {
|
||||
info!("Starting packet delay-forwarder...");
|
||||
@@ -179,7 +196,12 @@ impl MixNode {
|
||||
);
|
||||
|
||||
let mut packet_forwarder = DelayForwarder::new(
|
||||
nym_mixnet_client::Client::new(client_config),
|
||||
nym_mixnet_client::Client::new(
|
||||
client_config,
|
||||
topology_access,
|
||||
api_client,
|
||||
Arc::clone(&self.sphinx_keypair),
|
||||
),
|
||||
node_stats_update_sender,
|
||||
shutdown,
|
||||
);
|
||||
@@ -230,6 +252,47 @@ impl MixNode {
|
||||
atomic_verloc_results
|
||||
}
|
||||
|
||||
fn setup_topology_provider(nym_api_urls: Vec<Url>) -> Box<dyn TopologyProvider + Send + Sync> {
|
||||
// if no custom provider was ... provided ..., create one using nym-api
|
||||
Box::new(NymApiTopologyProvider::new(
|
||||
nym_api_urls,
|
||||
env!("CARGO_PKG_VERSION").to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
// future responsible for periodically polling directory server and updating
|
||||
// the current global view of topology
|
||||
async fn start_topology_refresher(
|
||||
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
|
||||
topology_config: Topology,
|
||||
topology_accessor: TopologyAccessor,
|
||||
mut shutdown: TaskClient,
|
||||
) {
|
||||
let topology_refresher_config =
|
||||
TopologyRefresherConfig::new(topology_config.topology_refresh_rate);
|
||||
|
||||
let mut topology_refresher = TopologyRefresher::new(
|
||||
topology_refresher_config,
|
||||
topology_accessor,
|
||||
topology_provider,
|
||||
);
|
||||
// before returning, block entire runtime to refresh the current network view so that any
|
||||
// components depending on topology would see a non-empty view
|
||||
info!("Obtaining initial network topology");
|
||||
topology_refresher.try_refresh().await;
|
||||
|
||||
if topology_config.disable_refreshing {
|
||||
// if we're not spawning the refresher, don't cause shutdown immediately
|
||||
info!("The topology refesher is not going to be started");
|
||||
shutdown.mark_as_success();
|
||||
} else {
|
||||
// don't spawn the refresher if we don't want to be refreshing the topology.
|
||||
// only use the initial values obtained
|
||||
info!("Starting topology refresher...");
|
||||
topology_refresher.start_with_shutdown(shutdown);
|
||||
}
|
||||
}
|
||||
|
||||
fn random_api_client(&self) -> nym_validator_client::NymApiClient {
|
||||
let endpoints = self.config.get_nym_api_endpoints();
|
||||
let nym_api = endpoints
|
||||
@@ -276,11 +339,30 @@ impl MixNode {
|
||||
|
||||
let (node_stats_pointer, node_stats_update_sender) =
|
||||
self.start_node_stats_controller(shutdown.subscribe());
|
||||
let delay_forwarding_channel = self
|
||||
.start_packet_delay_forwarder(node_stats_update_sender.clone(), shutdown.subscribe());
|
||||
|
||||
let topology_provider = Self::setup_topology_provider(self.config.get_nym_api_endpoints());
|
||||
let shared_topology_access = TopologyAccessor::new();
|
||||
Self::start_topology_refresher(
|
||||
topology_provider,
|
||||
self.config.topology,
|
||||
shared_topology_access.clone(),
|
||||
shutdown.subscribe(),
|
||||
)
|
||||
.await;
|
||||
let random_api_client = self.random_api_client();
|
||||
|
||||
let delay_forwarding_channel = self.start_packet_delay_forwarder(
|
||||
node_stats_update_sender.clone(),
|
||||
shared_topology_access.clone(),
|
||||
random_api_client.clone(),
|
||||
shutdown.subscribe(),
|
||||
);
|
||||
|
||||
self.start_socket_listener(
|
||||
node_stats_update_sender,
|
||||
delay_forwarding_channel,
|
||||
shared_topology_access,
|
||||
random_api_client,
|
||||
shutdown.subscribe(),
|
||||
);
|
||||
let atomic_verloc_results = self.start_verloc_measurements(shutdown.subscribe());
|
||||
|
||||
+10
-1
@@ -24,6 +24,7 @@ use nym_bin_common::logging::setup_logging;
|
||||
use nym_contract_cache::cache::NymContractCache;
|
||||
use nym_sphinx::receiver::SphinxMessageReceiver;
|
||||
use nym_task::TaskManager;
|
||||
use nym_validator_client::NymApiClient;
|
||||
use rand::rngs::OsRng;
|
||||
use std::error::Error;
|
||||
use support::{http, nyxd};
|
||||
@@ -157,12 +158,20 @@ async fn start_nym_api_tasks(
|
||||
|
||||
// if network monitor is enabled, the storage MUST BE available
|
||||
let storage = maybe_storage.unwrap();
|
||||
|
||||
let url = format!(
|
||||
"http://{}:{}",
|
||||
rocket.config().address,
|
||||
rocket.config().port
|
||||
)
|
||||
.parse()
|
||||
.unwrap();
|
||||
let nym_api_client = NymApiClient::new(url);
|
||||
network_monitor::start::<SphinxMessageReceiver>(
|
||||
&config.network_monitor,
|
||||
nym_contract_cache_state,
|
||||
storage,
|
||||
nyxd_client.clone(),
|
||||
nym_api_client,
|
||||
&shutdown,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -22,6 +22,7 @@ use nym_sphinx::acknowledgements::AckKey;
|
||||
use nym_sphinx::params::PacketType;
|
||||
use nym_sphinx::receiver::MessageReceiver;
|
||||
use nym_task::TaskManager;
|
||||
use nym_validator_client::NymApiClient;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub(crate) mod gateways_reader;
|
||||
@@ -36,10 +37,12 @@ pub(crate) fn setup<'a>(
|
||||
nym_contract_cache_state: &NymContractCache,
|
||||
storage: &NymApiStorage,
|
||||
nyxd_client: nyxd::Client,
|
||||
nym_api_client: NymApiClient,
|
||||
) -> NetworkMonitorBuilder<'a> {
|
||||
NetworkMonitorBuilder::new(
|
||||
config,
|
||||
nyxd_client,
|
||||
nym_api_client,
|
||||
storage.to_owned(),
|
||||
nym_contract_cache_state.to_owned(),
|
||||
)
|
||||
@@ -48,6 +51,7 @@ pub(crate) fn setup<'a>(
|
||||
pub(crate) struct NetworkMonitorBuilder<'a> {
|
||||
config: &'a config::NetworkMonitor,
|
||||
nyxd_client: nyxd::Client,
|
||||
nym_api_client: NymApiClient,
|
||||
node_status_storage: NymApiStorage,
|
||||
validator_cache: NymContractCache,
|
||||
}
|
||||
@@ -56,12 +60,14 @@ impl<'a> NetworkMonitorBuilder<'a> {
|
||||
pub(crate) fn new(
|
||||
config: &'a config::NetworkMonitor,
|
||||
nyxd_client: nyxd::Client,
|
||||
nym_api_client: NymApiClient,
|
||||
node_status_storage: NymApiStorage,
|
||||
validator_cache: NymContractCache,
|
||||
) -> Self {
|
||||
NetworkMonitorBuilder {
|
||||
config,
|
||||
nyxd_client,
|
||||
nym_api_client,
|
||||
node_status_storage,
|
||||
validator_cache,
|
||||
}
|
||||
@@ -105,8 +111,10 @@ impl<'a> NetworkMonitorBuilder<'a> {
|
||||
self.config,
|
||||
gateway_status_update_sender,
|
||||
Arc::clone(&identity_keypair),
|
||||
Arc::clone(&encryption_keypair),
|
||||
self.config.debug.gateway_sending_rate,
|
||||
bandwidth_controller,
|
||||
self.nym_api_client,
|
||||
self.config.debug.disabled_credentials_mode,
|
||||
);
|
||||
|
||||
@@ -177,18 +185,22 @@ fn new_packet_sender(
|
||||
config: &config::NetworkMonitor,
|
||||
gateways_status_updater: GatewayClientUpdateSender,
|
||||
local_identity: Arc<identity::KeyPair>,
|
||||
local_sphinx: Arc<encryption::KeyPair>,
|
||||
max_sending_rate: usize,
|
||||
bandwidth_controller: BandwidthController<nyxd::Client, PersistentStorage>,
|
||||
nym_api_client: nym_validator_client::NymApiClient,
|
||||
disabled_credentials_mode: bool,
|
||||
) -> PacketSender {
|
||||
PacketSender::new(
|
||||
gateways_status_updater,
|
||||
local_identity,
|
||||
local_sphinx,
|
||||
config.debug.gateway_response_timeout,
|
||||
config.debug.gateway_connection_timeout,
|
||||
config.debug.max_concurrent_gateway_clients,
|
||||
max_sending_rate,
|
||||
bandwidth_controller,
|
||||
nym_api_client,
|
||||
disabled_credentials_mode,
|
||||
)
|
||||
}
|
||||
@@ -221,9 +233,16 @@ pub(crate) async fn start<R: MessageReceiver + Send + 'static>(
|
||||
nym_contract_cache_state: &NymContractCache,
|
||||
storage: &NymApiStorage,
|
||||
nyxd_client: nyxd::Client,
|
||||
nym_api_client: NymApiClient,
|
||||
shutdown: &TaskManager,
|
||||
) {
|
||||
let monitor_builder = setup(config, nym_contract_cache_state, storage, nyxd_client);
|
||||
let monitor_builder = setup(
|
||||
config,
|
||||
nym_contract_cache_state,
|
||||
storage,
|
||||
nyxd_client,
|
||||
nym_api_client,
|
||||
);
|
||||
info!("Starting network monitor...");
|
||||
let runnables: NetworkMonitorRunnables<R> = monitor_builder.build().await;
|
||||
runnables.spawn_tasks(shutdown);
|
||||
|
||||
@@ -310,6 +310,7 @@ impl PacketPreparer {
|
||||
GatewayPackets::new(
|
||||
route.gateway_clients_address(),
|
||||
route.gateway_identity(),
|
||||
route.gateway_sphinx(),
|
||||
mix_packets,
|
||||
)
|
||||
}
|
||||
@@ -387,6 +388,7 @@ impl PacketPreparer {
|
||||
let route_ext = test_route.test_message_ext(test_nonce);
|
||||
let gateway_address = test_route.gateway_clients_address();
|
||||
let gateway_identity = test_route.gateway_identity();
|
||||
let gateway_sphinx = test_route.gateway_sphinx();
|
||||
|
||||
let mut mix_tester = self.ephemeral_mix_tester(test_route);
|
||||
|
||||
@@ -408,7 +410,9 @@ impl PacketPreparer {
|
||||
|
||||
let gateway_packets = all_gateway_packets
|
||||
.entry(gateway_identity.to_bytes())
|
||||
.or_insert_with(|| GatewayPackets::empty(gateway_address, gateway_identity));
|
||||
.or_insert_with(|| {
|
||||
GatewayPackets::empty(gateway_address, gateway_identity, gateway_sphinx)
|
||||
});
|
||||
gateway_packets.push_packets(mix_packets);
|
||||
|
||||
// and generate test packets for gateways (note the variable recipient)
|
||||
@@ -436,7 +440,9 @@ impl PacketPreparer {
|
||||
// or create a new one
|
||||
let gateway_packets = all_gateway_packets
|
||||
.entry(gateway_identity.to_bytes())
|
||||
.or_insert_with(|| GatewayPackets::empty(gateway_address, gateway_identity));
|
||||
.or_insert_with(|| {
|
||||
GatewayPackets::empty(gateway_address, gateway_identity, gateway_sphinx)
|
||||
});
|
||||
gateway_packets.push_packets(gateway_mix_packets);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,11 +15,13 @@ use log::{debug, info, trace, warn};
|
||||
use nym_bandwidth_controller::BandwidthController;
|
||||
use nym_config::defaults::REMAINING_BANDWIDTH_THRESHOLD;
|
||||
use nym_credential_storage::persistent_storage::PersistentStorage;
|
||||
use nym_crypto::asymmetric::encryption;
|
||||
use nym_crypto::asymmetric::identity::{self, PUBLIC_KEY_LENGTH};
|
||||
use nym_gateway_client::error::GatewayClientError;
|
||||
use nym_gateway_client::{AcknowledgementReceiver, GatewayClient, MixnetMessageReceiver};
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use nym_task::TaskClient;
|
||||
use nym_validator_client::NymApiClient;
|
||||
use pin_project::pin_project;
|
||||
use std::mem;
|
||||
use std::num::NonZeroUsize;
|
||||
@@ -38,6 +40,8 @@ pub(crate) struct GatewayPackets {
|
||||
/// Public key of the target gateway.
|
||||
pub(crate) pub_key: identity::PublicKey,
|
||||
|
||||
pub(crate) encryption_key: encryption::PublicKey,
|
||||
|
||||
/// All the packets that are going to get sent to the gateway.
|
||||
pub(crate) packets: Vec<MixPacket>,
|
||||
}
|
||||
@@ -46,19 +50,26 @@ impl GatewayPackets {
|
||||
pub(crate) fn new(
|
||||
clients_address: String,
|
||||
pub_key: identity::PublicKey,
|
||||
encryption_key: encryption::PublicKey,
|
||||
packets: Vec<MixPacket>,
|
||||
) -> Self {
|
||||
GatewayPackets {
|
||||
clients_address,
|
||||
pub_key,
|
||||
encryption_key,
|
||||
packets,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn empty(clients_address: String, pub_key: identity::PublicKey) -> Self {
|
||||
pub(crate) fn empty(
|
||||
clients_address: String,
|
||||
pub_key: identity::PublicKey,
|
||||
encryption_key: encryption::PublicKey,
|
||||
) -> Self {
|
||||
GatewayPackets {
|
||||
clients_address,
|
||||
pub_key,
|
||||
encryption_key,
|
||||
packets: Vec::new(),
|
||||
}
|
||||
}
|
||||
@@ -79,6 +90,7 @@ impl GatewayPackets {
|
||||
struct FreshGatewayClientData {
|
||||
gateways_status_updater: GatewayClientUpdateSender,
|
||||
local_identity: Arc<identity::KeyPair>,
|
||||
local_sphinx: Arc<encryption::KeyPair>,
|
||||
gateway_response_timeout: Duration,
|
||||
bandwidth_controller: BandwidthController<nyxd::Client, PersistentStorage>,
|
||||
disabled_credentials_mode: bool,
|
||||
@@ -122,6 +134,7 @@ pub(crate) struct PacketSender {
|
||||
active_gateway_clients: ActiveGatewayClients,
|
||||
|
||||
fresh_gateway_client_data: Arc<FreshGatewayClientData>,
|
||||
nym_api_client: NymApiClient,
|
||||
gateway_connection_timeout: Duration,
|
||||
max_concurrent_clients: usize,
|
||||
max_sending_rate: usize,
|
||||
@@ -134,11 +147,13 @@ impl PacketSender {
|
||||
pub(crate) fn new(
|
||||
gateways_status_updater: GatewayClientUpdateSender,
|
||||
local_identity: Arc<identity::KeyPair>,
|
||||
local_sphinx: Arc<encryption::KeyPair>,
|
||||
gateway_response_timeout: Duration,
|
||||
gateway_connection_timeout: Duration,
|
||||
max_concurrent_clients: usize,
|
||||
max_sending_rate: usize,
|
||||
bandwidth_controller: BandwidthController<nyxd::Client, PersistentStorage>,
|
||||
nym_api_client: NymApiClient,
|
||||
disabled_credentials_mode: bool,
|
||||
) -> Self {
|
||||
PacketSender {
|
||||
@@ -146,10 +161,12 @@ impl PacketSender {
|
||||
fresh_gateway_client_data: Arc::new(FreshGatewayClientData {
|
||||
gateways_status_updater,
|
||||
local_identity,
|
||||
local_sphinx,
|
||||
gateway_response_timeout,
|
||||
bandwidth_controller,
|
||||
disabled_credentials_mode,
|
||||
}),
|
||||
nym_api_client,
|
||||
gateway_connection_timeout,
|
||||
max_concurrent_clients,
|
||||
max_sending_rate,
|
||||
@@ -171,7 +188,9 @@ impl PacketSender {
|
||||
fn new_gateway_client_handle(
|
||||
address: String,
|
||||
identity: identity::PublicKey,
|
||||
sphinx: encryption::PublicKey,
|
||||
fresh_gateway_client_data: &FreshGatewayClientData,
|
||||
nym_api_client: NymApiClient,
|
||||
) -> (
|
||||
GatewayClientHandle,
|
||||
(MixnetMessageReceiver, AcknowledgementReceiver),
|
||||
@@ -187,12 +206,15 @@ impl PacketSender {
|
||||
let mut gateway_client = GatewayClient::new(
|
||||
address,
|
||||
Arc::clone(&fresh_gateway_client_data.local_identity),
|
||||
Arc::clone(&fresh_gateway_client_data.local_sphinx),
|
||||
identity,
|
||||
sphinx,
|
||||
None,
|
||||
message_sender,
|
||||
ack_sender,
|
||||
fresh_gateway_client_data.gateway_response_timeout,
|
||||
Some(fresh_gateway_client_data.bandwidth_controller.clone()),
|
||||
nym_api_client,
|
||||
nym_task::TaskClient::dummy(),
|
||||
);
|
||||
|
||||
@@ -267,14 +289,21 @@ impl PacketSender {
|
||||
async fn create_new_gateway_client_handle_and_authenticate(
|
||||
address: String,
|
||||
identity: identity::PublicKey,
|
||||
sphinx: encryption::PublicKey,
|
||||
fresh_gateway_client_data: &FreshGatewayClientData,
|
||||
nym_api_client: NymApiClient,
|
||||
gateway_connection_timeout: Duration,
|
||||
) -> Option<(
|
||||
GatewayClientHandle,
|
||||
(MixnetMessageReceiver, AcknowledgementReceiver),
|
||||
)> {
|
||||
let (new_client, (message_receiver, ack_receiver)) =
|
||||
Self::new_gateway_client_handle(address, identity, fresh_gateway_client_data);
|
||||
let (new_client, (message_receiver, ack_receiver)) = Self::new_gateway_client_handle(
|
||||
address,
|
||||
identity,
|
||||
sphinx,
|
||||
fresh_gateway_client_data,
|
||||
nym_api_client,
|
||||
);
|
||||
|
||||
// Put this in timeout in case the gateway has incorrectly set their ulimit and our connection
|
||||
// gets stuck in their TCP queue and just hangs on our end but does not terminate
|
||||
@@ -332,6 +361,7 @@ impl PacketSender {
|
||||
gateway_connection_timeout: Duration,
|
||||
packets: GatewayPackets,
|
||||
fresh_gateway_client_data: Arc<FreshGatewayClientData>,
|
||||
nym_api_client: NymApiClient,
|
||||
client: Option<GatewayClientHandle>,
|
||||
max_sending_rate: usize,
|
||||
) -> Option<GatewayClientHandle> {
|
||||
@@ -351,7 +381,9 @@ impl PacketSender {
|
||||
Self::create_new_gateway_client_handle_and_authenticate(
|
||||
packets.clients_address,
|
||||
packets.pub_key,
|
||||
packets.encryption_key,
|
||||
&fresh_gateway_client_data,
|
||||
nym_api_client,
|
||||
gateway_connection_timeout,
|
||||
)
|
||||
.await?;
|
||||
@@ -479,6 +511,7 @@ impl PacketSender {
|
||||
// we're not interacting with right now)
|
||||
drop(guard);
|
||||
|
||||
let nym_api_client = &self.nym_api_client;
|
||||
// can't chain it all nicely together as there's no adapter method defined on Stream directly
|
||||
// for ForEachConcurrentClientUse
|
||||
let used_clients = ForEachConcurrentClientUse::new(
|
||||
@@ -489,6 +522,7 @@ impl PacketSender {
|
||||
gateway_connection_timeout,
|
||||
packets,
|
||||
fresh_data,
|
||||
nym_api_client.clone(),
|
||||
client,
|
||||
max_sending_rate,
|
||||
)
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
use crate::network_monitor::test_packet::NymApiTestMessageExt;
|
||||
use crate::network_monitor::ROUTE_TESTING_TEST_NONCE;
|
||||
use nym_crypto::asymmetric::identity;
|
||||
use nym_crypto::asymmetric::{encryption, identity};
|
||||
use nym_topology::{gateway, mix, NymTopology};
|
||||
use std::fmt::{Debug, Formatter};
|
||||
|
||||
@@ -63,6 +63,10 @@ impl TestRoute {
|
||||
self.gateway().identity_key
|
||||
}
|
||||
|
||||
pub(crate) fn gateway_sphinx(&self) -> encryption::PublicKey {
|
||||
self.gateway().sphinx_key
|
||||
}
|
||||
|
||||
pub(crate) fn topology(&self) -> &NymTopology {
|
||||
&self.nodes
|
||||
}
|
||||
|
||||
@@ -17,8 +17,10 @@ pub(crate) mod routes;
|
||||
pub(crate) fn nym_contract_cache_routes(settings: &OpenApiSettings) -> (Vec<Route>, OpenApi) {
|
||||
openapi_get_routes_spec![
|
||||
settings: routes::get_mixnodes,
|
||||
routes::get_mixnodes_all,
|
||||
routes::get_mixnodes_detailed,
|
||||
routes::get_gateways,
|
||||
routes::get_gateways_all,
|
||||
routes::get_active_set,
|
||||
routes::get_active_set_detailed,
|
||||
routes::get_rewarded_set,
|
||||
|
||||
@@ -25,6 +25,12 @@ pub async fn get_mixnodes(cache: &State<NymContractCache>) -> Json<Vec<MixNodeDe
|
||||
Json(cache.mixnodes_filtered().await)
|
||||
}
|
||||
|
||||
#[openapi(tag = "contract-cache")]
|
||||
#[get("/mixnodes/all")]
|
||||
pub async fn get_mixnodes_all(cache: &State<NymContractCache>) -> Json<Vec<MixNodeDetails>> {
|
||||
Json(cache.mixnodes_all().await)
|
||||
}
|
||||
|
||||
// DEPRECATED: this endpoint now lives in `node_status_api`. Once all consumers are updated,
|
||||
// replace this with
|
||||
// ```
|
||||
@@ -46,6 +52,12 @@ pub async fn get_gateways(cache: &State<NymContractCache>) -> Json<Vec<GatewayBo
|
||||
Json(cache.gateways_filtered().await)
|
||||
}
|
||||
|
||||
#[openapi(tag = "contract-cache")]
|
||||
#[get("/gateways/all")]
|
||||
pub async fn get_gateways_all(cache: &State<NymContractCache>) -> Json<Vec<GatewayBond>> {
|
||||
Json(cache.gateways_all().await)
|
||||
}
|
||||
|
||||
#[openapi(tag = "contract-cache")]
|
||||
#[get("/mixnodes/rewarded")]
|
||||
pub async fn get_rewarded_set(cache: &State<NymContractCache>) -> Json<Vec<MixNodeDetails>> {
|
||||
|
||||
Reference in New Issue
Block a user