Compare commits

...

2 Commits

Author SHA1 Message Date
Andrej Mihajlov 30e60c738f Return join handle separately 2025-10-15 10:22:00 +02:00
Andrej Mihajlov adf22e330a Remove superficial AuthClientMixnetListener, replace with spawn() 2025-10-15 09:27:44 +02:00
5 changed files with 76 additions and 120 deletions
+1 -2
View File
@@ -23,10 +23,9 @@ use nym_wireguard_types::PeerPublicKey;
mod error;
mod helpers;
mod mixnet_listener;
pub mod mixnet_listener;
pub use crate::error::{Error, Result};
pub use crate::mixnet_listener::{AuthClientMixnetListener, AuthClientMixnetListenerHandle};
pub struct AuthenticatorClient {
mixnet_listener: MixnetMessageBroadcastReceiver,
+56 -110
View File
@@ -16,119 +16,83 @@ pub type MixnetMessageBroadcastReceiver = broadcast::Receiver<Arc<ReconstructedM
pub type MixnetMessageInputSender = mpsc::Sender<InputMessage>;
pub type MixnetMessageInputReceiver = mpsc::Receiver<InputMessage>; // This could be another type, to abstract the mixnet message creation to here
// The AuthClientsMixnetListener listens to mixnet messages and rebroadcasts them to the
// Spawn a task that listens to mixnet messages and rebroadcasts them to the
// AuthClients, or whoever else is interested.
// It also manages the message input for the mixnet so it can keep the sole ownership of the MixnetClient
//
// NOTE: this is potentially bit wasteful. Ideally we should have proper channels where the
// recipient only gets messages they're interested in.
pub struct AuthClientMixnetListener {
// The mixnet client that we're listening to
mixnet_client: MixnetClient,
// Broadcast channel for the messages that we re-broadcast to the AuthClients
message_broadcast: MixnetMessageBroadcastSender,
// Channel for message to send to the mixnet
input_message_tx: MixnetMessageInputSender, // we keep on to make sure it's open
input_message_rx: MixnetMessageInputReceiver,
// Listen to cancel from the outside world
pub fn spawn(
mut mixnet_client: MixnetClient,
shutdown_token: CancellationToken,
}
) -> (JoinHandle<()>, AuthClientMixnetListenerHandle) {
// Broadcast channel for the messages that we re-broadcast to the AuthClients
let (message_broadcast, _) = broadcast::channel(100);
// Channel for message to send to the mixnet
let (input_message_tx, mut input_message_rx) = mpsc::channel(100);
impl AuthClientMixnetListener {
pub fn new(mixnet_client: MixnetClient, shutdown_token: CancellationToken) -> Self {
let (message_broadcast, _) = broadcast::channel(100);
let (input_message_tx, input_message_rx) = mpsc::channel(100);
Self {
mixnet_client,
message_broadcast,
input_message_tx,
input_message_rx,
shutdown_token,
}
}
let cloned_message_broadcast = message_broadcast.clone();
let cloned_message_sender = input_message_tx.clone();
let child_shutdown_token = shutdown_token.child_token();
async fn run(mut self) -> Self {
let mixnet_cancel_token = self.mixnet_client.cancellation_token();
self.shutdown_token.run_until_cancelled(async {
loop {
tokio::select! {
biased;
_ = mixnet_cancel_token.cancelled() => {
tracing::debug!("AuthClientMixnetListener: mixnet client was shutdown");
break;
let join_handle = tokio::spawn(async move {
loop {
tokio::select! {
biased;
_ = child_shutdown_token.cancelled() => {
tracing::debug!("AuthClientMixnetListener: received shutdown");
// Disconnect mixnet which should send forget_me or remember_me if needed.
mixnet_client.disconnect().await;
break;
}
// Sending loop
input_msg = input_message_rx.recv() => {
match input_msg {
None => {
tracing::error!("All senders were dropped. It shouldn't happen as we're holding one");
break;
},
Some(mix_msg) => {
if let Err(err) = mixnet_client.send(mix_msg).await {
tracing::error!("Failed to send mixnet message: {err}");
}
},
}
}
// Receiving loop
msg = mixnet_client.next() => {
match msg {
None => {
tracing::error!("Mixnet client stream ended unexpectedly");
break;
},
Some(event) => {
if let Err(err) = message_broadcast.send(Arc::new(event)) {
tracing::error!("Failed to broadcast mixnet message: {err}");
}
},
// Sending loop
input_msg = self.input_message_rx.recv() => {
match input_msg {
None => {
tracing::error!("All senders were dropped. It shouldn't happen as we're holding one");
break;
},
Some(mix_msg) => {
if let Err(err) = self.mixnet_client.send(mix_msg).await {
tracing::error!("Failed to send mixnet message: {err}");
}
},
}
}
// Receiving loop
msg = self.mixnet_client.next() => {
match msg {
None => {
tracing::error!("Mixnet client stream ended unexpectedly");
break;
},
Some(event) => {
if let Err(err) = self.message_broadcast.send(Arc::new(event)) {
tracing::error!("Failed to broadcast mixnet message: {err}");
}
},
}
}
}
}
tracing::debug!("AuthClientMixnetListener is shutting down");
}).await;
self
}
// Disconnects the mixnet client and effectively drop itself, since it doesn't work without one, and reconnecting isn't supported
pub async fn disconnect_mixnet_client(self) {
if !self.mixnet_client.cancellation_token().is_cancelled() {
self.mixnet_client.disconnect().await;
}
}
pub fn start(self) -> AuthClientMixnetListenerHandle {
let message_broadcast = self.message_broadcast.clone();
let message_sender = self.input_message_tx.clone();
// Allows stopping only this, e.g. if we don't need it in the new bandwidth controller
let cancellation_token = self.shutdown_token.clone();
let mixnet_cancellation_token = self.mixnet_client.cancellation_token();
let handle = tokio::spawn(self.run());
tracing::debug!("AuthClientMixnetListener is shutting down");
});
(
join_handle,
AuthClientMixnetListenerHandle {
message_broadcast,
message_sender,
cancellation_token,
mixnet_cancellation_token,
handle,
}
}
message_broadcast: cloned_message_broadcast,
message_sender: cloned_message_sender,
},
)
}
/// Handle that enables the consumer to subscribe or submit messages to the mixnet.
pub struct AuthClientMixnetListenerHandle {
message_broadcast: MixnetMessageBroadcastSender,
message_sender: MixnetMessageInputSender,
cancellation_token: CancellationToken,
mixnet_cancellation_token: CancellationToken,
handle: JoinHandle<AuthClientMixnetListener>,
}
impl AuthClientMixnetListenerHandle {
@@ -139,22 +103,4 @@ impl AuthClientMixnetListenerHandle {
pub fn subscribe(&self) -> MixnetMessageBroadcastReceiver {
self.message_broadcast.subscribe()
}
pub fn mixnet_cancel_token(&self) -> CancellationToken {
self.mixnet_cancellation_token.clone()
}
pub async fn stop(self) {
// If shutdown was externally called, that call is a no-op
// If we're only stopping this, it is very much needed
self.cancellation_token.cancel();
match self.handle.await {
Ok(auth_client_mixnet_listener) => {
auth_client_mixnet_listener.disconnect_mixnet_client().await;
}
Err(e) => {
tracing::error!("Error waiting for auth clients mixnet listener to stop: {e}");
}
}
}
}
+8 -4
View File
@@ -13,7 +13,7 @@ use base64::{Engine as _, engine::general_purpose};
use bytes::BytesMut;
use clap::Args;
use futures::StreamExt;
use nym_authenticator_client::{AuthClientMixnetListener, AuthenticatorClient};
use nym_authenticator_client::{AuthenticatorClient, mixnet_listener};
use nym_authenticator_requests::{
AuthenticatorVersion, client_message::ClientMessage, response::AuthenticatorResponse, v2, v3,
v4, v5,
@@ -400,8 +400,9 @@ impl Probe {
(node_info.authenticator_address, node_info.ip_address)
{
// Start the mixnet listener that the auth clients use to receive messages.
let mixnet_listener_task =
AuthClientMixnetListener::new(mixnet_client, CancellationToken::new()).start();
let mixnet_listener_cancel_token = CancellationToken::new();
let (mixnet_listener_join_handle, mixnet_listener_task) =
mixnet_listener::spawn(mixnet_client, mixnet_listener_cancel_token.child_token());
let auth_client = AuthenticatorClient::new(
mixnet_listener_task.subscribe(),
@@ -441,7 +442,10 @@ impl Probe {
.await
.unwrap_or_default();
mixnet_listener_task.stop().await;
mixnet_listener_cancel_token.cancel();
if let Err(e) = mixnet_listener_join_handle.await {
tracing::error!("Failed to join on mixnet listener: {e}");
}
outcome
} else {
+7 -3
View File
@@ -3,7 +3,7 @@
use tokio_util::sync::CancellationToken;
use nym_authenticator_client::{AuthClientMixnetListener, AuthenticatorClient};
use nym_authenticator_client::{AuthenticatorClient, mixnet_listener};
use nym_bandwidth_controller::BandwidthTicketProvider;
use nym_credentials_interface::TicketType;
use nym_ip_packet_client::IprClientConnect;
@@ -85,8 +85,11 @@ impl RegistrationClient {
// Start the auth client mixnet listener, which will listen for incoming messages from the
// mixnet and rebroadcast them to the auth clients.
let mixnet_listener =
AuthClientMixnetListener::new(self.mixnet_client, self.cancel_token.clone()).start();
let (mixnet_listener_join_handle, mixnet_listener) = mixnet_listener::spawn(
self.mixnet_client,
// todo: are we sure we want to clone the cancel token here?
self.cancel_token.clone(),
);
let mut entry_auth_client = AuthenticatorClient::new(
mixnet_listener.subscribe(),
@@ -139,6 +142,7 @@ impl RegistrationClient {
entry_gateway_data: entry,
exit_gateway_data: exit,
authenticator_listener_handle: mixnet_listener,
mixnet_listener_join_handle,
bw_controller: self.bandwidth_controller,
},
)))
+4 -1
View File
@@ -1,7 +1,9 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_authenticator_client::{AuthClientMixnetListenerHandle, AuthenticatorClient};
use nym_authenticator_client::{
AuthenticatorClient, mixnet_listener::AuthClientMixnetListenerHandle,
};
use nym_bandwidth_controller::BandwidthTicketProvider;
use nym_registration_common::{AssignedAddresses, GatewayData};
use nym_sdk::mixnet::MixnetClient;
@@ -22,5 +24,6 @@ pub struct WireguardRegistrationResult {
pub entry_gateway_data: GatewayData,
pub exit_gateway_data: GatewayData,
pub authenticator_listener_handle: AuthClientMixnetListenerHandle,
pub mixnet_listener_join_handle: tokio::task::JoinHandle<()>,
pub bw_controller: Box<dyn BandwidthTicketProvider>,
}