Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 81e72ba61b | |||
| 0e3ffb749e | |||
| bd2e6ee812 |
Generated
-1
@@ -5746,7 +5746,6 @@ dependencies = [
|
||||
"criterion",
|
||||
"ff",
|
||||
"group",
|
||||
"lazy_static",
|
||||
"nym-contracts-common",
|
||||
"nym-pemstore",
|
||||
"rand 0.8.5",
|
||||
|
||||
@@ -892,7 +892,7 @@ where
|
||||
// Create a shutdown tracker for this client - either as a child of provided tracker
|
||||
// or get one from the registry
|
||||
let shutdown_tracker = match self.shutdown {
|
||||
Some(parent_tracker) => parent_tracker.child_tracker(),
|
||||
Some(parent_tracker) => parent_tracker.clone(),
|
||||
None => nym_task::get_sdk_shutdown_tracker()?,
|
||||
};
|
||||
|
||||
@@ -926,7 +926,7 @@ where
|
||||
self.user_agent.clone(),
|
||||
generate_client_stats_id(*self_address.identity()),
|
||||
input_sender.clone(),
|
||||
&shutdown_tracker.child_tracker(),
|
||||
&shutdown_tracker.clone(),
|
||||
);
|
||||
|
||||
// needs to be started as the first thing to block if required waiting for the gateway
|
||||
@@ -936,7 +936,7 @@ where
|
||||
shared_topology_accessor.clone(),
|
||||
self_address.gateway(),
|
||||
self.wait_for_gateway,
|
||||
&shutdown_tracker.child_tracker(),
|
||||
&shutdown_tracker.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -956,7 +956,7 @@ where
|
||||
stats_reporter.clone(),
|
||||
#[cfg(unix)]
|
||||
self.connection_fd_callback,
|
||||
&shutdown_tracker.child_tracker(),
|
||||
&shutdown_tracker.clone(),
|
||||
)
|
||||
.await?;
|
||||
let gateway_ws_fd = gateway_transceiver.ws_fd();
|
||||
@@ -964,7 +964,7 @@ where
|
||||
let reply_storage = Self::setup_persistent_reply_storage(
|
||||
reply_storage_backend,
|
||||
key_rotation_config,
|
||||
&shutdown_tracker.child_tracker(),
|
||||
&shutdown_tracker.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -975,7 +975,7 @@ where
|
||||
reply_storage.key_storage(),
|
||||
reply_controller_sender.clone(),
|
||||
stats_reporter.clone(),
|
||||
&shutdown_tracker.child_tracker(),
|
||||
&shutdown_tracker.clone(),
|
||||
);
|
||||
|
||||
// The message_sender is the transmitter for any component generating sphinx packets
|
||||
@@ -983,10 +983,8 @@ where
|
||||
// traffic stream.
|
||||
// The MixTrafficController then sends the actual traffic
|
||||
|
||||
let (message_sender, client_request_sender) = Self::start_mix_traffic_controller(
|
||||
gateway_transceiver,
|
||||
&shutdown_tracker.child_tracker(),
|
||||
);
|
||||
let (message_sender, client_request_sender) =
|
||||
Self::start_mix_traffic_controller(gateway_transceiver, &shutdown_tracker.clone());
|
||||
|
||||
// Channels that the websocket listener can use to signal downstream to the real traffic
|
||||
// controller that connections are closed.
|
||||
@@ -1015,7 +1013,7 @@ where
|
||||
shared_lane_queue_lengths.clone(),
|
||||
client_connection_rx,
|
||||
stats_reporter.clone(),
|
||||
&shutdown_tracker.child_tracker(),
|
||||
&shutdown_tracker.clone(),
|
||||
);
|
||||
|
||||
if !self
|
||||
@@ -1031,7 +1029,7 @@ where
|
||||
shared_topology_accessor.clone(),
|
||||
message_sender,
|
||||
stats_reporter.clone(),
|
||||
&shutdown_tracker.child_tracker(),
|
||||
&shutdown_tracker.clone(),
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -155,6 +155,7 @@ impl MixTrafficController {
|
||||
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
|
||||
// Do we need to handle the embedded mixnet client case
|
||||
// separately?
|
||||
self.shutdown_token.cancel();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,12 +51,12 @@ impl AuthClientMixnetListener {
|
||||
}
|
||||
|
||||
async fn run(mut self) -> Self {
|
||||
let mixnet_cancel_token = self.mixnet_client.cancellation_token();
|
||||
self.shutdown_token.run_until_cancelled(async {
|
||||
let shutdown_event = self.mixnet_client.shutdown_event();
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = mixnet_cancel_token.cancelled() => {
|
||||
_ = shutdown_event.wait() => {
|
||||
tracing::debug!("AuthClientMixnetListener: mixnet client was shutdown");
|
||||
break;
|
||||
}
|
||||
@@ -100,9 +100,7 @@ impl AuthClientMixnetListener {
|
||||
|
||||
// Disconnects the mixnet client and effectively drop itself, since it doesn't work without one, and reconnecting isn't supported
|
||||
pub async fn disconnect_mixnet_client(self) {
|
||||
if !self.mixnet_client.cancellation_token().is_cancelled() {
|
||||
self.mixnet_client.disconnect().await;
|
||||
}
|
||||
self.mixnet_client.disconnect().await;
|
||||
}
|
||||
|
||||
pub fn start(self) -> AuthClientMixnetListenerHandle {
|
||||
@@ -110,14 +108,14 @@ impl AuthClientMixnetListener {
|
||||
let message_sender = self.input_message_tx.clone();
|
||||
// Allows stopping only this, e.g. if we don't need it in the new bandwidth controller
|
||||
let cancellation_token = self.shutdown_token.clone();
|
||||
let mixnet_cancellation_token = self.mixnet_client.cancellation_token();
|
||||
// let mixnet_cancellation_token = self.mixnet_client.cancellation_token();
|
||||
let handle = tokio::spawn(self.run());
|
||||
|
||||
AuthClientMixnetListenerHandle {
|
||||
message_broadcast,
|
||||
message_sender,
|
||||
cancellation_token,
|
||||
mixnet_cancellation_token,
|
||||
// mixnet_cancellation_token,
|
||||
handle,
|
||||
}
|
||||
}
|
||||
@@ -127,7 +125,7 @@ pub struct AuthClientMixnetListenerHandle {
|
||||
message_broadcast: MixnetMessageBroadcastSender,
|
||||
message_sender: MixnetMessageInputSender,
|
||||
cancellation_token: CancellationToken,
|
||||
mixnet_cancellation_token: CancellationToken,
|
||||
// mixnet_cancellation_token: CancellationToken,
|
||||
handle: JoinHandle<AuthClientMixnetListener>,
|
||||
}
|
||||
|
||||
@@ -140,9 +138,9 @@ impl AuthClientMixnetListenerHandle {
|
||||
self.message_broadcast.subscribe()
|
||||
}
|
||||
|
||||
pub fn mixnet_cancel_token(&self) -> CancellationToken {
|
||||
self.mixnet_cancellation_token.clone()
|
||||
}
|
||||
// pub fn mixnet_cancel_token(&self) -> CancellationToken {
|
||||
// self.mixnet_cancellation_token.clone()
|
||||
// }
|
||||
|
||||
pub async fn stop(self) {
|
||||
// If shutdown was externally called, that call is a no-op
|
||||
|
||||
@@ -133,8 +133,7 @@ impl IprClientConnect {
|
||||
|
||||
let timeout = sleep(IPR_CONNECT_TIMEOUT);
|
||||
tokio::pin!(timeout);
|
||||
|
||||
let mixnet_cancel_token = self.mixnet_client.cancellation_token();
|
||||
let shutdown_event = self.mixnet_client.shutdown_event();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
@@ -142,15 +141,14 @@ impl IprClientConnect {
|
||||
error!("Cancelled while waiting for reply to connect request");
|
||||
return Err(Error::Cancelled);
|
||||
},
|
||||
|
||||
_ = mixnet_cancel_token.cancelled() => {
|
||||
error!("Mixnet client stopped while waiting for reply to connect request");
|
||||
return Err(Error::Cancelled);
|
||||
},
|
||||
_ = &mut timeout => {
|
||||
error!("Timed out waiting for reply to connect request");
|
||||
return Err(Error::TimeoutWaitingForConnectResponse);
|
||||
},
|
||||
_ = shutdown_event.wait() => {
|
||||
error!("Mixnet client stopped while waiting for reply to connect request");
|
||||
return Err(Error::Cancelled);
|
||||
},
|
||||
msgs = self.mixnet_client.wait_for_messages() => match msgs {
|
||||
None => {
|
||||
return Err(Error::NoMixnetMessagesReceived);
|
||||
|
||||
@@ -770,7 +770,7 @@ where
|
||||
client_output,
|
||||
client_state.clone(),
|
||||
nym_address,
|
||||
started_client.shutdown_handle.child_tracker(),
|
||||
started_client.shutdown_handle.clone(),
|
||||
packet_type,
|
||||
);
|
||||
|
||||
|
||||
@@ -24,7 +24,6 @@ use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use tokio::sync::RwLockReadGuard;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
/// Client connected to the Nym mixnet.
|
||||
pub struct MixnetClient {
|
||||
@@ -124,9 +123,9 @@ impl MixnetClient {
|
||||
}
|
||||
|
||||
/// Get a child token of the root, to monitor unexpected shutdown, without causing one
|
||||
pub fn cancellation_token(&self) -> CancellationToken {
|
||||
self.shutdown_handle.child_shutdown_token().inner().clone()
|
||||
}
|
||||
// pub fn cancellation_token(&self) -> CancellationToken {
|
||||
// self.shutdown_handle.child_shutdown_token().inner().clone()
|
||||
// }
|
||||
|
||||
pub fn client_request_sender(&self) -> ClientRequestSender {
|
||||
self.client_request_sender.clone()
|
||||
@@ -199,6 +198,13 @@ impl MixnetClient {
|
||||
self.client_state.topology_accessor.release_manual_control()
|
||||
}
|
||||
|
||||
/// Returns a shutdown event handle that can be used for waiting for the client to shutdown.
|
||||
pub fn shutdown_event(&self) -> ShutdownEventHandle {
|
||||
ShutdownEventHandle {
|
||||
shutdown_handle: self.shutdown_handle.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Wait for messages from the mixnet
|
||||
pub async fn wait_for_messages(&mut self) -> Option<Vec<ReconstructedMessage>> {
|
||||
self.reconstructed_receiver.next().await
|
||||
@@ -340,3 +346,16 @@ impl MixnetMessageSender for MixnetClientSender {
|
||||
.map_err(|_| Error::MessageSendingFailure)
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle for waiting on the shutdown event of the mixnet client.
|
||||
pub struct ShutdownEventHandle {
|
||||
shutdown_handle: ShutdownTracker,
|
||||
}
|
||||
|
||||
impl ShutdownEventHandle {
|
||||
/// Returns once mixnet client has been shut down.
|
||||
/// If mixnet client is already shut down, returns immediately.
|
||||
pub async fn wait(&self) {
|
||||
self.shutdown_handle.wait_for_tracker().await
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user