Compare commits

...

5 Commits

Author SHA1 Message Date
Jędrzej Stuczyński 7b858dfd69 chore: LP registration adjustments (#6845)
* remove mixnet fallback for LP registration

* change LP registration timeouts and introduce exchange timeout

* remove fallback client construction and disable mixnet via LP registration
2026-06-02 16:28:57 +01:00
Jack Wampler a4bd547023 Handle Rate Limit Challenge Response (#6825)
rotate urls on HTTP response error indicating API rate limiting
2026-05-26 14:58:45 -06:00
Andy Duplain db03ec31b1 Merge pull request #6812 from nymtech/cherry-pick/nym-583-corrupt-db-windows
NYM-583: Avoid corrupted database on Windows.
2026-05-21 14:44:39 +01:00
Andy Duplain 9b285735b8 NYM-583: Avoid corrupted database on Windows.
NYM-583: Avoid corrupted database on Windows.
2026-05-21 14:23:20 +01:00
Simon Wicky 691280797a back to v8 on non sdk client (#6771) 2026-05-13 18:24:50 +02:00
24 changed files with 325 additions and 353 deletions
Generated
+1 -1
View File
@@ -8243,7 +8243,7 @@ dependencies = [
"tempfile",
"tokio",
"tracing",
"tracing-subscriber",
"tracing-test",
"windows 0.61.3",
]
+10
View File
@@ -25,6 +25,8 @@ pub trait BandwidthTicketProvider: Send + Sync {
) -> Result<PreparedCredential, BandwidthControllerError>;
async fn get_upgrade_mode_token(&self) -> Result<Option<String>, BandwidthControllerError>;
async fn close(&self) {}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -56,6 +58,10 @@ where
.map_err(|_| BandwidthControllerError::MalformedUpgradeModeToken)?;
Ok(Some(token))
}
async fn close(&self) {
self.storage.close().await;
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -75,4 +81,8 @@ impl<T: BandwidthTicketProvider + ?Sized + Send> BandwidthTicketProvider for Box
async fn get_upgrade_mode_token(&self) -> Result<Option<String>, BandwidthControllerError> {
(**self).get_upgrade_mode_token().await
}
async fn close(&self) {
(**self).close().await;
}
}
@@ -1023,6 +1023,16 @@ where
let encryption_keys = init_res.client_keys.encryption_keypair();
let identity_keys = init_res.client_keys.identity_keypair();
let credential_store_for_close = credential_store.clone();
let close_credential_token = shutdown_tracker.clone_shutdown_token();
shutdown_tracker.try_spawn_named(
async move {
close_credential_token.cancelled().await;
credential_store_for_close.close().await;
},
"CredentialStorage::close_on_shutdown",
);
// the components are started in very specific order. Unless you know what you are doing,
// do not change that.
let bandwidth_controller = self
@@ -11,11 +11,17 @@ use nym_bandwidth_controller::BandwidthController;
use nym_client_core_gateways_storage::OnDiskGatewaysDetails;
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_validator_client::{QueryHttpRpcNyxdClient, nyxd};
use std::{io, path::Path};
use std::{io, path::Path, time::Duration};
use time::OffsetDateTime;
use tracing::{error, info, trace};
use url::Url;
/// Maximum rename retry attempts when the database file is temporarily locked.
const ARCHIVE_MAX_RETRY_ATTEMPTS: u8 = 15;
/// Delay between archive rename retry attempts.
const ARCHIVE_RETRY_DELAY: Duration = Duration::from_millis(200);
async fn setup_fresh_backend<P: AsRef<Path>>(
db_path: P,
surb_config: &config::ReplySurbs,
@@ -74,13 +80,58 @@ async fn archive_corrupted_database<P: AsRef<Path>>(db_path: P) -> io::Result<()
};
let renamed = db_path.with_extension(new_extension);
tokio::fs::rename(db_path, &renamed).await.inspect_err(|_| {
error!(
"Failed to rename corrupt database file: {} to {}",
db_path.display(),
renamed.display()
);
})
// On Windows, sqlx may release its OS file handles asynchronously after
// pool.close() returns, briefly keeping the file locked
// (ERROR_SHARING_VIOLATION, os error 32). Retry with a short delay to
// give the OS time to flush the remaining handles.
for attempt in 0..ARCHIVE_MAX_RETRY_ATTEMPTS {
match tokio::fs::rename(db_path, &renamed).await {
Ok(()) => return Ok(()),
Err(e) if is_file_locked_error(&e) && (attempt + 1) < ARCHIVE_MAX_RETRY_ATTEMPTS => {
trace!(
"Database file is temporarily locked, retrying archive \
(attempt {}/{}): {e}",
attempt + 1,
ARCHIVE_MAX_RETRY_ATTEMPTS
);
tokio::time::sleep(ARCHIVE_RETRY_DELAY).await;
}
Err(e) => {
error!(
"Failed to rename corrupt database file: {} to {}",
db_path.display(),
renamed.display()
);
return Err(e);
}
}
}
// Reached only when every attempt was blocked by a file lock.
error!(
"Failed to rename corrupt database file after {} attempts: {} to {}",
ARCHIVE_MAX_RETRY_ATTEMPTS,
db_path.display(),
renamed.display()
);
Err(io::Error::other(
"corrupt database archive blocked by persistent file lock",
))
}
/// Returns `true` when the IO error indicates a temporary file lock held by another handle
/// within the same process. Only meaningful on Windows; always `false` elsewhere.
fn is_file_locked_error(e: &io::Error) -> bool {
#[cfg(windows)]
{
// ERROR_SHARING_VIOLATION = 32, ERROR_LOCK_VIOLATION = 33
matches!(e.raw_os_error(), Some(32) | Some(33))
}
#[cfg(not(windows))]
{
let _ = e;
false
}
}
pub async fn setup_fs_reply_surb_backend<P: AsRef<Path>>(
@@ -337,6 +337,8 @@ impl ReplyStorageBackend for Backend {
}
async fn stop_storage_session(self) -> Result<(), Self::StorageError> {
self.stop_client_use().await
let result = self.stop_client_use().await;
self.shutdown().await;
result
}
}
+5 -3
View File
@@ -48,6 +48,7 @@ where
debug!("Started PersistentReplyStorage");
if let Err(err) = self.backend.start_storage_session().await {
error!("failed to start the storage session - {err}");
self.backend.stop_storage_session().await.ok();
return;
}
@@ -55,10 +56,11 @@ where
info!("PersistentReplyStorage is flushing all reply-related data to underlying storage");
if let Err(err) = self.backend.flush_surb_storage(&mem_state).await {
error!("failed to flush our reply-related data to the persistent storage: {err}")
} else {
info!("Data flush is complete")
error!("failed to flush our reply-related data to the persistent storage: {err}");
self.backend.stop_storage_session().await.ok();
return;
}
info!("Data flush is complete");
if let Err(err) = self.backend.stop_storage_session().await {
error!("failed to properly stop the storage session - {err}. We might not be able to smoothly restore it")
+46 -8
View File
@@ -1173,7 +1173,16 @@ impl ApiClientCore for Client {
};
match response {
Ok(resp) => return Ok(resp),
Ok(resp) => {
// Check if the response includes a rate limit error from the vercel API
if is_http_rate_limit_err(&resp) {
warn!("encountered vercel rate limit error for {}", url.as_str());
// if we have multiple urls, update to the next
self.maybe_rotate_hosts(Some(url.clone()));
}
return Ok(resp);
}
Err(err) => {
#[cfg(target_arch = "wasm32")]
let is_network_err = err.is_timeout();
@@ -1226,17 +1235,39 @@ impl ApiClientCore for Client {
}
}
const VERCEL_CHALLENGE_HEADER: &str = "x-vercel-mitigated";
const VERCEL_CHALLENGE_VALUE: &[u8] = b"challenge";
/// Check for Rate Limit challenge response from the vercel API
pub(crate) fn is_http_rate_limit_err(resp: &Response) -> bool {
let status = resp.status() == StatusCode::FORBIDDEN;
let header = resp
.headers()
.get(VERCEL_CHALLENGE_HEADER)
.is_some_and(|v| v.as_bytes() == VERCEL_CHALLENGE_VALUE);
let content_type = resp
.headers()
.get(CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<Mime>().ok())
.is_some_and(|mime_type| {
mime_type.type_() == mime::TEXT && mime_type.subtype() == mime::HTML
});
status && header && content_type
}
#[cfg(not(target_arch = "wasm32"))]
const MAX_ERR_SOURCE_ITERATIONS: usize = 4;
/// This functions attempts to check the error returned by reqwest to see if
/// rotating host informtion (for clients with mutliple hosts defined) could be
/// helpful. This looks for situations where the error could plausibly be caused
/// by a network adversary, or where rotating to an equival hostname might help.
/// This functions attempts to check the error returned by reqwest to see if rotating host
/// information (for clients with multiple hosts defined) could be helpful. This looks for
/// situations where the error could plausibly be caused by a network adversary, or where rotating
/// to an equivalent hostname might help.
///
/// For example --> NetworkUnreachable will not be helped by rotating domains,
/// but ConnectionReset might be caused by a network adversary blocking by SNI
/// which could possibly benefit from rotating domains.
/// For example --> NetworkUnreachable will not be helped by rotating domains, but ConnectionReset
/// might be caused by a network adversary blocking by SNI which could possibly benefit from
/// rotating domains.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn might_be_network_interference(err: &reqwest::Error) -> bool {
if err.is_timeout() {
@@ -1697,6 +1728,13 @@ where
decode_raw_response(&headers, full)
} else if res.status() == StatusCode::NOT_FOUND {
Err(HttpClientError::NotFound { url: Box::new(url) })
} else if is_http_rate_limit_err(&res) {
Err(HttpClientError::EndpointFailure {
url: Box::new(url),
status,
headers: Box::new(headers),
error: String::from("received vercel rate limit challenge response"),
})
} else {
let Ok(plaintext) = res.text().await else {
return Err(HttpClientError::RequestFailure {
-12
View File
@@ -20,18 +20,6 @@ pub const MAX_NON_STREAM_VERSION: u8 = v8::VERSION;
/// mixnet sends, matching the node-side enforcement in `ip-packet-router`.
pub const SPHINX_STREAM_VERSION_THRESHOLD: u8 = v9::VERSION;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn stream_transport_threshold_is_consistent() {
assert_eq!(MAX_NON_STREAM_VERSION, 8);
assert_eq!(SPHINX_STREAM_VERSION_THRESHOLD, 9);
assert!(SPHINX_STREAM_VERSION_THRESHOLD > MAX_NON_STREAM_VERSION);
}
}
// version 3: initial version
// version 4: IPv6 support
// version 5: Add severity level to info response
+2 -6
View File
@@ -1,10 +1,9 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_ip_packet_client::current::response::{
use nym_ip_packet_requests::v8::response::{
ControlResponse, DataResponse, InfoLevel, IpPacketResponse, IpPacketResponseData,
};
use nym_ip_packet_client::lp_stream;
use nym_sdk::{
DebugConfig, NymApiTopologyProvider, NymApiTopologyProviderConfig, NymNetworkDetails,
TopologyProvider, mixnet::ReconstructedMessage,
@@ -33,10 +32,7 @@ pub fn mixnet_debug_config(
}
pub fn unpack_data_response(reconstructed_message: &ReconstructedMessage) -> Option<DataResponse> {
let payload =
lp_stream::maybe_unwrap_lp_stream_payload_from_reconstructed(reconstructed_message);
match IpPacketResponse::from_bytes(payload) {
match IpPacketResponse::from_reconstructed_message(reconstructed_message) {
Ok(response) => match response.data {
IpPacketResponseData::Data(data_response) => Some(data_response),
IpPacketResponseData::Control(control) => match *control {
+5 -24
View File
@@ -9,7 +9,7 @@ use nym_connection_monitor::{
wrap_icmp_in_ipv6,
},
};
use nym_ip_packet_requests::{IpPair, codec::MultiIpPacketCodec};
use nym_ip_packet_requests::{IpPair, codec::MultiIpPacketCodec, v8::request::IpPacketRequest};
use nym_sdk::mixnet::{
InputMessage, MixnetClient, MixnetMessageSender, Recipient, TransmissionLane,
};
@@ -25,7 +25,6 @@ pub async fn send_ping_v4(
sequence_number: u16,
destination: Ipv4Addr,
exit_router_address: Recipient,
stream_id: u64,
) -> anyhow::Result<()> {
let icmp_identifier = icmp_identifier();
let icmp_echo_request = create_icmpv4_echo_request(sequence_number, icmp_identifier)?;
@@ -36,12 +35,7 @@ pub async fn send_ping_v4(
MultiIpPacketCodec::bundle_one_packet(ipv4_packet.packet().to_vec().into());
// Wrap into a mixnet input message addressed to the IPR
let mixnet_message = create_input_message(
exit_router_address,
bundled_packet,
stream_id,
sequence_number,
)?;
let mixnet_message = create_input_message(exit_router_address, bundled_packet)?;
mixnet_client.send(mixnet_message).await?;
Ok(())
@@ -53,7 +47,6 @@ pub async fn send_ping_v6(
sequence_number: u16,
destination: Ipv6Addr,
exit_router_address: Recipient,
stream_id: u64,
) -> anyhow::Result<()> {
let icmp_identifier = icmp_identifier();
let icmp_echo_request = create_icmpv6_echo_request(
@@ -69,12 +62,7 @@ pub async fn send_ping_v6(
MultiIpPacketCodec::bundle_one_packet(ipv6_packet.packet().to_vec().into());
// Wrap into a mixnet input message addressed to the IPR
let mixnet_message = create_input_message(
exit_router_address,
bundled_packet,
stream_id,
sequence_number,
)?;
let mixnet_message = create_input_message(exit_router_address, bundled_packet)?;
// Send across the mixnet
mixnet_client.send(mixnet_message).await?;
@@ -84,22 +72,15 @@ pub async fn send_ping_v6(
fn create_input_message(
recipient: impl Into<Recipient>,
bundled_packets: Bytes,
stream_id: u64,
sequence_number: u16,
) -> anyhow::Result<InputMessage> {
let packet = nym_ip_packet_client::current::new_data_request(bundled_packets).to_bytes()?;
let framed_packet = nym_ip_packet_client::lp_stream::encode_stream_frame(
stream_id,
sequence_number as u32,
packet,
);
let packet = IpPacketRequest::new_data_request(bundled_packets).to_bytes()?;
let lane = TransmissionLane::General;
let packet_type = None;
let surbs = 0;
Ok(InputMessage::new_anonymous(
recipient.into(),
framed_packet,
packet,
surbs,
lane,
packet_type,
+5 -18
View File
@@ -256,8 +256,8 @@ pub async fn do_ping(
let (maybe_ip_pair, mut mixnet_client) =
connect_exit(mixnet_client, exit_router_address).await;
match maybe_ip_pair {
Some((ip_pair, stream_id)) => (
do_ping_exit(&mut mixnet_client, ip_pair, stream_id, exit_router_address).await,
Some(ip_pair) => (
do_ping_exit(&mut mixnet_client, ip_pair, exit_router_address).await,
mixnet_client,
),
None => (Ok(Some(Exit::fail_to_connect())), mixnet_client),
@@ -304,7 +304,7 @@ async fn do_ping_entry(
async fn connect_exit(
mixnet_client: MixnetClient,
exit_router_address: Recipient,
) -> (Option<(IpPair, u64)>, MixnetClient) {
) -> (Option<IpPair>, MixnetClient) {
// Step 2: connect to the exit gateway
info!(
"Connecting to exit gateway: {}",
@@ -315,19 +315,12 @@ async fn connect_exit(
let mut ipr_client = IprClientConnect::new(mixnet_client, cancel_token);
let maybe_ip_pair = ipr_client.connect(exit_router_address).await;
let stream_id = ipr_client.stream_id();
let mixnet_client = ipr_client.into_mixnet_client();
if let Ok(our_ips) = maybe_ip_pair {
info!("Successfully connected to exit gateway");
info!("Using mixnet VPN IP addresses: {our_ips}");
let Some(stream_id) = stream_id else {
tracing::warn!(
"No active IPR stream id set after connect; cannot run IPR data-plane tests"
);
return (None, mixnet_client);
};
(Some((our_ips, stream_id)), mixnet_client)
(Some(our_ips), mixnet_client)
} else {
(None, mixnet_client)
}
@@ -336,11 +329,10 @@ async fn connect_exit(
pub async fn do_ping_exit(
mixnet_client: &mut MixnetClient,
our_ips: IpPair,
stream_id: u64,
exit_router_address: Recipient,
) -> anyhow::Result<Option<Exit>> {
// Step 3: perform ICMP connectivity checks for the exit gateway
send_icmp_pings(mixnet_client, our_ips, exit_router_address, stream_id).await?;
send_icmp_pings(mixnet_client, our_ips, exit_router_address).await?;
listen_for_icmp_ping_replies(mixnet_client, our_ips).await
}
@@ -348,7 +340,6 @@ async fn send_icmp_pings(
mixnet_client: &MixnetClient,
our_ips: IpPair,
exit_router_address: Recipient,
stream_id: u64,
) -> anyhow::Result<()> {
// ipv4 addresses for testing
let ipr_tun_ip_v4 = NYM_TUN_DEVICE_ADDRESS_V4;
@@ -370,7 +361,6 @@ async fn send_icmp_pings(
ii,
ipr_tun_ip_v4,
exit_router_address,
stream_id,
)
.await?;
icmp::send_ping_v4(
@@ -379,7 +369,6 @@ async fn send_icmp_pings(
ii,
external_ip_v4,
exit_router_address,
stream_id,
)
.await?;
}
@@ -392,7 +381,6 @@ async fn send_icmp_pings(
ii,
ipr_tun_ip_v6,
exit_router_address,
stream_id,
)
.await?;
icmp::send_ping_v6(
@@ -401,7 +389,6 @@ async fn send_icmp_pings(
ii,
external_ip_v6,
exit_router_address,
stream_id,
)
.await?;
}
+16 -65
View File
@@ -14,7 +14,7 @@ use tracing::{debug, error};
use nym_ip_packet_requests::response_helpers::{self, IprResponseError};
use crate::{
current::{self, response::IpPacketResponse},
current::{request::IpPacketRequest, response::IpPacketResponse},
error::{Error, Result},
helpers::check_ipr_message_version,
};
@@ -24,12 +24,8 @@ const IPR_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Clone, Debug, PartialEq, Eq)]
enum ConnectionState {
Disconnected,
Connecting {
stream_id: u64,
},
Connected {
stream_id: u64,
},
Connecting,
Connected,
#[allow(unused)]
Disconnecting,
}
@@ -55,27 +51,17 @@ impl IprClientConnect {
self.mixnet_client
}
pub fn stream_id(&self) -> Option<u64> {
match self.connected {
ConnectionState::Connecting { stream_id }
| ConnectionState::Connected { stream_id } => Some(stream_id),
ConnectionState::Disconnected | ConnectionState::Disconnecting => None,
}
}
pub async fn connect(&mut self, ip_packet_router_address: Recipient) -> Result<IpPair> {
if self.connected != ConnectionState::Disconnected {
return Err(Error::AlreadyConnected);
}
tracing::info!("Connecting to exit gateway");
self.connected = ConnectionState::Connecting;
match self.connect_inner(ip_packet_router_address).await {
Ok(ips) => {
debug!("Successfully connected to the ip-packet-router");
let Some(stream_id) = self.stream_id() else {
return Err(Error::UnexpectedConnectResponse);
};
self.connected = ConnectionState::Connected { stream_id };
self.connected = ConnectionState::Connected;
Ok(ips)
}
Err(err) => {
@@ -88,37 +74,21 @@ impl IprClientConnect {
async fn connect_inner(&mut self, ip_packet_router_address: Recipient) -> Result<IpPair> {
let request_id = self.send_connect_request(ip_packet_router_address).await?;
self.connected = ConnectionState::Connecting {
stream_id: request_id,
};
debug!("Waiting for reply...");
self.listen_for_connect_response(request_id).await
}
async fn send_connect_request(&mut self, ip_packet_router_address: Recipient) -> Result<u64> {
let (request, request_id) = current::new_connect_request(None);
tracing::info!(
request_id = request_id,
protocol_version = request.protocol.version,
current_version = crate::current::VERSION,
"Sending IPR connect request"
);
if let Ok(bytes) = request.to_bytes() {
let prefix = bytes.get(0..2).unwrap_or(&bytes);
let prefix_hex = format!("{:02x?}", prefix);
tracing::info!(request_id = request_id, prefix = %prefix_hex, "IPR connect bytes prefix");
}
async fn send_connect_request(&self, ip_packet_router_address: Recipient) -> Result<u64> {
let (request, request_id) = IpPacketRequest::new_connect_request(None);
// We use 20 surbs for the connect request because typically the IPR is configured to have
// a min threshold of 10 surbs that it reserves for itself to request additional surbs.
let surbs = 20;
let request_bytes = request.to_bytes()?;
let framed_bytes = maybe_wrap_stream_frame(request_id, 0, request_bytes);
self.mixnet_client
.send(create_input_message(
ip_packet_router_address,
framed_bytes,
request,
surbs,
)?)
.await
@@ -159,19 +129,13 @@ impl IprClientConnect {
for msg in msgs {
// Confirm that the version is correct
if let Err(err) = check_ipr_message_version(&msg) {
let raw: &[u8] = msg.message.as_ref();
tracing::warn!(
first_byte = raw.first().copied(),
expected = crate::current::VERSION,
len = raw.len(),
"Mixnet message version mismatch: {err}"
);
tracing::info!("Mixnet message version mismatch: {err}");
continue;
}
// Then we deserialize the message
tracing::debug!("IprClient: got message while waiting for connect response");
let Ok(response) = ipr_response_from_reconstructed_message(&msg) else {
let Ok(response) = IpPacketResponse::from_reconstructed_message(&msg) else {
// This is ok, it's likely just one of our self-pings
tracing::debug!("Failed to deserialize mixnet message");
continue;
@@ -196,27 +160,14 @@ impl IprClientConnect {
}
}
fn maybe_wrap_stream_frame(stream_id: u64, sequence_num: u32, payload: Vec<u8>) -> Vec<u8> {
if !crate::lp_stream::current_requires_sphinx_stream_transport() {
return payload;
}
crate::lp_stream::encode_stream_frame(stream_id, sequence_num, payload)
}
fn ipr_response_from_reconstructed_message(
msg: &nym_sdk::mixnet::ReconstructedMessage,
) -> std::result::Result<IpPacketResponse, bincode::Error> {
let warn_on_unexpected = crate::lp_stream::current_requires_sphinx_stream_transport();
let payload =
crate::lp_stream::maybe_unwrap_lp_stream_payload(&msg.message, warn_on_unexpected);
IpPacketResponse::from_bytes(payload)
}
fn create_input_message(recipient: Recipient, bytes: Vec<u8>, surbs: u32) -> Result<InputMessage> {
fn create_input_message(
recipient: Recipient,
request: IpPacketRequest,
surbs: u32,
) -> Result<InputMessage> {
Ok(InputMessage::new_anonymous(
recipient,
bytes,
request.to_bytes()?,
surbs,
TransmissionLane::General,
None,
+14 -12
View File
@@ -7,16 +7,18 @@ use nym_sdk::mixnet::ReconstructedMessage;
use crate::{current::VERSION as CURRENT_VERSION, error::Result};
pub(crate) fn check_ipr_message_version(message: &ReconstructedMessage) -> Result<()> {
let payload = crate::lp_stream::maybe_unwrap_lp_stream_payload_from_reconstructed(message);
nym_ip_packet_requests::response_helpers::check_ipr_message_version(payload, CURRENT_VERSION)
.map_err(|e| match e {
IprResponseError::NoVersionByte => crate::Error::NoVersionInMessage,
IprResponseError::VersionMismatch { expected, received } if received < expected => {
crate::Error::ReceivedResponseWithOldVersion { expected, received }
}
IprResponseError::VersionMismatch { expected, received } => {
crate::Error::ReceivedResponseWithNewVersion { expected, received }
}
_ => crate::Error::NoVersionInMessage,
})
nym_ip_packet_requests::response_helpers::check_ipr_message_version(
&message.message,
CURRENT_VERSION,
)
.map_err(|e| match e {
IprResponseError::NoVersionByte => crate::Error::NoVersionInMessage,
IprResponseError::VersionMismatch { expected, received } if received < expected => {
crate::Error::ReceivedResponseWithOldVersion { expected, received }
}
IprResponseError::VersionMismatch { expected, received } => {
crate::Error::ReceivedResponseWithNewVersion { expected, received }
}
_ => crate::Error::NoVersionInMessage,
})
}
+1 -2
View File
@@ -5,11 +5,10 @@ mod connect;
mod error;
mod helpers;
mod listener;
pub mod lp_stream;
pub use connect::IprClientConnect;
pub use error::Error;
pub use listener::{IprListener, MixnetMessageOutcome};
// Re-export the currently used version
pub use nym_ip_packet_requests::v9 as current;
pub use nym_ip_packet_requests::v8 as current;
-100
View File
@@ -1,100 +0,0 @@
use bytes::BytesMut;
use nym_ip_packet_requests::SPHINX_STREAM_VERSION_THRESHOLD;
use nym_lp::packet::frame::{
LpFrame, LpFrameHeader, LpFrameKind, SphinxStreamFrameAttributes, SphinxStreamMsgType,
};
use nym_sdk::mixnet::ReconstructedMessage;
use tracing::warn;
/// Whether the "current" IPR client is operating at a version where the node expects
/// non-stream mixnet IPR messages to be LP Stream framed (see `SPHINX_STREAM_VERSION_THRESHOLD`).
pub(crate) fn current_requires_sphinx_stream_transport() -> bool {
crate::current::VERSION >= SPHINX_STREAM_VERSION_THRESHOLD
}
pub fn maybe_unwrap_lp_stream_payload(data: &[u8], warn_on_unexpected: bool) -> &[u8] {
if data.len() < LpFrameHeader::SIZE {
if warn_on_unexpected {
warn!(
len = data.len(),
header_size = LpFrameHeader::SIZE,
"expected LP SphinxStream frame for IPR payload, but message is shorter than LP header; treating as raw payload"
);
}
return data;
}
let Ok(header) = LpFrameHeader::parse(data) else {
if warn_on_unexpected {
warn!(
"expected LP SphinxStream frame for IPR payload, but failed to parse LP header; treating as raw payload"
);
}
return data;
};
if header.kind == LpFrameKind::SphinxStream {
&data[LpFrameHeader::SIZE..]
} else {
if warn_on_unexpected {
warn!(
kind = ?header.kind,
"expected LP SphinxStream frame for IPR payload, but got different LP frame kind; treating as raw payload"
);
}
data
}
}
pub fn maybe_unwrap_lp_stream_payload_from_reconstructed(message: &ReconstructedMessage) -> &[u8] {
maybe_unwrap_lp_stream_payload(&message.message, false)
}
pub fn encode_stream_frame(stream_id: u64, sequence_num: u32, payload: Vec<u8>) -> Vec<u8> {
let attrs = SphinxStreamFrameAttributes {
stream_id,
msg_type: SphinxStreamMsgType::Data,
sequence_num,
};
let frame = LpFrame::new_stream(attrs, payload);
let mut buf = BytesMut::with_capacity(LpFrameHeader::SIZE + frame.content.len());
frame.encode(&mut buf);
buf.to_vec()
}
#[cfg(test)]
mod tests {
use super::*;
use nym_lp::packet::frame::SphinxStreamFrameAttributes;
#[test]
fn stream_frame_roundtrip_unwraps_payload() {
let stream_id = 0x0123_4567_89ab_cdef;
let seq = 42u32;
let payload = b"hello-ipr".to_vec();
let framed = encode_stream_frame(stream_id, seq, payload.clone());
let header = LpFrameHeader::parse(&framed).expect("valid lp header");
assert_eq!(header.kind, LpFrameKind::SphinxStream);
let attrs =
SphinxStreamFrameAttributes::parse(&header.frame_attributes).expect("valid attrs");
assert_eq!(attrs.stream_id, stream_id);
assert_eq!(attrs.sequence_num, seq);
assert_eq!(attrs.msg_type, SphinxStreamMsgType::Data);
let unwrapped = maybe_unwrap_lp_stream_payload(&framed, false);
assert_eq!(unwrapped, payload.as_slice());
}
#[test]
fn unwrap_noops_on_non_stream_or_malformed_data() {
let raw = b"\x09\x00\x01\x02\x03";
assert_eq!(maybe_unwrap_lp_stream_payload(raw, false), raw);
// malformed header: not enough bytes for LP header
let short = b"\x00\x01";
assert_eq!(maybe_unwrap_lp_stream_payload(short, false), short);
}
}
@@ -143,7 +143,6 @@ impl RegistrationClientBuilder {
config,
bandwidth_controller,
cancel_token: self.config.cancel_token.clone(),
fallback_client_builder: Some(self),
})
}
}
+14 -23
View File
@@ -18,13 +18,12 @@ use rand09::{CryptoRng, RngCore, SeedableRng};
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio_util::sync::CancellationToken;
use tracing::warn;
pub struct LpBasedRegistrationClient {
pub(crate) config: RegistrationClientConfig,
pub(crate) bandwidth_controller: Box<dyn BandwidthTicketProvider>,
pub(crate) cancel_token: CancellationToken,
// While we allow a fallback, we need to be able to build it
pub(crate) fallback_client_builder: Option<RegistrationClientBuilder>,
}
impl LpBasedRegistrationClient {
@@ -162,15 +161,11 @@ impl LpBasedRegistrationClient {
self.register_wg_with_rng(&mut rng).await
}
pub(crate) async fn register(mut self) -> Result<RegistrationResult, RegistrationClientError> {
let fallback = self.fallback_client_builder.take();
async fn register_inner(mut self) -> Result<RegistrationResult, RegistrationClientError> {
match &self.config.mode {
RegistrationMode::Mixnet => {
if let Some(fallback) = fallback {
register_with_fallback(fallback).await
} else {
Err(RegistrationClientError::UnsupportedMode)
}
// mixnet registration is not supported for LP
Err(RegistrationClientError::UnsupportedMode)
}
RegistrationMode::Wireguard => {
let lp_registration_result = self
@@ -182,15 +177,9 @@ impl LpBasedRegistrationClient {
// Everything went fine
Some(Ok(res)) => Ok(res),
// LP reg failed, try fallback if we have one
Some(Err(e)) => {
tracing::error!("LP registration failed : {e}");
if let Some(fallback) = fallback {
tracing::info!("Registering with fallback");
register_with_fallback(fallback).await
} else {
Err(e)
}
Err(e)
}
// Cancelled registration
@@ -199,12 +188,14 @@ impl LpBasedRegistrationClient {
}
}
}
}
async fn register_with_fallback(
client_builder: RegistrationClientBuilder,
) -> Result<RegistrationResult, RegistrationClientError> {
// This is forcefully building a mixnet based client
let fallback_client = client_builder.build_mixnet().await?;
fallback_client.register().await
pub(crate) async fn register(mut self) -> Result<RegistrationResult, RegistrationClientError> {
let timeout = self.config.lp_registration_config.exchange_timeout;
tokio::time::timeout(timeout, self.register_inner())
.await
.unwrap_or_else(|timeout| {
warn!("timed out while attempting to complete LP registration");
Err(RegistrationClientError::Timeout(timeout))
})
}
}
+27 -24
View File
@@ -28,40 +28,43 @@ use std::time::Duration;
/// - Optimize for latency over throughput (small messages)
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct LpRegistrationConfig {
/// TCP connection timeout (nym-102).
/// TCP connection timeout.
///
/// Maximum time to wait for TCP connection establishment.
/// Default: 10 seconds.
/// Default: 5 seconds.
pub connect_timeout: Duration,
/// Noise protocol handshake timeout (nym-102).
/// KKT/PSQ protocol handshake timeout.
///
/// Maximum time to wait for Noise handshake completion (all round-trips).
/// Default: 15 seconds.
/// Maximum time to wait for KKT/PSQ handshake completion with the entry (all round-trips).
/// Default: 8 seconds.
pub handshake_timeout: Duration,
/// Registration request/response timeout (nym-102).
/// Registration request/response timeout.
///
/// Maximum time to wait for registration request send + response receive.
/// Includes credential verification on gateway side.
/// Default: 30 seconds.
/// Default: 8 seconds.
pub registration_timeout: Duration,
/// Maximum time for the whole exchange (handshake + registration).
/// Default: 20 seconds.
pub exchange_timeout: Duration,
/// Forward packet send/receive timeout.
///
/// Maximum time to wait for forward packet send + response receive via entry gateway.
/// Covers the entire round-trip through entry to exit gateway and back.
/// Default: 30 seconds.
/// Maximum time to wait for forward packet to get sent via entry gateway.
/// Default: 3 seconds.
pub forward_timeout: Duration,
/// Enable TCP_NODELAY (disable Nagle's algorithm) (nym-104).
/// Enable TCP_NODELAY (disable Nagle's algorithm).
///
/// When true, disables Nagle's algorithm for lower latency.
/// Recommended for registration messages which are small and latency-sensitive.
/// Default: true.
pub tcp_nodelay: bool,
/// TCP keepalive duration (nym-104).
/// TCP keepalive duration.
///
/// When Some, enables TCP keepalive with specified interval.
/// Since LP is registration-only with short-lived connections, keepalive is not needed.
@@ -72,15 +75,14 @@ pub struct LpRegistrationConfig {
impl Default for LpRegistrationConfig {
fn default() -> Self {
Self {
// nym-102: Sane timeout defaults for real network conditions
connect_timeout: Duration::from_secs(10),
handshake_timeout: Duration::from_secs(15),
registration_timeout: Duration::from_secs(30),
forward_timeout: Duration::from_secs(30),
connect_timeout: Duration::from_secs(5),
handshake_timeout: Duration::from_secs(8),
registration_timeout: Duration::from_secs(8),
exchange_timeout: Duration::from_secs(20),
forward_timeout: Duration::from_secs(3),
// nym-104: Optimized for registration-only protocol
tcp_nodelay: true, // Lower latency for small messages
tcp_keepalive: None, // Not needed for ephemeral connections
tcp_nodelay: true,
tcp_keepalive: None,
}
}
}
@@ -93,10 +95,11 @@ mod tests {
fn test_default_config() {
let config = LpRegistrationConfig::default();
assert_eq!(config.connect_timeout, Duration::from_secs(10));
assert_eq!(config.handshake_timeout, Duration::from_secs(15));
assert_eq!(config.registration_timeout, Duration::from_secs(30));
assert_eq!(config.forward_timeout, Duration::from_secs(30));
assert_eq!(config.connect_timeout, Duration::from_secs(5));
assert_eq!(config.handshake_timeout, Duration::from_secs(8));
assert_eq!(config.registration_timeout, Duration::from_secs(8));
assert_eq!(config.forward_timeout, Duration::from_secs(3));
assert_eq!(config.exchange_timeout, Duration::from_secs(20));
assert!(config.tcp_nodelay);
assert_eq!(config.tcp_keepalive, None);
}
+1 -1
View File
@@ -41,4 +41,4 @@ windows = { version = "0.61", features = [
[dev-dependencies]
tempfile.workspace = true
tracing-subscriber.workspace = true
tracing-test.workspace = true
+96 -27
View File
@@ -3,8 +3,9 @@
use std::{
io,
ops::{Deref, DerefMut},
ops::Deref,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
@@ -26,10 +27,8 @@ const CHECK_FILES_CLOSED_MAX_ATTEMPTS: u8 = 20;
/// Delay between file checks
const CHECK_FILES_CLOSED_RETRY_DELAY: Duration = Duration::from_millis(100);
/// `sqlx::SqlitePool` wrapper providing a workaround for the [known bug](https://github.com/launchbadge/sqlx/issues/3217).
/// In principle after requesting to close the sqlite pool, the wrapper monitors open file descriptor and polls periodically until all database files are closed.
#[derive(Debug, Clone)]
pub struct SqlitePoolGuard {
#[derive(Debug)]
struct SqlitePoolGuardInner {
/// Path to sqlite database file.
database_path: PathBuf,
@@ -37,6 +36,18 @@ pub struct SqlitePoolGuard {
connection_pool: sqlx::SqlitePool,
}
/// `sqlx::SqlitePool` wrapper providing a workaround for the [known bug](https://github.com/launchbadge/sqlx/issues/3217).
/// In principle after requesting to close the sqlite pool, the wrapper monitors open file descriptor and polls periodically until all database files are closed.
///
/// This type is cheaply [`Clone`]-able: all clones share the same underlying pool and the same
/// reference count. The `Drop` impl only emits a warning when the **last** reference is dropped
/// without an explicit [`close`](Self::close) call, so it is safe to clone this guard temporarily
/// (e.g. to pass into a spawned task) without triggering spurious warnings.
#[derive(Debug, Clone)]
pub struct SqlitePoolGuard {
inner: Arc<SqlitePoolGuardInner>,
}
impl SqlitePoolGuard {
/// Create new instance providing path to database and connection pool
pub fn new(connection_pool: sqlx::SqlitePool) -> Self {
@@ -46,46 +57,70 @@ impl SqlitePoolGuard {
.to_path_buf();
Self {
database_path,
connection_pool,
inner: Arc::new(SqlitePoolGuardInner {
database_path,
connection_pool,
}),
}
}
/// Returns database path
pub fn database_path(&self) -> &Path {
&self.database_path
&self.inner.database_path
}
/// Close udnerlying sqlite pool and wait for files to be closed before returning.
/// Close the underlying sqlite pool and wait for OS file handles to be released.
///
/// **Callers must invoke this method before dropping the guard.** The `Drop` impl does
/// not perform async cleanup; it only logs a warning when the pool was not closed
/// beforehand.
pub async fn close(&self) {
// Avoid waiting for db files once the pool is marked closed to ensure that we don't wait on some other sqlite pool to close the database.
if !self.connection_pool.is_closed() {
tracing::info!("Closing sqlite pool: {}", self.database_path.display());
if !self.inner.connection_pool.is_closed() {
tracing::info!(
"Closing sqlite pool: {}",
self.inner.database_path.display()
);
self.close_pool_inner().await.ok();
}
}
async fn close_pool_inner(&self) -> std::io::Result<()> {
self.connection_pool.close().await;
self.inner.connection_pool.close().await;
self.wait_for_db_files_close().await.inspect_err(|e| {
tracing::error!("Failed to wait for file to close: {e}");
})
if let Err(e) = self.wait_for_db_files_close().await {
if e.kind() == std::io::ErrorKind::TimedOut {
tracing::warn!(
"Timed out waiting for OS file handles for sqlite database to be released; \
another connection to the same file may still be open. Path = {}",
self.inner.database_path.display()
);
} else {
tracing::warn!(
"Failed to wait for sqlite database file handles to be released: Path = {}. Error = {}",
self.inner.database_path.display(),
e
);
}
}
Ok(())
}
/// Returns all database files, including shm and wal files.
fn all_database_files(&self) -> Vec<PathBuf> {
let mut database_files = vec![];
let canonical_path = self
.inner
.database_path
.canonicalize()
.inspect_err(|e| {
tracing::error!(
"Failed to canonicalize path: {}. Cause: {e}",
self.database_path.display()
self.inner.database_path.display()
);
})
.unwrap_or(self.database_path.clone());
.unwrap_or(self.inner.database_path.clone());
if let Some(ext) = canonical_path.extension() {
for added_ext in ["-shm", "-wal"] {
@@ -120,34 +155,38 @@ impl SqlitePoolGuard {
}
}
impl Drop for SqlitePoolGuard {
fn drop(&mut self) {
if Arc::strong_count(&self.inner) == 1 && !self.inner.connection_pool.is_closed() {
tracing::warn!(
"SqlitePoolGuard dropped without explicit close(); path={}",
self.inner.database_path.display()
);
}
}
}
impl Deref for SqlitePoolGuard {
type Target = sqlx::SqlitePool;
fn deref(&self) -> &Self::Target {
&self.connection_pool
&self.inner.connection_pool
}
}
impl DerefMut for SqlitePoolGuard {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.connection_pool
}
}
#[cfg(test)]
mod tests {
use sqlx::{
ConnectOptions, Executor,
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
};
use tracing_test::traced_test;
use super::*;
#[traced_test]
#[tokio::test]
async fn test_wait_close() {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.init();
let temp_dir = tempfile::tempdir().unwrap();
let database_path = temp_dir.path().join("storage.db");
@@ -177,4 +216,34 @@ mod tests {
assert!(guard.close_pool_inner().await.is_ok());
tokio::fs::remove_file(database_path).await.unwrap();
}
#[traced_test]
#[tokio::test]
async fn test_clone_drop_no_warning() {
// Cloning the guard and dropping the clone should not warn because the original is still alive.
let temp_dir = tempfile::tempdir().unwrap();
let database_path = temp_dir.path().join("storage2.db");
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path.clone())
.create_if_missing(true)
.disable_statement_logging();
let connection_pool = sqlx::SqlitePool::connect_with(opts).await.unwrap();
let guard = SqlitePoolGuard::new(connection_pool);
{
let _clone = guard.clone();
assert_eq!(Arc::strong_count(&guard.inner), 2);
}
assert_eq!(Arc::strong_count(&guard.inner), 1);
assert!(!logs_contain(
"SqlitePoolGuard dropped without explicit close"
));
guard.close().await;
tokio::fs::remove_file(database_path).await.unwrap();
}
}
@@ -9,9 +9,11 @@ use crate::ip_packet_client::{
use crate::mixnet::{MixnetClient, MixnetStream, Recipient};
use crate::Error;
use bytes::Bytes;
use current_ipr::response::IpPacketResponse;
use nym_ip_packet_requests::response_helpers;
use nym_ip_packet_requests::{v9 as current_ipr, IpPair};
use nym_ip_packet_requests::{
v9::{self, response::IpPacketResponse},
IpPair,
};
use nym_network_defaults::NymNetworkDetails;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
@@ -105,7 +107,7 @@ impl IpMixStream {
}
async fn connect_tunnel(stream: &mut MixnetStream) -> Result<IpPair, Error> {
let (request, request_id) = current_ipr::new_connect_request(None);
let (request, request_id) = v9::new_connect_request(None);
debug!("Sending connect request with ID: {}", request_id);
let request_bytes = request.to_bytes()?;
@@ -144,7 +146,7 @@ impl IpMixStream {
/// Send an IP packet through the tunnel.
pub async fn send_ip_packet(&mut self, packet: &[u8]) -> Result<(), Error> {
self.check_connected()?;
let request = current_ipr::new_data_request(packet.to_vec().into());
let request = v9::new_data_request(packet.to_vec().into());
let request_bytes = request.to_bytes()?;
self.stream
.write_all(&request_bytes)
@@ -218,10 +218,7 @@ fn create_ip_packet_response(
ClientVersion::V6 => IpPacketResponseV6::new_ip_packet(packets).to_bytes(),
ClientVersion::V7 => IpPacketResponseV7::new_ip_packet(packets).to_bytes(),
ClientVersion::V8 => IpPacketResponseV8::new_ip_packet(packets).to_bytes(),
ClientVersion::V9 => {
let resp = v9::new_ip_packet_response(packets);
resp.to_bytes()
}
ClientVersion::V9 => v9::new_ip_packet_response(packets).to_bytes(),
}
}
@@ -10,7 +10,6 @@ use nym_ip_packet_requests::{
IpPair, v6::request::IpPacketRequest as IpPacketRequestV6,
v7::request::IpPacketRequest as IpPacketRequestV7,
v8::request::IpPacketRequest as IpPacketRequestV8,
v9::request::IpPacketRequest as IpPacketRequestV9,
};
use nym_sdk::mixnet::ReconstructedMessage;
use nym_service_provider_requests_common::{Protocol, ServiceProviderType};
@@ -132,14 +131,14 @@ impl TryFrom<&ReconstructedMessage> for IpPacketRequest {
Ok(IpPacketRequest::from((request_v8, sender_tag)))
}
9 => {
let request_v9 = IpPacketRequestV9::from_reconstructed_message(reconstructed)
let request_v8 = IpPacketRequestV8::from_reconstructed_message(reconstructed)
.map_err(
|source| IpPacketRouterError::FailedToDeserializeTaggedPacket { source },
)?;
let sender_tag = reconstructed
.sender_tag
.ok_or(IpPacketRouterError::MissingSenderTag)?;
Ok(v9::convert(request_v9, sender_tag))
Ok(v9::convert(request_v8, sender_tag))
}
_ => {
log::info!("Received packet with invalid version: v{request_version}");
@@ -130,12 +130,7 @@ impl VersionedResponse {
ClientVersion::V6 => IpPacketResponseV6::try_from(self)?.to_bytes(),
ClientVersion::V7 => IpPacketResponseV7::try_from(self)?.to_bytes(),
ClientVersion::V8 => IpPacketResponseV8::try_from(self)?.to_bytes(),
ClientVersion::V9 => {
let mut resp = IpPacketResponseV8::try_from(self)?;
resp.version = nym_ip_packet_requests::v9::VERSION;
let bytes = resp.to_bytes();
bytes
}
ClientVersion::V9 => IpPacketResponseV8::try_from(self)?.to_bytes(),
}
.map_err(|err| IpPacketRouterError::FailedToSerializeResponsePacket { source: err })
}