Compare commits

...

5 Commits

Author SHA1 Message Date
Tommy Verrall 7abe9a987d PR comments addressed
- Changed peer_interaction_timeout from u64 milliseconds to Duration type
- Made timeout non-configurable (always uses safe 5s default) to prevent
  users from setting unsafe values
- Removed timeout from CLI args, env vars, and config template
- Added suspend/resume scenario documentation
- Simplified code by removing unnecessary tuple
- Reverted old_config migration code (to be handled in separate PR)
2025-12-11 14:46:16 +01:00
Tommy Verrall 24ccba3a19 expose peer timeout config + update migration 2025-11-21 16:34:25 +01:00
Tommy Verrall 21a1376900 drop registration lock and enhance gateway reservation 2025-11-21 12:25:03 +01:00
Tommy Verrall 451107efb5 add configurable peer-controller timeout and make registration lockless 2025-11-21 09:57:20 +01:00
Tommy Verrall 8dbbd2bdf4 gateway/authenticator: add peer-controller timeouts and lock tweaks to keep registration responsive after suspend 2025-11-21 09:26:41 +01:00
9 changed files with 126 additions and 56 deletions
Generated
+1
View File
@@ -6432,6 +6432,7 @@ dependencies = [
"futures",
"hkdf",
"human-repr",
"humantime",
"humantime-serde",
"indicatif",
"ipnetwork",
@@ -7,6 +7,7 @@ use nym_network_defaults::{
};
use serde::{Deserialize, Serialize};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::time::Duration;
pub use nym_client_core::config::Config as BaseClientConfig;
pub use persistence::AuthenticatorPaths;
@@ -26,7 +27,6 @@ pub struct Config {
impl Config {
/// Reports whether this configuration is valid.
///
/// Only the `base` section is checked: the result of `self.base.validate()` is
/// returned as-is, so `true` means the base configuration accepted itself.
pub fn validate(&self) -> bool {
// no other sections have explicit requirements (yet)
self.base.validate()
}
}
@@ -57,6 +57,12 @@ pub struct Authenticator {
/// The prefix denoting the maximum number of the clients that can be connected via Wireguard using IPv6.
/// The maximum value for IPv6 is 128
pub private_network_prefix_v6: u8,
/// Timeout to wait for responses from the peer controller before failing.
/// Helps the authenticator recover from suspend/resume scenarios where the peer controller
/// process/task can get stuck and never reply to oneshot RPC requests, which previously
/// caused the authenticator to block forever waiting on the oneshot channel.
pub peer_interaction_timeout: Duration,
}
impl Default for Authenticator {
@@ -68,6 +74,7 @@ impl Default for Authenticator {
tunnel_announced_port: WG_TUNNEL_PORT,
private_network_prefix_v4: WG_TUN_DEVICE_NETMASK_V4,
private_network_prefix_v6: WG_TUN_DEVICE_NETMASK_V6,
peer_interaction_timeout: default_peer_interaction_timeout(),
}
}
}
@@ -85,3 +92,7 @@ impl From<Authenticator> for nym_wireguard_types::Config {
}
}
}
/// Returns the default amount of time to wait for a peer controller response.
///
/// The value is fixed at five seconds.
pub fn default_peer_interaction_timeout() -> Duration {
    const DEFAULT_TIMEOUT_SECS: u64 = 5;
    Duration::from_secs(DEFAULT_TIMEOUT_SECS)
}
@@ -85,6 +85,9 @@ pub enum AuthenticatorError {
#[error("peers can't be interacted with anymore")]
PeerInteractionStopped,
#[error("peers interaction timed out while attempting to {operation}")]
PeerInteractionTimeout { operation: &'static str },
#[error("unknown version number")]
UnknownVersion,
@@ -42,7 +42,6 @@ use std::{
sync::Arc,
time::{Duration, SystemTime},
};
use tokio::sync::RwLock;
use tokio_stream::wrappers::IntervalStream;
type AuthenticatorHandleResult = Result<(Vec<u8>, Option<Recipient>), AuthenticatorError>;
@@ -74,7 +73,7 @@ pub(crate) struct MixnetListener {
pub(crate) mixnet_client: nym_sdk::mixnet::MixnetClient,
// Registrations awaiting confirmation
pub(crate) registered_and_free: RwLock<RegisteredAndFree>,
pub(crate) registered_and_free: RegisteredAndFree,
pub(crate) peer_manager: PeerManager,
@@ -95,14 +94,15 @@ impl MixnetListener {
mixnet_client: nym_sdk::mixnet::MixnetClient,
upgrade_mode: UpgradeModeDetails,
ecash_verifier: Arc<dyn EcashManager + Send + Sync>,
peer_interaction_timeout: Duration,
) -> Self {
let timeout_check_interval =
IntervalStream::new(tokio::time::interval(DEFAULT_REGISTRATION_TIMEOUT_CHECK));
MixnetListener {
config,
mixnet_client,
registered_and_free: RwLock::new(RegisteredAndFree::new(free_private_network_ips)),
peer_manager: PeerManager::new(wireguard_gateway_data),
registered_and_free: RegisteredAndFree::new(free_private_network_ips),
peer_manager: PeerManager::new(wireguard_gateway_data, peer_interaction_timeout),
upgrade_mode,
ecash_verifier,
timeout_check_interval,
@@ -131,8 +131,8 @@ impl MixnetListener {
))
}
async fn remove_stale_registrations(&self) -> Result<(), AuthenticatorError> {
let mut registered_and_free = self.registered_and_free.write().await;
async fn remove_stale_registrations(&mut self) -> Result<(), AuthenticatorError> {
let registered_and_free = &mut self.registered_and_free;
let registered_values: Vec<_> = registered_and_free
.registration_in_progres
.values()
@@ -185,8 +185,9 @@ impl MixnetListener {
) -> AuthenticatorHandleResult {
let remote_public = init_message.pub_key();
let nonce: u64 = fastrand::u64(..);
let mut registered_and_free = self.registered_and_free.write().await;
if let Some(registration_data) = registered_and_free
if let Some(registration_data) = self
.registered_and_free
.registration_in_progres
.get(&remote_public)
{
@@ -292,7 +293,17 @@ impl MixnetListener {
return Ok((bytes, reply_to));
}
let peer = self.peer_manager.query_peer(remote_public).await?;
let peer = match self.peer_manager.query_peer(remote_public).await {
Ok(peer) => peer,
Err(err) => {
tracing::warn!(
"Failed to query peer {}: {err}. Continuing with fresh registration",
remote_public
);
None
}
};
if let Some(peer) = peer {
let allowed_ipv4 = peer
.allowed_ips
@@ -383,19 +394,21 @@ impl MixnetListener {
return Ok((bytes, reply_to));
}
let private_ip_ref = registered_and_free
let private_ip = self
.registered_and_free
.free_private_network_ips
.iter_mut()
.filter(|r| r.1.is_none())
.choose(&mut thread_rng())
.ok_or(AuthenticatorError::NoFreeIp)?;
let private_ips = *private_ip_ref.0;
let private_ips = *private_ip.0;
// mark it as used, even though it's not final
*private_ip_ref.1 = Some(SystemTime::now());
*private_ip.1 = Some(SystemTime::now());
let gateway_data = GatewayClient::new(
self.keypair().private_key(),
remote_public.inner(),
*private_ip_ref.0,
private_ips,
nonce,
);
let registration_data = latest::registration::RegistrationData {
@@ -403,7 +416,8 @@ impl MixnetListener {
gateway_data: gateway_data.clone(),
wg_port: self.config.authenticator.tunnel_announced_port,
};
registered_and_free
self.registered_and_free
.registration_in_progres
.insert(remote_public, registration_data.clone());
let bytes = match AuthenticatorVersion::from(protocol) {
@@ -539,12 +553,12 @@ impl MixnetListener {
request_id: u64,
reply_to: Option<Recipient>,
) -> AuthenticatorHandleResult {
let mut registered_and_free = self.registered_and_free.write().await;
let registration_data = registered_and_free
let registration_data = self
.registered_and_free
.registration_in_progres
.get(&final_message.gateway_client_pub_key())
.ok_or(AuthenticatorError::RegistrationNotInProgress)?
.clone();
.cloned()
.ok_or(AuthenticatorError::RegistrationNotInProgress)?;
if final_message
.verify(self.keypair().private_key(), registration_data.nonce)
@@ -595,7 +609,7 @@ impl MixnetListener {
return Err(e);
}
registered_and_free
self.registered_and_free
.registration_in_progres
.remove(&final_message.gateway_client_pub_key());
@@ -818,7 +832,7 @@ impl MixnetListener {
.to_bytes()
.map_err(AuthenticatorError::response_serialisation)?,
AuthenticatorVersion::V1 | AuthenticatorVersion::V2 | AuthenticatorVersion::UNKNOWN => {
return Err(AuthenticatorError::UnknownVersion)
return Err(AuthenticatorError::UnknownVersion);
}
};
@@ -151,6 +151,7 @@ impl Authenticator {
}
})
.collect();
let peer_timeout = self.config.authenticator.peer_interaction_timeout;
let mixnet_listener = crate::node::internal_service_providers::authenticator::mixnet_listener::MixnetListener::new(
self.config,
free_private_network_ips,
@@ -158,6 +159,7 @@ impl Authenticator {
mixnet_client,
self.upgrade_mode_state,
self.ecash_verifier,
peer_timeout,
);
tracing::info!("The address of this client is: {self_address}");
@@ -8,15 +8,19 @@ use nym_credential_verification::{ClientBandwidth, TicketVerifier};
use nym_credentials_interface::CredentialSpendingData;
use nym_wireguard::{peer_controller::PeerControlRequest, WireguardGatewayData};
use nym_wireguard_types::PeerPublicKey;
use std::time::Duration;
use tokio::time::timeout;
/// Sends requests to the wireguard peer controller and waits for its responses.
pub struct PeerManager {
// Gateway-side wireguard data; the tests in this file use its `peer_tx()` handle
// to reach the peer controller.
pub(crate) wireguard_gateway_data: WireguardGatewayData,
// Upper bound handed to `recv_with_timeout` for every peer controller response,
// so a controller that never answers cannot block the caller indefinitely.
response_timeout: Duration,
}
impl PeerManager {
pub fn new(wireguard_gateway_data: WireguardGatewayData) -> Self {
pub fn new(wireguard_gateway_data: WireguardGatewayData, response_timeout: Duration) -> Self {
PeerManager {
wireguard_gateway_data,
response_timeout,
}
}
pub async fn add_peer(&self, peer: Peer) -> Result<(), AuthenticatorError> {
@@ -28,9 +32,8 @@ impl PeerManager {
.await
.map_err(|_| AuthenticatorError::PeerInteractionStopped)?;
response_rx
.await
.map_err(|_| AuthenticatorError::InternalError("no response for add peer".to_string()))?
recv_with_timeout(response_rx, "add peer", self.response_timeout)
.await?
.map_err(|err| {
AuthenticatorError::InternalError(format!(
"adding peer could not be performed: {err:?}"
@@ -48,11 +51,8 @@ impl PeerManager {
.await
.map_err(|_| AuthenticatorError::PeerInteractionStopped)?;
response_rx
.await
.map_err(|_| {
AuthenticatorError::InternalError("no response for remove peer".to_string())
})?
recv_with_timeout(response_rx, "remove peer", self.response_timeout)
.await?
.map_err(|err| {
AuthenticatorError::InternalError(format!(
"removing peer could not be performed: {err:?}"
@@ -73,11 +73,8 @@ impl PeerManager {
.await
.map_err(|_| AuthenticatorError::PeerInteractionStopped)?;
response_rx
.await
.map_err(|_| {
AuthenticatorError::InternalError("no response for query peer".to_string())
})?
recv_with_timeout(response_rx, "query peer", self.response_timeout)
.await?
.map_err(|err| {
AuthenticatorError::InternalError(format!(
"querying peer could not be performed: {err:?}"
@@ -106,13 +103,8 @@ impl PeerManager {
.await
.map_err(|_| AuthenticatorError::PeerInteractionStopped)?;
response_rx
.await
.map_err(|_| {
AuthenticatorError::InternalError(
"no response for query client bandwidth".to_string(),
)
})?
recv_with_timeout(response_rx, "query client bandwidth", self.response_timeout)
.await?
.map_err(|err| {
AuthenticatorError::InternalError(format!(
"querying client bandwidth could not be performed: {err:?}"
@@ -138,11 +130,8 @@ impl PeerManager {
.await
.map_err(|_| AuthenticatorError::PeerInteractionStopped)?;
response_rx
.await
.map_err(|_| {
AuthenticatorError::InternalError("no response for query verifier".to_string())
})?
recv_with_timeout(response_rx, "query verifier", self.response_timeout)
.await?
.map_err(|err| {
AuthenticatorError::InternalError(format!(
"querying verifier could not be performed: {err:?}"
@@ -151,10 +140,31 @@ impl PeerManager {
}
}
/// Waits for a single peer controller response, giving up after `timeout_duration`.
///
/// `operation` is a short human-readable label (e.g. `"add peer"`) that is included in
/// the warning log and in the returned timeout error.
///
/// # Errors
///
/// * [`AuthenticatorError::PeerInteractionStopped`] if the receiver resolves with an error
///   before the deadline (presumably because the sending half was dropped without replying).
/// * [`AuthenticatorError::PeerInteractionTimeout`] if nothing arrives within
///   `timeout_duration`; a warning is logged in that case.
async fn recv_with_timeout<T>(
    response_rx: oneshot::Receiver<T>,
    operation: &'static str,
    timeout_duration: Duration,
) -> Result<T, AuthenticatorError> {
    // Suspend/resume can wedge the peer controller, so we bound the wait to avoid deadlocking
    // authenticator responses on a stuck oneshot channel.
    match timeout(timeout_duration, response_rx).await {
        Err(_elapsed) => {
            tracing::warn!(
                "peer controller response timed out while attempting to {operation} after {:?}",
                timeout_duration
            );
            Err(AuthenticatorError::PeerInteractionTimeout { operation })
        }
        // The channel finished in time: forward the value, or report that the channel failed.
        Ok(received) => received.map_err(|_| AuthenticatorError::PeerInteractionStopped),
    }
}
#[cfg(test)]
mod tests {
use std::{str::FromStr, sync::Arc};
use futures::channel::oneshot;
use nym_credential_verification::{
bandwidth_storage_manager::BandwidthStorageManager, ecash::MockEcashManager,
};
@@ -163,7 +173,8 @@ mod tests {
use nym_gateway_storage::traits::{mock::MockGatewayStorage, BandwidthGatewayStorage};
use nym_wireguard::peer_controller::{start_controller, stop_controller};
use rand::rngs::OsRng;
use time::{Duration, OffsetDateTime};
use std::time::Duration;
use time::{Duration as TimeDuration, OffsetDateTime};
use tokio::sync::RwLock;
use crate::nym_authenticator::{
@@ -243,7 +254,7 @@ mod tests {
Authenticator::default().into(),
Arc::new(KeyPair::new(&mut OsRng)),
);
let peer_manager = PeerManager::new(wireguard_data);
let peer_manager = PeerManager::new(wireguard_data, Duration::from_secs(5));
let (storage, task_manager) = start_controller(
peer_manager.wireguard_gateway_data.peer_tx().clone(),
request_rx,
@@ -291,7 +302,7 @@ mod tests {
Authenticator::default().into(),
Arc::new(KeyPair::new(&mut OsRng)),
);
let mut peer_manager = PeerManager::new(wireguard_data);
let mut peer_manager = PeerManager::new(wireguard_data, Duration::from_secs(5));
let key = Key::default();
let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap();
let (storage, task_manager) = start_controller(
@@ -311,7 +322,7 @@ mod tests {
Authenticator::default().into(),
Arc::new(KeyPair::new(&mut OsRng)),
);
let mut peer_manager = PeerManager::new(wireguard_data);
let mut peer_manager = PeerManager::new(wireguard_data, Duration::from_secs(5));
let key = Key::default();
let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap();
let (storage, task_manager) = start_controller(
@@ -334,7 +345,7 @@ mod tests {
Authenticator::default().into(),
Arc::new(KeyPair::new(&mut OsRng)),
);
let mut peer_manager = PeerManager::new(wireguard_data);
let mut peer_manager = PeerManager::new(wireguard_data, Duration::from_secs(5));
let key = Key::default();
let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap();
let (storage, task_manager) = start_controller(
@@ -357,7 +368,7 @@ mod tests {
Authenticator::default().into(),
Arc::new(KeyPair::new(&mut OsRng)),
);
let mut peer_manager = PeerManager::new(wireguard_data);
let mut peer_manager = PeerManager::new(wireguard_data, Duration::from_secs(5));
let key = Key::default();
let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap();
let (storage, task_manager) = start_controller(
@@ -388,7 +399,7 @@ mod tests {
Authenticator::default().into(),
Arc::new(KeyPair::new(&mut OsRng)),
);
let mut peer_manager = PeerManager::new(wireguard_data);
let mut peer_manager = PeerManager::new(wireguard_data, Duration::from_secs(5));
let key = Key::default();
let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap();
let (storage, task_manager) = start_controller(
@@ -417,7 +428,7 @@ mod tests {
Authenticator::default().into(),
Arc::new(KeyPair::new(&mut OsRng)),
);
let mut peer_manager = PeerManager::new(wireguard_data);
let mut peer_manager = PeerManager::new(wireguard_data, Duration::from_secs(5));
let key = Key::default();
let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap();
let top_up = 42;
@@ -444,7 +455,7 @@ mod tests {
.increase_bandwidth(
Bandwidth::new_unchecked(top_up as u64),
OffsetDateTime::now_utc()
.checked_add(Duration::minutes(1))
.checked_add(TimeDuration::minutes(1))
.unwrap(),
)
.await
@@ -466,4 +477,28 @@ mod tests {
stop_controller(task_manager).await;
}
/// A receiver whose sender never replies must yield a timeout error naming the operation.
#[tokio::test]
async fn recv_with_timeout_errors_after_deadline() {
    // Keep the sender bound so the channel stays open and only the deadline can end the wait.
    let (_tx, rx) = oneshot::channel::<()>();

    let outcome = super::recv_with_timeout(rx, "unit-test", Duration::from_millis(10)).await;
    let err = outcome.unwrap_err();

    assert!(matches!(
        err,
        AuthenticatorError::PeerInteractionTimeout {
            operation: "unit-test"
        }
    ));
}
/// A value that is already queued on the channel must be returned unchanged.
#[tokio::test]
async fn recv_with_timeout_succeeds_before_deadline() {
    let (tx, rx) = oneshot::channel::<u8>();
    // Send before awaiting so the response is available immediately.
    tx.send(42).unwrap();

    let outcome = super::recv_with_timeout(rx, "unit-test", Duration::from_secs(1)).await;

    assert_eq!(outcome.unwrap(), 42);
}
}
+1
View File
@@ -27,6 +27,7 @@ console-subscriber = { workspace = true, optional = true }
csv = { workspace = true }
clap = { workspace = true, features = ["cargo", "env"] }
futures = { workspace = true }
humantime = { workspace = true }
humantime-serde = { workspace = true }
human-repr = { workspace = true }
ipnetwork = { workspace = true }
+2
View File
@@ -944,6 +944,7 @@ pub struct Wireguard {
/// Tunnel port announced to external clients wishing to connect to the wireguard interface.
/// Useful in the instances where the node is behind a proxy.
#[serde(alias = "announced_port")]
pub announced_tunnel_port: u16,
/// Metadata port announced to external clients wishing to connect to the metadata endpoint.
@@ -1001,6 +1002,7 @@ impl From<Wireguard> for nym_authenticator::config::Authenticator {
tunnel_announced_port: value.announced_tunnel_port,
private_network_prefix_v4: value.private_network_prefix_v4,
private_network_prefix_v6: value.private_network_prefix_v6,
peer_interaction_timeout: nym_authenticator::config::default_peer_interaction_timeout(),
}
}
}
@@ -63,6 +63,7 @@ pub struct WireguardV10 {
/// Port announced to external clients wishing to connect to the wireguard interface.
/// Useful in the instances where the node is behind a proxy.
#[serde(alias = "announced_tunnel_port")]
pub announced_port: u16,
/// The prefix denoting the maximum number of the clients that can be connected via Wireguard using IPv4.