chore: minor nym-node improvements (#6850)

* set TCP_NODELAY for mixnet connections

* bugfix: correctly compute count deferral threshold

* bugfix: make sure to flush pending packets waiting for bloomfilter check

* implement batch sending into mixnet connection

* adjust default nym-node connection settings
This commit is contained in:
Jędrzej Stuczyński
2026-06-08 08:37:41 +01:00
committed by GitHub
parent e8410b2302
commit b6202b5a6b
4 changed files with 158 additions and 12 deletions
+35 -5
View File
@@ -218,6 +218,11 @@ impl ManagedConnection {
"Managed to establish connection to {}", self.address
);
// disable Nagle: mix packets are latency-sensitive and flushed one at a time.
if let Err(err) = stream.set_nodelay(true) {
warn!(peer = %address, error = %err, "failed to set TCP_NODELAY on outbound mixnet connection");
}
// 3. perform noise handshake (if applicable)
let noise_start = tokio::time::Instant::now();
let noise_stream = match upgrade_noise_initiator(stream, &self.noise_config).await {
@@ -246,25 +251,42 @@ impl ManagedConnection {
noise_handshake_ms,
"Noise initiator handshake completed for {:?}", address
);
let conn = Framed::new(noise_stream, NymCodec);
let mut conn = Framed::new(noise_stream, NymCodec);
// let the write buffer accumulate several packets before flushing (see run_io_loop)
conn.set_backpressure_boundary(OUTBOUND_WRITE_BUFFER);
// 4. start handling the framed stream
run_io_loop(conn, self.message_receiver, address).await;
}
}
/// Upper bound on how many already-queued packets we drain into a single flush.
/// Bounds the per-batch allocation and how often we re-check the read side; the actual
/// write coalescing is governed by the Framed backpressure boundary below.
const OUTBOUND_FLUSH_BATCH: usize = 1024;
/// Write-buffer high-water mark for the egress `Framed`: packets are coalesced up to
/// roughly this many bytes before a flush, trading a larger write burst for far fewer
/// syscalls (and noise frames) under load. Kept under the ~64KiB noise frame ceiling so
/// a flush is usually a single frame.
const OUTBOUND_WRITE_BUFFER: usize = 32 * 1024;
// The connection is unidirectional (send-only); we read from it solely to
// notice peer FIN/RST while idle so we can evict the cache entry before the
// next outbound send finds it stale.
async fn run_io_loop<T>(
conn: Framed<T, NymCodec>,
mut receiver: ReceiverStream<FramedNymPacket>,
receiver: ReceiverStream<FramedNymPacket>,
address: SocketAddr,
) where
T: AsyncRead + AsyncWrite + Unpin,
{
let (mut sink, mut stream) = conn.split();
// drain all currently-queued packets into one flush rather than flushing per packet,
// which otherwise caps egress throughput and backs up the per-connection queue under load
let mut receiver = receiver.ready_chunks(OUTBOUND_FLUSH_BATCH);
loop {
tokio::select! {
msg = stream.next() => {
@@ -305,14 +327,22 @@ async fn run_io_loop<T>(
);
break;
}
Some(packet) => {
if let Err(err) = sink.send(packet).await {
Some(batch) => {
// feed the whole ready batch, then flush once
let res = async {
for packet in batch {
sink.feed(packet).await?;
}
sink.flush().await
}
.await;
if let Err(err) = res {
debug!(
event = "connection.forward_error",
peer = %address,
error = %err,
exit_reason = "forward_error",
"Failed to forward packet to {address}: {err}"
"failed to forward packet batch to {address}: {err}"
);
break;
}
+10 -3
View File
@@ -693,7 +693,12 @@ pub struct MixnetDebug {
#[serde(with = "humantime_serde")]
pub initial_connection_timeout: Duration,
/// Maximum number of packets that can be stored waiting to get sent to a particular connection.
/// Maximum number of packets buffered per egress connection awaiting a socket write.
/// This is a short-term burst absorber, not a queue: buffer depth converts directly into
/// added latency (roughly `depth / per-peer send rate`), so an oversized value is just
/// bufferbloat. Once it fills, further packets for that peer are dropped rather than
/// delayed, which is preferable in a mixnet where a packet held that long has already
/// missed its usefulness window. Keep worst-case queuing well under the per-hop mix delay.
pub maximum_connection_buffer_size: usize,
/// Specify whether any framed packets between nodes should use the legacy format (v7)
@@ -882,9 +887,11 @@ impl MixnetDebug {
// which for all intents and purposes will never happen
const DEFAULT_MAXIMUM_FORWARD_PACKET_DELAY: Duration = Duration::from_secs(10);
const DEFAULT_PACKET_FORWARDING_INITIAL_BACKOFF: Duration = Duration::from_millis(10_000);
const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_millis(300_000);
const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_secs(16);
const DEFAULT_INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(1_500);
const DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE: usize = 2000;
// small enough to keep worst-case egress queuing in the tens-of-ms range at a few thousand
// pps per peer (vs. the old 2000, which was hundreds of ms of bufferbloat)
const DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE: usize = 192;
}
impl Default for MixnetDebug {
+106 -3
View File
@@ -18,6 +18,7 @@ use nym_sphinx_types::{Delay, REPLAY_TAG_SIZE};
use std::collections::HashMap;
use std::mem;
use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::time::Instant;
use tokio_util::codec::Framed;
@@ -59,6 +60,11 @@ impl PendingReplayCheckPackets {
self.packets.values().map(|v| v.len()).sum()
}
/// Instant at which the currently-deferred batch must be flushed, or `None` if nothing is pending.
fn flush_deadline(&self, deferral: Duration) -> Option<Instant> {
(self.total_count() > 0).then(|| self.last_acquired_mutex + deferral)
}
fn replay_tags(&self) -> HashMap<u32, Vec<&[u8; REPLAY_TAG_SIZE]>> {
let mut replay_tags = HashMap::with_capacity(self.packets.len());
'outer: for (rotation_id, packets) in &self.packets {
@@ -289,7 +295,7 @@ impl ConnectionHandler {
.processing_config
.maximum_replay_detection_deferral;
let count_threshold = self.pending_packets.packets.len()
let count_threshold = self.pending_packets.total_count()
< self
.shared
.processing_config
@@ -706,13 +712,23 @@ impl ConnectionHandler {
) {
let mut packets_processed: u64 = 0;
loop {
// make sure pending packets are not stuck in the queue if we don't get any more packets
// from this sender
let flush_deadline = self.pending_packets.flush_deadline(
self.shared
.processing_config
.maximum_replay_detection_deferral,
);
tokio::select! {
biased;
// 1. check for cancellation
_ = self.shared.shutdown_token.cancelled() => {
trace!("connection handler: received shutdown");
Span::current().record("exit_reason", "shutdown");
break
}
// 2. handle any incoming packet
maybe_framed_nym_packet = mixnet_connection.next() => {
match maybe_framed_nym_packet {
Some(Ok(packet)) => {
@@ -732,7 +748,7 @@ impl ConnectionHandler {
);
Span::current().record("exit_reason", "corrupted");
Span::current().record("packets_processed", packets_processed);
return
break
}
None => {
debug!(
@@ -742,14 +758,101 @@ impl ConnectionHandler {
);
Span::current().record("exit_reason", "closed_by_remote");
Span::current().record("packets_processed", packets_processed);
return
break
}
}
}
// 3. check for the deferred pending packets
_ = async move {
match flush_deadline {
Some(d) => tokio::time::sleep_until(d).await,
None => std::future::pending::<()>().await,
}
} => {
self.handle_pending_packets_batch(Instant::now()).await;
}
}
}
// drain any packets still deferred for replay-checking so they are forwarded
// rather than silently dropped when the connection closes, errors, or shuts down
self.handle_pending_packets_batch(Instant::now()).await;
Span::current().record("packets_processed", packets_processed);
debug!("exiting and closing connection");
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
use nym_sphinx_params::{PacketSize, PacketType};
use nym_sphinx_types::{
DESTINATION_ADDRESS_LENGTH, Destination, DestinationAddressBytes, IDENTIFIER_LENGTH,
NODE_ADDRESS_LENGTH, Node, NodeAddressBytes, NymPacket, PrivateKey, PublicKey,
};
fn random_pubkey() -> PublicKey {
(&PrivateKey::random()).into()
}
// Build a real sphinx packet whose first hop validates against `key`, then partially
// unwrap it - enough to land one entry in the pending replay-check batch.
fn pending_packet(key: &PrivateKey) -> PartialyUnwrappedPacketWithKeyRotation {
let route = [
Node::new(
NodeAddressBytes::from_bytes([1u8; NODE_ADDRESS_LENGTH]),
key.into(),
),
Node::new(
NodeAddressBytes::from_bytes([2u8; NODE_ADDRESS_LENGTH]),
random_pubkey(),
),
];
let destination = Destination::new(
DestinationAddressBytes::from_bytes([3u8; DESTINATION_ADDRESS_LENGTH]),
[4u8; IDENTIFIER_LENGTH],
);
let delays: Vec<Delay> = std::iter::repeat_with(|| Delay::new_from_nanos(0))
.take(route.len())
.collect();
let packet = NymPacket::sphinx_build(
true,
PacketSize::RegularPacket.payload_size(),
b"x",
&route,
&destination,
&delays,
)
.expect("failed to build test sphinx packet");
let framed =
FramedNymPacket::new(packet, PacketType::Mix, SphinxKeyRotation::Unknown, true);
PartiallyUnwrappedPacket::new(framed, key)
.map_err(|(_, err)| err)
.expect("failed to partially unwrap test packet")
.with_key_rotation(0)
}
#[test]
fn no_flush_deadline_when_nothing_pending() {
let pending = PendingReplayCheckPackets::new();
assert!(pending.flush_deadline(Duration::from_millis(50)).is_none());
}
#[test]
fn flush_deadline_is_batch_start_plus_deferral() {
let key = PrivateKey::random();
let mut pending = PendingReplayCheckPackets::new();
let batch_start = Instant::now();
pending.push(batch_start, pending_packet(&key));
let deferral = Duration::from_millis(50);
assert_eq!(
pending.flush_deadline(deferral),
Some(batch_start + deferral)
);
}
}
+7 -1
View File
@@ -24,7 +24,7 @@ use std::time::Duration;
use tokio::net::TcpStream;
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tracing::{debug, error};
use tracing::{debug, error, warn};
pub(crate) mod final_hop;
@@ -192,6 +192,12 @@ impl SharedData {
match accepted {
Ok((socket, remote_addr)) => {
debug!("accepted incoming mixnet connection from: {remote_addr}");
// disable Nagle: mix packets are latency-sensitive and flushed one at a time.
if let Err(err) = socket.set_nodelay(true) {
warn!(
"failed to set TCP_NODELAY on mixnet connection from {remote_addr}: {err}"
);
}
let mut handler = ConnectionHandler::new(self, remote_addr);
let join_handle =
tokio::spawn(async move { handler.handle_connection(socket).await });