diff --git a/src/nostr/client.rs b/src/nostr/client.rs index 765bdfc..d8f5209 100644 --- a/src/nostr/client.rs +++ b/src/nostr/client.rs @@ -867,22 +867,45 @@ async fn run_service(svc: Arc, wallet: Wallet) { // Prewarm mix-dns for the hosts we're about to (or will soon) hit — the // relays being dialed, the NIP-05 name authority (Claim username), and the // price API — so those resolutions are already cached by the time the user - // acts, rather than each paying a cold mixnet round trip inline. Runs off the - // critical path (a raced+retried resolve is cheap); the node host is NOT here - // — it never rides the mixnet. - if let Some(tunnel) = crate::nym::nymproc::tunnel() { + // acts, rather than each paying a cold mixnet round trip inline. The node host + // is NOT here — it never rides the mixnet. + // + // Unlike before this no longer silently SKIPS when the tunnel isn't up yet + // (the cold-start case that used to leave the first relay dial to a cold DoT + // round trip): it WAITS for the tunnel, prewarms, then keeps the entries hot + // by re-prewarming on a cadence below the DNS cache TTL floor, so known/stable + // hosts are refreshed in the background before they can expire. + { let mut hosts: Vec = relays .iter() .filter_map(|r| nostr_sdk::Url::parse(r).ok()) .filter_map(|u| u.host_str().map(|h| h.to_string())) .collect(); + // The name authority, both from this service's config and the process-wide + // configured home domain (they're normally the same; dedup below folds it). hosts.push(svc.config.read().home_domain()); + hosts.push(crate::nostr::nip05::home_domain()); hosts.push("api.coingecko.com".to_string()); hosts.retain(|h| !h.is_empty()); hosts.sort(); hosts.dedup(); tokio::spawn(async move { + // Wait out the cold start rather than skipping the prewarm entirely. 
+ let Some(tunnel) = crate::nym::nymproc::wait_for_tunnel(Duration::from_secs(60)).await + else { + return; + }; crate::nym::dns::prewarm(&tunnel, &hosts).await; + // Keep the entries warm: re-prewarm every 45s. A fresh entry is a + // no-op cache hit; a stale one is served and refreshed in the + // background, so refresh starts within ~45s of expiry even when idle. + // Picks up the current tunnel each cycle, so it survives exit reselects. + loop { + tokio::time::sleep(Duration::from_secs(45)).await; + if let Some(t) = crate::nym::nymproc::tunnel() { + crate::nym::dns::prewarm(&t, &hosts).await; + } + } }); } for relay in &relays { diff --git a/src/nym/dns.rs b/src/nym/dns.rs index 6ad89f9..4793bc3 100644 --- a/src/nym/dns.rs +++ b/src/nym/dns.rs @@ -39,7 +39,7 @@ //! startup, so a warm entry (not a fresh mixnet round trip) serves the common //! case. IPv4-only, like the rest of the app (GRIM audit). -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Duration, Instant}; @@ -139,10 +139,28 @@ const DOH_ROUNDS: usize = 2; const TTL_FLOOR_SECS: u32 = 60; const TTL_CEILING_SECS: u32 = 3600; +/// TTL floor for KNOWN/stable hosts (relays, the name authority, the price API, +/// the DoT/DoH resolvers) — the ones we prewarm. Their addresses change rarely, +/// so we keep them cached at least 15 min (up to the 60-min ceiling) instead of +/// re-resolving every minute. Combined with serve-stale (below), a dial to one +/// of these blocks on a mixnet DNS round trip only until its first cache fill. +const KNOWN_TTL_FLOOR_SECS: u32 = 900; + lazy_static! { /// host → (addresses, expiry). static ref CACHE: RwLock, Instant)>> = RwLock::new(HashMap::new()); + /// Hosts we treat as known/stable (populated by [`prewarm`]). Known hosts get + /// the longer [`KNOWN_TTL_FLOOR_SECS`] floor AND serve-stale-while-revalidate. 
+ static ref KNOWN: RwLock> = RwLock::new(HashSet::new()); + /// Hosts with a background revalidation in flight — single-flight guard so a + /// burst of dials to a stale known host spawns exactly one refresh. + static ref REFRESHING: RwLock> = RwLock::new(HashSet::new()); +} + +/// Whether `host` is a known/stable host (has been prewarmed at least once). +fn is_known(host: &str) -> bool { + KNOWN.read().contains(host) } /// Resolve `host` to a socket address for `tcp_connect`, entirely over the @@ -155,9 +173,25 @@ pub async fn resolve(tunnel: &Tunnel, host: &str, port: u16) -> Option() { return Some(SocketAddr::new(ip, port)); } - if let Some(ip) = cached(host) { - return Some(SocketAddr::new(IpAddr::V4(ip), port)); + match cache_hit(host) { + // Fresh entry: serve it, no network at all. + Some(CacheHit::Fresh(ip)) => return Some(SocketAddr::new(IpAddr::V4(ip), port)), + // SERVE-STALE-WHILE-REVALIDATE for known/stable hosts: hand back the + // last-known address immediately (so the dial never blocks on a cold DoT + // round trip) and refresh it in the background. Unknown hosts fall + // through to a blocking resolve, preserving correctness. + Some(CacheHit::Stale(ip)) if is_known(host) => { + spawn_revalidate(tunnel, host); + return Some(SocketAddr::new(IpAddr::V4(ip), port)); + } + _ => {} } + resolve_cold(tunnel, host, port).await +} + +/// The blocking DoT-then-DoH resolve, run when there is no usable cache entry. +/// Writes the cache on success. +async fn resolve_cold(tunnel: &Tunnel, host: &str, port: u16) -> Option { // If a previous lookup already learned this exit blocks DoT, go straight to // DoH — still entirely inside the tunnel. 
if PREFER_DOH.load(Ordering::Acquire) { @@ -175,6 +209,22 @@ pub async fn resolve(tunnel: &Tunnel, host: &str, port: u16) -> Option Option { @@ -189,7 +239,14 @@ async fn resolve_via(tunnel: &Tunnel, host: &str, port: u16, mode: DnsMode) -> O DnsMode::Doh => race_doh(tunnel, host).await, }; if let Some((resolver, ips, ttl)) = answer { - let ttl = ttl.clamp(TTL_FLOOR_SECS, TTL_CEILING_SECS); + // Known/stable hosts get the longer floor so they stay cached 15-60 + // min; everything else keeps the tight 60s..1h window. + let floor = if is_known(host) { + KNOWN_TTL_FLOOR_SECS + } else { + TTL_FLOOR_SECS + }; + let ttl = ttl.clamp(floor, TTL_CEILING_SECS); debug!( "{proto}: resolved {host} -> {} in {}ms (via {resolver}, round {}/{rounds}, \ ttl {ttl}s, {} record(s))", @@ -373,6 +430,13 @@ async fn query_doh( /// instead of paying the mixnet DoT round trip inline. Best-effort; the port is /// irrelevant here (only the host-keyed cache is filled) so a placeholder is used. pub async fn prewarm(tunnel: &Tunnel, hosts: &[String]) { + // Mark these as known/stable so they get the long TTL floor and serve-stale. + { + let mut known = KNOWN.write(); + for host in hosts { + known.insert(host.clone()); + } + } let mut inflight = FuturesUnordered::new(); for host in hosts { inflight.push(resolve(tunnel, host, 0)); @@ -380,15 +444,24 @@ pub async fn prewarm(tunnel: &Tunnel, hosts: &[String]) { while inflight.next().await.is_some() {} } -/// A cached, unexpired address for `host`. -fn cached(host: &str) -> Option { +/// A cache lookup outcome for `host`: fresh (within TTL) or stale (expired but +/// still remembered, usable via serve-stale for known hosts). +enum CacheHit { + Fresh(Ipv4Addr), + Stale(Ipv4Addr), +} + +/// Look up `host` in the cache, distinguishing fresh from stale entries. Returns +/// `None` when the host has no cache entry or the entry holds no address. 
+fn cache_hit(host: &str) -> Option { let cache = CACHE.read(); let (ips, expiry) = cache.get(host)?; - if Instant::now() < *expiry { - ips.first().copied() + let ip = ips.first().copied()?; + Some(if Instant::now() < *expiry { + CacheHit::Fresh(ip) } else { - None - } + CacheHit::Stale(ip) + }) } /// Address the liveness probe dials THROUGH the tunnel: Cloudflare's anycast diff --git a/src/nym/mod.rs b/src/nym/mod.rs index 2779c90..ab11af7 100644 --- a/src/nym/mod.rs +++ b/src/nym/mod.rs @@ -37,8 +37,9 @@ pub mod nymproc; pub mod streamexit; pub mod transport; -use std::sync::Arc; -use std::time::Duration; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; use bytes::Bytes; use http_body_util::{BodyExt, Full}; @@ -123,78 +124,83 @@ fn redacted(url: &url::Url) -> String { url.host_str().unwrap_or("").to_string() } -/// A single HTTP/1.1 exchange over the tunnel. Returns the status, the -/// collected body and, for 3xx responses, the `Location` target. -async fn request_once( - tunnel: &smolmix::Tunnel, +/// How long a pooled keep-alive connection may sit idle before we discard it +/// rather than reuse a possibly half-dead handle (hyper's `is_closed()` catches +/// cleanly-closed ones; this bounds the silent-death window). +const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(60); + +/// Pool key: a live HTTP/1.1 keep-alive connection is reusable only for the same +/// host, port and scheme. +#[derive(Clone, PartialEq, Eq, Hash)] +struct ConnKey { + host: String, + port: u16, + https: bool, +} + +/// A pooled hyper request handle. The body type matches [`request_once`]'s. +type HttpSender = hyper::client::conn::http1::SendRequest>; + +struct Pooled { + sender: HttpSender, + idle_since: Instant, +} + +lazy_static::lazy_static! { + /// Idle keep-alive connections, keyed by (host, port, https). 
A sender is + /// REMOVED while in use and reinserted when the exchange finishes, so the map + /// only ever holds idle handles and the lock is never held across an await. + static ref CONN_POOL: Mutex> = Mutex::new(HashMap::new()); +} + +/// Take a live, non-idle-expired pooled sender for `key`, if one exists. A +/// closed or stale handle is dropped (tearing down its connection) and `None` +/// returned so the caller builds a fresh one. +fn take_pooled(key: &ConnKey) -> Option { + let mut pool = CONN_POOL.lock().ok()?; + let pooled = pool.remove(key)?; + if pooled.sender.is_closed() || pooled.idle_since.elapsed() >= POOL_IDLE_TIMEOUT { + return None; + } + Some(pooled.sender) +} + +/// Return a still-live sender to the pool for the next request to reuse. +fn store_pooled(key: ConnKey, sender: HttpSender) { + if sender.is_closed() { + return; + } + if let Ok(mut pool) = CONN_POOL.lock() { + pool.insert( + key, + Pooled { + sender, + idle_since: Instant::now(), + }, + ); + } +} + +/// Send one request/response exchange on `sender`. On success returns the parsed +/// `(status, body, location)` AND the sender (drained and ready for the next +/// request, so the caller can pool it). `None` if the connection failed. +async fn exchange( + mut sender: HttpSender, method: &str, url: &url::Url, body: Option>, headers: &[(String, String)], -) -> Option<(u16, Vec, Option)> { - let host = url.host_str()?.to_string(); - let https = url.scheme() == "https"; - let port = url.port().unwrap_or(if https { 443 } else { 80 }); - - // TUNNEL-FIRST for HTTP. NIP-11/HTTP is PUBLIC data (relay docs, price, name - // authority) and both egresses are mixnet-private, so in steady state we ride - // the already-warm tunnel — opening a fresh MixnetStream + settle to a scoped - // exit PER request was pure latency here. 
Only when the tunnel isn't up yet - // (`!is_ready()`) do we fall to a host's co-located scoped exit to avoid a cold - // wait; failure there just falls through to the tunnel path below. transport.rs - // (relay websockets) stays exit-first and is untouched — this is the HTTP path - // only. - let exit_io = if https && !nymproc::is_ready() { - match crate::nostr::pool::load().exit_for_host(&host) { - Some(exit) => exit_connect(&host, &exit).await, - None => None, - } - } else { - None - }; - - let io: Box = match exit_io { - Some(io) => io, - None => { - // Resolve the host over the tunnel (DoT — see dns), then dial that - // IP through the same tunnel so nothing (lookup or body) touches - // the clear. - let addr = dns::resolve(tunnel, &host, port).await?; - let tcp = match tunnel.tcp_connect(addr).await { - Ok(s) => s, - Err(e) => { - warn!("nym http: connect to {host} failed: {e}"); - return None; - } - }; - if https { - match tls_connect(&host, tcp).await { - Some(tls) => Box::new(tls), - None => return None, - } - } else { - Box::new(tcp) - } - } - }; - - let (mut sender, conn) = hyper::client::conn::http1::handshake(TokioIo::new(io)) - .await - .map_err(|e| warn!("nym http: handshake with {host} failed: {e}")) - .ok()?; - // Drive the connection until the exchange finishes; it ends itself once - // the response (and body) is done or the sender is dropped. 
- tokio::spawn(async move { - let _ = conn.await; - }); - + host: &str, + https: bool, + port: u16, +) -> Option<((u16, Vec, Option), HttpSender)> { let m = hyper::Method::from_bytes(method.as_bytes()).ok()?; let path = match url.query() { Some(q) => format!("{}?{q}", url.path()), None => url.path().to_string(), }; let host_header = if (https && port == 443) || (!https && port == 80) { - host.clone() + host.to_string() } else { format!("{host}:{port}") }; @@ -225,7 +231,112 @@ async fn request_once( None }; let bytes = resp.into_body().collect().await.ok()?.to_bytes().to_vec(); - Some((status, bytes, location)) + Some(((status, bytes, location), sender)) +} + +/// A single HTTP/1.1 exchange over the tunnel. Returns the status, the +/// collected body and, for 3xx responses, the `Location` target. +async fn request_once( + tunnel: &smolmix::Tunnel, + method: &str, + url: &url::Url, + body: Option>, + headers: &[(String, String)], +) -> Option<(u16, Vec, Option)> { + let host = url.host_str()?.to_string(); + let https = url.scheme() == "https"; + let port = url.port().unwrap_or(if https { 443 } else { 80 }); + let key = ConnKey { + host: host.clone(), + port, + https, + }; + + // KEEP-ALIVE FAST PATH: reuse a pooled connection for this (host, port, + // https) when one is live, skipping a fresh mixnet TCP + TLS + HTTP handshake. + // This is what makes the many small reads (price, contact-name resolution) + // fast. Only steady-state tunnel connections are pooled (see below); the + // cold-start scoped-exit fallback is one-shot. + if let Some(sender) = take_pooled(&key) { + if let Some((resp, sender)) = exchange( + sender, + method, + url, + body.clone(), + headers, + &host, + https, + port, + ) + .await + { + store_pooled(key, sender); + return Some(resp); + } + // Pooled connection died mid-exchange: fall through and build a fresh one. + } + + // TUNNEL-FIRST for HTTP. 
NIP-11/HTTP is PUBLIC data (relay docs, price, name + // authority) and both egresses are mixnet-private, so in steady state we ride + // the already-warm tunnel — opening a fresh MixnetStream + settle to a scoped + // exit PER request was pure latency here. Only when the tunnel isn't up yet + // (`!is_ready()`) do we fall to a host's co-located scoped exit to avoid a cold + // wait; failure there just falls through to the tunnel path below. transport.rs + // (relay websockets) stays exit-first and is untouched — this is the HTTP path + // only. + let exit_io = if https && !nymproc::is_ready() { + match crate::nostr::pool::load().exit_for_host(&host) { + Some(exit) => exit_connect(&host, &exit).await, + None => None, + } + } else { + None + }; + // The one-shot scoped-exit fallback is NOT pooled — it's a cold-start bridge + // while the tunnel comes up. Only tunnel-borne connections go in the pool. + let poolable = exit_io.is_none(); + + let io: Box = match exit_io { + Some(io) => io, + None => { + // Resolve the host over the tunnel (DoT — see dns), then dial that + // IP through the same tunnel so nothing (lookup or body) touches + // the clear. + let addr = dns::resolve(tunnel, &host, port).await?; + let tcp = match tunnel.tcp_connect(addr).await { + Ok(s) => s, + Err(e) => { + warn!("nym http: connect to {host} failed: {e}"); + return None; + } + }; + if https { + match tls_connect(&host, tcp).await { + Some(tls) => Box::new(tls), + None => return None, + } + } else { + Box::new(tcp) + } + } + }; + + let (sender, conn) = hyper::client::conn::http1::handshake(TokioIo::new(io)) + .await + .map_err(|e| warn!("nym http: handshake with {host} failed: {e}")) + .ok()?; + // Drive the connection in the background. It stays alive for keep-alive reuse + // as long as the pooled sender is held; it ends once the sender is dropped + // (evicted from the pool) or the peer closes the connection. 
+ tokio::spawn(async move { + let _ = conn.await; + }); + + let (resp, sender) = exchange(sender, method, url, body, headers, &host, https, port).await?; + if poolable { + store_pooled(key, sender); + } + Some(resp) } /// Try the scoped-exit egress for an HTTPS `host`: a MixnetStream to the diff --git a/src/nym/nymproc.rs b/src/nym/nymproc.rs index 021ae5a..9c20a04 100644 --- a/src/nym/nymproc.rs +++ b/src/nym/nymproc.rs @@ -33,10 +33,13 @@ //! Should smolmix ever regress, the fallback design (SOCKS5 network requester //! + ordered exit failover) is specified in the plan, section G14. //! -//! Cover traffic: `TunnelBuilder` has no knob today, so the first cut accepts -//! smolmix defaults (cover traffic ON). The G13 low-power posture needs an -//! upstream nym-sdk patch exposing `IpMixStream::from_client` so a tuned -//! `MixnetClient` (loop-cover config) can back the tunnel; revisit then. +//! Cover traffic: the public READ tunnel is now backed by a tuned +//! `MixnetClient` (built in [`build_tunnel`] via `IpMixStream::from_client`) on +//! the balanced "high default traffic volume" preset — ~250 real msgs/s, ~10 ms +//! per-hop delay, loop cover traffic effectively off. Per-hop mix delays are +//! KEPT (no `set_no_per_hop_delays`), so timing obfuscation stays on; only cover +//! traffic is reduced, for the G13 low-power posture. The MONEY-PATH scoped exit +//! ([`super::streamexit`]) is a SEPARATE client and keeps full SDK defaults. use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::thread; @@ -651,11 +654,43 @@ fn parse_anchor(raw: &str) -> Option { /// back to `None` (see [`ExitSelector`]) or the single-exit SPOF — and a /// single party seeing all exit traffic — comes back. 
async fn build_tunnel(pin: Option) -> Result { - let mut builder = Tunnel::builder(); - if let Some(recipient) = pin { - builder = builder.ipr_address(recipient); - } - builder.build().await + use nym_sdk::DebugConfig; + use nym_sdk::ipr_wrapper::IpMixStream; + use nym_sdk::mixnet::MixnetClientBuilder; + + // READ-TUNNEL ANONYMITY TUNING — PUBLIC PATH ONLY. This tunes the mixnet + // client that backs the public read tunnel (relay/NIP-11/price/DoT); the + // MONEY-PATH scoped exit (`streamexit.rs`) is a SEPARATE MixnetClient and is + // deliberately left on full SDK defaults, untouched. + // + // The "balanced" preset (mirrors `Config::set_high_default_traffic_volume` + // upstream): ~10 ms average per-hop delay, ~250 real msgs/s send rate, and + // loop cover traffic effectively disabled. Per-hop delays are KEPT ON (we do + // NOT call `set_no_per_hop_delays`) so mix-layer timing obfuscation still + // applies to this public read tunnel — the tradeoff here is reduced *cover* + // traffic, not reduced mixing. + let mut cfg = DebugConfig::default(); + cfg.traffic.average_packet_delay = Duration::from_millis(10); + cfg.cover_traffic.loop_cover_traffic_average_delay = Duration::from_millis(2_000_000); + cfg.traffic.message_sending_average_delay = Duration::from_millis(4); + + // Mirror the mainnet env setup the SDK's own constructors run before connect. + nym_sdk::setup_env(None::<&std::path::Path>); + let client = MixnetClientBuilder::new_ephemeral() + .debug_config(cfg) + .build()? + .connect_to_mixnet() + .await?; + + // Pinned anchor when provided, else the auto-selected best public IPR — the + // same discovery the untuned `IpMixStream::new` path used, so anchor/fallback + // selection in `run_tunnel` is unchanged. + let ipr = match pin { + Some(recipient) => recipient, + None => IpMixStream::best_ipr().await?, + }; + let stream = IpMixStream::from_client(client, ipr).await?; + Tunnel::from_stream(stream).await } #[cfg(test)]