Compare commits

...

7 Commits

Author SHA1 Message Date
Bogdan-Ștefan Neacşu b83fdd34af Add fd callback for initial authentication 2025-03-21 15:07:54 +02:00
Bogdan-Ștefan Neacşu 356cd2eeac WIP 2025-03-21 13:53:37 +02:00
Jędrzej Stuczyński 0f6ec8610e hotfix: correctly increment ws connection counter (#5620) 2025-03-14 15:47:17 +00:00
benedetta davico c3b8c4b2f7 Merge pull request #5616 from nymtech/bd/remove-explorer-api-ci
Remove explorer-api from ci-build-binaries
2025-03-13 13:36:30 +01:00
benedettadavico 271b9e545c remove bump to explorer-api 2025-03-13 13:35:06 +01:00
benedetta davico 9641f01670 remove explorer-api from ci-build-binaries 2025-03-13 13:31:46 +01:00
benedettadavico a7bb3e8d91 bump versions for chokito 2025-03-13 13:19:37 +01:00
21 changed files with 151 additions and 68 deletions
@@ -100,7 +100,6 @@ jobs:
cp target/release/nymvisor $OUTPUT_DIR
cp target/release/nym-node $OUTPUT_DIR
cp target/release/nym-cli $OUTPUT_DIR
cp target/release/explorer-api $OUTPUT_DIR
if [ ${{ github.event_name == 'workflow_dispatch' && inputs.enable_deb == true }} = true ]; then
cp target/debian/*.deb $OUTPUT_DIR
fi
Generated
+8 -7
View File
@@ -4786,7 +4786,7 @@ checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
[[package]]
name = "nym-api"
version = "1.1.53"
version = "1.1.54"
dependencies = [
"anyhow",
"async-trait",
@@ -5036,7 +5036,7 @@ dependencies = [
[[package]]
name = "nym-cli"
version = "1.1.50"
version = "1.1.51"
dependencies = [
"anyhow",
"base64 0.22.1",
@@ -5119,7 +5119,7 @@ dependencies = [
[[package]]
name = "nym-client"
version = "1.1.50"
version = "1.1.51"
dependencies = [
"bs58",
"clap",
@@ -6163,7 +6163,7 @@ dependencies = [
[[package]]
name = "nym-network-requester"
version = "1.1.51"
version = "1.1.52"
dependencies = [
"addr",
"anyhow",
@@ -6214,7 +6214,7 @@ dependencies = [
[[package]]
name = "nym-node"
version = "1.6.2"
version = "1.7.0"
dependencies = [
"anyhow",
"arc-swap",
@@ -6241,6 +6241,7 @@ dependencies = [
"nym-crypto",
"nym-gateway",
"nym-gateway-stats-storage",
"nym-http-api-client",
"nym-http-api-common",
"nym-ip-packet-router",
"nym-metrics",
@@ -6600,7 +6601,7 @@ dependencies = [
[[package]]
name = "nym-socks5-client"
version = "1.1.50"
version = "1.1.51"
dependencies = [
"bs58",
"clap",
@@ -7205,7 +7206,7 @@ dependencies = [
[[package]]
name = "nymvisor"
version = "0.1.15"
version = "0.1.16"
dependencies = [
"anyhow",
"bytes",
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-client"
version = "1.1.50"
version = "1.1.51"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jędrzej Stuczyński <andrew@nymtech.net>"]
description = "Implementation of the Nym Client"
edition = "2021"
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-socks5-client"
version = "1.1.50"
version = "1.1.51"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
description = "A SOCKS5 localhost proxy that converts incoming messages to Sphinx and sends them to a Nym address"
edition = "2021"
@@ -139,6 +139,8 @@ where
let gateway_setup = GatewaySetup::New {
specification: selection_spec,
available_gateways,
#[cfg(unix)]
connection_fd_callback: None,
};
let init_details =
@@ -187,6 +187,8 @@ where
let gateway_setup = GatewaySetup::New {
specification: selection_spec,
available_gateways,
#[cfg(unix)]
connection_fd_callback: None,
};
let init_details =
+10 -2
View File
@@ -11,6 +11,8 @@ use nym_topology::node::RoutingNode;
use nym_validator_client::client::IdentityKeyRef;
use nym_validator_client::UserAgent;
use rand::{seq::SliceRandom, Rng};
#[cfg(unix)]
use std::os::fd::RawFd;
use std::{sync::Arc, time::Duration};
use tungstenite::Message;
use url::Url;
@@ -313,9 +315,15 @@ pub(super) async fn register_with_gateway(
gateway_id: identity::PublicKey,
gateway_listener: Url,
our_identity: Arc<identity::KeyPair>,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
) -> Result<RegistrationResult, ClientCoreError> {
let mut gateway_client =
GatewayClient::new_init(gateway_listener, gateway_id, our_identity.clone());
let mut gateway_client = GatewayClient::new_init(
gateway_listener,
gateway_id,
our_identity.clone(),
#[cfg(unix)]
connection_fd_callback,
);
gateway_client.establish_connection().await.map_err(|err| {
log::warn!("Failed to establish connection with gateway!");
+22 -4
View File
@@ -23,6 +23,8 @@ use nym_topology::node::RoutingNode;
use rand::rngs::OsRng;
use rand::{CryptoRng, RngCore};
use serde::Serialize;
#[cfg(unix)]
use std::{os::fd::RawFd, sync::Arc};
pub mod helpers;
pub mod types;
@@ -53,6 +55,7 @@ async fn setup_new_gateway<K, D>(
details_store: &D,
selection_specification: GatewaySelectionSpecification,
available_gateways: Vec<RoutingNode>,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
) -> Result<InitialisationResult, ClientCoreError>
where
K: KeyStore,
@@ -108,9 +111,14 @@ where
// if we're using a 'normal' gateway setup, do register
let our_identity = client_keys.identity_keypair();
let registration =
helpers::register_with_gateway(gateway_id, gateway_listener.clone(), our_identity)
.await?;
let registration = helpers::register_with_gateway(
gateway_id,
gateway_listener.clone(),
our_identity,
#[cfg(unix)]
connection_fd_callback,
)
.await?;
(
GatewayDetails::new_remote(
gateway_id,
@@ -203,9 +211,19 @@ where
GatewaySetup::New {
specification,
available_gateways,
#[cfg(unix)]
connection_fd_callback,
} => {
log::debug!("GatewaySetup::New with spec: {specification:?}");
setup_new_gateway(key_store, details_store, specification, available_gateways).await
setup_new_gateway(
key_store,
details_store,
specification,
available_gateways,
#[cfg(unix)]
connection_fd_callback,
)
.await
}
GatewaySetup::ReuseConnection {
authenticated_ephemeral_client,
+10
View File
@@ -18,6 +18,8 @@ use nym_validator_client::client::IdentityKey;
use nym_validator_client::nyxd::AccountId;
use serde::Serialize;
use std::fmt::{Debug, Display};
#[cfg(unix)]
use std::os::fd::RawFd;
use std::sync::Arc;
use time::OffsetDateTime;
use url::Url;
@@ -208,6 +210,10 @@ pub enum GatewaySetup {
// TODO: seems to be a bit inefficient to pass them by value
available_gateways: Vec<RoutingNode>,
/// Callback useful for allowing initial connection to gateway
#[cfg(unix)]
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
},
ReuseConnection {
@@ -231,6 +237,8 @@ impl Debug for GatewaySetup {
GatewaySetup::New {
specification,
available_gateways,
#[cfg(unix)]
connection_fd_callback: _,
} => f
.debug_struct("GatewaySetup::New")
.field("specification", specification)
@@ -270,6 +278,8 @@ impl GatewaySetup {
additional_data: None,
},
available_gateways: vec![],
#[cfg(unix)]
connection_fd_callback: None,
}
}
@@ -1065,6 +1065,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
gateway_listener: Url,
gateway_identity: identity::PublicKey,
local_identity: Arc<identity::KeyPair>,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
) -> Self {
log::trace!("Initialising gateway client");
use futures::channel::mpsc;
@@ -1090,7 +1091,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
stats_reporter: ClientStatsSender::new(None, task_client.clone()),
negotiated_protocol: None,
#[cfg(unix)]
connection_fd_callback: None,
connection_fd_callback,
task_client,
}
}
@@ -38,7 +38,8 @@ pub(crate) async fn connect_async(
// Do a DNS lookup for the domain using our custom DNS resolver
resolver
.resolve_str(domain)
.await?
.await
.inspect_err(|err| tracing::error!("Resolve error {err}"))?
.into_iter()
.map(|a| SocketAddr::new(a, port))
.collect()
@@ -49,20 +50,27 @@ pub(crate) async fn connect_async(
address: endpoint.to_owned(),
});
for sock_addr in sock_addrs {
tracing::info!("Trying with {sock_addr}");
let socket = if sock_addr.is_ipv4() {
TcpSocket::new_v4()
} else {
TcpSocket::new_v6()
}
.map_err(|err| GatewayClientError::NetworkConnectionFailed {
address: endpoint.to_owned(),
source: err.into(),
.map_err(|err| {
tracing::error!("Couldn't create the socket");
GatewayClientError::NetworkConnectionFailed {
address: endpoint.to_owned(),
source: err.into(),
}
})?;
tracing::info!("Preparing to call callback");
#[cfg(unix)]
if let Some(callback) = connection_fd_callback.as_ref() {
tracing::info!("Calling callback");
callback.as_ref()(socket.as_raw_fd());
}
tracing::info!("Preparing to connect");
match socket.connect(sock_addr).await {
Ok(s) => {
@@ -19,7 +19,6 @@ pub(crate) struct Config {
pub(crate) bandwidth: BandwidthFlushingBehaviourConfig,
}
// I can see this being possible expanded with say storage or client store
#[derive(Clone)]
pub(crate) struct CommonHandlerState {
pub(crate) cfg: Config,
@@ -131,9 +131,6 @@ impl<R, S> FreshHandler<R, S> {
// for time being we assume handle is always constructed from raw socket.
// if we decide we want to change it, that's not too difficult
// also at this point I'm not entirely sure how to deal with this warning without
// some considerable refactoring
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
rng: R,
conn: S,
@@ -6,9 +6,8 @@ use crate::node::client_handling::websocket::connection_handler::FreshHandler;
use nym_task::TaskClient;
use rand::rngs::OsRng;
use std::net::SocketAddr;
use std::process;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::{io, process};
use tokio::net::TcpStream;
use tokio::task::JoinHandle;
use tracing::*;
@@ -34,6 +33,76 @@ impl Listener {
}
}
fn active_connections(&self) -> usize {
self.shared_state
.metrics
.network
.active_ingress_websocket_connections_count()
}
fn prepare_connection_handler(
&self,
socket: TcpStream,
remote_address: SocketAddr,
) -> FreshHandler<OsRng, TcpStream> {
let shutdown = self
.shutdown
.fork(format!("websocket_handler_{remote_address}"));
FreshHandler::new(
OsRng,
socket,
self.shared_state.clone(),
remote_address,
shutdown,
)
}
fn try_handle_accepted_connection(&self, accepted: io::Result<(TcpStream, SocketAddr)>) {
match accepted {
Ok((socket, remote_address)) => {
trace!("received a socket connection from {remote_address}");
let active = self.active_connections();
// 1. check if we're within the connection limit
if active >= self.maximum_open_connections {
warn!(
"connection limit exceeded ({}). can't accept request from {remote_address}",
self.maximum_open_connections
);
return;
}
debug!("there are currently {active} connected clients on the gateway websocket");
// 2. prepare shared data for the new connection handler
let handle = self.prepare_connection_handler(socket, remote_address);
// 3. increment the connection counter.
// make sure to do it before spawning the task,
// as another connection might get accepted before the task is scheduled
// for execution
self.shared_state
.metrics
.network
.new_ingress_websocket_client();
// 4. spawn the task handling the client connection
tokio::spawn(async move {
// TODO: refactor it similarly to the mixnet listener on the nym-node
let metrics_ref = handle.shared_state.metrics.clone();
// 4.1. handle all client requests until connection gets terminated
handle.start_handling().await;
// 4.2. decrement the connection counter
metrics_ref.network.disconnected_ingress_websocket_client();
});
}
Err(err) => warn!("failed to accept client connection: {err}"),
}
}
// TODO: change the signature to pub(crate) async fn run(&self, handler: Handler)
pub(crate) async fn run(&mut self) {
@@ -46,8 +115,6 @@ impl Listener {
}
};
let open_connections = Arc::new(AtomicUsize::new(0));
while !self.shutdown.is_shutdown() {
tokio::select! {
biased;
@@ -55,38 +122,7 @@ impl Listener {
trace!("client_handling::Listener: received shutdown");
}
connection = tcp_listener.accept() => {
match connection {
Ok((socket, remote_addr)) => {
let shutdown = self.shutdown.fork(format!("websocket_handler_{remote_addr}"));
trace!("received a socket connection from {remote_addr}");
if open_connections.fetch_add(1, Ordering::SeqCst) >= self.maximum_open_connections {
warn!("connection limit exceeded ({}). can't accept request from {remote_addr}", self.maximum_open_connections);
continue;
}
// TODO: I think we *REALLY* need a mechanism for having a maximum number of connected
// clients or spawned tokio tasks -> perhaps a worker system?
let handle = FreshHandler::new(
OsRng,
socket,
self.shared_state.clone(),
remote_addr,
shutdown,
);
let open_connections = open_connections.clone();
tokio::spawn(async move {
// TODO: refactor it similarly to the mixnet listener on the nym-node
let metrics_ref = handle.shared_state.metrics.clone();
metrics_ref.network.new_ingress_websocket_client();
open_connections.fetch_add(1, Ordering::SeqCst);
handle.start_handling().await;
metrics_ref.network.disconnected_ingress_websocket_client();
open_connections.fetch_sub(1, Ordering::SeqCst);
});
}
Err(err) => warn!("failed to get client: {err}"),
}
self.try_handle_accepted_connection(connection)
}
}
+1 -1
View File
@@ -4,7 +4,7 @@
[package]
name = "nym-api"
license = "GPL-3.0"
version = "1.1.53"
version = "1.1.54"
authors.workspace = true
edition = "2021"
rust-version.workspace = true
+1 -1
View File
@@ -3,7 +3,7 @@
[package]
name = "nym-node"
version = "1.6.2"
version = "1.7.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
+1 -1
View File
@@ -45,7 +45,7 @@ impl NetworkStats {
pub fn active_ingress_websocket_connections_count(&self) -> usize {
self.active_ingress_websocket_connections
.load(Ordering::Relaxed)
.load(Ordering::SeqCst)
}
pub fn active_egress_mixnet_connections_counter(&self) -> Arc<AtomicUsize> {
+2
View File
@@ -543,6 +543,8 @@ where
Ok(GatewaySetup::New {
specification: selection_spec,
available_gateways,
#[cfg(unix)]
connection_fd_callback: self.connection_fd_callback.clone(),
})
}
@@ -4,7 +4,7 @@
[package]
name = "nym-network-requester"
license = "GPL-3.0"
version = "1.1.51"
version = "1.1.52"
authors.workspace = true
edition.workspace = true
rust-version = "1.70"
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-cli"
version = "1.1.50"
version = "1.1.51"
authors.workspace = true
edition = "2021"
license.workspace = true
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nymvisor"
version = "0.1.15"
version = "0.1.16"
authors.workspace = true
repository.workspace = true
homepage.workspace = true