From b6202b5a6b26dadb5be6f22606dbb2869707b991 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C4=99drzej=20Stuczy=C5=84ski?= Date: Mon, 8 Jun 2026 08:37:41 +0100 Subject: [PATCH] chore: minor nym-node improvements (#6850) * set TCP_NODELAY for mixnet connections * bugfix: correctly compute count deferral threshold * bugfix: make sure to flush pending packets waiting for bloomfilter check * implement batch sending into mixnet connection * adjust default nym-node connection settings --- .../client-libs/mixnet-client/src/client.rs | 40 ++++++- nym-node/src/config/mod.rs | 13 ++- nym-node/src/node/mixnet/handler.rs | 109 +++++++++++++++++- nym-node/src/node/mixnet/shared/mod.rs | 8 +- 4 files changed, 158 insertions(+), 12 deletions(-) diff --git a/common/client-libs/mixnet-client/src/client.rs b/common/client-libs/mixnet-client/src/client.rs index 9cef37e2d2..bc44bc1e35 100644 --- a/common/client-libs/mixnet-client/src/client.rs +++ b/common/client-libs/mixnet-client/src/client.rs @@ -218,6 +218,11 @@ impl ManagedConnection { "Managed to establish connection to {}", self.address ); + // disable Nagle: mix packets are latency-sensitive and we already coalesce writes ourselves (see run_io_loop), so the kernel must not hold back a flushed batch. + if let Err(err) = stream.set_nodelay(true) { + warn!(peer = %address, error = %err, "failed to set TCP_NODELAY on outbound mixnet connection"); + } + // 3. perform noise handshake (if applicable) let noise_start = tokio::time::Instant::now(); let noise_stream = match upgrade_noise_initiator(stream, &self.noise_config).await { @@ -246,25 +251,42 @@ impl ManagedConnection { noise_handshake_ms, "Noise initiator handshake completed for {:?}", address ); - let conn = Framed::new(noise_stream, NymCodec); + let mut conn = Framed::new(noise_stream, NymCodec); + // let the write buffer accumulate several packets before flushing (see run_io_loop) + conn.set_backpressure_boundary(OUTBOUND_WRITE_BUFFER); // 4. 
start handling the framed stream run_io_loop(conn, self.message_receiver, address).await; } } +/// Upper bound on how many already-queued packets we drain into a single flush. +/// Bounds the per-batch allocation and how often we re-check the read side; the actual +/// write coalescing is governed by the Framed backpressure boundary below. +const OUTBOUND_FLUSH_BATCH: usize = 1024; + +/// Write-buffer high-water mark for the egress `Framed`: packets are coalesced up to +/// roughly this many bytes before a flush, trading a larger write burst for far fewer +/// syscalls (and noise frames) under load. Kept under the ~64KiB noise frame ceiling so +/// a flush is usually a single frame. +const OUTBOUND_WRITE_BUFFER: usize = 32 * 1024; + // The connection is unidirectional (send-only); we read from it solely to // notice peer FIN/RST while idle so we can evict the cache entry before the // next outbound send finds it stale. async fn run_io_loop( conn: Framed, - mut receiver: ReceiverStream, + receiver: ReceiverStream, address: SocketAddr, ) where T: AsyncRead + AsyncWrite + Unpin, { let (mut sink, mut stream) = conn.split(); + // drain all currently-queued packets into one flush rather than flushing per packet, + // which otherwise caps egress throughput and backs up the per-connection queue under load + let mut receiver = receiver.ready_chunks(OUTBOUND_FLUSH_BATCH); + loop { tokio::select! 
{ msg = stream.next() => { @@ -305,14 +327,22 @@ async fn run_io_loop( ); break; } - Some(packet) => { - if let Err(err) = sink.send(packet).await { + Some(batch) => { + // feed the whole ready batch, then flush once + let res = async { + for packet in batch { + sink.feed(packet).await?; + } + sink.flush().await + } + .await; + if let Err(err) = res { debug!( event = "connection.forward_error", peer = %address, error = %err, exit_reason = "forward_error", - "Failed to forward packet to {address}: {err}" + "failed to forward packet batch to {address}: {err}" ); break; } diff --git a/nym-node/src/config/mod.rs b/nym-node/src/config/mod.rs index 963f1bf5dd..ebec39e2c5 100644 --- a/nym-node/src/config/mod.rs +++ b/nym-node/src/config/mod.rs @@ -693,7 +693,12 @@ pub struct MixnetDebug { #[serde(with = "humantime_serde")] pub initial_connection_timeout: Duration, - /// Maximum number of packets that can be stored waiting to get sent to a particular connection. + /// Maximum number of packets buffered per egress connection awaiting a socket write. + /// This is a short-term burst absorber, not a queue: buffer depth converts directly into + /// added latency (roughly `depth / per-peer send rate`), so an oversized value is just + /// bufferbloat. Once it fills, further packets for that peer are dropped rather than + /// delayed, which is preferable in a mixnet where a packet held that long has already + /// missed its usefulness window. Keep worst-case queuing well under the per-hop mix delay. 
pub maximum_connection_buffer_size: usize, /// Specify whether any framed packets between nodes should use the legacy format (v7) @@ -882,9 +887,11 @@ impl MixnetDebug { // which for all intents and purposes will never happen const DEFAULT_MAXIMUM_FORWARD_PACKET_DELAY: Duration = Duration::from_secs(10); const DEFAULT_PACKET_FORWARDING_INITIAL_BACKOFF: Duration = Duration::from_millis(10_000); - const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_millis(300_000); + const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_secs(16); const DEFAULT_INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(1_500); - const DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE: usize = 2000; + // small enough to keep worst-case egress queuing in the tens-of-ms range at a few thousand + // pps per peer (vs. the old 2000, which was hundreds of ms of bufferbloat) + const DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE: usize = 192; } impl Default for MixnetDebug { diff --git a/nym-node/src/node/mixnet/handler.rs b/nym-node/src/node/mixnet/handler.rs index bd70892686..9fa565fc22 100644 --- a/nym-node/src/node/mixnet/handler.rs +++ b/nym-node/src/node/mixnet/handler.rs @@ -18,6 +18,7 @@ use nym_sphinx_types::{Delay, REPLAY_TAG_SIZE}; use std::collections::HashMap; use std::mem; use std::net::SocketAddr; +use std::time::Duration; use tokio::net::TcpStream; use tokio::time::Instant; use tokio_util::codec::Framed; @@ -59,6 +60,11 @@ impl PendingReplayCheckPackets { self.packets.values().map(|v| v.len()).sum() } + /// Instant at which the currently-deferred batch must be flushed, or `None` if nothing is pending. 
+ fn flush_deadline(&self, deferral: Duration) -> Option<Instant> { + (self.total_count() > 0).then(|| self.last_acquired_mutex + deferral) + } + fn replay_tags(&self) -> HashMap> { let mut replay_tags = HashMap::with_capacity(self.packets.len()); 'outer: for (rotation_id, packets) in &self.packets { @@ -289,7 +295,7 @@ impl ConnectionHandler { .processing_config .maximum_replay_detection_deferral; - let count_threshold = self.pending_packets.packets.len() + let count_threshold = self.pending_packets.total_count() < self .shared .processing_config @@ -706,13 +712,23 @@ impl ConnectionHandler { ) { let mut packets_processed: u64 = 0; loop { + // make sure pending packets are not stuck in the queue if we don't get any more packets + // from this sender + let flush_deadline = self.pending_packets.flush_deadline( + self.shared + .processing_config + .maximum_replay_detection_deferral, + ); + tokio::select! { biased; + // 1. check for cancellation _ = self.shared.shutdown_token.cancelled() => { trace!("connection handler: received shutdown"); Span::current().record("exit_reason", "shutdown"); break } + // 2. handle any incoming packet maybe_framed_nym_packet = mixnet_connection.next() => { match maybe_framed_nym_packet { Some(Ok(packet)) => { @@ -732,7 +748,7 @@ impl ConnectionHandler { ); Span::current().record("exit_reason", "corrupted"); Span::current().record("packets_processed", packets_processed); - return + break } None => { debug!( @@ -742,14 +758,101 @@ impl ConnectionHandler { ); Span::current().record("exit_reason", "closed_by_remote"); Span::current().record("packets_processed", packets_processed); - return + break } } } + // 3. 
check for the deferred pending packets + _ = async move { + match flush_deadline { + Some(d) => tokio::time::sleep_until(d).await, + None => std::future::pending::<()>().await, + } + } => { + self.handle_pending_packets_batch(Instant::now()).await; + } } } + // drain any packets still deferred for replay-checking so they are forwarded + // rather than silently dropped when the connection closes, errors, or shuts down + self.handle_pending_packets_batch(Instant::now()).await; + Span::current().record("packets_processed", packets_processed); debug!("exiting and closing connection"); } } + +#[cfg(test)] +#[allow(clippy::unwrap_used, clippy::expect_used)] +mod tests { + use super::*; + use nym_sphinx_params::{PacketSize, PacketType}; + use nym_sphinx_types::{ + DESTINATION_ADDRESS_LENGTH, Destination, DestinationAddressBytes, IDENTIFIER_LENGTH, + NODE_ADDRESS_LENGTH, Node, NodeAddressBytes, NymPacket, PrivateKey, PublicKey, + }; + + fn random_pubkey() -> PublicKey { + (&PrivateKey::random()).into() + } + + // Build a real sphinx packet whose first hop validates against `key`, then partially + // unwrap it - enough to land one entry in the pending replay-check batch. 
+ fn pending_packet(key: &PrivateKey) -> PartialyUnwrappedPacketWithKeyRotation { + let route = [ + Node::new( + NodeAddressBytes::from_bytes([1u8; NODE_ADDRESS_LENGTH]), + key.into(), + ), + Node::new( + NodeAddressBytes::from_bytes([2u8; NODE_ADDRESS_LENGTH]), + random_pubkey(), + ), + ]; + let destination = Destination::new( + DestinationAddressBytes::from_bytes([3u8; DESTINATION_ADDRESS_LENGTH]), + [4u8; IDENTIFIER_LENGTH], + ); + let delays: Vec<Delay> = std::iter::repeat_with(|| Delay::new_from_nanos(0)) + .take(route.len()) + .collect(); + let packet = NymPacket::sphinx_build( + true, + PacketSize::RegularPacket.payload_size(), + b"x", + &route, + &destination, + &delays, + ) + .expect("failed to build test sphinx packet"); + let framed = + FramedNymPacket::new(packet, PacketType::Mix, SphinxKeyRotation::Unknown, true); + + PartiallyUnwrappedPacket::new(framed, key) + .map_err(|(_, err)| err) + .expect("failed to partially unwrap test packet") + .with_key_rotation(0) + } + + #[test] + fn no_flush_deadline_when_nothing_pending() { + let pending = PendingReplayCheckPackets::new(); + assert!(pending.flush_deadline(Duration::from_millis(50)).is_none()); + } + + #[test] + fn flush_deadline_is_batch_start_plus_deferral() { + let key = PrivateKey::random(); + let mut pending = PendingReplayCheckPackets::new(); + + let batch_start = Instant::now(); + pending.push(batch_start, pending_packet(&key)); + + let deferral = Duration::from_millis(50); + assert_eq!( + pending.flush_deadline(deferral), + Some(batch_start + deferral) + ); + } +} diff --git a/nym-node/src/node/mixnet/shared/mod.rs b/nym-node/src/node/mixnet/shared/mod.rs index cbe3418d6a..2f68da5211 100644 --- a/nym-node/src/node/mixnet/shared/mod.rs +++ b/nym-node/src/node/mixnet/shared/mod.rs @@ -24,7 +24,7 @@ use std::time::Duration; use tokio::net::TcpStream; use tokio::task::JoinHandle; use tokio::time::Instant; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; pub(crate) mod final_hop; @@ -192,6 
+192,12 @@ impl SharedData { match accepted { Ok((socket, remote_addr)) => { debug!("accepted incoming mixnet connection from: {remote_addr}"); + // disable Nagle so anything written back on this accepted connection is sent without delay; mix packets themselves only arrive on it (NOTE(review): presumably this helps handshake replies - confirm). + if let Err(err) = socket.set_nodelay(true) { + warn!( + "failed to set TCP_NODELAY on mixnet connection from {remote_addr}: {err}" + ); + } let mut handler = ConnectionHandler::new(self, remote_addr); let join_handle = tokio::spawn(async move { handler.handle_connection(socket).await });