Transparent ShutdownManager with cascading ShutdownTrackers (#6040)

* Idea for transparent ShutdownManager use

* Tracker hierarchies

* Fix wasm shutdown, convenience shutdown method
This commit is contained in:
Drazen Urch
2025-09-17 12:51:00 +02:00
committed by GitHub
parent 44ac5e1ced
commit aa1cad4422
13 changed files with 219 additions and 90 deletions
@@ -1,9 +1,7 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::ClientCoreError;
use crate::{client::replies::reply_storage, config::DebugConfig};
use nym_task::{ShutdownManager, ShutdownToken, ShutdownTracker};
pub fn setup_empty_reply_surb_backend(debug_config: &DebugConfig) -> reply_storage::Empty {
reply_storage::Empty {
@@ -15,49 +13,3 @@ pub fn setup_empty_reply_surb_backend(debug_config: &DebugConfig) -> reply_stora
.maximum_reply_surb_storage_threshold,
}
}
// old 'TaskHandle'
/// Shutdown handle used while starting a client: it is either a manager
/// created by the client itself or a tracker handed in by the caller.
pub(crate) enum ShutdownHelper {
/// A `ShutdownManager` built by the client because no tracker was supplied.
Internal(ShutdownManager),
/// A `ShutdownTracker` supplied by the caller.
External(ShutdownTracker),
}
fn new_shutdown_manager() -> Result<ShutdownManager, ClientCoreError> {
cfg_if::cfg_if! {
if #[cfg(not(target_arch = "wasm32"))] {
Ok(ShutdownManager::build_new_default()?)
} else {
Ok(ShutdownManager::new_without_signals())
}
}
}
impl ShutdownHelper {
    /// Wraps the caller-supplied tracker if there is one, otherwise builds a
    /// fresh internal manager.
    ///
    /// # Errors
    /// Returns whatever `new_shutdown_manager` fails with when no tracker was given.
    pub(crate) fn new(shutdown_tracker: Option<ShutdownTracker>) -> Result<Self, ClientCoreError> {
        if let Some(external) = shutdown_tracker {
            return Ok(Self::External(external));
        }
        new_shutdown_manager().map(Self::Internal)
    }

    /// Consumes the helper, yielding the owned manager; `None` for the external variant.
    pub(crate) fn into_internal(self) -> Option<ShutdownManager> {
        if let Self::Internal(manager) = self {
            Some(manager)
        } else {
            None
        }
    }

    /// Clones the shutdown token of whichever variant is held.
    pub(crate) fn shutdown_token(&self) -> ShutdownToken {
        match self {
            Self::Internal(manager) => manager.clone_shutdown_token(),
            Self::External(tracker) => tracker.clone_shutdown_token(),
        }
    }

    /// Borrows the tracker: the external one directly, or the one exposed by
    /// the internal manager.
    pub(crate) fn tracker(&self) -> &ShutdownTracker {
        match self {
            Self::Internal(manager) => manager.shutdown_tracker(),
            Self::External(tracker) => tracker,
        }
    }
}
@@ -4,7 +4,6 @@
use super::mix_traffic::ClientRequestSender;
use super::received_buffer::ReceivedBufferMessage;
use super::statistics_control::StatisticsControl;
use crate::client::base_client::helpers::ShutdownHelper;
use crate::client::base_client::storage::helpers::store_client_keys;
use crate::client::base_client::storage::MixnetClientStorage;
use crate::client::cover_traffic_stream::LoopCoverTrafficStream;
@@ -53,7 +52,7 @@ use nym_sphinx::receiver::{ReconstructedMessage, SphinxMessageReceiver};
use nym_statistics_common::clients::ClientStatsSender;
use nym_statistics_common::generate_client_stats_id;
use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender, LaneQueueLengths};
use nym_task::{ShutdownManager, ShutdownTracker};
use nym_task::ShutdownTracker;
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::HardcodedTopologyProvider;
use nym_validator_client::nym_api::NymApiClientExt;
@@ -881,8 +880,12 @@ where
let shared_topology_accessor =
TopologyAccessor::new(self.config.debug.topology.ignore_egress_epoch_role);
// Shutdown notifier for signalling tasks to stop
let shutdown = ShutdownHelper::new(self.shutdown)?;
// Create a shutdown tracker for this client - either as a child of provided tracker
// or get one from the registry
let shutdown_tracker = match self.shutdown {
Some(parent_tracker) => parent_tracker.child_tracker(),
None => nym_task::get_sdk_shutdown_tracker()?,
};
// channels responsible for dealing with reply-related fun
let (reply_controller_sender, reply_controller_receiver) =
@@ -914,7 +917,7 @@ where
self.user_agent.clone(),
generate_client_stats_id(*self_address.identity()),
input_sender.clone(),
shutdown.tracker(),
&shutdown_tracker.child_tracker(),
);
// needs to be started as the first thing to block if required waiting for the gateway
@@ -924,14 +927,14 @@ where
shared_topology_accessor.clone(),
self_address.gateway(),
self.wait_for_gateway,
shutdown.tracker(),
&shutdown_tracker.child_tracker(),
)
.await?;
let gateway_packet_router = PacketRouter::new(
ack_sender,
mixnet_messages_sender,
shutdown.shutdown_token(),
shutdown_tracker.clone_shutdown_token(),
);
let gateway_transceiver = Self::setup_gateway_transceiver(
@@ -944,7 +947,7 @@ where
stats_reporter.clone(),
#[cfg(unix)]
self.connection_fd_callback,
shutdown.tracker(),
&shutdown_tracker.child_tracker(),
)
.await?;
let gateway_ws_fd = gateway_transceiver.ws_fd();
@@ -952,7 +955,7 @@ where
let reply_storage = Self::setup_persistent_reply_storage(
reply_storage_backend,
key_rotation_config,
shutdown.tracker(),
&shutdown_tracker.child_tracker(),
)
.await?;
@@ -963,7 +966,7 @@ where
reply_storage.key_storage(),
reply_controller_sender.clone(),
stats_reporter.clone(),
shutdown.tracker(),
&shutdown_tracker.child_tracker(),
);
// The message_sender is the transmitter for any component generating sphinx packets
@@ -971,8 +974,10 @@ where
// traffic stream.
// The MixTrafficController then sends the actual traffic
let (message_sender, client_request_sender) =
Self::start_mix_traffic_controller(gateway_transceiver, shutdown.tracker());
let (message_sender, client_request_sender) = Self::start_mix_traffic_controller(
gateway_transceiver,
&shutdown_tracker.child_tracker(),
);
// Channels that the websocket listener can use to signal downstream to the real traffic
// controller that connections are closed.
@@ -1001,7 +1006,7 @@ where
shared_lane_queue_lengths.clone(),
client_connection_rx,
stats_reporter.clone(),
shutdown.tracker(),
&shutdown_tracker.child_tracker(),
);
if !self
@@ -1017,7 +1022,7 @@ where
shared_topology_accessor.clone(),
message_sender,
stats_reporter.clone(),
shutdown.tracker(),
&shutdown_tracker.child_tracker(),
);
}
@@ -1045,7 +1050,7 @@ where
gateway_connection: GatewayConnection { gateway_ws_fd },
},
stats_reporter,
shutdown_handle: shutdown.into_internal(),
shutdown_handle: Some(shutdown_tracker), // The primary tracker for this client
client_request_sender,
forget_me: self.config.debug.forget_me,
remember_me: self.config.debug.remember_me,
@@ -1061,7 +1066,7 @@ pub struct BaseClient {
pub client_state: ClientState,
pub stats_reporter: ClientStatsSender,
pub client_request_sender: ClientRequestSender,
pub shutdown_handle: Option<ShutdownManager>,
pub shutdown_handle: Option<ShutdownTracker>,
pub forget_me: ForgetMe,
pub remember_me: RememberMe,
}
+4
View File
@@ -4,6 +4,7 @@
use crate::client::mix_traffic::transceiver::ErasedGatewayError;
use nym_crypto::asymmetric::ed25519::Ed25519RecoveryError;
use nym_gateway_client::error::GatewayClientError;
use nym_task::RegistryAccessError;
use nym_topology::node::RoutingNodeError;
use nym_topology::{NodeId, NymTopologyError};
use nym_validator_client::nym_api::error::NymAPIError;
@@ -242,6 +243,9 @@ pub enum ClientCoreError {
#[error("failed to select valid gateway due to incomputable latency")]
GatewaySelectionFailure { source: WeightedError },
#[error("Could not access task registry, {0}")]
RegistryAccess(#[from] RegistryAccessError),
}
impl From<tungstenite::Error> for ClientCoreError {
+13
View File
@@ -17,6 +17,12 @@ pub struct ShutdownToken {
inner: CancellationToken,
}
impl From<CancellationToken> for ShutdownToken {
fn from(inner: CancellationToken) -> Self {
ShutdownToken { inner }
}
}
impl ShutdownToken {
/// A drop in no-op replacement for `send_status_msg` for easier migration from [TaskClient](crate::TaskClient).
#[deprecated]
@@ -45,6 +51,13 @@ impl ShutdownToken {
&self.inner
}
/// Get an owned [CancellationToken](tokio_util::sync::CancellationToken) for public API use.
/// This is useful when you need to expose cancellation to SDK users without
/// exposing the internal ShutdownToken type.
pub fn to_cancellation_token(&self) -> CancellationToken {
self.inner.clone()
}
/// Creates a `ShutdownToken` which will get cancelled whenever the
/// current token gets cancelled. Unlike a cloned `ShutdownToken`,
/// cancelling a child token does not cancel the parent token.
+36
View File
@@ -314,4 +314,40 @@ impl ShutdownTracker {
pub fn clone_shutdown_token(&self) -> ShutdownToken {
self.root_cancellation_token.clone()
}
/// Derives a child `ShutdownTracker` from this one.
///
/// The child's token is a child of this tracker's token, so cancelling the
/// parent also cancels the child. The child gets a brand-new `TaskTracker`,
/// which gives a hierarchy where:
/// - cancellation flows from parent to every descendant,
/// - each level keeps track of its own tasks only,
/// - a component can wait for just its own sub-tasks.
///
/// Note: because the `TaskTracker` is new, tasks spawned on the child are not
/// covered when waiting on the parent's tracker.
pub fn child_tracker(&self) -> ShutdownTracker {
    ShutdownTracker {
        // cancelled whenever the parent token is cancelled
        root_cancellation_token: self.root_cancellation_token.child_token(),
        // independent task bookkeeping for this level
        tracker: TaskTracker::new(),
    }
}
/// Convenience method to perform a complete shutdown sequence.
/// This method:
/// 1. Signals cancellation to all tasks
/// 2. Closes the tracker to prevent new tasks
/// 3. Waits for all existing tasks to complete
///
/// Consumes the tracker. Only tasks registered on this tracker's own
/// `TaskTracker` are awaited; trackers returned by `child_tracker` hold a
/// separate `TaskTracker`, so their tasks are cancelled through the token
/// but are not waited for here.
pub async fn shutdown(self) {
// Signal cancellation to all tasks
self.root_cancellation_token.cancel();
// Close the tracker to prevent new tasks from being spawned
// (a `TaskTracker` must be closed before `wait()` can complete)
self.tracker.close();
// Wait for all existing tasks to complete
self.tracker.wait().await;
}
}
+9
View File
@@ -5,6 +5,7 @@ pub mod cancellation;
pub mod connections;
pub mod event;
pub mod manager;
pub(crate) mod runtime_registry;
#[cfg(not(target_arch = "wasm32"))]
pub mod signal;
pub mod spawn;
@@ -18,3 +19,11 @@ pub use tokio_util::task::TaskTracker;
#[cfg(not(target_arch = "wasm32"))]
pub use signal::{wait_for_signal, wait_for_signal_and_error};
pub use crate::runtime_registry::RegistryAccessError;
/// Returns a `ShutdownTracker` backed by the process-wide SDK shutdown manager,
/// creating that manager on first use.
///
/// This gives SDK users task management without any manual setup.
///
/// NOTE(review): every caller receives a tracker from the same shared manager;
/// presumably cancelling or shutting down the returned tracker affects all
/// other holders — confirm whether callers should derive a `child_tracker()`.
///
/// # Errors
/// Returns [`RegistryAccessError`] if the runtime registry cannot be accessed.
pub fn get_sdk_shutdown_tracker() -> Result<ShutdownTracker, RegistryAccessError> {
    runtime_registry::RuntimeRegistry::get_or_create_sdk()
        .map(|manager| manager.shutdown_tracker_owned())
}
+96
View File
@@ -0,0 +1,96 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use thiserror::Error;
use crate::ShutdownManager;
use std::sync::RwLock;
use std::sync::{Arc, LazyLock};
/// Global registry that manages ShutdownManagers transparently.
/// This allows SDK components to get automatic task management without
/// exposing the complexity to end users.
///
/// The single instance lives in the `REGISTRY` static and is only reached
/// through the associated functions on this type.
pub(crate) struct RuntimeRegistry {
// For SDK clients: auto-created manager without signal handling
// `None` until `get_or_create_sdk` stores a manager; the `Arc` lets every
// caller share that one manager.
sdk_manager: RwLock<Option<Arc<ShutdownManager>>>,
}
/// Errors returned when the global runtime registry cannot be accessed.
#[derive(Debug, Error)]
pub enum RegistryAccessError {
/// Acquiring the registry's `RwLock` failed because the lock is poisoned.
#[error("the runtime registry is poisoned")]
Poisoned,
}
impl RuntimeRegistry {
    /// Returns the shared SDK `ShutdownManager`, creating it on first use.
    ///
    /// The manager is built with `ShutdownManager::new_without_signals`, so it
    /// does not listen to OS signals and is suitable for library use.
    ///
    /// # Errors
    /// Returns [`RegistryAccessError::Poisoned`] if the registry lock is poisoned.
    pub(crate) fn get_or_create_sdk() -> Result<Arc<ShutdownManager>, RegistryAccessError> {
        // Fast path: a manager already exists, so a read lock is enough.
        {
            let existing = REGISTRY
                .sdk_manager
                .read()
                .map_err(|_| RegistryAccessError::Poisoned)?;
            if let Some(manager) = existing.as_ref() {
                return Ok(Arc::clone(manager));
            }
            // the read guard is released here, before the write lock is taken
        }

        // Slow path: take the write lock and re-check, since another thread
        // may have stored a manager in between.
        let mut slot = REGISTRY
            .sdk_manager
            .write()
            .map_err(|_| RegistryAccessError::Poisoned)?;
        let manager = slot.get_or_insert_with(|| Arc::new(ShutdownManager::new_without_signals()));
        Ok(Arc::clone(manager))
    }

    /// Reports whether an SDK manager is currently stored.
    /// Handy for tests and debugging.
    ///
    /// # Errors
    /// Returns [`RegistryAccessError::Poisoned`] if the registry lock is poisoned.
    #[allow(dead_code)]
    pub(crate) fn has_sdk_manager() -> Result<bool, RegistryAccessError> {
        let slot = REGISTRY
            .sdk_manager
            .read()
            .map_err(|_| RegistryAccessError::Poisoned)?;
        Ok(slot.is_some())
    }

    /// Drops the stored SDK manager, leaving the registry empty.
    /// Mainly used to isolate tests from one another.
    ///
    /// # Errors
    /// Returns [`RegistryAccessError::Poisoned`] if the registry lock is poisoned.
    #[cfg(test)]
    pub(crate) async fn clear() -> Result<(), RegistryAccessError> {
        let mut slot = REGISTRY
            .sdk_manager
            .write()
            .map_err(|_| RegistryAccessError::Poisoned)?;
        *slot = None;
        Ok(())
    }
}
/// Global instance of the runtime registry.
/// Uses LazyLock for on-demand initialization.
/// Starts out with no SDK manager; `RuntimeRegistry::get_or_create_sdk`
/// stores one when it finds the slot empty.
static REGISTRY: LazyLock<RuntimeRegistry> = LazyLock::new(|| RuntimeRegistry {
sdk_manager: RwLock::new(None),
});
#[cfg(test)]
mod tests {
    use super::*;

    /// The registry hands out one shared manager until it is cleared.
    #[tokio::test]
    async fn test_get_or_create_sdk() {
        // Start from an empty registry regardless of earlier state.
        let _ = RuntimeRegistry::clear().await;
        assert!(!RuntimeRegistry::has_sdk_manager().unwrap());

        // The first call creates and stores the manager...
        let first = RuntimeRegistry::get_or_create_sdk().unwrap();
        assert!(RuntimeRegistry::has_sdk_manager().unwrap());

        // ...and the second call must return that very same allocation.
        let second = RuntimeRegistry::get_or_create_sdk().unwrap();
        assert!(Arc::ptr_eq(&first, &second));

        // Clearing empties the registry again.
        let _ = RuntimeRegistry::clear().await;
        assert!(!RuntimeRegistry::has_sdk_manager().unwrap());
    }
}
+3
View File
@@ -96,6 +96,9 @@ pub enum Error {
#[error(transparent)]
Bincode(#[from] bincode::Error),
#[error("Failed to get shutdown tracker from the task runtime registry: {0}")]
RegistryAccess(#[from] nym_task::RegistryAccessError),
}
impl Error {
+18 -6
View File
@@ -37,6 +37,7 @@ use std::path::Path;
use std::path::PathBuf;
#[cfg(unix)]
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use url::Url;
use zeroize::Zeroizing;
@@ -118,6 +119,11 @@ where
<S::KeyStore as KeyStore>::StorageError: Send + Sync,
<S::GatewaysDetailsStore as GatewaysDetailsStore>::StorageError: Send + Sync,
{
/// Configures the builder to use an externally owned [`CancellationToken`]
/// for shutdown.
///
/// The token is converted into a shutdown token, wrapped in a
/// [`ShutdownTracker`] via `new_from_external_shutdown_token`, and installed
/// through `custom_shutdown`. Presumably cancelling `token` afterwards
/// signals the client's tasks to stop — confirm against `ShutdownTracker`.
///
/// Returns the updated builder; the result must be used, as `self` is consumed.
#[must_use]
pub fn with_shutdown_token(self, token: CancellationToken) -> Self {
    let shutdown_tracker = ShutdownTracker::new_from_external_shutdown_token(token.into());
    self.custom_shutdown(shutdown_tracker)
}
/// Creates a client builder with the provided client storage implementation.
#[must_use]
pub fn new_with_storage(storage: S) -> MixnetClientBuilder<S> {
@@ -686,9 +692,15 @@ where
base_builder = base_builder.with_topology_provider(topology_provider);
}
if let Some(custom_shutdown) = self.custom_shutdown {
base_builder = base_builder.with_shutdown(custom_shutdown)
}
// Use custom shutdown if provided, otherwise get from registry
let shutdown_tracker = match self.custom_shutdown {
Some(custom) => custom,
None => {
// Auto-create from registry for SDK use
nym_task::get_sdk_shutdown_tracker()?
}
};
base_builder = base_builder.with_shutdown(shutdown_tracker);
if let Some(gateway_transceiver) = self.custom_gateway_transceiver {
base_builder = base_builder.with_gateway_transceiver(gateway_transceiver);
@@ -740,7 +752,7 @@ where
let (mut started_client, nym_address) = self.connect_to_mixnet_common().await?;
// TODO: more graceful handling here, surely both variants should work... I think?
let Some(task_manager) = started_client.shutdown_handle else {
let Some(tracker) = started_client.shutdown_handle else {
return Err(Error::new_unsupported(
"connecting with socks5 is currently unsupported with custom shutdown",
));
@@ -757,14 +769,14 @@ where
client_output,
client_state.clone(),
nym_address,
task_manager.shutdown_tracker_owned(),
tracker.child_tracker(),
packet_type,
);
Ok(Socks5MixnetClient {
nym_address,
client_state,
task_handle: task_manager,
task_handle: tracker,
socks5_config,
})
}
+6 -8
View File
@@ -17,10 +17,8 @@ use nym_gateway_requests::ClientRequest;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::{params::PacketType, receiver::ReconstructedMessage};
use nym_statistics_common::clients::{ClientStatsEvents, ClientStatsSender};
use nym_task::{
connections::{ConnectionCommandSender, LaneQueueLengths},
ShutdownManager,
};
use nym_task::connections::{ConnectionCommandSender, LaneQueueLengths};
use nym_task::ShutdownTracker;
use nym_topology::{NymRouteProvider, NymTopology};
use std::pin::Pin;
use std::sync::Arc;
@@ -54,7 +52,7 @@ pub struct MixnetClient {
pub(crate) stats_events_reporter: ClientStatsSender,
/// The task manager that controls all the spawned tasks that the client uses to do its job.
pub(crate) shutdown_handle: Option<ShutdownManager>,
pub(crate) shutdown_handle: Option<ShutdownTracker>,
pub(crate) packet_type: Option<PacketType>,
// internal state used for the `Stream` implementation
@@ -74,7 +72,7 @@ impl MixnetClient {
client_state: ClientState,
reconstructed_receiver: ReconstructedMessagesReceiver,
stats_events_reporter: ClientStatsSender,
task_handle: Option<ShutdownManager>,
task_handle: Option<ShutdownTracker>,
packet_type: Option<PacketType>,
client_request_sender: ClientRequestSender,
forget_me: ForgetMe,
@@ -240,8 +238,8 @@ impl MixnetClient {
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
}
if let Some(mut task_manager) = self.shutdown_handle {
task_manager.perform_shutdown().await;
if let Some(tracker) = self.shutdown_handle {
tracker.shutdown().await;
}
}
+5 -4
View File
@@ -1,7 +1,8 @@
use nym_client_core::client::base_client::ClientState;
use nym_socks5_client_core::config::Socks5;
use nym_sphinx::addressing::clients::Recipient;
use nym_task::{connections::LaneQueueLengths, ShutdownManager};
use nym_task::connections::LaneQueueLengths;
use nym_task::ShutdownTracker;
use nym_topology::NymTopology;
@@ -18,7 +19,7 @@ pub struct Socks5MixnetClient {
pub(crate) client_state: ClientState,
/// The task manager that controls all the spawned tasks that the client uses to do its job.
pub(crate) task_handle: ShutdownManager,
pub(crate) task_handle: ShutdownTracker,
/// SOCKS5 configuration parameters.
pub(crate) socks5_config: Socks5,
@@ -81,7 +82,7 @@ impl Socks5MixnetClient {
/// Disconnect from the mixnet. Currently it is not supported to reconnect a disconnected
/// client.
pub async fn disconnect(mut self) {
self.task_handle.run_until_shutdown().await;
pub async fn disconnect(self) {
self.task_handle.shutdown().await;
}
}
+2 -2
View File
@@ -28,7 +28,7 @@ use wasm_client_core::helpers::{
add_gateway, generate_new_client_keys, parse_recipient, parse_sender_tag,
};
use wasm_client_core::nym_task::connections::TransmissionLane;
use wasm_client_core::nym_task::ShutdownManager;
use wasm_client_core::nym_task::ShutdownTracker;
use wasm_client_core::storage::core_client_traits::FullWasmClientStorage;
use wasm_client_core::storage::wasm_client_traits::WasmClientStorage;
use wasm_client_core::storage::ClientStorage;
@@ -59,7 +59,7 @@ pub struct NymClient {
// even though we don't use graceful shutdowns, other components rely on existence of this struct
// and if it's dropped, everything will start going offline
_task_manager: ShutdownManager,
_task_manager: ShutdownTracker,
packet_type: PacketType,
}
+6 -6
View File
@@ -20,7 +20,7 @@ use wasm_client_core::client::base_client::{BaseClientBuilder, ClientInput, Clie
use wasm_client_core::client::inbound_messages::InputMessage;
use wasm_client_core::helpers::{add_gateway, generate_new_client_keys};
use wasm_client_core::nym_task::connections::TransmissionLane;
use wasm_client_core::nym_task::ShutdownManager;
use wasm_client_core::nym_task::ShutdownTracker;
use wasm_client_core::storage::core_client_traits::FullWasmClientStorage;
use wasm_client_core::storage::wasm_client_traits::WasmClientStorage;
use wasm_client_core::storage::ClientStorage;
@@ -41,7 +41,7 @@ pub struct MixFetchClient {
requests: ActiveRequests,
// this has to be guarded by a mutex to be able to disconnect with an immutable reference
_shutdown_manager: Mutex<ShutdownManager>,
_shutdown_manager: Mutex<ShutdownTracker>,
}
#[wasm_bindgen]
@@ -233,11 +233,11 @@ impl MixFetchClient {
self.invalidated.store(true, Ordering::Relaxed);
console_log!("sending shutdown signal");
let mut shutdown_guard = self._shutdown_manager.lock().await;
shutdown_guard.send_cancellation();
let shutdown_guard = self._shutdown_manager.lock().await;
shutdown_guard.clone_shutdown_token().cancel();
shutdown_guard.close_tracker();
console_log!("waiting for shutdown to complete");
shutdown_guard.run_until_shutdown().await;
shutdown_guard.wait_for_tracker().await;
self.requests.invalidate_all().await;