Compare commits
2 Commits
rtt-analyzer
...
test_fd
| Author | SHA1 | Date | |
|---|---|---|---|
| b83fdd34af | |||
| 356cd2eeac |
@@ -139,6 +139,8 @@ where
|
||||
let gateway_setup = GatewaySetup::New {
|
||||
specification: selection_spec,
|
||||
available_gateways,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: None,
|
||||
};
|
||||
|
||||
let init_details =
|
||||
|
||||
@@ -187,6 +187,8 @@ where
|
||||
let gateway_setup = GatewaySetup::New {
|
||||
specification: selection_spec,
|
||||
available_gateways,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: None,
|
||||
};
|
||||
|
||||
let init_details =
|
||||
|
||||
@@ -11,6 +11,8 @@ use nym_topology::node::RoutingNode;
|
||||
use nym_validator_client::client::IdentityKeyRef;
|
||||
use nym_validator_client::UserAgent;
|
||||
use rand::{seq::SliceRandom, Rng};
|
||||
#[cfg(unix)]
|
||||
use std::os::fd::RawFd;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tungstenite::Message;
|
||||
use url::Url;
|
||||
@@ -313,9 +315,15 @@ pub(super) async fn register_with_gateway(
|
||||
gateway_id: identity::PublicKey,
|
||||
gateway_listener: Url,
|
||||
our_identity: Arc<identity::KeyPair>,
|
||||
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
) -> Result<RegistrationResult, ClientCoreError> {
|
||||
let mut gateway_client =
|
||||
GatewayClient::new_init(gateway_listener, gateway_id, our_identity.clone());
|
||||
let mut gateway_client = GatewayClient::new_init(
|
||||
gateway_listener,
|
||||
gateway_id,
|
||||
our_identity.clone(),
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback,
|
||||
);
|
||||
|
||||
gateway_client.establish_connection().await.map_err(|err| {
|
||||
log::warn!("Failed to establish connection with gateway!");
|
||||
|
||||
@@ -23,6 +23,8 @@ use nym_topology::node::RoutingNode;
|
||||
use rand::rngs::OsRng;
|
||||
use rand::{CryptoRng, RngCore};
|
||||
use serde::Serialize;
|
||||
#[cfg(unix)]
|
||||
use std::{os::fd::RawFd, sync::Arc};
|
||||
|
||||
pub mod helpers;
|
||||
pub mod types;
|
||||
@@ -53,6 +55,7 @@ async fn setup_new_gateway<K, D>(
|
||||
details_store: &D,
|
||||
selection_specification: GatewaySelectionSpecification,
|
||||
available_gateways: Vec<RoutingNode>,
|
||||
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
) -> Result<InitialisationResult, ClientCoreError>
|
||||
where
|
||||
K: KeyStore,
|
||||
@@ -108,9 +111,14 @@ where
|
||||
// if we're using a 'normal' gateway setup, do register
|
||||
let our_identity = client_keys.identity_keypair();
|
||||
|
||||
let registration =
|
||||
helpers::register_with_gateway(gateway_id, gateway_listener.clone(), our_identity)
|
||||
.await?;
|
||||
let registration = helpers::register_with_gateway(
|
||||
gateway_id,
|
||||
gateway_listener.clone(),
|
||||
our_identity,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback,
|
||||
)
|
||||
.await?;
|
||||
(
|
||||
GatewayDetails::new_remote(
|
||||
gateway_id,
|
||||
@@ -203,9 +211,19 @@ where
|
||||
GatewaySetup::New {
|
||||
specification,
|
||||
available_gateways,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback,
|
||||
} => {
|
||||
log::debug!("GatewaySetup::New with spec: {specification:?}");
|
||||
setup_new_gateway(key_store, details_store, specification, available_gateways).await
|
||||
setup_new_gateway(
|
||||
key_store,
|
||||
details_store,
|
||||
specification,
|
||||
available_gateways,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback,
|
||||
)
|
||||
.await
|
||||
}
|
||||
GatewaySetup::ReuseConnection {
|
||||
authenticated_ephemeral_client,
|
||||
|
||||
@@ -18,6 +18,8 @@ use nym_validator_client::client::IdentityKey;
|
||||
use nym_validator_client::nyxd::AccountId;
|
||||
use serde::Serialize;
|
||||
use std::fmt::{Debug, Display};
|
||||
#[cfg(unix)]
|
||||
use std::os::fd::RawFd;
|
||||
use std::sync::Arc;
|
||||
use time::OffsetDateTime;
|
||||
use url::Url;
|
||||
@@ -208,6 +210,10 @@ pub enum GatewaySetup {
|
||||
|
||||
// TODO: seems to be a bit inefficient to pass them by value
|
||||
available_gateways: Vec<RoutingNode>,
|
||||
|
||||
/// Callback useful for allowing initial connection to gateway
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
},
|
||||
|
||||
ReuseConnection {
|
||||
@@ -231,6 +237,8 @@ impl Debug for GatewaySetup {
|
||||
GatewaySetup::New {
|
||||
specification,
|
||||
available_gateways,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: _,
|
||||
} => f
|
||||
.debug_struct("GatewaySetup::New")
|
||||
.field("specification", specification)
|
||||
@@ -270,6 +278,8 @@ impl GatewaySetup {
|
||||
additional_data: None,
|
||||
},
|
||||
available_gateways: vec![],
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1065,6 +1065,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
|
||||
gateway_listener: Url,
|
||||
gateway_identity: identity::PublicKey,
|
||||
local_identity: Arc<identity::KeyPair>,
|
||||
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
) -> Self {
|
||||
log::trace!("Initialising gateway client");
|
||||
use futures::channel::mpsc;
|
||||
@@ -1090,7 +1091,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
|
||||
stats_reporter: ClientStatsSender::new(None, task_client.clone()),
|
||||
negotiated_protocol: None,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: None,
|
||||
connection_fd_callback,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,7 +38,8 @@ pub(crate) async fn connect_async(
|
||||
// Do a DNS lookup for the domain using our custom DNS resolver
|
||||
resolver
|
||||
.resolve_str(domain)
|
||||
.await?
|
||||
.await
|
||||
.inspect_err(|err| tracing::error!("Resolve error {err}"))?
|
||||
.into_iter()
|
||||
.map(|a| SocketAddr::new(a, port))
|
||||
.collect()
|
||||
@@ -49,20 +50,27 @@ pub(crate) async fn connect_async(
|
||||
address: endpoint.to_owned(),
|
||||
});
|
||||
for sock_addr in sock_addrs {
|
||||
tracing::info!("Trying with {sock_addr}");
|
||||
let socket = if sock_addr.is_ipv4() {
|
||||
TcpSocket::new_v4()
|
||||
} else {
|
||||
TcpSocket::new_v6()
|
||||
}
|
||||
.map_err(|err| GatewayClientError::NetworkConnectionFailed {
|
||||
address: endpoint.to_owned(),
|
||||
source: err.into(),
|
||||
.map_err(|err| {
|
||||
tracing::error!("Couldn't create the socket");
|
||||
GatewayClientError::NetworkConnectionFailed {
|
||||
address: endpoint.to_owned(),
|
||||
source: err.into(),
|
||||
}
|
||||
})?;
|
||||
|
||||
tracing::info!("Preparing to call callback");
|
||||
#[cfg(unix)]
|
||||
if let Some(callback) = connection_fd_callback.as_ref() {
|
||||
tracing::info!("Calling callback");
|
||||
callback.as_ref()(socket.as_raw_fd());
|
||||
}
|
||||
tracing::info!("Preparing to connect");
|
||||
|
||||
match socket.connect(sock_addr).await {
|
||||
Ok(s) => {
|
||||
|
||||
@@ -543,6 +543,8 @@ where
|
||||
Ok(GatewaySetup::New {
|
||||
specification: selection_spec,
|
||||
available_gateways,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: self.connection_fd_callback.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user