Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b83fdd34af | |||
| 356cd2eeac | |||
| 0f6ec8610e | |||
| c3b8c4b2f7 | |||
| 271b9e545c | |||
| 9641f01670 | |||
| a7bb3e8d91 |
@@ -100,7 +100,6 @@ jobs:
|
||||
cp target/release/nymvisor $OUTPUT_DIR
|
||||
cp target/release/nym-node $OUTPUT_DIR
|
||||
cp target/release/nym-cli $OUTPUT_DIR
|
||||
cp target/release/explorer-api $OUTPUT_DIR
|
||||
if [ ${{ github.event_name == 'workflow_dispatch' && inputs.enable_deb == true }} = true ]; then
|
||||
cp target/debian/*.deb $OUTPUT_DIR
|
||||
fi
|
||||
|
||||
Generated
+8
-7
@@ -4786,7 +4786,7 @@ checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
|
||||
|
||||
[[package]]
|
||||
name = "nym-api"
|
||||
version = "1.1.53"
|
||||
version = "1.1.54"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
@@ -5036,7 +5036,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-cli"
|
||||
version = "1.1.50"
|
||||
version = "1.1.51"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"base64 0.22.1",
|
||||
@@ -5119,7 +5119,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-client"
|
||||
version = "1.1.50"
|
||||
version = "1.1.51"
|
||||
dependencies = [
|
||||
"bs58",
|
||||
"clap",
|
||||
@@ -6163,7 +6163,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-network-requester"
|
||||
version = "1.1.51"
|
||||
version = "1.1.52"
|
||||
dependencies = [
|
||||
"addr",
|
||||
"anyhow",
|
||||
@@ -6214,7 +6214,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-node"
|
||||
version = "1.6.2"
|
||||
version = "1.7.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arc-swap",
|
||||
@@ -6241,6 +6241,7 @@ dependencies = [
|
||||
"nym-crypto",
|
||||
"nym-gateway",
|
||||
"nym-gateway-stats-storage",
|
||||
"nym-http-api-client",
|
||||
"nym-http-api-common",
|
||||
"nym-ip-packet-router",
|
||||
"nym-metrics",
|
||||
@@ -6600,7 +6601,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-socks5-client"
|
||||
version = "1.1.50"
|
||||
version = "1.1.51"
|
||||
dependencies = [
|
||||
"bs58",
|
||||
"clap",
|
||||
@@ -7205,7 +7206,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nymvisor"
|
||||
version = "0.1.15"
|
||||
version = "0.1.16"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "nym-client"
|
||||
version = "1.1.50"
|
||||
version = "1.1.51"
|
||||
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jędrzej Stuczyński <andrew@nymtech.net>"]
|
||||
description = "Implementation of the Nym Client"
|
||||
edition = "2021"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "nym-socks5-client"
|
||||
version = "1.1.50"
|
||||
version = "1.1.51"
|
||||
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
|
||||
description = "A SOCKS5 localhost proxy that converts incoming messages to Sphinx and sends them to a Nym address"
|
||||
edition = "2021"
|
||||
|
||||
@@ -139,6 +139,8 @@ where
|
||||
let gateway_setup = GatewaySetup::New {
|
||||
specification: selection_spec,
|
||||
available_gateways,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: None,
|
||||
};
|
||||
|
||||
let init_details =
|
||||
|
||||
@@ -187,6 +187,8 @@ where
|
||||
let gateway_setup = GatewaySetup::New {
|
||||
specification: selection_spec,
|
||||
available_gateways,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: None,
|
||||
};
|
||||
|
||||
let init_details =
|
||||
|
||||
@@ -11,6 +11,8 @@ use nym_topology::node::RoutingNode;
|
||||
use nym_validator_client::client::IdentityKeyRef;
|
||||
use nym_validator_client::UserAgent;
|
||||
use rand::{seq::SliceRandom, Rng};
|
||||
#[cfg(unix)]
|
||||
use std::os::fd::RawFd;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tungstenite::Message;
|
||||
use url::Url;
|
||||
@@ -313,9 +315,15 @@ pub(super) async fn register_with_gateway(
|
||||
gateway_id: identity::PublicKey,
|
||||
gateway_listener: Url,
|
||||
our_identity: Arc<identity::KeyPair>,
|
||||
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
) -> Result<RegistrationResult, ClientCoreError> {
|
||||
let mut gateway_client =
|
||||
GatewayClient::new_init(gateway_listener, gateway_id, our_identity.clone());
|
||||
let mut gateway_client = GatewayClient::new_init(
|
||||
gateway_listener,
|
||||
gateway_id,
|
||||
our_identity.clone(),
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback,
|
||||
);
|
||||
|
||||
gateway_client.establish_connection().await.map_err(|err| {
|
||||
log::warn!("Failed to establish connection with gateway!");
|
||||
|
||||
@@ -23,6 +23,8 @@ use nym_topology::node::RoutingNode;
|
||||
use rand::rngs::OsRng;
|
||||
use rand::{CryptoRng, RngCore};
|
||||
use serde::Serialize;
|
||||
#[cfg(unix)]
|
||||
use std::{os::fd::RawFd, sync::Arc};
|
||||
|
||||
pub mod helpers;
|
||||
pub mod types;
|
||||
@@ -53,6 +55,7 @@ async fn setup_new_gateway<K, D>(
|
||||
details_store: &D,
|
||||
selection_specification: GatewaySelectionSpecification,
|
||||
available_gateways: Vec<RoutingNode>,
|
||||
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
) -> Result<InitialisationResult, ClientCoreError>
|
||||
where
|
||||
K: KeyStore,
|
||||
@@ -108,9 +111,14 @@ where
|
||||
// if we're using a 'normal' gateway setup, do register
|
||||
let our_identity = client_keys.identity_keypair();
|
||||
|
||||
let registration =
|
||||
helpers::register_with_gateway(gateway_id, gateway_listener.clone(), our_identity)
|
||||
.await?;
|
||||
let registration = helpers::register_with_gateway(
|
||||
gateway_id,
|
||||
gateway_listener.clone(),
|
||||
our_identity,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback,
|
||||
)
|
||||
.await?;
|
||||
(
|
||||
GatewayDetails::new_remote(
|
||||
gateway_id,
|
||||
@@ -203,9 +211,19 @@ where
|
||||
GatewaySetup::New {
|
||||
specification,
|
||||
available_gateways,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback,
|
||||
} => {
|
||||
log::debug!("GatewaySetup::New with spec: {specification:?}");
|
||||
setup_new_gateway(key_store, details_store, specification, available_gateways).await
|
||||
setup_new_gateway(
|
||||
key_store,
|
||||
details_store,
|
||||
specification,
|
||||
available_gateways,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback,
|
||||
)
|
||||
.await
|
||||
}
|
||||
GatewaySetup::ReuseConnection {
|
||||
authenticated_ephemeral_client,
|
||||
|
||||
@@ -18,6 +18,8 @@ use nym_validator_client::client::IdentityKey;
|
||||
use nym_validator_client::nyxd::AccountId;
|
||||
use serde::Serialize;
|
||||
use std::fmt::{Debug, Display};
|
||||
#[cfg(unix)]
|
||||
use std::os::fd::RawFd;
|
||||
use std::sync::Arc;
|
||||
use time::OffsetDateTime;
|
||||
use url::Url;
|
||||
@@ -208,6 +210,10 @@ pub enum GatewaySetup {
|
||||
|
||||
// TODO: seems to be a bit inefficient to pass them by value
|
||||
available_gateways: Vec<RoutingNode>,
|
||||
|
||||
/// Callback useful for allowing initial connection to gateway
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
},
|
||||
|
||||
ReuseConnection {
|
||||
@@ -231,6 +237,8 @@ impl Debug for GatewaySetup {
|
||||
GatewaySetup::New {
|
||||
specification,
|
||||
available_gateways,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: _,
|
||||
} => f
|
||||
.debug_struct("GatewaySetup::New")
|
||||
.field("specification", specification)
|
||||
@@ -270,6 +278,8 @@ impl GatewaySetup {
|
||||
additional_data: None,
|
||||
},
|
||||
available_gateways: vec![],
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1065,6 +1065,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
|
||||
gateway_listener: Url,
|
||||
gateway_identity: identity::PublicKey,
|
||||
local_identity: Arc<identity::KeyPair>,
|
||||
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
) -> Self {
|
||||
log::trace!("Initialising gateway client");
|
||||
use futures::channel::mpsc;
|
||||
@@ -1090,7 +1091,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
|
||||
stats_reporter: ClientStatsSender::new(None, task_client.clone()),
|
||||
negotiated_protocol: None,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: None,
|
||||
connection_fd_callback,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,7 +38,8 @@ pub(crate) async fn connect_async(
|
||||
// Do a DNS lookup for the domain using our custom DNS resolver
|
||||
resolver
|
||||
.resolve_str(domain)
|
||||
.await?
|
||||
.await
|
||||
.inspect_err(|err| tracing::error!("Resolve error {err}"))?
|
||||
.into_iter()
|
||||
.map(|a| SocketAddr::new(a, port))
|
||||
.collect()
|
||||
@@ -49,20 +50,27 @@ pub(crate) async fn connect_async(
|
||||
address: endpoint.to_owned(),
|
||||
});
|
||||
for sock_addr in sock_addrs {
|
||||
tracing::info!("Trying with {sock_addr}");
|
||||
let socket = if sock_addr.is_ipv4() {
|
||||
TcpSocket::new_v4()
|
||||
} else {
|
||||
TcpSocket::new_v6()
|
||||
}
|
||||
.map_err(|err| GatewayClientError::NetworkConnectionFailed {
|
||||
address: endpoint.to_owned(),
|
||||
source: err.into(),
|
||||
.map_err(|err| {
|
||||
tracing::error!("Couldn't create the socket");
|
||||
GatewayClientError::NetworkConnectionFailed {
|
||||
address: endpoint.to_owned(),
|
||||
source: err.into(),
|
||||
}
|
||||
})?;
|
||||
|
||||
tracing::info!("Preparing to call callback");
|
||||
#[cfg(unix)]
|
||||
if let Some(callback) = connection_fd_callback.as_ref() {
|
||||
tracing::info!("Calling callback");
|
||||
callback.as_ref()(socket.as_raw_fd());
|
||||
}
|
||||
tracing::info!("Preparing to connect");
|
||||
|
||||
match socket.connect(sock_addr).await {
|
||||
Ok(s) => {
|
||||
|
||||
@@ -19,7 +19,6 @@ pub(crate) struct Config {
|
||||
pub(crate) bandwidth: BandwidthFlushingBehaviourConfig,
|
||||
}
|
||||
|
||||
// I can see this being possible expanded with say storage or client store
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct CommonHandlerState {
|
||||
pub(crate) cfg: Config,
|
||||
|
||||
@@ -131,9 +131,6 @@ impl<R, S> FreshHandler<R, S> {
|
||||
|
||||
// for time being we assume handle is always constructed from raw socket.
|
||||
// if we decide we want to change it, that's not too difficult
|
||||
// also at this point I'm not entirely sure how to deal with this warning without
|
||||
// some considerable refactoring
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn new(
|
||||
rng: R,
|
||||
conn: S,
|
||||
|
||||
@@ -6,9 +6,8 @@ use crate::node::client_handling::websocket::connection_handler::FreshHandler;
|
||||
use nym_task::TaskClient;
|
||||
use rand::rngs::OsRng;
|
||||
use std::net::SocketAddr;
|
||||
use std::process;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::{io, process};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::*;
|
||||
|
||||
@@ -34,6 +33,76 @@ impl Listener {
|
||||
}
|
||||
}
|
||||
|
||||
fn active_connections(&self) -> usize {
|
||||
self.shared_state
|
||||
.metrics
|
||||
.network
|
||||
.active_ingress_websocket_connections_count()
|
||||
}
|
||||
|
||||
fn prepare_connection_handler(
|
||||
&self,
|
||||
socket: TcpStream,
|
||||
remote_address: SocketAddr,
|
||||
) -> FreshHandler<OsRng, TcpStream> {
|
||||
let shutdown = self
|
||||
.shutdown
|
||||
.fork(format!("websocket_handler_{remote_address}"));
|
||||
FreshHandler::new(
|
||||
OsRng,
|
||||
socket,
|
||||
self.shared_state.clone(),
|
||||
remote_address,
|
||||
shutdown,
|
||||
)
|
||||
}
|
||||
|
||||
fn try_handle_accepted_connection(&self, accepted: io::Result<(TcpStream, SocketAddr)>) {
|
||||
match accepted {
|
||||
Ok((socket, remote_address)) => {
|
||||
trace!("received a socket connection from {remote_address}");
|
||||
|
||||
let active = self.active_connections();
|
||||
|
||||
// 1. check if we're within the connection limit
|
||||
if active >= self.maximum_open_connections {
|
||||
warn!(
|
||||
"connection limit exceeded ({}). can't accept request from {remote_address}",
|
||||
self.maximum_open_connections
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
debug!("there are currently {active} connected clients on the gateway websocket");
|
||||
|
||||
// 2. prepare shared data for the new connection handler
|
||||
let handle = self.prepare_connection_handler(socket, remote_address);
|
||||
|
||||
// 3. increment the connection counter.
|
||||
// make sure to do it before spawning the task,
|
||||
// as another connection might get accepted before the task is scheduled
|
||||
// for execution
|
||||
self.shared_state
|
||||
.metrics
|
||||
.network
|
||||
.new_ingress_websocket_client();
|
||||
|
||||
// 4. spawn the task handling the client connection
|
||||
tokio::spawn(async move {
|
||||
// TODO: refactor it similarly to the mixnet listener on the nym-node
|
||||
let metrics_ref = handle.shared_state.metrics.clone();
|
||||
|
||||
// 4.1. handle all client requests until connection gets terminated
|
||||
handle.start_handling().await;
|
||||
|
||||
// 4.2. decrement the connection counter
|
||||
metrics_ref.network.disconnected_ingress_websocket_client();
|
||||
});
|
||||
}
|
||||
Err(err) => warn!("failed to accept client connection: {err}"),
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: change the signature to pub(crate) async fn run(&self, handler: Handler)
|
||||
|
||||
pub(crate) async fn run(&mut self) {
|
||||
@@ -46,8 +115,6 @@ impl Listener {
|
||||
}
|
||||
};
|
||||
|
||||
let open_connections = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
while !self.shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
@@ -55,38 +122,7 @@ impl Listener {
|
||||
trace!("client_handling::Listener: received shutdown");
|
||||
}
|
||||
connection = tcp_listener.accept() => {
|
||||
match connection {
|
||||
Ok((socket, remote_addr)) => {
|
||||
let shutdown = self.shutdown.fork(format!("websocket_handler_{remote_addr}"));
|
||||
trace!("received a socket connection from {remote_addr}");
|
||||
|
||||
if open_connections.fetch_add(1, Ordering::SeqCst) >= self.maximum_open_connections {
|
||||
warn!("connection limit exceeded ({}). can't accept request from {remote_addr}", self.maximum_open_connections);
|
||||
continue;
|
||||
}
|
||||
|
||||
// TODO: I think we *REALLY* need a mechanism for having a maximum number of connected
|
||||
// clients or spawned tokio tasks -> perhaps a worker system?
|
||||
let handle = FreshHandler::new(
|
||||
OsRng,
|
||||
socket,
|
||||
self.shared_state.clone(),
|
||||
remote_addr,
|
||||
shutdown,
|
||||
);
|
||||
let open_connections = open_connections.clone();
|
||||
tokio::spawn(async move {
|
||||
// TODO: refactor it similarly to the mixnet listener on the nym-node
|
||||
let metrics_ref = handle.shared_state.metrics.clone();
|
||||
metrics_ref.network.new_ingress_websocket_client();
|
||||
open_connections.fetch_add(1, Ordering::SeqCst);
|
||||
handle.start_handling().await;
|
||||
metrics_ref.network.disconnected_ingress_websocket_client();
|
||||
open_connections.fetch_sub(1, Ordering::SeqCst);
|
||||
});
|
||||
}
|
||||
Err(err) => warn!("failed to get client: {err}"),
|
||||
}
|
||||
self.try_handle_accepted_connection(connection)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
+1
-1
@@ -4,7 +4,7 @@
|
||||
[package]
|
||||
name = "nym-api"
|
||||
license = "GPL-3.0"
|
||||
version = "1.1.53"
|
||||
version = "1.1.54"
|
||||
authors.workspace = true
|
||||
edition = "2021"
|
||||
rust-version.workspace = true
|
||||
|
||||
+1
-1
@@ -3,7 +3,7 @@
|
||||
|
||||
[package]
|
||||
name = "nym-node"
|
||||
version = "1.6.2"
|
||||
version = "1.7.0"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
|
||||
@@ -45,7 +45,7 @@ impl NetworkStats {
|
||||
|
||||
pub fn active_ingress_websocket_connections_count(&self) -> usize {
|
||||
self.active_ingress_websocket_connections
|
||||
.load(Ordering::Relaxed)
|
||||
.load(Ordering::SeqCst)
|
||||
}
|
||||
|
||||
pub fn active_egress_mixnet_connections_counter(&self) -> Arc<AtomicUsize> {
|
||||
|
||||
@@ -543,6 +543,8 @@ where
|
||||
Ok(GatewaySetup::New {
|
||||
specification: selection_spec,
|
||||
available_gateways,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: self.connection_fd_callback.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
[package]
|
||||
name = "nym-network-requester"
|
||||
license = "GPL-3.0"
|
||||
version = "1.1.51"
|
||||
version = "1.1.52"
|
||||
authors.workspace = true
|
||||
edition.workspace = true
|
||||
rust-version = "1.70"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "nym-cli"
|
||||
version = "1.1.50"
|
||||
version = "1.1.51"
|
||||
authors.workspace = true
|
||||
edition = "2021"
|
||||
license.workspace = true
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "nymvisor"
|
||||
version = "0.1.15"
|
||||
version = "0.1.16"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
|
||||
Reference in New Issue
Block a user