Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| bc59f94c1b | |||
| 8c28a12569 | |||
| f7387c3229 | |||
| a3460d7f67 | |||
| d7b5f4f6d6 |
@@ -2,6 +2,7 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::client::mix_traffic::transceiver::GatewayTransceiver;
|
||||
use crate::error::ClientCoreError;
|
||||
use crate::spawn_future;
|
||||
use log::*;
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
@@ -60,8 +61,15 @@ impl MixTrafficController {
|
||||
)
|
||||
}
|
||||
|
||||
async fn on_messages(&mut self, mut mix_packets: Vec<MixPacket>) {
|
||||
async fn on_messages(
|
||||
&mut self,
|
||||
mut mix_packets: Vec<MixPacket>,
|
||||
) -> Result<(), ClientCoreError> {
|
||||
debug_assert!(!mix_packets.is_empty());
|
||||
info!(
|
||||
"JON: MixTrafficController: Sending {} sphinx packets to the gateway",
|
||||
mix_packets.len()
|
||||
);
|
||||
|
||||
let result = if mix_packets.len() == 1 {
|
||||
let mix_packet = mix_packets.pop().unwrap();
|
||||
@@ -72,42 +80,56 @@ impl MixTrafficController {
|
||||
.await
|
||||
};
|
||||
|
||||
match result {
|
||||
let r = match result {
|
||||
Err(err) => {
|
||||
error!("Failed to send sphinx packet(s) to the gateway: {err}");
|
||||
self.consecutive_gateway_failure_count += 1;
|
||||
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
|
||||
// todo: in the future this should initiate a 'graceful' shutdown or try
|
||||
// to reconnect?
|
||||
panic!("failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead. Can't do anything about it yet :(")
|
||||
Err(ClientCoreError::GatewayMaxRetriesExceeded)
|
||||
} else {
|
||||
Err(ClientCoreError::GatewayClientSendError {
|
||||
gateway_client_error: err.to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
Ok(_) => {
|
||||
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
|
||||
self.consecutive_gateway_failure_count = 0;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
};
|
||||
info!("JON: MixTrafficController: done sending sphinx packets to the gateway");
|
||||
r
|
||||
}
|
||||
|
||||
pub fn start_with_shutdown(mut self, mut shutdown: nym_task::TaskClient) {
|
||||
spawn_future(async move {
|
||||
debug!("Started MixTrafficController with graceful shutdown support");
|
||||
|
||||
// let mut shutdown0 = shutdown.recv_with_delay();
|
||||
// tokio::pin!(shutdown0);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown.recv_with_delay() => {
|
||||
// _ = &mut shutdown0 => {
|
||||
log::trace!("MixTrafficController: Received shutdown");
|
||||
break;
|
||||
}
|
||||
mix_packets = self.mix_rx.recv() => match mix_packets {
|
||||
Some(mix_packets) => {
|
||||
self.on_messages(mix_packets).await;
|
||||
log::info!("JON: MixTrafficController: mix_rx recv");
|
||||
if let Err(err) = self.on_messages(mix_packets).await {
|
||||
log::error!("MixTrafficController: failed to send mix packets to the gateway: {err}");
|
||||
}
|
||||
log::info!("JON: MixTrafficController: done with mix_rx recv");
|
||||
},
|
||||
None => {
|
||||
log::trace!("MixTrafficController: Stopping since channel closed");
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = shutdown.recv_with_delay() => {
|
||||
log::trace!("MixTrafficController: Received shutdown");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
shutdown.recv_timeout().await;
|
||||
|
||||
@@ -19,6 +19,12 @@ use futures::channel::{mpsc, oneshot};
|
||||
#[error(transparent)]
|
||||
pub struct ErasedGatewayError(Box<dyn std::error::Error + Send + Sync>);
|
||||
|
||||
impl ErasedGatewayError {
|
||||
pub fn downcast<T: std::error::Error + 'static>(&self) -> Option<&T> {
|
||||
self.0.downcast_ref::<T>()
|
||||
}
|
||||
}
|
||||
|
||||
fn erase_err<E: std::error::Error + Send + Sync + 'static>(err: E) -> ErasedGatewayError {
|
||||
ErasedGatewayError(Box::new(err))
|
||||
}
|
||||
@@ -40,6 +46,7 @@ pub trait GatewaySender {
|
||||
&mut self,
|
||||
packets: Vec<MixPacket>,
|
||||
) -> Result<(), ErasedGatewayError> {
|
||||
log::info!("GatewaySender::batch_send_mix_packets - sending {} packets", packets.len());
|
||||
// allow for optimisation when sending multiple packets
|
||||
for packet in packets {
|
||||
self.send_mix_packet(packet).await?;
|
||||
@@ -78,7 +85,10 @@ impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
|
||||
impl<G: GatewaySender + ?Sized + Send> GatewaySender for Box<G> {
|
||||
#[inline]
|
||||
async fn send_mix_packet(&mut self, packet: MixPacket) -> Result<(), ErasedGatewayError> {
|
||||
(**self).send_mix_packet(packet).await
|
||||
log::info!("JON: Box<GatewaySender>::send_mix_packet - sending a packet");
|
||||
let r = (**self).send_mix_packet(packet).await;
|
||||
log::info!("JON: Box<GatewaySender>::send_mix_packet - sent a packet");
|
||||
r
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@@ -86,7 +96,10 @@ impl<G: GatewaySender + ?Sized + Send> GatewaySender for Box<G> {
|
||||
&mut self,
|
||||
packets: Vec<MixPacket>,
|
||||
) -> Result<(), ErasedGatewayError> {
|
||||
(**self).batch_send_mix_packets(packets).await
|
||||
log::info!("JON: Box<GatewaySender>::batch_send_mix_packets - sending {} packets", packets.len());
|
||||
let r = (**self).batch_send_mix_packets(packets).await;
|
||||
log::info!("JON: Box<GatewaySender>::batch_send_mix_packets - sent packets");
|
||||
r
|
||||
}
|
||||
}
|
||||
|
||||
@@ -130,20 +143,26 @@ where
|
||||
St: Send,
|
||||
{
|
||||
async fn send_mix_packet(&mut self, packet: MixPacket) -> Result<(), ErasedGatewayError> {
|
||||
self.gateway_client
|
||||
log::info!("JON: RemoteGateway::send_mix_packet - sending a packet");
|
||||
let r = self.gateway_client
|
||||
.send_mix_packet(packet)
|
||||
.await
|
||||
.map_err(erase_err)
|
||||
.map_err(erase_err);
|
||||
log::info!("JON: RemoteGateway::send_mix_packet - sent a packet");
|
||||
r
|
||||
}
|
||||
|
||||
async fn batch_send_mix_packets(
|
||||
&mut self,
|
||||
packets: Vec<MixPacket>,
|
||||
) -> Result<(), ErasedGatewayError> {
|
||||
self.gateway_client
|
||||
log::info!("JON: RemoteGateway::batch_send_mix_packets - sending {} packets", packets.len());
|
||||
let r = self.gateway_client
|
||||
.batch_send_mix_packets(packets)
|
||||
.await
|
||||
.map_err(erase_err)
|
||||
.map_err(erase_err);
|
||||
log::info!("JON: RemoteGateway::batch_send_mix_packets - sent packets");
|
||||
r
|
||||
}
|
||||
}
|
||||
|
||||
@@ -203,10 +222,13 @@ mod nonwasm_sealed {
|
||||
#[async_trait]
|
||||
impl GatewaySender for LocalGateway {
|
||||
async fn send_mix_packet(&mut self, packet: MixPacket) -> Result<(), ErasedGatewayError> {
|
||||
self.packet_forwarder
|
||||
log::info!("JON: LocalGateway::send_mix_packet - sending a packet");
|
||||
let r = self.packet_forwarder
|
||||
.unbounded_send(packet)
|
||||
.map_err(|err| err.into_send_error())
|
||||
.map_err(erase_err)
|
||||
.map_err(erase_err);
|
||||
log::info!("JON: LocalGateway::send_mix_packet - sent a packet");
|
||||
r
|
||||
}
|
||||
}
|
||||
|
||||
@@ -261,6 +283,7 @@ impl GatewayReceiver for MockGateway {
|
||||
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
|
||||
impl GatewaySender for MockGateway {
|
||||
async fn send_mix_packet(&mut self, packet: MixPacket) -> Result<(), ErasedGatewayError> {
|
||||
log::info!("MockGateway::send_mix_packet - sending a packet");
|
||||
self.sent.push(packet);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -632,15 +632,23 @@ where
|
||||
}
|
||||
|
||||
pub(crate) fn update_ack_delay(&self, id: FragmentIdentifier, new_delay: Delay) {
|
||||
self.action_sender
|
||||
if self
|
||||
.action_sender
|
||||
.unbounded_send(Action::UpdateDelay(id, new_delay))
|
||||
.expect("action control task has died")
|
||||
.is_err()
|
||||
{
|
||||
log::debug!("action control task has died");
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn insert_pending_acks(&self, pending_acks: Vec<PendingAcknowledgement>) {
|
||||
self.action_sender
|
||||
if self
|
||||
.action_sender
|
||||
.unbounded_send(Action::new_insert(pending_acks))
|
||||
.expect("action control task has died")
|
||||
.is_err()
|
||||
{
|
||||
log::debug!("action control task has died");
|
||||
}
|
||||
}
|
||||
|
||||
// tells real message sender (with the poisson timer) to send this to the mix network
|
||||
|
||||
@@ -198,7 +198,9 @@ where
|
||||
// queues and client load rather than the required delay. So realistically we can treat
|
||||
// whatever is about to happen as negligible additional delay.
|
||||
trace!("{} is about to get sent to the mixnet", frag_id);
|
||||
self.sent_notifier.unbounded_send(frag_id).unwrap();
|
||||
if self.sent_notifier.unbounded_send(frag_id).is_err() {
|
||||
debug!("Failed to notify about sent packet");
|
||||
}
|
||||
}
|
||||
|
||||
fn loop_cover_message_size(&mut self) -> PacketSize {
|
||||
@@ -270,7 +272,8 @@ where
|
||||
};
|
||||
|
||||
if let Err(err) = self.mix_tx.send(vec![next_message]).await {
|
||||
log::error!("Failed to send: {err}");
|
||||
log::debug!("Failed to send: {err}");
|
||||
return;
|
||||
} else {
|
||||
let event = if fragment_id.is_some() {
|
||||
PacketStatisticsEvent::RealPacketSent(packet_size)
|
||||
|
||||
@@ -78,6 +78,9 @@ pub enum ClientCoreError {
|
||||
source: tungstenite::Error,
|
||||
},
|
||||
|
||||
#[error("max number of retries for gateway connection has been exceeded")]
|
||||
GatewayMaxRetriesExceeded,
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
#[error("failed to establish gateway connection (wasm)")]
|
||||
GatewayJsConnectionFailure,
|
||||
@@ -88,6 +91,11 @@ pub enum ClientCoreError {
|
||||
#[error("timed out while trying to establish gateway connection")]
|
||||
GatewayConnectionTimeout,
|
||||
|
||||
#[error("failed to send sphinx packet to the gateway: {gateway_client_error}")]
|
||||
GatewayClientSendError {
|
||||
gateway_client_error: String,
|
||||
},
|
||||
|
||||
#[error("no ping measurements for the gateway ({identity}) performed")]
|
||||
NoGatewayMeasurements { identity: String },
|
||||
|
||||
|
||||
@@ -333,6 +333,7 @@ impl<C, St> GatewayClient<C, St> {
|
||||
&mut self,
|
||||
msg: Message,
|
||||
) -> Result<ServerResponse, GatewayClientError> {
|
||||
log::info!("JON: GatewayClient: send_websocket_message");
|
||||
let should_restart_mixnet_listener = if self.connection.is_partially_delegated() {
|
||||
self.recover_socket_connection().await?;
|
||||
true
|
||||
@@ -351,6 +352,7 @@ impl<C, St> GatewayClient<C, St> {
|
||||
if should_restart_mixnet_listener {
|
||||
self.start_listening_for_mixnet_messages()?;
|
||||
}
|
||||
log::info!("JON: GatewayClient: send_websocket_message done");
|
||||
response
|
||||
}
|
||||
|
||||
@@ -358,7 +360,8 @@ impl<C, St> GatewayClient<C, St> {
|
||||
&mut self,
|
||||
messages: Vec<Message>,
|
||||
) -> Result<(), GatewayClientError> {
|
||||
match self.connection {
|
||||
log::info!("JON: GatewayClient: batch_send_websocket_messages_without_response");
|
||||
let r = match self.connection {
|
||||
SocketState::Available(ref mut conn) => {
|
||||
let stream_messages: Vec<_> = messages.into_iter().map(Ok).collect();
|
||||
let mut send_stream = futures::stream::iter(stream_messages);
|
||||
@@ -381,14 +384,17 @@ impl<C, St> GatewayClient<C, St> {
|
||||
}
|
||||
SocketState::NotConnected => Err(GatewayClientError::ConnectionNotEstablished),
|
||||
_ => Err(GatewayClientError::ConnectionInInvalidState),
|
||||
}
|
||||
};
|
||||
log::info!("JON: GatewayClient: batch_send_websocket_messages_without_response done");
|
||||
r
|
||||
}
|
||||
|
||||
async fn send_websocket_message_without_response(
|
||||
&mut self,
|
||||
msg: Message,
|
||||
) -> Result<(), GatewayClientError> {
|
||||
match self.connection {
|
||||
log::info!("JON: GatewayClient: send_websocket_message_without_response");
|
||||
let r = match self.connection {
|
||||
SocketState::Available(ref mut conn) => Ok(conn.send(msg).await?),
|
||||
SocketState::PartiallyDelegated(ref mut partially_delegated) => {
|
||||
if let Err(err) = partially_delegated.send_without_response(msg).await {
|
||||
@@ -404,7 +410,9 @@ impl<C, St> GatewayClient<C, St> {
|
||||
}
|
||||
SocketState::NotConnected => Err(GatewayClientError::ConnectionNotEstablished),
|
||||
_ => Err(GatewayClientError::ConnectionInInvalidState),
|
||||
}
|
||||
};
|
||||
log::info!("JON: GatewayClient: send_websocket_message_without_response done");
|
||||
r
|
||||
}
|
||||
|
||||
fn check_gateway_protocol(
|
||||
@@ -658,6 +666,7 @@ impl<C, St> GatewayClient<C, St> {
|
||||
&mut self,
|
||||
packets: Vec<MixPacket>,
|
||||
) -> Result<(), GatewayClientError> {
|
||||
info!("JON: GatewayClient: batch_send_mix_packets");
|
||||
debug!("Sending {} mix packets", packets.len());
|
||||
|
||||
if !self.authenticated {
|
||||
@@ -684,10 +693,11 @@ impl<C, St> GatewayClient<C, St> {
|
||||
})
|
||||
.collect();
|
||||
|
||||
if let Err(err) = self
|
||||
let r = if let Err(err) = self
|
||||
.batch_send_websocket_messages_without_response(messages)
|
||||
.await
|
||||
{
|
||||
log::error!("GatewayClient: Failed to send sphinx packets to the gateway: {err}");
|
||||
if err.is_closed_connection() && self.should_reconnect_on_failure {
|
||||
self.attempt_reconnection().await
|
||||
} else {
|
||||
@@ -695,14 +705,17 @@ impl<C, St> GatewayClient<C, St> {
|
||||
}
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
};
|
||||
log::info!("JON: GatewayClient: batch_send_mix_packets done");
|
||||
r
|
||||
}
|
||||
|
||||
async fn send_with_reconnection_on_failure(
|
||||
&mut self,
|
||||
msg: Message,
|
||||
) -> Result<(), GatewayClientError> {
|
||||
if let Err(err) = self.send_websocket_message_without_response(msg).await {
|
||||
log::info!("JON: GatewayClient: send_with_reconnection_on_failure");
|
||||
let r = if let Err(err) = self.send_websocket_message_without_response(msg).await {
|
||||
if err.is_closed_connection() && self.should_reconnect_on_failure {
|
||||
debug!("Going to attempt a reconnection");
|
||||
self.attempt_reconnection().await
|
||||
@@ -712,7 +725,9 @@ impl<C, St> GatewayClient<C, St> {
|
||||
}
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
};
|
||||
log::info!("JON: GatewayClient: send_with_reconnection_on_failure done");
|
||||
r
|
||||
}
|
||||
|
||||
pub async fn send_ping_message(&mut self) -> Result<(), GatewayClientError> {
|
||||
@@ -731,6 +746,7 @@ impl<C, St> GatewayClient<C, St> {
|
||||
&mut self,
|
||||
mix_packet: MixPacket,
|
||||
) -> Result<(), GatewayClientError> {
|
||||
log::info!("JON: GatewayClient: send_mix_packet");
|
||||
if !self.authenticated {
|
||||
return Err(GatewayClientError::NotAuthenticated);
|
||||
}
|
||||
@@ -750,7 +766,9 @@ impl<C, St> GatewayClient<C, St> {
|
||||
.as_ref()
|
||||
.expect("no shared key present even though we're authenticated!"),
|
||||
);
|
||||
self.send_with_reconnection_on_failure(msg).await
|
||||
let r = self.send_with_reconnection_on_failure(msg).await;
|
||||
log::info!("JON: GatewayClient: send_mix_packet done");
|
||||
r
|
||||
}
|
||||
|
||||
async fn recover_socket_connection(&mut self) -> Result<(), GatewayClientError> {
|
||||
|
||||
@@ -74,6 +74,9 @@ pub enum GatewayClientError {
|
||||
#[error("Timed out")]
|
||||
Timeout,
|
||||
|
||||
#[error("timeout sending ws message to the gateway")]
|
||||
TimeoutOnSendingWs,
|
||||
|
||||
#[error("Failed to send mixnet message")]
|
||||
MixnetMsgSenderFailedToSend,
|
||||
|
||||
@@ -93,6 +96,7 @@ pub enum GatewayClientError {
|
||||
|
||||
impl GatewayClientError {
|
||||
pub fn is_closed_connection(&self) -> bool {
|
||||
log::info!("GatewayClientError::is_closed_connection");
|
||||
match self {
|
||||
GatewayClientError::NetworkError(ws_err) => match ws_err {
|
||||
WsError::AlreadyClosed | WsError::ConnectionClosed => true,
|
||||
|
||||
@@ -158,6 +158,9 @@ impl PartiallyDelegated {
|
||||
_ = shutdown.recv() => {
|
||||
log::trace!("GatewayClient listener: Received shutdown");
|
||||
log::debug!("GatewayClient listener: Exiting");
|
||||
// The packet router a task client, and as such we need to make
|
||||
// sure it's dropped to not stall the shutdown process.
|
||||
// drop(packet_router);
|
||||
return;
|
||||
}
|
||||
_ = &mut notify_receiver => {
|
||||
@@ -214,16 +217,31 @@ impl PartiallyDelegated {
|
||||
&mut self,
|
||||
msg: Message,
|
||||
) -> Result<(), GatewayClientError> {
|
||||
Ok(self.sink_half.send(msg).await?)
|
||||
log::info!("JON: PartiallyDelegated::send_without_response - sending a message");
|
||||
// let r = self.sink_half.send(msg).await;
|
||||
// Ok(r?)
|
||||
let r = tokio::time::timeout(Duration::from_secs(3), self.sink_half.send(msg)).await;
|
||||
let rr = match r {
|
||||
Ok(rr) => Ok(rr?),
|
||||
Err(_) => {
|
||||
log::error!("JON: PartiallyDelegated::send_without_response - timeout sending a message");
|
||||
Err(GatewayClientError::TimeoutOnSendingWs)
|
||||
}
|
||||
};
|
||||
log::info!("JON: PartiallyDelegated::send_without_response - sent a message: {rr:?}");
|
||||
rr
|
||||
}
|
||||
|
||||
pub(crate) async fn batch_send_without_response(
|
||||
&mut self,
|
||||
messages: Vec<Message>,
|
||||
) -> Result<(), GatewayClientError> {
|
||||
log::info!("JON: PartiallyDelegated::batch_send_without_response - sending {} messages", messages.len());
|
||||
let stream_messages: Vec<_> = messages.into_iter().map(Ok).collect();
|
||||
let mut send_stream = futures::stream::iter(stream_messages);
|
||||
Ok(self.sink_half.send_all(&mut send_stream).await?)
|
||||
let r = Ok(self.sink_half.send_all(&mut send_stream).await?);
|
||||
log::info!("JON: PartiallyDelegated::batch_send_without_response - sent messages");
|
||||
r
|
||||
}
|
||||
|
||||
pub(crate) async fn merge(self) -> Result<WsConn, GatewayClientError> {
|
||||
|
||||
Reference in New Issue
Block a user