Compare commits

...

3 Commits

Author SHA1 Message Date
Andrej Mihajlov 81e72ba61b Add shutdown event handle 2025-10-14 20:01:47 +02:00
Jędrzej Stuczyński 0e3ffb749e using same hierarchy of trackers for client shutdown control 2025-10-14 13:13:39 +03:00
Bogdan-Ștefan Neacşu bd2e6ee812 Signal shutdown when gateway is considered unreachable 2025-10-14 12:12:16 +03:00
7 changed files with 49 additions and 36 deletions
Generated
-1
View File
@@ -5746,7 +5746,6 @@ dependencies = [
"criterion",
"ff",
"group",
"lazy_static",
"nym-contracts-common",
"nym-pemstore",
"rand 0.8.5",
@@ -892,7 +892,7 @@ where
// Create a shutdown tracker for this client - either as a child of provided tracker
// or get one from the registry
let shutdown_tracker = match self.shutdown {
Some(parent_tracker) => parent_tracker.child_tracker(),
Some(parent_tracker) => parent_tracker.clone(),
None => nym_task::get_sdk_shutdown_tracker()?,
};
@@ -926,7 +926,7 @@ where
self.user_agent.clone(),
generate_client_stats_id(*self_address.identity()),
input_sender.clone(),
&shutdown_tracker.child_tracker(),
&shutdown_tracker.clone(),
);
// needs to be started as the first thing to block if required waiting for the gateway
@@ -936,7 +936,7 @@ where
shared_topology_accessor.clone(),
self_address.gateway(),
self.wait_for_gateway,
&shutdown_tracker.child_tracker(),
&shutdown_tracker.clone(),
)
.await?;
@@ -956,7 +956,7 @@ where
stats_reporter.clone(),
#[cfg(unix)]
self.connection_fd_callback,
&shutdown_tracker.child_tracker(),
&shutdown_tracker.clone(),
)
.await?;
let gateway_ws_fd = gateway_transceiver.ws_fd();
@@ -964,7 +964,7 @@ where
let reply_storage = Self::setup_persistent_reply_storage(
reply_storage_backend,
key_rotation_config,
&shutdown_tracker.child_tracker(),
&shutdown_tracker.clone(),
)
.await?;
@@ -975,7 +975,7 @@ where
reply_storage.key_storage(),
reply_controller_sender.clone(),
stats_reporter.clone(),
&shutdown_tracker.child_tracker(),
&shutdown_tracker.clone(),
);
// The message_sender is the transmitter for any component generating sphinx packets
@@ -983,10 +983,8 @@ where
// traffic stream.
// The MixTrafficController then sends the actual traffic
let (message_sender, client_request_sender) = Self::start_mix_traffic_controller(
gateway_transceiver,
&shutdown_tracker.child_tracker(),
);
let (message_sender, client_request_sender) =
Self::start_mix_traffic_controller(gateway_transceiver, &shutdown_tracker.clone());
// Channels that the websocket listener can use to signal downstream to the real traffic
// controller that connections are closed.
@@ -1015,7 +1013,7 @@ where
shared_lane_queue_lengths.clone(),
client_connection_rx,
stats_reporter.clone(),
&shutdown_tracker.child_tracker(),
&shutdown_tracker.clone(),
);
if !self
@@ -1031,7 +1029,7 @@ where
shared_topology_accessor.clone(),
message_sender,
stats_reporter.clone(),
&shutdown_tracker.child_tracker(),
&shutdown_tracker.clone(),
);
}
@@ -155,6 +155,7 @@ impl MixTrafficController {
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
// Do we need to handle the embedded mixnet client case
// separately?
self.shutdown_token.cancel();
break;
}
}
@@ -51,12 +51,12 @@ impl AuthClientMixnetListener {
}
async fn run(mut self) -> Self {
let mixnet_cancel_token = self.mixnet_client.cancellation_token();
self.shutdown_token.run_until_cancelled(async {
let shutdown_event = self.mixnet_client.shutdown_event();
loop {
tokio::select! {
biased;
_ = mixnet_cancel_token.cancelled() => {
_ = shutdown_event.wait() => {
tracing::debug!("AuthClientMixnetListener: mixnet client was shutdown");
break;
}
@@ -100,9 +100,7 @@ impl AuthClientMixnetListener {
// Disconnects the mixnet client and effectively drop itself, since it doesn't work without one, and reconnecting isn't supported
pub async fn disconnect_mixnet_client(self) {
if !self.mixnet_client.cancellation_token().is_cancelled() {
self.mixnet_client.disconnect().await;
}
self.mixnet_client.disconnect().await;
}
pub fn start(self) -> AuthClientMixnetListenerHandle {
@@ -110,14 +108,14 @@ impl AuthClientMixnetListener {
let message_sender = self.input_message_tx.clone();
// Allows stopping only this, e.g. if we don't need it in the new bandwidth controller
let cancellation_token = self.shutdown_token.clone();
let mixnet_cancellation_token = self.mixnet_client.cancellation_token();
// let mixnet_cancellation_token = self.mixnet_client.cancellation_token();
let handle = tokio::spawn(self.run());
AuthClientMixnetListenerHandle {
message_broadcast,
message_sender,
cancellation_token,
mixnet_cancellation_token,
// mixnet_cancellation_token,
handle,
}
}
@@ -127,7 +125,7 @@ pub struct AuthClientMixnetListenerHandle {
message_broadcast: MixnetMessageBroadcastSender,
message_sender: MixnetMessageInputSender,
cancellation_token: CancellationToken,
mixnet_cancellation_token: CancellationToken,
// mixnet_cancellation_token: CancellationToken,
handle: JoinHandle<AuthClientMixnetListener>,
}
@@ -140,9 +138,9 @@ impl AuthClientMixnetListenerHandle {
self.message_broadcast.subscribe()
}
pub fn mixnet_cancel_token(&self) -> CancellationToken {
self.mixnet_cancellation_token.clone()
}
// pub fn mixnet_cancel_token(&self) -> CancellationToken {
// self.mixnet_cancellation_token.clone()
// }
pub async fn stop(self) {
// If shutdown was externally called, that call is a no-op
+5 -7
View File
@@ -133,8 +133,7 @@ impl IprClientConnect {
let timeout = sleep(IPR_CONNECT_TIMEOUT);
tokio::pin!(timeout);
let mixnet_cancel_token = self.mixnet_client.cancellation_token();
let shutdown_event = self.mixnet_client.shutdown_event();
loop {
tokio::select! {
@@ -142,15 +141,14 @@ impl IprClientConnect {
error!("Cancelled while waiting for reply to connect request");
return Err(Error::Cancelled);
},
_ = mixnet_cancel_token.cancelled() => {
error!("Mixnet client stopped while waiting for reply to connect request");
return Err(Error::Cancelled);
},
_ = &mut timeout => {
error!("Timed out waiting for reply to connect request");
return Err(Error::TimeoutWaitingForConnectResponse);
},
_ = shutdown_event.wait() => {
error!("Mixnet client stopped while waiting for reply to connect request");
return Err(Error::Cancelled);
},
msgs = self.mixnet_client.wait_for_messages() => match msgs {
None => {
return Err(Error::NoMixnetMessagesReceived);
+1 -1
View File
@@ -770,7 +770,7 @@ where
client_output,
client_state.clone(),
nym_address,
started_client.shutdown_handle.child_tracker(),
started_client.shutdown_handle.clone(),
packet_type,
);
+23 -4
View File
@@ -24,7 +24,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::sync::RwLockReadGuard;
use tokio_util::sync::CancellationToken;
/// Client connected to the Nym mixnet.
pub struct MixnetClient {
@@ -124,9 +123,9 @@ impl MixnetClient {
}
/// Get a child token of the root, to monitor unexpected shutdown, without causing one
pub fn cancellation_token(&self) -> CancellationToken {
self.shutdown_handle.child_shutdown_token().inner().clone()
}
// pub fn cancellation_token(&self) -> CancellationToken {
// self.shutdown_handle.child_shutdown_token().inner().clone()
// }
pub fn client_request_sender(&self) -> ClientRequestSender {
self.client_request_sender.clone()
@@ -199,6 +198,13 @@ impl MixnetClient {
self.client_state.topology_accessor.release_manual_control()
}
/// Returns a shutdown event handle that can be used for waiting for the client to shutdown.
pub fn shutdown_event(&self) -> ShutdownEventHandle {
ShutdownEventHandle {
shutdown_handle: self.shutdown_handle.clone(),
}
}
/// Wait for messages from the mixnet
pub async fn wait_for_messages(&mut self) -> Option<Vec<ReconstructedMessage>> {
self.reconstructed_receiver.next().await
@@ -340,3 +346,16 @@ impl MixnetMessageSender for MixnetClientSender {
.map_err(|_| Error::MessageSendingFailure)
}
}
/// Handle for waiting on the shutdown event of the mixnet client.
pub struct ShutdownEventHandle {
shutdown_handle: ShutdownTracker,
}
impl ShutdownEventHandle {
/// Returns once mixnet client has been shut down.
/// If mixnet client is already shut down, returns immediately.
pub async fn wait(&self) {
self.shutdown_handle.wait_for_tracker().await
}
}