Compare commits

...

3 Commits

Author SHA1 Message Date
Andrej Mihajlov 637d9a4ebf ci: fix 2025-10-23 12:45:52 +02:00
Andrej Mihajlov ecf6e849d9 Add connect_timeout to registration client 2025-10-22 15:41:26 +02:00
Andrej Mihajlov 6156e2656e Add connect timeout 2025-10-22 15:21:12 +02:00
16 changed files with 109 additions and 4 deletions
@@ -140,6 +140,7 @@ where
available_gateways,
#[cfg(unix)]
connection_fd_callback: None,
connect_timeout: None,
};
let init_details =
@@ -188,6 +188,7 @@ where
available_gateways,
#[cfg(unix)]
connection_fd_callback: None,
connect_timeout: None,
};
let init_details =
@@ -65,6 +65,7 @@ use std::fmt::Debug;
use std::os::raw::c_int as RawFd;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::mpsc::Sender;
use url::Url;
@@ -230,6 +231,7 @@ pub struct BaseClientBuilder<C, S: MixnetClientStorage> {
#[cfg(unix)]
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
connect_timeout: Option<Duration>,
derivation_material: Option<DerivationMaterial>,
}
@@ -258,6 +260,7 @@ where
setup_method: GatewaySetup::MustLoad { gateway_id: None },
#[cfg(unix)]
connection_fd_callback: None,
connect_timeout: None,
derivation_material: None,
}
}
@@ -356,6 +359,11 @@ where
self
}
pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
self.connect_timeout = Some(timeout);
self
}
// note: do **NOT** make this method public as its only valid usage is from within `start_base`
// because it relies on the crypto keys being already loaded
fn mix_address(details: &InitialisationResult) -> Recipient {
@@ -533,6 +541,7 @@ where
packet_router: PacketRouter,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
connect_timeout: Option<Duration>,
shutdown_tracker: &ShutdownTracker,
) -> Result<GatewayClient<C, S::CredentialStore>, ClientCoreError>
where
@@ -577,6 +586,7 @@ where
stats_reporter,
#[cfg(unix)]
connection_fd_callback,
connect_timeout,
shutdown_tracker.clone_shutdown_token(),
)
};
@@ -640,6 +650,7 @@ where
packet_router: PacketRouter,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
connect_timeout: Option<Duration>,
shutdown_tracker: &ShutdownTracker,
) -> Result<Box<dyn GatewayTransceiver + Send>, ClientCoreError>
where
@@ -672,6 +683,7 @@ where
stats_reporter,
#[cfg(unix)]
connection_fd_callback,
connect_timeout,
shutdown_tracker,
)
.await?;
@@ -1074,6 +1086,7 @@ where
stats_reporter.clone(),
#[cfg(unix)]
self.connection_fd_callback,
self.connect_timeout,
&shutdown_tracker.child_tracker(),
)
.await?;
+2
View File
@@ -382,6 +382,7 @@ pub(super) async fn register_with_gateway(
gateway_listener: Url,
our_identity: Arc<ed25519::KeyPair>,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
connect_timeout: Option<Duration>,
) -> Result<RegistrationResult, ClientCoreError> {
let mut gateway_client = GatewayClient::new_init(
gateway_listener,
@@ -389,6 +390,7 @@ pub(super) async fn register_with_gateway(
our_identity.clone(),
#[cfg(unix)]
connection_fd_callback,
connect_timeout,
);
gateway_client.establish_connection().await.map_err(|err| {
+5
View File
@@ -23,6 +23,7 @@ use nym_topology::node::RoutingNode;
use rand::rngs::OsRng;
use rand::{CryptoRng, RngCore};
use serde::Serialize;
use std::time::Duration;
#[cfg(unix)]
use std::{os::fd::RawFd, sync::Arc};
@@ -56,6 +57,7 @@ async fn setup_new_gateway<K, D>(
selection_specification: GatewaySelectionSpecification,
available_gateways: Vec<RoutingNode>,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
connect_timeout: Option<Duration>,
) -> Result<InitialisationResult, ClientCoreError>
where
K: KeyStore,
@@ -117,6 +119,7 @@ where
our_identity,
#[cfg(unix)]
connection_fd_callback,
connect_timeout,
)
.await?;
(
@@ -213,6 +216,7 @@ where
available_gateways,
#[cfg(unix)]
connection_fd_callback,
connect_timeout,
} => {
tracing::debug!("GatewaySetup::New with spec: {specification:?}");
setup_new_gateway(
@@ -222,6 +226,7 @@ where
available_gateways,
#[cfg(unix)]
connection_fd_callback,
connect_timeout,
)
.await
}
+6
View File
@@ -21,6 +21,7 @@ use std::fmt::{Debug, Display};
#[cfg(unix)]
use std::os::fd::RawFd;
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use url::Url;
@@ -214,6 +215,9 @@ pub enum GatewaySetup {
/// Callback useful for allowing initial connection to gateway
#[cfg(unix)]
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
/// Timeout for establishing connection
connect_timeout: Option<Duration>,
},
ReuseConnection {
@@ -239,6 +243,7 @@ impl Debug for GatewaySetup {
available_gateways,
#[cfg(unix)]
connection_fd_callback: _,
connect_timeout: _,
} => f
.debug_struct("GatewaySetup::New")
.field("specification", specification)
@@ -280,6 +285,7 @@ impl GatewaySetup {
available_gateways: vec![],
#[cfg(unix)]
connection_fd_callback: None,
connect_timeout: None,
}
}
@@ -38,6 +38,7 @@ use url::Url;
#[cfg(unix)]
use std::os::fd::RawFd;
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep;
@@ -104,10 +105,13 @@ pub struct GatewayClient<C, St = EphemeralCredentialStorage> {
// currently unused (but populated)
negotiated_protocol: Option<u8>,
// Callback on the fd as soon as the connection has been established
/// Callback on the fd as soon as the connection has been established
#[cfg(unix)]
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
/// Maximum duration to wait for a connection to be established when set
connect_timeout: Option<Duration>,
/// Listen to shutdown messages and send notifications back to the task manager
shutdown_token: ShutdownToken,
}
@@ -124,6 +128,7 @@ impl<C, St> GatewayClient<C, St> {
bandwidth_controller: Option<BandwidthController<C, St>>,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
connect_timeout: Option<Duration>,
shutdown_token: ShutdownToken,
) -> Self {
GatewayClient {
@@ -141,6 +146,7 @@ impl<C, St> GatewayClient<C, St> {
negotiated_protocol: None,
#[cfg(unix)]
connection_fd_callback,
connect_timeout,
shutdown_token,
}
}
@@ -208,6 +214,7 @@ impl<C, St> GatewayClient<C, St> {
&self.gateway_address,
#[cfg(unix)]
self.connection_fd_callback.clone(),
self.connect_timeout,
)
.await?;
@@ -1132,6 +1139,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
gateway_identity: ed25519::PublicKey,
local_identity: Arc<ed25519::KeyPair>,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
connect_timeout: Option<Duration>,
) -> Self {
log::trace!("Initialising gateway client");
use futures::channel::mpsc;
@@ -1158,6 +1166,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
negotiated_protocol: None,
#[cfg(unix)]
connection_fd_callback,
connect_timeout,
shutdown_token,
}
}
@@ -1190,6 +1199,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
negotiated_protocol: self.negotiated_protocol,
#[cfg(unix)]
connection_fd_callback: self.connection_fd_callback,
connect_timeout: self.connect_timeout,
shutdown_token,
}
}
@@ -1,6 +1,7 @@
use crate::error::GatewayClientError;
use nym_http_api_client::HickoryDnsResolver;
use std::time::Duration;
#[cfg(unix)]
use std::{
os::fd::{AsRawFd, RawFd},
@@ -17,6 +18,7 @@ use std::net::SocketAddr;
pub(crate) async fn connect_async(
endpoint: &str,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
connect_timeout: Option<Duration>,
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), GatewayClientError> {
use tokio::net::TcpSocket;
@@ -64,7 +66,22 @@ pub(crate) async fn connect_async(
callback.as_ref()(socket.as_raw_fd());
}
match socket.connect(sock_addr).await {
let connect_res = if let Some(connect_timeout) = connect_timeout {
match tokio::time::timeout(connect_timeout, socket.connect(sock_addr)).await {
Ok(res) => res,
Err(_elapsed) => {
stream = Err(GatewayClientError::NetworkConnectionTimeout {
address: endpoint.to_owned(),
timeout: connect_timeout,
});
continue;
}
}
} else {
socket.connect(sock_addr).await
};
match connect_res {
Ok(s) => {
stream = Ok(s);
break;
@@ -4,6 +4,7 @@
use nym_gateway_requests::registration::handshake::error::HandshakeError;
use nym_gateway_requests::{GatewayRequestsError, SimpleGatewayRequestsError};
use std::io;
use std::time::Duration;
use thiserror::Error;
use tungstenite::Error as WsError;
@@ -46,6 +47,9 @@ pub enum GatewayClientError {
source: Box<WsError>,
},
#[error("timeout when establishing connection: {address}, timeout: {timeout:?}")]
NetworkConnectionTimeout { address: String, timeout: Duration },
#[error("no socket address for endpoint: {address}")]
NoEndpointForConnection { address: String },
+23 -2
View File
@@ -23,6 +23,7 @@ use nym_topology::{EpochRewardedSet, NymTopology, RoutingNode};
use nym_validator_client::client::IdentityKey;
use nym_validator_client::{nym_api::NymApiClientExt, UserAgent};
use rand::thread_rng;
use std::time::Duration;
use url::Url;
use wasm_bindgen::prelude::wasm_bindgen;
use wasm_bindgen_futures::future_to_promise;
@@ -127,6 +128,7 @@ pub async fn setup_gateway_wasm(
force_tls: bool,
chosen_gateway: Option<IdentityKey>,
gateways: Vec<RoutingNode>,
connect_timeout: Option<Duration>,
) -> Result<InitialisationResult, WasmCoreError> {
// TODO: so much optimization and extra features could be added here, but that's for the future
@@ -144,6 +146,7 @@ pub async fn setup_gateway_wasm(
GatewaySetup::New {
specification: selection_spec,
available_gateways: gateways,
connect_timeout,
}
};
@@ -159,6 +162,7 @@ pub async fn setup_gateway_from_api(
nym_apis: &[Url],
minimum_performance: u8,
ignore_epoch_roles: bool,
connect_timeout: Option<Duration>,
) -> Result<InitialisationResult, WasmCoreError> {
let gateways = gateways_for_init(
nym_apis,
@@ -168,7 +172,14 @@ pub async fn setup_gateway_from_api(
None,
)
.await?;
setup_gateway_wasm(client_store, force_tls, chosen_gateway, gateways).await
setup_gateway_wasm(
client_store,
force_tls,
chosen_gateway,
gateways,
connect_timeout,
)
.await
}
pub async fn current_gateways_wasm(
@@ -192,9 +203,17 @@ pub async fn setup_from_topology(
force_tls: bool,
topology: &NymTopology,
client_store: &ClientStorage,
connect_timeout: Option<Duration>,
) -> Result<InitialisationResult, WasmCoreError> {
let gateways = topology.entry_capable_nodes().cloned().collect::<Vec<_>>();
setup_gateway_wasm(client_store, force_tls, explicit_gateway, gateways).await
setup_gateway_wasm(
client_store,
force_tls,
explicit_gateway,
gateways,
connect_timeout,
)
.await
}
pub async fn generate_new_client_keys(store: &ClientStorage) -> Result<(), WasmCoreError> {
@@ -213,6 +232,7 @@ pub async fn add_gateway(
min_performance: u8,
ignore_epoch_roles: bool,
storage: &ClientStorage,
connect_timeout: Option<Duration>,
) -> Result<(), WasmCoreError> {
let selection_spec = GatewaySelectionSpecification::new(
preferred_gateway.clone(),
@@ -267,6 +287,7 @@ pub async fn add_gateway(
let gateway_setup = GatewaySetup::New {
specification: selection_spec,
available_gateways,
connect_timeout,
};
let init_details = setup_gateway(gateway_setup, storage, storage).await?;
@@ -190,6 +190,7 @@ impl PacketSender {
),
#[cfg(unix)]
None,
None,
fresh_gateway_client_data.shutdown_token.clone(),
);
@@ -38,6 +38,7 @@ pub struct BuilderConfig {
pub cancel_token: CancellationToken,
#[cfg(unix)]
pub connection_fd_callback: Arc<dyn Fn(RawFd) + Send + Sync>,
pub connect_timeout: Option<Duration>,
}
#[derive(Clone, Default, Debug, Eq, PartialEq)]
@@ -71,6 +72,7 @@ impl BuilderConfig {
network_env: NymNetworkDetails,
cancel_token: CancellationToken,
#[cfg(unix)] connection_fd_callback: Arc<dyn Fn(RawFd) + Send + Sync>,
connect_timeout: Option<Duration>,
) -> Self {
Self {
entry_node,
@@ -84,6 +86,7 @@ impl BuilderConfig {
cancel_token,
#[cfg(unix)]
connection_fd_callback,
connect_timeout,
}
}
@@ -294,6 +297,7 @@ pub struct BuilderConfigBuilder {
cancel_token: Option<CancellationToken>,
#[cfg(unix)]
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
connect_timeout: Option<Duration>,
}
impl BuilderConfigBuilder {
@@ -358,6 +362,11 @@ impl BuilderConfigBuilder {
self
}
pub fn with_connect_timeout(mut self, connect_timeout: Duration) -> Self {
self.connect_timeout = Some(connect_timeout);
self
}
/// Builds the `BuilderConfig`.
///
/// Returns an error if any required field is missing.
@@ -388,6 +397,7 @@ impl BuilderConfigBuilder {
connection_fd_callback: self
.connection_fd_callback
.ok_or(BuilderConfigError::MissingConnectionFdCallback)?,
connect_timeout: self.connect_timeout,
})
}
}
+10
View File
@@ -38,6 +38,7 @@ use std::path::Path;
use std::path::PathBuf;
#[cfg(unix)]
use std::sync::Arc;
use std::time::Duration;
use url::Url;
use zeroize::Zeroizing;
@@ -405,6 +406,9 @@ where
#[cfg(unix)]
connection_fd_callback: Option<Arc<dyn Fn(std::os::fd::RawFd) + Send + Sync>>,
/// Timeout for establishing a connection
connect_timeout: Option<Duration>,
forget_me: ForgetMe,
remember_me: RememberMe,
@@ -466,6 +470,7 @@ where
user_agent: None,
#[cfg(unix)]
connection_fd_callback: None,
connect_timeout: None,
forget_me,
remember_me,
derivation_material: None,
@@ -589,6 +594,7 @@ where
available_gateways,
#[cfg(unix)]
connection_fd_callback: self.connection_fd_callback.clone(),
connect_timeout: self.connect_timeout,
})
}
@@ -758,6 +764,10 @@ where
base_builder = base_builder.with_connection_fd_callback(connection_fd_callback);
}
if let Some(connect_timeout) = self.connect_timeout {
base_builder = base_builder.with_connect_timeout(connect_timeout);
}
let started_client = base_builder.start_base().await?;
self.state = BuilderState::Registered {};
let nym_address = started_client.address;
+1
View File
@@ -222,6 +222,7 @@ impl NymClientBuilder {
self.config.base.debug.topology.minimum_gateway_performance,
self.config.base.debug.topology.ignore_ingress_epoch_role,
&client_store,
None,
)
.await?;
}
+1
View File
@@ -158,6 +158,7 @@ impl MixFetchClientBuilder {
self.config.base.debug.topology.minimum_gateway_performance,
self.config.base.debug.topology.ignore_ingress_epoch_role,
&client_store,
None,
)
.await?;
}
+2
View File
@@ -153,6 +153,7 @@ impl NymNodeTesterBuilder {
false,
&self.base_topology,
client_store,
None,
)
.await?)
}
@@ -211,6 +212,7 @@ impl NymNodeTesterBuilder {
packet_router,
self.bandwidth_controller.take(),
ClientStatsSender::new(None, stats_sender_task),
None,
gateway_task,
)
};