Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1e5f249275 | |||
| 77c5c8f0ca | |||
| 424e94a04e |
@@ -2,10 +2,12 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::client::mix_traffic::transceiver::GatewayTransceiver;
|
||||
use crate::error::ClientCoreError;
|
||||
use crate::{spawn_future, ForgetMe};
|
||||
use log::*;
|
||||
use nym_gateway_requests::ClientRequest;
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use transceiver::ErasedGatewayError;
|
||||
|
||||
pub type BatchMixMessageSender = tokio::sync::mpsc::Sender<Vec<MixPacket>>;
|
||||
pub type BatchMixMessageReceiver = tokio::sync::mpsc::Receiver<Vec<MixPacket>>;
|
||||
@@ -68,7 +70,10 @@ impl MixTrafficController {
|
||||
)
|
||||
}
|
||||
|
||||
async fn on_messages(&mut self, mut mix_packets: Vec<MixPacket>) {
|
||||
async fn on_messages(
|
||||
&mut self,
|
||||
mut mix_packets: Vec<MixPacket>,
|
||||
) -> Result<(), ErasedGatewayError> {
|
||||
debug_assert!(!mix_packets.is_empty());
|
||||
|
||||
let result = if mix_packets.len() == 1 {
|
||||
@@ -80,21 +85,14 @@ impl MixTrafficController {
|
||||
.await
|
||||
};
|
||||
|
||||
match result {
|
||||
Err(err) => {
|
||||
error!("Failed to send sphinx packet(s) to the gateway: {err}");
|
||||
self.consecutive_gateway_failure_count += 1;
|
||||
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
|
||||
// todo: in the future this should initiate a 'graceful' shutdown or try
|
||||
// to reconnect?
|
||||
panic!("failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead. Can't do anything about it yet :(")
|
||||
}
|
||||
}
|
||||
Ok(_) => {
|
||||
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
|
||||
self.consecutive_gateway_failure_count = 0;
|
||||
}
|
||||
if result.is_err() {
|
||||
self.consecutive_gateway_failure_count += 1;
|
||||
} else {
|
||||
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
|
||||
self.consecutive_gateway_failure_count = 0;
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
pub fn start_with_shutdown(mut self, mut shutdown: nym_task::TaskClient) {
|
||||
@@ -105,7 +103,18 @@ impl MixTrafficController {
|
||||
tokio::select! {
|
||||
mix_packets = self.mix_rx.recv() => match mix_packets {
|
||||
Some(mix_packets) => {
|
||||
self.on_messages(mix_packets).await;
|
||||
if let Err(err) = self.on_messages(mix_packets).await {
|
||||
error!("Failed to send sphinx packet(s) to the gateway: {err}");
|
||||
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
|
||||
// Disconnect from the gateway. If we should try to re-connect
|
||||
// is handled at a higher layer.
|
||||
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
|
||||
// Do we need to handle the embedded mixnet client case
|
||||
// separately?
|
||||
shutdown.send_we_stopped(Box::new(ClientCoreError::GatewayFailedToForwardMessages));
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
None => {
|
||||
log::trace!("MixTrafficController: Stopping since channel closed");
|
||||
|
||||
@@ -626,9 +626,14 @@ where
|
||||
messages: Vec<RealMessage>,
|
||||
transmission_lane: TransmissionLane,
|
||||
) {
|
||||
self.real_message_sender
|
||||
if let Err(err) = self
|
||||
.real_message_sender
|
||||
.send((messages, transmission_lane))
|
||||
.await
|
||||
.expect("real message receiver task (OutQueueControl) has died");
|
||||
{
|
||||
error!(
|
||||
"Failed to forward messages to the real message sender (OutQueueControl): {err}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -545,7 +545,7 @@ where
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown.recv_with_delay() => {
|
||||
_ = shutdown.recv() => {
|
||||
log::trace!("OutQueueControl: Received shutdown");
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -123,6 +123,11 @@ impl StatisticsControl {
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = task_client.recv() => {
|
||||
log::trace!("StatisticsControl: Received shutdown");
|
||||
break;
|
||||
},
|
||||
stats_event = self.stats_rx.recv() => match stats_event {
|
||||
Some(stats_event) => self.stats.handle_event(stats_event),
|
||||
None => {
|
||||
@@ -146,13 +151,8 @@ impl StatisticsControl {
|
||||
_ = local_report_interval.next() => {
|
||||
self.stats.local_report(&mut task_client);
|
||||
}
|
||||
_ = task_client.recv_with_delay() => {
|
||||
log::trace!("StatisticsControl: Received shutdown");
|
||||
break;
|
||||
},
|
||||
}
|
||||
}
|
||||
task_client.recv_timeout().await;
|
||||
log::debug!("StatisticsControl: Exiting");
|
||||
}
|
||||
|
||||
|
||||
@@ -96,6 +96,9 @@ pub enum ClientCoreError {
|
||||
#[error("timed out while trying to establish gateway connection")]
|
||||
GatewayConnectionTimeout,
|
||||
|
||||
#[error("failed to forward mix messages to gateway")]
|
||||
GatewayFailedToForwardMessages,
|
||||
|
||||
#[error("no ping measurements for the gateway ({identity}) performed")]
|
||||
NoGatewayMeasurements { identity: String },
|
||||
|
||||
|
||||
@@ -221,7 +221,7 @@ impl TaskManager {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn wait_for_graceful_shutdown(&mut self) {
|
||||
pub async fn wait_for_graceful_shutdown(&mut self) {
|
||||
if let Some(notify_rx) = self.notify_rx.take() {
|
||||
drop(notify_rx);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user