Compare commits

...

24 Commits

Author SHA1 Message Date
Bogdan-Ștefan Neacşu f4bd48263d Apply cargo fmt 2026-06-08 15:32:57 +03:00
rachyandco 5c40052d39 Bugfix: set connection fd callback on LP registration socket
The LP registration client opened its TCP connection to the gateway
  control port with a plain TcpStream::connect, without invoking the
  connection_fd_callback used by the websocket registration path. On
  Linux the callback sets SO_MARK (fwmark) so the daemon's own firewall
  allows the connection during the connecting state; without it the
  unmarked SYN matched no allow rule and was rejected locally with
  ECONNREFUSED. The daemon blamed the gateway, blacklisted it, and
  cycled through every gateway the same way, making it impossible to
  connect when LP registration is enabled.

  Plumb the existing connection_fd_callback from the builder config
  through RegistrationClientConfig into the LP client via an optional
  dialer, which creates the socket, applies the callback before
  connect(), and only then dials. The fd callback must run before
  connect so the SYN itself carries the mark.

  The exit gateway needs no dialer: its registration is forwarded
  through the entry gateway's nested session and never opens its own
  TCP connection.
2026-06-05 23:36:52 +02:00
Jędrzej Stuczyński 7b858dfd69 chore: LP registration adjustments (#6845)
* remove mixnet fallback for LP registration

* change LP registration timeouts and introduce exchange timeout

* remove fallback client construction and disable mixnet via LP registration
2026-06-02 16:28:57 +01:00
Jack Wampler a4bd547023 Handle Rate Limit Challenge Response (#6825)
rotate urls on HTTP response error indicating API rate limiting
2026-05-26 14:58:45 -06:00
Andy Duplain db03ec31b1 Merge pull request #6812 from nymtech/cherry-pick/nym-583-corrupt-db-windows
NYM-583: Avoid corrupted database on Windows.
2026-05-21 14:44:39 +01:00
Andy Duplain 9b285735b8 NYM-583: Avoid corrupted database on Windows.
NYM-583: Avoid corrupted database on Windows.
2026-05-21 14:23:20 +01:00
Simon Wicky 691280797a back to v8 on non sdk client (#6771) 2026-05-13 18:24:50 +02:00
benedettadavico f84de25302 update changelog 2026-05-06 07:16:42 +02:00
benedettadavico e554f1e0ad bump versions 2026-04-28 15:02:40 +02:00
benedetta davico 62a4a2ed70 Merge pull request #6710 from nymtech/bdq/versioning-fix 2026-04-28 10:01:52 +02:00
benedetta davico caad74c73d Merge pull request #6713 from nymtech/bdq/nym-binaries-ci
update CI runners
2026-04-27 15:39:44 +02:00
benedettadavico 917993d8fb clean 2026-04-27 12:17:31 +02:00
benedettadavico 1451db39e6 warn 2026-04-27 11:27:41 +02:00
benedettadavico f13a2a6c06 change to warn level 2026-04-27 10:45:42 +02:00
benedetta davico ce39fb6675 Update publish-nym-binaries.yml 2026-04-27 10:20:10 +02:00
benedettadavico 02a926b74a addressing comments 2026-04-27 10:10:08 +02:00
benedetta davico 54ba710ea0 Change CI platform from ubuntu-22.04 to arc-ubuntu-22.04 2026-04-27 09:33:57 +02:00
benedettadavico 2653d12e55 fix ipr msg, and unit tests 2026-04-24 16:07:49 +02:00
benedettadavico f94d6d51cf adding debugging traces 2026-04-24 14:11:19 +02:00
Andrej Mihajlov a0116f9aec Merge pull request #6708 from nymtech/am/lazy-init-dns
Only init SHARED_CLIENT if requested
2026-04-23 18:36:57 +02:00
Andrej Mihajlov 50433fe265 Only init SHARED_CLIENT if requested 2026-04-23 16:29:02 +02:00
benedettadavico 42aade29eb more v9 fixes 2026-04-23 13:28:17 +02:00
benedettadavico 9f26759b8d v9 bugfix 2026-04-23 13:28:17 +02:00
benedettadavico 9e642c6354 v9 bugfix 2026-04-23 13:28:16 +02:00
30 changed files with 430 additions and 140 deletions
@@ -36,7 +36,7 @@ jobs:
strategy:
fail-fast: false
matrix:
platform: [ubuntu-22.04]
platform: [arc-ubuntu-22.04]
runs-on: ${{ matrix.platform }}
env:
+1 -1
View File
@@ -21,7 +21,7 @@ jobs:
fail-fast: false
matrix:
include:
- os: ubuntu-22.04
- os: arc-ubuntu-22.04
target: x86_64-unknown-linux-gnu
runs-on: ${{ matrix.os }}
+16
View File
@@ -4,6 +4,22 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
## [Unreleased]
## [2026.9-venaco] (2026-05-06)
- Fix for v9 IPR ([#6710])
- Only init SHARED_CLIENT if requested ([#6708])
- Fixes to crates and CI ([#6686])
- Return ipv6 addresses as well ([#6684])
- Fix invalid ticket spend ([#6683])
- Block non-public IPR/NR checks ([#6670])
[#6710]: https://github.com/nymtech/nym/pull/6710
[#6708]: https://github.com/nymtech/nym/pull/6708
[#6686]: https://github.com/nymtech/nym/pull/6686
[#6684]: https://github.com/nymtech/nym/pull/6684
[#6683]: https://github.com/nymtech/nym/pull/6683
[#6670]: https://github.com/nymtech/nym/pull/6670
## [2026.8-urda] (2026-04-20)
- Include all gateways in the returned list ([#6649])
Generated
+9 -8
View File
@@ -5635,7 +5635,7 @@ dependencies = [
[[package]]
name = "nym-api"
version = "1.1.78"
version = "1.1.79"
dependencies = [
"anyhow",
"async-trait",
@@ -5880,7 +5880,7 @@ dependencies = [
[[package]]
name = "nym-cli"
version = "1.1.75"
version = "1.1.76"
dependencies = [
"anyhow",
"base64 0.22.1",
@@ -5963,7 +5963,7 @@ dependencies = [
[[package]]
name = "nym-client"
version = "1.1.75"
version = "1.1.76"
dependencies = [
"bs58",
"clap",
@@ -7028,6 +7028,7 @@ dependencies = [
"bytes",
"futures",
"nym-ip-packet-requests",
"nym-lp",
"nym-sdk",
"thiserror 2.0.12",
"tokio",
@@ -7372,7 +7373,7 @@ dependencies = [
[[package]]
name = "nym-network-requester"
version = "1.1.76"
version = "1.1.77"
dependencies = [
"addr",
"anyhow",
@@ -7422,7 +7423,7 @@ dependencies = [
[[package]]
name = "nym-node"
version = "1.30.0"
version = "1.31.0"
dependencies = [
"anyhow",
"arc-swap",
@@ -7974,7 +7975,7 @@ dependencies = [
[[package]]
name = "nym-socks5-client"
version = "1.1.75"
version = "1.1.76"
dependencies = [
"bs58",
"clap",
@@ -8242,7 +8243,7 @@ dependencies = [
"tempfile",
"tokio",
"tracing",
"tracing-subscriber",
"tracing-test",
"windows 0.61.3",
]
@@ -8772,7 +8773,7 @@ dependencies = [
[[package]]
name = "nymvisor"
version = "0.1.40"
version = "0.1.41"
dependencies = [
"anyhow",
"bytes",
+1 -1
View File
@@ -1,7 +1,7 @@
[package]
name = "nym-client"
description = "Implementation of the Nym Client"
version = "1.1.75"
version = "1.1.76"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jędrzej Stuczyński <andrew@nymtech.net>"]
edition = "2021"
license.workspace = true
+1 -1
View File
@@ -1,7 +1,7 @@
[package]
name = "nym-socks5-client"
description = "A SOCKS5 localhost proxy that converts incoming messages to Sphinx and sends them to a Nym address"
version = "1.1.75"
version = "1.1.76"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
edition = "2021"
license.workspace = true
+10
View File
@@ -25,6 +25,8 @@ pub trait BandwidthTicketProvider: Send + Sync {
) -> Result<PreparedCredential, BandwidthControllerError>;
async fn get_upgrade_mode_token(&self) -> Result<Option<String>, BandwidthControllerError>;
async fn close(&self) {}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -56,6 +58,10 @@ where
.map_err(|_| BandwidthControllerError::MalformedUpgradeModeToken)?;
Ok(Some(token))
}
async fn close(&self) {
self.storage.close().await;
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -75,4 +81,8 @@ impl<T: BandwidthTicketProvider + ?Sized + Send> BandwidthTicketProvider for Box
async fn get_upgrade_mode_token(&self) -> Result<Option<String>, BandwidthControllerError> {
(**self).get_upgrade_mode_token().await
}
async fn close(&self) {
(**self).close().await;
}
}
@@ -1023,6 +1023,16 @@ where
let encryption_keys = init_res.client_keys.encryption_keypair();
let identity_keys = init_res.client_keys.identity_keypair();
let credential_store_for_close = credential_store.clone();
let close_credential_token = shutdown_tracker.clone_shutdown_token();
shutdown_tracker.try_spawn_named(
async move {
close_credential_token.cancelled().await;
credential_store_for_close.close().await;
},
"CredentialStorage::close_on_shutdown",
);
// the components are started in very specific order. Unless you know what you are doing,
// do not change that.
let bandwidth_controller = self
@@ -11,11 +11,17 @@ use nym_bandwidth_controller::BandwidthController;
use nym_client_core_gateways_storage::OnDiskGatewaysDetails;
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_validator_client::{QueryHttpRpcNyxdClient, nyxd};
use std::{io, path::Path};
use std::{io, path::Path, time::Duration};
use time::OffsetDateTime;
use tracing::{error, info, trace};
use url::Url;
/// Maximum rename retry attempts when the database file is temporarily locked.
const ARCHIVE_MAX_RETRY_ATTEMPTS: u8 = 15;
/// Delay between archive rename retry attempts.
const ARCHIVE_RETRY_DELAY: Duration = Duration::from_millis(200);
async fn setup_fresh_backend<P: AsRef<Path>>(
db_path: P,
surb_config: &config::ReplySurbs,
@@ -74,13 +80,58 @@ async fn archive_corrupted_database<P: AsRef<Path>>(db_path: P) -> io::Result<()
};
let renamed = db_path.with_extension(new_extension);
tokio::fs::rename(db_path, &renamed).await.inspect_err(|_| {
error!(
"Failed to rename corrupt database file: {} to {}",
db_path.display(),
renamed.display()
);
})
// On Windows, sqlx may release its OS file handles asynchronously after
// pool.close() returns, briefly keeping the file locked
// (ERROR_SHARING_VIOLATION, os error 32). Retry with a short delay to
// give the OS time to flush the remaining handles.
for attempt in 0..ARCHIVE_MAX_RETRY_ATTEMPTS {
match tokio::fs::rename(db_path, &renamed).await {
Ok(()) => return Ok(()),
Err(e) if is_file_locked_error(&e) && (attempt + 1) < ARCHIVE_MAX_RETRY_ATTEMPTS => {
trace!(
"Database file is temporarily locked, retrying archive \
(attempt {}/{}): {e}",
attempt + 1,
ARCHIVE_MAX_RETRY_ATTEMPTS
);
tokio::time::sleep(ARCHIVE_RETRY_DELAY).await;
}
Err(e) => {
error!(
"Failed to rename corrupt database file: {} to {}",
db_path.display(),
renamed.display()
);
return Err(e);
}
}
}
// Reached only when every attempt was blocked by a file lock.
error!(
"Failed to rename corrupt database file after {} attempts: {} to {}",
ARCHIVE_MAX_RETRY_ATTEMPTS,
db_path.display(),
renamed.display()
);
Err(io::Error::other(
"corrupt database archive blocked by persistent file lock",
))
}
/// Returns `true` when the IO error indicates a temporary file lock held by another handle
/// within the same process. Only meaningful on Windows; always `false` elsewhere.
fn is_file_locked_error(e: &io::Error) -> bool {
#[cfg(windows)]
{
// ERROR_SHARING_VIOLATION = 32, ERROR_LOCK_VIOLATION = 33
matches!(e.raw_os_error(), Some(32) | Some(33))
}
#[cfg(not(windows))]
{
let _ = e;
false
}
}
pub async fn setup_fs_reply_surb_backend<P: AsRef<Path>>(
@@ -337,6 +337,8 @@ impl ReplyStorageBackend for Backend {
}
async fn stop_storage_session(self) -> Result<(), Self::StorageError> {
self.stop_client_use().await
let result = self.stop_client_use().await;
self.shutdown().await;
result
}
}
+5 -3
View File
@@ -48,6 +48,7 @@ where
debug!("Started PersistentReplyStorage");
if let Err(err) = self.backend.start_storage_session().await {
error!("failed to start the storage session - {err}");
self.backend.stop_storage_session().await.ok();
return;
}
@@ -55,10 +56,11 @@ where
info!("PersistentReplyStorage is flushing all reply-related data to underlying storage");
if let Err(err) = self.backend.flush_surb_storage(&mem_state).await {
error!("failed to flush our reply-related data to the persistent storage: {err}")
} else {
info!("Data flush is complete")
error!("failed to flush our reply-related data to the persistent storage: {err}");
self.backend.stop_storage_session().await.ok();
return;
}
info!("Data flush is complete");
if let Err(err) = self.backend.stop_storage_session().await {
error!("failed to properly stop the storage session - {err}. We might not be able to smoothly restore it")
+54 -10
View File
@@ -1152,7 +1152,10 @@ impl ApiClientCore for Client {
#[cfg(target_arch = "wasm32")]
let response: Result<Response, HttpClientError> = {
let client = self.reqwest_client.as_ref().unwrap_or(&*SHARED_CLIENT);
let client = self
.reqwest_client
.as_ref()
.unwrap_or_else(|| &*SHARED_CLIENT);
Ok(
wasmtimer::tokio::timeout(self.request_timeout, client.execute(req))
.await
@@ -1162,12 +1165,24 @@ impl ApiClientCore for Client {
#[cfg(not(target_arch = "wasm32"))]
let response = {
let client = self.reqwest_client.as_ref().unwrap_or(&*SHARED_CLIENT);
let client = self
.reqwest_client
.as_ref()
.unwrap_or_else(|| &*SHARED_CLIENT);
client.execute(req).await
};
match response {
Ok(resp) => return Ok(resp),
Ok(resp) => {
// Check if the response includes a rate limit error from the vercel API
if is_http_rate_limit_err(&resp) {
warn!("encountered vercel rate limit error for {}", url.as_str());
// if we have multiple urls, update to the next
self.maybe_rotate_hosts(Some(url.clone()));
}
return Ok(resp);
}
Err(err) => {
#[cfg(target_arch = "wasm32")]
let is_network_err = err.is_timeout();
@@ -1220,17 +1235,39 @@ impl ApiClientCore for Client {
}
}
const VERCEL_CHALLENGE_HEADER: &str = "x-vercel-mitigated";
const VERCEL_CHALLENGE_VALUE: &[u8] = b"challenge";
/// Check for Rate Limit challenge response from the vercel API
pub(crate) fn is_http_rate_limit_err(resp: &Response) -> bool {
let status = resp.status() == StatusCode::FORBIDDEN;
let header = resp
.headers()
.get(VERCEL_CHALLENGE_HEADER)
.is_some_and(|v| v.as_bytes() == VERCEL_CHALLENGE_VALUE);
let content_type = resp
.headers()
.get(CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<Mime>().ok())
.is_some_and(|mime_type| {
mime_type.type_() == mime::TEXT && mime_type.subtype() == mime::HTML
});
status && header && content_type
}
#[cfg(not(target_arch = "wasm32"))]
const MAX_ERR_SOURCE_ITERATIONS: usize = 4;
/// This functions attempts to check the error returned by reqwest to see if
/// rotating host informtion (for clients with mutliple hosts defined) could be
/// helpful. This looks for situations where the error could plausibly be caused
/// by a network adversary, or where rotating to an equival hostname might help.
/// This functions attempts to check the error returned by reqwest to see if rotating host
/// information (for clients with multiple hosts defined) could be helpful. This looks for
/// situations where the error could plausibly be caused by a network adversary, or where rotating
/// to an equivalent hostname might help.
///
/// For example --> NetworkUnreachable will not be helped by rotating domains,
/// but ConnectionReset might be caused by a network adversary blocking by SNI
/// which could possibly benefit from rotating domains.
/// For example --> NetworkUnreachable will not be helped by rotating domains, but ConnectionReset
/// might be caused by a network adversary blocking by SNI which could possibly benefit from
/// rotating domains.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn might_be_network_interference(err: &reqwest::Error) -> bool {
if err.is_timeout() {
@@ -1691,6 +1728,13 @@ where
decode_raw_response(&headers, full)
} else if res.status() == StatusCode::NOT_FOUND {
Err(HttpClientError::NotFound { url: Box::new(url) })
} else if is_http_rate_limit_err(&res) {
Err(HttpClientError::EndpointFailure {
url: Box::new(url),
status,
headers: Box::new(headers),
error: String::from("received vercel rate limit challenge response"),
})
} else {
let Ok(plaintext) = res.text().await else {
return Err(HttpClientError::RequestFailure {
+8
View File
@@ -12,6 +12,14 @@ pub mod v7;
pub mod v8;
pub mod v9;
/// Highest IPR protocol version that is allowed to be sent as a **non-stream** mixnet payload
/// (i.e. not wrapped in `LpFrameKind::SphinxStream`).
pub const MAX_NON_STREAM_VERSION: u8 = v8::VERSION;
/// First IPR protocol version that **requires** the SphinxStream (LP) transport for non-stream
/// mixnet sends, matching the node-side enforcement in `ip-packet-router`.
pub const SPHINX_STREAM_VERSION_THRESHOLD: u8 = v9::VERSION;
// version 3: initial version
// version 4: IPv6 support
// version 5: Add severity level to info response
+1 -1
View File
@@ -3,7 +3,7 @@
[package]
name = "nym-api"
version = "1.1.78"
version = "1.1.79"
authors.workspace = true
edition = "2021"
license = "GPL-3.0"
+1
View File
@@ -26,3 +26,4 @@ tracing.workspace = true
nym-sdk = { workspace = true }
nym-ip-packet-requests = { workspace = true }
nym-lp = { workspace = true }
+1 -1
View File
@@ -11,4 +11,4 @@ pub use error::Error;
pub use listener::{IprListener, MixnetMessageOutcome};
// Re-export the currently used version
pub use nym_ip_packet_requests::v9 as current;
pub use nym_ip_packet_requests::v8 as current;
+1 -1
View File
@@ -3,7 +3,7 @@
[package]
name = "nym-node"
version = "1.30.0"
version = "1.31.0"
authors.workspace = true
edition.workspace = true
license = "GPL-3.0"
@@ -97,6 +97,8 @@ impl BuilderConfig {
exit: self.exit_node.clone(),
mode: self.mode,
lp_registration_config: self.lp_registration_config,
#[cfg(unix)]
connection_fd_callback: self.connection_fd_callback.clone(),
}
}
@@ -143,7 +143,6 @@ impl RegistrationClientBuilder {
config,
bandwidth_controller,
cancel_token: self.config.cancel_token.clone(),
fallback_client_builder: Some(self),
})
}
}
+40 -23
View File
@@ -18,13 +18,12 @@ use rand09::{CryptoRng, RngCore, SeedableRng};
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio_util::sync::CancellationToken;
use tracing::warn;
pub struct LpBasedRegistrationClient {
pub(crate) config: RegistrationClientConfig,
pub(crate) bandwidth_controller: Box<dyn BandwidthTicketProvider>,
pub(crate) cancel_token: CancellationToken,
// While we allow a fallback, we need to be able to build it
pub(crate) fallback_client_builder: Option<RegistrationClientBuilder>,
}
impl LpBasedRegistrationClient {
@@ -81,6 +80,32 @@ impl LpBasedRegistrationClient {
self.config.lp_registration_config,
);
// Open the entry connection through a socket that has the connection
// fd callback applied before connecting (sets SO_MARK on Linux), so
// the connection is allowed through the VPN firewall during the
// connecting state.
#[cfg(unix)]
{
let fd_callback = self.config.connection_fd_callback.clone();
entry_client.set_dialer(Arc::new(move |addr| {
let fd_callback = fd_callback.clone();
Box::pin(async move {
let socket = if addr.is_ipv4() {
tokio::net::TcpSocket::new_v4()
} else {
tokio::net::TcpSocket::new_v6()
}
.map_err(|err| {
nym_lp::transport::LpTransportError::connection_failure(err.to_string())
})?;
fd_callback(std::os::fd::AsRawFd::as_raw_fd(&socket));
socket.connect(addr).await.map_err(|err| {
nym_lp::transport::LpTransportError::connection_failure(err.to_string())
})
})
}));
}
// Perform handshake with entry gateway (outer session now established)
entry_client.perform_handshake().await.map_err(|source| {
RegistrationClientError::EntryGatewayRegisterLp {
@@ -162,15 +187,11 @@ impl LpBasedRegistrationClient {
self.register_wg_with_rng(&mut rng).await
}
pub(crate) async fn register(mut self) -> Result<RegistrationResult, RegistrationClientError> {
let fallback = self.fallback_client_builder.take();
async fn register_inner(mut self) -> Result<RegistrationResult, RegistrationClientError> {
match &self.config.mode {
RegistrationMode::Mixnet => {
if let Some(fallback) = fallback {
register_with_fallback(fallback).await
} else {
Err(RegistrationClientError::UnsupportedMode)
}
// mixnet registration is not supported for LP
Err(RegistrationClientError::UnsupportedMode)
}
RegistrationMode::Wireguard => {
let lp_registration_result = self
@@ -182,15 +203,9 @@ impl LpBasedRegistrationClient {
// Everything went fine
Some(Ok(res)) => Ok(res),
// LP reg failed, try fallback if we have one
Some(Err(e)) => {
tracing::error!("LP registration failed : {e}");
if let Some(fallback) = fallback {
tracing::info!("Registering with fallback");
register_with_fallback(fallback).await
} else {
Err(e)
}
Err(e)
}
// Cancelled registration
@@ -199,12 +214,14 @@ impl LpBasedRegistrationClient {
}
}
}
}
async fn register_with_fallback(
client_builder: RegistrationClientBuilder,
) -> Result<RegistrationResult, RegistrationClientError> {
// This is forcefully building a mixnet based client
let fallback_client = client_builder.build_mixnet().await?;
fallback_client.register().await
pub(crate) async fn register(mut self) -> Result<RegistrationResult, RegistrationClientError> {
let timeout = self.config.lp_registration_config.exchange_timeout;
tokio::time::timeout(timeout, self.register_inner())
.await
.unwrap_or_else(|timeout| {
warn!("timed out while attempting to complete LP registration");
Err(RegistrationClientError::Timeout(timeout))
})
}
}
+5
View File
@@ -19,4 +19,9 @@ pub struct RegistrationClientConfig {
pub(crate) exit: NymNodeWithKeys,
pub(crate) mode: RegistrationMode,
pub(crate) lp_registration_config: LpRegistrationConfig,
/// Callback invoked with the raw fd of sockets opened for registration,
/// before connecting. Used to set `SO_MARK` on Linux so the connection is
/// allowed through the VPN firewall during the connecting state.
#[cfg(unix)]
pub(crate) connection_fd_callback: std::sync::Arc<dyn Fn(std::os::fd::RawFd) + Send + Sync>,
}
+53 -16
View File
@@ -33,6 +33,19 @@ use std::time::Duration;
use tokio::net::TcpStream;
use tracing::{debug, warn};
/// Custom dialer used to open the connection to the gateway.
///
/// Allows the caller to configure the socket before the connection is
/// initiated, e.g. set `SO_MARK` on Linux so the connection is allowed
/// through the VPN firewall during the connecting state.
pub type LpDialer<S> = Arc<
dyn Fn(
SocketAddr,
) -> futures::future::BoxFuture<'static, std::result::Result<S, LpTransportError>>
+ Send
+ Sync,
>;
/// LP (Lewes Protocol) registration client for direct gateway connections.
///
/// This client uses a persistent TCP connection model where a single TCP
@@ -70,6 +83,11 @@ pub struct LpRegistrationClient<S = TcpStream> {
/// Persistent TCP stream for the connection.
/// Opened on first use, closed after registration.
stream: Option<S>,
/// Optional custom dialer used to open the connection, allowing socket
/// configuration (e.g. `SO_MARK`) before the connection is initiated.
/// Falls back to `S::connect` when unset.
dialer: Option<LpDialer<S>>,
}
impl<S> LpRegistrationClient<S>
@@ -115,9 +133,18 @@ where
transport_session: None,
config,
stream: None,
dialer: None,
}
}
/// Sets a custom dialer used to open the connection to the gateway.
///
/// Allows socket configuration (e.g. setting `SO_MARK` on Linux so the
/// connection is allowed through the VPN firewall) before connecting.
pub fn set_dialer(&mut self, dialer: LpDialer<S>) {
self.dialer = Some(dialer);
}
/// Attempt to use this `LpRegistrationClient` as transport for `NestedSession`
pub fn as_nested_connection(&mut self, exit_address: SocketAddr) -> NestedConnection<'_, S> {
NestedConnection {
@@ -209,22 +236,32 @@ where
self.gateway_lp_address
);
let mut stream = tokio::time::timeout(
self.config.connect_timeout,
S::connect(self.gateway_lp_address),
)
.await
.map_err(|_| LpClientError::TcpConnection {
address: self.gateway_lp_address.to_string(),
source: LpTransportError::ConnectionFailure(format!(
"Connection timeout after {:?}",
self.config.connect_timeout
)),
})?
.map_err(|source| LpClientError::TcpConnection {
address: self.gateway_lp_address.to_string(),
source,
})?;
let connect_result = match &self.dialer {
Some(dialer) => {
tokio::time::timeout(self.config.connect_timeout, dialer(self.gateway_lp_address))
.await
}
None => {
tokio::time::timeout(
self.config.connect_timeout,
S::connect(self.gateway_lp_address),
)
.await
}
};
let mut stream = connect_result
.map_err(|_| LpClientError::TcpConnection {
address: self.gateway_lp_address.to_string(),
source: LpTransportError::ConnectionFailure(format!(
"Connection timeout after {:?}",
self.config.connect_timeout
)),
})?
.map_err(|source| LpClientError::TcpConnection {
address: self.gateway_lp_address.to_string(),
source,
})?;
// Set TCP_NODELAY for low latency
stream
+27 -24
View File
@@ -28,40 +28,43 @@ use std::time::Duration;
/// - Optimize for latency over throughput (small messages)
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct LpRegistrationConfig {
/// TCP connection timeout (nym-102).
/// TCP connection timeout.
///
/// Maximum time to wait for TCP connection establishment.
/// Default: 10 seconds.
/// Default: 5 seconds.
pub connect_timeout: Duration,
/// Noise protocol handshake timeout (nym-102).
/// KKT/PSQ protocol handshake timeout.
///
/// Maximum time to wait for Noise handshake completion (all round-trips).
/// Default: 15 seconds.
/// Maximum time to wait for KKT/PSQ handshake completion with the entry (all round-trips).
/// Default: 8 seconds.
pub handshake_timeout: Duration,
/// Registration request/response timeout (nym-102).
/// Registration request/response timeout.
///
/// Maximum time to wait for registration request send + response receive.
/// Includes credential verification on gateway side.
/// Default: 30 seconds.
/// Default: 8 seconds.
pub registration_timeout: Duration,
/// Maximum time for the whole exchange (handshake + registration).
/// Default: 20 seconds.
pub exchange_timeout: Duration,
/// Forward packet send/receive timeout.
///
/// Maximum time to wait for forward packet send + response receive via entry gateway.
/// Covers the entire round-trip through entry to exit gateway and back.
/// Default: 30 seconds.
/// Maximum time to wait for forward packet to get sent via entry gateway.
/// Default: 3 seconds.
pub forward_timeout: Duration,
/// Enable TCP_NODELAY (disable Nagle's algorithm) (nym-104).
/// Enable TCP_NODELAY (disable Nagle's algorithm).
///
/// When true, disables Nagle's algorithm for lower latency.
/// Recommended for registration messages which are small and latency-sensitive.
/// Default: true.
pub tcp_nodelay: bool,
/// TCP keepalive duration (nym-104).
/// TCP keepalive duration.
///
/// When Some, enables TCP keepalive with specified interval.
/// Since LP is registration-only with short-lived connections, keepalive is not needed.
@@ -72,15 +75,14 @@ pub struct LpRegistrationConfig {
impl Default for LpRegistrationConfig {
fn default() -> Self {
Self {
// nym-102: Sane timeout defaults for real network conditions
connect_timeout: Duration::from_secs(10),
handshake_timeout: Duration::from_secs(15),
registration_timeout: Duration::from_secs(30),
forward_timeout: Duration::from_secs(30),
connect_timeout: Duration::from_secs(5),
handshake_timeout: Duration::from_secs(8),
registration_timeout: Duration::from_secs(8),
exchange_timeout: Duration::from_secs(20),
forward_timeout: Duration::from_secs(3),
// nym-104: Optimized for registration-only protocol
tcp_nodelay: true, // Lower latency for small messages
tcp_keepalive: None, // Not needed for ephemeral connections
tcp_nodelay: true,
tcp_keepalive: None,
}
}
}
@@ -93,10 +95,11 @@ mod tests {
fn test_default_config() {
let config = LpRegistrationConfig::default();
assert_eq!(config.connect_timeout, Duration::from_secs(10));
assert_eq!(config.handshake_timeout, Duration::from_secs(15));
assert_eq!(config.registration_timeout, Duration::from_secs(30));
assert_eq!(config.forward_timeout, Duration::from_secs(30));
assert_eq!(config.connect_timeout, Duration::from_secs(5));
assert_eq!(config.handshake_timeout, Duration::from_secs(8));
assert_eq!(config.registration_timeout, Duration::from_secs(8));
assert_eq!(config.forward_timeout, Duration::from_secs(3));
assert_eq!(config.exchange_timeout, Duration::from_secs(20));
assert!(config.tcp_nodelay);
assert_eq!(config.tcp_keepalive, None);
}
+1 -1
View File
@@ -41,4 +41,4 @@ windows = { version = "0.61", features = [
[dev-dependencies]
tempfile.workspace = true
tracing-subscriber.workspace = true
tracing-test.workspace = true
+96 -27
View File
@@ -3,8 +3,9 @@
use std::{
io,
ops::{Deref, DerefMut},
ops::Deref,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
@@ -26,10 +27,8 @@ const CHECK_FILES_CLOSED_MAX_ATTEMPTS: u8 = 20;
/// Delay between file checks
const CHECK_FILES_CLOSED_RETRY_DELAY: Duration = Duration::from_millis(100);
/// `sqlx::SqlitePool` wrapper providing a workaround for the [known bug](https://github.com/launchbadge/sqlx/issues/3217).
/// In principle after requesting to close the sqlite pool, the wrapper monitors open file descriptor and polls periodically until all database files are closed.
#[derive(Debug, Clone)]
pub struct SqlitePoolGuard {
#[derive(Debug)]
struct SqlitePoolGuardInner {
/// Path to sqlite database file.
database_path: PathBuf,
@@ -37,6 +36,18 @@ pub struct SqlitePoolGuard {
connection_pool: sqlx::SqlitePool,
}
/// `sqlx::SqlitePool` wrapper providing a workaround for the [known bug](https://github.com/launchbadge/sqlx/issues/3217).
/// In principle after requesting to close the sqlite pool, the wrapper monitors open file descriptor and polls periodically until all database files are closed.
///
/// This type is cheaply [`Clone`]-able: all clones share the same underlying pool and the same
/// reference count. The `Drop` impl only emits a warning when the **last** reference is dropped
/// without an explicit [`close`](Self::close) call, so it is safe to clone this guard temporarily
/// (e.g. to pass into a spawned task) without triggering spurious warnings.
#[derive(Debug, Clone)]
pub struct SqlitePoolGuard {
inner: Arc<SqlitePoolGuardInner>,
}
impl SqlitePoolGuard {
/// Create new instance providing path to database and connection pool
pub fn new(connection_pool: sqlx::SqlitePool) -> Self {
@@ -46,46 +57,70 @@ impl SqlitePoolGuard {
.to_path_buf();
Self {
database_path,
connection_pool,
inner: Arc::new(SqlitePoolGuardInner {
database_path,
connection_pool,
}),
}
}
/// Returns database path
pub fn database_path(&self) -> &Path {
&self.database_path
&self.inner.database_path
}
/// Close udnerlying sqlite pool and wait for files to be closed before returning.
/// Close the underlying sqlite pool and wait for OS file handles to be released.
///
/// **Callers must invoke this method before dropping the guard.** The `Drop` impl does
/// not perform async cleanup; it only logs a warning when the pool was not closed
/// beforehand.
pub async fn close(&self) {
// Avoid waiting for db files once the pool is marked closed to ensure that we don't wait on some other sqlite pool to close the database.
if !self.connection_pool.is_closed() {
tracing::info!("Closing sqlite pool: {}", self.database_path.display());
if !self.inner.connection_pool.is_closed() {
tracing::info!(
"Closing sqlite pool: {}",
self.inner.database_path.display()
);
self.close_pool_inner().await.ok();
}
}
async fn close_pool_inner(&self) -> std::io::Result<()> {
self.connection_pool.close().await;
self.inner.connection_pool.close().await;
self.wait_for_db_files_close().await.inspect_err(|e| {
tracing::error!("Failed to wait for file to close: {e}");
})
if let Err(e) = self.wait_for_db_files_close().await {
if e.kind() == std::io::ErrorKind::TimedOut {
tracing::warn!(
"Timed out waiting for OS file handles for sqlite database to be released; \
another connection to the same file may still be open. Path = {}",
self.inner.database_path.display()
);
} else {
tracing::warn!(
"Failed to wait for sqlite database file handles to be released: Path = {}. Error = {}",
self.inner.database_path.display(),
e
);
}
}
Ok(())
}
/// Returns all database files, including shm and wal files.
fn all_database_files(&self) -> Vec<PathBuf> {
let mut database_files = vec![];
let canonical_path = self
.inner
.database_path
.canonicalize()
.inspect_err(|e| {
tracing::error!(
"Failed to canonicalize path: {}. Cause: {e}",
self.database_path.display()
self.inner.database_path.display()
);
})
.unwrap_or(self.database_path.clone());
.unwrap_or(self.inner.database_path.clone());
if let Some(ext) = canonical_path.extension() {
for added_ext in ["-shm", "-wal"] {
@@ -120,34 +155,38 @@ impl SqlitePoolGuard {
}
}
impl Drop for SqlitePoolGuard {
fn drop(&mut self) {
if Arc::strong_count(&self.inner) == 1 && !self.inner.connection_pool.is_closed() {
tracing::warn!(
"SqlitePoolGuard dropped without explicit close(); path={}",
self.inner.database_path.display()
);
}
}
}
impl Deref for SqlitePoolGuard {
type Target = sqlx::SqlitePool;
fn deref(&self) -> &Self::Target {
&self.connection_pool
&self.inner.connection_pool
}
}
impl DerefMut for SqlitePoolGuard {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.connection_pool
}
}
#[cfg(test)]
mod tests {
use sqlx::{
ConnectOptions, Executor,
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
};
use tracing_test::traced_test;
use super::*;
#[traced_test]
#[tokio::test]
async fn test_wait_close() {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.init();
let temp_dir = tempfile::tempdir().unwrap();
let database_path = temp_dir.path().join("storage.db");
@@ -177,4 +216,34 @@ mod tests {
assert!(guard.close_pool_inner().await.is_ok());
tokio::fs::remove_file(database_path).await.unwrap();
}
#[traced_test]
#[tokio::test]
async fn test_clone_drop_no_warning() {
// Cloning the guard and dropping the clone should not warn because the original is still alive.
let temp_dir = tempfile::tempdir().unwrap();
let database_path = temp_dir.path().join("storage2.db");
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path.clone())
.create_if_missing(true)
.disable_statement_logging();
let connection_pool = sqlx::SqlitePool::connect_with(opts).await.unwrap();
let guard = SqlitePoolGuard::new(connection_pool);
{
let _clone = guard.clone();
assert_eq!(Arc::strong_count(&guard.inner), 2);
}
assert_eq!(Arc::strong_count(&guard.inner), 1);
assert!(!logs_contain(
"SqlitePoolGuard dropped without explicit close"
));
guard.close().await;
tokio::fs::remove_file(database_path).await.unwrap();
}
}
@@ -0,0 +1,5 @@
{
"name": "@nymproject/nym-client-wasm",
"version": "1.0.0",
"sideEffects": false
}
@@ -23,6 +23,7 @@ use crate::{
};
use futures::StreamExt;
use nym_ip_packet_requests::codec::MultiIpPacketCodec;
use nym_ip_packet_requests::{MAX_NON_STREAM_VERSION, SPHINX_STREAM_VERSION_THRESHOLD};
use nym_lp::packet::frame::{LpFrameHeader, LpFrameKind, SphinxStreamFrameAttributes};
use nym_sdk::mixnet::MixnetMessageSender;
use nym_sphinx::receiver::ReconstructedMessage;
@@ -559,8 +560,9 @@ impl MixnetListener {
///
/// # Version / transport enforcement
///
/// - LP Stream frames (`stream_id` is `Some`) **must** carry v9+ payloads.
/// - Non-stream messages (`stream_id` is `None`) **must** be v8 or lower.
/// - LP Stream frames (`stream_id` is `Some`) **must** carry payloads with version
/// `>= SPHINX_STREAM_VERSION_THRESHOLD` (see `nym_ip_packet_requests`).
/// - Non-stream messages (`stream_id` is `None`) **must** be `<= MAX_NON_STREAM_VERSION`.
///
/// Messages that violate these rules are dropped.
async fn on_ipr_message(
@@ -578,16 +580,22 @@ impl MixnetListener {
}?;
// Enforce version/transport consistency:
// - LP Stream frames must carry v9+ payloads
// - Non-stream messages must be v8 or lower
// - LP Stream frames must carry payloads at/above the SphinxStream threshold
// - Non-stream messages must be at/below the max non-stream version
let version_num = request.version().into_u8();
if stream_id.is_some() && version_num < 9 {
log::warn!("LP Stream frame contains v{version_num} payload, expected v9+; dropping",);
if stream_id.is_some() && version_num < SPHINX_STREAM_VERSION_THRESHOLD {
log::warn!(
"LP Stream frame contains v{version_num} payload, expected v{expected}+; dropping",
expected = SPHINX_STREAM_VERSION_THRESHOLD
);
return Ok(vec![]);
}
if stream_id.is_none() && version_num >= 9 {
log::warn!("Non-stream message claims v{version_num}, expected v8 or lower; dropping",);
if stream_id.is_none() && version_num > MAX_NON_STREAM_VERSION {
log::warn!(
"Non-stream message claims v{version_num}, expected v{expected} or lower; dropping",
expected = MAX_NON_STREAM_VERSION
);
return Ok(vec![]);
}
@@ -3,7 +3,7 @@
[package]
name = "nym-network-requester"
version = "1.1.76"
version = "1.1.77"
authors.workspace = true
edition.workspace = true
license = "GPL-3.0"
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-cli"
version = "1.1.75"
version = "1.1.76"
authors.workspace = true
edition = "2021"
license.workspace = true
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nymvisor"
version = "0.1.40"
version = "0.1.41"
authors.workspace = true
edition.workspace = true
license.workspace = true