Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f4bd48263d | |||
| 5c40052d39 | |||
| 7b858dfd69 | |||
| a4bd547023 | |||
| db03ec31b1 | |||
| 9b285735b8 | |||
| 691280797a |
Generated
+1
-1
@@ -8243,7 +8243,7 @@ dependencies = [
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"tracing-test",
|
||||
"windows 0.61.3",
|
||||
]
|
||||
|
||||
|
||||
@@ -25,6 +25,8 @@ pub trait BandwidthTicketProvider: Send + Sync {
|
||||
) -> Result<PreparedCredential, BandwidthControllerError>;
|
||||
|
||||
async fn get_upgrade_mode_token(&self) -> Result<Option<String>, BandwidthControllerError>;
|
||||
|
||||
async fn close(&self) {}
|
||||
}
|
||||
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||
@@ -56,6 +58,10 @@ where
|
||||
.map_err(|_| BandwidthControllerError::MalformedUpgradeModeToken)?;
|
||||
Ok(Some(token))
|
||||
}
|
||||
|
||||
async fn close(&self) {
|
||||
self.storage.close().await;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||
@@ -75,4 +81,8 @@ impl<T: BandwidthTicketProvider + ?Sized + Send> BandwidthTicketProvider for Box
|
||||
async fn get_upgrade_mode_token(&self) -> Result<Option<String>, BandwidthControllerError> {
|
||||
(**self).get_upgrade_mode_token().await
|
||||
}
|
||||
|
||||
async fn close(&self) {
|
||||
(**self).close().await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1023,6 +1023,16 @@ where
|
||||
let encryption_keys = init_res.client_keys.encryption_keypair();
|
||||
let identity_keys = init_res.client_keys.identity_keypair();
|
||||
|
||||
let credential_store_for_close = credential_store.clone();
|
||||
let close_credential_token = shutdown_tracker.clone_shutdown_token();
|
||||
shutdown_tracker.try_spawn_named(
|
||||
async move {
|
||||
close_credential_token.cancelled().await;
|
||||
credential_store_for_close.close().await;
|
||||
},
|
||||
"CredentialStorage::close_on_shutdown",
|
||||
);
|
||||
|
||||
// the components are started in very specific order. Unless you know what you are doing,
|
||||
// do not change that.
|
||||
let bandwidth_controller = self
|
||||
|
||||
@@ -11,11 +11,17 @@ use nym_bandwidth_controller::BandwidthController;
|
||||
use nym_client_core_gateways_storage::OnDiskGatewaysDetails;
|
||||
use nym_credential_storage::storage::Storage as CredentialStorage;
|
||||
use nym_validator_client::{QueryHttpRpcNyxdClient, nyxd};
|
||||
use std::{io, path::Path};
|
||||
use std::{io, path::Path, time::Duration};
|
||||
use time::OffsetDateTime;
|
||||
use tracing::{error, info, trace};
|
||||
use url::Url;
|
||||
|
||||
/// Maximum rename retry attempts when the database file is temporarily locked.
|
||||
const ARCHIVE_MAX_RETRY_ATTEMPTS: u8 = 15;
|
||||
|
||||
/// Delay between archive rename retry attempts.
|
||||
const ARCHIVE_RETRY_DELAY: Duration = Duration::from_millis(200);
|
||||
|
||||
async fn setup_fresh_backend<P: AsRef<Path>>(
|
||||
db_path: P,
|
||||
surb_config: &config::ReplySurbs,
|
||||
@@ -74,13 +80,58 @@ async fn archive_corrupted_database<P: AsRef<Path>>(db_path: P) -> io::Result<()
|
||||
};
|
||||
let renamed = db_path.with_extension(new_extension);
|
||||
|
||||
tokio::fs::rename(db_path, &renamed).await.inspect_err(|_| {
|
||||
error!(
|
||||
"Failed to rename corrupt database file: {} to {}",
|
||||
db_path.display(),
|
||||
renamed.display()
|
||||
);
|
||||
})
|
||||
// On Windows, sqlx may release its OS file handles asynchronously after
|
||||
// pool.close() returns, briefly keeping the file locked
|
||||
// (ERROR_SHARING_VIOLATION, os error 32). Retry with a short delay to
|
||||
// give the OS time to flush the remaining handles.
|
||||
for attempt in 0..ARCHIVE_MAX_RETRY_ATTEMPTS {
|
||||
match tokio::fs::rename(db_path, &renamed).await {
|
||||
Ok(()) => return Ok(()),
|
||||
Err(e) if is_file_locked_error(&e) && (attempt + 1) < ARCHIVE_MAX_RETRY_ATTEMPTS => {
|
||||
trace!(
|
||||
"Database file is temporarily locked, retrying archive \
|
||||
(attempt {}/{}): {e}",
|
||||
attempt + 1,
|
||||
ARCHIVE_MAX_RETRY_ATTEMPTS
|
||||
);
|
||||
tokio::time::sleep(ARCHIVE_RETRY_DELAY).await;
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Failed to rename corrupt database file: {} to {}",
|
||||
db_path.display(),
|
||||
renamed.display()
|
||||
);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Reached only when every attempt was blocked by a file lock.
|
||||
error!(
|
||||
"Failed to rename corrupt database file after {} attempts: {} to {}",
|
||||
ARCHIVE_MAX_RETRY_ATTEMPTS,
|
||||
db_path.display(),
|
||||
renamed.display()
|
||||
);
|
||||
Err(io::Error::other(
|
||||
"corrupt database archive blocked by persistent file lock",
|
||||
))
|
||||
}
|
||||
|
||||
/// Returns `true` when the IO error indicates a temporary file lock held by another handle
|
||||
/// within the same process. Only meaningful on Windows; always `false` elsewhere.
|
||||
fn is_file_locked_error(e: &io::Error) -> bool {
|
||||
#[cfg(windows)]
|
||||
{
|
||||
// ERROR_SHARING_VIOLATION = 32, ERROR_LOCK_VIOLATION = 33
|
||||
matches!(e.raw_os_error(), Some(32) | Some(33))
|
||||
}
|
||||
#[cfg(not(windows))]
|
||||
{
|
||||
let _ = e;
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn setup_fs_reply_surb_backend<P: AsRef<Path>>(
|
||||
|
||||
@@ -337,6 +337,8 @@ impl ReplyStorageBackend for Backend {
|
||||
}
|
||||
|
||||
async fn stop_storage_session(self) -> Result<(), Self::StorageError> {
|
||||
self.stop_client_use().await
|
||||
let result = self.stop_client_use().await;
|
||||
self.shutdown().await;
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
@@ -48,6 +48,7 @@ where
|
||||
debug!("Started PersistentReplyStorage");
|
||||
if let Err(err) = self.backend.start_storage_session().await {
|
||||
error!("failed to start the storage session - {err}");
|
||||
self.backend.stop_storage_session().await.ok();
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -55,10 +56,11 @@ where
|
||||
|
||||
info!("PersistentReplyStorage is flushing all reply-related data to underlying storage");
|
||||
if let Err(err) = self.backend.flush_surb_storage(&mem_state).await {
|
||||
error!("failed to flush our reply-related data to the persistent storage: {err}")
|
||||
} else {
|
||||
info!("Data flush is complete")
|
||||
error!("failed to flush our reply-related data to the persistent storage: {err}");
|
||||
self.backend.stop_storage_session().await.ok();
|
||||
return;
|
||||
}
|
||||
info!("Data flush is complete");
|
||||
|
||||
if let Err(err) = self.backend.stop_storage_session().await {
|
||||
error!("failed to properly stop the storage session - {err}. We might not be able to smoothly restore it")
|
||||
|
||||
@@ -1173,7 +1173,16 @@ impl ApiClientCore for Client {
|
||||
};
|
||||
|
||||
match response {
|
||||
Ok(resp) => return Ok(resp),
|
||||
Ok(resp) => {
|
||||
// Check if the response includes a rate limit error from the vercel API
|
||||
if is_http_rate_limit_err(&resp) {
|
||||
warn!("encountered vercel rate limit error for {}", url.as_str());
|
||||
// if we have multiple urls, update to the next
|
||||
self.maybe_rotate_hosts(Some(url.clone()));
|
||||
}
|
||||
|
||||
return Ok(resp);
|
||||
}
|
||||
Err(err) => {
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
let is_network_err = err.is_timeout();
|
||||
@@ -1226,17 +1235,39 @@ impl ApiClientCore for Client {
|
||||
}
|
||||
}
|
||||
|
||||
const VERCEL_CHALLENGE_HEADER: &str = "x-vercel-mitigated";
|
||||
const VERCEL_CHALLENGE_VALUE: &[u8] = b"challenge";
|
||||
|
||||
/// Check for Rate Limit challenge response from the vercel API
|
||||
pub(crate) fn is_http_rate_limit_err(resp: &Response) -> bool {
|
||||
let status = resp.status() == StatusCode::FORBIDDEN;
|
||||
let header = resp
|
||||
.headers()
|
||||
.get(VERCEL_CHALLENGE_HEADER)
|
||||
.is_some_and(|v| v.as_bytes() == VERCEL_CHALLENGE_VALUE);
|
||||
let content_type = resp
|
||||
.headers()
|
||||
.get(CONTENT_TYPE)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.and_then(|value| value.parse::<Mime>().ok())
|
||||
.is_some_and(|mime_type| {
|
||||
mime_type.type_() == mime::TEXT && mime_type.subtype() == mime::HTML
|
||||
});
|
||||
|
||||
status && header && content_type
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
const MAX_ERR_SOURCE_ITERATIONS: usize = 4;
|
||||
|
||||
/// This functions attempts to check the error returned by reqwest to see if
|
||||
/// rotating host informtion (for clients with mutliple hosts defined) could be
|
||||
/// helpful. This looks for situations where the error could plausibly be caused
|
||||
/// by a network adversary, or where rotating to an equival hostname might help.
|
||||
/// This functions attempts to check the error returned by reqwest to see if rotating host
|
||||
/// information (for clients with multiple hosts defined) could be helpful. This looks for
|
||||
/// situations where the error could plausibly be caused by a network adversary, or where rotating
|
||||
/// to an equivalent hostname might help.
|
||||
///
|
||||
/// For example --> NetworkUnreachable will not be helped by rotating domains,
|
||||
/// but ConnectionReset might be caused by a network adversary blocking by SNI
|
||||
/// which could possibly benefit from rotating domains.
|
||||
/// For example --> NetworkUnreachable will not be helped by rotating domains, but ConnectionReset
|
||||
/// might be caused by a network adversary blocking by SNI which could possibly benefit from
|
||||
/// rotating domains.
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub(crate) fn might_be_network_interference(err: &reqwest::Error) -> bool {
|
||||
if err.is_timeout() {
|
||||
@@ -1697,6 +1728,13 @@ where
|
||||
decode_raw_response(&headers, full)
|
||||
} else if res.status() == StatusCode::NOT_FOUND {
|
||||
Err(HttpClientError::NotFound { url: Box::new(url) })
|
||||
} else if is_http_rate_limit_err(&res) {
|
||||
Err(HttpClientError::EndpointFailure {
|
||||
url: Box::new(url),
|
||||
status,
|
||||
headers: Box::new(headers),
|
||||
error: String::from("received vercel rate limit challenge response"),
|
||||
})
|
||||
} else {
|
||||
let Ok(plaintext) = res.text().await else {
|
||||
return Err(HttpClientError::RequestFailure {
|
||||
|
||||
@@ -20,18 +20,6 @@ pub const MAX_NON_STREAM_VERSION: u8 = v8::VERSION;
|
||||
/// mixnet sends, matching the node-side enforcement in `ip-packet-router`.
|
||||
pub const SPHINX_STREAM_VERSION_THRESHOLD: u8 = v9::VERSION;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn stream_transport_threshold_is_consistent() {
|
||||
assert_eq!(MAX_NON_STREAM_VERSION, 8);
|
||||
assert_eq!(SPHINX_STREAM_VERSION_THRESHOLD, 9);
|
||||
assert!(SPHINX_STREAM_VERSION_THRESHOLD > MAX_NON_STREAM_VERSION);
|
||||
}
|
||||
}
|
||||
|
||||
// version 3: initial version
|
||||
// version 4: IPv6 support
|
||||
// version 5: Add severity level to info response
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use nym_ip_packet_client::current::response::{
|
||||
use nym_ip_packet_requests::v8::response::{
|
||||
ControlResponse, DataResponse, InfoLevel, IpPacketResponse, IpPacketResponseData,
|
||||
};
|
||||
use nym_ip_packet_client::lp_stream;
|
||||
use nym_sdk::{
|
||||
DebugConfig, NymApiTopologyProvider, NymApiTopologyProviderConfig, NymNetworkDetails,
|
||||
TopologyProvider, mixnet::ReconstructedMessage,
|
||||
@@ -33,10 +32,7 @@ pub fn mixnet_debug_config(
|
||||
}
|
||||
|
||||
pub fn unpack_data_response(reconstructed_message: &ReconstructedMessage) -> Option<DataResponse> {
|
||||
let payload =
|
||||
lp_stream::maybe_unwrap_lp_stream_payload_from_reconstructed(reconstructed_message);
|
||||
|
||||
match IpPacketResponse::from_bytes(payload) {
|
||||
match IpPacketResponse::from_reconstructed_message(reconstructed_message) {
|
||||
Ok(response) => match response.data {
|
||||
IpPacketResponseData::Data(data_response) => Some(data_response),
|
||||
IpPacketResponseData::Control(control) => match *control {
|
||||
|
||||
@@ -9,7 +9,7 @@ use nym_connection_monitor::{
|
||||
wrap_icmp_in_ipv6,
|
||||
},
|
||||
};
|
||||
use nym_ip_packet_requests::{IpPair, codec::MultiIpPacketCodec};
|
||||
use nym_ip_packet_requests::{IpPair, codec::MultiIpPacketCodec, v8::request::IpPacketRequest};
|
||||
use nym_sdk::mixnet::{
|
||||
InputMessage, MixnetClient, MixnetMessageSender, Recipient, TransmissionLane,
|
||||
};
|
||||
@@ -25,7 +25,6 @@ pub async fn send_ping_v4(
|
||||
sequence_number: u16,
|
||||
destination: Ipv4Addr,
|
||||
exit_router_address: Recipient,
|
||||
stream_id: u64,
|
||||
) -> anyhow::Result<()> {
|
||||
let icmp_identifier = icmp_identifier();
|
||||
let icmp_echo_request = create_icmpv4_echo_request(sequence_number, icmp_identifier)?;
|
||||
@@ -36,12 +35,7 @@ pub async fn send_ping_v4(
|
||||
MultiIpPacketCodec::bundle_one_packet(ipv4_packet.packet().to_vec().into());
|
||||
|
||||
// Wrap into a mixnet input message addressed to the IPR
|
||||
let mixnet_message = create_input_message(
|
||||
exit_router_address,
|
||||
bundled_packet,
|
||||
stream_id,
|
||||
sequence_number,
|
||||
)?;
|
||||
let mixnet_message = create_input_message(exit_router_address, bundled_packet)?;
|
||||
|
||||
mixnet_client.send(mixnet_message).await?;
|
||||
Ok(())
|
||||
@@ -53,7 +47,6 @@ pub async fn send_ping_v6(
|
||||
sequence_number: u16,
|
||||
destination: Ipv6Addr,
|
||||
exit_router_address: Recipient,
|
||||
stream_id: u64,
|
||||
) -> anyhow::Result<()> {
|
||||
let icmp_identifier = icmp_identifier();
|
||||
let icmp_echo_request = create_icmpv6_echo_request(
|
||||
@@ -69,12 +62,7 @@ pub async fn send_ping_v6(
|
||||
MultiIpPacketCodec::bundle_one_packet(ipv6_packet.packet().to_vec().into());
|
||||
|
||||
// Wrap into a mixnet input message addressed to the IPR
|
||||
let mixnet_message = create_input_message(
|
||||
exit_router_address,
|
||||
bundled_packet,
|
||||
stream_id,
|
||||
sequence_number,
|
||||
)?;
|
||||
let mixnet_message = create_input_message(exit_router_address, bundled_packet)?;
|
||||
|
||||
// Send across the mixnet
|
||||
mixnet_client.send(mixnet_message).await?;
|
||||
@@ -84,22 +72,15 @@ pub async fn send_ping_v6(
|
||||
fn create_input_message(
|
||||
recipient: impl Into<Recipient>,
|
||||
bundled_packets: Bytes,
|
||||
stream_id: u64,
|
||||
sequence_number: u16,
|
||||
) -> anyhow::Result<InputMessage> {
|
||||
let packet = nym_ip_packet_client::current::new_data_request(bundled_packets).to_bytes()?;
|
||||
let framed_packet = nym_ip_packet_client::lp_stream::encode_stream_frame(
|
||||
stream_id,
|
||||
sequence_number as u32,
|
||||
packet,
|
||||
);
|
||||
let packet = IpPacketRequest::new_data_request(bundled_packets).to_bytes()?;
|
||||
|
||||
let lane = TransmissionLane::General;
|
||||
let packet_type = None;
|
||||
let surbs = 0;
|
||||
Ok(InputMessage::new_anonymous(
|
||||
recipient.into(),
|
||||
framed_packet,
|
||||
packet,
|
||||
surbs,
|
||||
lane,
|
||||
packet_type,
|
||||
|
||||
@@ -256,8 +256,8 @@ pub async fn do_ping(
|
||||
let (maybe_ip_pair, mut mixnet_client) =
|
||||
connect_exit(mixnet_client, exit_router_address).await;
|
||||
match maybe_ip_pair {
|
||||
Some((ip_pair, stream_id)) => (
|
||||
do_ping_exit(&mut mixnet_client, ip_pair, stream_id, exit_router_address).await,
|
||||
Some(ip_pair) => (
|
||||
do_ping_exit(&mut mixnet_client, ip_pair, exit_router_address).await,
|
||||
mixnet_client,
|
||||
),
|
||||
None => (Ok(Some(Exit::fail_to_connect())), mixnet_client),
|
||||
@@ -304,7 +304,7 @@ async fn do_ping_entry(
|
||||
async fn connect_exit(
|
||||
mixnet_client: MixnetClient,
|
||||
exit_router_address: Recipient,
|
||||
) -> (Option<(IpPair, u64)>, MixnetClient) {
|
||||
) -> (Option<IpPair>, MixnetClient) {
|
||||
// Step 2: connect to the exit gateway
|
||||
info!(
|
||||
"Connecting to exit gateway: {}",
|
||||
@@ -315,19 +315,12 @@ async fn connect_exit(
|
||||
let mut ipr_client = IprClientConnect::new(mixnet_client, cancel_token);
|
||||
|
||||
let maybe_ip_pair = ipr_client.connect(exit_router_address).await;
|
||||
let stream_id = ipr_client.stream_id();
|
||||
let mixnet_client = ipr_client.into_mixnet_client();
|
||||
|
||||
if let Ok(our_ips) = maybe_ip_pair {
|
||||
info!("Successfully connected to exit gateway");
|
||||
info!("Using mixnet VPN IP addresses: {our_ips}");
|
||||
let Some(stream_id) = stream_id else {
|
||||
tracing::warn!(
|
||||
"No active IPR stream id set after connect; cannot run IPR data-plane tests"
|
||||
);
|
||||
return (None, mixnet_client);
|
||||
};
|
||||
(Some((our_ips, stream_id)), mixnet_client)
|
||||
(Some(our_ips), mixnet_client)
|
||||
} else {
|
||||
(None, mixnet_client)
|
||||
}
|
||||
@@ -336,11 +329,10 @@ async fn connect_exit(
|
||||
pub async fn do_ping_exit(
|
||||
mixnet_client: &mut MixnetClient,
|
||||
our_ips: IpPair,
|
||||
stream_id: u64,
|
||||
exit_router_address: Recipient,
|
||||
) -> anyhow::Result<Option<Exit>> {
|
||||
// Step 3: perform ICMP connectivity checks for the exit gateway
|
||||
send_icmp_pings(mixnet_client, our_ips, exit_router_address, stream_id).await?;
|
||||
send_icmp_pings(mixnet_client, our_ips, exit_router_address).await?;
|
||||
listen_for_icmp_ping_replies(mixnet_client, our_ips).await
|
||||
}
|
||||
|
||||
@@ -348,7 +340,6 @@ async fn send_icmp_pings(
|
||||
mixnet_client: &MixnetClient,
|
||||
our_ips: IpPair,
|
||||
exit_router_address: Recipient,
|
||||
stream_id: u64,
|
||||
) -> anyhow::Result<()> {
|
||||
// ipv4 addresses for testing
|
||||
let ipr_tun_ip_v4 = NYM_TUN_DEVICE_ADDRESS_V4;
|
||||
@@ -370,7 +361,6 @@ async fn send_icmp_pings(
|
||||
ii,
|
||||
ipr_tun_ip_v4,
|
||||
exit_router_address,
|
||||
stream_id,
|
||||
)
|
||||
.await?;
|
||||
icmp::send_ping_v4(
|
||||
@@ -379,7 +369,6 @@ async fn send_icmp_pings(
|
||||
ii,
|
||||
external_ip_v4,
|
||||
exit_router_address,
|
||||
stream_id,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
@@ -392,7 +381,6 @@ async fn send_icmp_pings(
|
||||
ii,
|
||||
ipr_tun_ip_v6,
|
||||
exit_router_address,
|
||||
stream_id,
|
||||
)
|
||||
.await?;
|
||||
icmp::send_ping_v6(
|
||||
@@ -401,7 +389,6 @@ async fn send_icmp_pings(
|
||||
ii,
|
||||
external_ip_v6,
|
||||
exit_router_address,
|
||||
stream_id,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ use tracing::{debug, error};
|
||||
use nym_ip_packet_requests::response_helpers::{self, IprResponseError};
|
||||
|
||||
use crate::{
|
||||
current::{self, response::IpPacketResponse},
|
||||
current::{request::IpPacketRequest, response::IpPacketResponse},
|
||||
error::{Error, Result},
|
||||
helpers::check_ipr_message_version,
|
||||
};
|
||||
@@ -24,12 +24,8 @@ const IPR_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
enum ConnectionState {
|
||||
Disconnected,
|
||||
Connecting {
|
||||
stream_id: u64,
|
||||
},
|
||||
Connected {
|
||||
stream_id: u64,
|
||||
},
|
||||
Connecting,
|
||||
Connected,
|
||||
#[allow(unused)]
|
||||
Disconnecting,
|
||||
}
|
||||
@@ -55,27 +51,17 @@ impl IprClientConnect {
|
||||
self.mixnet_client
|
||||
}
|
||||
|
||||
pub fn stream_id(&self) -> Option<u64> {
|
||||
match self.connected {
|
||||
ConnectionState::Connecting { stream_id }
|
||||
| ConnectionState::Connected { stream_id } => Some(stream_id),
|
||||
ConnectionState::Disconnected | ConnectionState::Disconnecting => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn connect(&mut self, ip_packet_router_address: Recipient) -> Result<IpPair> {
|
||||
if self.connected != ConnectionState::Disconnected {
|
||||
return Err(Error::AlreadyConnected);
|
||||
}
|
||||
|
||||
tracing::info!("Connecting to exit gateway");
|
||||
self.connected = ConnectionState::Connecting;
|
||||
match self.connect_inner(ip_packet_router_address).await {
|
||||
Ok(ips) => {
|
||||
debug!("Successfully connected to the ip-packet-router");
|
||||
let Some(stream_id) = self.stream_id() else {
|
||||
return Err(Error::UnexpectedConnectResponse);
|
||||
};
|
||||
self.connected = ConnectionState::Connected { stream_id };
|
||||
self.connected = ConnectionState::Connected;
|
||||
Ok(ips)
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -88,37 +74,21 @@ impl IprClientConnect {
|
||||
|
||||
async fn connect_inner(&mut self, ip_packet_router_address: Recipient) -> Result<IpPair> {
|
||||
let request_id = self.send_connect_request(ip_packet_router_address).await?;
|
||||
self.connected = ConnectionState::Connecting {
|
||||
stream_id: request_id,
|
||||
};
|
||||
|
||||
debug!("Waiting for reply...");
|
||||
self.listen_for_connect_response(request_id).await
|
||||
}
|
||||
|
||||
async fn send_connect_request(&mut self, ip_packet_router_address: Recipient) -> Result<u64> {
|
||||
let (request, request_id) = current::new_connect_request(None);
|
||||
tracing::info!(
|
||||
request_id = request_id,
|
||||
protocol_version = request.protocol.version,
|
||||
current_version = crate::current::VERSION,
|
||||
"Sending IPR connect request"
|
||||
);
|
||||
if let Ok(bytes) = request.to_bytes() {
|
||||
let prefix = bytes.get(0..2).unwrap_or(&bytes);
|
||||
let prefix_hex = format!("{:02x?}", prefix);
|
||||
tracing::info!(request_id = request_id, prefix = %prefix_hex, "IPR connect bytes prefix");
|
||||
}
|
||||
async fn send_connect_request(&self, ip_packet_router_address: Recipient) -> Result<u64> {
|
||||
let (request, request_id) = IpPacketRequest::new_connect_request(None);
|
||||
|
||||
// We use 20 surbs for the connect request because typically the IPR is configured to have
|
||||
// a min threshold of 10 surbs that it reserves for itself to request additional surbs.
|
||||
let surbs = 20;
|
||||
let request_bytes = request.to_bytes()?;
|
||||
let framed_bytes = maybe_wrap_stream_frame(request_id, 0, request_bytes);
|
||||
self.mixnet_client
|
||||
.send(create_input_message(
|
||||
ip_packet_router_address,
|
||||
framed_bytes,
|
||||
request,
|
||||
surbs,
|
||||
)?)
|
||||
.await
|
||||
@@ -159,19 +129,13 @@ impl IprClientConnect {
|
||||
for msg in msgs {
|
||||
// Confirm that the version is correct
|
||||
if let Err(err) = check_ipr_message_version(&msg) {
|
||||
let raw: &[u8] = msg.message.as_ref();
|
||||
tracing::warn!(
|
||||
first_byte = raw.first().copied(),
|
||||
expected = crate::current::VERSION,
|
||||
len = raw.len(),
|
||||
"Mixnet message version mismatch: {err}"
|
||||
);
|
||||
tracing::info!("Mixnet message version mismatch: {err}");
|
||||
continue;
|
||||
}
|
||||
|
||||
// Then we deserialize the message
|
||||
tracing::debug!("IprClient: got message while waiting for connect response");
|
||||
let Ok(response) = ipr_response_from_reconstructed_message(&msg) else {
|
||||
let Ok(response) = IpPacketResponse::from_reconstructed_message(&msg) else {
|
||||
// This is ok, it's likely just one of our self-pings
|
||||
tracing::debug!("Failed to deserialize mixnet message");
|
||||
continue;
|
||||
@@ -196,27 +160,14 @@ impl IprClientConnect {
|
||||
}
|
||||
}
|
||||
|
||||
fn maybe_wrap_stream_frame(stream_id: u64, sequence_num: u32, payload: Vec<u8>) -> Vec<u8> {
|
||||
if !crate::lp_stream::current_requires_sphinx_stream_transport() {
|
||||
return payload;
|
||||
}
|
||||
|
||||
crate::lp_stream::encode_stream_frame(stream_id, sequence_num, payload)
|
||||
}
|
||||
|
||||
fn ipr_response_from_reconstructed_message(
|
||||
msg: &nym_sdk::mixnet::ReconstructedMessage,
|
||||
) -> std::result::Result<IpPacketResponse, bincode::Error> {
|
||||
let warn_on_unexpected = crate::lp_stream::current_requires_sphinx_stream_transport();
|
||||
let payload =
|
||||
crate::lp_stream::maybe_unwrap_lp_stream_payload(&msg.message, warn_on_unexpected);
|
||||
IpPacketResponse::from_bytes(payload)
|
||||
}
|
||||
|
||||
fn create_input_message(recipient: Recipient, bytes: Vec<u8>, surbs: u32) -> Result<InputMessage> {
|
||||
fn create_input_message(
|
||||
recipient: Recipient,
|
||||
request: IpPacketRequest,
|
||||
surbs: u32,
|
||||
) -> Result<InputMessage> {
|
||||
Ok(InputMessage::new_anonymous(
|
||||
recipient,
|
||||
bytes,
|
||||
request.to_bytes()?,
|
||||
surbs,
|
||||
TransmissionLane::General,
|
||||
None,
|
||||
|
||||
@@ -7,16 +7,18 @@ use nym_sdk::mixnet::ReconstructedMessage;
|
||||
use crate::{current::VERSION as CURRENT_VERSION, error::Result};
|
||||
|
||||
pub(crate) fn check_ipr_message_version(message: &ReconstructedMessage) -> Result<()> {
|
||||
let payload = crate::lp_stream::maybe_unwrap_lp_stream_payload_from_reconstructed(message);
|
||||
nym_ip_packet_requests::response_helpers::check_ipr_message_version(payload, CURRENT_VERSION)
|
||||
.map_err(|e| match e {
|
||||
IprResponseError::NoVersionByte => crate::Error::NoVersionInMessage,
|
||||
IprResponseError::VersionMismatch { expected, received } if received < expected => {
|
||||
crate::Error::ReceivedResponseWithOldVersion { expected, received }
|
||||
}
|
||||
IprResponseError::VersionMismatch { expected, received } => {
|
||||
crate::Error::ReceivedResponseWithNewVersion { expected, received }
|
||||
}
|
||||
_ => crate::Error::NoVersionInMessage,
|
||||
})
|
||||
nym_ip_packet_requests::response_helpers::check_ipr_message_version(
|
||||
&message.message,
|
||||
CURRENT_VERSION,
|
||||
)
|
||||
.map_err(|e| match e {
|
||||
IprResponseError::NoVersionByte => crate::Error::NoVersionInMessage,
|
||||
IprResponseError::VersionMismatch { expected, received } if received < expected => {
|
||||
crate::Error::ReceivedResponseWithOldVersion { expected, received }
|
||||
}
|
||||
IprResponseError::VersionMismatch { expected, received } => {
|
||||
crate::Error::ReceivedResponseWithNewVersion { expected, received }
|
||||
}
|
||||
_ => crate::Error::NoVersionInMessage,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -5,11 +5,10 @@ mod connect;
|
||||
mod error;
|
||||
mod helpers;
|
||||
mod listener;
|
||||
pub mod lp_stream;
|
||||
|
||||
pub use connect::IprClientConnect;
|
||||
pub use error::Error;
|
||||
pub use listener::{IprListener, MixnetMessageOutcome};
|
||||
|
||||
// Re-export the currently used version
|
||||
pub use nym_ip_packet_requests::v9 as current;
|
||||
pub use nym_ip_packet_requests::v8 as current;
|
||||
|
||||
@@ -1,100 +0,0 @@
|
||||
use bytes::BytesMut;
|
||||
use nym_ip_packet_requests::SPHINX_STREAM_VERSION_THRESHOLD;
|
||||
use nym_lp::packet::frame::{
|
||||
LpFrame, LpFrameHeader, LpFrameKind, SphinxStreamFrameAttributes, SphinxStreamMsgType,
|
||||
};
|
||||
use nym_sdk::mixnet::ReconstructedMessage;
|
||||
use tracing::warn;
|
||||
|
||||
/// Whether the "current" IPR client is operating at a version where the node expects
|
||||
/// non-stream mixnet IPR messages to be LP Stream framed (see `SPHINX_STREAM_VERSION_THRESHOLD`).
|
||||
pub(crate) fn current_requires_sphinx_stream_transport() -> bool {
|
||||
crate::current::VERSION >= SPHINX_STREAM_VERSION_THRESHOLD
|
||||
}
|
||||
|
||||
pub fn maybe_unwrap_lp_stream_payload(data: &[u8], warn_on_unexpected: bool) -> &[u8] {
|
||||
if data.len() < LpFrameHeader::SIZE {
|
||||
if warn_on_unexpected {
|
||||
warn!(
|
||||
len = data.len(),
|
||||
header_size = LpFrameHeader::SIZE,
|
||||
"expected LP SphinxStream frame for IPR payload, but message is shorter than LP header; treating as raw payload"
|
||||
);
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
let Ok(header) = LpFrameHeader::parse(data) else {
|
||||
if warn_on_unexpected {
|
||||
warn!(
|
||||
"expected LP SphinxStream frame for IPR payload, but failed to parse LP header; treating as raw payload"
|
||||
);
|
||||
}
|
||||
return data;
|
||||
};
|
||||
|
||||
if header.kind == LpFrameKind::SphinxStream {
|
||||
&data[LpFrameHeader::SIZE..]
|
||||
} else {
|
||||
if warn_on_unexpected {
|
||||
warn!(
|
||||
kind = ?header.kind,
|
||||
"expected LP SphinxStream frame for IPR payload, but got different LP frame kind; treating as raw payload"
|
||||
);
|
||||
}
|
||||
data
|
||||
}
|
||||
}
|
||||
|
||||
pub fn maybe_unwrap_lp_stream_payload_from_reconstructed(message: &ReconstructedMessage) -> &[u8] {
|
||||
maybe_unwrap_lp_stream_payload(&message.message, false)
|
||||
}
|
||||
|
||||
pub fn encode_stream_frame(stream_id: u64, sequence_num: u32, payload: Vec<u8>) -> Vec<u8> {
|
||||
let attrs = SphinxStreamFrameAttributes {
|
||||
stream_id,
|
||||
msg_type: SphinxStreamMsgType::Data,
|
||||
sequence_num,
|
||||
};
|
||||
let frame = LpFrame::new_stream(attrs, payload);
|
||||
let mut buf = BytesMut::with_capacity(LpFrameHeader::SIZE + frame.content.len());
|
||||
frame.encode(&mut buf);
|
||||
buf.to_vec()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use nym_lp::packet::frame::SphinxStreamFrameAttributes;
|
||||
|
||||
#[test]
|
||||
fn stream_frame_roundtrip_unwraps_payload() {
|
||||
let stream_id = 0x0123_4567_89ab_cdef;
|
||||
let seq = 42u32;
|
||||
let payload = b"hello-ipr".to_vec();
|
||||
|
||||
let framed = encode_stream_frame(stream_id, seq, payload.clone());
|
||||
|
||||
let header = LpFrameHeader::parse(&framed).expect("valid lp header");
|
||||
assert_eq!(header.kind, LpFrameKind::SphinxStream);
|
||||
|
||||
let attrs =
|
||||
SphinxStreamFrameAttributes::parse(&header.frame_attributes).expect("valid attrs");
|
||||
assert_eq!(attrs.stream_id, stream_id);
|
||||
assert_eq!(attrs.sequence_num, seq);
|
||||
assert_eq!(attrs.msg_type, SphinxStreamMsgType::Data);
|
||||
|
||||
let unwrapped = maybe_unwrap_lp_stream_payload(&framed, false);
|
||||
assert_eq!(unwrapped, payload.as_slice());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unwrap_noops_on_non_stream_or_malformed_data() {
|
||||
let raw = b"\x09\x00\x01\x02\x03";
|
||||
assert_eq!(maybe_unwrap_lp_stream_payload(raw, false), raw);
|
||||
|
||||
// malformed header: not enough bytes for LP header
|
||||
let short = b"\x00\x01";
|
||||
assert_eq!(maybe_unwrap_lp_stream_payload(short, false), short);
|
||||
}
|
||||
}
|
||||
@@ -97,6 +97,8 @@ impl BuilderConfig {
|
||||
exit: self.exit_node.clone(),
|
||||
mode: self.mode,
|
||||
lp_registration_config: self.lp_registration_config,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: self.connection_fd_callback.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -143,7 +143,6 @@ impl RegistrationClientBuilder {
|
||||
config,
|
||||
bandwidth_controller,
|
||||
cancel_token: self.config.cancel_token.clone(),
|
||||
fallback_client_builder: Some(self),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,13 +18,12 @@ use rand09::{CryptoRng, RngCore, SeedableRng};
|
||||
use std::sync::Arc;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
|
||||
pub struct LpBasedRegistrationClient {
|
||||
pub(crate) config: RegistrationClientConfig,
|
||||
pub(crate) bandwidth_controller: Box<dyn BandwidthTicketProvider>,
|
||||
pub(crate) cancel_token: CancellationToken,
|
||||
// While we allow a fallback, we need to be able to build it
|
||||
pub(crate) fallback_client_builder: Option<RegistrationClientBuilder>,
|
||||
}
|
||||
|
||||
impl LpBasedRegistrationClient {
|
||||
@@ -81,6 +80,32 @@ impl LpBasedRegistrationClient {
|
||||
self.config.lp_registration_config,
|
||||
);
|
||||
|
||||
// Open the entry connection through a socket that has the connection
|
||||
// fd callback applied before connecting (sets SO_MARK on Linux), so
|
||||
// the connection is allowed through the VPN firewall during the
|
||||
// connecting state.
|
||||
#[cfg(unix)]
|
||||
{
|
||||
let fd_callback = self.config.connection_fd_callback.clone();
|
||||
entry_client.set_dialer(Arc::new(move |addr| {
|
||||
let fd_callback = fd_callback.clone();
|
||||
Box::pin(async move {
|
||||
let socket = if addr.is_ipv4() {
|
||||
tokio::net::TcpSocket::new_v4()
|
||||
} else {
|
||||
tokio::net::TcpSocket::new_v6()
|
||||
}
|
||||
.map_err(|err| {
|
||||
nym_lp::transport::LpTransportError::connection_failure(err.to_string())
|
||||
})?;
|
||||
fd_callback(std::os::fd::AsRawFd::as_raw_fd(&socket));
|
||||
socket.connect(addr).await.map_err(|err| {
|
||||
nym_lp::transport::LpTransportError::connection_failure(err.to_string())
|
||||
})
|
||||
})
|
||||
}));
|
||||
}
|
||||
|
||||
// Perform handshake with entry gateway (outer session now established)
|
||||
entry_client.perform_handshake().await.map_err(|source| {
|
||||
RegistrationClientError::EntryGatewayRegisterLp {
|
||||
@@ -162,15 +187,11 @@ impl LpBasedRegistrationClient {
|
||||
self.register_wg_with_rng(&mut rng).await
|
||||
}
|
||||
|
||||
pub(crate) async fn register(mut self) -> Result<RegistrationResult, RegistrationClientError> {
|
||||
let fallback = self.fallback_client_builder.take();
|
||||
async fn register_inner(mut self) -> Result<RegistrationResult, RegistrationClientError> {
|
||||
match &self.config.mode {
|
||||
RegistrationMode::Mixnet => {
|
||||
if let Some(fallback) = fallback {
|
||||
register_with_fallback(fallback).await
|
||||
} else {
|
||||
Err(RegistrationClientError::UnsupportedMode)
|
||||
}
|
||||
// mixnet registration is not supported for LP
|
||||
Err(RegistrationClientError::UnsupportedMode)
|
||||
}
|
||||
RegistrationMode::Wireguard => {
|
||||
let lp_registration_result = self
|
||||
@@ -182,15 +203,9 @@ impl LpBasedRegistrationClient {
|
||||
// Everything went fine
|
||||
Some(Ok(res)) => Ok(res),
|
||||
|
||||
// LP reg failed, try fallback if we have one
|
||||
Some(Err(e)) => {
|
||||
tracing::error!("LP registration failed : {e}");
|
||||
if let Some(fallback) = fallback {
|
||||
tracing::info!("Registering with fallback");
|
||||
register_with_fallback(fallback).await
|
||||
} else {
|
||||
Err(e)
|
||||
}
|
||||
Err(e)
|
||||
}
|
||||
|
||||
// Cancelled registration
|
||||
@@ -199,12 +214,14 @@ impl LpBasedRegistrationClient {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn register_with_fallback(
|
||||
client_builder: RegistrationClientBuilder,
|
||||
) -> Result<RegistrationResult, RegistrationClientError> {
|
||||
// This is forcefully building a mixnet based client
|
||||
let fallback_client = client_builder.build_mixnet().await?;
|
||||
fallback_client.register().await
|
||||
pub(crate) async fn register(mut self) -> Result<RegistrationResult, RegistrationClientError> {
|
||||
let timeout = self.config.lp_registration_config.exchange_timeout;
|
||||
tokio::time::timeout(timeout, self.register_inner())
|
||||
.await
|
||||
.unwrap_or_else(|timeout| {
|
||||
warn!("timed out while attempting to complete LP registration");
|
||||
Err(RegistrationClientError::Timeout(timeout))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,4 +19,9 @@ pub struct RegistrationClientConfig {
|
||||
pub(crate) exit: NymNodeWithKeys,
|
||||
pub(crate) mode: RegistrationMode,
|
||||
pub(crate) lp_registration_config: LpRegistrationConfig,
|
||||
/// Callback invoked with the raw fd of sockets opened for registration,
|
||||
/// before connecting. Used to set `SO_MARK` on Linux so the connection is
|
||||
/// allowed through the VPN firewall during the connecting state.
|
||||
#[cfg(unix)]
|
||||
pub(crate) connection_fd_callback: std::sync::Arc<dyn Fn(std::os::fd::RawFd) + Send + Sync>,
|
||||
}
|
||||
|
||||
@@ -33,6 +33,19 @@ use std::time::Duration;
|
||||
use tokio::net::TcpStream;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
/// Custom dialer used to open the connection to the gateway.
|
||||
///
|
||||
/// Allows the caller to configure the socket before the connection is
|
||||
/// initiated, e.g. set `SO_MARK` on Linux so the connection is allowed
|
||||
/// through the VPN firewall during the connecting state.
|
||||
pub type LpDialer<S> = Arc<
|
||||
dyn Fn(
|
||||
SocketAddr,
|
||||
) -> futures::future::BoxFuture<'static, std::result::Result<S, LpTransportError>>
|
||||
+ Send
|
||||
+ Sync,
|
||||
>;
|
||||
|
||||
/// LP (Lewes Protocol) registration client for direct gateway connections.
|
||||
///
|
||||
/// This client uses a persistent TCP connection model where a single TCP
|
||||
@@ -70,6 +83,11 @@ pub struct LpRegistrationClient<S = TcpStream> {
|
||||
/// Persistent TCP stream for the connection.
|
||||
/// Opened on first use, closed after registration.
|
||||
stream: Option<S>,
|
||||
|
||||
/// Optional custom dialer used to open the connection, allowing socket
|
||||
/// configuration (e.g. `SO_MARK`) before the connection is initiated.
|
||||
/// Falls back to `S::connect` when unset.
|
||||
dialer: Option<LpDialer<S>>,
|
||||
}
|
||||
|
||||
impl<S> LpRegistrationClient<S>
|
||||
@@ -115,9 +133,18 @@ where
|
||||
transport_session: None,
|
||||
config,
|
||||
stream: None,
|
||||
dialer: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets a custom dialer used to open the connection to the gateway.
|
||||
///
|
||||
/// Allows socket configuration (e.g. setting `SO_MARK` on Linux so the
|
||||
/// connection is allowed through the VPN firewall) before connecting.
|
||||
pub fn set_dialer(&mut self, dialer: LpDialer<S>) {
|
||||
self.dialer = Some(dialer);
|
||||
}
|
||||
|
||||
/// Attempt to use this `LpRegistrationClient` as transport for `NestedSession`
|
||||
pub fn as_nested_connection(&mut self, exit_address: SocketAddr) -> NestedConnection<'_, S> {
|
||||
NestedConnection {
|
||||
@@ -209,22 +236,32 @@ where
|
||||
self.gateway_lp_address
|
||||
);
|
||||
|
||||
let mut stream = tokio::time::timeout(
|
||||
self.config.connect_timeout,
|
||||
S::connect(self.gateway_lp_address),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| LpClientError::TcpConnection {
|
||||
address: self.gateway_lp_address.to_string(),
|
||||
source: LpTransportError::ConnectionFailure(format!(
|
||||
"Connection timeout after {:?}",
|
||||
self.config.connect_timeout
|
||||
)),
|
||||
})?
|
||||
.map_err(|source| LpClientError::TcpConnection {
|
||||
address: self.gateway_lp_address.to_string(),
|
||||
source,
|
||||
})?;
|
||||
let connect_result = match &self.dialer {
|
||||
Some(dialer) => {
|
||||
tokio::time::timeout(self.config.connect_timeout, dialer(self.gateway_lp_address))
|
||||
.await
|
||||
}
|
||||
None => {
|
||||
tokio::time::timeout(
|
||||
self.config.connect_timeout,
|
||||
S::connect(self.gateway_lp_address),
|
||||
)
|
||||
.await
|
||||
}
|
||||
};
|
||||
|
||||
let mut stream = connect_result
|
||||
.map_err(|_| LpClientError::TcpConnection {
|
||||
address: self.gateway_lp_address.to_string(),
|
||||
source: LpTransportError::ConnectionFailure(format!(
|
||||
"Connection timeout after {:?}",
|
||||
self.config.connect_timeout
|
||||
)),
|
||||
})?
|
||||
.map_err(|source| LpClientError::TcpConnection {
|
||||
address: self.gateway_lp_address.to_string(),
|
||||
source,
|
||||
})?;
|
||||
|
||||
// Set TCP_NODELAY for low latency
|
||||
stream
|
||||
|
||||
@@ -28,40 +28,43 @@ use std::time::Duration;
|
||||
/// - Optimize for latency over throughput (small messages)
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub struct LpRegistrationConfig {
|
||||
/// TCP connection timeout (nym-102).
|
||||
/// TCP connection timeout.
|
||||
///
|
||||
/// Maximum time to wait for TCP connection establishment.
|
||||
/// Default: 10 seconds.
|
||||
/// Default: 5 seconds.
|
||||
pub connect_timeout: Duration,
|
||||
|
||||
/// Noise protocol handshake timeout (nym-102).
|
||||
/// KKT/PSQ protocol handshake timeout.
|
||||
///
|
||||
/// Maximum time to wait for Noise handshake completion (all round-trips).
|
||||
/// Default: 15 seconds.
|
||||
/// Maximum time to wait for KKT/PSQ handshake completion with the entry (all round-trips).
|
||||
/// Default: 8 seconds.
|
||||
pub handshake_timeout: Duration,
|
||||
|
||||
/// Registration request/response timeout (nym-102).
|
||||
/// Registration request/response timeout.
|
||||
///
|
||||
/// Maximum time to wait for registration request send + response receive.
|
||||
/// Includes credential verification on gateway side.
|
||||
/// Default: 30 seconds.
|
||||
/// Default: 8 seconds.
|
||||
pub registration_timeout: Duration,
|
||||
|
||||
/// Maximum time for the whole exchange (handshake + registration).
|
||||
/// Default: 20 seconds.
|
||||
pub exchange_timeout: Duration,
|
||||
|
||||
/// Forward packet send/receive timeout.
|
||||
///
|
||||
/// Maximum time to wait for forward packet send + response receive via entry gateway.
|
||||
/// Covers the entire round-trip through entry to exit gateway and back.
|
||||
/// Default: 30 seconds.
|
||||
/// Maximum time to wait for forward packet to get sent via entry gateway.
|
||||
/// Default: 3 seconds.
|
||||
pub forward_timeout: Duration,
|
||||
|
||||
/// Enable TCP_NODELAY (disable Nagle's algorithm) (nym-104).
|
||||
/// Enable TCP_NODELAY (disable Nagle's algorithm).
|
||||
///
|
||||
/// When true, disables Nagle's algorithm for lower latency.
|
||||
/// Recommended for registration messages which are small and latency-sensitive.
|
||||
/// Default: true.
|
||||
pub tcp_nodelay: bool,
|
||||
|
||||
/// TCP keepalive duration (nym-104).
|
||||
/// TCP keepalive duration.
|
||||
///
|
||||
/// When Some, enables TCP keepalive with specified interval.
|
||||
/// Since LP is registration-only with short-lived connections, keepalive is not needed.
|
||||
@@ -72,15 +75,14 @@ pub struct LpRegistrationConfig {
|
||||
impl Default for LpRegistrationConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
// nym-102: Sane timeout defaults for real network conditions
|
||||
connect_timeout: Duration::from_secs(10),
|
||||
handshake_timeout: Duration::from_secs(15),
|
||||
registration_timeout: Duration::from_secs(30),
|
||||
forward_timeout: Duration::from_secs(30),
|
||||
connect_timeout: Duration::from_secs(5),
|
||||
handshake_timeout: Duration::from_secs(8),
|
||||
registration_timeout: Duration::from_secs(8),
|
||||
exchange_timeout: Duration::from_secs(20),
|
||||
forward_timeout: Duration::from_secs(3),
|
||||
|
||||
// nym-104: Optimized for registration-only protocol
|
||||
tcp_nodelay: true, // Lower latency for small messages
|
||||
tcp_keepalive: None, // Not needed for ephemeral connections
|
||||
tcp_nodelay: true,
|
||||
tcp_keepalive: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -93,10 +95,11 @@ mod tests {
|
||||
fn test_default_config() {
|
||||
let config = LpRegistrationConfig::default();
|
||||
|
||||
assert_eq!(config.connect_timeout, Duration::from_secs(10));
|
||||
assert_eq!(config.handshake_timeout, Duration::from_secs(15));
|
||||
assert_eq!(config.registration_timeout, Duration::from_secs(30));
|
||||
assert_eq!(config.forward_timeout, Duration::from_secs(30));
|
||||
assert_eq!(config.connect_timeout, Duration::from_secs(5));
|
||||
assert_eq!(config.handshake_timeout, Duration::from_secs(8));
|
||||
assert_eq!(config.registration_timeout, Duration::from_secs(8));
|
||||
assert_eq!(config.forward_timeout, Duration::from_secs(3));
|
||||
assert_eq!(config.exchange_timeout, Duration::from_secs(20));
|
||||
assert!(config.tcp_nodelay);
|
||||
assert_eq!(config.tcp_keepalive, None);
|
||||
}
|
||||
|
||||
@@ -41,4 +41,4 @@ windows = { version = "0.61", features = [
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
tracing-test.workspace = true
|
||||
|
||||
@@ -3,8 +3,9 @@
|
||||
|
||||
use std::{
|
||||
io,
|
||||
ops::{Deref, DerefMut},
|
||||
ops::Deref,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
@@ -26,10 +27,8 @@ const CHECK_FILES_CLOSED_MAX_ATTEMPTS: u8 = 20;
|
||||
/// Delay between file checks
|
||||
const CHECK_FILES_CLOSED_RETRY_DELAY: Duration = Duration::from_millis(100);
|
||||
|
||||
/// `sqlx::SqlitePool` wrapper providing a workaround for the [known bug](https://github.com/launchbadge/sqlx/issues/3217).
|
||||
/// In principle after requesting to close the sqlite pool, the wrapper monitors open file descriptor and polls periodically until all database files are closed.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SqlitePoolGuard {
|
||||
#[derive(Debug)]
|
||||
struct SqlitePoolGuardInner {
|
||||
/// Path to sqlite database file.
|
||||
database_path: PathBuf,
|
||||
|
||||
@@ -37,6 +36,18 @@ pub struct SqlitePoolGuard {
|
||||
connection_pool: sqlx::SqlitePool,
|
||||
}
|
||||
|
||||
/// `sqlx::SqlitePool` wrapper providing a workaround for the [known bug](https://github.com/launchbadge/sqlx/issues/3217).
|
||||
/// In principle after requesting to close the sqlite pool, the wrapper monitors open file descriptor and polls periodically until all database files are closed.
|
||||
///
|
||||
/// This type is cheaply [`Clone`]-able: all clones share the same underlying pool and the same
|
||||
/// reference count. The `Drop` impl only emits a warning when the **last** reference is dropped
|
||||
/// without an explicit [`close`](Self::close) call, so it is safe to clone this guard temporarily
|
||||
/// (e.g. to pass into a spawned task) without triggering spurious warnings.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SqlitePoolGuard {
|
||||
inner: Arc<SqlitePoolGuardInner>,
|
||||
}
|
||||
|
||||
impl SqlitePoolGuard {
|
||||
/// Create new instance providing path to database and connection pool
|
||||
pub fn new(connection_pool: sqlx::SqlitePool) -> Self {
|
||||
@@ -46,46 +57,70 @@ impl SqlitePoolGuard {
|
||||
.to_path_buf();
|
||||
|
||||
Self {
|
||||
database_path,
|
||||
connection_pool,
|
||||
inner: Arc::new(SqlitePoolGuardInner {
|
||||
database_path,
|
||||
connection_pool,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns database path
|
||||
pub fn database_path(&self) -> &Path {
|
||||
&self.database_path
|
||||
&self.inner.database_path
|
||||
}
|
||||
|
||||
/// Close udnerlying sqlite pool and wait for files to be closed before returning.
|
||||
/// Close the underlying sqlite pool and wait for OS file handles to be released.
|
||||
///
|
||||
/// **Callers must invoke this method before dropping the guard.** The `Drop` impl does
|
||||
/// not perform async cleanup; it only logs a warning when the pool was not closed
|
||||
/// beforehand.
|
||||
pub async fn close(&self) {
|
||||
// Avoid waiting for db files once the pool is marked closed to ensure that we don't wait on some other sqlite pool to close the database.
|
||||
if !self.connection_pool.is_closed() {
|
||||
tracing::info!("Closing sqlite pool: {}", self.database_path.display());
|
||||
if !self.inner.connection_pool.is_closed() {
|
||||
tracing::info!(
|
||||
"Closing sqlite pool: {}",
|
||||
self.inner.database_path.display()
|
||||
);
|
||||
self.close_pool_inner().await.ok();
|
||||
}
|
||||
}
|
||||
|
||||
async fn close_pool_inner(&self) -> std::io::Result<()> {
|
||||
self.connection_pool.close().await;
|
||||
self.inner.connection_pool.close().await;
|
||||
|
||||
self.wait_for_db_files_close().await.inspect_err(|e| {
|
||||
tracing::error!("Failed to wait for file to close: {e}");
|
||||
})
|
||||
if let Err(e) = self.wait_for_db_files_close().await {
|
||||
if e.kind() == std::io::ErrorKind::TimedOut {
|
||||
tracing::warn!(
|
||||
"Timed out waiting for OS file handles for sqlite database to be released; \
|
||||
another connection to the same file may still be open. Path = {}",
|
||||
self.inner.database_path.display()
|
||||
);
|
||||
} else {
|
||||
tracing::warn!(
|
||||
"Failed to wait for sqlite database file handles to be released: Path = {}. Error = {}",
|
||||
self.inner.database_path.display(),
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns all database files, including shm and wal files.
|
||||
fn all_database_files(&self) -> Vec<PathBuf> {
|
||||
let mut database_files = vec![];
|
||||
let canonical_path = self
|
||||
.inner
|
||||
.database_path
|
||||
.canonicalize()
|
||||
.inspect_err(|e| {
|
||||
tracing::error!(
|
||||
"Failed to canonicalize path: {}. Cause: {e}",
|
||||
self.database_path.display()
|
||||
self.inner.database_path.display()
|
||||
);
|
||||
})
|
||||
.unwrap_or(self.database_path.clone());
|
||||
.unwrap_or(self.inner.database_path.clone());
|
||||
|
||||
if let Some(ext) = canonical_path.extension() {
|
||||
for added_ext in ["-shm", "-wal"] {
|
||||
@@ -120,34 +155,38 @@ impl SqlitePoolGuard {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for SqlitePoolGuard {
|
||||
fn drop(&mut self) {
|
||||
if Arc::strong_count(&self.inner) == 1 && !self.inner.connection_pool.is_closed() {
|
||||
tracing::warn!(
|
||||
"SqlitePoolGuard dropped without explicit close(); path={}",
|
||||
self.inner.database_path.display()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for SqlitePoolGuard {
|
||||
type Target = sqlx::SqlitePool;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.connection_pool
|
||||
&self.inner.connection_pool
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for SqlitePoolGuard {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.connection_pool
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use sqlx::{
|
||||
ConnectOptions, Executor,
|
||||
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
|
||||
};
|
||||
use tracing_test::traced_test;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[traced_test]
|
||||
#[tokio::test]
|
||||
async fn test_wait_close() {
|
||||
tracing_subscriber::fmt()
|
||||
.with_max_level(tracing::Level::TRACE)
|
||||
.init();
|
||||
|
||||
let temp_dir = tempfile::tempdir().unwrap();
|
||||
let database_path = temp_dir.path().join("storage.db");
|
||||
|
||||
@@ -177,4 +216,34 @@ mod tests {
|
||||
assert!(guard.close_pool_inner().await.is_ok());
|
||||
tokio::fs::remove_file(database_path).await.unwrap();
|
||||
}
|
||||
|
||||
#[traced_test]
|
||||
#[tokio::test]
|
||||
async fn test_clone_drop_no_warning() {
|
||||
// Cloning the guard and dropping the clone should not warn because the original is still alive.
|
||||
let temp_dir = tempfile::tempdir().unwrap();
|
||||
let database_path = temp_dir.path().join("storage2.db");
|
||||
|
||||
let opts = sqlx::sqlite::SqliteConnectOptions::new()
|
||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
|
||||
.synchronous(SqliteSynchronous::Normal)
|
||||
.auto_vacuum(SqliteAutoVacuum::Incremental)
|
||||
.filename(database_path.clone())
|
||||
.create_if_missing(true)
|
||||
.disable_statement_logging();
|
||||
let connection_pool = sqlx::SqlitePool::connect_with(opts).await.unwrap();
|
||||
let guard = SqlitePoolGuard::new(connection_pool);
|
||||
|
||||
{
|
||||
let _clone = guard.clone();
|
||||
assert_eq!(Arc::strong_count(&guard.inner), 2);
|
||||
}
|
||||
assert_eq!(Arc::strong_count(&guard.inner), 1);
|
||||
assert!(!logs_contain(
|
||||
"SqlitePoolGuard dropped without explicit close"
|
||||
));
|
||||
|
||||
guard.close().await;
|
||||
tokio::fs::remove_file(database_path).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,9 +9,11 @@ use crate::ip_packet_client::{
|
||||
use crate::mixnet::{MixnetClient, MixnetStream, Recipient};
|
||||
use crate::Error;
|
||||
use bytes::Bytes;
|
||||
use current_ipr::response::IpPacketResponse;
|
||||
use nym_ip_packet_requests::response_helpers;
|
||||
use nym_ip_packet_requests::{v9 as current_ipr, IpPair};
|
||||
use nym_ip_packet_requests::{
|
||||
v9::{self, response::IpPacketResponse},
|
||||
IpPair,
|
||||
};
|
||||
use nym_network_defaults::NymNetworkDetails;
|
||||
use std::time::Duration;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
@@ -105,7 +107,7 @@ impl IpMixStream {
|
||||
}
|
||||
|
||||
async fn connect_tunnel(stream: &mut MixnetStream) -> Result<IpPair, Error> {
|
||||
let (request, request_id) = current_ipr::new_connect_request(None);
|
||||
let (request, request_id) = v9::new_connect_request(None);
|
||||
debug!("Sending connect request with ID: {}", request_id);
|
||||
|
||||
let request_bytes = request.to_bytes()?;
|
||||
@@ -144,7 +146,7 @@ impl IpMixStream {
|
||||
/// Send an IP packet through the tunnel.
|
||||
pub async fn send_ip_packet(&mut self, packet: &[u8]) -> Result<(), Error> {
|
||||
self.check_connected()?;
|
||||
let request = current_ipr::new_data_request(packet.to_vec().into());
|
||||
let request = v9::new_data_request(packet.to_vec().into());
|
||||
let request_bytes = request.to_bytes()?;
|
||||
self.stream
|
||||
.write_all(&request_bytes)
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
{
|
||||
"name": "@nymproject/nym-client-wasm",
|
||||
"version": "1.0.0",
|
||||
"sideEffects": false
|
||||
}
|
||||
@@ -218,10 +218,7 @@ fn create_ip_packet_response(
|
||||
ClientVersion::V6 => IpPacketResponseV6::new_ip_packet(packets).to_bytes(),
|
||||
ClientVersion::V7 => IpPacketResponseV7::new_ip_packet(packets).to_bytes(),
|
||||
ClientVersion::V8 => IpPacketResponseV8::new_ip_packet(packets).to_bytes(),
|
||||
ClientVersion::V9 => {
|
||||
let resp = v9::new_ip_packet_response(packets);
|
||||
resp.to_bytes()
|
||||
}
|
||||
ClientVersion::V9 => v9::new_ip_packet_response(packets).to_bytes(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,7 +10,6 @@ use nym_ip_packet_requests::{
|
||||
IpPair, v6::request::IpPacketRequest as IpPacketRequestV6,
|
||||
v7::request::IpPacketRequest as IpPacketRequestV7,
|
||||
v8::request::IpPacketRequest as IpPacketRequestV8,
|
||||
v9::request::IpPacketRequest as IpPacketRequestV9,
|
||||
};
|
||||
use nym_sdk::mixnet::ReconstructedMessage;
|
||||
use nym_service_provider_requests_common::{Protocol, ServiceProviderType};
|
||||
@@ -132,14 +131,14 @@ impl TryFrom<&ReconstructedMessage> for IpPacketRequest {
|
||||
Ok(IpPacketRequest::from((request_v8, sender_tag)))
|
||||
}
|
||||
9 => {
|
||||
let request_v9 = IpPacketRequestV9::from_reconstructed_message(reconstructed)
|
||||
let request_v8 = IpPacketRequestV8::from_reconstructed_message(reconstructed)
|
||||
.map_err(
|
||||
|source| IpPacketRouterError::FailedToDeserializeTaggedPacket { source },
|
||||
)?;
|
||||
let sender_tag = reconstructed
|
||||
.sender_tag
|
||||
.ok_or(IpPacketRouterError::MissingSenderTag)?;
|
||||
Ok(v9::convert(request_v9, sender_tag))
|
||||
Ok(v9::convert(request_v8, sender_tag))
|
||||
}
|
||||
_ => {
|
||||
log::info!("Received packet with invalid version: v{request_version}");
|
||||
|
||||
@@ -130,12 +130,7 @@ impl VersionedResponse {
|
||||
ClientVersion::V6 => IpPacketResponseV6::try_from(self)?.to_bytes(),
|
||||
ClientVersion::V7 => IpPacketResponseV7::try_from(self)?.to_bytes(),
|
||||
ClientVersion::V8 => IpPacketResponseV8::try_from(self)?.to_bytes(),
|
||||
ClientVersion::V9 => {
|
||||
let mut resp = IpPacketResponseV8::try_from(self)?;
|
||||
resp.version = nym_ip_packet_requests::v9::VERSION;
|
||||
let bytes = resp.to_bytes();
|
||||
bytes
|
||||
}
|
||||
ClientVersion::V9 => IpPacketResponseV8::try_from(self)?.to_bytes(),
|
||||
}
|
||||
.map_err(|err| IpPacketRouterError::FailedToSerializeResponsePacket { source: err })
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user