Compare commits

...

3 Commits

Author SHA1 Message Date
Bogdan-Ștefan Neacşu 02af95c65e Inner select on on_messages 2025-01-31 17:02:50 +01:00
Bogdan-Ștefan Neacşu 85be73401e Add debug timeout on sink 2025-01-31 16:36:23 +01:00
Jon Häggblad 8385f719e2 Send shutdown instead of panic when reaching max fail (#5398)
* Send shutdown instead of panic when reaching max fail

* Stop quicker on failure

* Update comment
2025-01-31 13:42:01 +01:00
6 changed files with 58 additions and 21 deletions
@@ -2,10 +2,12 @@
// SPDX-License-Identifier: Apache-2.0
use crate::client::mix_traffic::transceiver::GatewayTransceiver;
use crate::error::ClientCoreError;
use crate::{spawn_future, ForgetMe};
use log::*;
use nym_gateway_requests::ClientRequest;
use nym_sphinx::forwarding::packet::MixPacket;
use transceiver::ErasedGatewayError;
pub type BatchMixMessageSender = tokio::sync::mpsc::Sender<Vec<MixPacket>>;
pub type BatchMixMessageReceiver = tokio::sync::mpsc::Receiver<Vec<MixPacket>>;
@@ -68,7 +70,10 @@ impl MixTrafficController {
)
}
async fn on_messages(&mut self, mut mix_packets: Vec<MixPacket>) {
async fn on_messages(
&mut self,
mut mix_packets: Vec<MixPacket>,
) -> Result<(), ErasedGatewayError> {
debug_assert!(!mix_packets.is_empty());
let result = if mix_packets.len() == 1 {
@@ -80,21 +85,14 @@ impl MixTrafficController {
.await
};
match result {
Err(err) => {
error!("Failed to send sphinx packet(s) to the gateway: {err}");
self.consecutive_gateway_failure_count += 1;
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
// todo: in the future this should initiate a 'graceful' shutdown or try
// to reconnect?
panic!("failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead. Can't do anything about it yet :(")
}
}
Ok(_) => {
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
self.consecutive_gateway_failure_count = 0;
}
if result.is_err() {
self.consecutive_gateway_failure_count += 1;
} else {
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
self.consecutive_gateway_failure_count = 0;
}
result
}
pub fn start_with_shutdown(mut self, mut shutdown: nym_task::TaskClient) {
@@ -105,7 +103,26 @@ impl MixTrafficController {
tokio::select! {
mix_packets = self.mix_rx.recv() => match mix_packets {
Some(mix_packets) => {
self.on_messages(mix_packets).await;
tokio::select! {
ret = self.on_messages(mix_packets) => {
if let Err(err) = ret {
error!("Failed to send sphinx packet(s) to the gateway: {err}");
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
// Disconnect from the gateway. If we should try to re-connect
// is handled at a higher layer.
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
// Do we need to handle the embedded mixnet client case
// separately?
shutdown.send_we_stopped(Box::new(ClientCoreError::GatewayFailedToForwardMessages));
break;
}
}
}
_ = shutdown.recv_with_delay() => {
log::trace!("MixTrafficController: Received shutdown");
break;
}
}
},
None => {
log::trace!("MixTrafficController: Stopping since channel closed");
@@ -626,9 +626,14 @@ where
messages: Vec<RealMessage>,
transmission_lane: TransmissionLane,
) {
self.real_message_sender
if let Err(err) = self
.real_message_sender
.send((messages, transmission_lane))
.await
.expect("real message receiver task (OutQueueControl) has died");
{
error!(
"Failed to forward messages to the real message sender (OutQueueControl): {err}"
);
}
}
}
@@ -545,7 +545,7 @@ where
loop {
tokio::select! {
biased;
_ = shutdown.recv_with_delay() => {
_ = shutdown.recv() => {
log::trace!("OutQueueControl: Received shutdown");
break;
}
+3
View File
@@ -96,6 +96,9 @@ pub enum ClientCoreError {
#[error("timed out while trying to establish gateway connection")]
GatewayConnectionTimeout,
#[error("failed to forward mix messages to gateway")]
GatewayFailedToForwardMessages,
#[error("no ping measurements for the gateway ({identity}) performed")]
NoGatewayMeasurements { identity: String },
@@ -118,6 +118,9 @@ pub enum GatewayClientError {
"this operation couldn't be completed as the program is in the process of shutting down"
)]
ShutdownInProgress,
#[error("Timed out on sending message(s)")]
TimeoutOnSink,
}
impl GatewayClientError {
@@ -291,7 +291,10 @@ impl PartiallyDelegatedHandle {
&mut self,
msg: Message,
) -> Result<(), GatewayClientError> {
Ok(self.sink_half.send(msg).await?)
tokio::time::timeout(std::time::Duration::from_secs(1), self.sink_half.send(msg))
.await
.map_err(|_| GatewayClientError::TimeoutOnSink)??;
Ok(())
}
pub(crate) async fn batch_send_without_response(
@@ -300,7 +303,13 @@ impl PartiallyDelegatedHandle {
) -> Result<(), GatewayClientError> {
let stream_messages: Vec<_> = messages.into_iter().map(Ok).collect();
let mut send_stream = futures::stream::iter(stream_messages);
Ok(self.sink_half.send_all(&mut send_stream).await?)
tokio::time::timeout(
std::time::Duration::from_secs(1),
self.sink_half.send_all(&mut send_stream),
)
.await
.map_err(|_| GatewayClientError::TimeoutOnSink)??;
Ok(())
}
pub(crate) async fn merge(self) -> Result<WsConn, GatewayClientError> {