Compare commits

...

5 Commits

Author SHA1 Message Date
Jon Häggblad bc59f94c1b Pass errors up the call stack 2024-05-06 11:56:30 +02:00
Jon Häggblad 8c28a12569 Start adding error return types 2024-05-06 10:24:50 +02:00
Jon Häggblad f7387c3229 wip: explore timeout on ws send 2024-05-05 22:42:56 +02:00
Jon Häggblad a3460d7f67 wip 2024-05-05 21:05:01 +02:00
Jon Häggblad d7b5f4f6d6 Exlicitly drop packet router when shutdown detected
In the vpn client I've observed that 2 out of 3 times the disconnect
procedure stalls out.

Investigating in the logs these 5 are still running when we hit shutdown
timeout.

[TaskClient-nym_vpn_lib-mixnet_client_main-real_traffic_controller-ack_control-retransmission_request_listener] Polling shutdown failed: channel closed
[TaskClient-nym_vpn_lib-mixnet_client_main-real_traffic_controller-out_queue_control] Polling shutdown failed: channel closed
[TaskClient-gateway-packet-router] Polling shutdown failed: channel closed
[TaskClient-nym_vpn_lib-mixnet_client_main-gateway_transceiver] Polling shutdown failed: channel closed
[TaskClient-nym_vpn_lib-mixnet_client_main-mix_traffic_controller] Polling shutdown failed: channel closed

I _think_ what was causing the problem here is that the task client in
the packet router is not being awaited in an event loop like other
client instances.

Explictly drop it in the socket state to make sure it's not blocking
shutdown.
2024-05-05 21:05:01 +02:00
8 changed files with 140 additions and 36 deletions
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::client::mix_traffic::transceiver::GatewayTransceiver;
use crate::error::ClientCoreError;
use crate::spawn_future;
use log::*;
use nym_sphinx::forwarding::packet::MixPacket;
@@ -60,8 +61,15 @@ impl MixTrafficController {
)
}
async fn on_messages(&mut self, mut mix_packets: Vec<MixPacket>) {
async fn on_messages(
&mut self,
mut mix_packets: Vec<MixPacket>,
) -> Result<(), ClientCoreError> {
debug_assert!(!mix_packets.is_empty());
info!(
"JON: MixTrafficController: Sending {} sphinx packets to the gateway",
mix_packets.len()
);
let result = if mix_packets.len() == 1 {
let mix_packet = mix_packets.pop().unwrap();
@@ -72,42 +80,56 @@ impl MixTrafficController {
.await
};
match result {
let r = match result {
Err(err) => {
error!("Failed to send sphinx packet(s) to the gateway: {err}");
self.consecutive_gateway_failure_count += 1;
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
// todo: in the future this should initiate a 'graceful' shutdown or try
// to reconnect?
panic!("failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead. Can't do anything about it yet :(")
Err(ClientCoreError::GatewayMaxRetriesExceeded)
} else {
Err(ClientCoreError::GatewayClientSendError {
gateway_client_error: err.to_string(),
})
}
}
Ok(_) => {
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
self.consecutive_gateway_failure_count = 0;
Ok(())
}
}
};
info!("JON: MixTrafficController: done sending sphinx packets to the gateway");
r
}
pub fn start_with_shutdown(mut self, mut shutdown: nym_task::TaskClient) {
spawn_future(async move {
debug!("Started MixTrafficController with graceful shutdown support");
// let mut shutdown0 = shutdown.recv_with_delay();
// tokio::pin!(shutdown0);
loop {
tokio::select! {
biased;
_ = shutdown.recv_with_delay() => {
// _ = &mut shutdown0 => {
log::trace!("MixTrafficController: Received shutdown");
break;
}
mix_packets = self.mix_rx.recv() => match mix_packets {
Some(mix_packets) => {
self.on_messages(mix_packets).await;
log::info!("JON: MixTrafficController: mix_rx recv");
if let Err(err) = self.on_messages(mix_packets).await {
log::error!("MixTrafficController: failed to send mix packets to the gateway: {err}");
}
log::info!("JON: MixTrafficController: done with mix_rx recv");
},
None => {
log::trace!("MixTrafficController: Stopping since channel closed");
break;
}
},
_ = shutdown.recv_with_delay() => {
log::trace!("MixTrafficController: Received shutdown");
break;
}
}
}
shutdown.recv_timeout().await;
@@ -19,6 +19,12 @@ use futures::channel::{mpsc, oneshot};
#[error(transparent)]
pub struct ErasedGatewayError(Box<dyn std::error::Error + Send + Sync>);
impl ErasedGatewayError {
pub fn downcast<T: std::error::Error + 'static>(&self) -> Option<&T> {
self.0.downcast_ref::<T>()
}
}
fn erase_err<E: std::error::Error + Send + Sync + 'static>(err: E) -> ErasedGatewayError {
ErasedGatewayError(Box::new(err))
}
@@ -40,6 +46,7 @@ pub trait GatewaySender {
&mut self,
packets: Vec<MixPacket>,
) -> Result<(), ErasedGatewayError> {
log::info!("GatewaySender::batch_send_mix_packets - sending {} packets", packets.len());
// allow for optimisation when sending multiple packets
for packet in packets {
self.send_mix_packet(packet).await?;
@@ -78,7 +85,10 @@ impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
impl<G: GatewaySender + ?Sized + Send> GatewaySender for Box<G> {
#[inline]
async fn send_mix_packet(&mut self, packet: MixPacket) -> Result<(), ErasedGatewayError> {
(**self).send_mix_packet(packet).await
log::info!("JON: Box<GatewaySender>::send_mix_packet - sending a packet");
let r = (**self).send_mix_packet(packet).await;
log::info!("JON: Box<GatewaySender>::send_mix_packet - sent a packet");
r
}
#[inline]
@@ -86,7 +96,10 @@ impl<G: GatewaySender + ?Sized + Send> GatewaySender for Box<G> {
&mut self,
packets: Vec<MixPacket>,
) -> Result<(), ErasedGatewayError> {
(**self).batch_send_mix_packets(packets).await
log::info!("JON: Box<GatewaySender>::batch_send_mix_packets - sending {} packets", packets.len());
let r = (**self).batch_send_mix_packets(packets).await;
log::info!("JON: Box<GatewaySender>::batch_send_mix_packets - sent packets");
r
}
}
@@ -130,20 +143,26 @@ where
St: Send,
{
async fn send_mix_packet(&mut self, packet: MixPacket) -> Result<(), ErasedGatewayError> {
self.gateway_client
log::info!("JON: RemoteGateway::send_mix_packet - sending a packet");
let r = self.gateway_client
.send_mix_packet(packet)
.await
.map_err(erase_err)
.map_err(erase_err);
log::info!("JON: RemoteGateway::send_mix_packet - sent a packet");
r
}
async fn batch_send_mix_packets(
&mut self,
packets: Vec<MixPacket>,
) -> Result<(), ErasedGatewayError> {
self.gateway_client
log::info!("JON: RemoteGateway::batch_send_mix_packets - sending {} packets", packets.len());
let r = self.gateway_client
.batch_send_mix_packets(packets)
.await
.map_err(erase_err)
.map_err(erase_err);
log::info!("JON: RemoteGateway::batch_send_mix_packets - sent packets");
r
}
}
@@ -203,10 +222,13 @@ mod nonwasm_sealed {
#[async_trait]
impl GatewaySender for LocalGateway {
async fn send_mix_packet(&mut self, packet: MixPacket) -> Result<(), ErasedGatewayError> {
self.packet_forwarder
log::info!("JON: LocalGateway::send_mix_packet - sending a packet");
let r = self.packet_forwarder
.unbounded_send(packet)
.map_err(|err| err.into_send_error())
.map_err(erase_err)
.map_err(erase_err);
log::info!("JON: LocalGateway::send_mix_packet - sent a packet");
r
}
}
@@ -261,6 +283,7 @@ impl GatewayReceiver for MockGateway {
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl GatewaySender for MockGateway {
async fn send_mix_packet(&mut self, packet: MixPacket) -> Result<(), ErasedGatewayError> {
log::info!("MockGateway::send_mix_packet - sending a packet");
self.sent.push(packet);
Ok(())
}
@@ -632,15 +632,23 @@ where
}
pub(crate) fn update_ack_delay(&self, id: FragmentIdentifier, new_delay: Delay) {
self.action_sender
if self
.action_sender
.unbounded_send(Action::UpdateDelay(id, new_delay))
.expect("action control task has died")
.is_err()
{
log::debug!("action control task has died");
}
}
pub(crate) fn insert_pending_acks(&self, pending_acks: Vec<PendingAcknowledgement>) {
self.action_sender
if self
.action_sender
.unbounded_send(Action::new_insert(pending_acks))
.expect("action control task has died")
.is_err()
{
log::debug!("action control task has died");
}
}
// tells real message sender (with the poisson timer) to send this to the mix network
@@ -198,7 +198,9 @@ where
// queues and client load rather than the required delay. So realistically we can treat
// whatever is about to happen as negligible additional delay.
trace!("{} is about to get sent to the mixnet", frag_id);
self.sent_notifier.unbounded_send(frag_id).unwrap();
if self.sent_notifier.unbounded_send(frag_id).is_err() {
debug!("Failed to notify about sent packet");
}
}
fn loop_cover_message_size(&mut self) -> PacketSize {
@@ -270,7 +272,8 @@ where
};
if let Err(err) = self.mix_tx.send(vec![next_message]).await {
log::error!("Failed to send: {err}");
log::debug!("Failed to send: {err}");
return;
} else {
let event = if fragment_id.is_some() {
PacketStatisticsEvent::RealPacketSent(packet_size)
+8
View File
@@ -78,6 +78,9 @@ pub enum ClientCoreError {
source: tungstenite::Error,
},
#[error("max number of retries for gateway connection has been exceeded")]
GatewayMaxRetriesExceeded,
#[cfg(target_arch = "wasm32")]
#[error("failed to establish gateway connection (wasm)")]
GatewayJsConnectionFailure,
@@ -88,6 +91,11 @@ pub enum ClientCoreError {
#[error("timed out while trying to establish gateway connection")]
GatewayConnectionTimeout,
#[error("failed to send sphinx packet to the gateway: {gateway_client_error}")]
GatewayClientSendError {
gateway_client_error: String,
},
#[error("no ping measurements for the gateway ({identity}) performed")]
NoGatewayMeasurements { identity: String },
@@ -333,6 +333,7 @@ impl<C, St> GatewayClient<C, St> {
&mut self,
msg: Message,
) -> Result<ServerResponse, GatewayClientError> {
log::info!("JON: GatewayClient: send_websocket_message");
let should_restart_mixnet_listener = if self.connection.is_partially_delegated() {
self.recover_socket_connection().await?;
true
@@ -351,6 +352,7 @@ impl<C, St> GatewayClient<C, St> {
if should_restart_mixnet_listener {
self.start_listening_for_mixnet_messages()?;
}
log::info!("JON: GatewayClient: send_websocket_message done");
response
}
@@ -358,7 +360,8 @@ impl<C, St> GatewayClient<C, St> {
&mut self,
messages: Vec<Message>,
) -> Result<(), GatewayClientError> {
match self.connection {
log::info!("JON: GatewayClient: batch_send_websocket_messages_without_response");
let r = match self.connection {
SocketState::Available(ref mut conn) => {
let stream_messages: Vec<_> = messages.into_iter().map(Ok).collect();
let mut send_stream = futures::stream::iter(stream_messages);
@@ -381,14 +384,17 @@ impl<C, St> GatewayClient<C, St> {
}
SocketState::NotConnected => Err(GatewayClientError::ConnectionNotEstablished),
_ => Err(GatewayClientError::ConnectionInInvalidState),
}
};
log::info!("JON: GatewayClient: batch_send_websocket_messages_without_response done");
r
}
async fn send_websocket_message_without_response(
&mut self,
msg: Message,
) -> Result<(), GatewayClientError> {
match self.connection {
log::info!("JON: GatewayClient: send_websocket_message_without_response");
let r = match self.connection {
SocketState::Available(ref mut conn) => Ok(conn.send(msg).await?),
SocketState::PartiallyDelegated(ref mut partially_delegated) => {
if let Err(err) = partially_delegated.send_without_response(msg).await {
@@ -404,7 +410,9 @@ impl<C, St> GatewayClient<C, St> {
}
SocketState::NotConnected => Err(GatewayClientError::ConnectionNotEstablished),
_ => Err(GatewayClientError::ConnectionInInvalidState),
}
};
log::info!("JON: GatewayClient: send_websocket_message_without_response done");
r
}
fn check_gateway_protocol(
@@ -658,6 +666,7 @@ impl<C, St> GatewayClient<C, St> {
&mut self,
packets: Vec<MixPacket>,
) -> Result<(), GatewayClientError> {
info!("JON: GatewayClient: batch_send_mix_packets");
debug!("Sending {} mix packets", packets.len());
if !self.authenticated {
@@ -684,10 +693,11 @@ impl<C, St> GatewayClient<C, St> {
})
.collect();
if let Err(err) = self
let r = if let Err(err) = self
.batch_send_websocket_messages_without_response(messages)
.await
{
log::error!("GatewayClient: Failed to send sphinx packets to the gateway: {err}");
if err.is_closed_connection() && self.should_reconnect_on_failure {
self.attempt_reconnection().await
} else {
@@ -695,14 +705,17 @@ impl<C, St> GatewayClient<C, St> {
}
} else {
Ok(())
}
};
log::info!("JON: GatewayClient: batch_send_mix_packets done");
r
}
async fn send_with_reconnection_on_failure(
&mut self,
msg: Message,
) -> Result<(), GatewayClientError> {
if let Err(err) = self.send_websocket_message_without_response(msg).await {
log::info!("JON: GatewayClient: send_with_reconnection_on_failure");
let r = if let Err(err) = self.send_websocket_message_without_response(msg).await {
if err.is_closed_connection() && self.should_reconnect_on_failure {
debug!("Going to attempt a reconnection");
self.attempt_reconnection().await
@@ -712,7 +725,9 @@ impl<C, St> GatewayClient<C, St> {
}
} else {
Ok(())
}
};
log::info!("JON: GatewayClient: send_with_reconnection_on_failure done");
r
}
pub async fn send_ping_message(&mut self) -> Result<(), GatewayClientError> {
@@ -731,6 +746,7 @@ impl<C, St> GatewayClient<C, St> {
&mut self,
mix_packet: MixPacket,
) -> Result<(), GatewayClientError> {
log::info!("JON: GatewayClient: send_mix_packet");
if !self.authenticated {
return Err(GatewayClientError::NotAuthenticated);
}
@@ -750,7 +766,9 @@ impl<C, St> GatewayClient<C, St> {
.as_ref()
.expect("no shared key present even though we're authenticated!"),
);
self.send_with_reconnection_on_failure(msg).await
let r = self.send_with_reconnection_on_failure(msg).await;
log::info!("JON: GatewayClient: send_mix_packet done");
r
}
async fn recover_socket_connection(&mut self) -> Result<(), GatewayClientError> {
@@ -74,6 +74,9 @@ pub enum GatewayClientError {
#[error("Timed out")]
Timeout,
#[error("timeout sending ws message to the gateway")]
TimeoutOnSendingWs,
#[error("Failed to send mixnet message")]
MixnetMsgSenderFailedToSend,
@@ -93,6 +96,7 @@ pub enum GatewayClientError {
impl GatewayClientError {
pub fn is_closed_connection(&self) -> bool {
log::info!("GatewayClientError::is_closed_connection");
match self {
GatewayClientError::NetworkError(ws_err) => match ws_err {
WsError::AlreadyClosed | WsError::ConnectionClosed => true,
@@ -158,6 +158,9 @@ impl PartiallyDelegated {
_ = shutdown.recv() => {
log::trace!("GatewayClient listener: Received shutdown");
log::debug!("GatewayClient listener: Exiting");
// The packet router a task client, and as such we need to make
// sure it's dropped to not stall the shutdown process.
// drop(packet_router);
return;
}
_ = &mut notify_receiver => {
@@ -214,16 +217,31 @@ impl PartiallyDelegated {
&mut self,
msg: Message,
) -> Result<(), GatewayClientError> {
Ok(self.sink_half.send(msg).await?)
log::info!("JON: PartiallyDelegated::send_without_response - sending a message");
// let r = self.sink_half.send(msg).await;
// Ok(r?)
let r = tokio::time::timeout(Duration::from_secs(3), self.sink_half.send(msg)).await;
let rr = match r {
Ok(rr) => Ok(rr?),
Err(_) => {
log::error!("JON: PartiallyDelegated::send_without_response - timeout sending a message");
Err(GatewayClientError::TimeoutOnSendingWs)
}
};
log::info!("JON: PartiallyDelegated::send_without_response - sent a message: {rr:?}");
rr
}
pub(crate) async fn batch_send_without_response(
&mut self,
messages: Vec<Message>,
) -> Result<(), GatewayClientError> {
log::info!("JON: PartiallyDelegated::batch_send_without_response - sending {} messages", messages.len());
let stream_messages: Vec<_> = messages.into_iter().map(Ok).collect();
let mut send_stream = futures::stream::iter(stream_messages);
Ok(self.sink_half.send_all(&mut send_stream).await?)
let r = Ok(self.sink_half.send_all(&mut send_stream).await?);
log::info!("JON: PartiallyDelegated::batch_send_without_response - sent messages");
r
}
pub(crate) async fn merge(self) -> Result<WsConn, GatewayClientError> {