Compare commits
5 Commits
gear
...
tv-auth-issues
| Author | SHA1 | Date | |
|---|---|---|---|
| 7abe9a987d | |||
| 24ccba3a19 | |||
| 21a1376900 | |||
| 451107efb5 | |||
| 8dbbd2bdf4 |
Generated
+1
@@ -6432,6 +6432,7 @@ dependencies = [
|
||||
"futures",
|
||||
"hkdf",
|
||||
"human-repr",
|
||||
"humantime",
|
||||
"humantime-serde",
|
||||
"indicatif",
|
||||
"ipnetwork",
|
||||
|
||||
@@ -7,6 +7,7 @@ use nym_network_defaults::{
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
|
||||
use std::time::Duration;
|
||||
|
||||
pub use nym_client_core::config::Config as BaseClientConfig;
|
||||
pub use persistence::AuthenticatorPaths;
|
||||
@@ -26,7 +27,6 @@ pub struct Config {
|
||||
|
||||
impl Config {
|
||||
pub fn validate(&self) -> bool {
|
||||
// no other sections have explicit requirements (yet)
|
||||
self.base.validate()
|
||||
}
|
||||
}
|
||||
@@ -57,6 +57,12 @@ pub struct Authenticator {
|
||||
/// The prefix denoting the maximum number of the clients that can be connected via Wireguard using IPv6.
|
||||
/// The maximum value for IPv6 is 128
|
||||
pub private_network_prefix_v6: u8,
|
||||
|
||||
/// Timeout to wait for responses from the peer controller before failing.
|
||||
/// Helps the authenticator recover from suspend/resume scenarios where the peer controller
|
||||
/// process/task can get stuck and never respond to oneshot RPC responses, which previously
|
||||
/// caused the authenticator to block forever waiting on the oneshot channel.
|
||||
pub peer_interaction_timeout: Duration,
|
||||
}
|
||||
|
||||
impl Default for Authenticator {
|
||||
@@ -68,6 +74,7 @@ impl Default for Authenticator {
|
||||
tunnel_announced_port: WG_TUNNEL_PORT,
|
||||
private_network_prefix_v4: WG_TUN_DEVICE_NETMASK_V4,
|
||||
private_network_prefix_v6: WG_TUN_DEVICE_NETMASK_V6,
|
||||
peer_interaction_timeout: default_peer_interaction_timeout(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -85,3 +92,7 @@ impl From<Authenticator> for nym_wireguard_types::Config {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn default_peer_interaction_timeout() -> Duration {
|
||||
Duration::from_millis(5_000)
|
||||
}
|
||||
|
||||
@@ -85,6 +85,9 @@ pub enum AuthenticatorError {
|
||||
#[error("peers can't be interacted with anymore")]
|
||||
PeerInteractionStopped,
|
||||
|
||||
#[error("peers interaction timed out while attempting to {operation}")]
|
||||
PeerInteractionTimeout { operation: &'static str },
|
||||
|
||||
#[error("unknown version number")]
|
||||
UnknownVersion,
|
||||
|
||||
|
||||
@@ -42,7 +42,6 @@ use std::{
|
||||
sync::Arc,
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
use tokio::sync::RwLock;
|
||||
use tokio_stream::wrappers::IntervalStream;
|
||||
|
||||
type AuthenticatorHandleResult = Result<(Vec<u8>, Option<Recipient>), AuthenticatorError>;
|
||||
@@ -74,7 +73,7 @@ pub(crate) struct MixnetListener {
|
||||
pub(crate) mixnet_client: nym_sdk::mixnet::MixnetClient,
|
||||
|
||||
// Registrations awaiting confirmation
|
||||
pub(crate) registered_and_free: RwLock<RegisteredAndFree>,
|
||||
pub(crate) registered_and_free: RegisteredAndFree,
|
||||
|
||||
pub(crate) peer_manager: PeerManager,
|
||||
|
||||
@@ -95,14 +94,15 @@ impl MixnetListener {
|
||||
mixnet_client: nym_sdk::mixnet::MixnetClient,
|
||||
upgrade_mode: UpgradeModeDetails,
|
||||
ecash_verifier: Arc<dyn EcashManager + Send + Sync>,
|
||||
peer_interaction_timeout: Duration,
|
||||
) -> Self {
|
||||
let timeout_check_interval =
|
||||
IntervalStream::new(tokio::time::interval(DEFAULT_REGISTRATION_TIMEOUT_CHECK));
|
||||
MixnetListener {
|
||||
config,
|
||||
mixnet_client,
|
||||
registered_and_free: RwLock::new(RegisteredAndFree::new(free_private_network_ips)),
|
||||
peer_manager: PeerManager::new(wireguard_gateway_data),
|
||||
registered_and_free: RegisteredAndFree::new(free_private_network_ips),
|
||||
peer_manager: PeerManager::new(wireguard_gateway_data, peer_interaction_timeout),
|
||||
upgrade_mode,
|
||||
ecash_verifier,
|
||||
timeout_check_interval,
|
||||
@@ -131,8 +131,8 @@ impl MixnetListener {
|
||||
))
|
||||
}
|
||||
|
||||
async fn remove_stale_registrations(&self) -> Result<(), AuthenticatorError> {
|
||||
let mut registered_and_free = self.registered_and_free.write().await;
|
||||
async fn remove_stale_registrations(&mut self) -> Result<(), AuthenticatorError> {
|
||||
let registered_and_free = &mut self.registered_and_free;
|
||||
let registered_values: Vec<_> = registered_and_free
|
||||
.registration_in_progres
|
||||
.values()
|
||||
@@ -185,8 +185,9 @@ impl MixnetListener {
|
||||
) -> AuthenticatorHandleResult {
|
||||
let remote_public = init_message.pub_key();
|
||||
let nonce: u64 = fastrand::u64(..);
|
||||
let mut registered_and_free = self.registered_and_free.write().await;
|
||||
if let Some(registration_data) = registered_and_free
|
||||
|
||||
if let Some(registration_data) = self
|
||||
.registered_and_free
|
||||
.registration_in_progres
|
||||
.get(&remote_public)
|
||||
{
|
||||
@@ -292,7 +293,17 @@ impl MixnetListener {
|
||||
return Ok((bytes, reply_to));
|
||||
}
|
||||
|
||||
let peer = self.peer_manager.query_peer(remote_public).await?;
|
||||
let peer = match self.peer_manager.query_peer(remote_public).await {
|
||||
Ok(peer) => peer,
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
"Failed to query peer {}: {err}. Continuing with fresh registration",
|
||||
remote_public
|
||||
);
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(peer) = peer {
|
||||
let allowed_ipv4 = peer
|
||||
.allowed_ips
|
||||
@@ -383,19 +394,21 @@ impl MixnetListener {
|
||||
return Ok((bytes, reply_to));
|
||||
}
|
||||
|
||||
let private_ip_ref = registered_and_free
|
||||
let private_ip = self
|
||||
.registered_and_free
|
||||
.free_private_network_ips
|
||||
.iter_mut()
|
||||
.filter(|r| r.1.is_none())
|
||||
.choose(&mut thread_rng())
|
||||
.ok_or(AuthenticatorError::NoFreeIp)?;
|
||||
let private_ips = *private_ip_ref.0;
|
||||
let private_ips = *private_ip.0;
|
||||
// mark it as used, even though it's not final
|
||||
*private_ip_ref.1 = Some(SystemTime::now());
|
||||
*private_ip.1 = Some(SystemTime::now());
|
||||
|
||||
let gateway_data = GatewayClient::new(
|
||||
self.keypair().private_key(),
|
||||
remote_public.inner(),
|
||||
*private_ip_ref.0,
|
||||
private_ips,
|
||||
nonce,
|
||||
);
|
||||
let registration_data = latest::registration::RegistrationData {
|
||||
@@ -403,7 +416,8 @@ impl MixnetListener {
|
||||
gateway_data: gateway_data.clone(),
|
||||
wg_port: self.config.authenticator.tunnel_announced_port,
|
||||
};
|
||||
registered_and_free
|
||||
|
||||
self.registered_and_free
|
||||
.registration_in_progres
|
||||
.insert(remote_public, registration_data.clone());
|
||||
let bytes = match AuthenticatorVersion::from(protocol) {
|
||||
@@ -539,12 +553,12 @@ impl MixnetListener {
|
||||
request_id: u64,
|
||||
reply_to: Option<Recipient>,
|
||||
) -> AuthenticatorHandleResult {
|
||||
let mut registered_and_free = self.registered_and_free.write().await;
|
||||
let registration_data = registered_and_free
|
||||
let registration_data = self
|
||||
.registered_and_free
|
||||
.registration_in_progres
|
||||
.get(&final_message.gateway_client_pub_key())
|
||||
.ok_or(AuthenticatorError::RegistrationNotInProgress)?
|
||||
.clone();
|
||||
.cloned()
|
||||
.ok_or(AuthenticatorError::RegistrationNotInProgress)?;
|
||||
|
||||
if final_message
|
||||
.verify(self.keypair().private_key(), registration_data.nonce)
|
||||
@@ -595,7 +609,7 @@ impl MixnetListener {
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
registered_and_free
|
||||
self.registered_and_free
|
||||
.registration_in_progres
|
||||
.remove(&final_message.gateway_client_pub_key());
|
||||
|
||||
@@ -818,7 +832,7 @@ impl MixnetListener {
|
||||
.to_bytes()
|
||||
.map_err(AuthenticatorError::response_serialisation)?,
|
||||
AuthenticatorVersion::V1 | AuthenticatorVersion::V2 | AuthenticatorVersion::UNKNOWN => {
|
||||
return Err(AuthenticatorError::UnknownVersion)
|
||||
return Err(AuthenticatorError::UnknownVersion);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -151,6 +151,7 @@ impl Authenticator {
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
let peer_timeout = self.config.authenticator.peer_interaction_timeout;
|
||||
let mixnet_listener = crate::node::internal_service_providers::authenticator::mixnet_listener::MixnetListener::new(
|
||||
self.config,
|
||||
free_private_network_ips,
|
||||
@@ -158,6 +159,7 @@ impl Authenticator {
|
||||
mixnet_client,
|
||||
self.upgrade_mode_state,
|
||||
self.ecash_verifier,
|
||||
peer_timeout,
|
||||
);
|
||||
|
||||
tracing::info!("The address of this client is: {self_address}");
|
||||
|
||||
@@ -8,15 +8,19 @@ use nym_credential_verification::{ClientBandwidth, TicketVerifier};
|
||||
use nym_credentials_interface::CredentialSpendingData;
|
||||
use nym_wireguard::{peer_controller::PeerControlRequest, WireguardGatewayData};
|
||||
use nym_wireguard_types::PeerPublicKey;
|
||||
use std::time::Duration;
|
||||
use tokio::time::timeout;
|
||||
|
||||
pub struct PeerManager {
|
||||
pub(crate) wireguard_gateway_data: WireguardGatewayData,
|
||||
response_timeout: Duration,
|
||||
}
|
||||
|
||||
impl PeerManager {
|
||||
pub fn new(wireguard_gateway_data: WireguardGatewayData) -> Self {
|
||||
pub fn new(wireguard_gateway_data: WireguardGatewayData, response_timeout: Duration) -> Self {
|
||||
PeerManager {
|
||||
wireguard_gateway_data,
|
||||
response_timeout,
|
||||
}
|
||||
}
|
||||
pub async fn add_peer(&self, peer: Peer) -> Result<(), AuthenticatorError> {
|
||||
@@ -28,9 +32,8 @@ impl PeerManager {
|
||||
.await
|
||||
.map_err(|_| AuthenticatorError::PeerInteractionStopped)?;
|
||||
|
||||
response_rx
|
||||
.await
|
||||
.map_err(|_| AuthenticatorError::InternalError("no response for add peer".to_string()))?
|
||||
recv_with_timeout(response_rx, "add peer", self.response_timeout)
|
||||
.await?
|
||||
.map_err(|err| {
|
||||
AuthenticatorError::InternalError(format!(
|
||||
"adding peer could not be performed: {err:?}"
|
||||
@@ -48,11 +51,8 @@ impl PeerManager {
|
||||
.await
|
||||
.map_err(|_| AuthenticatorError::PeerInteractionStopped)?;
|
||||
|
||||
response_rx
|
||||
.await
|
||||
.map_err(|_| {
|
||||
AuthenticatorError::InternalError("no response for remove peer".to_string())
|
||||
})?
|
||||
recv_with_timeout(response_rx, "remove peer", self.response_timeout)
|
||||
.await?
|
||||
.map_err(|err| {
|
||||
AuthenticatorError::InternalError(format!(
|
||||
"removing peer could not be performed: {err:?}"
|
||||
@@ -73,11 +73,8 @@ impl PeerManager {
|
||||
.await
|
||||
.map_err(|_| AuthenticatorError::PeerInteractionStopped)?;
|
||||
|
||||
response_rx
|
||||
.await
|
||||
.map_err(|_| {
|
||||
AuthenticatorError::InternalError("no response for query peer".to_string())
|
||||
})?
|
||||
recv_with_timeout(response_rx, "query peer", self.response_timeout)
|
||||
.await?
|
||||
.map_err(|err| {
|
||||
AuthenticatorError::InternalError(format!(
|
||||
"querying peer could not be performed: {err:?}"
|
||||
@@ -106,13 +103,8 @@ impl PeerManager {
|
||||
.await
|
||||
.map_err(|_| AuthenticatorError::PeerInteractionStopped)?;
|
||||
|
||||
response_rx
|
||||
.await
|
||||
.map_err(|_| {
|
||||
AuthenticatorError::InternalError(
|
||||
"no response for query client bandwidth".to_string(),
|
||||
)
|
||||
})?
|
||||
recv_with_timeout(response_rx, "query client bandwidth", self.response_timeout)
|
||||
.await?
|
||||
.map_err(|err| {
|
||||
AuthenticatorError::InternalError(format!(
|
||||
"querying client bandwidth could not be performed: {err:?}"
|
||||
@@ -138,11 +130,8 @@ impl PeerManager {
|
||||
.await
|
||||
.map_err(|_| AuthenticatorError::PeerInteractionStopped)?;
|
||||
|
||||
response_rx
|
||||
.await
|
||||
.map_err(|_| {
|
||||
AuthenticatorError::InternalError("no response for query verifier".to_string())
|
||||
})?
|
||||
recv_with_timeout(response_rx, "query verifier", self.response_timeout)
|
||||
.await?
|
||||
.map_err(|err| {
|
||||
AuthenticatorError::InternalError(format!(
|
||||
"querying verifier could not be performed: {err:?}"
|
||||
@@ -151,10 +140,31 @@ impl PeerManager {
|
||||
}
|
||||
}
|
||||
|
||||
async fn recv_with_timeout<T>(
|
||||
response_rx: oneshot::Receiver<T>,
|
||||
operation: &'static str,
|
||||
timeout_duration: Duration,
|
||||
) -> Result<T, AuthenticatorError> {
|
||||
// Suspend/resume can wedge the peer controller, so we bound the wait to avoid deadlocking
|
||||
// authenticator responses on a stuck oneshot channel.
|
||||
match timeout(timeout_duration, response_rx).await {
|
||||
Ok(Ok(value)) => Ok(value),
|
||||
Ok(Err(_)) => Err(AuthenticatorError::PeerInteractionStopped),
|
||||
Err(_) => {
|
||||
tracing::warn!(
|
||||
"peer controller response timed out while attempting to {operation} after {:?}",
|
||||
timeout_duration
|
||||
);
|
||||
Err(AuthenticatorError::PeerInteractionTimeout { operation })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{str::FromStr, sync::Arc};
|
||||
|
||||
use futures::channel::oneshot;
|
||||
use nym_credential_verification::{
|
||||
bandwidth_storage_manager::BandwidthStorageManager, ecash::MockEcashManager,
|
||||
};
|
||||
@@ -163,7 +173,8 @@ mod tests {
|
||||
use nym_gateway_storage::traits::{mock::MockGatewayStorage, BandwidthGatewayStorage};
|
||||
use nym_wireguard::peer_controller::{start_controller, stop_controller};
|
||||
use rand::rngs::OsRng;
|
||||
use time::{Duration, OffsetDateTime};
|
||||
use std::time::Duration;
|
||||
use time::{Duration as TimeDuration, OffsetDateTime};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::nym_authenticator::{
|
||||
@@ -243,7 +254,7 @@ mod tests {
|
||||
Authenticator::default().into(),
|
||||
Arc::new(KeyPair::new(&mut OsRng)),
|
||||
);
|
||||
let peer_manager = PeerManager::new(wireguard_data);
|
||||
let peer_manager = PeerManager::new(wireguard_data, Duration::from_secs(5));
|
||||
let (storage, task_manager) = start_controller(
|
||||
peer_manager.wireguard_gateway_data.peer_tx().clone(),
|
||||
request_rx,
|
||||
@@ -291,7 +302,7 @@ mod tests {
|
||||
Authenticator::default().into(),
|
||||
Arc::new(KeyPair::new(&mut OsRng)),
|
||||
);
|
||||
let mut peer_manager = PeerManager::new(wireguard_data);
|
||||
let mut peer_manager = PeerManager::new(wireguard_data, Duration::from_secs(5));
|
||||
let key = Key::default();
|
||||
let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap();
|
||||
let (storage, task_manager) = start_controller(
|
||||
@@ -311,7 +322,7 @@ mod tests {
|
||||
Authenticator::default().into(),
|
||||
Arc::new(KeyPair::new(&mut OsRng)),
|
||||
);
|
||||
let mut peer_manager = PeerManager::new(wireguard_data);
|
||||
let mut peer_manager = PeerManager::new(wireguard_data, Duration::from_secs(5));
|
||||
let key = Key::default();
|
||||
let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap();
|
||||
let (storage, task_manager) = start_controller(
|
||||
@@ -334,7 +345,7 @@ mod tests {
|
||||
Authenticator::default().into(),
|
||||
Arc::new(KeyPair::new(&mut OsRng)),
|
||||
);
|
||||
let mut peer_manager = PeerManager::new(wireguard_data);
|
||||
let mut peer_manager = PeerManager::new(wireguard_data, Duration::from_secs(5));
|
||||
let key = Key::default();
|
||||
let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap();
|
||||
let (storage, task_manager) = start_controller(
|
||||
@@ -357,7 +368,7 @@ mod tests {
|
||||
Authenticator::default().into(),
|
||||
Arc::new(KeyPair::new(&mut OsRng)),
|
||||
);
|
||||
let mut peer_manager = PeerManager::new(wireguard_data);
|
||||
let mut peer_manager = PeerManager::new(wireguard_data, Duration::from_secs(5));
|
||||
let key = Key::default();
|
||||
let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap();
|
||||
let (storage, task_manager) = start_controller(
|
||||
@@ -388,7 +399,7 @@ mod tests {
|
||||
Authenticator::default().into(),
|
||||
Arc::new(KeyPair::new(&mut OsRng)),
|
||||
);
|
||||
let mut peer_manager = PeerManager::new(wireguard_data);
|
||||
let mut peer_manager = PeerManager::new(wireguard_data, Duration::from_secs(5));
|
||||
let key = Key::default();
|
||||
let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap();
|
||||
let (storage, task_manager) = start_controller(
|
||||
@@ -417,7 +428,7 @@ mod tests {
|
||||
Authenticator::default().into(),
|
||||
Arc::new(KeyPair::new(&mut OsRng)),
|
||||
);
|
||||
let mut peer_manager = PeerManager::new(wireguard_data);
|
||||
let mut peer_manager = PeerManager::new(wireguard_data, Duration::from_secs(5));
|
||||
let key = Key::default();
|
||||
let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap();
|
||||
let top_up = 42;
|
||||
@@ -444,7 +455,7 @@ mod tests {
|
||||
.increase_bandwidth(
|
||||
Bandwidth::new_unchecked(top_up as u64),
|
||||
OffsetDateTime::now_utc()
|
||||
.checked_add(Duration::minutes(1))
|
||||
.checked_add(TimeDuration::minutes(1))
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
@@ -466,4 +477,28 @@ mod tests {
|
||||
|
||||
stop_controller(task_manager).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn recv_with_timeout_errors_after_deadline() {
|
||||
let (_tx, rx) = oneshot::channel::<()>();
|
||||
let err = super::recv_with_timeout(rx, "unit-test", Duration::from_millis(10))
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(matches!(
|
||||
err,
|
||||
AuthenticatorError::PeerInteractionTimeout {
|
||||
operation: "unit-test"
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn recv_with_timeout_succeeds_before_deadline() {
|
||||
let (tx, rx) = oneshot::channel::<u8>();
|
||||
tx.send(42).unwrap();
|
||||
let value = super::recv_with_timeout(rx, "unit-test", Duration::from_secs(1))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(value, 42);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ console-subscriber = { workspace = true, optional = true }
|
||||
csv = { workspace = true }
|
||||
clap = { workspace = true, features = ["cargo", "env"] }
|
||||
futures = { workspace = true }
|
||||
humantime = { workspace = true }
|
||||
humantime-serde = { workspace = true }
|
||||
human-repr = { workspace = true }
|
||||
ipnetwork = { workspace = true }
|
||||
|
||||
@@ -944,6 +944,7 @@ pub struct Wireguard {
|
||||
|
||||
/// Tunnel port announced to external clients wishing to connect to the wireguard interface.
|
||||
/// Useful in the instances where the node is behind a proxy.
|
||||
#[serde(alias = "announced_port")]
|
||||
pub announced_tunnel_port: u16,
|
||||
|
||||
/// Metadata port announced to external clients wishing to connect to the metadata endpoint.
|
||||
@@ -1001,6 +1002,7 @@ impl From<Wireguard> for nym_authenticator::config::Authenticator {
|
||||
tunnel_announced_port: value.announced_tunnel_port,
|
||||
private_network_prefix_v4: value.private_network_prefix_v4,
|
||||
private_network_prefix_v6: value.private_network_prefix_v6,
|
||||
peer_interaction_timeout: nym_authenticator::config::default_peer_interaction_timeout(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,6 +63,7 @@ pub struct WireguardV10 {
|
||||
|
||||
/// Port announced to external clients wishing to connect to the wireguard interface.
|
||||
/// Useful in the instances where the node is behind a proxy.
|
||||
#[serde(alias = "announced_tunnel_port")]
|
||||
pub announced_port: u16,
|
||||
|
||||
/// The prefix denoting the maximum number of the clients that can be connected via Wireguard using IPv4.
|
||||
|
||||
Reference in New Issue
Block a user