Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b90a75266f | |||
| 26b537e256 | |||
| 17b9fa4dd5 |
Generated
+3
-1
@@ -5254,7 +5254,9 @@ dependencies = [
|
||||
"cosmrs",
|
||||
"nym-crypto",
|
||||
"nym-gateway-requests",
|
||||
"nym-topology",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sqlx",
|
||||
"thiserror 2.0.12",
|
||||
"time",
|
||||
@@ -5866,12 +5868,12 @@ dependencies = [
|
||||
"nym-credentials-interface",
|
||||
"nym-crypto",
|
||||
"nym-gateway-requests",
|
||||
"nym-http-api-client",
|
||||
"nym-network-defaults",
|
||||
"nym-pemstore",
|
||||
"nym-sphinx",
|
||||
"nym-statistics-common",
|
||||
"nym-task",
|
||||
"nym-topology",
|
||||
"nym-validator-client",
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
|
||||
+1
-1
@@ -264,7 +264,7 @@ generic-array = "0.14.7"
|
||||
getrandom = "0.2.10"
|
||||
handlebars = "3.5.5"
|
||||
hex = "0.4.3"
|
||||
hickory-resolver = "0.25"
|
||||
hickory-resolver = "0.25.2"
|
||||
hkdf = "0.12.3"
|
||||
hmac = "0.12.1"
|
||||
http = "1"
|
||||
|
||||
@@ -11,6 +11,7 @@ rust-version.workspace = true
|
||||
async-trait.workspace = true
|
||||
cosmrs.workspace = true
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json.workspace = true
|
||||
thiserror.workspace = true
|
||||
time.workspace = true
|
||||
tokio = { workspace = true, features = ["sync"] }
|
||||
@@ -20,6 +21,7 @@ zeroize = { workspace = true, features = ["zeroize_derive"] }
|
||||
|
||||
nym-crypto = { path = "../../crypto", features = ["asymmetric"] }
|
||||
nym-gateway-requests = { path = "../../gateway-requests" }
|
||||
nym-topology = { path = "../../topology", features = ["persistence"] }
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx]
|
||||
workspace = true
|
||||
|
||||
@@ -48,6 +48,6 @@ pub enum BadGateway {
|
||||
raw_listener: String,
|
||||
|
||||
#[source]
|
||||
source: url::ParseError,
|
||||
source: serde_json::Error,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -5,13 +5,13 @@ use crate::BadGateway;
|
||||
use cosmrs::AccountId;
|
||||
use nym_crypto::asymmetric::ed25519;
|
||||
use nym_gateway_requests::shared_key::{LegacySharedKeys, SharedGatewayKey, SharedSymmetricKey};
|
||||
use nym_topology::EntryDetails;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::ops::Deref;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use time::OffsetDateTime;
|
||||
use url::Url;
|
||||
use zeroize::{Zeroize, ZeroizeOnDrop};
|
||||
|
||||
pub const REMOTE_GATEWAY_TYPE: &str = "remote";
|
||||
@@ -67,7 +67,7 @@ impl GatewayDetails {
|
||||
gateway_id: ed25519::PublicKey,
|
||||
shared_key: Arc<SharedGatewayKey>,
|
||||
gateway_owner_address: Option<AccountId>,
|
||||
gateway_listener: Url,
|
||||
gateway_listener: EntryDetails,
|
||||
) -> Self {
|
||||
GatewayDetails::Remote(RemoteGatewayDetails {
|
||||
gateway_id,
|
||||
@@ -164,8 +164,7 @@ pub struct RegisteredGateway {
|
||||
pub gateway_type: GatewayType,
|
||||
}
|
||||
|
||||
#[derive(Debug, Zeroize, ZeroizeOnDrop, Serialize, Deserialize)]
|
||||
#[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct RawRemoteGatewayDetails {
|
||||
pub gateway_id_bs58: String,
|
||||
pub derived_aes128_ctr_blake3_hmac_keys_bs58: Option<String>,
|
||||
@@ -174,6 +173,16 @@ pub struct RawRemoteGatewayDetails {
|
||||
pub gateway_listener: String,
|
||||
}
|
||||
|
||||
impl Zeroize for RawRemoteGatewayDetails {
|
||||
fn zeroize(&mut self) {
|
||||
self.gateway_id_bs58.zeroize();
|
||||
self.derived_aes128_ctr_blake3_hmac_keys_bs58.zeroize();
|
||||
self.derived_aes256_gcm_siv_key.zeroize();
|
||||
self.gateway_owner_address.zeroize();
|
||||
}
|
||||
}
|
||||
impl ZeroizeOnDrop for RawRemoteGatewayDetails {}
|
||||
|
||||
impl TryFrom<RawRemoteGatewayDetails> for RemoteGatewayDetails {
|
||||
type Error = BadGateway;
|
||||
|
||||
@@ -230,7 +239,7 @@ impl TryFrom<RawRemoteGatewayDetails> for RemoteGatewayDetails {
|
||||
})
|
||||
.transpose()?;
|
||||
|
||||
let gateway_listener = Url::parse(&value.gateway_listener).map_err(|source| {
|
||||
let gateway_listener = serde_json::from_str(&value.gateway_listener).map_err(|source| {
|
||||
BadGateway::MalformedListener {
|
||||
gateway_id: value.gateway_id_bs58.clone(),
|
||||
raw_listener: value.gateway_listener.clone(),
|
||||
@@ -255,12 +264,16 @@ impl<'a> From<&'a RemoteGatewayDetails> for RawRemoteGatewayDetails {
|
||||
SharedGatewayKey::Legacy(key) => (Some(key.to_base58_string()), None),
|
||||
};
|
||||
|
||||
#[allow(clippy::expect_used)]
|
||||
let gateway_listener = serde_json::to_string(&value.gateway_listener)
|
||||
.expect("serialize for gateway details should never fail");
|
||||
|
||||
RawRemoteGatewayDetails {
|
||||
gateway_id_bs58: value.gateway_id.to_base58_string(),
|
||||
derived_aes128_ctr_blake3_hmac_keys_bs58,
|
||||
derived_aes256_gcm_siv_key,
|
||||
gateway_owner_address: value.gateway_owner_address.as_ref().map(|o| o.to_string()),
|
||||
gateway_listener: value.gateway_listener.to_string(),
|
||||
gateway_listener,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -273,7 +286,7 @@ pub struct RemoteGatewayDetails {
|
||||
|
||||
pub gateway_owner_address: Option<AccountId>,
|
||||
|
||||
pub gateway_listener: Url,
|
||||
pub gateway_listener: EntryDetails,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
||||
@@ -2,10 +2,10 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use nym_crypto::asymmetric::ed25519;
|
||||
use nym_topology::EntryDetails;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt::{Display, Formatter};
|
||||
use time::OffsetDateTime;
|
||||
use url::Url;
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct GatewayInfo {
|
||||
@@ -14,7 +14,7 @@ pub struct GatewayInfo {
|
||||
pub active: bool,
|
||||
|
||||
pub typ: String,
|
||||
pub endpoint: Option<Url>,
|
||||
pub endpoint: Option<EntryDetails>,
|
||||
}
|
||||
|
||||
impl Display for GatewayInfo {
|
||||
|
||||
@@ -561,7 +561,7 @@ where
|
||||
.gateway_owner_address
|
||||
.as_ref()
|
||||
.map(|o| o.to_string()),
|
||||
details.gateway_listener.to_string(),
|
||||
details.gateway_listener,
|
||||
);
|
||||
GatewayClient::new(
|
||||
GatewayClientConfig::new_default()
|
||||
|
||||
@@ -436,7 +436,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(ref mut next_delay) = &mut self.next_delay {
|
||||
if let Some(next_delay) = &mut self.next_delay {
|
||||
// it is not yet time to return a message
|
||||
if next_delay.as_mut().poll(cx).is_pending() {
|
||||
return Poll::Pending;
|
||||
|
||||
@@ -40,12 +40,8 @@ pub enum ClientCoreError {
|
||||
#[error("no gateway with id: {0}")]
|
||||
NoGatewayWithId(String),
|
||||
|
||||
#[error("Invalid URL: {0}")]
|
||||
InvalidUrl(String),
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
#[error("resolution failed: {0}")]
|
||||
ResolutionFailed(#[from] nym_http_api_client::ResolveError),
|
||||
#[error("Invalid Endpoint: {0}")]
|
||||
InvalidEndpoint(String),
|
||||
|
||||
#[error("no gateways on network")]
|
||||
NoGatewaysOnNetwork,
|
||||
|
||||
@@ -21,7 +21,7 @@ use url::Url;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use crate::init::websockets::connect_async;
|
||||
|
||||
use nym_topology::NodeId;
|
||||
use nym_topology::{EntryDetails, NodeId};
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio::net::TcpStream;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
@@ -55,7 +55,7 @@ const PING_TIMEOUT: Duration = Duration::from_millis(1000);
|
||||
pub trait ConnectableGateway {
|
||||
fn node_id(&self) -> NodeId;
|
||||
fn identity(&self) -> ed25519::PublicKey;
|
||||
fn clients_address(&self, prefer_ipv6: bool) -> Option<String>;
|
||||
fn endpoint_details(&self) -> Option<EntryDetails>;
|
||||
fn is_wss(&self) -> bool;
|
||||
}
|
||||
|
||||
@@ -68,8 +68,8 @@ impl ConnectableGateway for RoutingNode {
|
||||
self.identity_key
|
||||
}
|
||||
|
||||
fn clients_address(&self, prefer_ipv6: bool) -> Option<String> {
|
||||
self.ws_entry_address(prefer_ipv6)
|
||||
fn endpoint_details(&self) -> Option<EntryDetails> {
|
||||
self.entry.clone()
|
||||
}
|
||||
|
||||
fn is_wss(&self) -> bool {
|
||||
@@ -191,7 +191,7 @@ pub async fn gateways_for_init(
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
async fn connect(endpoint: &str) -> Result<WsConn, ClientCoreError> {
|
||||
async fn connect(endpoint: &EntryDetails) -> Result<WsConn, ClientCoreError> {
|
||||
match tokio::time::timeout(CONN_TIMEOUT, connect_async(endpoint)).await {
|
||||
Err(_elapsed) => Err(ClientCoreError::GatewayConnectionTimeout),
|
||||
Ok(Err(conn_failure)) => Err(conn_failure),
|
||||
@@ -208,17 +208,17 @@ async fn measure_latency<G>(gateway: &G) -> Result<GatewayWithLatency<'_, G>, Cl
|
||||
where
|
||||
G: ConnectableGateway,
|
||||
{
|
||||
let Some(addr) = gateway.clients_address(false) else {
|
||||
let Some(endpoint) = gateway.endpoint_details() else {
|
||||
return Err(ClientCoreError::UnsupportedEntry {
|
||||
id: gateway.node_id(),
|
||||
identity: gateway.identity().to_string(),
|
||||
});
|
||||
};
|
||||
trace!(
|
||||
"establishing connection to {} ({addr})...",
|
||||
"establishing connection to {} ({endpoint})...",
|
||||
gateway.identity(),
|
||||
);
|
||||
let mut stream = connect(&addr).await?;
|
||||
let mut stream = connect(&endpoint).await?;
|
||||
|
||||
let mut results = Vec::new();
|
||||
for _ in 0..MEASUREMENTS {
|
||||
@@ -379,7 +379,7 @@ pub(super) fn get_specified_gateway(
|
||||
|
||||
pub(super) async fn register_with_gateway(
|
||||
gateway_id: ed25519::PublicKey,
|
||||
gateway_listener: Url,
|
||||
gateway_listener: EntryDetails,
|
||||
our_identity: Arc<ed25519::KeyPair>,
|
||||
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
) -> Result<RegistrationResult, ClientCoreError> {
|
||||
|
||||
@@ -14,6 +14,7 @@ use nym_gateway_client::client::InitGatewayClient;
|
||||
use nym_gateway_requests::shared_key::SharedGatewayKey;
|
||||
use nym_sphinx::addressing::clients::Recipient;
|
||||
use nym_topology::node::RoutingNode;
|
||||
use nym_topology::EntryDetails;
|
||||
use nym_validator_client::client::IdentityKey;
|
||||
use nym_validator_client::nyxd::AccountId;
|
||||
use serde::Serialize;
|
||||
@@ -22,7 +23,6 @@ use std::fmt::{Debug, Display};
|
||||
use std::os::fd::RawFd;
|
||||
use std::sync::Arc;
|
||||
use time::OffsetDateTime;
|
||||
use url::Url;
|
||||
|
||||
pub enum SelectedGateway {
|
||||
Remote {
|
||||
@@ -30,7 +30,7 @@ pub enum SelectedGateway {
|
||||
|
||||
gateway_owner_address: Option<AccountId>,
|
||||
|
||||
gateway_listener: Url,
|
||||
gateway_listener: EntryDetails,
|
||||
},
|
||||
Custom {
|
||||
gateway_id: ed25519::PublicKey,
|
||||
@@ -46,25 +46,25 @@ impl SelectedGateway {
|
||||
// for now, let's use 'old' behaviour, if you want to change it, you can pass it up the enum stack yourself : )
|
||||
let prefer_ipv6 = false;
|
||||
|
||||
let gateway_listener = if must_use_tls {
|
||||
node.ws_entry_address_tls()
|
||||
.ok_or(ClientCoreError::UnsupportedWssProtocol {
|
||||
gateway: node.identity_key.to_base58_string(),
|
||||
})?
|
||||
} else {
|
||||
node.ws_entry_address(prefer_ipv6)
|
||||
.ok_or(ClientCoreError::UnsupportedEntry {
|
||||
id: node.node_id,
|
||||
identity: node.identity_key.to_base58_string(),
|
||||
})?
|
||||
};
|
||||
let gateway_listener = node.entry.ok_or(ClientCoreError::UnsupportedEntry {
|
||||
id: node.node_id,
|
||||
identity: node.identity_key.to_base58_string(),
|
||||
})?;
|
||||
|
||||
let gateway_listener =
|
||||
Url::parse(&gateway_listener).map_err(|source| ClientCoreError::MalformedListener {
|
||||
gateway_id: node.identity_key.to_base58_string(),
|
||||
raw_listener: gateway_listener,
|
||||
source,
|
||||
})?;
|
||||
if must_use_tls
|
||||
&& (gateway_listener.hostname.is_none() || gateway_listener.clients_wss_port.is_none())
|
||||
{
|
||||
return Err(ClientCoreError::UnsupportedWssProtocol {
|
||||
gateway: node.identity_key.to_base58_string(),
|
||||
});
|
||||
}
|
||||
|
||||
if prefer_ipv6 && gateway_listener.ip_addresses.iter().all(|i| i.is_ipv4()) {
|
||||
return Err(ClientCoreError::UnsupportedEntry {
|
||||
id: node.node_id,
|
||||
identity: node.identity_key.to_base58_string(),
|
||||
});
|
||||
}
|
||||
|
||||
Ok(SelectedGateway::Remote {
|
||||
gateway_id: node.identity_key,
|
||||
|
||||
@@ -1,43 +1,60 @@
|
||||
use crate::error::ClientCoreError;
|
||||
|
||||
use nym_http_api_client::HickoryDnsResolver;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use nym_topology::EntryDetails;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
|
||||
use tungstenite::handshake::client::Response;
|
||||
use url::{Host, Url};
|
||||
use url::Url;
|
||||
|
||||
use std::net::SocketAddr;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub(crate) async fn connect_async(
|
||||
endpoint: &str,
|
||||
endpoint: &EntryDetails,
|
||||
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), ClientCoreError> {
|
||||
let resolver = HickoryDnsResolver::default();
|
||||
let uri = Url::parse(endpoint).map_err(|_| ClientCoreError::InvalidUrl(endpoint.to_owned()))?;
|
||||
let uri = ws_entry_address(endpoint, false)
|
||||
.ok_or(ClientCoreError::InvalidEndpoint(endpoint.to_string()))?;
|
||||
let port: u16 = uri.port_or_known_default().unwrap_or(443);
|
||||
|
||||
let host = uri
|
||||
.host()
|
||||
.ok_or(ClientCoreError::InvalidUrl(endpoint.to_owned()))?;
|
||||
|
||||
// Get address for tcp connection, if a domain is provided use our preferred resolver rather than
|
||||
// the default std resolve
|
||||
let sock_addrs: Vec<SocketAddr> = match host {
|
||||
Host::Ipv4(addr) => vec![SocketAddr::new(addr.into(), port)],
|
||||
Host::Ipv6(addr) => vec![SocketAddr::new(addr.into(), port)],
|
||||
Host::Domain(domain) => {
|
||||
// Do a DNS lookup for the domain using our custom DNS resolver
|
||||
resolver
|
||||
.resolve_str(domain)
|
||||
.await?
|
||||
.map(|a| SocketAddr::new(a, port))
|
||||
.collect()
|
||||
}
|
||||
};
|
||||
let sock_addrs: Vec<SocketAddr> = endpoint
|
||||
.ip_addresses
|
||||
.iter()
|
||||
.map(|addr| SocketAddr::new(addr.clone(), port))
|
||||
.collect();
|
||||
|
||||
let stream = TcpStream::connect(&sock_addrs[..]).await?;
|
||||
|
||||
tokio_tungstenite::client_async_tls(endpoint, stream)
|
||||
tokio_tungstenite::client_async_tls(uri.to_string(), stream)
|
||||
.await
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
pub fn ws_entry_address_tls(entry: &EntryDetails) -> Option<Url> {
|
||||
let hostname = entry.hostname.as_ref()?;
|
||||
let wss_port = entry.clients_wss_port?;
|
||||
|
||||
Url::parse(&format!("wss://{hostname}:{wss_port}")).ok()
|
||||
}
|
||||
|
||||
pub fn ws_entry_address_no_tls(entry: &EntryDetails, prefer_ipv6: bool) -> Option<Url> {
|
||||
if let Some(hostname) = entry.hostname.as_ref() {
|
||||
return Url::parse(&format!("ws://{hostname}:{}", entry.clients_ws_port)).ok();
|
||||
}
|
||||
|
||||
if prefer_ipv6 {
|
||||
if let Some(ipv6) = entry.ip_addresses.iter().find(|ip| ip.is_ipv6()) {
|
||||
return Url::parse(&format!("ws://{ipv6}:{}", entry.clients_ws_port)).ok();
|
||||
}
|
||||
}
|
||||
|
||||
let any_ip = entry.ip_addresses.first()?;
|
||||
Url::parse(&format!("ws://{any_ip}:{}", entry.clients_ws_port)).ok()
|
||||
}
|
||||
|
||||
pub fn ws_entry_address(entry: &EntryDetails, prefer_ipv6: bool) -> Option<Url> {
|
||||
if let Some(tls) = ws_entry_address_tls(entry) {
|
||||
return Some(tls);
|
||||
}
|
||||
ws_entry_address_no_tls(entry, prefer_ipv6)
|
||||
}
|
||||
|
||||
@@ -27,11 +27,11 @@ nym-credential-storage = { path = "../../credential-storage" }
|
||||
nym-credentials-interface = { path = "../../credentials-interface" }
|
||||
nym-crypto = { path = "../../crypto" }
|
||||
nym-gateway-requests = { path = "../../gateway-requests" }
|
||||
nym-http-api-client = { path = "../../http-api-client" }
|
||||
nym-network-defaults = { path = "../../network-defaults" }
|
||||
nym-sphinx = { path = "../../nymsphinx" }
|
||||
nym-statistics-common = { path = "../../statistics" }
|
||||
nym-pemstore = { path = "../../pemstore" }
|
||||
nym-topology = { path = "../../topology", features = ["persistence"] }
|
||||
nym-validator-client = { path = "../validator-client", default-features = false }
|
||||
nym-task = { path = "../../task" }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
|
||||
@@ -28,13 +28,13 @@ use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use nym_statistics_common::clients::connection::ConnectionStatsEvent;
|
||||
use nym_statistics_common::clients::ClientStatsSender;
|
||||
use nym_task::ShutdownToken;
|
||||
use nym_topology::EntryDetails;
|
||||
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
|
||||
use rand::rngs::OsRng;
|
||||
use std::sync::Arc;
|
||||
use tracing::instrument;
|
||||
use tracing::*;
|
||||
use tungstenite::protocol::Message;
|
||||
use url::Url;
|
||||
|
||||
#[cfg(unix)]
|
||||
use std::os::fd::RawFd;
|
||||
@@ -62,14 +62,14 @@ pub struct GatewayConfig {
|
||||
// currently a dead field
|
||||
pub gateway_owner: Option<String>,
|
||||
|
||||
pub gateway_listener: String,
|
||||
pub gateway_listener: EntryDetails,
|
||||
}
|
||||
|
||||
impl GatewayConfig {
|
||||
pub fn new(
|
||||
gateway_identity: ed25519::PublicKey,
|
||||
gateway_owner: Option<String>,
|
||||
gateway_listener: String,
|
||||
gateway_listener: EntryDetails,
|
||||
) -> Self {
|
||||
GatewayConfig {
|
||||
gateway_identity,
|
||||
@@ -92,7 +92,7 @@ pub struct GatewayClient<C, St = EphemeralCredentialStorage> {
|
||||
|
||||
authenticated: bool,
|
||||
bandwidth: ClientBandwidth,
|
||||
gateway_address: String,
|
||||
gateway_address: EntryDetails,
|
||||
gateway_identity: ed25519::PublicKey,
|
||||
local_identity: Arc<ed25519::KeyPair>,
|
||||
shared_key: Option<Arc<SharedGatewayKey>>,
|
||||
@@ -201,7 +201,7 @@ impl<C, St> GatewayClient<C, St> {
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub async fn establish_connection(&mut self) -> Result<(), GatewayClientError> {
|
||||
debug!(
|
||||
"Attempting to establish connection to gateway at: {}",
|
||||
"Attempting to establish connection to gateway at: {:?}",
|
||||
self.gateway_address
|
||||
);
|
||||
let (ws_stream, _) = connect_async(
|
||||
@@ -467,7 +467,9 @@ impl<C, St> GatewayClient<C, St> {
|
||||
// right now there are no failure cases here, but this might change in the future
|
||||
match gateway_protocol {
|
||||
None => {
|
||||
warn!("the gateway we're connected to has not specified its protocol version. It's probably running version < 1.1.X, but that's still fine for now. It will become a hard error in 1.2.0");
|
||||
warn!(
|
||||
"the gateway we're connected to has not specified its protocol version. It's probably running version < 1.1.X, but that's still fine for now. It will become a hard error in 1.2.0"
|
||||
);
|
||||
// note: in +1.2.0 we will have to return a hard error here
|
||||
Ok(())
|
||||
}
|
||||
@@ -481,7 +483,9 @@ impl<C, St> GatewayClient<C, St> {
|
||||
}
|
||||
|
||||
Some(_) => {
|
||||
debug!("the gateway is using exactly the same (or older) protocol version as we are. We're good to continue!");
|
||||
debug!(
|
||||
"the gateway is using exactly the same (or older) protocol version as we are. We're good to continue!"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -527,7 +531,7 @@ impl<C, St> GatewayClient<C, St> {
|
||||
status,
|
||||
} => (status, protocol_version),
|
||||
ServerResponse::Error { message } => {
|
||||
return Err(GatewayClientError::GatewayError(message))
|
||||
return Err(GatewayClientError::GatewayError(message));
|
||||
}
|
||||
other => return Err(GatewayClientError::UnexpectedResponse { name: other.name() }),
|
||||
};
|
||||
@@ -589,7 +593,7 @@ impl<C, St> GatewayClient<C, St> {
|
||||
{
|
||||
ServerResponse::EncryptedResponse { ciphertext, nonce } => (ciphertext, nonce),
|
||||
ServerResponse::Error { message } => {
|
||||
return Err(GatewayClientError::GatewayError(message))
|
||||
return Err(GatewayClientError::GatewayError(message));
|
||||
}
|
||||
other => return Err(GatewayClientError::UnexpectedResponse { name: other.name() }),
|
||||
};
|
||||
@@ -858,7 +862,9 @@ impl<C, St> GatewayClient<C, St> {
|
||||
|
||||
warn!("Not enough bandwidth. Trying to get more bandwidth, this might take a while");
|
||||
if !self.cfg.bandwidth.require_tickets {
|
||||
info!("The client is running in disabled credentials mode - attempting to claim bandwidth without a credential");
|
||||
info!(
|
||||
"The client is running in disabled credentials mode - attempting to claim bandwidth without a credential"
|
||||
);
|
||||
return self.try_claim_testnet_bandwidth().await;
|
||||
}
|
||||
|
||||
@@ -896,7 +902,9 @@ impl<C, St> GatewayClient<C, St> {
|
||||
Err(err) => {
|
||||
error!("failed to claim ecash bandwidth with the gateway...: {err}");
|
||||
if err.is_ticket_replay() {
|
||||
warn!("this was due to our ticket being replayed! have you messed with the database file?")
|
||||
warn!(
|
||||
"this was due to our ticket being replayed! have you messed with the database file?"
|
||||
)
|
||||
} else {
|
||||
// TODO: tracing span
|
||||
info!("attempting to revert ticket withdrawal...");
|
||||
@@ -1112,7 +1120,9 @@ impl<C, St> GatewayClient<C, St> {
|
||||
self.cfg
|
||||
.bandwidth
|
||||
.ensure_above_cutoff(bandwidth_remaining)?;
|
||||
info!("Claiming more bandwidth with existing credentials. Stop the process now if you don't want that to happen.");
|
||||
info!(
|
||||
"Claiming more bandwidth with existing credentials. Stop the process now if you don't want that to happen."
|
||||
);
|
||||
self.claim_bandwidth().await?;
|
||||
}
|
||||
Ok(())
|
||||
@@ -1128,7 +1138,7 @@ pub struct InitOnly;
|
||||
impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
|
||||
// for initialisation we do not need credential storage. Though it's still a bit weird we have to set the generic...
|
||||
pub fn new_init(
|
||||
gateway_listener: Url,
|
||||
gateway_listener: EntryDetails,
|
||||
gateway_identity: ed25519::PublicKey,
|
||||
local_identity: Arc<ed25519::KeyPair>,
|
||||
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
@@ -1147,7 +1157,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
|
||||
cfg: GatewayClientConfig::default().with_disabled_credentials_mode(true),
|
||||
authenticated: false,
|
||||
bandwidth: ClientBandwidth::new_empty(),
|
||||
gateway_address: gateway_listener.to_string(),
|
||||
gateway_address: gateway_listener.clone(),
|
||||
gateway_identity,
|
||||
local_identity,
|
||||
shared_key: None,
|
||||
|
||||
@@ -1,51 +1,37 @@
|
||||
use crate::error::GatewayClientError;
|
||||
|
||||
use nym_http_api_client::HickoryDnsResolver;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use nym_topology::EntryDetails;
|
||||
#[cfg(unix)]
|
||||
use std::{
|
||||
os::fd::{AsRawFd, RawFd},
|
||||
sync::Arc,
|
||||
};
|
||||
use tokio::net::TcpSocket;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
|
||||
use tungstenite::handshake::client::Response;
|
||||
use url::{Host, Url};
|
||||
use url::Url;
|
||||
|
||||
use std::net::SocketAddr;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub(crate) async fn connect_async(
|
||||
endpoint: &str,
|
||||
endpoint: &EntryDetails,
|
||||
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), GatewayClientError> {
|
||||
use tokio::net::TcpSocket;
|
||||
|
||||
let resolver = HickoryDnsResolver::default();
|
||||
let uri =
|
||||
Url::parse(endpoint).map_err(|_| GatewayClientError::InvalidUrl(endpoint.to_owned()))?;
|
||||
let uri = ws_entry_address(endpoint, false)
|
||||
.ok_or(GatewayClientError::InvalidEndpoint(endpoint.to_string()))?;
|
||||
let port: u16 = uri.port_or_known_default().unwrap_or(443);
|
||||
|
||||
let host = uri
|
||||
.host()
|
||||
.ok_or(GatewayClientError::InvalidUrl(endpoint.to_owned()))?;
|
||||
|
||||
// Get address for tcp connection, if a domain is provided use our preferred resolver rather than
|
||||
// the default std resolve
|
||||
let sock_addrs: Vec<SocketAddr> = match host {
|
||||
Host::Ipv4(addr) => vec![SocketAddr::new(addr.into(), port)],
|
||||
Host::Ipv6(addr) => vec![SocketAddr::new(addr.into(), port)],
|
||||
Host::Domain(domain) => {
|
||||
// Do a DNS lookup for the domain using our custom DNS resolver
|
||||
resolver
|
||||
.resolve_str(domain)
|
||||
.await?
|
||||
.map(|a| SocketAddr::new(a, port))
|
||||
.collect()
|
||||
}
|
||||
};
|
||||
let sock_addrs = endpoint
|
||||
.ip_addresses
|
||||
.iter()
|
||||
.map(|addr| SocketAddr::new(*addr, port));
|
||||
let uri_str = uri.to_string();
|
||||
|
||||
let mut stream = Err(GatewayClientError::NoEndpointForConnection {
|
||||
address: endpoint.to_owned(),
|
||||
address: uri_str.clone(),
|
||||
});
|
||||
for sock_addr in sock_addrs {
|
||||
let socket = if sock_addr.is_ipv4() {
|
||||
@@ -54,7 +40,7 @@ pub(crate) async fn connect_async(
|
||||
TcpSocket::new_v6()
|
||||
}
|
||||
.map_err(|err| GatewayClientError::NetworkConnectionFailed {
|
||||
address: endpoint.to_owned(),
|
||||
address: uri_str.clone(),
|
||||
source: Box::new(tungstenite::Error::from(err)),
|
||||
})?;
|
||||
|
||||
@@ -70,7 +56,7 @@ pub(crate) async fn connect_async(
|
||||
}
|
||||
Err(err) => {
|
||||
stream = Err(GatewayClientError::NetworkConnectionFailed {
|
||||
address: endpoint.to_owned(),
|
||||
address: uri_str.clone(),
|
||||
source: Box::new(tungstenite::Error::from(err)),
|
||||
});
|
||||
continue;
|
||||
@@ -78,10 +64,39 @@ pub(crate) async fn connect_async(
|
||||
}
|
||||
}
|
||||
|
||||
tokio_tungstenite::client_async_tls(endpoint, stream?)
|
||||
tokio_tungstenite::client_async_tls(uri.clone(), stream?)
|
||||
.await
|
||||
.map_err(|error| GatewayClientError::NetworkConnectionFailed {
|
||||
address: endpoint.to_owned(),
|
||||
address: uri_str.clone(),
|
||||
source: Box::new(error),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn ws_entry_address_tls(entry: &EntryDetails) -> Option<Url> {
|
||||
let hostname = entry.hostname.as_ref()?;
|
||||
let wss_port = entry.clients_wss_port?;
|
||||
|
||||
Url::parse(&format!("wss://{hostname}:{wss_port}")).ok()
|
||||
}
|
||||
|
||||
pub fn ws_entry_address_no_tls(entry: &EntryDetails, prefer_ipv6: bool) -> Option<Url> {
|
||||
if let Some(hostname) = entry.hostname.as_ref() {
|
||||
return Url::parse(&format!("ws://{hostname}:{}", entry.clients_ws_port)).ok();
|
||||
}
|
||||
|
||||
if prefer_ipv6 {
|
||||
if let Some(ipv6) = entry.ip_addresses.iter().find(|ip| ip.is_ipv6()) {
|
||||
return Url::parse(&format!("ws://{ipv6}:{}", entry.clients_ws_port)).ok();
|
||||
}
|
||||
}
|
||||
|
||||
let any_ip = entry.ip_addresses.first()?;
|
||||
Url::parse(&format!("ws://{any_ip}:{}", entry.clients_ws_port)).ok()
|
||||
}
|
||||
|
||||
pub fn ws_entry_address(entry: &EntryDetails, prefer_ipv6: bool) -> Option<Url> {
|
||||
if let Some(tls) = ws_entry_address_tls(entry) {
|
||||
return Some(tls);
|
||||
}
|
||||
ws_entry_address_no_tls(entry, prefer_ipv6)
|
||||
}
|
||||
|
||||
@@ -49,12 +49,8 @@ pub enum GatewayClientError {
|
||||
#[error("no socket address for endpoint: {address}")]
|
||||
NoEndpointForConnection { address: String },
|
||||
|
||||
#[error("Invalid URL: {0}")]
|
||||
InvalidUrl(String),
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
#[error("resolution failed: {0}")]
|
||||
ResolutionFailed(#[from] nym_http_api_client::ResolveError),
|
||||
#[error("Invalid Endpoint: {0}")]
|
||||
InvalidEndpoint(String),
|
||||
|
||||
#[error("No shared key was provided or obtained")]
|
||||
NoSharedKeyAvailable,
|
||||
|
||||
+363
-129
@@ -3,28 +3,41 @@
|
||||
|
||||
//! DNS resolver configuration for internal lookups.
|
||||
//!
|
||||
//! The resolver itself is the set combination of the google, cloudflare, and quad9 endpoints
|
||||
//! supporting DoH and DoT.
|
||||
//! The resolver itself is the set combination of the cloudflare, and quad9 endpoints supporting DoH
|
||||
//! and DoT.
|
||||
//!
|
||||
//! This resolver supports a fallback mechanism where, should the DNS-over-TLS resolution fail, a
|
||||
//! followup resolution will be done using the hosts configured default (e.g. `/etc/resolve.conf` on
|
||||
//! linux). This is disabled by default and can be enabled using [`enable_system_fallback`].
|
||||
//!
|
||||
//! Requires the `dns-over-https-rustls`, `webpki-roots` feature for the
|
||||
//! `hickory-resolver` crate
|
||||
//!
|
||||
//!
|
||||
//! Note: The hickory DoH resolver can cause warning logs about H2 connection failure. This
|
||||
//! indicates that the long lived https connection was closed by the remote peer and the resolver
|
||||
//! will have to reconnect. It should not impact actual functionality.
|
||||
//!
|
||||
//! code ref: https://github.com/hickory-dns/hickory-dns/blob/06a8b1ce9bd9322d8e6accf857d30257e1274427/crates/proto/src/h2/h2_client_stream.rs#L534
|
||||
//!
|
||||
//! example log:
|
||||
//!
|
||||
//! ```txt
|
||||
//! WARN /home/ubuntu/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/hickory-proto-0.24.3/src/h2/h2_client_stream.rs:493: h2 connection failed: unexpected end of file
|
||||
//! ```rust
|
||||
//! use nym_http_api_client::HickoryDnsResolver;
|
||||
//! # use nym_http_api_client::ResolveError;
|
||||
//! # type Err = ResolveError;
|
||||
//! # async fn run() -> Result<(), Err> {
|
||||
//! let resolver = HickoryDnsResolver::default();
|
||||
//! resolver.resolve_str("example.com").await?;
|
||||
//! # Ok(())
|
||||
//! # }
|
||||
//! ```
|
||||
//!
|
||||
//! ## Fallbacks
|
||||
//!
|
||||
//! **System Resolver --** This resolver supports an optional fallback mechanism where, should the
|
||||
//! DNS-over-TLS resolution fail, a followup resolution will be done using the hosts configured
|
||||
//! default (e.g. `/etc/resolve.conf` on linux).
|
||||
//!
|
||||
//! This is disabled by default and can be enabled using `enable_system_fallback`.
|
||||
//!
|
||||
//! **Static Table --** There is also a second optional fallback mechanism that allows a static map
|
||||
//! to be used as a last resort. This can help when DNS encounters errors due to blocked resolvers
|
||||
//! or unknown conditions. This is enabled by default, and can be customized if building a new
|
||||
//! resolver.
|
||||
//!
|
||||
//! ## IPv4 / IPv6
|
||||
//!
|
||||
//! By default the resolver uses only IPv4 nameservers, and is configured to do `A` lookups first,
|
||||
//! and only do `AAAA` if no `A` record is available.
|
||||
//!
|
||||
//! ---
|
||||
//!
|
||||
//! Requires the `dns-over-https-rustls`, `webpki-roots` feature for the `hickory-resolver` crate
|
||||
#![deny(missing_docs)]
|
||||
|
||||
use crate::ClientBuilder;
|
||||
@@ -39,7 +52,7 @@ use std::{
|
||||
|
||||
use hickory_resolver::{
|
||||
TokioResolver,
|
||||
config::{LookupIpStrategy, NameServerConfigGroup, ResolverConfig},
|
||||
config::{NameServerConfig, NameServerConfigGroup, ResolverConfig, ResolverOpts},
|
||||
lookup_ip::LookupIpIntoIter,
|
||||
name_server::TokioConnectionProvider,
|
||||
};
|
||||
@@ -49,7 +62,11 @@ use tracing::*;
|
||||
|
||||
mod constants;
|
||||
mod static_resolver;
|
||||
pub use static_resolver::*;
|
||||
pub(crate) use static_resolver::*;
|
||||
|
||||
pub(crate) const DEFAULT_POSITIVE_LOOKUP_CACHE_TTL: Duration = Duration::from_secs(1800);
|
||||
pub(crate) const DEFAULT_OVERALL_LOOKUP_TIMEOUT: Duration = Duration::from_secs(6);
|
||||
pub(crate) const DEFAULT_QUERY_TIMEOUT: Duration = Duration::from_secs(3);
|
||||
|
||||
impl ClientBuilder {
|
||||
/// Override the DNS resolver implementation used by the underlying http client.
|
||||
@@ -71,7 +88,10 @@ impl ClientBuilder {
|
||||
// but tools like valgrind might report "memory leaks" as it isn't obvious this is intentional.
|
||||
static SHARED_RESOLVER: LazyLock<HickoryDnsResolver> = LazyLock::new(|| {
|
||||
tracing::debug!("Initializing shared DNS resolver");
|
||||
HickoryDnsResolver::default()
|
||||
HickoryDnsResolver {
|
||||
use_shared: false, // prevent infinite recursion
|
||||
..Default::default()
|
||||
}
|
||||
});
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -111,7 +131,7 @@ pub struct HickoryDnsResolver {
|
||||
state: Arc<OnceCell<TokioResolver>>,
|
||||
fallback: Option<Arc<OnceCell<TokioResolver>>>,
|
||||
static_base: Option<Arc<OnceCell<StaticResolver>>>,
|
||||
dont_use_shared: bool,
|
||||
use_shared: bool,
|
||||
/// Overall timeout for dns lookup associated with any individual host resolution. For example,
|
||||
/// use of retries, server_ordering_strategy, etc. ends absolutely if this timeout is reached.
|
||||
overall_dns_timeout: Duration,
|
||||
@@ -122,9 +142,9 @@ impl Default for HickoryDnsResolver {
|
||||
Self {
|
||||
state: Default::default(),
|
||||
fallback: Default::default(),
|
||||
static_base: Default::default(),
|
||||
dont_use_shared: Default::default(),
|
||||
overall_dns_timeout: Duration::from_secs(10),
|
||||
static_base: Some(Default::default()),
|
||||
use_shared: true,
|
||||
overall_dns_timeout: DEFAULT_OVERALL_LOOKUP_TIMEOUT,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -134,7 +154,7 @@ impl Resolve for HickoryDnsResolver {
|
||||
let resolver = self.state.clone();
|
||||
let maybe_fallback = self.fallback.clone();
|
||||
let maybe_static = self.static_base.clone();
|
||||
let independent = self.dont_use_shared;
|
||||
let use_shared = self.use_shared;
|
||||
let overall_dns_timeout = self.overall_dns_timeout;
|
||||
Box::pin(async move {
|
||||
resolve(
|
||||
@@ -142,7 +162,7 @@ impl Resolve for HickoryDnsResolver {
|
||||
resolver,
|
||||
maybe_fallback,
|
||||
maybe_static,
|
||||
independent,
|
||||
use_shared,
|
||||
overall_dns_timeout,
|
||||
)
|
||||
.await
|
||||
@@ -236,7 +256,7 @@ impl HickoryDnsResolver {
|
||||
self.state.clone(),
|
||||
self.fallback.clone(),
|
||||
self.static_base.clone(),
|
||||
self.dont_use_shared,
|
||||
self.use_shared,
|
||||
self.overall_dns_timeout,
|
||||
)
|
||||
.await
|
||||
@@ -246,25 +266,25 @@ impl HickoryDnsResolver {
|
||||
/// Create a (lazy-initialized) resolver that is not shared across threads.
|
||||
pub fn thread_resolver() -> Self {
|
||||
Self {
|
||||
dont_use_shared: true,
|
||||
use_shared: false,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
fn new_resolver(dont_use_shared: bool) -> Result<TokioResolver, ResolveError> {
|
||||
fn new_resolver(use_shared: bool) -> Result<TokioResolver, ResolveError> {
|
||||
// using a closure here is slightly gross, but this makes sure that if the
|
||||
// lazy-init returns an error it can be handled by the client
|
||||
if dont_use_shared {
|
||||
if !use_shared {
|
||||
new_resolver()
|
||||
} else {
|
||||
Ok(SHARED_RESOLVER.state.get_or_try_init(new_resolver)?.clone())
|
||||
}
|
||||
}
|
||||
|
||||
fn new_resolver_system(dont_use_shared: bool) -> Result<TokioResolver, ResolveError> {
|
||||
fn new_resolver_system(use_shared: bool) -> Result<TokioResolver, ResolveError> {
|
||||
// using a closure here is slightly gross, but this makes sure that if the
|
||||
// lazy-init returns an error it can be handled by the client
|
||||
if dont_use_shared || SHARED_RESOLVER.fallback.is_none() {
|
||||
if !use_shared || SHARED_RESOLVER.fallback.is_none() {
|
||||
new_resolver_system()
|
||||
} else {
|
||||
Ok(SHARED_RESOLVER
|
||||
@@ -276,8 +296,8 @@ impl HickoryDnsResolver {
|
||||
}
|
||||
}
|
||||
|
||||
fn new_static_fallback(dont_use_shared: bool) -> StaticResolver {
|
||||
if !dont_use_shared && let Some(ref shared_resolver) = SHARED_RESOLVER.static_base {
|
||||
fn new_static_fallback(use_shared: bool) -> StaticResolver {
|
||||
if use_shared && let Some(ref shared_resolver) = SHARED_RESOLVER.static_base {
|
||||
shared_resolver
|
||||
.get_or_init(new_default_static_fallback)
|
||||
.clone()
|
||||
@@ -294,6 +314,11 @@ impl HickoryDnsResolver {
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.get_or_try_init(new_resolver_system)?;
|
||||
|
||||
// IF THIS INSTANCE IS A FRONT FOR THE SHARED RESOLVER SHOULDN'T THIS FN ENABLE THE SYSTEM FALLBACK FOR THE SHARED RESOLVER TOO?
|
||||
// if self.use_shared {
|
||||
// SHARED_RESOLVER.enable_system_fallback()?;
|
||||
// }
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -301,6 +326,11 @@ impl HickoryDnsResolver {
|
||||
/// returned immediately
|
||||
pub fn disable_system_fallback(&mut self) {
|
||||
self.fallback = None;
|
||||
|
||||
// // IF THIS INSTANCE IS A FRONT FOR THE SHARED RESOLVER SHOULDN'T THIS FN ENABLE THE SYSTEM FALLBACK FOR THE SHARED RESOLVER TOO?
|
||||
// if self.use_shared {
|
||||
// SHARED_RESOLVER.fallback = None;
|
||||
// }
|
||||
}
|
||||
|
||||
/// Get the current map of hostname to address in use by the fallback static lookup if one
|
||||
@@ -316,39 +346,122 @@ impl HickoryDnsResolver {
|
||||
.expect("infallible assign");
|
||||
self.static_base = Some(Arc::new(cell));
|
||||
}
|
||||
|
||||
/// Successfully resolved addresses are cached for a minimum of 30 minutes
|
||||
/// Individual lookup Timeouts are set to 3 seconds
|
||||
/// Number of retries after lookup failure before giving up is set to (default) to 2
|
||||
/// Lookup order is set to (default) A then AAAA
|
||||
/// Number or parallel lookup is set to (default) 2
|
||||
/// Nameserver selection uses the (default) EWMA statistics / performance based strategy
|
||||
fn default_options() -> ResolverOpts {
|
||||
let mut opts = ResolverOpts::default();
|
||||
// Always cache successful responses for queries received by this resolver for 30 min minimum.
|
||||
opts.positive_min_ttl = Some(DEFAULT_POSITIVE_LOOKUP_CACHE_TTL);
|
||||
opts.timeout = DEFAULT_QUERY_TIMEOUT;
|
||||
|
||||
opts
|
||||
}
|
||||
|
||||
/// Get the list of currently available nameserver configs.
|
||||
pub fn all_configured_name_servers(&self) -> Vec<NameServerConfig> {
|
||||
default_nameserver_group().to_vec()
|
||||
}
|
||||
|
||||
/// Get the list of currently used nameserver configs.
|
||||
pub fn active_name_servers(&self) -> Vec<NameServerConfig> {
|
||||
if !self.use_shared {
|
||||
return self
|
||||
.state
|
||||
.get()
|
||||
.map(|r| r.config().name_servers().to_vec())
|
||||
.unwrap_or(self.all_configured_name_servers());
|
||||
}
|
||||
|
||||
SHARED_RESOLVER.active_name_servers()
|
||||
}
|
||||
|
||||
/// Do a trial resolution using each nameserver individually to test which are working and which
|
||||
/// fail to complete a lookup. This will always try the full set of default configured resolvers.
|
||||
pub async fn trial_nameservers(&self) {
|
||||
let nameservers = default_nameserver_group();
|
||||
for (ns, result) in trial_nameservers_inner(&nameservers).await {
|
||||
if let Err(e) = result {
|
||||
warn!("trial {ns:?} errored: {e}");
|
||||
} else {
|
||||
info!("trial {ns:?} succeeded");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new resolver with a custom DoT based configuration. The options are overridden to look
|
||||
/// up for both IPv4 and IPv6 addresses to work with "happy eyeballs" algorithm.
|
||||
///
|
||||
/// Timeout Defaults to 5 seconds
|
||||
/// Individual lookup Timeouts are set to 3 seconds
|
||||
/// Number of retries after lookup failure before giving up Defaults to 2
|
||||
/// Lookup order is set to (default) A then AAAA
|
||||
///
|
||||
/// Caches successfully resolved addresses for 30 minutes to prevent continual use of remote lookup.
|
||||
/// This resolver is intended to be used for OUR API endpoints that do not rapidly rotate IPs.
|
||||
fn new_resolver() -> Result<TokioResolver, ResolveError> {
|
||||
info!("building new configured resolver");
|
||||
let name_servers = default_nameserver_group_ipv4_only();
|
||||
|
||||
let mut name_servers = NameServerConfigGroup::quad9_tls();
|
||||
name_servers.merge(NameServerConfigGroup::quad9_https());
|
||||
name_servers.merge(NameServerConfigGroup::cloudflare_tls());
|
||||
name_servers.merge(NameServerConfigGroup::cloudflare_https());
|
||||
|
||||
configure_and_build_resolver(name_servers)
|
||||
Ok(configure_and_build_resolver(name_servers))
|
||||
}
|
||||
|
||||
fn configure_and_build_resolver(
|
||||
name_servers: NameServerConfigGroup,
|
||||
) -> Result<TokioResolver, ResolveError> {
|
||||
fn configure_and_build_resolver<G>(name_servers: G) -> TokioResolver
|
||||
where
|
||||
G: Into<NameServerConfigGroup>,
|
||||
{
|
||||
let options = HickoryDnsResolver::default_options();
|
||||
let name_servers: NameServerConfigGroup = name_servers.into();
|
||||
info!("building new configured resolver");
|
||||
debug!("configuring resolver with {options:?}, {name_servers:?}");
|
||||
|
||||
let config = ResolverConfig::from_parts(None, Vec::new(), name_servers);
|
||||
let mut resolver_builder =
|
||||
TokioResolver::builder_with_config(config, TokioConnectionProvider::default());
|
||||
|
||||
resolver_builder.options_mut().ip_strategy = LookupIpStrategy::Ipv4AndIpv6;
|
||||
// Cache successful responses for queries received by this resolver for 30 min minimum.
|
||||
resolver_builder.options_mut().positive_min_ttl = Some(Duration::from_secs(1800));
|
||||
resolver_builder = resolver_builder.with_options(options);
|
||||
|
||||
Ok(resolver_builder.build())
|
||||
resolver_builder.build()
|
||||
}
|
||||
|
||||
fn filter_ipv4(nameservers: impl AsRef<[NameServerConfig]>) -> Vec<NameServerConfig> {
|
||||
nameservers
|
||||
.as_ref()
|
||||
.iter()
|
||||
.filter(|ns| ns.socket_addr.is_ipv4())
|
||||
.cloned()
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
fn filter_ipv6(nameservers: impl AsRef<[NameServerConfig]>) -> Vec<NameServerConfig> {
|
||||
nameservers
|
||||
.as_ref()
|
||||
.iter()
|
||||
.filter(|ns| ns.socket_addr.is_ipv6())
|
||||
.cloned()
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
fn default_nameserver_group() -> NameServerConfigGroup {
|
||||
let mut name_servers = NameServerConfigGroup::quad9_tls();
|
||||
name_servers.merge(NameServerConfigGroup::quad9_https());
|
||||
name_servers.merge(NameServerConfigGroup::cloudflare_tls());
|
||||
name_servers.merge(NameServerConfigGroup::cloudflare_https());
|
||||
name_servers
|
||||
}
|
||||
|
||||
fn default_nameserver_group_ipv4_only() -> NameServerConfigGroup {
|
||||
filter_ipv4(&default_nameserver_group() as &[NameServerConfig]).into()
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
fn default_nameserver_group_ipv6_only() -> NameServerConfigGroup {
|
||||
filter_ipv6(&default_nameserver_group() as &[NameServerConfig]).into()
|
||||
}
|
||||
|
||||
/// Create a new resolver with the default configuration, which reads from the system DNS config
|
||||
@@ -356,7 +469,12 @@ fn configure_and_build_resolver(
|
||||
/// addresses to work with "happy eyeballs" algorithm.
|
||||
fn new_resolver_system() -> Result<TokioResolver, ResolveError> {
|
||||
let mut resolver_builder = TokioResolver::builder_tokio()?;
|
||||
resolver_builder.options_mut().ip_strategy = LookupIpStrategy::Ipv4AndIpv6;
|
||||
|
||||
let options = HickoryDnsResolver::default_options();
|
||||
info!("building new fallback system resolver");
|
||||
debug!("fallback system resolver with {options:?}");
|
||||
|
||||
resolver_builder = resolver_builder.with_options(options);
|
||||
|
||||
Ok(resolver_builder.build())
|
||||
}
|
||||
@@ -365,11 +483,54 @@ fn new_default_static_fallback() -> StaticResolver {
|
||||
StaticResolver::new(constants::default_static_addrs())
|
||||
}
|
||||
|
||||
/// Do a trial resolution using each nameserver individually to test which are working and which
|
||||
/// fail to complete a lookup.
|
||||
async fn trial_nameservers_inner(
|
||||
name_servers: &[NameServerConfig],
|
||||
) -> Vec<(NameServerConfig, Result<(), ResolveError>)> {
|
||||
let mut trial_lookups = tokio::task::JoinSet::new();
|
||||
|
||||
for name_server in name_servers {
|
||||
let ns = name_server.clone();
|
||||
trial_lookups.spawn(async { (ns.clone(), trial_lookup(ns, "example.com").await) });
|
||||
}
|
||||
|
||||
trial_lookups.join_all().await
|
||||
}
|
||||
|
||||
/// Create an independent resolver that has only the provided nameserver and do one lookup for the
|
||||
/// provided query target.
|
||||
async fn trial_lookup(name_server: NameServerConfig, query: &str) -> Result<(), ResolveError> {
|
||||
debug!("running ns trial {name_server:?} query={query}");
|
||||
|
||||
let resolver = configure_and_build_resolver(vec![name_server]);
|
||||
|
||||
match tokio::time::timeout(DEFAULT_OVERALL_LOOKUP_TIMEOUT, resolver.ipv4_lookup(query)).await {
|
||||
Ok(Ok(_)) => Ok(()),
|
||||
Ok(Err(e)) => Err(e.into()),
|
||||
Err(_) => Err(ResolveError::Timeout),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use itertools::Itertools;
|
||||
use std::collections::HashMap;
|
||||
use std::{
|
||||
net::{IpAddr, Ipv4Addr, Ipv6Addr},
|
||||
time::Instant,
|
||||
};
|
||||
|
||||
/// IP addresses guaranteed to fail attempts to resolve
|
||||
///
|
||||
/// Addresses drawn from blocks set off by RFC5737 (ipv4) and RFC3849 (ipv6)
|
||||
const GUARANTEED_BROKEN_IPS_1: &[IpAddr] = &[
|
||||
IpAddr::V4(Ipv4Addr::new(192, 0, 2, 1)),
|
||||
IpAddr::V4(Ipv4Addr::new(198, 51, 100, 1)),
|
||||
IpAddr::V6(Ipv6Addr::new(0x2001, 0x0db8, 0, 0, 0, 0, 0, 0x1111)),
|
||||
IpAddr::V6(Ipv6Addr::new(0x2001, 0x0db8, 0, 0, 0, 0, 0, 0x1001)),
|
||||
];
|
||||
|
||||
#[tokio::test]
|
||||
async fn reqwest_with_custom_dns() {
|
||||
@@ -428,99 +589,172 @@ mod test {
|
||||
assert!(addrs.contains(&example_ip6));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod failure_test {
|
||||
use super::*;
|
||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
|
||||
// Test the nameserver trial functionality with mostly nameservers guaranteed to be broken and
|
||||
// one that should work.
|
||||
#[tokio::test]
|
||||
async fn trial_nameservers() {
|
||||
let good_cf_ip = IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1));
|
||||
|
||||
/// IP addresses guaranteed to fail attempts to resolve
|
||||
///
|
||||
/// Addresses drawn from blocks set off by RFC5737 (ipv4) and RFC3849 (ipv6)
|
||||
const GUARANTEED_BROKEN_IPS_1: &[IpAddr] = &[
|
||||
IpAddr::V4(Ipv4Addr::new(192, 0, 2, 1)),
|
||||
IpAddr::V4(Ipv4Addr::new(198, 51, 100, 1)),
|
||||
IpAddr::V6(Ipv6Addr::new(0x2001, 0x0db8, 0, 0, 0, 0, 0, 0x1111)),
|
||||
IpAddr::V6(Ipv6Addr::new(0x2001, 0x0db8, 0, 0, 0, 0, 0, 0x1001)),
|
||||
];
|
||||
let mut ns_ips = GUARANTEED_BROKEN_IPS_1.to_vec();
|
||||
ns_ips.push(good_cf_ip);
|
||||
|
||||
// Create a resolver that behaves the same as the custom configured router, except for the fact
|
||||
// that it is guaranteed to fail.
|
||||
fn build_broken_resolver() -> Result<TokioResolver, ResolveError> {
|
||||
info!("building new faulty resolver");
|
||||
|
||||
let mut broken_ns_group = NameServerConfigGroup::from_ips_tls(
|
||||
GUARANTEED_BROKEN_IPS_1,
|
||||
853,
|
||||
"cloudflare-dns.com".to_string(),
|
||||
true,
|
||||
);
|
||||
let broken_ns_https = NameServerConfigGroup::from_ips_https(
|
||||
GUARANTEED_BROKEN_IPS_1,
|
||||
&ns_ips,
|
||||
443,
|
||||
"cloudflare-dns.com".to_string(),
|
||||
true,
|
||||
);
|
||||
broken_ns_group.merge(broken_ns_https);
|
||||
|
||||
configure_and_build_resolver(broken_ns_group)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn dns_lookup_failures() -> Result<(), ResolveError> {
|
||||
let time_start = std::time::Instant::now();
|
||||
|
||||
let r = OnceCell::new();
|
||||
r.set(build_broken_resolver().expect("failed to build resolver"))
|
||||
.expect("broken resolver init error");
|
||||
let inner = configure_and_build_resolver(broken_ns_https);
|
||||
|
||||
// create a new resolver that won't mess with the shared resolver used by other tests
|
||||
let resolver = HickoryDnsResolver {
|
||||
dont_use_shared: true,
|
||||
state: Arc::new(r),
|
||||
overall_dns_timeout: Duration::from_secs(5),
|
||||
..Default::default()
|
||||
};
|
||||
build_broken_resolver()?;
|
||||
let domain = "ifconfig.me";
|
||||
let result = resolver.resolve_str(domain).await;
|
||||
assert!(result.is_err_and(|e| matches!(e, ResolveError::Timeout)));
|
||||
|
||||
let duration = time_start.elapsed();
|
||||
assert!(duration < resolver.overall_dns_timeout + Duration::from_secs(1));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fallback_to_static() -> Result<(), ResolveError> {
|
||||
let r = OnceCell::new();
|
||||
r.set(build_broken_resolver().expect("failed to build resolver"))
|
||||
.expect("broken resolver init error");
|
||||
|
||||
// create a new resolver that won't mess with the shared resolver used by other tests
|
||||
let resolver = HickoryDnsResolver {
|
||||
dont_use_shared: true,
|
||||
state: Arc::new(r),
|
||||
use_shared: false,
|
||||
state: Arc::new(OnceCell::with_value(inner)),
|
||||
static_base: Some(Default::default()),
|
||||
overall_dns_timeout: Duration::from_secs(5),
|
||||
..Default::default()
|
||||
};
|
||||
build_broken_resolver()?;
|
||||
|
||||
// successful lookup using fallback to static resolver
|
||||
let domain = "nymvpn.com";
|
||||
let _ = resolver
|
||||
.resolve_str(domain)
|
||||
.await
|
||||
.expect("failed to resolve address in static lookup");
|
||||
let name_servers = resolver.state.get().unwrap().config().name_servers();
|
||||
for (ns, result) in trial_nameservers_inner(name_servers).await {
|
||||
if ns.socket_addr.ip() == good_cf_ip {
|
||||
assert!(result.is_ok())
|
||||
} else {
|
||||
assert!(result.is_err())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// unsuccessful lookup - primary times out, and not in
|
||||
let domain = "non-existent.nymtech.net";
|
||||
let result = resolver.resolve_str(domain).await;
|
||||
assert!(result.is_err_and(|e| matches!(e, ResolveError::Timeout)));
|
||||
mod failure_test {
|
||||
use super::*;
|
||||
|
||||
Ok(())
|
||||
// Create a resolver that behaves the same as the custom configured router, except for the fact
|
||||
// that it is guaranteed to fail.
|
||||
fn build_broken_resolver() -> Result<TokioResolver, ResolveError> {
|
||||
info!("building new faulty resolver");
|
||||
|
||||
let mut broken_ns_group = NameServerConfigGroup::from_ips_tls(
|
||||
GUARANTEED_BROKEN_IPS_1,
|
||||
853,
|
||||
"cloudflare-dns.com".to_string(),
|
||||
true,
|
||||
);
|
||||
let broken_ns_https = NameServerConfigGroup::from_ips_https(
|
||||
GUARANTEED_BROKEN_IPS_1,
|
||||
443,
|
||||
"cloudflare-dns.com".to_string(),
|
||||
true,
|
||||
);
|
||||
broken_ns_group.merge(broken_ns_https);
|
||||
|
||||
Ok(configure_and_build_resolver(broken_ns_group))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn dns_lookup_failures() -> Result<(), ResolveError> {
|
||||
let time_start = std::time::Instant::now();
|
||||
|
||||
let r = OnceCell::new();
|
||||
r.set(build_broken_resolver().expect("failed to build resolver"))
|
||||
.expect("broken resolver init error");
|
||||
|
||||
// create a new resolver that won't mess with the shared resolver used by other tests
|
||||
let resolver = HickoryDnsResolver {
|
||||
use_shared: false,
|
||||
state: Arc::new(r),
|
||||
overall_dns_timeout: Duration::from_secs(5),
|
||||
..Default::default()
|
||||
};
|
||||
build_broken_resolver()?;
|
||||
let domain = "ifconfig.me";
|
||||
let result = resolver.resolve_str(domain).await;
|
||||
assert!(result.is_err_and(|e| matches!(e, ResolveError::Timeout)));
|
||||
|
||||
let duration = time_start.elapsed();
|
||||
assert!(duration < resolver.overall_dns_timeout + Duration::from_secs(1));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fallback_to_static() -> Result<(), ResolveError> {
|
||||
let r = OnceCell::new();
|
||||
r.set(build_broken_resolver().expect("failed to build resolver"))
|
||||
.expect("broken resolver init error");
|
||||
|
||||
// create a new resolver that won't mess with the shared resolver used by other tests
|
||||
let resolver = HickoryDnsResolver {
|
||||
use_shared: false,
|
||||
state: Arc::new(r),
|
||||
static_base: Some(Default::default()),
|
||||
overall_dns_timeout: Duration::from_secs(5),
|
||||
..Default::default()
|
||||
};
|
||||
build_broken_resolver()?;
|
||||
|
||||
// successful lookup using fallback to static resolver
|
||||
let domain = "nymvpn.com";
|
||||
let _ = resolver
|
||||
.resolve_str(domain)
|
||||
.await
|
||||
.expect("failed to resolve address in static lookup");
|
||||
|
||||
// unsuccessful lookup - primary times out, and not in static table
|
||||
let domain = "non-existent.nymtech.net";
|
||||
let result = resolver.resolve_str(domain).await;
|
||||
assert!(result.is_err_and(|e| matches!(e, ResolveError::Timeout)));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn default_resolver_uses_ipv4_only_nameservers() {
|
||||
let resolver = HickoryDnsResolver::thread_resolver();
|
||||
resolver
|
||||
.active_name_servers()
|
||||
.iter()
|
||||
.all(|cfg| cfg.socket_addr.is_ipv4());
|
||||
|
||||
SHARED_RESOLVER
|
||||
.active_name_servers()
|
||||
.iter()
|
||||
.all(|cfg| cfg.socket_addr.is_ipv4());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[ignore]
|
||||
// this test is dependent of external network setup -- i.e. blocking all traffic to the default
|
||||
// resolvers. Otherwise the default resolvers will succeed without using the static fallback,
|
||||
// making the test pointless
|
||||
async fn dns_lookup_failure_on_shared() -> Result<(), ResolveError> {
|
||||
let time_start = Instant::now();
|
||||
let r = OnceCell::new();
|
||||
r.set(build_broken_resolver().expect("failed to build resolver"))
|
||||
.expect("broken resolver init error");
|
||||
|
||||
// create a new resolver that won't mess with the shared resolver used by other tests
|
||||
let resolver = HickoryDnsResolver::default();
|
||||
|
||||
// successful lookup using fallback to static resolver
|
||||
let domain = "rpc.nymtech.net";
|
||||
let _ = resolver
|
||||
.resolve_str(domain)
|
||||
.await
|
||||
.expect("failed to resolve address in static lookup");
|
||||
|
||||
println!(
|
||||
"{}ms resolved {domain}",
|
||||
(Instant::now() - time_start).as_millis()
|
||||
);
|
||||
|
||||
// unsuccessful lookup - primary times out, and not in static table
|
||||
let domain = "non-existent.nymtech.net";
|
||||
let result = resolver.resolve_str(domain).await;
|
||||
assert!(result.is_err());
|
||||
// assert!(result.is_err_and(|e| matches!(e, ResolveError::Timeout)));
|
||||
// assert!(result.is_err_and(|e| matches!(e, ResolveError::ResolveError(e) if e.is_nx_domain())));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -175,7 +175,7 @@ mod user_agent;
|
||||
pub use user_agent::UserAgent;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
mod dns;
|
||||
pub mod dns;
|
||||
mod path;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
|
||||
@@ -91,7 +91,7 @@ fn sanitizing_urls() {
|
||||
#[tokio::test]
|
||||
async fn api_client_retry() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let client = ClientBuilder::new_with_urls(vec![
|
||||
"http://broken.nym.test".parse()?, // This will fail because of DNS (rotate)
|
||||
"http://broken.nym.test".parse()?, // This should fail because of DNS NXDomain (rotate)
|
||||
"http://127.0.0.1:9".parse()?, // This will fail because of TCP refused (rotate)
|
||||
"https://httpbin.org/status/200".parse()?, // This should succeed
|
||||
])?
|
||||
|
||||
@@ -28,6 +28,12 @@ pub struct EntryDetails {
|
||||
pub clients_wss_port: Option<u16>,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for EntryDetails {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{:?}", self)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||
pub struct SupportedRoles {
|
||||
pub mixnode: bool,
|
||||
@@ -59,36 +65,6 @@ pub struct RoutingNode {
|
||||
}
|
||||
|
||||
impl RoutingNode {
|
||||
pub fn ws_entry_address_tls(&self) -> Option<String> {
|
||||
let entry = self.entry.as_ref()?;
|
||||
let hostname = entry.hostname.as_ref()?;
|
||||
let wss_port = entry.clients_wss_port?;
|
||||
|
||||
Some(format!("wss://{hostname}:{wss_port}"))
|
||||
}
|
||||
|
||||
pub fn ws_entry_address_no_tls(&self, prefer_ipv6: bool) -> Option<String> {
|
||||
let entry = self.entry.as_ref()?;
|
||||
|
||||
if let Some(hostname) = entry.hostname.as_ref() {
|
||||
return Some(format!("ws://{hostname}:{}", entry.clients_ws_port));
|
||||
}
|
||||
|
||||
if prefer_ipv6 && let Some(ipv6) = entry.ip_addresses.iter().find(|ip| ip.is_ipv6()) {
|
||||
return Some(format!("ws://{ipv6}:{}", entry.clients_ws_port));
|
||||
}
|
||||
|
||||
let any_ip = entry.ip_addresses.first()?;
|
||||
Some(format!("ws://{any_ip}:{}", entry.clients_ws_port))
|
||||
}
|
||||
|
||||
pub fn ws_entry_address(&self, prefer_ipv6: bool) -> Option<String> {
|
||||
if let Some(tls) = self.ws_entry_address_tls() {
|
||||
return Some(tls);
|
||||
}
|
||||
self.ws_entry_address_no_tls(prefer_ipv6)
|
||||
}
|
||||
|
||||
pub fn identity(&self) -> ed25519::PublicKey {
|
||||
self.identity_key
|
||||
}
|
||||
|
||||
@@ -453,7 +453,7 @@ impl PacketPreparer {
|
||||
for gateway in &gateways_to_test_details {
|
||||
let recipient = self.create_packet_sender(gateway);
|
||||
let gateway_identity = gateway.identity_key;
|
||||
let gateway_address = gateway.ws_entry_address(false);
|
||||
let gateway_address = gateway.entry;
|
||||
|
||||
// the unwrap here is fine as:
|
||||
// 1. the topology is definitely valid (otherwise we wouldn't be here)
|
||||
|
||||
Generated
+2
-2
@@ -2864,9 +2864,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "hickory-resolver"
|
||||
version = "0.25.1"
|
||||
version = "0.25.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a128410b38d6f931fcc6ca5c107a3b02cabd6c05967841269a4ad65d23c44331"
|
||||
checksum = "dc62a9a99b0bfb44d2ab95a7208ac952d31060efc16241c87eaf36406fecf87a"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"futures-util",
|
||||
|
||||
Reference in New Issue
Block a user