Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 351acb7875 | |||
| 7f4ef7f772 | |||
| 5025c49a0e |
@@ -341,8 +341,14 @@ where
|
||||
debug_config.cover_traffic,
|
||||
stats_tx,
|
||||
);
|
||||
shutdown_tracker
|
||||
.try_spawn_named_with_shutdown(async move { stream.run().await }, "CoverTrafficStream");
|
||||
let drop_guard = shutdown_tracker.clone_shutdown_token().drop_guard();
|
||||
shutdown_tracker.try_spawn_named_with_shutdown(
|
||||
async move {
|
||||
let _ = drop_guard;
|
||||
stream.run().await
|
||||
},
|
||||
"CoverTrafficStream",
|
||||
);
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
@@ -419,8 +425,10 @@ where
|
||||
"AcknowledgementController::RetransmissionRequestListener",
|
||||
);
|
||||
|
||||
let drop_guard = shutdown_tracker.clone_shutdown_token().drop_guard();
|
||||
shutdown_tracker.try_spawn_named_with_shutdown(
|
||||
async move {
|
||||
let _ = drop_guard;
|
||||
sent_notification_listener.run().await;
|
||||
},
|
||||
"AcknowledgementController::SentNotificationListener",
|
||||
@@ -431,8 +439,6 @@ where
|
||||
async move { ack_action_controller.run(shutdown_token).await },
|
||||
"AcknowledgementController::ActionController",
|
||||
);
|
||||
|
||||
// .start(packet_type);
|
||||
}
|
||||
|
||||
// buffer controlling all messages fetched from provider
|
||||
@@ -705,8 +711,12 @@ where
|
||||
// don't spawn the refresher if we don't want to be refreshing the topology.
|
||||
// only use the initial values obtained
|
||||
info!("Starting topology refresher...");
|
||||
let drop_guard = shutdown_tracker.clone_shutdown_token().drop_guard();
|
||||
shutdown_tracker.try_spawn_named_with_shutdown(
|
||||
async move { topology_refresher.run().await },
|
||||
async move {
|
||||
let _ = drop_guard;
|
||||
topology_refresher.run().await
|
||||
},
|
||||
"TopologyRefresher",
|
||||
);
|
||||
}
|
||||
@@ -892,7 +902,7 @@ where
|
||||
// Create a shutdown tracker for this client - either as a child of provided tracker
|
||||
// or get one from the registry
|
||||
let shutdown_tracker = match self.shutdown {
|
||||
Some(parent_tracker) => parent_tracker.child_tracker(),
|
||||
Some(parent_tracker) => parent_tracker.clone(),
|
||||
None => nym_task::get_sdk_shutdown_tracker()?,
|
||||
};
|
||||
|
||||
@@ -926,7 +936,7 @@ where
|
||||
self.user_agent.clone(),
|
||||
generate_client_stats_id(*self_address.identity()),
|
||||
input_sender.clone(),
|
||||
&shutdown_tracker.child_tracker(),
|
||||
&shutdown_tracker,
|
||||
);
|
||||
|
||||
// needs to be started as the first thing to block if required waiting for the gateway
|
||||
@@ -936,7 +946,7 @@ where
|
||||
shared_topology_accessor.clone(),
|
||||
self_address.gateway(),
|
||||
self.wait_for_gateway,
|
||||
&shutdown_tracker.child_tracker(),
|
||||
&shutdown_tracker,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -956,7 +966,7 @@ where
|
||||
stats_reporter.clone(),
|
||||
#[cfg(unix)]
|
||||
self.connection_fd_callback,
|
||||
&shutdown_tracker.child_tracker(),
|
||||
&shutdown_tracker,
|
||||
)
|
||||
.await?;
|
||||
let gateway_ws_fd = gateway_transceiver.ws_fd();
|
||||
@@ -964,7 +974,7 @@ where
|
||||
let reply_storage = Self::setup_persistent_reply_storage(
|
||||
reply_storage_backend,
|
||||
key_rotation_config,
|
||||
&shutdown_tracker.child_tracker(),
|
||||
&shutdown_tracker,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -975,7 +985,7 @@ where
|
||||
reply_storage.key_storage(),
|
||||
reply_controller_sender.clone(),
|
||||
stats_reporter.clone(),
|
||||
&shutdown_tracker.child_tracker(),
|
||||
&shutdown_tracker,
|
||||
);
|
||||
|
||||
// The message_sender is the transmitter for any component generating sphinx packets
|
||||
@@ -983,10 +993,8 @@ where
|
||||
// traffic stream.
|
||||
// The MixTrafficController then sends the actual traffic
|
||||
|
||||
let (message_sender, client_request_sender) = Self::start_mix_traffic_controller(
|
||||
gateway_transceiver,
|
||||
&shutdown_tracker.child_tracker(),
|
||||
);
|
||||
let (message_sender, client_request_sender) =
|
||||
Self::start_mix_traffic_controller(gateway_transceiver, &shutdown_tracker);
|
||||
|
||||
// Channels that the websocket listener can use to signal downstream to the real traffic
|
||||
// controller that connections are closed.
|
||||
@@ -1015,7 +1023,7 @@ where
|
||||
shared_lane_queue_lengths.clone(),
|
||||
client_connection_rx,
|
||||
stats_reporter.clone(),
|
||||
&shutdown_tracker.child_tracker(),
|
||||
&shutdown_tracker,
|
||||
);
|
||||
|
||||
if !self
|
||||
@@ -1031,7 +1039,7 @@ where
|
||||
shared_topology_accessor.clone(),
|
||||
message_sender,
|
||||
stats_reporter.clone(),
|
||||
&shutdown_tracker.child_tracker(),
|
||||
&shutdown_tracker,
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -138,6 +138,7 @@ impl MixTrafficController {
|
||||
|
||||
pub async fn run(&mut self) {
|
||||
debug!("Started MixTrafficController with graceful shutdown support");
|
||||
let _drop_guard = self.shutdown_token.clone().drop_guard();
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
+1
@@ -80,6 +80,7 @@ impl AcknowledgementListener {
|
||||
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
|
||||
debug!("Started AcknowledgementListener with graceful shutdown support");
|
||||
|
||||
let _drop_guard = shutdown_token.clone().drop_guard();
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
+1
@@ -245,6 +245,7 @@ impl ActionController {
|
||||
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
|
||||
debug!("Started ActionController with graceful shutdown support");
|
||||
|
||||
let _drop_guard = shutdown_token.clone().drop_guard();
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
+1
@@ -216,6 +216,7 @@ where
|
||||
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
|
||||
debug!("Started InputMessageListener with graceful shutdown support");
|
||||
|
||||
let _drop_guard = shutdown_token.clone().drop_guard();
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
+1
@@ -167,6 +167,7 @@ where
|
||||
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
|
||||
debug!("Started RetransmissionRequestListener with graceful shutdown support");
|
||||
|
||||
let _drop_guard = shutdown_token.clone().drop_guard();
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
@@ -585,6 +585,7 @@ where
|
||||
|
||||
// avoid borrow on self
|
||||
let shutdown_token = self.shutdown_token.clone();
|
||||
let _drop_guard = shutdown_token.clone().drop_guard();
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
{
|
||||
let mut status_timer = tokio::time::interval(Duration::from_secs(5));
|
||||
|
||||
@@ -497,6 +497,8 @@ impl<R: MessageReceiver> RequestReceiver<R> {
|
||||
|
||||
pub(crate) async fn run(&mut self) {
|
||||
debug!("Started RequestReceiver with graceful shutdown support");
|
||||
|
||||
let _drop_guard = self.shutdown_token.clone().drop_guard();
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
@@ -540,6 +542,8 @@ impl<R: MessageReceiver> FragmentedMessageReceiver<R> {
|
||||
|
||||
pub(crate) async fn run(&mut self) -> Result<(), MessageRecoveryError> {
|
||||
debug!("Started FragmentedMessageReceiver with graceful shutdown support");
|
||||
|
||||
let _drop_guard = self.shutdown_token.clone().drop_guard();
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
@@ -152,6 +152,7 @@ where
|
||||
let polling_rate = self.config.key_rotation.epoch_duration / 8;
|
||||
let mut invalidation_inspection = new_interval_stream(polling_rate);
|
||||
|
||||
let _drop_guard = shutdown_token.clone().drop_guard();
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
@@ -119,6 +119,8 @@ impl StatisticsControl {
|
||||
let mut snapshot_interval =
|
||||
gloo_timers::future::IntervalStream::new(SNAPSHOT_INTERVAL.as_millis() as u32);
|
||||
|
||||
let _drop_guard = shutdown_token.clone().drop_guard();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
@@ -46,6 +46,7 @@ where
|
||||
|
||||
debug!("Started PersistentReplyStorage");
|
||||
if let Err(err) = self.backend.start_storage_session().await {
|
||||
shutdown.cancel();
|
||||
error!("failed to start the storage session - {err}");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -89,6 +89,8 @@ impl PartiallyDelegatedRouter {
|
||||
|
||||
async fn run(mut self, mut split_stream: SplitStream<WsConn>, shutdown_token: ShutdownToken) {
|
||||
let mut chunked_stream = (&mut split_stream).ready_chunks(8);
|
||||
|
||||
let drop_guard = shutdown_token.clone().drop_guard();
|
||||
let ret: Result<_, GatewayClientError> = loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
@@ -101,6 +103,7 @@ impl PartiallyDelegatedRouter {
|
||||
// received request to stop the task and return the stream
|
||||
_ = &mut self.stream_return_requester => {
|
||||
log::debug!("received request to return the split ws stream");
|
||||
drop_guard.disarm();
|
||||
break Ok(())
|
||||
}
|
||||
socket_msgs = chunked_stream.next() => {
|
||||
|
||||
@@ -759,7 +759,7 @@ where
|
||||
client_output,
|
||||
client_state.clone(),
|
||||
nym_address,
|
||||
started_client.shutdown_handle.child_tracker(),
|
||||
started_client.shutdown_handle.clone(),
|
||||
packet_type,
|
||||
);
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use tokio::sync::RwLockReadGuard;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tokio_util::sync::WaitForCancellationFutureOwned;
|
||||
|
||||
/// Client connected to the Nym mixnet.
|
||||
pub struct MixnetClient {
|
||||
@@ -273,6 +274,12 @@ impl MixnetClient {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn cancelled(&self) -> WaitForCancellationFutureOwned {
|
||||
self.shutdown_handle
|
||||
.clone_shutdown_token()
|
||||
.cancelled_owned()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
||||
Reference in New Issue
Block a user