Smolmix RTT storm fix (#6846)

* RT fix for TLS

* Condense comment

* Coderabbit nits

* Clippy fix?

* Clippy 2:electric boogaloo

* Logging aggregate for very noisy tcp stuff
This commit is contained in:
mfahampshire
2026-06-03 17:31:15 +00:00
committed by GitHub
parent 7324bb23b6
commit 4ad00dba3d
7 changed files with 357 additions and 26 deletions
Generated
+20 -5
View File
@@ -11571,7 +11571,7 @@ dependencies = [
"nym-sdk",
"reqwest 0.13.1",
"rustls 0.23.37",
"smoltcp",
"smoltcp 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"thiserror 2.0.18",
"tokio",
"tokio-rustls 0.26.2",
@@ -11609,7 +11609,7 @@ dependencies = [
"semver 1.0.27",
"serde",
"serde-wasm-bindgen 0.6.5",
"smoltcp",
"smoltcp 0.12.0 (git+https://github.com/nymtech/smoltcp?rev=62ac5b8b3287d4773694f19a3b55e4c004354a0b)",
"thiserror 2.0.18",
"tokio",
"tsify",
@@ -11636,6 +11636,21 @@ dependencies = [
"managed",
]
[[package]]
name = "smoltcp"
version = "0.12.0"
source = "git+https://github.com/nymtech/smoltcp?rev=62ac5b8b3287d4773694f19a3b55e4c004354a0b#62ac5b8b3287d4773694f19a3b55e4c004354a0b"
dependencies = [
"bitflags 1.3.2",
"byteorder",
"cfg-if",
"defmt 0.3.100",
"heapless",
"libc",
"log",
"managed",
]
[[package]]
name = "snafu"
version = "0.7.5"
@@ -12642,7 +12657,7 @@ dependencies = [
"futures",
"parking_lot",
"pin-project-lite",
"smoltcp",
"smoltcp 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio",
"tokio-util",
]
@@ -15000,9 +15015,9 @@ dependencies = [
[[package]]
name = "zeroize_derive"
version = "1.4.2"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e"
dependencies = [
"proc-macro2",
"quote",
+7 -2
View File
@@ -49,8 +49,13 @@ async-tungstenite = { workspace = true, features = ["handshake"], optional = tru
# WASM utils (panic hook, console_log)
nym-wasm-utils = { workspace = true }
# Tunnel: smoltcp stack + Nym mixnet client + IPR protocol
smoltcp = { workspace = true, features = ["std", "medium-ip", "proto-ipv4", "socket-tcp", "socket-udp", "async"] }
# RTT-patched smoltcp fork, scoped to THIS crate only(not a workspace-wide `[patch.crates-io]`),
# so native `smolmix/core` builds and publishes against vanilla crates.io smoltcp untouched for
# the moment - needs further investigation whether that also suffers the same problem (probably does)
# but makes crates publication more complicated.
#
# This commit is tag `v0.12.0-nym-rtt`.
smoltcp = { git = "https://github.com/nymtech/smoltcp", rev = "62ac5b8b3287d4773694f19a3b55e4c004354a0b", features = ["std", "medium-ip", "proto-ipv4", "socket-tcp", "socket-udp", "async"] }
nym-wasm-client-core = { workspace = true }
nym-ip-packet-requests = { workspace = true }
# LP wire types (frame, header, codec)
+252 -9
View File
@@ -8,10 +8,12 @@
//! `Device` impl instead. The bridge pushes incoming IP packets into `rx_queue`
//! and pops outgoing packets from `tx_queue`.
use std::collections::VecDeque;
use std::collections::{BTreeSet, VecDeque};
use smoltcp::phy::{Device, DeviceCapabilities, Medium, RxToken, TxToken};
use smoltcp::time::Instant;
use smoltcp::wire::{IpAddress, IpProtocol, Ipv4Address, Ipv4Packet, TcpPacket};
use std::collections::BTreeMap;
/// smoltcp device backed by in-memory packet queues.
///
@@ -22,19 +24,18 @@ pub struct WasmDevice {
rx_queue: VecDeque<Vec<u8>>,
tx_queue: VecDeque<Vec<u8>>,
capabilities: DeviceCapabilities,
rx_stats: RxStats,
tx_stats: TxStats,
}
impl WasmDevice {
pub fn new() -> Self {
let mut capabilities = DeviceCapabilities::default();
capabilities.medium = Medium::Ip;
// Sized so one IP packet fits in one sphinx packet payload (no
// chunking-layer fragmentation). Budget in bytes from the 2048 B
// sphinx plaintext: 344 (SURB-ack) 32 (x25519 ephemeral key,
// Repliable msgs) 7 (frag header) 1 (padding) 53 (LP+IPR
// framing + AEAD) ≈ 1611. 1600 leaves ~11 B headroom for IPR
// overhead variability.
capabilities.max_transmission_unit = 1600;
// Match the standard Ethernet MTU (1500). 1500 keeps a
// packet (plus its LP/IPR framing and SURB headers) within a single
// Sphinx packet's payload, avoiding LP-layer fragmentation.
capabilities.max_transmission_unit = 1500;
// Native smolmix also uses Some(1) in the device, but tokio-smoltcp
// compensates with a burst loop that calls Interface::poll() up to 100
// times per reactor iteration (each processing 1 packet). Our WASM
@@ -46,11 +47,26 @@ impl WasmDevice {
rx_queue: VecDeque::new(),
tx_queue: VecDeque::new(),
capabilities,
rx_stats: RxStats::default(),
tx_stats: TxStats::default(),
}
}
/// Push an incoming IP packet (from the mixnet) into the receive queue.
pub fn push_rx(&mut self, packet: Vec<u8>) {
// Aggregate diagnostic (replaces the former per-packet log). Only folds
// when runtime debug logging is on, so the production path stays a bare
// enqueue: one atomic load, no parsing. The window is flushed on a
// connection lifecycle flag or when it fills; see `RxStats`.
if nym_wasm_utils::debug_logging_enabled() {
if let Some(pkt) = parse_inbound_tcp(&packet) {
let lifecycle = pkt.syn || pkt.rst || pkt.fin;
self.rx_stats.record(&pkt);
if lifecycle || self.rx_stats.segments >= RX_STATS_WINDOW {
self.rx_stats.flush();
}
}
}
self.rx_queue.push_back(packet);
}
@@ -70,6 +86,7 @@ impl Device for WasmDevice {
WasmRxToken { buffer: packet },
WasmTxToken {
queue: &mut self.tx_queue,
stats: &mut self.tx_stats,
},
))
}
@@ -77,6 +94,7 @@ impl Device for WasmDevice {
fn transmit(&mut self, _timestamp: Instant) -> Option<Self::TxToken<'_>> {
Some(WasmTxToken {
queue: &mut self.tx_queue,
stats: &mut self.tx_stats,
})
}
@@ -99,9 +117,223 @@ impl RxToken for WasmRxToken {
}
}
/// Flush the rx aggregate after this many segments even when no lifecycle flag
/// has appeared, so a long bulk transfer still reports periodically rather than
/// only at teardown.
const RX_STATS_WINDOW: u32 = 256;
/// Fields aggregated from one inbound TCP packet.
struct InboundTcp {
/// Our local ephemeral port: which client socket this belongs to. It is the
/// discriminator for "one connection or several?". Each tcp_connect (and
/// each connect retry) uses a fresh port, so a window touching several
/// dports means several of our own connections, not one flow.
dport: u16,
payload_len: usize,
cksum_ok: bool,
syn: bool,
rst: bool,
fin: bool,
}
/// Parse the fields we aggregate from one inbound packet. IPv4/TCP only:
/// IPv6 and non-TCP return `None` and are simply not counted. A bad checksum
/// is recorded (`cksum_ok = false`), not dropped, since seeing it is the point.
fn parse_inbound_tcp(buf: &[u8]) -> Option<InboundTcp> {
let ip = Ipv4Packet::new_checked(buf).ok()?;
let ip_ok = ip.verify_checksum();
if ip.next_header() != IpProtocol::Tcp {
return None;
}
let src = IpAddress::Ipv4(ip.src_addr());
let dst = IpAddress::Ipv4(ip.dst_addr());
let tcp = TcpPacket::new_checked(ip.payload()).ok()?;
Some(InboundTcp {
dport: tcp.dst_port(),
payload_len: tcp.payload().len(),
cksum_ok: ip_ok && tcp.verify_checksum(&src, &dst),
syn: tcp.syn(),
rst: tcp.rst(),
fin: tcp.fin(),
})
}
/// Rolling aggregate of inbound TCP packets, flushed to the debug log on a
/// connection lifecycle flag (SYN/RST/FIN) or when [`RX_STATS_WINDOW`] segments
/// accumulate. Replaces the former per-packet firehose: it keeps the same
/// investigative signal (segment and byte counts, checksum failures, how many
/// distinct local ports, ie how many connections, and the handshake/teardown
/// flags) without a line per packet.
///
/// Only touched when runtime debug logging is on. A *silent* stall (the flight
/// just stops, no RST) won't flush, since the device has no timer; the RST that
/// usually ends a stalled flight over the mixnet does trigger a flush.
#[derive(Default)]
struct RxStats {
segments: u32,
bytes: usize,
bad_cksum: u32,
syn: u32,
rst: u32,
fin: u32,
dports: BTreeSet<u16>,
}
impl RxStats {
/// Fold one parsed inbound packet into the current window.
fn record(&mut self, pkt: &InboundTcp) {
self.segments += 1;
self.bytes += pkt.payload_len;
if !pkt.cksum_ok {
self.bad_cksum += 1;
}
self.syn += u32::from(pkt.syn);
self.rst += u32::from(pkt.rst);
self.fin += u32::from(pkt.fin);
self.dports.insert(pkt.dport);
}
/// Emit the window summary and reset. No-op on an empty window.
///
/// How to read a line:
/// seg inbound TCP segments in this window
/// payload bytes total TCP payload across those segments (no headers); a
/// climbing total with no completion means a stalled flight
/// bad cksum segments that failed IP or TCP checksum; should stay 0.
/// Nonzero means corruption on the path, not just loss
/// dports distinct local ports touched, ie how many of our own
/// connections this window spans (retries open fresh ports)
/// S / R / F SYN / RST / FIN counts. R>0 is the handshake-era failure
/// signature (connection reset); a clean flow is S then F,
/// R=0 throughout
fn flush(&mut self) {
if self.segments == 0 {
return;
}
crate::util::debug_log!(
"[device] rx window: {} seg, {} payload bytes, {} bad cksum, {} dports, S={} R={} F={}",
self.segments,
self.bytes,
self.bad_cksum,
self.dports.len(),
self.syn,
self.rst,
self.fin,
);
*self = RxStats::default();
}
}
/// Flush the tx aggregate after this many segments, mirroring the rx window.
const TX_STATS_WINDOW: u32 = 256;
/// Fields aggregated from one outbound TCP packet.
struct OutboundTcp {
/// Our local ephemeral port: smoltcp keeps it stable for a connection's
/// life, so it identifies the flow. This is the field an IPR egress capture
/// is correlated against (the exit's NAT may remap it on the way out).
src_port: u16,
/// (destination address, destination port).
dst: (Ipv4Address, u16),
payload_len: usize,
syn: bool,
rst: bool,
fin: bool,
}
/// Parse the fields we aggregate from one outbound packet. IPv4/TCP only.
/// No checksum check: smoltcp fills those in after we observe the packet.
fn parse_outbound_tcp(buf: &[u8]) -> Option<OutboundTcp> {
let ip = Ipv4Packet::new_checked(buf).ok()?;
if ip.next_header() != IpProtocol::Tcp {
return None;
}
let tcp = TcpPacket::new_checked(ip.payload()).ok()?;
Some(OutboundTcp {
src_port: tcp.src_port(),
dst: (ip.dst_addr(), tcp.dst_port()),
payload_len: tcp.payload().len(),
syn: tcp.syn(),
rst: tcp.rst(),
fin: tcp.fin(),
})
}
/// Rolling aggregate of outbound TCP packets, flushed on a connection lifecycle
/// flag (SYN/RST/FIN) or when [`TX_STATS_WINDOW`] segments accumulate. Replaces
/// the former per-packet tx log.
///
/// The outbound diagnostic is about source-port stability: each flow's
/// `src_port -> dst` mapping is the data an egress capture is correlated
/// against. smoltcp keeps that port stable for us, so this stays quiet, listing
/// the flows seen each window. `port_reuses` counts a `src_port` re-pointing at
/// a different dst within one window: usually a legitimate fast close-and-reopen
/// reusing the port, but a sustained nonzero count is worth a look (the port
/// allocator handing out a still-live port).
///
/// Only touched when runtime debug logging is on.
#[derive(Default)]
struct TxStats {
segments: u32,
bytes: usize,
syn: u32,
rst: u32,
fin: u32,
/// `src_port -> dst` seen this window: the correlation mapping, and the
/// basis for the reuse check.
flows: BTreeMap<u16, (Ipv4Address, u16)>,
port_reuses: u32,
}
impl TxStats {
/// Fold one parsed outbound packet into the current window.
fn record(&mut self, pkt: &OutboundTcp) {
self.segments += 1;
self.bytes += pkt.payload_len;
self.syn += u32::from(pkt.syn);
self.rst += u32::from(pkt.rst);
self.fin += u32::from(pkt.fin);
if let Some(prev) = self.flows.insert(pkt.src_port, pkt.dst) {
if prev != pkt.dst {
self.port_reuses += 1;
}
}
}
/// Emit the window summary and reset. No-op on an empty window.
///
/// seg / bytes outbound segments and total payload this window
/// flows `src_port->dst` per flow, for egress-capture correlation
/// port-reuse a src_port re-pointed at a new dst mid-window (see above)
/// S / R / F SYN / RST / FIN counts we sent
fn flush(&mut self) {
if self.segments == 0 {
return;
}
let flows: Vec<String> = self
.flows
.iter()
.map(|(sport, (ip, dport))| format!("{sport}->{ip}:{dport}"))
.collect();
crate::util::debug_log!(
"[device] tx window: {} seg, {} payload bytes, {} flows [{}], {} port-reuse, S={} R={} F={}",
self.segments,
self.bytes,
self.flows.len(),
flows.join(" "),
self.port_reuses,
self.syn,
self.rst,
self.fin,
);
*self = TxStats::default();
}
}
/// Transmit token: captures one packet from smoltcp into the tx queue.
pub struct WasmTxToken<'a> {
queue: &'a mut VecDeque<Vec<u8>>,
stats: &'a mut TxStats,
}
impl<'a> TxToken for WasmTxToken<'a> {
@@ -111,6 +343,17 @@ impl<'a> TxToken for WasmTxToken<'a> {
{
let mut buffer = vec![0u8; len];
let result = f(&mut buffer);
// Aggregate diagnostic (see TxStats). Only folds when runtime debug
// logging is on, so the production path stays a bare enqueue.
if nym_wasm_utils::debug_logging_enabled() {
if let Some(pkt) = parse_outbound_tcp(&buffer) {
let lifecycle = pkt.syn || pkt.rst || pkt.fin;
self.stats.record(&pkt);
if lifecycle || self.stats.segments >= TX_STATS_WINDOW {
self.stats.flush();
}
}
}
self.queue.push_back(buffer);
result
}
@@ -157,6 +400,6 @@ mod tests {
let dev = WasmDevice::new();
let caps = dev.capabilities();
assert_eq!(caps.medium, Medium::Ip);
assert_eq!(caps.max_transmission_unit, 1980);
assert_eq!(caps.max_transmission_unit, 1500);
}
}
+60 -1
View File
@@ -5,6 +5,7 @@
//! plus (under the `fetch` feature) the HTTP orchestration + JS `RequestInit` shim.
use std::net::SocketAddr;
use std::time::Duration;
use crate::dns;
use crate::error::FetchError;
@@ -191,16 +192,74 @@ pub async fn fetch(
)))
}
/// Create a fresh connection: DNS resolve → TCP connect → optional TLS.
/// Connect + TLS-handshake attempts on fresh sockets before giving up.
const CONNECT_ATTEMPTS: u32 = 3;
/// Base delay between connect retries, multiplied by the attempt number for a
/// mild linear backoff (50ms before the 2nd attempt, 100ms before the 3rd).
///
/// Kept small on purpose: the transient failure here is a lost handshake
/// segment, not congestion that needs time to clear.
/// The backoff only avoids re-launching a fresh socket the instant
/// the previous one reset.
const CONNECT_BACKOFF_MS: u64 = 50;
/// Create a fresh connection: DNS resolve → TCP connect → optional TLS, with
/// retry on transient failure.
///
/// The mixnet reorders and drops packets, so a single multi-segment handshake
/// flight (the server's certificate) can stall on a lost segment and the
/// connection resets with a handshake EOF. A fresh socket (new ephemeral port,
/// new sphinx packets) usually dodges the specific loss. Connect + handshake
/// send no application data, so retrying them carries no idempotency risk for
/// any HTTP method.
pub(crate) async fn new_connection(
tunnel: &WasmTunnel,
host: &str,
port: u16,
is_https: bool,
) -> Result<PooledConn, FetchError> {
// Resolve once: a DNS failure is not transient the way a lost handshake
// segment is, and re-resolving per attempt would just repeat the lookup.
let ip = dns::resolve(tunnel, host).await?;
let addr = SocketAddr::new(ip, port);
let mut last_err = None;
for attempt in 1..=CONNECT_ATTEMPTS {
match connect_once(tunnel, addr, host, is_https).await {
Ok(conn) => return Ok(conn),
// Only connect / handshake I/O errors are transient over the
// mixnet. A bad TLS server name or other non-I/O error fails
// identically on retry, so propagate it immediately.
Err(e @ FetchError::Io(_)) => {
if attempt < CONNECT_ATTEMPTS {
crate::util::debug_log!(
"[fetch] connect attempt {attempt}/{CONNECT_ATTEMPTS} to '{host}' failed ({e}), retrying with fresh connection"
);
wasmtimer::tokio::sleep(Duration::from_millis(
attempt as u64 * CONNECT_BACKOFF_MS,
))
.await;
} else {
crate::util::debug_error!(
"[fetch] connect to '{host}' failed after {CONNECT_ATTEMPTS} attempts: {e}"
);
}
last_err = Some(e);
}
Err(e) => return Err(e),
}
}
Err(last_err.expect("loop body runs at least once"))
}
/// One connect + optional TLS handshake on a fresh socket.
async fn connect_once(
tunnel: &WasmTunnel,
addr: SocketAddr,
host: &str,
is_https: bool,
) -> Result<PooledConn, FetchError> {
crate::util::debug_log!("[fetch] TCP connecting to {addr}...");
let tcp = tunnel.tcp_connect(addr).await.map_err(FetchError::Io)?;
crate::util::debug_log!("[fetch] TCP connected to {addr}");
+15 -7
View File
@@ -31,13 +31,21 @@ use nym_wasm_client_core::nym_task::connections::TransmissionLane;
use crate::error::FetchError;
/// Reply-SURB counts for Open and Data frames. Defaults: `open=10, data=0`.
/// Reply-SURB counts for the Open and Data frames. Defaults: `open=10, data=2`.
///
/// The Open frame seeds the IPR's SURB bucket; from there, the reply
/// controller's pre-emptive topup refills it when the bucket dips below
/// the `min_surbs_threshold` (10, per nym-client-core), so per-data-packet
/// SURBs are unnecessary in steady state. Override `data` upwards for
/// workloads that burst faster than topup round-trip can keep up.
/// `open` seeds the IPR's SURB bucket on the connect handshake. `data` is the
/// number of reply-SURBs attached to every packet we send (including TCP ACKs);
/// it funds the IPR's return traffic for the connection.
///
/// `data` is deliberately small, and raising it has a cost that is easy to
/// miss. A reply-SURB is not a flag on the packet: it is a full layer-encrypted
/// return header that travels as forward payload, and each Sphinx packet has a fixed
/// payload budget.
///
/// Return capacity for downloads does not need a large `data`: every ACK we send
/// during a transfer carries `data` SURBs, so capacity scales with the ACK rate
/// (which scales with the download rate), and the reply controller's pre-emptive
/// topup refills the bucket besides.
#[derive(Clone, Copy)]
pub struct SurbsConfig {
pub open: u32,
@@ -46,7 +54,7 @@ pub struct SurbsConfig {
impl Default for SurbsConfig {
fn default() -> Self {
Self { open: 10, data: 0 }
Self { open: 10, data: 2 }
}
}
+2 -1
View File
@@ -81,9 +81,10 @@ impl AsyncRead for WasmTcpStream {
Poll::Ready(Ok(0))
} else {
crate::util::debug_log!(
"[tcp:read] Pending (state={:?}, buf={})",
"[tcp:read] Pending (state={:?}, buf={}, recv_queue={})",
socket.state(),
buf.len(),
socket.recv_queue(),
);
// smoltcp wakes this waker on any state change affecting `recv`,
// including FIN/CloseWait transitions that produce EOF.
+1 -1
View File
@@ -56,7 +56,7 @@ pub struct TunnelOpts {
/// Disable cover traffic loop (default: `false`).
pub disable_cover_traffic: bool,
/// Reply-SURB counts for the LP Open frame and each Data frame the
/// bridge sends. See [`ipr::SurbsConfig`]. Defaults to open=5, data=2.
/// bridge sends. See [`ipr::SurbsConfig`] for the values and rationale.
pub surbs: ipr::SurbsConfig,
/// Primary DNS resolver. `None` falls back to [`dns::DEFAULT_PRIMARY_DNS`].
pub primary_dns: Option<SocketAddr>,