Compare commits

...

3 Commits

Author SHA1 Message Date
Jędrzej Stuczyński 351acb7875 expose cancelled 2025-10-13 15:46:29 +01:00
Jędrzej Stuczyński 7f4ef7f772 add drop guard to client tasks 2025-10-13 15:45:46 +01:00
Jędrzej Stuczyński 5025c49a0e using same hierarchy of trackers for client shutdown control 2025-10-13 15:45:46 +01:00
14 changed files with 50 additions and 18 deletions
@@ -341,8 +341,14 @@ where
debug_config.cover_traffic,
stats_tx,
);
shutdown_tracker
.try_spawn_named_with_shutdown(async move { stream.run().await }, "CoverTrafficStream");
let drop_guard = shutdown_tracker.clone_shutdown_token().drop_guard();
shutdown_tracker.try_spawn_named_with_shutdown(
async move {
let _ = drop_guard;
stream.run().await
},
"CoverTrafficStream",
);
}
#[allow(clippy::too_many_arguments)]
@@ -419,8 +425,10 @@ where
"AcknowledgementController::RetransmissionRequestListener",
);
let drop_guard = shutdown_tracker.clone_shutdown_token().drop_guard();
shutdown_tracker.try_spawn_named_with_shutdown(
async move {
let _ = drop_guard;
sent_notification_listener.run().await;
},
"AcknowledgementController::SentNotificationListener",
@@ -431,8 +439,6 @@ where
async move { ack_action_controller.run(shutdown_token).await },
"AcknowledgementController::ActionController",
);
// .start(packet_type);
}
// buffer controlling all messages fetched from provider
@@ -705,8 +711,12 @@ where
// don't spawn the refresher if we don't want to be refreshing the topology.
// only use the initial values obtained
info!("Starting topology refresher...");
let drop_guard = shutdown_tracker.clone_shutdown_token().drop_guard();
shutdown_tracker.try_spawn_named_with_shutdown(
async move { topology_refresher.run().await },
async move {
let _ = drop_guard;
topology_refresher.run().await
},
"TopologyRefresher",
);
}
@@ -892,7 +902,7 @@ where
// Create a shutdown tracker for this client - either as a child of provided tracker
// or get one from the registry
let shutdown_tracker = match self.shutdown {
Some(parent_tracker) => parent_tracker.child_tracker(),
Some(parent_tracker) => parent_tracker.clone(),
None => nym_task::get_sdk_shutdown_tracker()?,
};
@@ -926,7 +936,7 @@ where
self.user_agent.clone(),
generate_client_stats_id(*self_address.identity()),
input_sender.clone(),
&shutdown_tracker.child_tracker(),
&shutdown_tracker,
);
// needs to be started as the first thing to block if required waiting for the gateway
@@ -936,7 +946,7 @@ where
shared_topology_accessor.clone(),
self_address.gateway(),
self.wait_for_gateway,
&shutdown_tracker.child_tracker(),
&shutdown_tracker,
)
.await?;
@@ -956,7 +966,7 @@ where
stats_reporter.clone(),
#[cfg(unix)]
self.connection_fd_callback,
&shutdown_tracker.child_tracker(),
&shutdown_tracker,
)
.await?;
let gateway_ws_fd = gateway_transceiver.ws_fd();
@@ -964,7 +974,7 @@ where
let reply_storage = Self::setup_persistent_reply_storage(
reply_storage_backend,
key_rotation_config,
&shutdown_tracker.child_tracker(),
&shutdown_tracker,
)
.await?;
@@ -975,7 +985,7 @@ where
reply_storage.key_storage(),
reply_controller_sender.clone(),
stats_reporter.clone(),
&shutdown_tracker.child_tracker(),
&shutdown_tracker,
);
// The message_sender is the transmitter for any component generating sphinx packets
@@ -983,10 +993,8 @@ where
// traffic stream.
// The MixTrafficController then sends the actual traffic
let (message_sender, client_request_sender) = Self::start_mix_traffic_controller(
gateway_transceiver,
&shutdown_tracker.child_tracker(),
);
let (message_sender, client_request_sender) =
Self::start_mix_traffic_controller(gateway_transceiver, &shutdown_tracker);
// Channels that the websocket listener can use to signal downstream to the real traffic
// controller that connections are closed.
@@ -1015,7 +1023,7 @@ where
shared_lane_queue_lengths.clone(),
client_connection_rx,
stats_reporter.clone(),
&shutdown_tracker.child_tracker(),
&shutdown_tracker,
);
if !self
@@ -1031,7 +1039,7 @@ where
shared_topology_accessor.clone(),
message_sender,
stats_reporter.clone(),
&shutdown_tracker.child_tracker(),
&shutdown_tracker,
);
}
@@ -138,6 +138,7 @@ impl MixTrafficController {
pub async fn run(&mut self) {
debug!("Started MixTrafficController with graceful shutdown support");
let _drop_guard = self.shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
@@ -80,6 +80,7 @@ impl AcknowledgementListener {
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started AcknowledgementListener with graceful shutdown support");
let _drop_guard = shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
@@ -245,6 +245,7 @@ impl ActionController {
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started ActionController with graceful shutdown support");
let _drop_guard = shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
@@ -216,6 +216,7 @@ where
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started InputMessageListener with graceful shutdown support");
let _drop_guard = shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
@@ -167,6 +167,7 @@ where
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started RetransmissionRequestListener with graceful shutdown support");
let _drop_guard = shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
@@ -585,6 +585,7 @@ where
// avoid borrow on self
let shutdown_token = self.shutdown_token.clone();
let _drop_guard = shutdown_token.clone().drop_guard();
#[cfg(not(target_arch = "wasm32"))]
{
let mut status_timer = tokio::time::interval(Duration::from_secs(5));
@@ -497,6 +497,8 @@ impl<R: MessageReceiver> RequestReceiver<R> {
pub(crate) async fn run(&mut self) {
debug!("Started RequestReceiver with graceful shutdown support");
let _drop_guard = self.shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
@@ -540,6 +542,8 @@ impl<R: MessageReceiver> FragmentedMessageReceiver<R> {
pub(crate) async fn run(&mut self) -> Result<(), MessageRecoveryError> {
debug!("Started FragmentedMessageReceiver with graceful shutdown support");
let _drop_guard = self.shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
@@ -152,6 +152,7 @@ where
let polling_rate = self.config.key_rotation.epoch_duration / 8;
let mut invalidation_inspection = new_interval_stream(polling_rate);
let _drop_guard = shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
@@ -119,6 +119,8 @@ impl StatisticsControl {
let mut snapshot_interval =
gloo_timers::future::IntervalStream::new(SNAPSHOT_INTERVAL.as_millis() as u32);
let _drop_guard = shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
@@ -46,6 +46,7 @@ where
debug!("Started PersistentReplyStorage");
if let Err(err) = self.backend.start_storage_session().await {
shutdown.cancel();
error!("failed to start the storage session - {err}");
return;
}
@@ -89,6 +89,8 @@ impl PartiallyDelegatedRouter {
async fn run(mut self, mut split_stream: SplitStream<WsConn>, shutdown_token: ShutdownToken) {
let mut chunked_stream = (&mut split_stream).ready_chunks(8);
let drop_guard = shutdown_token.clone().drop_guard();
let ret: Result<_, GatewayClientError> = loop {
tokio::select! {
biased;
@@ -101,6 +103,7 @@ impl PartiallyDelegatedRouter {
// received request to stop the task and return the stream
_ = &mut self.stream_return_requester => {
log::debug!("received request to return the split ws stream");
drop_guard.disarm();
break Ok(())
}
socket_msgs = chunked_stream.next() => {
+1 -1
View File
@@ -759,7 +759,7 @@ where
client_output,
client_state.clone(),
nym_address,
started_client.shutdown_handle.child_tracker(),
started_client.shutdown_handle.clone(),
packet_type,
);
@@ -25,6 +25,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::sync::RwLockReadGuard;
use tokio_util::sync::CancellationToken;
use tokio_util::sync::WaitForCancellationFutureOwned;
/// Client connected to the Nym mixnet.
pub struct MixnetClient {
@@ -273,6 +274,12 @@ impl MixnetClient {
}
}
}
pub fn cancelled(&self) -> WaitForCancellationFutureOwned {
self.shutdown_handle
.clone_shutdown_token()
.cancelled_owned()
}
}
#[derive(Clone)]