nym: major transport speedup (throughput + reuse), money path unchanged
The in-process smolmix tunnel was far slower than the old SOCKS5 model. Fixes: - smolmix TCP buffers 8KB -> 256KB and burst 1 -> 64: bulk throughput ceiling rises ~32x (was capping relay backfill and JSON reads at a few KB/s). - Read tunnel runs a high-traffic mixnet profile (cover traffic off, higher send rate, fewer reply SURBs) for lower latency; per-hop mix delays kept. The scoped-exit money-path client is separate and unchanged. - HTTP over the mixnet now reuses connections (keep-alive pool) instead of a fresh TCP+TLS+HTTP handshake per request - fixes slow price and username reads. - DNS prewarm no longer skips on cold start and serves stale-while-revalidate for known hosts, so a dial never blocks on DoT/DoH. Money path (streamexit.rs, transport.rs) byte-for-byte unchanged.
This commit is contained in:
+27
-4
@@ -867,22 +867,45 @@ async fn run_service(svc: Arc<NostrService>, wallet: Wallet) {
|
|||||||
// Prewarm mix-dns for the hosts we're about to (or will soon) hit — the
|
// Prewarm mix-dns for the hosts we're about to (or will soon) hit — the
|
||||||
// relays being dialed, the NIP-05 name authority (Claim username), and the
|
// relays being dialed, the NIP-05 name authority (Claim username), and the
|
||||||
// price API — so those resolutions are already cached by the time the user
|
// price API — so those resolutions are already cached by the time the user
|
||||||
// acts, rather than each paying a cold mixnet round trip inline. Runs off the
|
// acts, rather than each paying a cold mixnet round trip inline. The node host
|
||||||
// critical path (a raced+retried resolve is cheap); the node host is NOT here
|
// is NOT here — it never rides the mixnet.
|
||||||
// — it never rides the mixnet.
|
//
|
||||||
if let Some(tunnel) = crate::nym::nymproc::tunnel() {
|
// Unlike before this no longer silently SKIPS when the tunnel isn't up yet
|
||||||
|
// (the cold-start case that used to leave the first relay dial to a cold DoT
|
||||||
|
// round trip): it WAITS for the tunnel, prewarms, then keeps the entries hot
|
||||||
|
// by re-prewarming on a cadence below the DNS cache TTL floor, so known/stable
|
||||||
|
// hosts are refreshed in the background before they can expire.
|
||||||
|
{
|
||||||
let mut hosts: Vec<String> = relays
|
let mut hosts: Vec<String> = relays
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|r| nostr_sdk::Url::parse(r).ok())
|
.filter_map(|r| nostr_sdk::Url::parse(r).ok())
|
||||||
.filter_map(|u| u.host_str().map(|h| h.to_string()))
|
.filter_map(|u| u.host_str().map(|h| h.to_string()))
|
||||||
.collect();
|
.collect();
|
||||||
|
// The name authority, both from this service's config and the process-wide
|
||||||
|
// configured home domain (they're normally the same; dedup below folds it).
|
||||||
hosts.push(svc.config.read().home_domain());
|
hosts.push(svc.config.read().home_domain());
|
||||||
|
hosts.push(crate::nostr::nip05::home_domain());
|
||||||
hosts.push("api.coingecko.com".to_string());
|
hosts.push("api.coingecko.com".to_string());
|
||||||
hosts.retain(|h| !h.is_empty());
|
hosts.retain(|h| !h.is_empty());
|
||||||
hosts.sort();
|
hosts.sort();
|
||||||
hosts.dedup();
|
hosts.dedup();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
// Wait out the cold start rather than skipping the prewarm entirely.
|
||||||
|
let Some(tunnel) = crate::nym::nymproc::wait_for_tunnel(Duration::from_secs(60)).await
|
||||||
|
else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
crate::nym::dns::prewarm(&tunnel, &hosts).await;
|
crate::nym::dns::prewarm(&tunnel, &hosts).await;
|
||||||
|
// Keep the entries warm: re-prewarm every 45s (below the 60s TTL
|
||||||
|
// floor) so a stable host never expires out of the cache between
|
||||||
|
// uses. Picks up the current tunnel each cycle, so it survives exit
|
||||||
|
// reselects.
|
||||||
|
loop {
|
||||||
|
tokio::time::sleep(Duration::from_secs(45)).await;
|
||||||
|
if let Some(t) = crate::nym::nymproc::tunnel() {
|
||||||
|
crate::nym::dns::prewarm(&t, &hosts).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
for relay in &relays {
|
for relay in &relays {
|
||||||
|
|||||||
+83
-10
@@ -39,7 +39,7 @@
|
|||||||
//! startup, so a warm entry (not a fresh mixnet round trip) serves the common
|
//! startup, so a warm entry (not a fresh mixnet round trip) serves the common
|
||||||
//! case. IPv4-only, like the rest of the app (GRIM audit).
|
//! case. IPv4-only, like the rest of the app (GRIM audit).
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
@@ -139,10 +139,28 @@ const DOH_ROUNDS: usize = 2;
|
|||||||
const TTL_FLOOR_SECS: u32 = 60;
|
const TTL_FLOOR_SECS: u32 = 60;
|
||||||
const TTL_CEILING_SECS: u32 = 3600;
|
const TTL_CEILING_SECS: u32 = 3600;
|
||||||
|
|
||||||
|
/// TTL floor for KNOWN/stable hosts (relays, the name authority, the price API,
|
||||||
|
/// the DoT/DoH resolvers) — the ones we prewarm. Their addresses change rarely,
|
||||||
|
/// so we keep them cached at least 15 min (up to the 60-min ceiling) instead of
|
||||||
|
/// re-resolving every minute. Combined with serve-stale (below) this means a
|
||||||
|
/// dial to one of these NEVER blocks on a fresh mixnet DoT round trip.
|
||||||
|
const KNOWN_TTL_FLOOR_SECS: u32 = 900;
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
/// host → (addresses, expiry).
|
/// host → (addresses, expiry).
|
||||||
static ref CACHE: RwLock<HashMap<String, (Vec<Ipv4Addr>, Instant)>> =
|
static ref CACHE: RwLock<HashMap<String, (Vec<Ipv4Addr>, Instant)>> =
|
||||||
RwLock::new(HashMap::new());
|
RwLock::new(HashMap::new());
|
||||||
|
/// Hosts we treat as known/stable (populated by [`prewarm`]). Known hosts get
|
||||||
|
/// the longer [`KNOWN_TTL_FLOOR_SECS`] floor AND serve-stale-while-revalidate.
|
||||||
|
static ref KNOWN: RwLock<HashSet<String>> = RwLock::new(HashSet::new());
|
||||||
|
/// Hosts with a background revalidation in flight — single-flight guard so a
|
||||||
|
/// burst of dials to a stale known host spawns exactly one refresh.
|
||||||
|
static ref REFRESHING: RwLock<HashSet<String>> = RwLock::new(HashSet::new());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Whether `host` is a known/stable host (has been prewarmed at least once).
|
||||||
|
fn is_known(host: &str) -> bool {
|
||||||
|
KNOWN.read().contains(host)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Resolve `host` to a socket address for `tcp_connect`, entirely over the
|
/// Resolve `host` to a socket address for `tcp_connect`, entirely over the
|
||||||
@@ -155,9 +173,25 @@ pub async fn resolve(tunnel: &Tunnel, host: &str, port: u16) -> Option<SocketAdd
|
|||||||
if let Ok(ip) = host.parse::<IpAddr>() {
|
if let Ok(ip) = host.parse::<IpAddr>() {
|
||||||
return Some(SocketAddr::new(ip, port));
|
return Some(SocketAddr::new(ip, port));
|
||||||
}
|
}
|
||||||
if let Some(ip) = cached(host) {
|
match cache_hit(host) {
|
||||||
return Some(SocketAddr::new(IpAddr::V4(ip), port));
|
// Fresh entry: serve it, no network at all.
|
||||||
|
Some(CacheHit::Fresh(ip)) => return Some(SocketAddr::new(IpAddr::V4(ip), port)),
|
||||||
|
// SERVE-STALE-WHILE-REVALIDATE for known/stable hosts: hand back the
|
||||||
|
// last-known address immediately (so the dial never blocks on a cold DoT
|
||||||
|
// round trip) and refresh it in the background. Unknown hosts fall
|
||||||
|
// through to a blocking resolve, preserving correctness.
|
||||||
|
Some(CacheHit::Stale(ip)) if is_known(host) => {
|
||||||
|
spawn_revalidate(tunnel, host);
|
||||||
|
return Some(SocketAddr::new(IpAddr::V4(ip), port));
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
}
|
}
|
||||||
|
resolve_cold(tunnel, host, port).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The blocking DoT-then-DoH resolve, run when there is no usable cache entry.
|
||||||
|
/// Writes the cache on success.
|
||||||
|
async fn resolve_cold(tunnel: &Tunnel, host: &str, port: u16) -> Option<SocketAddr> {
|
||||||
// If a previous lookup already learned this exit blocks DoT, go straight to
|
// If a previous lookup already learned this exit blocks DoT, go straight to
|
||||||
// DoH — still entirely inside the tunnel.
|
// DoH — still entirely inside the tunnel.
|
||||||
if PREFER_DOH.load(Ordering::Acquire) {
|
if PREFER_DOH.load(Ordering::Acquire) {
|
||||||
@@ -175,6 +209,22 @@ pub async fn resolve(tunnel: &Tunnel, host: &str, port: u16) -> Option<SocketAdd
|
|||||||
resolve_via(tunnel, host, port, DnsMode::Doh).await
|
resolve_via(tunnel, host, port, DnsMode::Doh).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Kick off a background refresh of a stale known host through the current
|
||||||
|
/// tunnel, at most one in flight per host.
|
||||||
|
fn spawn_revalidate(tunnel: &Tunnel, host: &str) {
|
||||||
|
let host = host.to_string();
|
||||||
|
// Single-flight: skip if a refresh for this host is already running.
|
||||||
|
if !REFRESHING.write().insert(host.clone()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let tunnel = tunnel.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
// Port is irrelevant here — only the host-keyed cache is refreshed.
|
||||||
|
let _ = resolve_cold(&tunnel, &host, 0).await;
|
||||||
|
REFRESHING.write().remove(&host);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/// Run the round loop for one in-tunnel DNS transport, writing the cache on the
|
/// Run the round loop for one in-tunnel DNS transport, writing the cache on the
|
||||||
/// first valid answer. Shared by DoT / DoH.
|
/// first valid answer. Shared by DoT / DoH.
|
||||||
async fn resolve_via(tunnel: &Tunnel, host: &str, port: u16, mode: DnsMode) -> Option<SocketAddr> {
|
async fn resolve_via(tunnel: &Tunnel, host: &str, port: u16, mode: DnsMode) -> Option<SocketAddr> {
|
||||||
@@ -189,7 +239,14 @@ async fn resolve_via(tunnel: &Tunnel, host: &str, port: u16, mode: DnsMode) -> O
|
|||||||
DnsMode::Doh => race_doh(tunnel, host).await,
|
DnsMode::Doh => race_doh(tunnel, host).await,
|
||||||
};
|
};
|
||||||
if let Some((resolver, ips, ttl)) = answer {
|
if let Some((resolver, ips, ttl)) = answer {
|
||||||
let ttl = ttl.clamp(TTL_FLOOR_SECS, TTL_CEILING_SECS);
|
// Known/stable hosts get the longer floor so they stay cached 15-60
|
||||||
|
// min; everything else keeps the tight 60s..1h window.
|
||||||
|
let floor = if is_known(host) {
|
||||||
|
KNOWN_TTL_FLOOR_SECS
|
||||||
|
} else {
|
||||||
|
TTL_FLOOR_SECS
|
||||||
|
};
|
||||||
|
let ttl = ttl.clamp(floor, TTL_CEILING_SECS);
|
||||||
debug!(
|
debug!(
|
||||||
"{proto}: resolved {host} -> {} in {}ms (via {resolver}, round {}/{rounds}, \
|
"{proto}: resolved {host} -> {} in {}ms (via {resolver}, round {}/{rounds}, \
|
||||||
ttl {ttl}s, {} record(s))",
|
ttl {ttl}s, {} record(s))",
|
||||||
@@ -373,6 +430,13 @@ async fn query_doh(
|
|||||||
/// instead of paying the mixnet DoT round trip inline. Best-effort; the port is
|
/// instead of paying the mixnet DoT round trip inline. Best-effort; the port is
|
||||||
/// irrelevant here (only the host-keyed cache is filled) so a placeholder is used.
|
/// irrelevant here (only the host-keyed cache is filled) so a placeholder is used.
|
||||||
pub async fn prewarm(tunnel: &Tunnel, hosts: &[String]) {
|
pub async fn prewarm(tunnel: &Tunnel, hosts: &[String]) {
|
||||||
|
// Mark these as known/stable so they get the long TTL floor and serve-stale.
|
||||||
|
{
|
||||||
|
let mut known = KNOWN.write();
|
||||||
|
for host in hosts {
|
||||||
|
known.insert(host.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
let mut inflight = FuturesUnordered::new();
|
let mut inflight = FuturesUnordered::new();
|
||||||
for host in hosts {
|
for host in hosts {
|
||||||
inflight.push(resolve(tunnel, host, 0));
|
inflight.push(resolve(tunnel, host, 0));
|
||||||
@@ -380,15 +444,24 @@ pub async fn prewarm(tunnel: &Tunnel, hosts: &[String]) {
|
|||||||
while inflight.next().await.is_some() {}
|
while inflight.next().await.is_some() {}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A cached, unexpired address for `host`.
|
/// A cache lookup outcome for `host`: fresh (within TTL) or stale (expired but
|
||||||
fn cached(host: &str) -> Option<Ipv4Addr> {
|
/// still remembered, usable via serve-stale for known hosts).
|
||||||
|
enum CacheHit {
|
||||||
|
Fresh(Ipv4Addr),
|
||||||
|
Stale(Ipv4Addr),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Look up `host` in the cache, distinguishing fresh from stale entries. Returns
|
||||||
|
/// `None` only when the host has never been resolved.
|
||||||
|
fn cache_hit(host: &str) -> Option<CacheHit> {
|
||||||
let cache = CACHE.read();
|
let cache = CACHE.read();
|
||||||
let (ips, expiry) = cache.get(host)?;
|
let (ips, expiry) = cache.get(host)?;
|
||||||
if Instant::now() < *expiry {
|
let ip = ips.first().copied()?;
|
||||||
ips.first().copied()
|
Some(if Instant::now() < *expiry {
|
||||||
|
CacheHit::Fresh(ip)
|
||||||
} else {
|
} else {
|
||||||
None
|
CacheHit::Stale(ip)
|
||||||
}
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Address the liveness probe dials THROUGH the tunnel: Cloudflare's anycast
|
/// Address the liveness probe dials THROUGH the tunnel: Cloudflare's anycast
|
||||||
|
|||||||
+176
-65
@@ -37,8 +37,9 @@ pub mod nymproc;
|
|||||||
pub mod streamexit;
|
pub mod streamexit;
|
||||||
pub mod transport;
|
pub mod transport;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::collections::HashMap;
|
||||||
use std::time::Duration;
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use http_body_util::{BodyExt, Full};
|
use http_body_util::{BodyExt, Full};
|
||||||
@@ -123,78 +124,83 @@ fn redacted(url: &url::Url) -> String {
|
|||||||
url.host_str().unwrap_or("<no-host>").to_string()
|
url.host_str().unwrap_or("<no-host>").to_string()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A single HTTP/1.1 exchange over the tunnel. Returns the status, the
|
/// How long a pooled keep-alive connection may sit idle before we discard it
|
||||||
/// collected body and, for 3xx responses, the `Location` target.
|
/// rather than reuse a possibly half-dead handle (hyper's `is_closed()` catches
|
||||||
async fn request_once(
|
/// cleanly-closed ones; this bounds the silent-death window).
|
||||||
tunnel: &smolmix::Tunnel,
|
const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(60);
|
||||||
|
|
||||||
|
/// Pool key: a live HTTP/1.1 keep-alive connection is reusable only for the same
|
||||||
|
/// host, port and scheme.
|
||||||
|
#[derive(Clone, PartialEq, Eq, Hash)]
|
||||||
|
struct ConnKey {
|
||||||
|
host: String,
|
||||||
|
port: u16,
|
||||||
|
https: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A pooled hyper request handle. The body type matches [`request_once`]'s.
|
||||||
|
type HttpSender = hyper::client::conn::http1::SendRequest<Full<Bytes>>;
|
||||||
|
|
||||||
|
struct Pooled {
|
||||||
|
sender: HttpSender,
|
||||||
|
idle_since: Instant,
|
||||||
|
}
|
||||||
|
|
||||||
|
lazy_static::lazy_static! {
|
||||||
|
/// Idle keep-alive connections, keyed by (host, port, https). A sender is
|
||||||
|
/// REMOVED while in use and reinserted when the exchange finishes, so the map
|
||||||
|
/// only ever holds idle handles and the lock is never held across an await.
|
||||||
|
static ref CONN_POOL: Mutex<HashMap<ConnKey, Pooled>> = Mutex::new(HashMap::new());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Take a live, non-idle-expired pooled sender for `key`, if one exists. A
|
||||||
|
/// closed or stale handle is dropped (tearing down its connection) and `None`
|
||||||
|
/// returned so the caller builds a fresh one.
|
||||||
|
fn take_pooled(key: &ConnKey) -> Option<HttpSender> {
|
||||||
|
let mut pool = CONN_POOL.lock().ok()?;
|
||||||
|
let pooled = pool.remove(key)?;
|
||||||
|
if pooled.sender.is_closed() || pooled.idle_since.elapsed() >= POOL_IDLE_TIMEOUT {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Some(pooled.sender)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return a still-live sender to the pool for the next request to reuse.
|
||||||
|
fn store_pooled(key: ConnKey, sender: HttpSender) {
|
||||||
|
if sender.is_closed() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if let Ok(mut pool) = CONN_POOL.lock() {
|
||||||
|
pool.insert(
|
||||||
|
key,
|
||||||
|
Pooled {
|
||||||
|
sender,
|
||||||
|
idle_since: Instant::now(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send one request/response exchange on `sender`. On success returns the parsed
|
||||||
|
/// `(status, body, location)` AND the sender (drained and ready for the next
|
||||||
|
/// request, so the caller can pool it). `None` if the connection failed.
|
||||||
|
async fn exchange(
|
||||||
|
mut sender: HttpSender,
|
||||||
method: &str,
|
method: &str,
|
||||||
url: &url::Url,
|
url: &url::Url,
|
||||||
body: Option<Vec<u8>>,
|
body: Option<Vec<u8>>,
|
||||||
headers: &[(String, String)],
|
headers: &[(String, String)],
|
||||||
) -> Option<(u16, Vec<u8>, Option<String>)> {
|
host: &str,
|
||||||
let host = url.host_str()?.to_string();
|
https: bool,
|
||||||
let https = url.scheme() == "https";
|
port: u16,
|
||||||
let port = url.port().unwrap_or(if https { 443 } else { 80 });
|
) -> Option<((u16, Vec<u8>, Option<String>), HttpSender)> {
|
||||||
|
|
||||||
// TUNNEL-FIRST for HTTP. NIP-11/HTTP is PUBLIC data (relay docs, price, name
|
|
||||||
// authority) and both egresses are mixnet-private, so in steady state we ride
|
|
||||||
// the already-warm tunnel — opening a fresh MixnetStream + settle to a scoped
|
|
||||||
// exit PER request was pure latency here. Only when the tunnel isn't up yet
|
|
||||||
// (`!is_ready()`) do we fall to a host's co-located scoped exit to avoid a cold
|
|
||||||
// wait; failure there just falls through to the tunnel path below. transport.rs
|
|
||||||
// (relay websockets) stays exit-first and is untouched — this is the HTTP path
|
|
||||||
// only.
|
|
||||||
let exit_io = if https && !nymproc::is_ready() {
|
|
||||||
match crate::nostr::pool::load().exit_for_host(&host) {
|
|
||||||
Some(exit) => exit_connect(&host, &exit).await,
|
|
||||||
None => None,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
let io: Box<dyn Stream> = match exit_io {
|
|
||||||
Some(io) => io,
|
|
||||||
None => {
|
|
||||||
// Resolve the host over the tunnel (DoT — see dns), then dial that
|
|
||||||
// IP through the same tunnel so nothing (lookup or body) touches
|
|
||||||
// the clear.
|
|
||||||
let addr = dns::resolve(tunnel, &host, port).await?;
|
|
||||||
let tcp = match tunnel.tcp_connect(addr).await {
|
|
||||||
Ok(s) => s,
|
|
||||||
Err(e) => {
|
|
||||||
warn!("nym http: connect to {host} failed: {e}");
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if https {
|
|
||||||
match tls_connect(&host, tcp).await {
|
|
||||||
Some(tls) => Box::new(tls),
|
|
||||||
None => return None,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Box::new(tcp)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let (mut sender, conn) = hyper::client::conn::http1::handshake(TokioIo::new(io))
|
|
||||||
.await
|
|
||||||
.map_err(|e| warn!("nym http: handshake with {host} failed: {e}"))
|
|
||||||
.ok()?;
|
|
||||||
// Drive the connection until the exchange finishes; it ends itself once
|
|
||||||
// the response (and body) is done or the sender is dropped.
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let _ = conn.await;
|
|
||||||
});
|
|
||||||
|
|
||||||
let m = hyper::Method::from_bytes(method.as_bytes()).ok()?;
|
let m = hyper::Method::from_bytes(method.as_bytes()).ok()?;
|
||||||
let path = match url.query() {
|
let path = match url.query() {
|
||||||
Some(q) => format!("{}?{q}", url.path()),
|
Some(q) => format!("{}?{q}", url.path()),
|
||||||
None => url.path().to_string(),
|
None => url.path().to_string(),
|
||||||
};
|
};
|
||||||
let host_header = if (https && port == 443) || (!https && port == 80) {
|
let host_header = if (https && port == 443) || (!https && port == 80) {
|
||||||
host.clone()
|
host.to_string()
|
||||||
} else {
|
} else {
|
||||||
format!("{host}:{port}")
|
format!("{host}:{port}")
|
||||||
};
|
};
|
||||||
@@ -225,7 +231,112 @@ async fn request_once(
|
|||||||
None
|
None
|
||||||
};
|
};
|
||||||
let bytes = resp.into_body().collect().await.ok()?.to_bytes().to_vec();
|
let bytes = resp.into_body().collect().await.ok()?.to_bytes().to_vec();
|
||||||
Some((status, bytes, location))
|
Some(((status, bytes, location), sender))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A single HTTP/1.1 exchange over the tunnel. Returns the status, the
|
||||||
|
/// collected body and, for 3xx responses, the `Location` target.
|
||||||
|
async fn request_once(
|
||||||
|
tunnel: &smolmix::Tunnel,
|
||||||
|
method: &str,
|
||||||
|
url: &url::Url,
|
||||||
|
body: Option<Vec<u8>>,
|
||||||
|
headers: &[(String, String)],
|
||||||
|
) -> Option<(u16, Vec<u8>, Option<String>)> {
|
||||||
|
let host = url.host_str()?.to_string();
|
||||||
|
let https = url.scheme() == "https";
|
||||||
|
let port = url.port().unwrap_or(if https { 443 } else { 80 });
|
||||||
|
let key = ConnKey {
|
||||||
|
host: host.clone(),
|
||||||
|
port,
|
||||||
|
https,
|
||||||
|
};
|
||||||
|
|
||||||
|
// KEEP-ALIVE FAST PATH: reuse a pooled connection for this (host, port,
|
||||||
|
// https) when one is live, skipping a fresh mixnet TCP + TLS + HTTP handshake.
|
||||||
|
// This is what makes the many small reads (price, contact-name resolution)
|
||||||
|
// fast. Only steady-state tunnel connections are pooled (see below); the
|
||||||
|
// cold-start scoped-exit fallback is one-shot.
|
||||||
|
if let Some(sender) = take_pooled(&key) {
|
||||||
|
if let Some((resp, sender)) = exchange(
|
||||||
|
sender,
|
||||||
|
method,
|
||||||
|
url,
|
||||||
|
body.clone(),
|
||||||
|
headers,
|
||||||
|
&host,
|
||||||
|
https,
|
||||||
|
port,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
store_pooled(key, sender);
|
||||||
|
return Some(resp);
|
||||||
|
}
|
||||||
|
// Pooled connection died mid-exchange: fall through and build a fresh one.
|
||||||
|
}
|
||||||
|
|
||||||
|
// TUNNEL-FIRST for HTTP. NIP-11/HTTP is PUBLIC data (relay docs, price, name
|
||||||
|
// authority) and both egresses are mixnet-private, so in steady state we ride
|
||||||
|
// the already-warm tunnel — opening a fresh MixnetStream + settle to a scoped
|
||||||
|
// exit PER request was pure latency here. Only when the tunnel isn't up yet
|
||||||
|
// (`!is_ready()`) do we fall to a host's co-located scoped exit to avoid a cold
|
||||||
|
// wait; failure there just falls through to the tunnel path below. transport.rs
|
||||||
|
// (relay websockets) stays exit-first and is untouched — this is the HTTP path
|
||||||
|
// only.
|
||||||
|
let exit_io = if https && !nymproc::is_ready() {
|
||||||
|
match crate::nostr::pool::load().exit_for_host(&host) {
|
||||||
|
Some(exit) => exit_connect(&host, &exit).await,
|
||||||
|
None => None,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
// The one-shot scoped-exit fallback is NOT pooled — it's a cold-start bridge
|
||||||
|
// while the tunnel comes up. Only tunnel-borne connections go in the pool.
|
||||||
|
let poolable = exit_io.is_none();
|
||||||
|
|
||||||
|
let io: Box<dyn Stream> = match exit_io {
|
||||||
|
Some(io) => io,
|
||||||
|
None => {
|
||||||
|
// Resolve the host over the tunnel (DoT — see dns), then dial that
|
||||||
|
// IP through the same tunnel so nothing (lookup or body) touches
|
||||||
|
// the clear.
|
||||||
|
let addr = dns::resolve(tunnel, &host, port).await?;
|
||||||
|
let tcp = match tunnel.tcp_connect(addr).await {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(e) => {
|
||||||
|
warn!("nym http: connect to {host} failed: {e}");
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if https {
|
||||||
|
match tls_connect(&host, tcp).await {
|
||||||
|
Some(tls) => Box::new(tls),
|
||||||
|
None => return None,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Box::new(tcp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let (sender, conn) = hyper::client::conn::http1::handshake(TokioIo::new(io))
|
||||||
|
.await
|
||||||
|
.map_err(|e| warn!("nym http: handshake with {host} failed: {e}"))
|
||||||
|
.ok()?;
|
||||||
|
// Drive the connection in the background. It stays alive for keep-alive reuse
|
||||||
|
// as long as the pooled sender is held; it ends once the sender is dropped
|
||||||
|
// (evicted from the pool) or the peer closes the connection.
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let _ = conn.await;
|
||||||
|
});
|
||||||
|
|
||||||
|
let (resp, sender) = exchange(sender, method, url, body, headers, &host, https, port).await?;
|
||||||
|
if poolable {
|
||||||
|
store_pooled(key, sender);
|
||||||
|
}
|
||||||
|
Some(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Try the scoped-exit egress for an HTTPS `host`: a MixnetStream to the
|
/// Try the scoped-exit egress for an HTTPS `host`: a MixnetStream to the
|
||||||
|
|||||||
+44
-9
@@ -33,10 +33,13 @@
|
|||||||
//! Should smolmix ever regress, the fallback design (SOCKS5 network requester
|
//! Should smolmix ever regress, the fallback design (SOCKS5 network requester
|
||||||
//! + ordered exit failover) is specified in the plan, section G14.
|
//! + ordered exit failover) is specified in the plan, section G14.
|
||||||
//!
|
//!
|
||||||
//! Cover traffic: `TunnelBuilder` has no knob today, so the first cut accepts
|
//! Cover traffic: the public READ tunnel is now backed by a tuned
|
||||||
//! smolmix defaults (cover traffic ON). The G13 low-power posture needs an
|
//! `MixnetClient` (built in [`build_tunnel`] via `IpMixStream::from_client`) on
|
||||||
//! upstream nym-sdk patch exposing `IpMixStream::from_client` so a tuned
|
//! the balanced "high default traffic volume" preset — ~250 real msgs/s, ~10 ms
|
||||||
//! `MixnetClient` (loop-cover config) can back the tunnel; revisit then.
|
//! per-hop delay, loop cover traffic effectively off. Per-hop mix delays are
|
||||||
|
//! KEPT (no `set_no_per_hop_delays`), so timing obfuscation stays on; only cover
|
||||||
|
//! traffic is reduced, for the G13 low-power posture. The MONEY-PATH scoped exit
|
||||||
|
//! ([`super::streamexit`]) is a SEPARATE client and keeps full SDK defaults.
|
||||||
|
|
||||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
@@ -651,11 +654,43 @@ fn parse_anchor(raw: &str) -> Option<Recipient> {
|
|||||||
/// back to `None` (see [`ExitSelector`]) or the single-exit SPOF — and a
|
/// back to `None` (see [`ExitSelector`]) or the single-exit SPOF — and a
|
||||||
/// single party seeing all exit traffic — comes back.
|
/// single party seeing all exit traffic — comes back.
|
||||||
async fn build_tunnel(pin: Option<Recipient>) -> Result<Tunnel, smolmix::SmolmixError> {
|
async fn build_tunnel(pin: Option<Recipient>) -> Result<Tunnel, smolmix::SmolmixError> {
|
||||||
let mut builder = Tunnel::builder();
|
use nym_sdk::DebugConfig;
|
||||||
if let Some(recipient) = pin {
|
use nym_sdk::ipr_wrapper::IpMixStream;
|
||||||
builder = builder.ipr_address(recipient);
|
use nym_sdk::mixnet::MixnetClientBuilder;
|
||||||
}
|
|
||||||
builder.build().await
|
// READ-TUNNEL ANONYMITY TUNING — PUBLIC PATH ONLY. This tunes the mixnet
|
||||||
|
// client that backs the public read tunnel (relay/NIP-11/price/DoT); the
|
||||||
|
// MONEY-PATH scoped exit (`streamexit.rs`) is a SEPARATE MixnetClient and is
|
||||||
|
// deliberately left on full SDK defaults, untouched.
|
||||||
|
//
|
||||||
|
// The "balanced" preset (mirrors `Config::set_high_default_traffic_volume`
|
||||||
|
// upstream): ~10 ms average per-hop delay, ~250 real msgs/s send rate, and
|
||||||
|
// loop cover traffic effectively disabled. Per-hop delays are KEPT ON (we do
|
||||||
|
// NOT call `set_no_per_hop_delays`) so mix-layer timing obfuscation still
|
||||||
|
// applies to this public read tunnel — the tradeoff here is reduced *cover*
|
||||||
|
// traffic, not reduced mixing.
|
||||||
|
let mut cfg = DebugConfig::default();
|
||||||
|
cfg.traffic.average_packet_delay = Duration::from_millis(10);
|
||||||
|
cfg.cover_traffic.loop_cover_traffic_average_delay = Duration::from_millis(2_000_000);
|
||||||
|
cfg.traffic.message_sending_average_delay = Duration::from_millis(4);
|
||||||
|
|
||||||
|
// Mirror the mainnet env setup the SDK's own constructors run before connect.
|
||||||
|
nym_sdk::setup_env(None::<&std::path::Path>);
|
||||||
|
let client = MixnetClientBuilder::new_ephemeral()
|
||||||
|
.debug_config(cfg)
|
||||||
|
.build()?
|
||||||
|
.connect_to_mixnet()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Pinned anchor when provided, else the auto-selected best public IPR — the
|
||||||
|
// same discovery the untuned `IpMixStream::new` path used, so anchor/fallback
|
||||||
|
// selection in `run_tunnel` is unchanged.
|
||||||
|
let ipr = match pin {
|
||||||
|
Some(recipient) => recipient,
|
||||||
|
None => IpMixStream::best_ipr().await?,
|
||||||
|
};
|
||||||
|
let stream = IpMixStream::from_client(client, ipr).await?;
|
||||||
|
Tunnel::from_stream(stream).await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
Reference in New Issue
Block a user