// Copyright 2022-2023 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 use crate::error::ClientCoreError; use crate::init::types::RegistrationResult; use futures::{SinkExt, StreamExt}; use nym_crypto::asymmetric::ed25519; use nym_gateway_client::GatewayClient; use nym_gateway_client::client::GatewayListeners; use nym_topology::node::RoutingNode; use nym_validator_client::UserAgent; use nym_validator_client::client::{IdentityKeyRef, NymApiClientExt}; use nym_validator_client::nym_nodes::SkimmedNodesWithMetadata; use rand::{Rng, seq::SliceRandom}; #[cfg(unix)] use std::os::fd::RawFd; use std::{sync::Arc, time::Duration}; use tracing::{debug, info, trace, warn}; use tungstenite::Message; use url::Url; #[cfg(not(target_arch = "wasm32"))] use crate::init::websockets::connect_async; use nym_topology::NodeId; #[cfg(target_arch = "wasm32")] use nym_wasm_utils::websocket::JSWebsocket; #[cfg(not(target_arch = "wasm32"))] use tokio::net::TcpStream; #[cfg(not(target_arch = "wasm32"))] use tokio::time::Instant; #[cfg(not(target_arch = "wasm32"))] use tokio::time::sleep; #[cfg(not(target_arch = "wasm32"))] use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; #[cfg(target_arch = "wasm32")] use wasmtimer::std::Instant; #[cfg(target_arch = "wasm32")] use wasmtimer::tokio::sleep; #[cfg(not(target_arch = "wasm32"))] type WsConn = WebSocketStream>; #[cfg(target_arch = "wasm32")] type WsConn = JSWebsocket; const CONCURRENT_GATEWAYS_MEASURED: usize = 20; const MEASUREMENTS: usize = 3; const DEFAULT_NYM_API_RETRIES: usize = 3; #[cfg(not(target_arch = "wasm32"))] const CONN_TIMEOUT: Duration = Duration::from_millis(1500); const PING_TIMEOUT: Duration = Duration::from_millis(1000); // The abstraction that some of these helpers use pub trait ConnectableGateway { fn node_id(&self) -> NodeId; fn identity(&self) -> ed25519::PublicKey; fn clients_address(&self, prefer_ipv6: bool) -> Option; fn is_wss(&self) -> bool; } impl ConnectableGateway for RoutingNode { fn node_id(&self) -> NodeId { self.node_id } fn identity(&self) -> ed25519::PublicKey { self.identity_key } fn clients_address(&self, prefer_ipv6: bool) -> Option { self.ws_entry_address(prefer_ipv6) } fn is_wss(&self) -> bool { self.entry .as_ref() .map(|e| e.clients_wss_port.is_some()) .unwrap_or_default() } } struct GatewayWithLatency<'a, G: ConnectableGateway> { gateway: &'a G, latency: Duration, } // Helper to collect all pages of entry nodes - replicates NymApiClient's convenience method async fn get_all_basic_entry_nodes_with_metadata( client: &nym_http_api_client::Client, use_bincode: bool, ) -> Result { // Get first page to obtain metadata let mut page = 0; let res = client .get_basic_entry_assigned_nodes_v2(false, Some(page), None, use_bincode) .await?; let mut nodes = res.nodes.data; let metadata = res.metadata; if res.nodes.pagination.total == nodes.len() { return Ok(SkimmedNodesWithMetadata::new(nodes, metadata)); } page += 1; // Collect remaining pages loop { let mut res = client .get_basic_entry_assigned_nodes_v2(false, Some(page), None, use_bincode) .await?; if !metadata.consistency_check(&res.metadata) { return Err(ClientCoreError::ValidatorClientError( nym_validator_client::ValidatorClientError::InconsistentPagedMetadata, )); } nodes.append(&mut res.nodes.data); if nodes.len() < res.nodes.pagination.total { page += 1 } else { break; } } Ok(SkimmedNodesWithMetadata::new(nodes, metadata)) } impl<'a, G: ConnectableGateway> GatewayWithLatency<'a, G> { fn new(gateway: &'a G, latency: Duration) -> Self { GatewayWithLatency { gateway, latency } } } pub async fn gateways_for_init( nym_apis: &[Url], user_agent: Option, minimum_performance: u8, ignore_epoch_roles: bool, retry_count: Option, ) -> Result, ClientCoreError> { // Build client with ALL URLs for fallback support let nym_api_urls: Vec = nym_apis .iter() .map(|url| nym_http_api_client::Url::from(url.clone())) .collect(); if nym_api_urls.is_empty() { return Err(ClientCoreError::ListOfNymApisIsEmpty); } let retry_count = retry_count.unwrap_or(DEFAULT_NYM_API_RETRIES); let mut builder = nym_http_api_client::ClientBuilder::new_with_urls(nym_api_urls.clone())? .with_retries(retry_count) .with_bincode(); if let Some(user_agent) = user_agent { builder = builder.with_user_agent(user_agent); } let client = builder.build().map_err(|e| { ClientCoreError::ValidatorClientError(nym_validator_client::ValidatorClientError::from(e)) })?; tracing::debug!("Fetching list of gateways from: {:?}", nym_api_urls); // Use our helper to handle pagination let gateways = get_all_basic_entry_nodes_with_metadata(&client, true) .await? .nodes; info!("nym api reports {} gateways", gateways.len()); tracing::trace!("Gateways: {gateways:#?}"); // filter out gateways below minimum performance and ones that could operate as a mixnode // (we don't want instability) let valid_gateways: Vec = gateways .iter() .filter(|g| ignore_epoch_roles || !g.supported_roles.mixnode) .filter(|g| g.performance.round_to_integer() >= minimum_performance) .filter_map(|gateway| gateway.try_into().ok()) .collect(); tracing::info!( "Found {} valid gateways after filtering", valid_gateways.len() ); Ok(valid_gateways) } #[cfg(not(target_arch = "wasm32"))] async fn connect(endpoint: &str) -> Result { match tokio::time::timeout(CONN_TIMEOUT, connect_async(endpoint)).await { Err(_elapsed) => Err(ClientCoreError::GatewayConnectionTimeout), Ok(Err(conn_failure)) => Err(conn_failure), Ok(Ok((stream, _))) => Ok(stream), } } #[cfg(target_arch = "wasm32")] async fn connect(endpoint: &str) -> Result { JSWebsocket::new(endpoint).map_err(|_| ClientCoreError::GatewayJsConnectionFailure) } async fn measure_latency(gateway: &G) -> Result, ClientCoreError> where G: ConnectableGateway, { let Some(addr) = gateway.clients_address(false) else { return Err(ClientCoreError::UnsupportedEntry { id: gateway.node_id(), identity: gateway.identity().to_string(), }); }; trace!( "establishing connection to {} ({addr})...", gateway.identity(), ); let mut stream = connect(&addr).await?; let mut results = Vec::new(); for _ in 0..MEASUREMENTS { let measurement_future = async { let ping_content = vec![1, 2, 3]; let start = Instant::now(); stream.send(Message::Ping(ping_content.clone())).await?; match stream.next().await { Some(Ok(Message::Pong(content))) => { if content == ping_content { let elapsed = Instant::now().duration_since(start); trace!("current ping time: {elapsed:?}"); results.push(elapsed); } else { warn!("received a pong message with different content? wtf.") } } Some(Ok(_)) => warn!("received a message that's not a pong!"), Some(Err(err)) => return Err(err.into()), None => return Err(ClientCoreError::GatewayConnectionAbruptlyClosed), } Ok::<(), ClientCoreError>(()) }; let timeout = sleep(PING_TIMEOUT); tokio::pin!(timeout); tokio::select! { _ = &mut timeout => { warn!("timed out while trying to perform measurement...") } res = measurement_future => res?, } } let count = results.len() as u64; if count == 0 { return Err(ClientCoreError::NoGatewayMeasurements { identity: gateway.identity().to_base58_string(), }); } let sum: Duration = results.into_iter().sum(); let avg = Duration::from_nanos(sum.as_nanos() as u64 / count); Ok(GatewayWithLatency::new(gateway, avg)) } pub async fn choose_gateway_by_latency( rng: &mut R, gateways: &[G], must_use_tls: bool, ) -> Result { let gateways = filter_by_tls(gateways, must_use_tls)?; info!( "choosing gateway by latency, pinging {} gateways ...", gateways.len() ); let gateways_with_latency = Arc::new(tokio::sync::Mutex::new(Vec::new())); futures::stream::iter(gateways) .for_each_concurrent(CONCURRENT_GATEWAYS_MEASURED, |gateway| async { let id = gateway.identity(); trace!("measuring latency to {id}..."); match measure_latency(gateway).await { Ok(with_latency) => { debug!("{id}: {:?}", with_latency.latency); gateways_with_latency.lock().await.push(with_latency); } Err(err) => { warn!("failed to measure {id}: {err}"); } }; }) .await; let gateways_with_latency = gateways_with_latency.lock().await; let chosen = gateways_with_latency .choose_weighted(rng, |item| 1. / item.latency.as_secs_f32()) .map_err(|source| ClientCoreError::GatewaySelectionFailure { source })?; info!( "chose gateway {} with average latency of {:?}", chosen.gateway.identity(), chosen.latency ); Ok(chosen.gateway.clone()) } fn filter_by_tls( gateways: &[G], must_use_tls: bool, ) -> Result, ClientCoreError> { if must_use_tls { let filtered = gateways.iter().filter(|g| g.is_wss()).collect::>(); if filtered.is_empty() { return Err(ClientCoreError::NoWssGateways); } Ok(filtered) } else { Ok(gateways.iter().collect()) } } pub(super) fn uniformly_random_gateway( rng: &mut R, gateways: &[RoutingNode], must_use_tls: bool, ) -> Result { filter_by_tls(gateways, must_use_tls)? .choose(rng) .ok_or(ClientCoreError::NoGatewaysOnNetwork) .map(|&r| r.clone()) } pub(super) fn get_specified_gateway( gateway_identity: IdentityKeyRef, gateways: &[RoutingNode], must_use_tls: bool, ) -> Result { tracing::debug!("Requesting specified gateway: {gateway_identity}"); let user_gateway = ed25519::PublicKey::from_base58_string(gateway_identity) .map_err(ClientCoreError::UnableToCreatePublicKeyFromGatewayId)?; let gateway = gateways .iter() .find(|gateway| gateway.identity_key == user_gateway) .ok_or_else(|| { tracing::debug!( "Gateway {gateway_identity} not found in {} available gateways", gateways.len() ); ClientCoreError::NoGatewayWithId(gateway_identity.to_string()) })?; let Some(entry_details) = gateway.entry.as_ref() else { return Err(ClientCoreError::UnsupportedEntry { id: gateway.node_id, identity: gateway.identity().to_string(), }); }; if must_use_tls && entry_details.clients_wss_port.is_none() { return Err(ClientCoreError::UnsupportedWssProtocol { gateway: gateway_identity.to_string(), }); } Ok(gateway.clone()) } pub(super) async fn register_with_gateway( gateway_id: ed25519::PublicKey, gateway_listeners: GatewayListeners, our_identity: Arc, #[cfg(unix)] connection_fd_callback: Option>, ) -> Result { let mut gateway_client = GatewayClient::new_init( gateway_listeners, gateway_id, our_identity.clone(), #[cfg(unix)] connection_fd_callback, ); gateway_client.establish_connection().await.map_err(|err| { tracing::warn!("Failed to establish connection with gateway!"); ClientCoreError::GatewayClientError { gateway_id: gateway_id.to_base58_string(), source: Box::new(err), } })?; let auth_response = gateway_client .perform_initial_authentication() .await .map_err(|err| { tracing::warn!("Failed to register with the gateway {gateway_id}: {err}"); ClientCoreError::GatewayClientError { gateway_id: gateway_id.to_base58_string(), source: Box::new(err), } })?; Ok(RegistrationResult { shared_keys: auth_response.initial_shared_key, authenticated_ephemeral_client: gateway_client, }) } #[cfg(test)] mod tests { use url::Url; #[test] fn test_single_url_builds_without_retries() { let urls = [Url::parse("https://api.nym.com").unwrap()]; let nym_api_urls: Vec = urls .iter() .map(|url| nym_http_api_client::Url::from(url.clone())) .collect(); assert_eq!(nym_api_urls.len(), 1, "Should have exactly one URL"); } #[test] fn test_multiple_urls_prepared_for_retries() { let urls = [ Url::parse("https://api1.nym.com").unwrap(), Url::parse("https://api2.nym.com").unwrap(), Url::parse("https://api3.nym.com").unwrap(), ]; let nym_api_urls: Vec = urls .iter() .map(|url| nym_http_api_client::Url::from(url.clone())) .collect(); assert_eq!(nym_api_urls.len(), 3, "Should have all three URLs"); assert!( nym_api_urls.len() > 1, "Multiple URLs trigger retry behavior" ); } #[test] fn test_empty_url_list_is_detected() { let urls: Vec = vec![]; let nym_api_urls: Vec = urls .iter() .map(|url| nym_http_api_client::Url::from(url.clone())) .collect(); assert!(nym_api_urls.is_empty(), "Empty list should remain empty"); } }