Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 637d9a4ebf | |||
| ecf6e849d9 | |||
| 6156e2656e |
@@ -140,6 +140,7 @@ where
|
||||
available_gateways,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: None,
|
||||
connect_timeout: None,
|
||||
};
|
||||
|
||||
let init_details =
|
||||
|
||||
@@ -188,6 +188,7 @@ where
|
||||
available_gateways,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: None,
|
||||
connect_timeout: None,
|
||||
};
|
||||
|
||||
let init_details =
|
||||
|
||||
@@ -65,6 +65,7 @@ use std::fmt::Debug;
|
||||
use std::os::raw::c_int as RawFd;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use time::OffsetDateTime;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use url::Url;
|
||||
@@ -230,6 +231,7 @@ pub struct BaseClientBuilder<C, S: MixnetClientStorage> {
|
||||
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
connect_timeout: Option<Duration>,
|
||||
|
||||
derivation_material: Option<DerivationMaterial>,
|
||||
}
|
||||
@@ -258,6 +260,7 @@ where
|
||||
setup_method: GatewaySetup::MustLoad { gateway_id: None },
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: None,
|
||||
connect_timeout: None,
|
||||
derivation_material: None,
|
||||
}
|
||||
}
|
||||
@@ -356,6 +359,11 @@ where
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
|
||||
self.connect_timeout = Some(timeout);
|
||||
self
|
||||
}
|
||||
|
||||
// note: do **NOT** make this method public as its only valid usage is from within `start_base`
|
||||
// because it relies on the crypto keys being already loaded
|
||||
fn mix_address(details: &InitialisationResult) -> Recipient {
|
||||
@@ -533,6 +541,7 @@ where
|
||||
packet_router: PacketRouter,
|
||||
stats_reporter: ClientStatsSender,
|
||||
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
connect_timeout: Option<Duration>,
|
||||
shutdown_tracker: &ShutdownTracker,
|
||||
) -> Result<GatewayClient<C, S::CredentialStore>, ClientCoreError>
|
||||
where
|
||||
@@ -577,6 +586,7 @@ where
|
||||
stats_reporter,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback,
|
||||
connect_timeout,
|
||||
shutdown_tracker.clone_shutdown_token(),
|
||||
)
|
||||
};
|
||||
@@ -640,6 +650,7 @@ where
|
||||
packet_router: PacketRouter,
|
||||
stats_reporter: ClientStatsSender,
|
||||
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
connect_timeout: Option<Duration>,
|
||||
shutdown_tracker: &ShutdownTracker,
|
||||
) -> Result<Box<dyn GatewayTransceiver + Send>, ClientCoreError>
|
||||
where
|
||||
@@ -672,6 +683,7 @@ where
|
||||
stats_reporter,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback,
|
||||
connect_timeout,
|
||||
shutdown_tracker,
|
||||
)
|
||||
.await?;
|
||||
@@ -1074,6 +1086,7 @@ where
|
||||
stats_reporter.clone(),
|
||||
#[cfg(unix)]
|
||||
self.connection_fd_callback,
|
||||
self.connect_timeout,
|
||||
&shutdown_tracker.child_tracker(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -382,6 +382,7 @@ pub(super) async fn register_with_gateway(
|
||||
gateway_listener: Url,
|
||||
our_identity: Arc<ed25519::KeyPair>,
|
||||
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
connect_timeout: Option<Duration>,
|
||||
) -> Result<RegistrationResult, ClientCoreError> {
|
||||
let mut gateway_client = GatewayClient::new_init(
|
||||
gateway_listener,
|
||||
@@ -389,6 +390,7 @@ pub(super) async fn register_with_gateway(
|
||||
our_identity.clone(),
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback,
|
||||
connect_timeout,
|
||||
);
|
||||
|
||||
gateway_client.establish_connection().await.map_err(|err| {
|
||||
|
||||
@@ -23,6 +23,7 @@ use nym_topology::node::RoutingNode;
|
||||
use rand::rngs::OsRng;
|
||||
use rand::{CryptoRng, RngCore};
|
||||
use serde::Serialize;
|
||||
use std::time::Duration;
|
||||
#[cfg(unix)]
|
||||
use std::{os::fd::RawFd, sync::Arc};
|
||||
|
||||
@@ -56,6 +57,7 @@ async fn setup_new_gateway<K, D>(
|
||||
selection_specification: GatewaySelectionSpecification,
|
||||
available_gateways: Vec<RoutingNode>,
|
||||
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
connect_timeout: Option<Duration>,
|
||||
) -> Result<InitialisationResult, ClientCoreError>
|
||||
where
|
||||
K: KeyStore,
|
||||
@@ -117,6 +119,7 @@ where
|
||||
our_identity,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback,
|
||||
connect_timeout,
|
||||
)
|
||||
.await?;
|
||||
(
|
||||
@@ -213,6 +216,7 @@ where
|
||||
available_gateways,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback,
|
||||
connect_timeout,
|
||||
} => {
|
||||
tracing::debug!("GatewaySetup::New with spec: {specification:?}");
|
||||
setup_new_gateway(
|
||||
@@ -222,6 +226,7 @@ where
|
||||
available_gateways,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback,
|
||||
connect_timeout,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ use std::fmt::{Debug, Display};
|
||||
#[cfg(unix)]
|
||||
use std::os::fd::RawFd;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use time::OffsetDateTime;
|
||||
use url::Url;
|
||||
|
||||
@@ -214,6 +215,9 @@ pub enum GatewaySetup {
|
||||
/// Callback useful for allowing initial connection to gateway
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
|
||||
/// Timeout for establishing connection
|
||||
connect_timeout: Option<Duration>,
|
||||
},
|
||||
|
||||
ReuseConnection {
|
||||
@@ -239,6 +243,7 @@ impl Debug for GatewaySetup {
|
||||
available_gateways,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: _,
|
||||
connect_timeout: _,
|
||||
} => f
|
||||
.debug_struct("GatewaySetup::New")
|
||||
.field("specification", specification)
|
||||
@@ -280,6 +285,7 @@ impl GatewaySetup {
|
||||
available_gateways: vec![],
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: None,
|
||||
connect_timeout: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -38,6 +38,7 @@ use url::Url;
|
||||
|
||||
#[cfg(unix)]
|
||||
use std::os::fd::RawFd;
|
||||
use std::time::Duration;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio::time::sleep;
|
||||
|
||||
@@ -104,10 +105,13 @@ pub struct GatewayClient<C, St = EphemeralCredentialStorage> {
|
||||
// currently unused (but populated)
|
||||
negotiated_protocol: Option<u8>,
|
||||
|
||||
// Callback on the fd as soon as the connection has been established
|
||||
/// Callback on the fd as soon as the connection has been established
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
|
||||
/// Maximum duration to wait for a connection to be established when set
|
||||
connect_timeout: Option<Duration>,
|
||||
|
||||
/// Listen to shutdown messages and send notifications back to the task manager
|
||||
shutdown_token: ShutdownToken,
|
||||
}
|
||||
@@ -124,6 +128,7 @@ impl<C, St> GatewayClient<C, St> {
|
||||
bandwidth_controller: Option<BandwidthController<C, St>>,
|
||||
stats_reporter: ClientStatsSender,
|
||||
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
connect_timeout: Option<Duration>,
|
||||
shutdown_token: ShutdownToken,
|
||||
) -> Self {
|
||||
GatewayClient {
|
||||
@@ -141,6 +146,7 @@ impl<C, St> GatewayClient<C, St> {
|
||||
negotiated_protocol: None,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback,
|
||||
connect_timeout,
|
||||
shutdown_token,
|
||||
}
|
||||
}
|
||||
@@ -208,6 +214,7 @@ impl<C, St> GatewayClient<C, St> {
|
||||
&self.gateway_address,
|
||||
#[cfg(unix)]
|
||||
self.connection_fd_callback.clone(),
|
||||
self.connect_timeout,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1132,6 +1139,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
|
||||
gateway_identity: ed25519::PublicKey,
|
||||
local_identity: Arc<ed25519::KeyPair>,
|
||||
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
connect_timeout: Option<Duration>,
|
||||
) -> Self {
|
||||
log::trace!("Initialising gateway client");
|
||||
use futures::channel::mpsc;
|
||||
@@ -1158,6 +1166,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
|
||||
negotiated_protocol: None,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback,
|
||||
connect_timeout,
|
||||
shutdown_token,
|
||||
}
|
||||
}
|
||||
@@ -1190,6 +1199,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
|
||||
negotiated_protocol: self.negotiated_protocol,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: self.connection_fd_callback,
|
||||
connect_timeout: self.connect_timeout,
|
||||
shutdown_token,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use crate::error::GatewayClientError;
|
||||
|
||||
use nym_http_api_client::HickoryDnsResolver;
|
||||
use std::time::Duration;
|
||||
#[cfg(unix)]
|
||||
use std::{
|
||||
os::fd::{AsRawFd, RawFd},
|
||||
@@ -17,6 +18,7 @@ use std::net::SocketAddr;
|
||||
pub(crate) async fn connect_async(
|
||||
endpoint: &str,
|
||||
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
connect_timeout: Option<Duration>,
|
||||
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), GatewayClientError> {
|
||||
use tokio::net::TcpSocket;
|
||||
|
||||
@@ -64,7 +66,22 @@ pub(crate) async fn connect_async(
|
||||
callback.as_ref()(socket.as_raw_fd());
|
||||
}
|
||||
|
||||
match socket.connect(sock_addr).await {
|
||||
let connect_res = if let Some(connect_timeout) = connect_timeout {
|
||||
match tokio::time::timeout(connect_timeout, socket.connect(sock_addr)).await {
|
||||
Ok(res) => res,
|
||||
Err(_elapsed) => {
|
||||
stream = Err(GatewayClientError::NetworkConnectionTimeout {
|
||||
address: endpoint.to_owned(),
|
||||
timeout: connect_timeout,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
socket.connect(sock_addr).await
|
||||
};
|
||||
|
||||
match connect_res {
|
||||
Ok(s) => {
|
||||
stream = Ok(s);
|
||||
break;
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
use nym_gateway_requests::registration::handshake::error::HandshakeError;
|
||||
use nym_gateway_requests::{GatewayRequestsError, SimpleGatewayRequestsError};
|
||||
use std::io;
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
use tungstenite::Error as WsError;
|
||||
|
||||
@@ -46,6 +47,9 @@ pub enum GatewayClientError {
|
||||
source: Box<WsError>,
|
||||
},
|
||||
|
||||
#[error("timeout when establishing connection: {address}, timeout: {timeout:?}")]
|
||||
NetworkConnectionTimeout { address: String, timeout: Duration },
|
||||
|
||||
#[error("no socket address for endpoint: {address}")]
|
||||
NoEndpointForConnection { address: String },
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@ use nym_topology::{EpochRewardedSet, NymTopology, RoutingNode};
|
||||
use nym_validator_client::client::IdentityKey;
|
||||
use nym_validator_client::{nym_api::NymApiClientExt, UserAgent};
|
||||
use rand::thread_rng;
|
||||
use std::time::Duration;
|
||||
use url::Url;
|
||||
use wasm_bindgen::prelude::wasm_bindgen;
|
||||
use wasm_bindgen_futures::future_to_promise;
|
||||
@@ -127,6 +128,7 @@ pub async fn setup_gateway_wasm(
|
||||
force_tls: bool,
|
||||
chosen_gateway: Option<IdentityKey>,
|
||||
gateways: Vec<RoutingNode>,
|
||||
connect_timeout: Option<Duration>,
|
||||
) -> Result<InitialisationResult, WasmCoreError> {
|
||||
// TODO: so much optimization and extra features could be added here, but that's for the future
|
||||
|
||||
@@ -144,6 +146,7 @@ pub async fn setup_gateway_wasm(
|
||||
GatewaySetup::New {
|
||||
specification: selection_spec,
|
||||
available_gateways: gateways,
|
||||
connect_timeout,
|
||||
}
|
||||
};
|
||||
|
||||
@@ -159,6 +162,7 @@ pub async fn setup_gateway_from_api(
|
||||
nym_apis: &[Url],
|
||||
minimum_performance: u8,
|
||||
ignore_epoch_roles: bool,
|
||||
connect_timeout: Option<Duration>,
|
||||
) -> Result<InitialisationResult, WasmCoreError> {
|
||||
let gateways = gateways_for_init(
|
||||
nym_apis,
|
||||
@@ -168,7 +172,14 @@ pub async fn setup_gateway_from_api(
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
setup_gateway_wasm(client_store, force_tls, chosen_gateway, gateways).await
|
||||
setup_gateway_wasm(
|
||||
client_store,
|
||||
force_tls,
|
||||
chosen_gateway,
|
||||
gateways,
|
||||
connect_timeout,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn current_gateways_wasm(
|
||||
@@ -192,9 +203,17 @@ pub async fn setup_from_topology(
|
||||
force_tls: bool,
|
||||
topology: &NymTopology,
|
||||
client_store: &ClientStorage,
|
||||
connect_timeout: Option<Duration>,
|
||||
) -> Result<InitialisationResult, WasmCoreError> {
|
||||
let gateways = topology.entry_capable_nodes().cloned().collect::<Vec<_>>();
|
||||
setup_gateway_wasm(client_store, force_tls, explicit_gateway, gateways).await
|
||||
setup_gateway_wasm(
|
||||
client_store,
|
||||
force_tls,
|
||||
explicit_gateway,
|
||||
gateways,
|
||||
connect_timeout,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn generate_new_client_keys(store: &ClientStorage) -> Result<(), WasmCoreError> {
|
||||
@@ -213,6 +232,7 @@ pub async fn add_gateway(
|
||||
min_performance: u8,
|
||||
ignore_epoch_roles: bool,
|
||||
storage: &ClientStorage,
|
||||
connect_timeout: Option<Duration>,
|
||||
) -> Result<(), WasmCoreError> {
|
||||
let selection_spec = GatewaySelectionSpecification::new(
|
||||
preferred_gateway.clone(),
|
||||
@@ -267,6 +287,7 @@ pub async fn add_gateway(
|
||||
let gateway_setup = GatewaySetup::New {
|
||||
specification: selection_spec,
|
||||
available_gateways,
|
||||
connect_timeout,
|
||||
};
|
||||
|
||||
let init_details = setup_gateway(gateway_setup, storage, storage).await?;
|
||||
|
||||
@@ -190,6 +190,7 @@ impl PacketSender {
|
||||
),
|
||||
#[cfg(unix)]
|
||||
None,
|
||||
None,
|
||||
fresh_gateway_client_data.shutdown_token.clone(),
|
||||
);
|
||||
|
||||
|
||||
@@ -38,6 +38,7 @@ pub struct BuilderConfig {
|
||||
pub cancel_token: CancellationToken,
|
||||
#[cfg(unix)]
|
||||
pub connection_fd_callback: Arc<dyn Fn(RawFd) + Send + Sync>,
|
||||
pub connect_timeout: Option<Duration>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Default, Debug, Eq, PartialEq)]
|
||||
@@ -71,6 +72,7 @@ impl BuilderConfig {
|
||||
network_env: NymNetworkDetails,
|
||||
cancel_token: CancellationToken,
|
||||
#[cfg(unix)] connection_fd_callback: Arc<dyn Fn(RawFd) + Send + Sync>,
|
||||
connect_timeout: Option<Duration>,
|
||||
) -> Self {
|
||||
Self {
|
||||
entry_node,
|
||||
@@ -84,6 +86,7 @@ impl BuilderConfig {
|
||||
cancel_token,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback,
|
||||
connect_timeout,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -294,6 +297,7 @@ pub struct BuilderConfigBuilder {
|
||||
cancel_token: Option<CancellationToken>,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
connect_timeout: Option<Duration>,
|
||||
}
|
||||
|
||||
impl BuilderConfigBuilder {
|
||||
@@ -358,6 +362,11 @@ impl BuilderConfigBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_connect_timeout(mut self, connect_timeout: Duration) -> Self {
|
||||
self.connect_timeout = Some(connect_timeout);
|
||||
self
|
||||
}
|
||||
|
||||
/// Builds the `BuilderConfig`.
|
||||
///
|
||||
/// Returns an error if any required field is missing.
|
||||
@@ -388,6 +397,7 @@ impl BuilderConfigBuilder {
|
||||
connection_fd_callback: self
|
||||
.connection_fd_callback
|
||||
.ok_or(BuilderConfigError::MissingConnectionFdCallback)?,
|
||||
connect_timeout: self.connect_timeout,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,6 +38,7 @@ use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
#[cfg(unix)]
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use url::Url;
|
||||
use zeroize::Zeroizing;
|
||||
|
||||
@@ -405,6 +406,9 @@ where
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: Option<Arc<dyn Fn(std::os::fd::RawFd) + Send + Sync>>,
|
||||
|
||||
/// Timeout for establishing a connection
|
||||
connect_timeout: Option<Duration>,
|
||||
|
||||
forget_me: ForgetMe,
|
||||
|
||||
remember_me: RememberMe,
|
||||
@@ -466,6 +470,7 @@ where
|
||||
user_agent: None,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: None,
|
||||
connect_timeout: None,
|
||||
forget_me,
|
||||
remember_me,
|
||||
derivation_material: None,
|
||||
@@ -589,6 +594,7 @@ where
|
||||
available_gateways,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: self.connection_fd_callback.clone(),
|
||||
connect_timeout: self.connect_timeout,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -758,6 +764,10 @@ where
|
||||
base_builder = base_builder.with_connection_fd_callback(connection_fd_callback);
|
||||
}
|
||||
|
||||
if let Some(connect_timeout) = self.connect_timeout {
|
||||
base_builder = base_builder.with_connect_timeout(connect_timeout);
|
||||
}
|
||||
|
||||
let started_client = base_builder.start_base().await?;
|
||||
self.state = BuilderState::Registered {};
|
||||
let nym_address = started_client.address;
|
||||
|
||||
@@ -222,6 +222,7 @@ impl NymClientBuilder {
|
||||
self.config.base.debug.topology.minimum_gateway_performance,
|
||||
self.config.base.debug.topology.ignore_ingress_epoch_role,
|
||||
&client_store,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@@ -158,6 +158,7 @@ impl MixFetchClientBuilder {
|
||||
self.config.base.debug.topology.minimum_gateway_performance,
|
||||
self.config.base.debug.topology.ignore_ingress_epoch_role,
|
||||
&client_store,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@@ -153,6 +153,7 @@ impl NymNodeTesterBuilder {
|
||||
false,
|
||||
&self.base_topology,
|
||||
client_store,
|
||||
None,
|
||||
)
|
||||
.await?)
|
||||
}
|
||||
@@ -211,6 +212,7 @@ impl NymNodeTesterBuilder {
|
||||
packet_router,
|
||||
self.bandwidth_controller.take(),
|
||||
ClientStatsSender::new(None, stats_sender_task),
|
||||
None,
|
||||
gateway_task,
|
||||
)
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user