disconnect mixnet client if registration fails (#6169)

Co-authored-by: Simon Wicky <simon@nymtech.net>
This commit is contained in:
Jędrzej Stuczyński
2025-10-31 12:07:22 +00:00
committed by GitHub
parent 3e9b8d237f
commit 378f32e6d7
4 changed files with 108 additions and 66 deletions
@@ -50,7 +50,7 @@ impl AuthClientMixnetListener {
}
}
async fn run(mut self) -> Self {
async fn run(mut self) {
let mixnet_cancel_token = self.mixnet_client.cancellation_token();
self.shutdown_token.run_until_cancelled(async {
loop {
@@ -95,12 +95,8 @@ impl AuthClientMixnetListener {
tracing::debug!("AuthClientMixnetListener is shutting down");
}).await;
self
}
// Disconnects the mixnet client and effectively drop itself, since it doesn't work without one, and reconnecting isn't supported
pub async fn disconnect_mixnet_client(self) {
if !self.mixnet_client.cancellation_token().is_cancelled() {
tracing::debug!("AuthClientMixnetListener: Disconnect mixnet client");
if !mixnet_cancel_token.is_cancelled() {
self.mixnet_client.disconnect().await;
}
}
@@ -128,7 +124,7 @@ pub struct AuthClientMixnetListenerHandle {
message_sender: MixnetMessageInputSender,
cancellation_token: CancellationToken,
mixnet_cancellation_token: CancellationToken,
handle: JoinHandle<AuthClientMixnetListener>,
handle: JoinHandle<()>,
}
impl AuthClientMixnetListenerHandle {
@@ -148,13 +144,8 @@ impl AuthClientMixnetListenerHandle {
// If shutdown was externally called, that call is a no-op
// If we're only stopping this, it is very much needed
self.cancellation_token.cancel();
match self.handle.await {
Ok(auth_client_mixnet_listener) => {
auth_client_mixnet_listener.disconnect_mixnet_client().await;
}
Err(e) => {
tracing::error!("Error waiting for auth clients mixnet listener to stop: {e}");
}
if let Err(e) = self.handle.await {
tracing::error!("Error waiting for auth clients mixnet listener to stop: {e}")
}
}
}
+1 -1
View File
@@ -765,7 +765,7 @@ async fn connect_exit(
);
// The IPR supports cancellation, but it's unused in the gateway probe
let cancel_token = CancellationToken::new();
let mut ipr_client = IprClientConnect::new(mixnet_client, cancel_token).await;
let mut ipr_client = IprClientConnect::new(mixnet_client, cancel_token);
let maybe_ip_pair = ipr_client.connect(exit_router_address).await;
let mixnet_client = ipr_client.into_mixnet_client();
+1 -1
View File
@@ -43,7 +43,7 @@ pub struct IprClientConnect {
}
impl IprClientConnect {
pub async fn new(mixnet_client: MixnetClient, cancel_token: CancellationToken) -> Self {
pub fn new(mixnet_client: MixnetClient, cancel_token: CancellationToken) -> Self {
Self {
mixnet_client,
connected: ConnectionState::Disconnected,
+100 -49
View File
@@ -9,6 +9,7 @@ use nym_credentials_interface::TicketType;
use nym_ip_packet_client::IprClientConnect;
use nym_registration_common::AssignedAddresses;
use nym_sdk::mixnet::{EventReceiver, MixnetClient, Recipient};
use tracing::debug;
use crate::config::RegistrationClientConfig;
@@ -34,23 +35,49 @@ pub struct RegistrationClient {
event_rx: EventReceiver,
}
// Bundle of an actual error and the underlying mixnet client so it can be shutdown correctly if needed
struct RegistrationError {
mixnet_client: Option<MixnetClient>,
source: crate::RegistrationClientError,
}
impl RegistrationClient {
async fn register_mix_exit(self) -> Result<RegistrationResult, RegistrationClientError> {
async fn register_mix_exit(self) -> Result<RegistrationResult, RegistrationError> {
let entry_mixnet_gateway_ip = self.config.entry.node.ip_address;
let exit_mixnet_gateway_ip = self.config.exit.node.ip_address;
let ipr_address = self.config.exit.node.ipr_address.ok_or(
RegistrationClientError::NoIpPacketRouterAddress {
node_id: self.config.exit.node.identity.to_base58_string(),
},
)?;
let Some(ipr_address) = self.config.exit.node.ipr_address else {
return Err(RegistrationError {
mixnet_client: Some(self.mixnet_client),
source: RegistrationClientError::NoIpPacketRouterAddress {
node_id: self.config.exit.node.identity.to_base58_string(),
},
});
};
let mut ipr_client =
IprClientConnect::new(self.mixnet_client, self.cancel_token.clone()).await;
let interface_addresses = ipr_client
.connect(ipr_address)
IprClientConnect::new(self.mixnet_client, self.cancel_token.child_token());
let interface_addresses = match self
.cancel_token
.run_until_cancelled(ipr_client.connect(ipr_address))
.await
.map_err(RegistrationClientError::ConnectToIpPacketRouter)?;
{
Some(Ok(addr)) => addr,
Some(Err(e)) => {
return Err(RegistrationError {
mixnet_client: Some(ipr_client.into_mixnet_client()),
source: RegistrationClientError::ConnectToIpPacketRouter(e),
});
}
None => {
return Err(RegistrationError {
mixnet_client: Some(ipr_client.into_mixnet_client()),
source: RegistrationClientError::Cancelled,
});
}
};
Ok(RegistrationResult::Mixnet(Box::new(
MixnetRegistrationResult {
@@ -67,18 +94,24 @@ impl RegistrationClient {
)))
}
async fn register_wg(self) -> Result<RegistrationResult, RegistrationClientError> {
let entry_auth_address = self.config.entry.node.authenticator_address.ok_or(
RegistrationClientError::AuthenticationNotPossible {
node_id: self.config.entry.node.identity.to_base58_string(),
},
)?;
async fn register_wg(self) -> Result<RegistrationResult, RegistrationError> {
let Some(entry_auth_address) = self.config.entry.node.authenticator_address else {
return Err(RegistrationError {
mixnet_client: Some(self.mixnet_client),
source: RegistrationClientError::AuthenticationNotPossible {
node_id: self.config.entry.node.identity.to_base58_string(),
},
});
};
let exit_auth_address = self.config.exit.node.authenticator_address.ok_or(
RegistrationClientError::AuthenticationNotPossible {
node_id: self.config.exit.node.identity.to_base58_string(),
},
)?;
let Some(exit_auth_address) = self.config.exit.node.authenticator_address else {
return Err(RegistrationError {
mixnet_client: Some(self.mixnet_client),
source: RegistrationClientError::AuthenticationNotPossible {
node_id: self.config.exit.node.identity.to_base58_string(),
},
});
};
let entry_version = self.config.entry.node.version;
tracing::debug!("Entry gateway version: {entry_version}");
@@ -87,8 +120,10 @@ impl RegistrationClient {
// Start the auth client mixnet listener, which will listen for incoming messages from the
// mixnet and rebroadcast them to the auth clients.
// From this point on, we don't need to care about the mixnet client anymore
let mixnet_listener =
AuthClientMixnetListener::new(self.mixnet_client, self.cancel_token.clone()).start();
AuthClientMixnetListener::new(self.mixnet_client, self.cancel_token.child_token())
.start();
let mut entry_auth_client = AuthenticatorClient::new(
mixnet_listener.subscribe(),
@@ -115,24 +150,33 @@ impl RegistrationClient {
let exit_fut = exit_auth_client
.register_wireguard(&*self.bandwidth_controller, TicketType::V1WireguardExit);
let (entry, exit) = Box::pin(async { tokio::join!(entry_fut, exit_fut) }).await;
let (entry, exit) = Box::pin(
self.cancel_token
.run_until_cancelled(async { tokio::join!(entry_fut, exit_fut) }),
)
.await
.ok_or(RegistrationError {
mixnet_client: None,
source: RegistrationClientError::Cancelled,
})?;
let entry =
entry.map_err(
|source| RegistrationClientError::EntryGatewayRegisterWireguard {
gateway_id: self.config.entry.node.identity.to_base58_string(),
authenticator_address: Box::new(entry_auth_address),
source: Box::new(source),
},
)?;
let exit =
exit.map_err(
|source| RegistrationClientError::ExitGatewayRegisterWireguard {
gateway_id: self.config.exit.node.identity.to_base58_string(),
authenticator_address: Box::new(exit_auth_address),
source: Box::new(source),
},
)?;
let entry = entry.map_err(|source| RegistrationError {
mixnet_client: None,
source: RegistrationClientError::EntryGatewayRegisterWireguard {
gateway_id: self.config.entry.node.identity.to_base58_string(),
authenticator_address: Box::new(entry_auth_address),
source: Box::new(source),
},
})?;
let exit = exit.map_err(|source| RegistrationError {
mixnet_client: None,
source: RegistrationClientError::EntryGatewayRegisterWireguard {
gateway_id: self.config.exit.node.identity.to_base58_string(),
authenticator_address: Box::new(exit_auth_address),
source: Box::new(source),
},
})?;
Ok(RegistrationResult::Wireguard(Box::new(
WireguardRegistrationResult {
@@ -147,16 +191,23 @@ impl RegistrationClient {
}
pub async fn register(self) -> Result<RegistrationResult, RegistrationClientError> {
self.cancel_token
.clone()
.run_until_cancelled(async {
if self.config.two_hops {
self.register_wg().await
} else {
self.register_mix_exit().await
let registration_result = if self.config.two_hops {
self.register_wg().await
} else {
self.register_mix_exit().await
};
// If we failed to register, and we were the owner of the mixnet client, shut it down
match registration_result {
Ok(result) => Ok(result),
Err(error) => {
debug!("Registration failed");
if let Some(mixnet_client) = error.mixnet_client {
debug!("Shutting down mixnet client");
mixnet_client.disconnect().await;
}
})
.await
.ok_or(RegistrationClientError::Cancelled)?
Err(error.source)
}
}
}
}