Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 30e60c738f | |||
| adf22e330a |
@@ -23,10 +23,9 @@ use nym_wireguard_types::PeerPublicKey;
|
||||
|
||||
mod error;
|
||||
mod helpers;
|
||||
mod mixnet_listener;
|
||||
pub mod mixnet_listener;
|
||||
|
||||
pub use crate::error::{Error, Result};
|
||||
pub use crate::mixnet_listener::{AuthClientMixnetListener, AuthClientMixnetListenerHandle};
|
||||
|
||||
pub struct AuthenticatorClient {
|
||||
mixnet_listener: MixnetMessageBroadcastReceiver,
|
||||
|
||||
@@ -16,119 +16,83 @@ pub type MixnetMessageBroadcastReceiver = broadcast::Receiver<Arc<ReconstructedM
|
||||
pub type MixnetMessageInputSender = mpsc::Sender<InputMessage>;
|
||||
pub type MixnetMessageInputReceiver = mpsc::Receiver<InputMessage>; // This could be another type, to abstract the mixnet message creation to here
|
||||
|
||||
// The AuthClientsMixnetListener listens to mixnet messages and rebroadcasts them to the
|
||||
// Spawn a task that listens to mixnet messages and rebroadcasts them to the
|
||||
// AuthClients, or whoever else is interested.
|
||||
// It also manages the message input for the mixnet so it can keep the sole ownership of the MixnetClient
|
||||
//
|
||||
// NOTE: this is potentially bit wasteful. Ideally we should have proper channels where the
|
||||
// recipient only gets messages they're interested in.
|
||||
pub struct AuthClientMixnetListener {
|
||||
// The mixnet client that we're listening to
|
||||
mixnet_client: MixnetClient,
|
||||
|
||||
// Broadcast channel for the messages that we re-broadcast to the AuthClients
|
||||
message_broadcast: MixnetMessageBroadcastSender,
|
||||
|
||||
// Channel for message to send to the mixnet
|
||||
input_message_tx: MixnetMessageInputSender, // we keep on to make sure it's open
|
||||
input_message_rx: MixnetMessageInputReceiver,
|
||||
|
||||
// Listen to cancel from the outside world
|
||||
pub fn spawn(
|
||||
mut mixnet_client: MixnetClient,
|
||||
shutdown_token: CancellationToken,
|
||||
}
|
||||
) -> (JoinHandle<()>, AuthClientMixnetListenerHandle) {
|
||||
// Broadcast channel for the messages that we re-broadcast to the AuthClients
|
||||
let (message_broadcast, _) = broadcast::channel(100);
|
||||
// Channel for message to send to the mixnet
|
||||
let (input_message_tx, mut input_message_rx) = mpsc::channel(100);
|
||||
|
||||
impl AuthClientMixnetListener {
|
||||
pub fn new(mixnet_client: MixnetClient, shutdown_token: CancellationToken) -> Self {
|
||||
let (message_broadcast, _) = broadcast::channel(100);
|
||||
let (input_message_tx, input_message_rx) = mpsc::channel(100);
|
||||
Self {
|
||||
mixnet_client,
|
||||
message_broadcast,
|
||||
input_message_tx,
|
||||
input_message_rx,
|
||||
shutdown_token,
|
||||
}
|
||||
}
|
||||
let cloned_message_broadcast = message_broadcast.clone();
|
||||
let cloned_message_sender = input_message_tx.clone();
|
||||
let child_shutdown_token = shutdown_token.child_token();
|
||||
|
||||
async fn run(mut self) -> Self {
|
||||
let mixnet_cancel_token = self.mixnet_client.cancellation_token();
|
||||
self.shutdown_token.run_until_cancelled(async {
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = mixnet_cancel_token.cancelled() => {
|
||||
tracing::debug!("AuthClientMixnetListener: mixnet client was shutdown");
|
||||
break;
|
||||
let join_handle = tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = child_shutdown_token.cancelled() => {
|
||||
tracing::debug!("AuthClientMixnetListener: received shutdown");
|
||||
// Disconnect mixnet which should send forget_me or remember_me if needed.
|
||||
mixnet_client.disconnect().await;
|
||||
break;
|
||||
}
|
||||
|
||||
// Sending loop
|
||||
input_msg = input_message_rx.recv() => {
|
||||
match input_msg {
|
||||
None => {
|
||||
tracing::error!("All senders were dropped. It shouldn't happen as we're holding one");
|
||||
break;
|
||||
},
|
||||
Some(mix_msg) => {
|
||||
if let Err(err) = mixnet_client.send(mix_msg).await {
|
||||
tracing::error!("Failed to send mixnet message: {err}");
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
// Receiving loop
|
||||
msg = mixnet_client.next() => {
|
||||
match msg {
|
||||
None => {
|
||||
tracing::error!("Mixnet client stream ended unexpectedly");
|
||||
break;
|
||||
},
|
||||
Some(event) => {
|
||||
if let Err(err) = message_broadcast.send(Arc::new(event)) {
|
||||
tracing::error!("Failed to broadcast mixnet message: {err}");
|
||||
}
|
||||
},
|
||||
|
||||
// Sending loop
|
||||
input_msg = self.input_message_rx.recv() => {
|
||||
match input_msg {
|
||||
None => {
|
||||
tracing::error!("All senders were dropped. It shouldn't happen as we're holding one");
|
||||
break;
|
||||
},
|
||||
Some(mix_msg) => {
|
||||
if let Err(err) = self.mixnet_client.send(mix_msg).await {
|
||||
tracing::error!("Failed to send mixnet message: {err}");
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
// Receiving loop
|
||||
msg = self.mixnet_client.next() => {
|
||||
match msg {
|
||||
None => {
|
||||
tracing::error!("Mixnet client stream ended unexpectedly");
|
||||
break;
|
||||
},
|
||||
Some(event) => {
|
||||
if let Err(err) = self.message_broadcast.send(Arc::new(event)) {
|
||||
tracing::error!("Failed to broadcast mixnet message: {err}");
|
||||
}
|
||||
},
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
tracing::debug!("AuthClientMixnetListener is shutting down");
|
||||
}).await;
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
// Disconnects the mixnet client and effectively drop itself, since it doesn't work without one, and reconnecting isn't supported
|
||||
pub async fn disconnect_mixnet_client(self) {
|
||||
if !self.mixnet_client.cancellation_token().is_cancelled() {
|
||||
self.mixnet_client.disconnect().await;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start(self) -> AuthClientMixnetListenerHandle {
|
||||
let message_broadcast = self.message_broadcast.clone();
|
||||
let message_sender = self.input_message_tx.clone();
|
||||
// Allows stopping only this, e.g. if we don't need it in the new bandwidth controller
|
||||
let cancellation_token = self.shutdown_token.clone();
|
||||
let mixnet_cancellation_token = self.mixnet_client.cancellation_token();
|
||||
let handle = tokio::spawn(self.run());
|
||||
tracing::debug!("AuthClientMixnetListener is shutting down");
|
||||
});
|
||||
|
||||
(
|
||||
join_handle,
|
||||
AuthClientMixnetListenerHandle {
|
||||
message_broadcast,
|
||||
message_sender,
|
||||
cancellation_token,
|
||||
mixnet_cancellation_token,
|
||||
handle,
|
||||
}
|
||||
}
|
||||
message_broadcast: cloned_message_broadcast,
|
||||
message_sender: cloned_message_sender,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
/// Handle that enables the consumer to subscribe or submit messages to the mixnet.
|
||||
pub struct AuthClientMixnetListenerHandle {
|
||||
message_broadcast: MixnetMessageBroadcastSender,
|
||||
message_sender: MixnetMessageInputSender,
|
||||
cancellation_token: CancellationToken,
|
||||
mixnet_cancellation_token: CancellationToken,
|
||||
handle: JoinHandle<AuthClientMixnetListener>,
|
||||
}
|
||||
|
||||
impl AuthClientMixnetListenerHandle {
|
||||
@@ -139,22 +103,4 @@ impl AuthClientMixnetListenerHandle {
|
||||
pub fn subscribe(&self) -> MixnetMessageBroadcastReceiver {
|
||||
self.message_broadcast.subscribe()
|
||||
}
|
||||
|
||||
pub fn mixnet_cancel_token(&self) -> CancellationToken {
|
||||
self.mixnet_cancellation_token.clone()
|
||||
}
|
||||
|
||||
pub async fn stop(self) {
|
||||
// If shutdown was externally called, that call is a no-op
|
||||
// If we're only stopping this, it is very much needed
|
||||
self.cancellation_token.cancel();
|
||||
match self.handle.await {
|
||||
Ok(auth_client_mixnet_listener) => {
|
||||
auth_client_mixnet_listener.disconnect_mixnet_client().await;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Error waiting for auth clients mixnet listener to stop: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ use base64::{Engine as _, engine::general_purpose};
|
||||
use bytes::BytesMut;
|
||||
use clap::Args;
|
||||
use futures::StreamExt;
|
||||
use nym_authenticator_client::{AuthClientMixnetListener, AuthenticatorClient};
|
||||
use nym_authenticator_client::{AuthenticatorClient, mixnet_listener};
|
||||
use nym_authenticator_requests::{
|
||||
AuthenticatorVersion, client_message::ClientMessage, response::AuthenticatorResponse, v2, v3,
|
||||
v4, v5,
|
||||
@@ -400,8 +400,9 @@ impl Probe {
|
||||
(node_info.authenticator_address, node_info.ip_address)
|
||||
{
|
||||
// Start the mixnet listener that the auth clients use to receive messages.
|
||||
let mixnet_listener_task =
|
||||
AuthClientMixnetListener::new(mixnet_client, CancellationToken::new()).start();
|
||||
let mixnet_listener_cancel_token = CancellationToken::new();
|
||||
let (mixnet_listener_join_handle, mixnet_listener_task) =
|
||||
mixnet_listener::spawn(mixnet_client, mixnet_listener_cancel_token.child_token());
|
||||
|
||||
let auth_client = AuthenticatorClient::new(
|
||||
mixnet_listener_task.subscribe(),
|
||||
@@ -441,7 +442,10 @@ impl Probe {
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
|
||||
mixnet_listener_task.stop().await;
|
||||
mixnet_listener_cancel_token.cancel();
|
||||
if let Err(e) = mixnet_listener_join_handle.await {
|
||||
tracing::error!("Failed to join on mixnet listener: {e}");
|
||||
}
|
||||
|
||||
outcome
|
||||
} else {
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use nym_authenticator_client::{AuthClientMixnetListener, AuthenticatorClient};
|
||||
use nym_authenticator_client::{AuthenticatorClient, mixnet_listener};
|
||||
use nym_bandwidth_controller::BandwidthTicketProvider;
|
||||
use nym_credentials_interface::TicketType;
|
||||
use nym_ip_packet_client::IprClientConnect;
|
||||
@@ -85,8 +85,11 @@ impl RegistrationClient {
|
||||
|
||||
// Start the auth client mixnet listener, which will listen for incoming messages from the
|
||||
// mixnet and rebroadcast them to the auth clients.
|
||||
let mixnet_listener =
|
||||
AuthClientMixnetListener::new(self.mixnet_client, self.cancel_token.clone()).start();
|
||||
let (mixnet_listener_join_handle, mixnet_listener) = mixnet_listener::spawn(
|
||||
self.mixnet_client,
|
||||
// todo: are we sure we want to clone the cancel token here?
|
||||
self.cancel_token.clone(),
|
||||
);
|
||||
|
||||
let mut entry_auth_client = AuthenticatorClient::new(
|
||||
mixnet_listener.subscribe(),
|
||||
@@ -139,6 +142,7 @@ impl RegistrationClient {
|
||||
entry_gateway_data: entry,
|
||||
exit_gateway_data: exit,
|
||||
authenticator_listener_handle: mixnet_listener,
|
||||
mixnet_listener_join_handle,
|
||||
bw_controller: self.bandwidth_controller,
|
||||
},
|
||||
)))
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use nym_authenticator_client::{AuthClientMixnetListenerHandle, AuthenticatorClient};
|
||||
use nym_authenticator_client::{
|
||||
AuthenticatorClient, mixnet_listener::AuthClientMixnetListenerHandle,
|
||||
};
|
||||
use nym_bandwidth_controller::BandwidthTicketProvider;
|
||||
use nym_registration_common::{AssignedAddresses, GatewayData};
|
||||
use nym_sdk::mixnet::MixnetClient;
|
||||
@@ -22,5 +24,6 @@ pub struct WireguardRegistrationResult {
|
||||
pub entry_gateway_data: GatewayData,
|
||||
pub exit_gateway_data: GatewayData,
|
||||
pub authenticator_listener_handle: AuthClientMixnetListenerHandle,
|
||||
pub mixnet_listener_join_handle: tokio::task::JoinHandle<()>,
|
||||
pub bw_controller: Box<dyn BandwidthTicketProvider>,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user