1
0
forked from GRIN/grim

Goblin Build 123: fast relay connect after app updates

Fix: a relay-pool cache written by an older build parses fine but lacks
the scoped-exit addresses, which silently disabled the fast money path
for up to 7 days after an update - relay connects rode the slow public
path for minutes. The cache is now ignored and replaced when it lacks
exits, and the pinned pool takes over immediately.

Fix: the scoped-exit mixnet client now prewarms at cold start, so the
sequencer's head start is real (it previously waited on a client that
nothing had started until the first relay dial).

Build: wallet submodule repinned to the upstream grim branch tip
(c2db754) - the previous pin was deleted upstream, breaking CI checkout
and fresh clones. Policy: submodules stay pinned to GRIM's sources.

Lean: drop verified-dead code (avatar upload pipeline, legacy UDP DNS
path, legacy watchdog, duplicate TLS config, one-use error type, dead
store helpers).
This commit is contained in:
2ro
2026-07-02 14:34:13 -04:00
parent 5869ff78be
commit 71bf9b90e5
12 changed files with 97 additions and 303 deletions
+2 -2
View File
@@ -46,12 +46,12 @@ fn main() {
if cfg!(target_os = "windows") {
Command::new("cmd")
.args(&["/C", &git_hooks])
.args(["/C", &git_hooks])
.output()
.expect("failed to execute git config for hooks");
} else {
Command::new("sh")
.args(&["-c", &git_hooks])
.args(["-c", &git_hooks])
.output()
.expect("failed to execute git config for hooks");
}
-9
View File
@@ -115,15 +115,6 @@ impl AvatarTextures {
None
}
/// Make a freshly uploaded avatar visible immediately, without a fetch round-trip.
///
/// `name` is normalized (leading `@` stripped, lowercased) and used as the key
/// for both the avatar cache and the texture map. The texture slot holds
/// whatever `decode` yields for `png`, mapped to a loaded texture on success.
pub fn set_own(&mut self, ctx: &egui::Context, name: &str, hash: &str, png: &[u8]) {
    let name = name.trim_start_matches('@').to_lowercase();
    self.cache.store(&name, hash, png);
    let texture = decode(png).map(|image| {
        let label = format!("avatar_{name}");
        ctx.load_texture(label, image, Default::default())
    });
    self.textures.insert(name, texture);
}
/// Forget a name (released or rotated away).
pub fn invalidate(&mut self, name: &str) {
let name = name.trim_start_matches('@').to_lowercase();
+2 -93
View File
@@ -12,77 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Client-side avatar handling: local preprocessing of a picked picture
//! (mirrors the server pipeline so uploads over the mixnet stay small and previews
//! are instant — the server still re-validates everything), plus a small
//! disk cache of fetched avatars keyed by username.
//! Client-side avatar handling: a small disk cache of fetched avatars keyed
//! by username.
use image::codecs::png::PngEncoder;
use image::metadata::Orientation;
use image::{DynamicImage, ImageDecoder, ImageFormat, ImageReader, Limits};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::Cursor;
use std::path::PathBuf;
/// Output dimensions (square), matching the server.
pub const SIZE: u32 = 256;
/// Raw picked files larger than this are rejected before decoding.
const MAX_FILE_BYTES: u64 = 10 * 1024 * 1024;
/// Detect PNG, JPEG or WebP purely from the leading magic bytes.
///
/// Returns `None` for anything else, including inputs too short to carry a
/// full signature.
fn sniff(raw: &[u8]) -> Option<ImageFormat> {
    match raw {
        // 8-byte PNG signature.
        [0x89, b'P', b'N', b'G', 0x0D, 0x0A, 0x1A, 0x0A, ..] => Some(ImageFormat::Png),
        // JPEG start-of-image marker followed by the first marker byte.
        [0xFF, 0xD8, 0xFF, ..] => Some(ImageFormat::Jpeg),
        // RIFF container whose form type (bytes 8..12) is "WEBP"; bytes 4..8 are not inspected.
        [b'R', b'I', b'F', b'F', _, _, _, _, b'W', b'E', b'B', b'P', ..] => {
            Some(ImageFormat::WebP)
        }
        _ => None,
    }
}
/// Load a picked picture from `path` and normalize it to the canonical
/// avatar PNG via [`process_avatar_bytes`].
///
/// # Errors
/// Returns a user-facing message when the file can't be read, when it is
/// larger than `MAX_FILE_BYTES`, or when normalization fails.
pub fn process_avatar_file(path: &str) -> Result<Vec<u8>, String> {
    let unreadable = |_: std::io::Error| "Couldn't read that file".to_string();

    // Check the size from metadata first so oversized files are never loaded.
    let size = std::fs::metadata(path).map_err(unreadable)?.len();
    if size > MAX_FILE_BYTES {
        return Err("That picture is too large (10 MB max)".to_string());
    }

    let bytes = std::fs::read(path).map_err(unreadable)?;
    process_avatar_bytes(&bytes)
}
/// Turn raw image bytes into the canonical avatar: a `SIZE`×`SIZE` PNG,
/// center-cropped to a square with the decoder-reported orientation applied.
///
/// # Errors
/// Every failure (unrecognized format, decoder limits exceeded, decode or
/// encode error, zero-sized image) yields the same user-facing message.
pub fn process_avatar_bytes(raw: &[u8]) -> Result<Vec<u8>, String> {
    fn unusable() -> String {
        "That file doesn't look like a usable picture".to_string()
    }

    let Some(format) = sniff(raw) else {
        return Err(unusable());
    };

    // Bound decoder dimensions and allocations before any pixel data is touched.
    let mut limits = Limits::default();
    limits.max_image_width = Some(8192);
    limits.max_image_height = Some(8192);
    limits.max_alloc = Some(128 * 1024 * 1024);
    let mut reader = ImageReader::with_format(Cursor::new(raw), format);
    reader.limits(limits);

    let mut decoder = reader.into_decoder().map_err(|_| unusable())?;
    // An unreadable orientation is treated as "no transform".
    let orientation = decoder.orientation().unwrap_or(Orientation::NoTransforms);
    let mut decoded = DynamicImage::from_decoder(decoder).map_err(|_| unusable())?;
    decoded.apply_orientation(orientation);

    let (width, height) = (decoded.width(), decoded.height());
    if width == 0 || height == 0 {
        return Err(unusable());
    }

    // Largest centered square, then scale to the canonical size.
    let side = width.min(height);
    let left = (width - side) / 2;
    let top = (height - side) / 2;
    let square = decoded
        .crop_imm(left, top, side, side)
        .resize_exact(SIZE, SIZE, image::imageops::FilterType::Lanczos3);

    let mut png = Vec::new();
    square
        .to_rgba8()
        .write_with_encoder(PngEncoder::new(&mut png))
        .map_err(|_| unusable())?;
    Ok(png)
}
/// One cached profile probe.
#[derive(Serialize, Deserialize, Clone)]
pub struct CacheEntry {
@@ -196,33 +132,6 @@ impl AvatarCache {
#[cfg(test)]
mod tests {
use super::*;
use image::RgbaImage;
/// Encode a deterministic `w`×`h` RGBA gradient as PNG bytes (test fixture).
fn png_bytes(w: u32, h: u32) -> Vec<u8> {
    let pixels = RgbaImage::from_fn(w, h, |x, y| {
        let red = (x % 256) as u8;
        let green = (y % 256) as u8;
        image::Rgba([red, green, 7, 255])
    });
    let mut encoded = Vec::new();
    image::DynamicImage::ImageRgba8(pixels)
        .write_with_encoder(PngEncoder::new(&mut encoded))
        .unwrap();
    encoded
}
/// A non-square PNG comes out as a PNG of exactly `SIZE`×`SIZE`.
#[test]
fn processes_to_canonical_png() {
    let source = png_bytes(500, 300);
    let canonical = process_avatar_bytes(&source).unwrap();
    assert!(canonical.starts_with(&[0x89, b'P', b'N', b'G']));

    let decoded = image::load_from_memory(&canonical).unwrap();
    let dimensions = (decoded.width(), decoded.height());
    assert_eq!(dimensions, (SIZE, SIZE));
}
/// SVG markup, a GIF header and empty input are all refused.
#[test]
fn rejects_non_images() {
    let inputs: [&[u8]; 3] = [b"<svg onload=alert(1)></svg>", b"GIF89a....", &[]];
    for input in inputs {
        assert!(process_avatar_bytes(input).is_err());
    }
}
#[test]
fn cache_round_trip_and_remove() {
+1 -16
View File
@@ -32,7 +32,7 @@ pub enum AcceptPolicy {
}
/// Per-wallet nostr configuration.
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct NostrConfig {
/// Whether the nostr subsystem runs for this wallet.
enabled: Option<bool>,
@@ -59,21 +59,6 @@ pub struct NostrConfig {
path: Option<PathBuf>,
}
impl Default for NostrConfig {
fn default() -> Self {
Self {
enabled: None,
relays: None,
accept_from: None,
nip05_server: None,
expiry_secs: None,
cancel_grace_secs: None,
allow_incoming_requests: None,
path: None,
}
}
}
impl NostrConfig {
/// Nostr configuration file name inside the wallet directory.
pub const FILE_NAME: &'static str = "nostr.toml";
+14 -1
View File
@@ -203,6 +203,12 @@ pub fn load() -> RelayPool {
std::fs::read_to_string(cache_path())
.ok()
.and_then(|raw| RelayPool::parse(&raw))
// A cache written by a pre-exit build parses fine but hides the
// scoped-exit money path (and the current primary relay) for up to
// CACHE_MAX_AGE_SECS after an app update — relay connects then ride
// the slow public-IPR path for days. The pinned pool is newer than
// any exit-less file, so prefer it until the next gist refresh.
.filter(RelayPool::has_exit)
.unwrap_or_else(|| RelayPool::parse(PINNED_POOL).expect("pinned pool parses"))
}
@@ -220,7 +226,14 @@ pub async fn refresh_if_stale() {
.and_then(|m| m.modified().ok())
.and_then(|t| t.elapsed().ok())
.map(|age| age.as_secs() < CACHE_MAX_AGE_SECS)
.unwrap_or(false);
.unwrap_or(false)
// An exit-less cache predates the current pool shape (see `load`,
// which already ignores it) — replace it now instead of serving the
// pinned fallback for the rest of the file's 7 days.
&& std::fs::read_to_string(&path)
.ok()
.and_then(|raw| RelayPool::parse(&raw))
.is_some_and(|p| p.has_exit());
if fresh {
return;
}
-11
View File
@@ -170,10 +170,6 @@ impl NostrStore {
self.put_json(&self.contacts, &contact.npub, contact);
}
/// Delete the entry keyed by `npub_hex` from the contacts collection.
///
/// NOTE(review): the removal itself is done by the `delete` helper, which is
/// not visible here — confirm how it treats a missing key.
pub fn delete_contact(&self, npub_hex: &str) {
    self.delete(&self.contacts, npub_hex);
}
/// Return the contacts that `all_json` yields for the contacts collection.
pub fn all_contacts(&self) -> Vec<Contact> {
    self.all_json(&self.contacts)
}
@@ -312,11 +308,4 @@ impl NostrStore {
self.clear(&self.requests);
self.clear(&self.processed);
}
/// Wipe everything including contacts.
///
/// Runs `wipe_archive` first, then clears the contacts and settings
/// collections.
pub fn wipe_all(&self) {
    self.wipe_archive();
    self.clear(&self.contacts);
    self.clear(&self.settings);
}
}
+4 -98
View File
@@ -38,13 +38,9 @@
//! Answers land in a TTL-respecting in-memory cache and hosts are prewarmed at
//! startup, so a warm entry (not a fresh mixnet round trip) serves the common
//! case. IPv4-only, like the rest of the app (GRIM audit).
//!
//! A legacy UDP path is retained behind `GOBLIN_DNS_UDP=1` for measuring the
//! regression this replaced; it is never used in shipped builds.
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
@@ -82,12 +78,6 @@ const DOT_RESOLVERS: [DotResolver; 2] = [
},
];
/// Legacy UDP resolvers (port 53) — only used when `GOBLIN_DNS_UDP=1`.
const UDP_RESOLVERS: [SocketAddr; 2] = [
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1)), 53),
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 53),
];
/// A DoH resolver: the IP:443 to dial through the tunnel, its SNI/cert + Host
/// name, and the RFC 8484 query path. DoH is the FALLBACK for an exit whose
/// policy blocks DoT (:853) — 443 is guaranteed reachable (relays + HTTPS ride
@@ -121,8 +111,6 @@ enum DnsMode {
Dot,
/// DoH — DNS-over-HTTPS on :443 (fallback when an exit blocks :853).
Doh,
/// Legacy UDP-over-mixnet (`GOBLIN_DNS_UDP=1`, measurement only).
Udp,
}
/// Sticky: set once an exit is found to block DoT (:853), so we stop paying the
@@ -134,15 +122,11 @@ static PREFER_DOH: AtomicBool = AtomicBool::new(false);
/// (a few seconds of deliberate per-hop delay), so allow more headroom than the
/// UDP path did; a round that exceeds this is retried rather than waited out.
const DOT_QUERY_TIMEOUT: Duration = Duration::from_secs(8);
/// UDP per-query wait (legacy path).
const UDP_QUERY_TIMEOUT: Duration = Duration::from_secs(4);
/// Quick race-both-resolvers rounds before giving up. DoT is TCP-reliable within
/// a round, so two rounds is plenty (the second only matters if a whole
/// connection was dropped); the UDP path needs one more because it loses
/// datagrams.
/// connection was dropped).
const DOT_ROUNDS: usize = 2;
const UDP_ROUNDS: usize = 3;
/// DoH per-query wait (TCP + TLS + one HTTP round trip over the mixnet) and its
/// round count. Same reliability as DoT (TCP), a touch more per-request overhead
@@ -159,24 +143,6 @@ lazy_static! {
/// host → (addresses, expiry).
static ref CACHE: RwLock<HashMap<String, (Vec<Ipv4Addr>, Instant)>> =
RwLock::new(HashMap::new());
/// Shared rustls client config for DoT (webpki roots; ring provider installed
/// at startup — the Build 65/66 rule), reused for every resolver handshake.
static ref DOT_TLS: Arc<rustls::ClientConfig> = {
let mut roots = rustls::RootCertStore::empty();
roots.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
Arc::new(
rustls::ClientConfig::builder()
.with_root_certificates(roots)
.with_no_client_auth(),
)
};
}
/// Whether the pre-Build-98 UDP mix-dns path is requested.
///
/// True only when the `GOBLIN_DNS_UDP` environment variable is exactly `1`;
/// an unset or non-Unicode value counts as off. Measurement/debug only
/// (reproduces the lossy-UDP regression DoT replaced).
fn use_legacy_udp() -> bool {
    match std::env::var("GOBLIN_DNS_UDP") {
        Ok(flag) => flag == "1",
        Err(_) => false,
    }
}
/// Resolve `host` to a socket address for `tcp_connect`, entirely over the
@@ -192,10 +158,6 @@ pub async fn resolve(tunnel: &Tunnel, host: &str, port: u16) -> Option<SocketAdd
if let Some(ip) = cached(host) {
return Some(SocketAddr::new(IpAddr::V4(ip), port));
}
// Legacy measurement path (UDP-over-mixnet), never in shipped builds.
if use_legacy_udp() {
return resolve_via(tunnel, host, port, DnsMode::Udp).await;
}
// If a previous lookup already learned this exit blocks DoT, go straight to
// DoH — still entirely inside the tunnel.
if PREFER_DOH.load(Ordering::Acquire) {
@@ -214,19 +176,17 @@ pub async fn resolve(tunnel: &Tunnel, host: &str, port: u16) -> Option<SocketAdd
}
/// Run the round loop for one in-tunnel DNS transport, writing the cache on the
/// first valid answer. Shared by DoT / DoH / legacy-UDP.
/// first valid answer. Shared by DoT / DoH.
async fn resolve_via(tunnel: &Tunnel, host: &str, port: u16, mode: DnsMode) -> Option<SocketAddr> {
let (proto, rounds) = match mode {
DnsMode::Dot => ("dot-dns", DOT_ROUNDS),
DnsMode::Doh => ("doh-dns", DOH_ROUNDS),
DnsMode::Udp => ("udp-dns", UDP_ROUNDS),
};
let start = Instant::now();
for round in 0..rounds {
let answer = match mode {
DnsMode::Dot => race_dot(tunnel, host).await,
DnsMode::Doh => race_doh(tunnel, host).await,
DnsMode::Udp => race_udp(tunnel, host).await,
};
if let Some((resolver, ips, ttl)) = answer {
let ttl = ttl.clamp(TTL_FLOOR_SECS, TTL_CEILING_SECS);
@@ -295,7 +255,7 @@ async fn query_dot(
.map_err(|e| debug!("dot-dns: connect to {} failed: {e}", resolver.addr))
.ok()?;
let server_name = rustls::pki_types::ServerName::try_from(resolver.sni.to_string()).ok()?;
let mut tls = tokio_rustls::TlsConnector::from(DOT_TLS.clone())
let mut tls = tokio_rustls::TlsConnector::from(super::tls_config())
.connect(server_name, tcp)
.await
.map_err(|e| debug!("dot-dns: tls handshake with {} failed: {e}", resolver.sni))
@@ -372,7 +332,7 @@ async fn query_doh(
.map_err(|e| debug!("doh-dns: connect to {} failed: {e}", resolver.ip))
.ok()?;
let server_name = rustls::pki_types::ServerName::try_from(resolver.sni.to_string()).ok()?;
let tls = tokio_rustls::TlsConnector::from(DOT_TLS.clone())
let tls = tokio_rustls::TlsConnector::from(super::tls_config())
.connect(server_name, tcp)
.await
.map_err(|e| debug!("doh-dns: tls handshake with {} failed: {e}", resolver.sni))
@@ -408,22 +368,6 @@ async fn query_doh(
parse_response(id, &body)
}
/// One legacy-UDP round (only reached with `GOBLIN_DNS_UDP=1`).
async fn race_udp(tunnel: &Tunnel, host: &str) -> Option<(SocketAddr, Vec<Ipv4Addr>, u32)> {
let mut inflight = FuturesUnordered::new();
for resolver in UDP_RESOLVERS {
inflight.push(async move { (resolver, query_udp(tunnel, host, resolver).await) });
}
while let Some((resolver, answer)) = inflight.next().await {
if let Some((ips, ttl)) = answer
&& !ips.is_empty()
{
return Some((resolver, ips, ttl));
}
}
None
}
/// Resolve a batch of hosts concurrently to populate the cache, so the first
/// real use (relay dial, NIP-05 name claim, price fetch) hits a warm entry
/// instead of paying the mixnet DoT round trip inline. Best-effort; the port is
@@ -475,44 +419,6 @@ pub async fn probe(tunnel: &Tunnel) -> bool {
}
}
/// Send one A query for `host` to `resolver` over a tunnel UDP socket and wait
/// up to `UDP_QUERY_TIMEOUT` for the reply (legacy path).
///
/// Returns the parsed answer, or `None` on any socket, encode, send, receive
/// or timeout failure, and when the reply arrives from an address other than
/// `resolver`.
async fn query_udp(
    tunnel: &Tunnel,
    host: &str,
    resolver: SocketAddr,
) -> Option<(Vec<Ipv4Addr>, u32)> {
    let udp = tunnel
        .udp_socket()
        .await
        .map_err(|e| warn!("udp-dns: udp socket failed: {e}"))
        .ok()?;

    let id = rand::random::<u16>();
    let query = encode_query(id, host)?;
    udp.send_to(&query, resolver)
        .await
        .map_err(|e| warn!("udp-dns: send to {resolver} failed: {e}"))
        .ok()?;

    let mut buf = vec![0u8; 1500];
    let received = tokio::time::timeout(UDP_QUERY_TIMEOUT, udp.recv_from(&mut buf)).await;
    let (n, from) = match received {
        Err(_) => {
            debug!("udp-dns: query to {resolver} timed out (will retry)");
            return None;
        }
        Ok(Err(e)) => {
            warn!("udp-dns: recv from {resolver} failed: {e}");
            return None;
        }
        Ok(Ok(reply)) => reply,
    };

    // Only accept a datagram that came from the resolver we asked.
    if from != resolver {
        warn!("udp-dns: dropping answer from unexpected source {from}");
        return None;
    }
    parse_response(id, &buf[..n])
}
/// Encode a recursive A query for `host` with transaction id `id`.
fn encode_query(id: u16, host: &str) -> Option<Vec<u8>> {
let name = Name::from_ascii(host).ok()?;
+26 -19
View File
@@ -30,8 +30,7 @@
//! timeouts (~10s measured), tipping relay connects past the exit-condemnation
//! grace and driving a 2-3 minute reselect loop. Build 98 moves DNS to DoT
//! (TCP+TLS through the tunnel): TCP retransmits (no packet-loss stalls) and TLS
//! encrypts the query from the exit — reliable AND private. (`GOBLIN_DNS_UDP=1`
//! restores the old UDP path for measuring the regression.)
//! encrypts the query from the exit — reliable AND private.
pub mod dns;
pub mod nymproc;
@@ -255,11 +254,32 @@ async fn exit_connect(host: &str, exit: &str) -> Option<Box<dyn Stream>> {
}
}
/// Everything hyper needs from the tunneled stream, boxable for the plain
/// http / https split.
trait Stream: AsyncRead + AsyncWrite + Send + Unpin {}
/// Everything hyper (and the TLS/websocket layers) needs from a mixnet-carried
/// stream, boxable for the plain http / https / scoped-exit split. Shared with
/// the scoped-exit egress ([`streamexit::BoxedStream`]).
pub(crate) trait Stream: AsyncRead + AsyncWrite + Send + Unpin {}
impl<T: AsyncRead + AsyncWrite + Send + Unpin> Stream for T {}
lazy_static::lazy_static! {
    /// Shared rustls client config (webpki roots; ring provider installed at
    /// startup — the Build 65/66 rule), reused by every in-tunnel TLS handshake
    /// (HTTPS here, DoT/DoH in [`dns`]).
    static ref TLS_CONFIG: Arc<rustls::ClientConfig> = {
        // Trust store seeded only from the bundled webpki roots.
        let mut roots = rustls::RootCertStore::empty();
        roots.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
        Arc::new(
            rustls::ClientConfig::builder()
                .with_root_certificates(roots)
                // No client certificate is presented.
                .with_no_client_auth(),
        )
    };
}
/// Hand out the process-wide rustls client config.
///
/// Each call only bumps the `Arc` reference count; the config itself is the
/// single `TLS_CONFIG` instance.
pub(crate) fn tls_config() -> Arc<rustls::ClientConfig> {
    Arc::clone(&*TLS_CONFIG)
}
/// TLS-wrap a tunneled TCP stream with rustls + webpki roots (never the
/// platform verifier — it panics on Android outside a full app context). The
/// certificate is validated against the HOSTNAME even though the dial went to a
@@ -268,21 +288,8 @@ async fn tls_connect<S>(host: &str, stream: S) -> Option<tokio_rustls::client::T
where
S: AsyncRead + AsyncWrite + Send + Unpin,
{
// Shared rustls client config (webpki roots; ring provider installed at
// startup — the Build 65/66 rule).
lazy_static::lazy_static! {
static ref TLS_CONFIG: Arc<rustls::ClientConfig> = {
let mut roots = rustls::RootCertStore::empty();
roots.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
Arc::new(
rustls::ClientConfig::builder()
.with_root_certificates(roots)
.with_no_client_auth(),
)
};
}
let server_name = rustls::pki_types::ServerName::try_from(host.to_string()).ok()?;
tokio_rustls::TlsConnector::from(TLS_CONFIG.clone())
tokio_rustls::TlsConnector::from(tls_config())
.connect(server_name, stream)
.await
.map_err(|e| warn!("nym http: tls handshake with {host} failed: {e}"))
+11 -28
View File
@@ -200,6 +200,11 @@ fn run_tunnel() {
// exit in the pool → no wait. Cold start only: on a later reselect the
// exit is long-ready, so `is_ready()` returns instantly.
if crate::nostr::pool::load().has_exit() {
// Kick the exit client's bootstrap NOW — nothing else touches it
// until the first relay dial (after a wallet opens), so waiting
// without this would just burn the head start and the grant race
// would happen anyway.
tokio::spawn(super::streamexit::prewarm());
let head_start = Instant::now();
while !super::streamexit::is_ready() && head_start.elapsed() < EXIT_HEAD_START {
tokio::time::sleep(Duration::from_millis(200)).await;
@@ -342,11 +347,11 @@ fn run_tunnel() {
*TUNNEL.write() = None;
tunnel.shutdown().await;
// Rebuild floor: never re-select faster than once per
// MIN_EXIT_LIFETIME. In the legacy path (and any future bug)
// this is the hard guarantee that a condemnation can't thrash
// the mixnet into a tight reselect loop.
// MIN_EXIT_LIFETIME. Whatever condemned the exit (or any
// future bug), this is the hard guarantee that a condemnation
// can't thrash the mixnet into a tight reselect loop.
let alive = published.elapsed();
if !legacy_watchdog() && alive < MIN_EXIT_LIFETIME {
if alive < MIN_EXIT_LIFETIME {
let floor = MIN_EXIT_LIFETIME - alive;
info!(
"[timing] nym: rebuild floor — waiting {}ms before next exit select",
@@ -440,14 +445,6 @@ pub(crate) const BOOTSTRAP_TIMEOUT: Duration = Duration::from_secs(20);
/// tunnel more than briefly; the exit typically readies well inside it.
const EXIT_HEAD_START: Duration = Duration::from_secs(12);
/// Whether the pre-Build-98 watchdog is requested (condemn on RELAY_GRACE of
/// no-relay alone, no connectivity gate, no rebuild floor).
///
/// True only when the `GOBLIN_LEGACY_WATCHDOG` environment variable is exactly
/// `1`. Debug/measurement only — lets a cold run reproduce the old reselect
/// loop for a BEFORE/AFTER comparison.
fn legacy_watchdog() -> bool {
    std::env::var("GOBLIN_LEGACY_WATCHDOG").ok().as_deref() == Some("1")
}
/// Watchdog poll cadence. The relay-reachability check is a bare atomic load
/// (free), so a short cadence costs nothing and never touches the network; the
/// DNS keepalive still only fires every [`KEEPALIVE_PERIOD`], preserving the
@@ -468,7 +465,6 @@ const WATCH_TICK: Duration = Duration::from_secs(5);
/// Returns once either signal declares the current exit dead, whereupon
/// `run_tunnel` rebuilds on a fresh auto-selected exit.
async fn watch_tunnel(tunnel: &smolmix::Tunnel, generation: u64) {
let legacy = legacy_watchdog();
let published = Instant::now();
let mut dns_fails = 0u32;
let mut since_dns = Duration::ZERO;
@@ -481,21 +477,8 @@ async fn watch_tunnel(tunnel: &smolmix::Tunnel, generation: u64) {
if relay_consumer() && !relay_live_for(generation) {
let lost = *relay_lost.get_or_insert_with(Instant::now);
let absent = lost.elapsed();
if legacy {
// Pre-Build-98: condemn on RELAY_GRACE of no-relay alone. Kept for
// BEFORE/AFTER measurement; this is the branch that produced the
// reselect loop when mix-dns made relays slow to connect.
if absent >= RELAY_GRACE {
warn!(
"[timing] nym: CONDEMN gen {generation} reason=no-relay-{}s (legacy watchdog); \
exit lived {}s, re-selecting",
RELAY_GRACE.as_secs(),
published.elapsed().as_secs()
);
return;
}
} else if published.elapsed() >= MIN_EXIT_LIFETIME && absent >= RELAY_GRACE {
// Robust: past the settle floor AND relays absent for the grace.
if published.elapsed() >= MIN_EXIT_LIFETIME && absent >= RELAY_GRACE {
// Past the settle floor AND relays absent for the grace.
// Don't condemn on "no relay yet" alone — first prove the exit
// itself has NO connectivity (a genuine blackhole). If the probe
// SUCCEEDS the exit reaches the internet, so relays are merely slow
+33 -11
View File
@@ -35,16 +35,12 @@ use std::time::Duration;
use log::{info, warn};
use nym_sdk::mixnet::{MixnetClient, MixnetStream, Recipient};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::Mutex;
/// Everything the TLS/websocket layer needs from the egress stream.
pub trait ExitStream: AsyncRead + AsyncWrite + Send + Unpin {}
impl<T: AsyncRead + AsyncWrite + Send + Unpin> ExitStream for T {}
/// The boxed transport stream handed to the TLS/websocket layer — the same
/// seat the smolmix tunnel's TCP stream occupies on the fallback path.
pub type BoxedStream = Box<dyn ExitStream>;
/// seat the smolmix tunnel's TCP stream occupies on the fallback path
/// (everything that layer needs is the shared [`super::Stream`] trait).
pub(crate) type BoxedStream = Box<dyn super::Stream>;
/// After the Open is SENT, wait this long before handing back a writable
/// stream. `open_stream` returns once the Open message leaves the client, NOT
@@ -98,7 +94,7 @@ pub fn is_ready() -> bool {
/// mixnet — a DEAD exit still hands back a stream, and its death surfaces at
/// the caller's (timeout-bounded) TLS handshake, which doubles as the
/// liveness probe: no ServerHello through the pipe → fall back.
pub async fn open_stream(exit: &str, timeout: Duration) -> Result<BoxedStream, String> {
pub(crate) async fn open_stream(exit: &str, timeout: Duration) -> Result<BoxedStream, String> {
let recipient: Recipient = exit
.trim()
.parse()
@@ -113,8 +109,22 @@ pub async fn open_stream(exit: &str, timeout: Duration) -> Result<BoxedStream, S
Ok(Box::new(stream) as BoxedStream)
}
/// Ensure the shared client is connected, then open a stream on it.
async fn open(recipient: Recipient) -> Result<MixnetStream, String> {
/// Start the shared exit client before anything dials through it.
///
/// Spawned by the cold-start sequencer in [`super::nymproc`] when the pool
/// advertises an exit. Otherwise the client would bootstrap only on the first
/// relay dial, which happens after a wallet opens. The tunnel's bounded
/// head-start wait would then expire, and both clients would race for their
/// bandwidth grants anyway. A failure is only logged: the first real dial
/// retries the bootstrap.
pub async fn prewarm() {
    match ensure_client().await {
        Ok(()) => {}
        Err(e) => warn!("nym: streamexit prewarm failed: {e}"),
    }
}
/// Ensure the shared client is connected (bootstrapping it when absent or
/// dead) and READY reflects reality. Holds the client lock across the
/// bootstrap so concurrent callers coalesce onto one connect.
async fn ensure_client() -> Result<(), String> {
let mut guard = CLIENT.lock().await;
// A dead client (gateway dropped, hosting runtime gone) is discarded and
// rebuilt — the auto-reconnect-on-drop rule.
@@ -138,7 +148,19 @@ async fn open(recipient: Recipient) -> Result<MixnetStream, String> {
*guard = Some(client);
READY.store(true, Ordering::Relaxed);
}
let client = guard.as_mut().expect("client ensured above");
Ok(())
}
/// Ensure the shared client is connected, then open a stream on it.
async fn open(recipient: Recipient) -> Result<MixnetStream, String> {
ensure_client().await?;
let mut guard = CLIENT.lock().await;
// Re-acquired the lock after ensure_client — a concurrent failed dial may
// have dropped the client in between; error into the caller's fallback
// rather than panic.
let Some(client) = guard.as_mut() else {
return Err("exit client lost before dial".to_string());
};
match client.open_stream(recipient, None).await {
Ok(stream) => Ok(stream),
Err(e) => {
+3 -14
View File
@@ -24,7 +24,6 @@
//! payload + in-flight destination never touch the clear, and an exit failure
//! only ever falls back — never a lockout.
use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
@@ -37,20 +36,10 @@ use nostr_sdk::Url;
use nostr_sdk::util::BoxedFuture;
use tokio_tungstenite::tungstenite::Message as TgMessage;
/// Error type for transport failures outside the websocket layer.
///
/// A newtype over the failure message; `Display` writes that message verbatim.
#[derive(Debug)]
struct NymTransportError(String);

impl std::error::Error for NymTransportError {}

impl fmt::Display for NymTransportError {
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(message) = self;
        out.write_str(message)
    }
}
/// A backend transport error (failures outside the websocket layer) carrying
/// `msg` as its display text.
fn terr(msg: impl Into<String>) -> TransportError {
TransportError::backend(NymTransportError(msg.into()))
TransportError::backend(std::io::Error::other(msg.into()))
}
/// Nostr websocket transport over the in-process Nym mixnet tunnel.
+1 -1
Submodule wallet updated: 906dc55b95...c2db754552