Compare commits

...

3 Commits

Author SHA1 Message Date
Bogdan-Ștefan Neacşu 120220517d Retry sign&broadcast on sequence errors 2025-08-22 18:25:07 +02:00
benedetta davico 7677c02494 bump patched version for consistency 2025-08-11 10:46:34 +02:00
Jędrzej Stuczyński c20c952633 feat: introduce additional checks when attempting to send to bounded channels (#5941)
* feat: introduce additional checks when attempting to send to bounded channels

or to a fallible gateway

* return error rather than panic when merging socket during shutdown
2025-08-11 09:18:51 +01:00
10 changed files with 129 additions and 51 deletions
@@ -96,24 +96,46 @@ impl MixTrafficController {
mut mix_packets: Vec<MixPacket>,
) -> Result<(), ErasedGatewayError> {
debug_assert!(!mix_packets.is_empty());
let result = if mix_packets.len() == 1 {
let send_future = if mix_packets.len() == 1 {
// SAFETY: we just checked we have one packet
#[allow(clippy::unwrap_used)]
let mix_packet = mix_packets.pop().unwrap();
self.gateway_transceiver.send_mix_packet(mix_packet).await
self.gateway_transceiver.send_mix_packet(mix_packet)
} else {
self.gateway_transceiver
.batch_send_mix_packets(mix_packets)
.await
self.gateway_transceiver.batch_send_mix_packets(mix_packets)
};
if result.is_err() {
self.consecutive_gateway_failure_count += 1;
} else {
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
self.consecutive_gateway_failure_count = 0;
}
tokio::select! {
biased;
_ = self.task_client.recv() => {
trace!("received shutdown while handling messages");
Ok(())
}
result = send_future => {
if result.is_err() {
self.consecutive_gateway_failure_count += 1;
} else {
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
self.consecutive_gateway_failure_count = 0;
}
result
result
}
}
}
async fn on_client_request(&mut self, client_request: ClientRequest) {
tokio::select! {
biased;
_ = self.task_client.recv() => {
trace!("received shutdown while handling client request");
}
result = self.gateway_transceiver.send_client_request(client_request) => {
if let Err(err) = result {
error!("Failed to send client request: {err}")
}
}
}
}
pub fn start(mut self) {
@@ -122,7 +144,11 @@ impl MixTrafficController {
while !self.task_client.is_shutdown() {
tokio::select! {
mix_packets = self.mix_rx.recv() => match mix_packets {
biased;
_ = self.task_client.recv() => {
tracing::trace!("MixTrafficController: Received shutdown");
break;
}mix_packets = self.mix_rx.recv() => match mix_packets {
Some(mix_packets) => {
if let Err(err) = self.on_messages(mix_packets).await {
error!("Failed to send sphinx packet(s) to the gateway: {err}");
@@ -144,19 +170,14 @@ impl MixTrafficController {
},
client_request = self.client_rx.recv() => match client_request {
Some(client_request) => {
match self.gateway_transceiver.send_client_request(client_request).await {
Ok(_) => (),
Err(e) => error!("Failed to send client request: {e}"),
};
self.on_client_request(client_request).await;
},
None => {
tracing::trace!("MixTrafficController, client request channel closed");
}
},
_ = self.task_client.recv() => {
tracing::trace!("MixTrafficController: Received shutdown");
break;
}
}
}
self.task_client.recv_timeout().await;
@@ -223,6 +223,11 @@ where
while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = self.task_client.recv() => {
tracing::trace!("InputMessageListener: Received shutdown");
break;
}
input_msg = self.input_receiver.recv() => match input_msg {
Some(input_msg) => {
self.on_input_message(input_msg).await;
@@ -232,9 +237,7 @@ where
break;
}
},
_ = self.task_client.recv() => {
tracing::trace!("InputMessageListener: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
@@ -179,6 +179,11 @@ where
while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = self.task_client.recv() => {
tracing::trace!("RetransmissionRequestListener: Received shutdown");
break;
}
timed_out_ack = self.request_receiver.next() => match timed_out_ack {
Some(timed_out_ack) => self.on_retransmission_request(timed_out_ack, packet_type).await,
None => {
@@ -186,9 +191,7 @@ where
break;
}
},
_ = self.task_client.recv() => {
tracing::trace!("RetransmissionRequestListener: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
@@ -535,6 +535,7 @@ where
pending_acks.push(pending_ack);
}
drop(topology_permit);
self.insert_pending_acks(pending_acks);
self.forward_messages(real_messages, lane).await;
@@ -716,17 +717,21 @@ where
// tells real message sender (with the poisson timer) to send this to the mix network
pub(crate) async fn forward_messages(
&self,
&mut self,
messages: Vec<RealMessage>,
transmission_lane: TransmissionLane,
) {
if let Err(err) = self
.real_message_sender
.send((messages, transmission_lane))
.await
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to forward messages to the real message sender: {err}");
tokio::select! {
biased;
_ = self.task_client.recv() => {
trace!("received shutdown while attempting to forward mixnet messages");
}
sending_res = self.real_message_sender.send((messages, transmission_lane)) => {
if sending_res.is_err() {
error!(
"failed to forward mixnet messages due to closed channel (outside of shutdown!)"
);
}
}
}
}
@@ -278,17 +278,33 @@ where
}
};
if let Err(err) = self.mix_tx.send(vec![next_message]).await {
if !self.task_client.is_shutdown_poll() {
tracing::error!("Failed to send: {err}");
let sending_res = tokio::select! {
biased;
_ = self.task_client.recv() => {
trace!("received shutdown signal while attempting to send mix message");
return
}
sending_res = self.mix_tx.send(vec![next_message]) => {
sending_res
}
};
match sending_res {
Err(_) => {
if !self.task_client.is_shutdown_poll() {
tracing::error!(
"failed to send mixnet packet due to closed channel (outside of shutdown!)"
);
}
}
Ok(_) => {
let event = if fragment_id.is_some() {
PacketStatisticsEvent::RealPacketSent(packet_size)
} else {
PacketStatisticsEvent::CoverPacketSent(packet_size)
};
self.stats_tx.report(event.into());
}
} else {
let event = if fragment_id.is_some() {
PacketStatisticsEvent::RealPacketSent(packet_size)
} else {
PacketStatisticsEvent::CoverPacketSent(packet_size)
};
self.stats_tx.report(event.into());
}
// notify ack controller about sending our message only after we actually managed to push it
@@ -201,7 +201,7 @@ impl<C, St> GatewayClient<C, St> {
#[cfg(not(target_arch = "wasm32"))]
pub async fn establish_connection(&mut self) -> Result<(), GatewayClientError> {
debug!(
"Attemting to establish connection to gateway at: {}",
"Attempting to establish connection to gateway at: {}",
self.gateway_address
);
let (ws_stream, _) = connect_async(
@@ -337,7 +337,7 @@ impl PartiallyDelegatedHandle {
// check if the split stream didn't error out
let receive_res = stream_receiver
.try_recv()
.expect("stream sender was somehow dropped without sending anything!");
.map_err(|_| GatewayClientError::ConnectionAbruptlyClosed)?;
if let Some(res) = receive_res {
let _res = res?;
@@ -665,8 +665,7 @@ where
CosmWasmClient::broadcast_tx_commit(self, tx_bytes).await
}
/// Broadcast a transaction to the network and monitors its inclusion in a block.
async fn sign_and_broadcast(
async fn sign_and_broadcast_inner(
&self,
signer_address: &AccountId,
messages: Vec<Any>,
@@ -686,6 +685,35 @@ where
self.broadcast_tx(tx_bytes, None, None).await
}
/// Broadcast a transaction to the network and monitors its inclusion in a block.
async fn sign_and_broadcast(
&self,
signer_address: &AccountId,
messages: Vec<Any>,
fee: Fee,
memo: impl Into<String> + Send + 'static,
) -> Result<TxResponse, NyxdError> {
let memo = memo.into();
loop {
match self
.sign_and_broadcast_inner(
signer_address,
messages.clone(),
fee.clone(),
memo.clone(),
)
.await
{
Ok(res) => return Ok(res),
Err(err) => {
if !err.to_string().contains("sequence") {
return Err(err);
}
}
}
}
}
async fn sign(
&self,
signer_address: &AccountId,
+2
View File
@@ -10,6 +10,7 @@ where
}
#[cfg(not(target_arch = "wasm32"))]
#[track_caller]
pub fn spawn<F>(future: F)
where
F: Future + Send + 'static,
@@ -18,6 +19,7 @@ where
tokio::spawn(future);
}
#[track_caller]
pub fn spawn_with_report_error<F, T, E>(future: F, mut shutdown: TaskClient)
where
F: Future<Output = Result<T, E>> + Send + 'static,
+1 -1
View File
@@ -3,7 +3,7 @@
[package]
name = "nym-node"
version = "1.15.0"
version = "1.15.1"
authors.workspace = true
repository.workspace = true
homepage.workspace = true