Compare commits

...

1 Commits

Author SHA1 Message Date
jmwample dba9de84fd initial implementation pass 2025-12-16 08:52:04 -07:00
29 changed files with 278 additions and 448 deletions
Generated
+4 -1
View File
@@ -5265,7 +5265,9 @@ dependencies = [
"nym-crypto",
"nym-gateway-client",
"nym-gateway-requests",
"nym-topology",
"serde",
"serde_json",
"sqlx",
"thiserror 2.0.17",
"time",
@@ -5912,12 +5914,12 @@ dependencies = [
"nym-credentials-interface",
"nym-crypto",
"nym-gateway-requests",
"nym-http-api-client",
"nym-network-defaults",
"nym-pemstore",
"nym-sphinx",
"nym-statistics-common",
"nym-task",
"nym-topology",
"nym-validator-client",
"rand 0.8.5",
"serde",
@@ -7379,6 +7381,7 @@ dependencies = [
"time",
"tracing",
"tsify",
"url",
"wasm-bindgen",
"wasm-utils",
]
@@ -10,6 +10,7 @@ rust-version.workspace = true
[dependencies]
async-trait.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
thiserror.workspace = true
time.workspace = true
tokio = { workspace = true, features = ["sync"] }
@@ -20,10 +21,11 @@ zeroize = { workspace = true, features = ["zeroize_derive"] }
nym-crypto = { path = "../../crypto", features = ["asymmetric"] }
nym-gateway-requests = { path = "../../gateway-requests" }
nym-gateway-client = { path = "../../client-libs/gateway-client" }
nym-topology = { path = "../../topology", features = ["persistence"] }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx]
workspace = true
features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate", "time"]
features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate", "time", "json"]
optional = true
[build-dependencies]
@@ -34,6 +36,7 @@ sqlx = { workspace = true, features = [
"sqlite",
"macros",
"migrate",
"json",
] }
[features]
@@ -0,0 +1,21 @@
/*
* Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
* SPDX-License-Identifier: Apache-2.0
*/
CREATE TABLE remote_gateway_details_temp
(
gateway_id_bs58 TEXT NOT NULL UNIQUE PRIMARY KEY REFERENCES registered_gateway (gateway_id_bs58),
derived_aes256_gcm_siv_key BLOB NOT NULL,
gateway_details TEXT NOT NULL CHECK (json_valid(gateway_details)),
expiration_timestamp DATETIME NOT NULL
);
-- keep none, the gateways listener URL does not contain the gateway details information
-- INSERT INTO remote_gateway_details_temp SELECT gateway_id_bs58, derived_aes256_gcm_siv_key, gateway_listener, NULL, datetime(0, 'unixepoch') FROM remote_gateway_details WHERE derived_aes256_gcm_siv_key IS NOT NULL;
DROP TABLE remote_gateway_details;
ALTER TABLE remote_gateway_details_temp RENAME TO remote_gateway_details;
-- delete registrations with no key
DELETE FROM registered_gateway WHERE gateway_id_bs58 NOT IN ( SELECT gateway_id_bs58 FROM remote_gateway_details);
@@ -156,15 +156,16 @@ impl StorageManager {
&self,
remote: &RawRemoteGatewayDetails,
) -> Result<(), sqlx::Error> {
let details =
serde_json::to_string(&remote.published_data.gateway_details).expect("AHHHHHHHHH");
sqlx::query!(
r#"
INSERT INTO remote_gateway_details(gateway_id_bs58, derived_aes256_gcm_siv_key, gateway_listener, fallback_listener, expiration_timestamp)
VALUES (?, ?, ?, ?, ?)
INSERT INTO remote_gateway_details(gateway_id_bs58, derived_aes256_gcm_siv_key, gateway_details, expiration_timestamp)
VALUES (?, ?, ?, ?)
"#,
remote.gateway_id_bs58,
remote.derived_aes256_gcm_siv_key,
remote.published_data.gateway_listener,
remote.published_data.fallback_listener,
details,
remote.published_data.expiration_timestamp
)
.execute(&self.connection_pool)
@@ -177,12 +178,12 @@ impl StorageManager {
gateway_id_bs58: &str,
published_data: &RawGatewayPublishedData,
) -> Result<(), sqlx::Error> {
let details = serde_json::to_string(&published_data.gateway_details).expect("AHHHHHHHHH");
sqlx::query!(
r#"
UPDATE remote_gateway_details SET gateway_listener = ?, fallback_listener = ?, expiration_timestamp = ? WHERE gateway_id_bs58 = ?
UPDATE remote_gateway_details SET gateway_details = ?, expiration_timestamp = ? WHERE gateway_id_bs58 = ?
"#,
published_data.gateway_listener,
published_data.fallback_listener,
details,
published_data.expiration_timestamp,
gateway_id_bs58
)
@@ -3,6 +3,7 @@
use nym_crypto::asymmetric::ed25519::Ed25519RecoveryError;
use nym_gateway_requests::shared_key::SharedKeyConversionError;
use serde_json::Error as JsonError;
use thiserror::Error;
#[derive(Debug, Error)]
@@ -29,23 +30,11 @@ pub enum BadGateway {
#[error("could not find any valid shared keys for gateway {gateway_id}")]
MissingSharedKey { gateway_id: String },
#[error(
"the listening address of gateway {gateway_id} ({raw_listener}) is malformed: {source}"
)]
MalformedListener {
gateway_id: String,
raw_listener: String,
#[error("the listening address ({raw_details}) is malformed: {source}")]
MalformedDetailsNoId {
raw_details: String,
#[source]
source: url::ParseError,
},
#[error("the listening address ({raw_listener}) is malformed: {source}")]
MalformedListenerNoId {
raw_listener: String,
#[source]
source: url::ParseError,
source: JsonError,
},
}
@@ -3,15 +3,14 @@
use crate::BadGateway;
use nym_crypto::asymmetric::ed25519;
use nym_gateway_client::client::GatewayListeners;
use nym_gateway_requests::shared_key::SharedSymmetricKey;
use nym_topology::EntryDetails;
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use std::sync::Arc;
use time::Duration;
use time::OffsetDateTime;
use url::Url;
use zeroize::{Zeroize, ZeroizeOnDrop};
pub const REMOTE_GATEWAY_TYPE: &str = "remote";
@@ -172,14 +171,14 @@ pub struct RegisteredGateway {
#[derive(Debug, Clone)]
pub struct GatewayPublishedData {
pub listeners: GatewayListeners,
pub details: EntryDetails,
pub expiration_timestamp: OffsetDateTime,
}
impl GatewayPublishedData {
pub fn new(listeners: GatewayListeners) -> GatewayPublishedData {
pub fn new(details: EntryDetails) -> GatewayPublishedData {
GatewayPublishedData {
listeners,
details,
expiration_timestamp: OffsetDateTime::now_utc() + GATEWAY_DETAILS_TTL,
}
}
@@ -188,16 +187,16 @@ impl GatewayPublishedData {
#[derive(Debug, Serialize, Deserialize, Clone)]
#[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))]
pub struct RawGatewayPublishedData {
pub gateway_listener: String,
pub fallback_listener: Option<String>,
#[cfg_attr(feature = "sqlx", sqlx(json))]
pub gateway_details: EntryDetails,
pub expiration_timestamp: OffsetDateTime,
}
impl<'a> From<&'a GatewayPublishedData> for RawGatewayPublishedData {
fn from(value: &'a GatewayPublishedData) -> Self {
Self {
gateway_listener: value.listeners.primary.to_string(),
fallback_listener: value.listeners.fallback.as_ref().map(|uri| uri.to_string()),
// fallback_listener: value.listeners.fallback.as_ref().map(|uri| uri.to_string()),
gateway_details: value.details.clone(),
expiration_timestamp: value.expiration_timestamp,
}
}
@@ -207,28 +206,15 @@ impl TryFrom<RawGatewayPublishedData> for GatewayPublishedData {
type Error = BadGateway;
fn try_from(value: RawGatewayPublishedData) -> Result<Self, Self::Error> {
let gateway_listener: Url = Url::parse(&value.gateway_listener).map_err(|source| {
BadGateway::MalformedListenerNoId {
raw_listener: value.gateway_listener.clone(),
source,
}
})?;
let fallback_listener = value
.fallback_listener
.as_ref()
.map(|uri| {
Url::parse(uri).map_err(|source| BadGateway::MalformedListenerNoId {
raw_listener: uri.to_owned(),
source,
})
})
.transpose()?;
// let details = serde_json::from_str(&value.gateway_listener).map_err(|source| {
// BadGateway::MalformedDetailsNoId {
// raw_details: value.gateway_listener.clone(),
// source,
// }
// })?;
Ok(GatewayPublishedData {
listeners: GatewayListeners {
primary: gateway_listener,
fallback: fallback_listener,
},
details: value.gateway_details,
expiration_timestamp: value.expiration_timestamp,
})
}
@@ -87,7 +87,6 @@ where
user_chosen_gateway_id.map(|id| id.to_base58_string()),
Some(common_args.latency_based_selection),
common_args.force_tls_gateway,
false,
);
tracing::debug!("Gateway selection specification: {selection_spec:?}");
@@ -168,7 +167,6 @@ where
identity: gateway_details.gateway_id,
active: common_args.set_active,
typ: gateway_registration.details.typ().to_string(),
endpoint: Some(gateway_details.published_data.listeners.primary.clone()),
fallback_endpoint: gateway_details.published_data.listeners.fallback.clone(),
endpoint: Some(gateway_details.published_data.details.clone()),
})
}
@@ -140,7 +140,6 @@ where
user_chosen_gateway_id.map(|id| id.to_base58_string()),
Some(common_args.latency_based_selection),
common_args.force_tls_gateway,
false,
);
tracing::debug!("Gateway selection specification: {selection_spec:?}");
@@ -56,8 +56,7 @@ where
identity: remote_details.gateway_id,
active: active_gateway == Some(remote_details.gateway_id),
typ: GatewayType::Remote.to_string(),
endpoint: Some(remote_details.published_data.listeners.primary.clone()),
fallback_endpoint: remote_details.published_data.listeners.fallback.clone(),
endpoint: Some(remote_details.published_data.details.clone()),
}),
GatewayDetails::Custom(_) => info.push(GatewayInfo {
registration: gateway.registration_timestamp,
@@ -65,7 +64,6 @@ where
active: active_gateway == Some(gateway.details.gateway_id()),
typ: gateway.details.typ().to_string(),
endpoint: None,
fallback_endpoint: None,
}),
};
}
+2 -6
View File
@@ -2,10 +2,10 @@
// SPDX-License-Identifier: Apache-2.0
use nym_crypto::asymmetric::ed25519;
use nym_topology::EntryDetails;
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
use time::OffsetDateTime;
use url::Url;
#[derive(Serialize, Deserialize)]
pub struct GatewayInfo {
@@ -14,8 +14,7 @@ pub struct GatewayInfo {
pub active: bool,
pub typ: String,
pub endpoint: Option<Url>,
pub fallback_endpoint: Option<Url>,
pub endpoint: Option<EntryDetails>,
}
impl Display for GatewayInfo {
@@ -31,9 +30,6 @@ impl Display for GatewayInfo {
if let Some(endpoint) = &self.endpoint {
write!(f, " endpoint: {endpoint}")?;
}
if let Some(fallback_endpoint) = &self.fallback_endpoint {
write!(f, " fallback: {fallback_endpoint}")?;
}
Ok(())
}
}
@@ -554,7 +554,7 @@ where
shutdown_tracker.clone_shutdown_token(),
)
} else {
let cfg = GatewayConfig::new(details.gateway_id, details.published_data.listeners);
let cfg = GatewayConfig::new(details.gateway_id, details.published_data.details);
GatewayClient::new(
GatewayClientConfig::new_default()
.with_disabled_credentials_mode(config.client.disabled_credentials_mode)
+2 -9
View File
@@ -43,15 +43,8 @@ pub enum ClientCoreError {
#[error("no gateway with id: {0}")]
NoGatewayWithId(String),
#[error("Invalid URL: {0}")]
InvalidUrl(String),
#[error("node doesn't advertise ip addresses : {0}")]
MissingIpAddress(String),
#[cfg(not(target_arch = "wasm32"))]
#[error("resolution failed: {0}")]
ResolutionFailed(#[from] nym_http_api_client::ResolveError),
#[error("Invalid Endpoint: {0}")]
InvalidEndpoint(String),
#[error("no gateways on network")]
NoGatewaysOnNetwork,
+15 -13
View File
@@ -5,7 +5,6 @@ use crate::error::ClientCoreError;
use crate::init::types::RegistrationResult;
use futures::{SinkExt, StreamExt};
use nym_crypto::asymmetric::ed25519;
use nym_gateway_client::client::GatewayListeners;
use nym_gateway_client::GatewayClient;
use nym_topology::node::RoutingNode;
use nym_validator_client::client::{IdentityKeyRef, NymApiClientExt};
@@ -22,7 +21,7 @@ use url::Url;
#[cfg(not(target_arch = "wasm32"))]
use crate::init::websockets::connect_async;
use nym_topology::NodeId;
use nym_topology::{EntryDetails, NodeId};
#[cfg(not(target_arch = "wasm32"))]
use tokio::net::TcpStream;
#[cfg(not(target_arch = "wasm32"))]
@@ -56,7 +55,7 @@ const PING_TIMEOUT: Duration = Duration::from_millis(1000);
pub trait ConnectableGateway {
fn node_id(&self) -> NodeId;
fn identity(&self) -> ed25519::PublicKey;
fn clients_address(&self, prefer_ipv6: bool) -> Option<String>;
fn endpoint_details(&self) -> Option<EntryDetails>;
fn is_wss(&self) -> bool;
}
@@ -69,8 +68,8 @@ impl ConnectableGateway for RoutingNode {
self.identity_key
}
fn clients_address(&self, prefer_ipv6: bool) -> Option<String> {
self.ws_entry_address(prefer_ipv6)
fn endpoint_details(&self) -> Option<EntryDetails> {
self.entry.clone()
}
fn is_wss(&self) -> bool {
@@ -192,7 +191,7 @@ pub async fn gateways_for_init(
}
#[cfg(not(target_arch = "wasm32"))]
async fn connect(endpoint: &str) -> Result<WsConn, ClientCoreError> {
async fn connect(endpoint: &EntryDetails) -> Result<WsConn, ClientCoreError> {
match tokio::time::timeout(CONN_TIMEOUT, connect_async(endpoint)).await {
Err(_elapsed) => Err(ClientCoreError::GatewayConnectionTimeout),
Ok(Err(conn_failure)) => Err(conn_failure),
@@ -201,25 +200,28 @@ async fn connect(endpoint: &str) -> Result<WsConn, ClientCoreError> {
}
#[cfg(target_arch = "wasm32")]
async fn connect(endpoint: &str) -> Result<WsConn, ClientCoreError> {
JSWebsocket::new(endpoint).map_err(|_| ClientCoreError::GatewayJsConnectionFailure)
async fn connect(endpoint: &EntryDetails) -> Result<WsConn, ClientCoreError> {
let uri = endpoint
.ws_entry_address(false)
.ok_or(ClientCoreError::InvalidEndpoint(endpoint.to_string()))?;
JSWebsocket::new(uri.as_ref()).map_err(|_| ClientCoreError::GatewayJsConnectionFailure)
}
async fn measure_latency<G>(gateway: &G) -> Result<GatewayWithLatency<'_, G>, ClientCoreError>
where
G: ConnectableGateway,
{
let Some(addr) = gateway.clients_address(false) else {
let Some(endpoint) = gateway.endpoint_details() else {
return Err(ClientCoreError::UnsupportedEntry {
id: gateway.node_id(),
identity: gateway.identity().to_string(),
});
};
trace!(
"establishing connection to {} ({addr})...",
"establishing connection to {} ({endpoint})...",
gateway.identity(),
);
let mut stream = connect(&addr).await?;
let mut stream = connect(&endpoint).await?;
let mut results = Vec::new();
for _ in 0..MEASUREMENTS {
@@ -380,12 +382,12 @@ pub(super) fn get_specified_gateway(
pub(super) async fn register_with_gateway(
gateway_id: ed25519::PublicKey,
gateway_listeners: GatewayListeners,
gateway_details: EntryDetails,
our_identity: Arc<ed25519::KeyPair>,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
) -> Result<RegistrationResult, ClientCoreError> {
let mut gateway_client = GatewayClient::new_init(
gateway_listeners,
gateway_details.clone(),
gateway_id,
our_identity.clone(),
#[cfg(unix)]
+13 -21
View File
@@ -71,28 +71,21 @@ where
let mut rng = OsRng;
let selected_gateway = match selection_specification {
GatewaySelectionSpecification::UniformRemote {
must_use_tls,
no_hostname,
} => {
GatewaySelectionSpecification::UniformRemote { must_use_tls } => {
let gateway = uniformly_random_gateway(&mut rng, &available_gateways, must_use_tls)?;
SelectedGateway::from_topology_node(gateway, must_use_tls, no_hostname)?
SelectedGateway::from_topology_node(gateway, must_use_tls)?
}
GatewaySelectionSpecification::RemoteByLatency {
must_use_tls,
no_hostname,
} => {
GatewaySelectionSpecification::RemoteByLatency { must_use_tls } => {
let gateway =
choose_gateway_by_latency(&mut rng, &available_gateways, must_use_tls).await?;
SelectedGateway::from_topology_node(gateway, must_use_tls, no_hostname)?
SelectedGateway::from_topology_node(gateway, must_use_tls)?
}
GatewaySelectionSpecification::Specified {
must_use_tls,
no_hostname,
identity,
} => {
let gateway = get_specified_gateway(&identity, &available_gateways, must_use_tls)?;
SelectedGateway::from_topology_node(gateway, must_use_tls, no_hostname)?
SelectedGateway::from_topology_node(gateway, must_use_tls)?
}
GatewaySelectionSpecification::Custom {
gateway_identity,
@@ -113,14 +106,14 @@ where
SelectedGateway::Remote {
gateway_id,
gateway_listeners,
gateway_details,
} => {
// if we're using a 'normal' gateway setup, do register
let our_identity = client_keys.identity_keypair();
let registration = helpers::register_with_gateway(
gateway_id,
gateway_listeners.clone(),
gateway_details.clone(),
our_identity,
#[cfg(unix)]
connection_fd_callback,
@@ -130,7 +123,7 @@ where
GatewayDetails::new_remote(
gateway_id,
registration.shared_keys,
GatewayPublishedData::new(gateway_listeners),
GatewayPublishedData::new(gateway_details),
),
Some(registration.authenticated_ephemeral_client),
)
@@ -161,7 +154,6 @@ pub async fn refresh_gateway_published_data<D>(
registration: GatewayRegistration,
available_gateways: Vec<RoutingNode>,
must_use_tls: bool,
no_hostname: bool,
) -> Result<(), ClientCoreError>
where
D: GatewaysDetailsStore,
@@ -171,19 +163,19 @@ where
tracing::trace!("Updating gateway details : {gateway_id}");
let gateway = get_specified_gateway(&gateway_id, &available_gateways, must_use_tls)?;
let selected_gateway = SelectedGateway::from_topology_node(gateway, must_use_tls, no_hostname)?;
let selected_gateway = SelectedGateway::from_topology_node(gateway, must_use_tls)?;
let new_gateway_listeners = match selected_gateway {
let new_gateway_details = match selected_gateway {
SelectedGateway::Remote {
gateway_listeners, ..
} => gateway_listeners,
gateway_details, ..
} => gateway_details,
SelectedGateway::Custom { .. } => {
// this should not happen, as `from_topology_node` returns a Remote
Err(ClientCoreError::UnexpectedCustomGatewaySelection)?
}
};
let new_published_data = GatewayPublishedData::new(new_gateway_listeners);
let new_published_data = GatewayPublishedData::new(new_gateway_details);
// update gateway details
update_stored_published_data_gateway(
+28 -70
View File
@@ -10,10 +10,11 @@ use nym_client_core_gateways_storage::{
GatewayRegistration, GatewaysDetailsStore, RemoteGatewayDetails,
};
use nym_crypto::asymmetric::ed25519;
use nym_gateway_client::client::{GatewayListeners, InitGatewayClient};
use nym_gateway_client::client::InitGatewayClient;
use nym_gateway_client::SharedSymmetricKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_topology::node::RoutingNode;
use nym_topology::EntryDetails;
use nym_validator_client::client::IdentityKey;
use serde::Serialize;
use std::fmt::{Debug, Display};
@@ -21,13 +22,12 @@ use std::fmt::{Debug, Display};
use std::os::fd::RawFd;
use std::sync::Arc;
use time::OffsetDateTime;
use url::Url;
pub enum SelectedGateway {
Remote {
gateway_id: ed25519::PublicKey,
gateway_listeners: GatewayListeners,
gateway_details: EntryDetails,
},
Custom {
gateway_id: ed25519::PublicKey,
@@ -39,52 +39,33 @@ impl SelectedGateway {
pub fn from_topology_node(
node: RoutingNode,
must_use_tls: bool,
no_hostname: bool,
) -> Result<Self, ClientCoreError> {
// for now, let's use 'old' behaviour, if you want to change it, you can pass it up the enum stack yourself : )
let prefer_ipv6 = false;
let (gateway_listener, fallback_listener) = if must_use_tls {
// WSS main, no fallback
let primary =
node.ws_entry_address_tls()
.ok_or(ClientCoreError::UnsupportedWssProtocol {
gateway: node.identity_key.to_base58_string(),
})?;
(primary, None)
} else {
let (maybe_primary, fallback) =
node.ws_entry_address_with_fallback(prefer_ipv6, no_hostname);
(
maybe_primary.ok_or(ClientCoreError::UnsupportedEntry {
id: node.node_id,
identity: node.identity_key.to_base58_string(),
})?,
fallback,
)
};
let gateway_details = node.entry.ok_or(ClientCoreError::UnsupportedEntry {
id: node.node_id,
identity: node.identity_key.to_base58_string(),
})?;
let fallback_listener_url = fallback_listener.and_then(|address| {
Url::parse(&address)
.inspect_err(|err| {
tracing::warn!("Malformed fallback listener, none will be used : {err}")
})
.ok()
});
if must_use_tls
&& (gateway_details.hostname.is_none() || gateway_details.clients_wss_port.is_none())
{
return Err(ClientCoreError::UnsupportedWssProtocol {
gateway: node.identity_key.to_base58_string(),
});
}
let gateway_listener_url =
Url::parse(&gateway_listener).map_err(|source| ClientCoreError::MalformedListener {
gateway_id: node.identity_key.to_base58_string(),
raw_listener: gateway_listener,
source,
})?;
if prefer_ipv6 && gateway_details.ip_addresses.iter().all(|i| i.is_ipv4()) {
return Err(ClientCoreError::UnsupportedEntry {
id: node.node_id,
identity: node.identity_key.to_base58_string(),
});
}
Ok(SelectedGateway::Remote {
gateway_id: node.identity_key,
gateway_listeners: GatewayListeners {
primary: gateway_listener_url,
fallback: fallback_listener_url,
},
gateway_details,
})
}
@@ -174,22 +155,15 @@ impl InitialisationResult {
#[derive(Clone, Debug)]
pub enum GatewaySelectionSpecification {
/// Uniformly choose a random remote gateway.
UniformRemote {
must_use_tls: bool,
no_hostname: bool,
},
UniformRemote { must_use_tls: bool },
/// Should the new, remote, gateway be selected based on latency.
RemoteByLatency {
must_use_tls: bool,
no_hostname: bool,
},
RemoteByLatency { must_use_tls: bool },
/// Gateway with this specific identity should be chosen.
// JS: I don't really like the name of this enum variant but couldn't think of anything better at the time
Specified {
must_use_tls: bool,
no_hostname: bool,
identity: IdentityKey,
},
@@ -205,7 +179,6 @@ impl Default for GatewaySelectionSpecification {
fn default() -> Self {
GatewaySelectionSpecification::UniformRemote {
must_use_tls: false,
no_hostname: false,
}
}
}
@@ -215,24 +188,16 @@ impl GatewaySelectionSpecification {
gateway_identity: Option<String>,
latency_based_selection: Option<bool>,
must_use_tls: bool,
no_hostname: bool,
) -> Self {
if let Some(identity) = gateway_identity {
GatewaySelectionSpecification::Specified {
identity,
must_use_tls,
no_hostname,
}
} else if let Some(true) = latency_based_selection {
GatewaySelectionSpecification::RemoteByLatency {
must_use_tls,
no_hostname,
}
GatewaySelectionSpecification::RemoteByLatency { must_use_tls }
} else {
GatewaySelectionSpecification::UniformRemote {
must_use_tls,
no_hostname,
}
GatewaySelectionSpecification::UniformRemote { must_use_tls }
}
}
}
@@ -354,8 +319,7 @@ pub struct InitResults {
pub identity_key: String,
pub encryption_key: String,
pub gateway_id: String,
pub gateway_listener: String,
pub fallback_listener: Option<String>,
pub gateway_details: EntryDetails,
pub gateway_registration: OffsetDateTime,
pub address: Recipient,
}
@@ -373,13 +337,7 @@ impl InitResults {
identity_key: address.identity().to_base58_string(),
encryption_key: address.encryption_key().to_base58_string(),
gateway_id: gateway.gateway_id.to_base58_string(),
gateway_listener: gateway.published_data.listeners.primary.to_string(),
fallback_listener: gateway
.published_data
.listeners
.fallback
.as_ref()
.map(|uri| uri.to_string()),
gateway_details: gateway.published_data.details.clone(),
gateway_registration: registration,
address,
}
@@ -393,7 +351,7 @@ impl Display for InitResults {
writeln!(f, "Identity key: {}", self.identity_key)?;
writeln!(f, "Encryption: {}", self.encryption_key)?;
writeln!(f, "Gateway ID: {}", self.gateway_id)?;
writeln!(f, "Gateway: {}", self.gateway_listener)?;
writeln!(f, "Gateway: {}", self.gateway_details)?;
write!(f, "Registered at: {}", self.gateway_registration)
}
}
+12 -24
View File
@@ -1,43 +1,31 @@
use crate::error::ClientCoreError;
use nym_http_api_client::HickoryDnsResolver;
#[cfg(not(target_arch = "wasm32"))]
use nym_topology::EntryDetails;
use tokio::net::TcpStream;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tungstenite::handshake::client::Response;
use url::{Host, Url};
use std::net::SocketAddr;
#[cfg(not(target_arch = "wasm32"))]
pub(crate) async fn connect_async(
endpoint: &str,
endpoint: &EntryDetails,
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), ClientCoreError> {
let resolver = HickoryDnsResolver::default();
let uri = Url::parse(endpoint).map_err(|_| ClientCoreError::InvalidUrl(endpoint.to_owned()))?;
let uri = endpoint
.ws_entry_address(false)
.ok_or(ClientCoreError::InvalidEndpoint(endpoint.to_string()))?;
let port: u16 = uri.port_or_known_default().unwrap_or(443);
let host = uri
.host()
.ok_or(ClientCoreError::InvalidUrl(endpoint.to_owned()))?;
// Get address for tcp connection, if a domain is provided use our preferred resolver rather than
// the default std resolve
let sock_addrs: Vec<SocketAddr> = match host {
Host::Ipv4(addr) => vec![SocketAddr::new(addr.into(), port)],
Host::Ipv6(addr) => vec![SocketAddr::new(addr.into(), port)],
Host::Domain(domain) => {
// Do a DNS lookup for the domain using our custom DNS resolver
resolver
.resolve_str(domain)
.await?
.map(|a| SocketAddr::new(a, port))
.collect()
}
};
let sock_addrs: Vec<SocketAddr> = endpoint
.ip_addresses
.iter()
.map(|addr| SocketAddr::new(addr.clone(), port))
.collect();
let stream = TcpStream::connect(&sock_addrs[..]).await?;
tokio_tungstenite::client_async_tls(endpoint, stream)
tokio_tungstenite::client_async_tls(uri.to_string(), stream)
.await
.map_err(Into::into)
}
+1 -1
View File
@@ -27,11 +27,11 @@ nym-credential-storage = { path = "../../credential-storage" }
nym-credentials-interface = { path = "../../credentials-interface" }
nym-crypto = { path = "../../crypto" }
nym-gateway-requests = { path = "../../gateway-requests" }
nym-http-api-client = { path = "../../http-api-client" }
nym-network-defaults = { path = "../../network-defaults" }
nym-sphinx = { path = "../../nymsphinx" }
nym-statistics-common = { path = "../../statistics" }
nym-pemstore = { path = "../../pemstore" }
nym-topology = { path = "../../topology", features = ["persistence"] }
nym-validator-client = { path = "../validator-client", default-features = false }
nym-task = { path = "../../task" }
serde = { workspace = true, features = ["derive"] }
@@ -28,13 +28,13 @@ use nym_sphinx::forwarding::packet::MixPacket;
use nym_statistics_common::clients::connection::ConnectionStatsEvent;
use nym_statistics_common::clients::ClientStatsSender;
use nym_task::ShutdownToken;
use nym_topology::EntryDetails;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use rand::rngs::OsRng;
use std::sync::Arc;
use tracing::instrument;
use tracing::*;
use tungstenite::protocol::Message;
use url::Url;
#[cfg(unix)]
use std::os::fd::RawFd;
@@ -53,29 +53,23 @@ pub mod config;
#[cfg(not(target_arch = "wasm32"))]
pub(crate) mod websockets;
#[cfg(not(target_arch = "wasm32"))]
use crate::client::websockets::connect_async_with_fallback;
use crate::client::websockets::connect_async;
pub struct GatewayConfig {
pub gateway_identity: ed25519::PublicKey,
pub gateway_listeners: GatewayListeners,
pub gateway_details: EntryDetails,
}
impl GatewayConfig {
pub fn new(gateway_identity: ed25519::PublicKey, gateway_listeners: GatewayListeners) -> Self {
pub fn new(gateway_identity: ed25519::PublicKey, gateway_details: EntryDetails) -> Self {
GatewayConfig {
gateway_identity,
gateway_listeners,
gateway_details,
}
}
}
#[derive(Debug, Clone)]
pub struct GatewayListeners {
pub primary: Url,
pub fallback: Option<Url>,
}
#[must_use]
#[derive(Debug)]
pub struct AuthenticationResponse {
@@ -88,7 +82,7 @@ pub struct GatewayClient<C, St = EphemeralCredentialStorage> {
authenticated: bool,
bandwidth: ClientBandwidth,
gateway_addresses: GatewayListeners,
gateway_details: EntryDetails,
gateway_identity: ed25519::PublicKey,
local_identity: Arc<ed25519::KeyPair>,
shared_key: Option<Arc<SharedSymmetricKey>>,
@@ -125,7 +119,7 @@ impl<C, St> GatewayClient<C, St> {
cfg,
authenticated: false,
bandwidth: ClientBandwidth::new_empty(),
gateway_addresses: gateway_config.gateway_listeners,
gateway_details: gateway_config.gateway_details,
gateway_identity: gateway_config.gateway_identity,
local_identity,
shared_key,
@@ -199,19 +193,13 @@ impl<C, St> GatewayClient<C, St> {
#[cfg(not(target_arch = "wasm32"))]
pub async fn establish_connection(&mut self) -> Result<(), GatewayClientError> {
if let Some(fallback_url) = &self.gateway_addresses.fallback {
debug!(
"Attempting to establish connection to gateway at: {}, with fallback at: {fallback_url}",
self.gateway_addresses.primary
);
} else {
debug!(
"Attempting to establish connection to gateway at: {}",
self.gateway_addresses.primary
);
}
let (ws_stream, _) = connect_async_with_fallback(
&self.gateway_addresses,
debug!(
"Attempting to establish connection to gateway at: {:?}",
self.gateway_details
);
let (ws_stream, _) = connect_async(
&self.gateway_details,
#[cfg(unix)]
self.connection_fd_callback.clone(),
)
@@ -224,7 +212,11 @@ impl<C, St> GatewayClient<C, St> {
#[cfg(target_arch = "wasm32")]
pub async fn establish_connection(&mut self) -> Result<(), GatewayClientError> {
let ws_stream = match JSWebsocket::new(self.gateway_addresses.primary.as_ref()) {
let endpoint = self.gateway_details.clone();
let uri = endpoint
.ws_entry_address(false)
.ok_or(GatewayClientError::InvalidEndpoint(endpoint.to_string()))?;
let ws_stream = match JSWebsocket::new(uri.as_ref()) {
Ok(ws_stream) => ws_stream,
Err(e) => {
return Err(GatewayClientError::NetworkErrorWasm(e));
@@ -590,7 +582,7 @@ impl<C, St> GatewayClient<C, St> {
#[instrument(skip_all,
fields(
gateway = %self.gateway_identity,
gateway_address = %self.gateway_addresses.primary
gateway_address = %self.gateway_details
)
)]
pub async fn perform_initial_authentication(
@@ -1089,7 +1081,7 @@ pub struct InitOnly;
impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
// for initialisation we do not need credential storage. Though it's still a bit weird we have to set the generic...
pub fn new_init(
gateway_listeners: GatewayListeners,
gateway_details: EntryDetails,
gateway_identity: ed25519::PublicKey,
local_identity: Arc<ed25519::KeyPair>,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
@@ -1108,7 +1100,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
cfg: GatewayClientConfig::default().with_disabled_credentials_mode(true),
authenticated: false,
bandwidth: ClientBandwidth::new_empty(),
gateway_addresses: gateway_listeners,
gateway_details,
gateway_identity,
local_identity,
shared_key: None,
@@ -1140,7 +1132,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
cfg: self.cfg,
authenticated: self.authenticated,
bandwidth: self.bandwidth,
gateway_addresses: self.gateway_addresses,
gateway_details: self.gateway_details,
gateway_identity: self.gateway_identity,
local_identity: self.local_identity,
shared_key: self.shared_key,
@@ -1,53 +1,38 @@
#[cfg(not(target_arch = "wasm32"))]
use crate::client::GatewayListeners;
use crate::error::GatewayClientError;
use nym_http_api_client::HickoryDnsResolver;
use nym_topology::EntryDetails;
#[cfg(unix)]
use std::{
os::fd::{AsRawFd, RawFd},
sync::Arc,
};
use tokio::net::TcpStream;
use tokio::net::{TcpSocket, TcpStream};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tungstenite::handshake::client::Response;
use url::{Host, Url};
use std::net::SocketAddr;
#[cfg(target_arch = "wasm32")]
use wasm_utils::websocket::JSWebsocket;
#[cfg(not(target_arch = "wasm32"))]
pub(crate) async fn connect_async(
endpoint: &str,
endpoint: &EntryDetails,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), GatewayClientError> {
use tokio::net::TcpSocket;
let resolver = HickoryDnsResolver::default();
let uri =
Url::parse(endpoint).map_err(|_| GatewayClientError::InvalidUrl(endpoint.to_owned()))?;
let uri = endpoint
.ws_entry_address(false)
.ok_or(GatewayClientError::InvalidEndpoint(endpoint.to_string()))?;
let port: u16 = uri.port_or_known_default().unwrap_or(443);
let host = uri
.host()
.ok_or(GatewayClientError::InvalidUrl(endpoint.to_owned()))?;
// Get address for tcp connection, if a domain is provided use our preferred resolver rather than
// the default std resolve
let sock_addrs: Vec<SocketAddr> = match host {
Host::Ipv4(addr) => vec![SocketAddr::new(addr.into(), port)],
Host::Ipv6(addr) => vec![SocketAddr::new(addr.into(), port)],
Host::Domain(domain) => {
// Do a DNS lookup for the domain using our custom DNS resolver
resolver
.resolve_str(domain)
.await?
.map(|a| SocketAddr::new(a, port))
.collect()
}
};
let sock_addrs = endpoint
.ip_addresses
.iter()
.map(|addr| SocketAddr::new(*addr, port));
let uri_str = uri.to_string();
let mut stream = Err(GatewayClientError::NoEndpointForConnection {
address: endpoint.to_owned(),
address: uri_str.clone(),
});
for sock_addr in sock_addrs {
let socket = if sock_addr.is_ipv4() {
@@ -56,7 +41,7 @@ pub(crate) async fn connect_async(
TcpSocket::new_v6()
}
.map_err(|err| GatewayClientError::NetworkConnectionFailed {
address: endpoint.to_owned(),
address: uri_str.clone(),
source: Box::new(tungstenite::Error::from(err)),
})?;
@@ -72,7 +57,7 @@ pub(crate) async fn connect_async(
}
Err(err) => {
stream = Err(GatewayClientError::NetworkConnectionFailed {
address: endpoint.to_owned(),
address: uri_str.clone(),
source: Box::new(tungstenite::Error::from(err)),
});
continue;
@@ -80,42 +65,10 @@ pub(crate) async fn connect_async(
}
}
tokio_tungstenite::client_async_tls(endpoint, stream?)
tokio_tungstenite::client_async_tls(uri.clone(), stream?)
.await
.map_err(|error| GatewayClientError::NetworkConnectionFailed {
address: endpoint.to_owned(),
address: uri_str.clone(),
source: Box::new(error),
})
}
#[cfg(not(target_arch = "wasm32"))]
pub(crate) async fn connect_async_with_fallback(
endpoints: &GatewayListeners,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), GatewayClientError> {
match connect_async(
endpoints.primary.as_ref(),
#[cfg(unix)]
connection_fd_callback.clone(),
)
.await
{
Ok(inner) => Ok(inner),
Err(e) => {
if let Some(fallback) = &endpoints.fallback {
tracing::warn!(
"Main endpoint failed {} : {e}, trying fallback : {fallback}",
endpoints.primary
);
connect_async(
fallback.as_ref(),
#[cfg(unix)]
connection_fd_callback,
)
.await
} else {
Err(e)
}
}
}
}
@@ -49,12 +49,8 @@ pub enum GatewayClientError {
#[error("no socket address for endpoint: {address}")]
NoEndpointForConnection { address: String },
#[error("Invalid URL: {0}")]
InvalidUrl(String),
#[cfg(not(target_arch = "wasm32"))]
#[error("resolution failed: {0}")]
ResolutionFailed(#[from] nym_http_api_client::ResolveError),
#[error("Invalid Endpoint: {0}")]
InvalidEndpoint(String),
#[error("No shared key was provided or obtained")]
NoSharedKeyAvailable,
+1
View File
@@ -18,6 +18,7 @@ reqwest = { workspace = true, features = ["json"] }
serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
time = { workspace = true, features = ["serde"] }
url.workspace = true
# 'serde' feature
serde_json = { workspace = true, optional = true }
+36 -69
View File
@@ -10,6 +10,7 @@ use nym_sphinx_types::Node as SphinxNode;
use serde::{Deserialize, Serialize};
use std::net::{IpAddr, SocketAddr};
use thiserror::Error;
use url::Url;
pub use nym_mixnet_contract_common::LegacyMixLayer;
@@ -28,6 +29,35 @@ pub struct EntryDetails {
pub clients_wss_port: Option<u16>,
}
impl EntryDetails {
pub fn ws_entry_address_tls(&self) -> Option<Url> {
let hostname = self.hostname.as_ref()?;
let wss_port = self.clients_wss_port?;
Url::parse(&format!("wss://{hostname}:{wss_port}")).ok()
}
pub fn ws_entry_address_no_tls(&self, prefer_ipv6: bool) -> Option<Url> {
if let Some(hostname) = self.hostname.as_ref() {
return Url::parse(&format!("ws://{hostname}:{}", self.clients_ws_port)).ok();
}
if prefer_ipv6 && let Some(ipv6) = self.ip_addresses.iter().find(|ip| ip.is_ipv6()) {
return Url::parse(&format!("ws://{ipv6}:{}", self.clients_ws_port)).ok();
}
let any_ip = self.ip_addresses.first()?;
Url::parse(&format!("ws://{any_ip}:{}", self.clients_ws_port)).ok()
}
pub fn ws_entry_address(&self, prefer_ipv6: bool) -> Option<Url> {
if let Some(tls) = self.ws_entry_address_tls() {
return Some(tls);
}
self.ws_entry_address_no_tls(prefer_ipv6)
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct SupportedRoles {
pub mixnode: bool,
@@ -59,80 +89,17 @@ pub struct RoutingNode {
}
impl RoutingNode {
pub fn ws_entry_address_tls(&self) -> Option<String> {
let entry = self.entry.as_ref()?;
let hostname = entry.hostname.as_ref()?;
let wss_port = entry.clients_wss_port?;
Some(format!("wss://{hostname}:{wss_port}"))
}
pub fn ws_entry_address_no_tls(&self, prefer_ipv6: bool) -> Option<String> {
let entry = self.entry.as_ref()?;
if let Some(hostname) = entry.hostname.as_ref() {
return Some(format!("ws://{hostname}:{}", entry.clients_ws_port));
}
if prefer_ipv6 && let Some(ipv6) = entry.ip_addresses.iter().find(|ip| ip.is_ipv6()) {
return Some(format!("ws://{ipv6}:{}", entry.clients_ws_port));
}
let any_ip = entry.ip_addresses.first()?;
Some(format!("ws://{any_ip}:{}", entry.clients_ws_port))
}
pub fn ws_entry_address(&self, prefer_ipv6: bool) -> Option<String> {
if let Some(tls) = self.ws_entry_address_tls() {
return Some(tls);
}
self.ws_entry_address_no_tls(prefer_ipv6)
}
pub fn ws_entry_address_with_fallback(
&self,
prefer_ipv6: bool,
no_hostname: bool,
) -> (Option<String>, Option<String>) {
let Some(entry) = &self.entry else {
return (None, None);
};
// Put hostname first if we want it
let maybe_hostname = if !no_hostname {
entry.hostname.clone()
} else {
None
};
// Put ipv6 first or keep them as is
let ips: Vec<&IpAddr> = if prefer_ipv6 {
entry
.ip_addresses
.iter()
.filter(|ip| ip.is_ipv6())
.chain(entry.ip_addresses.iter().filter(|ip| ip.is_ipv4()))
.collect()
} else {
entry.ip_addresses.iter().collect()
};
// chain everything and keep the top two as ws addresses
let ws_addresses: Vec<_> = maybe_hostname
.into_iter()
.chain(ips.into_iter().map(|ip| ip.to_string()))
.take(2)
.map(|host| format!("ws://{host}:{}", entry.clients_ws_port))
.collect();
(ws_addresses.first().cloned(), ws_addresses.get(1).cloned())
}
pub fn identity(&self) -> ed25519::PublicKey {
self.identity_key
}
}
impl std::fmt::Display for EntryDetails {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl<'a> From<&'a RoutingNode> for SphinxNode {
fn from(node: &'a RoutingNode) -> Self {
// SAFETY: this conversion is infallible as all versions of socket addresses have
+1 -2
View File
@@ -139,7 +139,7 @@ pub async fn setup_gateway_wasm(
GatewaySetup::MustLoad { gateway_id: None }
} else {
let selection_spec =
GatewaySelectionSpecification::new(chosen_gateway.clone(), None, force_tls, false);
GatewaySelectionSpecification::new(chosen_gateway.clone(), None, force_tls);
GatewaySetup::New {
specification: selection_spec,
@@ -218,7 +218,6 @@ pub async fn add_gateway(
preferred_gateway.clone(),
latency_based_selection,
force_tls,
false,
);
let preferred_gateway = preferred_gateway
+4 -7
View File
@@ -5,6 +5,7 @@ use nym_client_core::client::base_client::storage::gateways_storage::{
BadGateway, GatewayDetails, GatewayPublishedData, GatewayRegistration, RawGatewayPublishedData,
RawRemoteGatewayDetails, RemoteGatewayDetails,
};
use nym_topology::EntryDetails;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use zeroize::Zeroize;
@@ -33,8 +34,7 @@ impl TryFrom<WasmRawRegisteredGateway> for GatewayRegistration {
gateway_id_bs58: value.gateway_id_bs58,
derived_aes256_gcm_siv_key: value.derived_aes256_gcm_siv_key,
published_data: RawGatewayPublishedData {
gateway_listener: value.published_data.gateway_listener,
fallback_listener: value.published_data.fallback_listener,
gateway_details: value.published_data.gateway_details,
expiration_timestamp: value.published_data.expiration_timestamp,
},
};
@@ -66,9 +66,7 @@ impl<'a> From<&'a GatewayRegistration> for WasmRawRegisteredGateway {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WasmRawGatewayPublishedData {
pub gateway_listener: String,
pub fallback_listener: Option<String>,
pub gateway_details: EntryDetails,
pub expiration_timestamp: OffsetDateTime,
}
@@ -76,8 +74,7 @@ pub struct WasmRawGatewayPublishedData {
impl<'a> From<&'a GatewayPublishedData> for WasmRawGatewayPublishedData {
fn from(value: &'a GatewayPublishedData) -> Self {
WasmRawGatewayPublishedData {
gateway_listener: value.listeners.primary.to_string(),
fallback_listener: value.listeners.fallback.as_ref().map(|uri| uri.to_string()),
gateway_details: value.details.clone(),
expiration_timestamp: value.expiration_timestamp,
}
}
@@ -352,7 +352,7 @@ impl PacketPreparer {
.collect();
GatewayPackets::new(
route.gateway_clients_address(),
route.gateway_details().expect("AHHHHHHH"),
route.gateway_identity(),
mix_packets,
)
@@ -422,8 +422,14 @@ impl PacketPreparer {
// for each test route...
for test_route in test_routes {
let route_ext = test_route.test_message_ext(test_nonce);
let gateway_address = test_route.gateway_clients_address();
let gateway_identity = test_route.gateway_identity();
let gateway_address = match test_route.gateway_details() {
Some(details) => details,
None => {
tracing::warn!("skipping gateway {gateway_identity} - missing gateway details");
continue;
}
};
let mut mix_tester = self.ephemeral_mix_tester(test_route);
@@ -453,7 +459,15 @@ impl PacketPreparer {
for gateway in &gateways_to_test_details {
let recipient = self.create_packet_sender(gateway);
let gateway_identity = gateway.identity_key;
let gateway_address = gateway.ws_entry_address(false);
let gateway_details = match &gateway.entry {
Some(details) => details,
None => {
tracing::warn!(
"skipping gateway {gateway_identity} - missing gateway details"
);
continue;
}
};
// the unwrap here is fine as:
// 1. the topology is definitely valid (otherwise we wouldn't be here)
@@ -475,7 +489,9 @@ impl PacketPreparer {
// or create a new one
let gateway_packets = all_gateway_packets
.entry(gateway_identity.to_bytes())
.or_insert_with(|| GatewayPackets::empty(gateway_address, gateway_identity));
.or_insert_with(|| {
GatewayPackets::empty(gateway_details.clone(), gateway_identity)
});
gateway_packets.push_packets(gateway_mix_packets);
}
}
+13 -23
View File
@@ -14,13 +14,14 @@ use nym_bandwidth_controller::BandwidthController;
use nym_credential_storage::persistent_storage::PersistentStorage;
use nym_crypto::asymmetric::ed25519;
use nym_gateway_client::client::config::GatewayClientConfig;
use nym_gateway_client::client::{GatewayConfig, GatewayListeners};
use nym_gateway_client::client::GatewayConfig;
use nym_gateway_client::error::GatewayClientError;
use nym_gateway_client::{
AcknowledgementReceiver, GatewayClient, MixnetMessageReceiver, PacketRouter, SharedSymmetricKey,
};
use nym_sphinx::forwarding::packet::MixPacket;
use nym_task::ShutdownToken;
use nym_topology::EntryDetails;
use pin_project::pin_project;
use sqlx::__rt::timeout;
use std::mem;
@@ -30,14 +31,13 @@ use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use tracing::{debug, error, info, trace, warn};
use url::Url;
const TIME_CHUNK_SIZE: Duration = Duration::from_millis(50);
pub(crate) struct GatewayPackets {
/// Network address of the target gateway if wanted to be accessed by the client.
/// It is a websocket address.
pub(crate) clients_address: Option<String>,
pub(crate) gateway_details: EntryDetails,
/// Public key of the target gateway.
pub(crate) pub_key: ed25519::PublicKey,
@@ -48,33 +48,27 @@ pub(crate) struct GatewayPackets {
impl GatewayPackets {
pub(crate) fn new(
clients_address: Option<String>,
gateway_details: EntryDetails,
pub_key: ed25519::PublicKey,
packets: Vec<MixPacket>,
) -> Self {
GatewayPackets {
clients_address,
gateway_details,
pub_key,
packets,
}
}
pub(crate) fn gateway_config(&self) -> Result<Option<GatewayConfig>, url::ParseError> {
match self.clients_address.as_ref() {
Some(gateway_listener) => Ok(Some(GatewayConfig {
gateway_identity: self.pub_key,
gateway_listeners: GatewayListeners {
primary: Url::parse(gateway_listener)?,
fallback: None,
},
})),
None => Ok(None),
}
pub(crate) fn gateway_config(&self) -> Result<GatewayConfig, url::ParseError> {
Ok(GatewayConfig {
gateway_identity: self.pub_key,
gateway_details: self.gateway_details.clone(),
})
}
pub(crate) fn empty(clients_address: Option<String>, pub_key: ed25519::PublicKey) -> Self {
pub(crate) fn empty(gateway_details: EntryDetails, pub_key: ed25519::PublicKey) -> Self {
GatewayPackets {
clients_address,
gateway_details,
pub_key,
packets: Vec::new(),
}
@@ -370,11 +364,7 @@ impl PacketSender {
let identity = packets.pub_key;
let gateway_config = match packets.gateway_config() {
Ok(Some(gateway_config)) => gateway_config,
Ok(None) => {
warn!("gateway {identity} didn't provide valid entry information");
return None;
}
Ok(gateway_config) => gateway_config,
Err(e) => {
warn!("Error while parsing entry information for gateway {identity} : {e}");
return None;
@@ -7,7 +7,7 @@ use nym_crypto::asymmetric::ed25519;
use nym_mixnet_contract_common::nym_node::Role;
use nym_mixnet_contract_common::{EpochId, EpochRewardedSet, RewardedSet};
use nym_topology::node::RoutingNode;
use nym_topology::{NymRouteProvider, NymTopology, NymTopologyMetadata};
use nym_topology::{EntryDetails, NymRouteProvider, NymTopology, NymTopologyMetadata};
use std::fmt::{Debug, Formatter};
use time::OffsetDateTime;
@@ -82,8 +82,8 @@ impl TestRoute {
self.nodes.nodes_with_role(Role::Layer3).next().unwrap()
}
pub(crate) fn gateway_clients_address(&self) -> Option<String> {
self.gateway().ws_entry_address(false)
pub(crate) fn gateway_details(&self) -> Option<EntryDetails> {
self.gateway().entry.clone()
}
pub(crate) fn gateway_identity(&self) -> ed25519::PublicKey {
-7
View File
@@ -353,7 +353,6 @@ where
client.custom_shutdown = self.custom_shutdown;
client.wait_for_gateway = self.wait_for_gateway;
client.force_tls = self.force_tls;
client.no_hostname = self.no_hostname;
client.user_agent = self.user_agent;
#[cfg(unix)]
if self.connection_fd_callback.is_some() {
@@ -406,9 +405,6 @@ where
/// Force the client to connect using wss protocol with the gateway.
force_tls: bool,
/// Force the client to pick gateway IP and not hostname, ignored if force_tls is set
no_hostname: bool,
/// Allows passing an externally controlled shutdown handle.
custom_shutdown: Option<ShutdownTracker>,
@@ -477,7 +473,6 @@ where
custom_gateway_transceiver: None,
wait_for_gateway: false,
force_tls: false,
no_hostname: false,
custom_shutdown: None,
event_tx,
user_agent: None,
@@ -597,7 +592,6 @@ where
self.config.user_chosen_gateway.clone(),
None,
self.force_tls,
self.no_hostname,
);
let available_gateways = self.available_gateways().await?;
@@ -620,7 +614,6 @@ where
gateway_registration,
available_gateways,
self.force_tls,
self.no_hostname,
)
.await
}
+23 -24
View File
@@ -188,30 +188,29 @@ impl NymNodeTesterBuilder {
let stats_sender_task = task_manager.clone_shutdown_token();
let mut gateway_client =
if let Some(existing_client) = initialisation_result.authenticated_ephemeral_client {
existing_client.upgrade(
packet_router,
self.bandwidth_controller.take(),
ClientStatsSender::new(None, stats_sender_task),
gateway_task.clone(),
)
} else {
let cfg = GatewayConfig::new(
gateway_info.gateway_id,
gateway_info.published_data.listeners,
);
GatewayClient::new(
GatewayClientConfig::new_default().with_disabled_credentials_mode(true),
cfg,
managed_keys.identity_keypair(),
Some(gateway_info.shared_key),
packet_router,
self.bandwidth_controller.take(),
ClientStatsSender::new(None, stats_sender_task),
gateway_task,
)
};
let mut gateway_client = if let Some(existing_client) =
initialisation_result.authenticated_ephemeral_client
{
existing_client.upgrade(
packet_router,
self.bandwidth_controller.take(),
ClientStatsSender::new(None, stats_sender_task),
gateway_task.clone(),
)
} else {
let cfg =
GatewayConfig::new(gateway_info.gateway_id, gateway_info.published_data.details);
GatewayClient::new(
GatewayClientConfig::new_default().with_disabled_credentials_mode(true),
cfg,
managed_keys.identity_keypair(),
Some(gateway_info.shared_key),
packet_router,
self.bandwidth_controller.take(),
ClientStatsSender::new(None, stats_sender_task),
gateway_task,
)
};
let _auth_res = gateway_client.perform_initial_authentication().await?;