Compare commits

..

3 Commits

Author SHA1 Message Date
Jędrzej Stuczyński b6eb391e85 bugfix: clippy issues (#6876) 2026-06-12 11:10:06 +01:00
Jędrzej Stuczyński 931ec03b28 feat: expose node's chain address on self-described API (#6815)
* feat: expose nym-nodes' on-chain address on v2 auxiliary endpoint

* moved swagger page outside the v1 route

* fixed swagger endpoint for nym-api

* post rebasing fixes

* remove redundant impl OfflineSigner for Arc<DirectSecp256k1HdWallet>
2026-06-12 10:36:27 +01:00
Jędrzej Stuczyński 8a93bce32f feat: additional mixnet improvements and metrics (#6874)
* wip

* batch processing of forward packets

* tmp: additional metrics for remote node

* fixed incorrect prometheus metric registration

* unified runtime metrics

* unify mixnet client metrics

* packet forwarding cleanup

* add batching for emptying the delay queue

* cleanup client io loop

* feat(nym-node): reap idle mixnet connections (ingress + egress)

Close mixnet connections that sit with no traffic past a configurable idle period (mixnet.debug.connection_idle_timeout, default 5min, 0 disables) to bound lingering tokio tasks/sockets.

Ingress handle_stream is read-only, so a silently-gone peer (NAT drop, crash without FIN, half-open) never triggers FIN/RST and the task would block on .next() forever; a new idle select arm closes it (the post-loop replay flush still runs, so nothing is stranded). Egress run_io_loop gets the symmetric arm keyed on last_send; on close EvictOnDrop clears the cache entry and the next packet transparently reconnects.

Adds a cumulative nym_node_network_idle_closed_ingress_mixnet_connections counter; egress reaping is observed via the existing active-egress gauge plus an exit_reason=idle_timeout log.

* downgrade sysinfo

* refactor(nym-node): split PacketForwarder into router + delay-queue tasks

Split the single PacketForwarder task into two concurrently-scheduled tasks connected by a bounded handoff channel, so intake and delayed-release no longer block each other.

PacketRouter (router.rs) is the intake task: sole consumer of the ingress channel, it applies the routing filter and either forwards zero/already-elapsed-delay packets directly or hands delayed ones to the delay task. Its per-packet work is sub-µs, so new packets no longer wait behind delayed-release processing (collapses the ForwarderQueue tail).

DelayForwarder (delay.rs) owns the NonExhaustiveDelayQueue exclusively (it can't be shared by reference). Its run loop services BOTH branches on every wakeup - draining pending inserts first to bring the queue current, then flushing everything now due - so the biased select can't let releases and inserts starve each other, and a freshly-arrived-but-already-due packet releases in the same pass (marginally improving DelayQueueOverrun).

The mixnet client is shared as Arc<C>; handoff-channel overflow is dropped as an egress drop rather than blocking, keeping intake decoupled from release.

* feat(nym-node): bound egress flush with a write timeout

Cap how long a single egress batch flush may block on a congested peer socket (mixnet.debug.connection_write_timeout, default 500ms, 0 disables), so a slow peer can no longer back this connection's egress queue up into the multi-second range - the root of the EgressQueue and SocketWrite tails.

A single timeout is treated as transient congestion: the un-fed tail of the batch is abandoned but the connection is retained. This is sound because NoiseStream::poll_write encrypts and buffers each frame synchronously, so a cancelled flush leaves the noise transport nonce-consistent and a later flush resumes the byte stream in order - so a momentary spike costs no re-handshake. Only MAX_CONSECUTIVE_WRITE_TIMEOUTS (3) timeouts in a row, i.e. a persistently congested peer, tears the connection down (it reconnects on the next packet); a successful flush resets the counter.

Buffer-size tuning (maximum_connection_buffer_size) deliberately left for live data.

* revert PacketForwarder split in favour of a single task that clears both channels on wake
2026-06-12 10:31:54 +01:00
72 changed files with 1290 additions and 587 deletions
Generated
+3 -3
View File
@@ -12072,16 +12072,16 @@ dependencies = [
[[package]]
name = "sysinfo"
version = "0.37.2"
version = "0.38.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16607d5caffd1c07ce073528f9ed972d88db15dd44023fa57142963be3feb11f"
checksum = "92ab6a2f8bfe508deb3c6406578252e491d299cbbf3bc0529ecc3313aee4a52f"
dependencies = [
"libc",
"memchr",
"ntapi",
"objc2-core-foundation",
"objc2-io-kit",
"windows 0.61.3",
"windows 0.62.2",
]
[[package]]
+1 -1
View File
@@ -369,7 +369,7 @@ strum = "0.28.0"
strum_macros = "0.28.0"
subtle-encoding = "0.5"
syn = "2"
sysinfo = "0.37.0"
sysinfo = "0.38.4"
tap = "1.0.1"
tar = "0.4.45"
test-with = { version = "0.15.4", default-features = false }
+1 -1
View File
@@ -36,4 +36,4 @@ client = ["tokio-util", "nym-task", "nym-metrics", "tokio/net", "tokio/rt"]
[dev-dependencies]
nym-crypto = { workspace = true }
rand = { workspace = true }
tokio = { workspace = true, features = ["macros", "io-util", "rt", "rt-multi-thread"] }
tokio = { workspace = true, features = ["macros", "io-util", "rt", "rt-multi-thread", "test-util"] }
+262 -65
View File
@@ -1,9 +1,9 @@
// Copyright 2021-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::trace::{TraceStage, Traced};
use crate::metrics::{MixnetMetric, Traced};
use dashmap::DashMap;
use futures::{SinkExt, StreamExt};
use futures::{Sink, SinkExt, StreamExt};
use nym_noise::config::NoiseConfig;
use nym_noise::upgrade_noise_initiator;
use nym_sphinx::forwarding::packet::MixPacket;
@@ -11,7 +11,7 @@ use nym_sphinx::framing::codec::NymCodec;
use nym_sphinx::framing::packet::FramedNymPacket;
use std::io;
use std::net::SocketAddr;
use std::ops::Deref;
use std::ops::{ControlFlow, Deref};
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
@@ -19,7 +19,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio::time::sleep;
use tokio::time::{sleep, Instant};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::codec::Framed;
use tracing::*;
@@ -31,6 +31,13 @@ pub struct Config {
pub initial_connection_timeout: Duration,
pub maximum_connection_buffer_size: usize,
pub use_legacy_packet_encoding: bool,
/// Close an egress connection after this long with no packets sent (0 disables). The cache
/// entry is evicted on close and the next packet to that peer transparently reconnects.
pub connection_idle_timeout: Duration,
/// Max time a single batch flush may block on the peer socket before we give up on it
/// (0 disables). One timeout is treated as transient congestion - the batch is abandoned but
/// the connection is retained (no re-handshake); only a few *consecutive* timeouts tear it down.
pub connection_write_timeout: Duration,
}
impl Config {
@@ -40,6 +47,8 @@ impl Config {
initial_connection_timeout: Duration,
maximum_connection_buffer_size: usize,
use_legacy_packet_encoding: bool,
connection_idle_timeout: Duration,
connection_write_timeout: Duration,
) -> Self {
Config {
initial_reconnection_backoff,
@@ -47,6 +56,8 @@ impl Config {
initial_connection_timeout,
maximum_connection_buffer_size,
use_legacy_packet_encoding,
connection_idle_timeout,
connection_write_timeout,
}
}
}
@@ -114,6 +125,8 @@ struct ManagedConnection {
noise_config: NoiseConfig,
message_receiver: ReceiverStream<Traced<FramedNymPacket>>,
connection_timeout: Duration,
idle_timeout: Duration,
write_timeout: Duration,
current_reconnection: Arc<AtomicU32>,
active_connections: ActiveConnections,
handle_token: Arc<()>,
@@ -143,11 +156,14 @@ impl Drop for EvictOnDrop {
}
impl ManagedConnection {
#[allow(clippy::too_many_arguments)]
fn new(
address: SocketAddr,
noise_config: NoiseConfig,
message_receiver: mpsc::Receiver<Traced<FramedNymPacket>>,
connection_timeout: Duration,
idle_timeout: Duration,
write_timeout: Duration,
current_reconnection: Arc<AtomicU32>,
active_connections: ActiveConnections,
handle_token: Arc<()>,
@@ -157,6 +173,8 @@ impl ManagedConnection {
noise_config,
message_receiver: ReceiverStream::new(message_receiver),
connection_timeout,
idle_timeout,
write_timeout,
current_reconnection,
active_connections,
handle_token,
@@ -165,6 +183,8 @@ impl ManagedConnection {
async fn run(self) {
let address = self.address;
let idle_timeout = self.idle_timeout;
let write_timeout = self.write_timeout;
let _evict_guard = EvictOnDrop {
active_connections: self.active_connections,
address,
@@ -259,7 +279,14 @@ impl ManagedConnection {
conn.set_backpressure_boundary(OUTBOUND_WRITE_BUFFER);
// 4. start handling the framed stream
run_io_loop(conn, self.message_receiver, address).await;
run_io_loop(
conn,
self.message_receiver,
address,
idle_timeout,
write_timeout,
)
.await;
}
}
@@ -274,6 +301,124 @@ const OUTBOUND_FLUSH_BATCH: usize = 1024;
/// a flush is usually a single frame.
const OUTBOUND_WRITE_BUFFER: usize = 32 * 1024;
/// Drive the read half solely to notice peer FIN/RST (the connection is send-only). Returns
/// `Break` when the peer closed the connection or the read errored, `Continue` otherwise.
fn handle_peer_read<P, E: std::fmt::Display>(
msg: Option<Result<P, E>>,
address: SocketAddr,
) -> ControlFlow<()> {
match msg {
None => {
debug!(
peer = %address,
exit_reason = "peer_closed",
"peer closed mixnet connection to {address}"
);
ControlFlow::Break(())
}
Some(Err(err)) => {
debug!(
event = "connection.read_error",
peer = %address,
error = %err,
exit_reason = "read_error",
"read error on mixnet connection to {address}: {err}"
);
ControlFlow::Break(())
}
Some(Ok(_)) => {
trace!(
peer = %address,
"unexpected inbound packet on mixnet connection to {address}; discarding"
);
ControlFlow::Continue(())
}
}
}
/// Number of consecutive flush timeouts to the same peer we tolerate before dropping the
/// connection. A single timeout is transient congestion (batch abandoned, connection retained to
/// avoid a re-handshake); this many in a row means the peer is persistently unable to keep up, so
/// we tear the connection down (it reconnects on the next packet).
const MAX_CONSECUTIVE_WRITE_TIMEOUTS: u32 = 3;
/// Outcome of attempting to flush one batch to the peer.
enum BatchOutcome {
/// the batch was flushed to the socket
Sent,
/// the flush exceeded the write timeout (peer congested): the un-fed tail of the batch is
/// dropped, but the already-encoded frames stay buffered for a later flush and the connection
/// is left intact - the noise transport stays nonce-consistent across the cancelled flush, so
/// resuming the write is sound
WriteTimedOut,
/// the sink errored: the connection is dead
Failed,
}
/// Feed a ready batch into the sink and flush it once (far fewer syscalls than per-packet), then
/// stamp the egress latency stages: `EgressQueue` before each feed, then `SocketWrite` + the
/// end-to-end total once the batch has hit the wire. The flush is bounded by `write_timeout`
/// (0 disables) so a congested peer can't block this connection's egress queue into the
/// multi-second range. The caller decides what a timeout means (see [`MAX_CONSECUTIVE_WRITE_TIMEOUTS`]).
async fn forward_batch<S>(
sink: &mut S,
batch: Vec<Traced<FramedNymPacket>>,
address: SocketAddr,
write_timeout: Duration,
) -> BatchOutcome
where
S: Sink<FramedNymPacket> + Unpin,
S::Error: std::fmt::Display,
{
let mut traces = Vec::with_capacity(batch.len());
let write = async {
for mut traced in batch {
// time spent waiting in this connection's egress buffer
traced.record(MixnetMetric::EgressQueue);
sink.feed(traced.inner).await?;
traces.push(traced.trace);
}
sink.flush().await
};
// bound how long we block on a slow/congested peer socket. On timeout the `write` future is
// cancelled, which is safe: every already-encoded frame is buffered (nonce-consistent), so a
// later flush resumes the byte stream in order.
let write_result = if write_timeout.is_zero() {
Ok(write.await)
} else {
tokio::time::timeout(write_timeout, write).await
};
// socket-write time + end-to-end total for whatever was fed (on a timeout, those frames are
// buffered and will hit the wire on a subsequent flush)
for mut trace in traces {
trace.record(MixnetMetric::SocketWrite);
trace.record_total();
}
match write_result {
Ok(Ok(())) => BatchOutcome::Sent,
Ok(Err(err)) => {
debug!(
event = "connection.forward_error",
peer = %address,
error = %err,
exit_reason = "forward_error",
"failed to forward packet batch to {address}: {err}"
);
BatchOutcome::Failed
}
Err(_elapsed) => BatchOutcome::WriteTimedOut,
}
}
/// Instant at which a connection idle since `last_activity` should be closed, or `None` if idle
/// reaping is disabled (`timeout` is zero).
fn idle_deadline(last_activity: Instant, timeout: Duration) -> Option<Instant> {
(!timeout.is_zero()).then(|| last_activity + timeout)
}
// The connection is unidirectional (send-only); we read from it solely to
// notice peer FIN/RST while idle so we can evict the cache entry before the
// next outbound send finds it stale.
@@ -281,6 +426,8 @@ async fn run_io_loop<T>(
conn: Framed<T, NymCodec>,
receiver: ReceiverStream<Traced<FramedNymPacket>>,
address: SocketAddr,
idle_timeout: Duration,
write_timeout: Duration,
) where
T: AsyncRead + AsyncWrite + Unpin,
{
@@ -290,78 +437,73 @@ async fn run_io_loop<T>(
// which otherwise caps egress throughput and backs up the per-connection queue under load
let mut receiver = receiver.ready_chunks(OUTBOUND_FLUSH_BATCH);
// reset by every batch we send; drives the idle-connection reaping below
let mut last_send = tokio::time::Instant::now();
// consecutive flush timeouts; a run of them (a persistently congested peer) drops the connection
let mut consecutive_write_timeouts = 0u32;
loop {
tokio::select! {
msg = stream.next() => {
match msg {
None => {
debug!(
peer = %address,
exit_reason = "peer_closed",
"peer closed mixnet connection to {address}"
);
break;
}
Some(Err(err)) => {
debug!(
event = "connection.read_error",
peer = %address,
error = %err,
exit_reason = "read_error",
"read error on mixnet connection to {address}: {err}"
);
break;
}
Some(Ok(_)) => {
trace!(
peer = %address,
"unexpected inbound packet on mixnet connection to {address}; discarding"
);
}
if handle_peer_read(msg, address).is_break() {
break;
}
}
outgoing = receiver.next() => {
match outgoing {
None => {
debug!(
peer = %address,
exit_reason = "sender_dropped",
"connection manager to {address} finished"
);
break;
let Some(batch) = outgoing else {
debug!(
peer = %address,
exit_reason = "sender_dropped",
"connection manager to {address} finished"
);
break;
};
match forward_batch(&mut sink, batch, address, write_timeout).await {
BatchOutcome::Sent => {
consecutive_write_timeouts = 0;
last_send = Instant::now();
}
Some(batch) => {
// feed the whole ready batch, then flush once
let mut traces = Vec::with_capacity(batch.len());
let res = async {
for mut traced in batch {
// time spent waiting in this connection's egress buffer
traced.record(TraceStage::EgressQueue);
sink.feed(traced.inner).await?;
traces.push(traced.trace);
}
sink.flush().await
}
.await;
// after the batch hit the wire: socket-write time and end-to-end total
for mut trace in traces {
trace.record(TraceStage::SocketWrite);
trace.record_total();
}
if let Err(err) = res {
BatchOutcome::WriteTimedOut => {
consecutive_write_timeouts += 1;
warn!(
event = "connection.write_congested",
peer = %address,
write_ms = write_timeout.as_millis() as u64,
attempt = consecutive_write_timeouts,
max_attempts = MAX_CONSECUTIVE_WRITE_TIMEOUTS,
"egress flush to {address} timed out (peer congested); abandoned batch, retaining connection"
);
if consecutive_write_timeouts >= MAX_CONSECUTIVE_WRITE_TIMEOUTS {
debug!(
event = "connection.forward_error",
peer = %address,
error = %err,
exit_reason = "forward_error",
"failed to forward packet batch to {address}: {err}"
exit_reason = "write_timeout",
"egress connection to {address} congested for {MAX_CONSECUTIVE_WRITE_TIMEOUTS} consecutive flushes; dropping it"
);
break;
}
// keep the connection: a single congestion spike shouldn't cost a
// re-handshake. `last_send` is deliberately not bumped, so a peer that goes
// congested-then-silent still idle-reaps on schedule.
}
BatchOutcome::Failed => break,
}
}
// close the connection (freeing the task/socket) if we haven't sent anything for too
// long; EvictOnDrop then clears the cache entry and the next packet reconnects
_ = async {
match idle_deadline(last_send, idle_timeout) {
Some(d) => tokio::time::sleep_until(d).await,
None => std::future::pending::<()>().await,
}
} => {
debug!(
peer = %address,
exit_reason = "idle_timeout",
idle_secs = idle_timeout.as_secs(),
"closing idle egress mixnet connection to {address}"
);
break;
}
}
}
}
@@ -430,8 +572,10 @@ impl Client {
let reconnection_attempt = current_reconnection_attempt.load(Ordering::Acquire);
let backoff = self.determine_backoff(reconnection_attempt);
// copy the value before moving into another task
// copy the values before moving into another task
let initial_connection_timeout = self.config.initial_connection_timeout;
let connection_idle_timeout = self.config.connection_idle_timeout;
let connection_write_timeout = self.config.connection_write_timeout;
let connections_count = self.connections_count.clone();
let noise_config = self.noise_config.clone();
@@ -449,6 +593,8 @@ impl Client {
noise_config,
receiver,
initial_connection_timeout,
connection_idle_timeout,
connection_write_timeout,
current_reconnection_attempt,
active_connections,
handle_token,
@@ -465,6 +611,9 @@ impl SendWithoutResponse for Client {
let address = packet.inner.next_hop_address();
trace!("Sending packet to {address}");
// capture the sample state before the trace is moved into `queued`
let sampled = packet.trace.is_sampled();
// TODO: optimisation for the future: rather than constantly using legacy encoding,
// use the mix packet type / flags to pick encoding per packet
let legacy = self.config.use_legacy_packet_encoding;
@@ -489,6 +638,11 @@ impl SendWithoutResponse for Client {
let channel_available = sender.channel.capacity();
let channel_used = channel_capacity - channel_available;
// record how full this peer's egress buffer was (sampled packets only, to bound cost)
if sampled {
crate::metrics::observe_egress_buffer_fill(channel_used, channel_capacity);
}
let sending_res = sender.channel.try_send(queued);
drop(sender);
@@ -544,6 +698,8 @@ mod tests {
initial_connection_timeout: Duration::from_millis(1_500),
maximum_connection_buffer_size: 128,
use_legacy_packet_encoding: false,
connection_idle_timeout: Duration::from_secs(300),
connection_write_timeout: Duration::from_millis(500),
},
NoiseConfig::new(
Arc::new(x25519::KeyPair::new(&mut rng)),
@@ -653,7 +809,14 @@ mod tests {
let conn = Framed::new(a, NymCodec);
let (_tx, rx) = mpsc::channel(1);
let task = tokio::spawn(run_io_loop(conn, ReceiverStream::new(rx), test_addr()));
// idle reaping disabled so only the peer-close path is exercised
let task = tokio::spawn(run_io_loop(
conn,
ReceiverStream::new(rx),
test_addr(),
Duration::ZERO,
Duration::ZERO,
));
// Simulate peer closing both directions of the connection.
drop(b);
@@ -670,7 +833,13 @@ mod tests {
let conn = Framed::new(a, NymCodec);
let (tx, rx) = mpsc::channel(1);
let task = tokio::spawn(run_io_loop(conn, ReceiverStream::new(rx), test_addr()));
let task = tokio::spawn(run_io_loop(
conn,
ReceiverStream::new(rx),
test_addr(),
Duration::ZERO,
Duration::ZERO,
));
drop(tx);
@@ -679,4 +848,32 @@ mod tests {
.expect("io_loop must exit when the upstream sender is dropped")
.expect("io_loop task must not panic");
}
#[tokio::test(start_paused = true)]
async fn io_loop_closes_idle_connection() {
// With no packets sent and the peer still connected, the idle timeout must eventually
// close the connection so the task/socket don't linger forever. The paused clock is
// virtual - it auto-advances to the next timer, so this completes instantly despite the
// durations below (no real waiting).
let (a, _b) = tokio::io::duplex(64);
let conn = Framed::new(a, NymCodec);
// keep the sender alive so the sender-dropped path can't fire instead
let (_tx, rx) = mpsc::channel(1);
let idle_timeout = Duration::from_millis(50);
let task = tokio::spawn(run_io_loop(
conn,
ReceiverStream::new(rx),
test_addr(),
idle_timeout,
Duration::ZERO,
));
// auto-advance fires the nearest timer (the 50ms idle deadline, sooner than this 500ms
// guard) once the task is otherwise idle, reaping the connection
tokio::time::timeout(Duration::from_millis(500), task)
.await
.expect("io_loop must close the connection after the idle timeout")
.expect("io_loop task must not panic");
}
}
@@ -1,7 +1,7 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::trace::PacketTrace;
use crate::metrics::PacketTrace;
use futures::channel::mpsc;
use futures::channel::mpsc::SendError;
use nym_sphinx::forwarding::packet::MixPacket;
+1 -1
View File
@@ -4,7 +4,7 @@
#[cfg(feature = "client")]
pub mod client;
pub mod forwarder;
pub mod trace;
pub mod metrics;
#[cfg(feature = "client")]
pub use client::{Client, Config, SendWithoutResponse};
@@ -4,18 +4,33 @@
use strum::{AsRefStr, EnumIter, EnumProperty, IntoEnumIterator};
use tokio::time::Instant;
/// Histogram buckets (seconds) for per-stage and total packet latency: exponential,
/// ~100us .. ~1.6s. Shared by every stage so the waterfall is directly comparable.
const STAGE_LATENCY_BUCKETS: [f64; 14] = [
/// Histogram buckets (seconds) for per-stage and total packet latency: exponential, ~100us .. ~6.5s.
/// Shared by every latency stage so the waterfall is directly comparable; the top finite bucket is
/// intentionally high so a rare multi-second processing spike is measured with magnitude rather than
/// being clipped into the `+Inf` overflow.
const STAGE_LATENCY_BUCKETS: [f64; 17] = [
0.0001, 0.0002, 0.0004, 0.0008, 0.0016, 0.0032, 0.0064, 0.0128, 0.0256, 0.0512, 0.1024, 0.2048,
0.4096, 0.8192,
0.4096, 0.8192, 1.6384, 3.2768, 6.5536,
];
/// A stage in the packet-forwarding pipeline, in order. Each maps to its own latency histogram
/// (`AsRefStr` = metric name, `help` prop = description); `Total` is the end-to-end
/// receive -> socket-write time. Defined here so call sites just name the stage.
/// Count buckets (1 .. MAX_DRAIN_BATCH) for the forwarder drain-batch-size histogram.
const DRAIN_BATCH_BUCKETS: [f64; 9] = [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0];
/// Fill-ratio buckets (used/capacity) for the per-connection egress buffer. A ratio near 1.0 means
/// the buffer is close to full and packets to that peer are about to be dropped.
const EGRESS_FILL_BUCKETS: [f64; 9] = [0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 1.0];
/// Every histogram this crate emits, defined in one place. `AsRefStr` (`#[strum(to_string=...)]`)
/// gives the prometheus metric name - the bare `mixnet_packet_*` family, with no per-crate prefix
/// since this is a shared library writing straight to the process-global registry. The `help` prop
/// gives the description and [`MixnetMetric::buckets`] gives the bucket layout.
///
/// Register the whole family at boot with [`register_all`]. Latency-stage variants are observed via
/// the [`PacketTrace`] stopwatch; the auxiliary variants via the `observe_*` helpers. (Passing an
/// auxiliary variant to `PacketTrace::record` is meaningless but harmless.)
#[derive(Clone, Copy, EnumIter, AsRefStr, EnumProperty)]
pub enum TraceStage {
pub enum MixnetMetric {
// ----- latency stages: the per-packet waterfall, recorded via `PacketTrace` -----
/// receive -> sphinx unwrap (partial: shared secret + header MAC)
#[strum(to_string = "mixnet_packet_stage_unwrap_seconds")]
#[strum(props(help = "Seconds spent unwrapping a received sphinx packet"))]
@@ -57,31 +72,87 @@ pub enum TraceStage {
#[strum(to_string = "mixnet_packet_total_latency_seconds")]
#[strum(props(help = "Total in-node latency of a forwarded packet, receive to socket write"))]
Total,
// ----- auxiliary histograms: observed directly, not part of the latency waterfall -----
/// number of packets the forwarder drained from the ingress channel per wakeup
#[strum(to_string = "mixnet_packet_forwarder_drain_batch_size")]
#[strum(props(
help = "Number of ingress packets the forwarder drained per select! wakeup (batch size)"
))]
ForwarderDrainBatchSize,
/// number of expired packets the forwarder drained from the delay queue per wakeup
#[strum(to_string = "mixnet_packet_forwarder_delay_drain_batch_size")]
#[strum(props(
help = "Number of expired delay-queue packets the forwarder drained per select! wakeup (batch size)"
))]
ForwarderDelayDrainBatchSize,
/// per-connection egress buffer occupancy (used/capacity) at send time
#[strum(to_string = "mixnet_packet_egress_buffer_fill_ratio")]
#[strum(props(
help = "Per-connection egress buffer fill ratio (used/capacity) sampled at packet send time"
))]
EgressBufferFillRatio,
}
/// Pre-register every stage histogram (at zero) into the global metrics registry so the whole
/// `mixnet_packet_*` family is present on the prometheus endpoint from boot, before any sampled
/// packet has been observed. Idempotent.
pub fn register_stage_metrics() {
impl MixnetMetric {
/// Histogram bucket layout for this metric.
fn buckets(&self) -> &'static [f64] {
match self {
MixnetMetric::ForwarderDrainBatchSize | MixnetMetric::ForwarderDelayDrainBatchSize => {
&DRAIN_BATCH_BUCKETS
}
MixnetMetric::EgressBufferFillRatio => &EGRESS_FILL_BUCKETS,
// every latency stage shares the seconds buckets
_ => &STAGE_LATENCY_BUCKETS,
}
}
}
/// Pre-register every histogram (at zero) into the global metrics registry so the whole
/// `mixnet_packet_*` family is present on the prometheus endpoint from boot, before anything has
/// been observed. Idempotent.
pub fn register_all() {
let registry = nym_metrics::metrics_registry();
for stage in TraceStage::iter() {
for metric in MixnetMetric::iter() {
registry.register_histogram(
stage.as_ref(),
stage.get_str("help"),
Some(STAGE_LATENCY_BUCKETS.as_slice()),
metric.as_ref(),
metric.get_str("help"),
Some(metric.buckets()),
);
}
}
/// Observe a stage latency into the process-global metrics registry. Explicit metric name (no
/// per-crate prefix) so every stage lands in one uniform `mixnet_packet_*` family regardless of
/// which crate records it.
fn observe(stage: TraceStage, secs: f64) {
/// Observe a value into a metric's histogram in the process-global registry.
fn observe(metric: MixnetMetric, value: f64) {
nym_metrics::metrics_registry().maybe_register_and_add_to_histogram(
stage.as_ref(),
secs,
Some(STAGE_LATENCY_BUCKETS.as_slice()),
stage.get_str("help"),
metric.as_ref(),
value,
Some(metric.buckets()),
metric.get_str("help"),
);
}
/// Observe how many ingress-channel packets the forwarder drained in a single wakeup.
pub fn observe_drain_batch_size(batch_size: usize) {
observe(MixnetMetric::ForwarderDrainBatchSize, batch_size as f64);
}
/// Observe how many expired delay-queue packets the forwarder drained in a single wakeup.
pub fn observe_delay_drain_batch_size(batch_size: usize) {
observe(
MixnetMetric::ForwarderDelayDrainBatchSize,
batch_size as f64,
);
}
/// Observe how full a per-connection egress buffer was when a packet was queued for it.
pub fn observe_egress_buffer_fill(used: usize, capacity: usize) {
if capacity == 0 {
return;
}
observe(
MixnetMetric::EgressBufferFillRatio,
used as f64 / capacity as f64,
);
}
@@ -111,6 +182,11 @@ impl PacketTrace {
}
}
/// Whether this packet is being traced (sampled).
pub fn is_sampled(&self) -> bool {
matches!(self, PacketTrace::On { .. })
}
/// Seconds spent in the stage just completed, advancing the cursor to now.
/// Returns `None` for unsampled packets.
fn lap(&mut self) -> Option<f64> {
@@ -137,24 +213,24 @@ impl PacketTrace {
/// Close out the stage just completed: lap the timer and, only if the packet is sampled,
/// observe `stage`'s latency histogram.
pub fn record(&mut self, stage: TraceStage) {
pub fn record(&mut self, stage: MixnetMetric) {
if let Some(secs) = self.lap() {
observe(stage, secs);
}
}
/// Observe the end-to-end [`TraceStage::Total`] latency (since receive) if sampled. Unlike
/// Observe the end-to-end [`MixnetMetric::Total`] latency (since receive) if sampled. Unlike
/// [`PacketTrace::record`] this does not lap, so it can be called at the very end.
pub fn record_total(&self) {
if let Some(secs) = self.total() {
observe(TraceStage::Total, secs);
observe(MixnetMetric::Total, secs);
}
}
/// Observe an explicit `secs` value for `stage` if the packet is sampled, without lapping the
/// stage cursor. For diagnostics that don't fit the sequential waterfall (e.g. delay-queue
/// overrun, measured against the target deadline rather than the previous stage).
pub fn record_value(&self, stage: TraceStage, secs: f64) {
pub fn record_value(&self, stage: MixnetMetric, secs: f64) {
if matches!(self, PacketTrace::On { .. }) {
observe(stage, secs);
}
@@ -183,12 +259,12 @@ impl<T> Traced<T> {
}
/// Record the stage just completed for the carried trace (see [`PacketTrace::record`]).
pub fn record(&mut self, stage: TraceStage) {
pub fn record(&mut self, stage: MixnetMetric) {
self.trace.record(stage)
}
/// Observe an explicit value for the carried trace (see [`PacketTrace::record_value`]).
pub fn record_value(&self, stage: TraceStage, secs: f64) {
pub fn record_value(&self, stage: MixnetMetric, secs: f64) {
self.trace.record_value(stage, secs)
}
}
@@ -198,28 +274,38 @@ mod tests {
use super::*;
// guards that AsRefStr honours `#[strum(to_string = ...)]` (rather than falling back to the
// variant name) and that every stage carries a help string.
// variant name), that every metric is in the `mixnet_packet_*` family, and carries a help
// string, and that each metric resolves to a bucket layout.
#[test]
fn every_stage_has_a_mixnet_packet_name_and_help() {
for stage in TraceStage::iter() {
fn every_metric_has_a_mixnet_packet_name_help_and_buckets() {
for metric in MixnetMetric::iter() {
assert!(
stage.as_ref().starts_with("mixnet_packet_"),
metric.as_ref().starts_with("mixnet_packet_"),
"unexpected metric name: {}",
stage.as_ref()
metric.as_ref()
);
assert!(
stage.get_str("help").is_some(),
metric.get_str("help").is_some(),
"missing help for {}",
stage.as_ref()
metric.as_ref()
);
assert!(
!metric.buckets().is_empty(),
"missing buckets for {}",
metric.as_ref()
);
}
assert_eq!(
TraceStage::Unwrap.as_ref(),
MixnetMetric::Unwrap.as_ref(),
"mixnet_packet_stage_unwrap_seconds"
);
assert_eq!(
TraceStage::Total.as_ref(),
MixnetMetric::Total.as_ref(),
"mixnet_packet_total_latency_seconds"
);
assert_eq!(
MixnetMetric::ForwarderDrainBatchSize.as_ref(),
"mixnet_packet_forwarder_drain_batch_size"
);
}
}
@@ -20,7 +20,7 @@ use nym_api_requests::models::network_monitor::{
};
use nym_api_requests::models::node_families::NodeFamily;
use nym_api_requests::models::{
AnnotationResponseV1, ApiHealthResponse, BinaryBuildInformationOwned,
AnnotationResponseV1, AnnotationResponseV2, ApiHealthResponse, BinaryBuildInformationOwned,
ChainBlocksStatusResponse, ChainStatusResponse, KeyRotationInfoResponse,
NodePerformanceResponse, NodeRefreshBody, NymNodeDescriptionV1, NymNodeDescriptionV2,
PerformanceHistoryResponse, RewardedSetResponse, SignerInformationResponse,
@@ -1033,6 +1033,22 @@ pub trait NymApiClientExt: ApiClient {
.await
}
async fn get_node_annotation_v2(
&self,
node_id: NodeId,
) -> Result<AnnotationResponseV2, NymAPIError> {
self.get_json(
&[
routes::V2_API_VERSION,
routes::NYM_NODES_ROUTES,
routes::NYM_NODES_ANNOTATION,
&node_id.to_string(),
],
NO_PARAMS,
)
.await
}
#[deprecated]
async fn get_mixnode_avg_uptime(&self, mix_id: NodeId) -> Result<UptimeResponse, NymAPIError> {
self.get_json(
-1
View File
@@ -12,7 +12,6 @@ pub mod ecash;
#[cfg(all(feature = "env", feature = "network"))]
pub mod env_setup;
pub mod mainnet;
pub mod sandbox;
#[cfg(feature = "network")]
pub mod network;
-39
View File
@@ -61,7 +61,6 @@ pub struct NymNetworkDetails {
pub nym_vpn_api_url: Option<String>,
pub nym_api_urls: Option<Vec<ApiUrl>>,
pub nym_vpn_api_urls: Option<Vec<ApiUrl>>,
pub networking: NetworkingSpecifics,
}
#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize, JsonSchema)]
@@ -103,30 +102,6 @@ impl From<ApiUrlConst<'_>> for ApiUrl {
}
}
#[derive(Clone, Debug, Hash, Serialize, Deserialize, PartialEq, Eq, JsonSchema)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct DnsFallback {
pub url: String,
pub addresses: Vec<String>,
}
#[derive(Clone, Debug, Hash, Serialize, Deserialize, PartialEq, Eq, JsonSchema)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct NetworkingSpecifics {
pub nym_api_urls: Vec<ApiUrl>,
pub nym_vpn_api_urls: Vec<ApiUrl>,
pub dns_fallbacks: Vec<DnsFallback>,
// pub internal_nameservers: std::any::Any,
// pub covert channels: std::any::Any,
}
// by default we assume the same defaults as mainnet, i.e. same prefixes and denoms
impl Default for NetworkingSpecifics {
fn default() -> Self {
NymNetworkDetails::mainnet_specifics()
}
}
// by default we assume the same defaults as mainnet, i.e. same prefixes and denoms
impl Default for NymNetworkDetails {
fn default() -> Self {
@@ -156,7 +131,6 @@ impl NymNetworkDetails {
nym_vpn_api_url: Default::default(),
nym_api_urls: Default::default(),
nym_vpn_api_urls: Default::default(),
networking: Default::default(),
}
}
@@ -267,19 +241,6 @@ impl NymNetworkDetails {
.map(Into::into)
.collect(),
),
networking: Self::mainnet_specifics(),
}
}
pub(crate) fn mainnet_specifics() -> NetworkingSpecifics {
NetworkingSpecifics {
nym_api_urls: mainnet::NYM_APIS.iter().copied().map(Into::into).collect(),
nym_vpn_api_urls: mainnet::NYM_VPN_APIS
.iter()
.copied()
.map(Into::into)
.collect(),
dns_fallbacks: Vec::new(),
}
}
-138
View File
@@ -1,138 +0,0 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#[cfg(feature = "network")]
use crate::{
ApiUrlConst, ChainDetails, DenomDetails, NetworkingSpecifics, NymContracts, NymNetworkDetails,
ValidatorDetails,
};
#[cfg(feature = "network")]
use std::ops::Not;
pub const NETWORK_NAME: &str = "sandbox";
pub const BECH32_PREFIX: &str = "n";
#[cfg(feature = "network")]
pub const MIX_DENOM: DenomDetails = DenomDetails::new("unym", "nym", 6);
#[cfg(feature = "network")]
pub const STAKE_DENOM: DenomDetails = DenomDetails::new("unyx", "nyx", 6);
// -- Contract addresses --
pub const MIXNET_CONTRACT_ADDRESS: &str =
"n1xr3rq8yvd7qplsw5yx90ftsr2zdhg4e9z60h5duusgxpv72hud3sjkxkav";
pub const VESTING_CONTRACT_ADDRESS: &str =
"n1unyuj8qnmygvzuex3dwmg9yzt9alhvyeat0uu0jedg2wj33efl5qackslz";
pub const ECASH_CONTRACT_ADDRESS: &str =
"n1v3vydvs2ued84yv3khqwtgldmgwn0elljsdh08dr5s2j9x4rc5fs9jlwz9";
pub const GROUP_CONTRACT_ADDRESS: &str =
"n1ewmwz97xm0h8rdk8sw7h9mwn866qkx9hl9zlmagqfkhuzvwk5hhq844ue9";
pub const MULTISIG_CONTRACT_ADDRESS: &str =
"n1tz0setr8vkh9udp8xyxgpqc89ns27k4d0jx2h942hr0ax63yjhmqz6xct8";
pub const COCONUT_DKG_CONTRACT_ADDRESS: &str =
"n1v3n2ly2dp3a9ng3ff6rh26yfkn0pc5hed7w2shc5u9ca5c865utqj5elvh";
// \/ TODO: this has to be updated once the contract is deployed
pub const PERFORMANCE_CONTRACT_ADDRESS: &str = "";
// /\ TODO: this has to be updated once the contract is deployed
pub const NETWORK_MONITORS_CONTRACT_ADDRESS: &str =
"n1x5krtvyqklj360x38v62ze42g8s8trfsfqzlv8c9296chcpvqadssqnem5";
// \/ TODO: this has to be updated once the contract is deployed
pub const NODE_FAMILIES_CONTRACT_ADDRESS: &str =
"n13clyapdqk5umyynp20kqwf59rxlwlp24yf2ltzasflhsdhrxq7fsahyr6z";
// /\ TODO: this has to be updated once the contract is deployed
pub const NYXD_URL: &str = "https://rpc.nymtech.net";
pub const NYXD_WS: &str = "wss://rpc.nymtech.net/websocket";
// cluster of lite rpc nodes (not part of consensus, aggressive pruning, no archival state)
pub const NYXD_QUERY_LITE: &str = "https://blockstream.nymtech.net";
pub const NYXD_WS_LITE: &str = "wss://blockstream.nymtech.net/websocket";
pub const UPGRADE_MODE_ATTESTATION_URL: &str =
"https://nymtech.net/.wellknown/upgrade-mode/attestation.json";
pub const UPGRADE_MODE_ATTESTER_ED25519_BS58_PUBKEY: &str =
"3bgffBYcfFkTTXc2npNNn9MkddFZ3H2LrPjXDmnJzrqd";
#[cfg(feature = "network")]
pub const NYM_VPN_APIS: &[ApiUrlConst] = &[
ApiUrlConst {
url: "https://nym-vpn-api-git-deploy-sandbox-nyx-network-staging.vercel.app/api/",
front_hosts: None,
},
ApiUrlConst {
url: "https://nym-frontdoor.vercel.app/sandbox/nym-vpn-api/",
front_hosts: Some(&["vercel.com", "vercel.app"]),
},
];
#[cfg(feature = "network")]
pub const NYM_APIS: &[ApiUrlConst] = &[
ApiUrlConst {
url: "https://sandbox-nym-api1.nymtech.net/api/",
front_hosts: Some(&["yelp.global.ssl.fastly.net"]),
},
ApiUrlConst {
url: "https://nym-frontdoor.vercel.app/sandbox/nym-api/",
front_hosts: Some(&["vercel.com", "vercel.app"]),
},
];
pub const EXIT_POLICY_URL: &str =
"https://nymtech.net/.wellknown/network-requester/exit-policy.txt";
#[cfg(feature = "network")]
pub fn validators() -> Vec<ValidatorDetails> {
vec![ValidatorDetails::new(
"https://rpc.sandbox.nymtech.net",
Some("https://sandbox-nym-api1.nymtech.net/api"),
Some("wss://rpc.sandbox.nymtech.net/websocket"),
)]
}
#[cfg(feature = "network")]
pub fn network_details() -> NymNetworkDetails {
NymNetworkDetails {
network_name: NETWORK_NAME.into(),
chain_details: ChainDetails {
bech32_account_prefix: BECH32_PREFIX.to_string(),
mix_denom: MIX_DENOM.into(),
stake_denom: STAKE_DENOM.into(),
},
endpoints: validators(),
contracts: NymContracts {
mixnet_contract_address: parse_optional_str(MIXNET_CONTRACT_ADDRESS),
vesting_contract_address: parse_optional_str(VESTING_CONTRACT_ADDRESS),
performance_contract_address: parse_optional_str(PERFORMANCE_CONTRACT_ADDRESS),
network_monitors_contract_address: parse_optional_str(
NETWORK_MONITORS_CONTRACT_ADDRESS,
),
node_families_contract_address: parse_optional_str(NODE_FAMILIES_CONTRACT_ADDRESS),
ecash_contract_address: parse_optional_str(ECASH_CONTRACT_ADDRESS),
group_contract_address: parse_optional_str(GROUP_CONTRACT_ADDRESS),
multisig_contract_address: parse_optional_str(MULTISIG_CONTRACT_ADDRESS),
coconut_dkg_contract_address: parse_optional_str(COCONUT_DKG_CONTRACT_ADDRESS),
},
nym_vpn_api_url: None,
nym_vpn_api_urls: None,
nym_api_urls: None,
networking: network_specifics(),
}
}
#[cfg(feature = "network")]
pub fn network_specifics() -> NetworkingSpecifics {
NetworkingSpecifics {
nym_api_urls: NYM_APIS.iter().copied().map(Into::into).collect(),
nym_vpn_api_urls: NYM_VPN_APIS.iter().copied().map(Into::into).collect(),
dns_fallbacks: Vec::new(),
}
}
#[cfg(feature = "network")]
fn parse_optional_str(raw: &str) -> Option<String> {
raw.is_empty().not().then(|| raw.into())
}
@@ -73,6 +73,27 @@ impl<T> NonExhaustiveDelayQueue<T> {
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
/// Pop the next *already-expired* item without awaiting, or `None` if nothing is ready right
/// now (the queue is empty, or its earliest item has not reached its deadline yet). Lets a
/// caller drain a burst of simultaneously-expired items in a tight loop without yielding.
///
/// It polls the inner queue with a **no-op waker**, so a not-yet-due (`None`) result registers
/// no real wakeup. This is therefore sound ONLY when the caller subsequently polls the
/// [`Stream`] impl (`.next().await`) before parking the task - that re-arms the timer against
/// the task's real waker, superseding the no-op one. The intended use is "drain the extra ready
/// items right after `.next()` yielded one, in a loop that returns to `.next().await`". Calling
/// it as the last thing before suspending would drop the wakeup (same caveat as
/// `futures::FutureExt::now_or_never`).
pub fn try_next_expired(&mut self) -> Option<Expired<T>> {
let mut cx = Context::from_waker(Waker::noop());
match Pin::new(&mut self.inner).poll_expired(&mut cx) {
// a ready-expired item, or `None` because the queue is empty
Poll::Ready(maybe_item) => maybe_item,
// queue is non-empty but nothing is due yet
Poll::Pending => None,
}
}
}
impl<T> Default for NonExhaustiveDelayQueue<T> {
+1 -1
View File
@@ -19,7 +19,7 @@ use tracing::{debug, trace, warn};
pub use crate::node::{EntryDetails, RoutingNode, SupportedRoles};
pub use error::NymTopologyError;
pub use nym_mixnet_contract_common::nym_node::Role;
pub use nym_mixnet_contract_common::{EpochRewardedSet, NodeId};
pub use nym_mixnet_contract_common::{EpochRewardedSet, NodeId, RewardedSet};
pub use rewarded_set::CachedEpochRewardedSet;
pub mod error;
+15 -1
View File
@@ -8,6 +8,7 @@ use nym_mixnet_contract_common::NodeId;
use nym_sphinx_addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx_types::Node as SphinxNode;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::net::{IpAddr, SocketAddr};
use thiserror::Error;
@@ -45,7 +46,7 @@ impl From<DeclaredRolesV1> for SupportedRoles {
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub struct RoutingNode {
pub node_id: NodeId,
@@ -58,6 +59,19 @@ pub struct RoutingNode {
pub supported_roles: SupportedRoles,
}
impl Debug for RoutingNode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RoutingNode")
.field("node_id", &self.node_id)
.field("mix_host", &self.mix_host)
.field("entry", &self.entry)
.field("identity_key", &self.identity_key.to_base58_string())
.field("sphinx_key", &self.sphinx_key.to_base58_string())
.field("supported_roles", &self.supported_roles)
.finish()
}
}
impl RoutingNode {
pub fn ws_entry_address_tls(&self) -> Option<String> {
let entry = self.entry.as_ref()?;
@@ -395,8 +395,8 @@ impl From<nym_node_requests::api::v1::node::models::AnnouncePorts> for AnnounceP
}
}
impl From<nym_node_requests::api::v1::node::models::AuxiliaryDetails> for AuxiliaryDetailsV1 {
fn from(value: nym_node_requests::api::v1::node::models::AuxiliaryDetails) -> Self {
impl From<nym_node_requests::api::v1::node::models::AuxiliaryDetailsV1> for AuxiliaryDetailsV1 {
fn from(value: nym_node_requests::api::v1::node::models::AuxiliaryDetailsV1) -> Self {
AuxiliaryDetailsV1 {
location: value.location,
announce_ports: value.announce_ports.into(),
+1 -1
View File
@@ -41,7 +41,7 @@ use utoipauto::utoipauto;
nym_config::defaults::NymContracts,
ContractVersionSchemaResponse,
nym_bin_common::build_information::BinaryBuildInformationOwned,
nym_node_requests::api::v1::node::models::AuxiliaryDetails,
nym_node_requests::api::v1::node::models::AuxiliaryDetailsV1,
nym_contracts_common::ContractBuildInformation
))
)]
+1 -1
View File
@@ -16,7 +16,7 @@ use nym_lp::peer::{DHPublicKey, LpRemotePeer};
use nym_lp_data::packet::version;
use nym_network_defaults::DEFAULT_NYM_NODE_HTTP_PORT;
use nym_node_requests::api::client::NymNodeApiClientExt;
use nym_node_requests::api::v1::node::models::AuxiliaryDetails as NodeAuxiliaryDetails;
use nym_node_requests::api::v1::node::models::AuxiliaryDetailsV1 as NodeAuxiliaryDetails;
use nym_sdk::mixnet::NodeIdentity;
use nym_sdk::mixnet::Recipient;
use nym_validator_client::client::NymApiClientExt;
+13
View File
@@ -24,6 +24,9 @@ pub struct NetworkStats {
// outgoing LP control connections to nodes
active_lp_egress_node_connections: AtomicUsize,
// cumulative count of ingress mixnet connections closed due to the idle timeout
idle_closed_ingress_mixnet_connections: AtomicUsize,
}
impl NetworkStats {
@@ -37,6 +40,16 @@ impl NetworkStats {
.fetch_sub(1, Ordering::Relaxed);
}
pub fn ingress_mixnet_idle_closed(&self) {
self.idle_closed_ingress_mixnet_connections
.fetch_add(1, Ordering::Relaxed);
}
pub fn idle_closed_ingress_mixnet_connections_count(&self) -> usize {
self.idle_closed_ingress_mixnet_connections
.load(Ordering::Relaxed)
}
pub fn new_ingress_websocket_client(&self) {
self.active_ingress_websocket_connections
.fetch_add(1, Ordering::Relaxed);
@@ -195,6 +195,11 @@ pub enum PrometheusMetric {
#[strum(props(help = "The number of active egress mixnet connections"))]
NetworkActiveEgressMixnetConnections,
#[strum(props(
help = "The cumulative number of ingress mixnet connections closed due to the idle timeout"
))]
NetworkIdleClosedIngressMixnetConnections,
// # PROCESS
#[strum(props(help = "The current number of packets being delayed"))]
ProcessForwardHopPacketsBeingDelayed,
@@ -218,6 +223,26 @@ pub enum PrometheusMetric {
help = "The current number of forward hop packets stuck in channels waiting to get delivered to appropriate TCP connections"
))]
ProcessForwardHopPacketsPendingDelivery,
// # TOKIO RUNTIME
#[strum(props(help = "Number of tokio worker threads"))]
TokioRuntimeNumWorkers,
#[strum(props(help = "Currently alive (spawned, not yet completed) tokio tasks"))]
TokioRuntimeAliveTasks,
#[strum(props(
help = "Tasks waiting in the tokio global run queue (runtime scheduling pressure)"
))]
TokioRuntimeGlobalQueueDepth,
// the per-worker timing below is only exposed by tokio when the binary is built with
// `--cfg tokio_unstable`; without that flag the handler can't sample it and these stay at 0.
#[strum(props(help = "Fraction of worker-thread time spent busy over the last interval"))]
TokioRuntimeBusyRatio,
#[strum(props(help = "Cumulative tokio worker poll count across all workers"))]
TokioRuntimeWorkerPollCount,
}
impl PrometheusMetric {
@@ -350,6 +375,9 @@ impl PrometheusMetric {
PrometheusMetric::NetworkActiveEgressMixnetConnections => {
Metric::new_int_gauge(&name, help)
}
PrometheusMetric::NetworkIdleClosedIngressMixnetConnections => {
Metric::new_int_gauge(&name, help)
}
PrometheusMetric::ProcessForwardHopPacketsBeingDelayed => {
Metric::new_int_gauge(&name, help)
}
@@ -363,6 +391,11 @@ impl PrometheusMetric {
PrometheusMetric::ProcessForwardHopPacketsPendingDelivery => {
Metric::new_int_gauge(&name, help)
}
PrometheusMetric::TokioRuntimeNumWorkers => Metric::new_int_gauge(&name, help),
PrometheusMetric::TokioRuntimeAliveTasks => Metric::new_int_gauge(&name, help),
PrometheusMetric::TokioRuntimeGlobalQueueDepth => Metric::new_int_gauge(&name, help),
PrometheusMetric::TokioRuntimeBusyRatio => Metric::new_float_gauge(&name, help),
PrometheusMetric::TokioRuntimeWorkerPollCount => Metric::new_int_gauge(&name, help),
}
}
@@ -458,7 +491,7 @@ mod tests {
// a sanity check for anyone adding new metrics. if this test fails,
// make sure any methods on `PrometheusMetric` enum don't need updating
// or require custom Display impl
assert_eq!(47, PrometheusMetric::COUNT)
assert_eq!(53, PrometheusMetric::COUNT)
}
#[test]
+2 -2
View File
@@ -11,7 +11,7 @@ use crate::api::v1::ip_packet_router::models::IpPacketRouter;
use crate::api::v1::network_requester::exit_policy::models::UsedExitPolicy;
use crate::api::v1::network_requester::models::NetworkRequester;
use crate::api::v1::node::models::{
AuxiliaryDetails, NodeDescription, NodeRoles, SignedHostInformation,
AuxiliaryDetailsV1, NodeDescription, NodeRoles, SignedHostInformation,
};
use crate::api::v1::node_load::models::NodeLoad;
use crate::routes;
@@ -55,7 +55,7 @@ pub trait NymNodeApiClientExt: ApiClient {
self.get_json_from(routes::api::v1::roles_absolute()).await
}
async fn get_auxiliary_details(&self) -> Result<AuxiliaryDetails, NymNodeApiClientError> {
async fn get_auxiliary_details(&self) -> Result<AuxiliaryDetailsV1, NymNodeApiClientError> {
self.get_json_from(routes::api::v1::auxiliary_absolute())
.await
}
@@ -15,6 +15,7 @@ use std::ops::Deref;
pub mod client;
pub mod helpers;
pub mod v1;
pub mod v2;
#[cfg(feature = "client")]
pub use client::Client;
@@ -12,6 +12,7 @@ use serde::{Deserialize, Serialize};
use std::net::IpAddr;
pub use crate::api::SignedHostInformation;
use crate::api::v2::node::models::AuxiliaryDetailsV2;
pub use nym_bin_common::build_information::BinaryBuildInformationOwned;
#[derive(Clone, Default, Debug, Copy, Serialize, Deserialize, JsonSchema)]
@@ -366,7 +367,7 @@ pub struct NodeDescription {
/// Auxiliary details of the associated Nym Node.
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct AuxiliaryDetails {
pub struct AuxiliaryDetailsV1 {
/// Optional ISO 3166 alpha-2 two-letter country code of the node's **physical** location
#[cfg_attr(feature = "openapi", schema(example = "PL", value_type = Option<String>))]
#[schemars(with = "Option<String>")]
@@ -383,6 +384,16 @@ pub struct AuxiliaryDetails {
pub accepted_operator_terms_and_conditions: bool,
}
impl From<AuxiliaryDetailsV2> for AuxiliaryDetailsV1 {
fn from(v2: AuxiliaryDetailsV2) -> Self {
Self {
location: v2.location,
announce_ports: v2.announce_ports,
accepted_operator_terms_and_conditions: v2.accepted_operator_terms_and_conditions,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -0,0 +1,4 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
pub mod node;
@@ -0,0 +1,4 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
pub mod models;
@@ -0,0 +1,30 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::api::v1::node::models::AnnouncePorts;
use celes::Country;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
/// Auxiliary details of the associated Nym Node.
#[derive(Clone, Debug, Default, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct AuxiliaryDetailsV2 {
/// Optional ISO 3166 alpha-2 two-letter country code of the node's **physical** location
#[cfg_attr(feature = "openapi", schema(example = "PL", value_type = Option<String>))]
#[schemars(with = "Option<String>")]
#[schemars(length(equal = 2))]
pub location: Option<Country>,
/// On-chain address of this node
pub address: String,
#[serde(default)]
pub announce_ports: AnnouncePorts,
/// Specifies whether this node operator has agreed to the terms and conditions
/// as defined at <https://nymtech.net/terms-and-conditions/operators/v1.0.0>
// make sure to include the default deserialisation as this field hasn't existed when the struct was first created
#[serde(default)]
pub accepted_operator_terms_and_conditions: bool,
}
+14
View File
@@ -23,8 +23,14 @@ pub mod routes {
pub mod api {
pub const V1: &str = "/v1";
pub const V2: &str = "/v2";
// canonical, version-neutral Swagger UI mount
pub const SWAGGER: &str = "/swagger";
absolute_route!(v1_absolute, super::API, V1);
absolute_route!(v2_absolute, super::API, V2);
absolute_route!(swagger_absolute, super::API, SWAGGER);
pub mod v1 {
use super::*;
@@ -152,6 +158,14 @@ pub mod routes {
// use super::*;
}
}
pub mod v2 {
use super::*;
pub const AUXILIARY: &str = "/auxiliary-details";
absolute_route!(auxiliary_absolute, v2_absolute(), AUXILIARY);
}
}
}
+21
View File
@@ -693,6 +693,18 @@ pub struct MixnetDebug {
#[serde(with = "humantime_serde")]
pub initial_connection_timeout: Duration,
/// How long a mixnet connection (ingress or egress) may sit with no packets before it is
/// closed to free the lingering task/socket. Reset by any traffic on that connection.
/// 0 disables idle reaping.
#[serde(with = "humantime_serde")]
pub connection_idle_timeout: Duration,
/// Max time a single egress batch flush may block on a peer socket before the batch is
/// abandoned. A few consecutive timeouts drop the (congested) connection; a single one is
/// treated as transient and the connection is retained. 0 disables the bound.
#[serde(with = "humantime_serde")]
pub connection_write_timeout: Duration,
/// Maximum number of packets buffered per egress connection awaiting a socket write.
/// This is a short-term burst absorber, not a queue: buffer depth converts directly into
/// added latency (roughly `depth / per-peer send rate`), so an oversized value is just
@@ -893,6 +905,13 @@ impl MixnetDebug {
const DEFAULT_PACKET_FORWARDING_INITIAL_BACKOFF: Duration = Duration::from_millis(10_000);
const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_secs(16);
const DEFAULT_INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(1_500);
// reap a mixnet connection after 5min of no traffic; under cover traffic real neighbours
// exchange packets far more often, so only genuinely-silent/half-open peers are closed
const DEFAULT_CONNECTION_IDLE_TIMEOUT: Duration = Duration::from_secs(300);
// bound a single egress flush; healthy flushes are sub-ms, so this only trips on genuine
// peer congestion. A single trip just drops the batch (connection retained); see
// MAX_CONSECUTIVE_WRITE_TIMEOUTS in the mixnet client for the teardown threshold.
const DEFAULT_CONNECTION_WRITE_TIMEOUT: Duration = Duration::from_millis(500);
// small enough to keep worst-case egress queuing in the tens-of-ms range at a few thousand
// pps per peer (vs. the old 2000, which was hundreds of ms of bufferbloat)
const DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE: usize = 192;
@@ -906,6 +925,8 @@ impl Default for MixnetDebug {
packet_forwarding_initial_backoff: Self::DEFAULT_PACKET_FORWARDING_INITIAL_BACKOFF,
packet_forwarding_maximum_backoff: Self::DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF,
initial_connection_timeout: Self::DEFAULT_INITIAL_CONNECTION_TIMEOUT,
connection_idle_timeout: Self::DEFAULT_CONNECTION_IDLE_TIMEOUT,
connection_write_timeout: Self::DEFAULT_CONNECTION_WRITE_TIMEOUT,
maximum_connection_buffer_size: Self::DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE,
egress_trace_sample_rate: Self::DEFAULT_EGRESS_TRACE_SAMPLE_RATE,
// TODO: update this in few releases...
@@ -531,6 +531,8 @@ pub async fn try_upgrade_config_v13<P: AsRef<Path>>(
.debug
.packet_forwarding_maximum_backoff,
initial_connection_timeout: old_cfg.mixnet.debug.initial_connection_timeout,
connection_idle_timeout: MixnetDebug::DEFAULT_CONNECTION_IDLE_TIMEOUT,
connection_write_timeout: MixnetDebug::DEFAULT_CONNECTION_WRITE_TIMEOUT,
maximum_connection_buffer_size: old_cfg.mixnet.debug.maximum_connection_buffer_size,
egress_trace_sample_rate: MixnetDebug::DEFAULT_EGRESS_TRACE_SAMPLE_RATE,
unsafe_disable_noise: old_cfg.mixnet.debug.unsafe_disable_noise,
+6 -1
View File
@@ -5,15 +5,20 @@ use crate::node::http::state::AppState;
use axum::Router;
use nym_node_requests::routes;
pub mod openapi;
pub mod v1;
pub mod v2;
pub use nym_node_requests::api as api_requests;
#[derive(Debug, Clone)]
pub struct Config {
pub v1_config: v1::Config,
pub v2_config: v2::Config,
}
pub(super) fn routes(config: Config) -> Router<AppState> {
Router::new().nest(routes::api::V1, v1::routes(config.v1_config))
Router::new()
.nest(routes::api::V1, v1::routes(config.v1_config))
.nest(routes::api::V2, v2::routes(config.v2_config))
}
@@ -3,7 +3,7 @@
use axum::Router;
use nym_node_requests::api as api_requests;
use nym_node_requests::routes::api::{v1, v1_absolute};
use nym_node_requests::routes;
use utoipa::openapi::security::{Http, HttpAuthScheme};
use utoipa::{Modify, OpenApi, openapi::security::SecurityScheme};
use utoipa_swagger_ui::SwaggerUi;
@@ -37,12 +37,14 @@ use utoipa_swagger_ui::SwaggerUi;
crate::node::http::router::api::v1::gateway::client_interfaces::wireguard_details,
crate::node::http::router::api::v1::gateway::root::root_gateway,
crate::node::http::router::api::v1::lewes_protocol::root::root_lewes_protocol,
crate::node::http::router::api::v2::node::auxiliary::auxiliary,
),
components(
schemas(
nym_http_api_common::Output,
nym_http_api_common::OutputParams,
nym_http_api_common::OutputV2,
nym_http_api_common::OutputParamsV2,
api_requests::v1::health::models::NodeHealth,
api_requests::v1::health::models::NodeStatus,
api_requests::v1::node_load::models::NodeLoad,
@@ -56,7 +58,7 @@ use utoipa_swagger_ui::SwaggerUi;
api_requests::v1::node::models::Cpu,
api_requests::v1::node::models::CryptoHardware,
api_requests::v1::node::models::NodeDescription,
api_requests::v1::node::models::AuxiliaryDetails,
api_requests::v1::node::models::AuxiliaryDetailsV1,
api_requests::v1::metrics::models::LegacyMixingStats,
api_requests::v1::metrics::models::VerlocStats,
api_requests::v1::metrics::models::VerlocResult,
@@ -77,6 +79,7 @@ use utoipa_swagger_ui::SwaggerUi;
api_requests::v1::network_requester::exit_policy::models::UsedExitPolicy,
api_requests::v1::ip_packet_router::models::IpPacketRouter,
api_requests::v1::lewes_protocol::models::LewesProtocol,
api_requests::v2::node::models::AuxiliaryDetailsV2,
),
),
modifiers(&SecurityAddon),
@@ -97,11 +100,14 @@ impl Modify for SecurityAddon {
}
pub(crate) fn route<S: Send + Sync + 'static + Clone>() -> Router<S> {
// provide absolute path to the openapi.json
let config =
utoipa_swagger_ui::Config::from(format!("{}/api-docs/openapi.json", v1_absolute()));
SwaggerUi::new(v1::SWAGGER)
.url("/api-docs/openapi.json", ApiDoc::openapi())
// SwaggerUi must be mounted with its absolute path: it emits internal redirects
// (e.g. `/swagger` → `/swagger/`) whose `Location` header uses this string
// literally and is not aware of any `.nest()` prefix above it. For the same
// reason, this router must be merged at the outer router level — not nested.
let openapi_json = format!("{}/api-docs/openapi.json", routes::API);
let config = utoipa_swagger_ui::Config::from(openapi_json.clone());
SwaggerUi::new(routes::api::swagger_absolute())
.url(openapi_json, ApiDoc::openapi())
.config(config)
.into()
}
@@ -11,7 +11,7 @@ use nym_node_requests::api::v1::authenticator::models::Authenticator;
get,
path = "",
context_path = "/api/v1/authenticator",
tag = "Authenticator",
tag = "v1 / Authenticator",
responses(
(status = 501, description = "the endpoint hasn't been implemented yet"),
(status = 200, content(
@@ -41,7 +41,7 @@ pub(crate) fn routes<S: Send + Sync + 'static + Clone>(
get,
path = "/client-interfaces",
context_path = "/api/v1/gateway",
tag = "Gateway",
tag = "v1 / Gateway",
responses(
(status = 501, description = "the endpoint hasn't been implemented yet"),
(status = 200, content(
@@ -67,7 +67,7 @@ pub type ClientInterfacesResponse = FormattedResponse<ClientInterfaces>;
get,
path = "/mixnet-websockets",
context_path = "/api/v1/gateway/client-interfaces",
tag = "Gateway",
tag = "v1 / Gateway",
responses(
(status = 501, description = "the endpoint hasn't been implemented yet"),
(status = 200, content(
@@ -93,7 +93,7 @@ pub type MixnetWebSocketsResponse = FormattedResponse<WebSockets>;
get,
path = "/wireguard",
context_path = "/api/v1/gateway/client-interfaces",
tag = "Gateway",
tag = "v1 / Gateway",
responses(
(status = 501, description = "the endpoint hasn't been implemented yet"),
(status = 200, content(
@@ -11,7 +11,7 @@ use nym_node_requests::api::v1::gateway::models::Gateway;
get,
path = "",
context_path = "/api/v1/gateway",
tag = "Gateway",
tag = "v1 / Gateway",
responses(
(status = 501, description = "the endpoint hasn't been implemented yet"),
(status = 200, content(
@@ -11,7 +11,7 @@ use nym_node_requests::api::v1::health::models::NodeHealth;
get,
path = "/health",
context_path = "/api/v1",
tag = "Health",
tag = "v1 / Health",
responses(
(status = 200, content(
(NodeHealth = "application/json"),
@@ -11,7 +11,7 @@ use nym_node_requests::api::v1::ip_packet_router::models::IpPacketRouter;
get,
path = "",
context_path = "/api/v1/ip-packet-router",
tag = "IP Packet Router",
tag = "v1 / IP Packet Router",
responses(
(status = 501, description = "the endpoint hasn't been implemented yet"),
(status = 200, content(
@@ -1,23 +1,15 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::http::state::AppState;
use axum::Router;
use axum::routing::get;
use nym_node_requests::api::SignedLewesProtocol;
pub mod root;
#[derive(Debug, Clone)]
pub struct Config {
pub details: SignedLewesProtocol,
}
#[derive(Debug, Clone, Default)]
pub struct Config {}
pub(crate) fn routes<S: Send + Sync + 'static + Clone>(config: Config) -> Router<S> {
Router::new().route(
"/",
get({
let lp_config = config.details;
move |query| root::root_lewes_protocol(lp_config, query)
}),
)
pub(crate) fn routes(_config: Config) -> Router<AppState> {
Router::new().route("/", get(root::root_lewes_protocol))
}
@@ -1,7 +1,8 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use axum::extract::Query;
use crate::node::http::state::AppState;
use axum::extract::{Query, State};
use axum::http::StatusCode;
use nym_http_api_common::{FormattedResponse, OutputParams};
use nym_node_requests::api::{SignedLewesProtocol, SignedLewesProtocolInfo};
@@ -11,7 +12,7 @@ use nym_node_requests::api::{SignedLewesProtocol, SignedLewesProtocolInfo};
get,
path = "/lewes-protocol",
context_path = "/api/v1",
tag = "Lewes Protocol",
tag = "v1 / Lewes Protocol",
responses(
(status = 501, description = "the endpoint hasn't been implemented yet"),
(status = 200, content(
@@ -23,10 +24,10 @@ use nym_node_requests::api::{SignedLewesProtocol, SignedLewesProtocolInfo};
params(OutputParams)
)]
pub(crate) async fn root_lewes_protocol(
config: SignedLewesProtocol,
Query(output): Query<OutputParams>,
State(state): State<AppState>,
) -> Result<LewesProtocolResponse, StatusCode> {
Ok(output.to_response(config))
Ok(output.to_response(state.static_information.lewes_protocol.clone()))
}
pub type LewesProtocolResponse = FormattedResponse<SignedLewesProtocol>;
+1 -1
View File
@@ -11,7 +11,7 @@ use nym_node_requests::api::v1::node_load::models::NodeLoad;
get,
path = "/load",
context_path = "/api/v1",
tag = "Node",
tag = "v1 / Node",
responses(
(status = 200, content(
(NodeLoad = "application/json"),
@@ -13,7 +13,7 @@ use nym_node_requests::api::v1::metrics::models::LegacyMixingStats;
get,
path = "/mixing",
context_path = "/api/v1/metrics",
tag = "Metrics",
tag = "v1 / Metrics",
responses(
(status = 200, content(
(LegacyMixingStats = "application/json"),
@@ -15,7 +15,7 @@ use nym_node_requests::api::v1::metrics::models::packets::{
get,
path = "/packets-stats",
context_path = "/api/v1/metrics",
tag = "Metrics",
tag = "v1 / Metrics",
responses(
(status = 200, content(
(PacketsStats = "application/json"),
@@ -8,7 +8,7 @@ use nym_metrics::metrics;
get,
path = "/prometheus",
context_path = "/api/v1/metrics",
tag = "Metrics",
tag = "v1 / Metrics",
responses(
(status = 200, body = String),
(status = 400, description = "`Authorization` header was missing"),
@@ -14,7 +14,7 @@ use time::macros::time;
get,
path = "/sessions",
context_path = "/api/v1/metrics",
tag = "Metrics",
tag = "v1 / Metrics",
responses(
(status = 200, content(
(SessionStats = "application/json"),
@@ -15,7 +15,7 @@ use crate::node::http::state::metrics::MetricsAppState;
get,
path = "/verloc",
context_path = "/api/v1/metrics",
tag = "Metrics",
tag = "v1 / Metrics",
responses(
(status = 200, content(
(VerlocStats = "application/json"),
@@ -13,7 +13,7 @@ use nym_node_requests::api::v1::metrics::models::WireguardStats;
get,
path = "/wireguard-stats",
context_path = "/api/v1/metrics",
tag = "Metrics",
tag = "v1 / Metrics",
responses(
(status = 200, content(
(WireguardStats = "application/json"),
@@ -11,7 +11,7 @@ use nym_node_requests::api::v1::mixnode::models::Mixnode;
get,
path = "",
context_path = "/api/v1/mixnode",
tag = "Mixnode",
tag = "v1 / Mixnode",
responses(
(status = 501, description = "the endpoint hasn't been implemented yet"),
(status = 200, content(
+7 -2
View File
@@ -3,7 +3,9 @@
use crate::node::http::state::AppState;
use axum::Router;
use axum::response::Redirect;
use axum::routing::get;
use nym_node_requests::routes;
use nym_node_requests::routes::api::v1;
pub mod authenticator;
@@ -18,7 +20,6 @@ pub mod mixnode;
pub mod network;
pub mod network_requester;
pub mod node;
pub mod openapi;
#[derive(Debug, Clone)]
pub struct Config {
@@ -34,7 +35,12 @@ pub struct Config {
}
pub(super) fn routes(config: Config) -> Router<AppState> {
// legacy redirects: the Swagger UI moved to a version-neutral /api/swagger
let swagger_redirect = get(|| async { Redirect::temporary(&routes::api::swagger_absolute()) });
Router::new()
.route(v1::SWAGGER, swagger_redirect.clone())
.route(&format!("{}/", v1::SWAGGER), swagger_redirect)
.route(v1::HEALTH, get(health::root_health))
.route(v1::LOAD, get(load::root_load))
.nest(v1::NETWORK, network::routes())
@@ -59,5 +65,4 @@ pub(super) fn routes(config: Config) -> Router<AppState> {
lewes_protocol::routes(config.lewes_protocol),
)
.merge(node::routes(config.node))
.merge(openapi::route())
}
@@ -11,7 +11,7 @@ use nym_node_requests::api::v1::network::models::UpgradeModeStatus;
get,
path = "/upgrade-mode-status",
context_path = "/api/v1/network",
tag = "Network",
tag = "v1 / Network",
responses(
(status = 200, content(
(UpgradeModeStatus = "application/json"),
@@ -10,7 +10,7 @@ use nym_node_requests::api::v1::network_requester::exit_policy::models::UsedExit
get,
path = "/exit-policy",
context_path = "/api/v1/network-requester",
tag = "Network Requester",
tag = "v1 / Network Requester",
responses(
(status = 200, content(
(UsedExitPolicy = "application/json"),
@@ -11,7 +11,7 @@ use nym_node_requests::api::v1::network_requester::models::NetworkRequester;
get,
path = "",
context_path = "/api/v1/network-requester",
tag = "Network Requester",
tag = "v1 / Network Requester",
responses(
(status = 501, description = "the endpoint hasn't been implemented yet"),
(status = 200, content(
@@ -2,30 +2,31 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::http::router::types::RequestError;
use axum::extract::Query;
use crate::node::http::state::AppState;
use axum::extract::{Query, State};
use nym_http_api_common::{FormattedResponse, OutputParams};
use nym_node_requests::api::v1::node::models::AuxiliaryDetails;
use nym_node_requests::api::v1::node::models::AuxiliaryDetailsV1;
/// Returns auxiliary details of this node.
#[utoipa::path(
get,
path = "/auxiliary-details",
context_path = "/api/v1",
tag = "Node",
tag = "v1 / Node",
responses(
(status = 200, content(
(AuxiliaryDetails = "application/json"),
(AuxiliaryDetails = "application/yaml")
(AuxiliaryDetailsV1 = "application/json"),
(AuxiliaryDetailsV1 = "application/yaml")
)),
),
params(OutputParams)
)]
pub(crate) async fn auxiliary(
description: AuxiliaryDetails,
Query(output): Query<OutputParams>,
State(state): State<AppState>,
) -> Result<AuxiliaryDetailsResponse, RequestError> {
let output = output.output.unwrap_or_default();
Ok(output.to_response(description))
Ok(output.to_response(state.static_information.auxiliary_data.clone().into()))
}
pub type AuxiliaryDetailsResponse = FormattedResponse<AuxiliaryDetails>;
pub type AuxiliaryDetailsResponse = FormattedResponse<AuxiliaryDetailsV1>;
@@ -1,7 +1,8 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use axum::extract::Query;
use crate::node::http::state::AppState;
use axum::extract::{Query, State};
use nym_http_api_common::{FormattedResponse, OutputParams};
use nym_node_requests::api::v1::node::models::BinaryBuildInformationOwned;
@@ -10,7 +11,7 @@ use nym_node_requests::api::v1::node::models::BinaryBuildInformationOwned;
get,
path = "/build-information",
context_path = "/api/v1",
tag = "Node",
tag = "v1 / Node",
responses(
(status = 200, content(
(BinaryBuildInformationOwned = "application/json"),
@@ -20,11 +21,11 @@ use nym_node_requests::api::v1::node::models::BinaryBuildInformationOwned;
params(OutputParams)
)]
pub(crate) async fn build_information(
build_information: BinaryBuildInformationOwned,
Query(output): Query<OutputParams>,
State(state): State<AppState>,
) -> BuildInformationResponse {
let output = output.output.unwrap_or_default();
output.to_response(build_information)
output.to_response(state.static_information.build_information.clone())
}
pub type BuildInformationResponse = FormattedResponse<BinaryBuildInformationOwned>;
@@ -2,7 +2,8 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::http::router::types::RequestError;
use axum::extract::Query;
use crate::node::http::state::AppState;
use axum::extract::{Query, State};
use nym_http_api_common::{FormattedResponse, OutputParams};
use nym_node_requests::api::v1::node::models::NodeDescription;
@@ -11,7 +12,7 @@ use nym_node_requests::api::v1::node::models::NodeDescription;
get,
path = "/description",
context_path = "/api/v1",
tag = "Node",
tag = "v1 / Node",
responses(
(status = 200, content(
(NodeDescription = "application/json"),
@@ -21,11 +22,11 @@ use nym_node_requests::api::v1::node::models::NodeDescription;
params(OutputParams)
)]
pub(crate) async fn description(
description: NodeDescription,
Query(output): Query<OutputParams>,
State(state): State<AppState>,
) -> Result<NodeDescriptionResponse, RequestError> {
let output = output.output.unwrap_or_default();
Ok(output.to_response(description))
Ok(output.to_response(state.static_information.description.clone()))
}
pub type NodeDescriptionResponse = FormattedResponse<NodeDescription>;
@@ -2,7 +2,8 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::http::router::types::RequestError;
use axum::extract::Query;
use crate::node::http::state::AppState;
use axum::extract::{Query, State};
use axum::http::StatusCode;
use nym_http_api_common::{FormattedResponse, OutputParams};
use nym_node_requests::api::v1::node::models::HostSystem;
@@ -12,7 +13,7 @@ use nym_node_requests::api::v1::node::models::HostSystem;
get,
path = "/system-info",
context_path = "/api/v1",
tag = "Node",
tag = "v1 / Node",
responses(
(status = 200, content(
(HostSystem = "application/json"),
@@ -23,12 +24,12 @@ use nym_node_requests::api::v1::node::models::HostSystem;
params(OutputParams)
)]
pub(crate) async fn host_system(
system_info: Option<HostSystem>,
Query(output): Query<OutputParams>,
State(state): State<AppState>,
) -> Result<HostSystemResponse, RequestError> {
let output = output.output.unwrap_or_default();
let Some(system_info) = system_info else {
let Some(system_info) = state.static_information.system_info.clone() else {
return Err(RequestError::new(
"this nym-node does not wish to expose the system information",
StatusCode::FORBIDDEN,
@@ -12,7 +12,7 @@ use nym_node_requests::api::{SignedDataHostInfo, v1::node::models::SignedHostInf
get,
path = "/host-information",
context_path = "/api/v1",
tag = "Node",
tag = "v1 / Node",
responses(
(status = 200, content(
(SignedDataHostInfo = "application/json"),
@@ -10,7 +10,6 @@ use crate::node::http::api::v1::node::roles::roles;
use crate::node::http::state::AppState;
use axum::Router;
use axum::routing::get;
use nym_node_requests::api::v1::node::models;
use nym_node_requests::routes::api::v1;
pub mod auxiliary;
@@ -20,51 +19,15 @@ pub mod hardware;
pub mod host_information;
pub mod roles;
#[derive(Debug, Clone)]
pub struct Config {
pub build_information: models::BinaryBuildInformationOwned,
pub system_info: Option<models::HostSystem>,
pub roles: models::NodeRoles,
pub description: models::NodeDescription,
pub auxiliary_details: models::AuxiliaryDetails,
}
#[derive(Debug, Clone, Copy)]
pub struct Config {}
pub(super) fn routes(config: Config) -> Router<AppState> {
pub(super) fn routes(_config: Config) -> Router<AppState> {
Router::new()
.route(
v1::BUILD_INFO,
get({
let build_info = config.build_information;
move |query| build_information(build_info, query)
}),
)
.route(
v1::ROLES,
get({
let node_roles = config.roles;
move |query| roles(node_roles, query)
}),
)
.route(v1::BUILD_INFO, get(build_information))
.route(v1::ROLES, get(roles))
.route(v1::HOST_INFO, get(host_information))
.route(
v1::SYSTEM_INFO,
get({
let system_info = config.system_info;
move |query| host_system(system_info, query)
}),
)
.route(
v1::NODE_DESCRIPTION,
get({
let node_description = config.description;
move |query| description(node_description, query)
}),
)
.route(
v1::AUXILIARY,
get({
let auxiliary_details = config.auxiliary_details;
move |query| auxiliary(auxiliary_details, query)
}),
)
.route(v1::SYSTEM_INFO, get(host_system))
.route(v1::NODE_DESCRIPTION, get(description))
.route(v1::AUXILIARY, get(auxiliary))
}
@@ -1,7 +1,8 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use axum::extract::Query;
use crate::node::http::state::AppState;
use axum::extract::{Query, State};
use nym_http_api_common::{FormattedResponse, OutputParams};
use nym_node_requests::api::v1::node::models::NodeRoles;
@@ -10,7 +11,7 @@ use nym_node_requests::api::v1::node::models::NodeRoles;
get,
path = "/roles",
context_path = "/api/v1",
tag = "Node",
tag = "v1 / Node",
responses(
(status = 200, content(
(NodeRoles = "application/json"),
@@ -20,11 +21,11 @@ use nym_node_requests::api::v1::node::models::NodeRoles;
params(OutputParams)
)]
pub(crate) async fn roles(
node_roles: NodeRoles,
Query(output): Query<OutputParams>,
State(state): State<AppState>,
) -> RolesResponse {
let output = output.output.unwrap_or_default();
output.to_response(node_roles)
output.to_response(state.static_information.roles)
}
pub type RolesResponse = FormattedResponse<NodeRoles>;
@@ -0,0 +1,16 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::http::state::AppState;
use axum::Router;
pub mod node;
#[derive(Debug, Clone)]
pub struct Config {
pub node: node::Config,
}
pub(super) fn routes(config: Config) -> Router<AppState> {
Router::new().merge(node::routes(config.node))
}
@@ -0,0 +1,35 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::http::router::types::RequestError;
use crate::node::http::state::AppState;
use axum::extract::{Query, State};
use nym_http_api_common::{FormattedResponse, OutputParamsV2};
use nym_node_requests::api::v2::node::models::AuxiliaryDetailsV2;
/// Returns auxiliary details of this node.
#[utoipa::path(
get,
path = "/auxiliary-details",
context_path = "/api/v2",
tag = "v2 / Node",
// distinct from v1's `auxiliary`: OpenAPI requires operationId to be unique
// across the whole document, and Swagger UI routes "Try it out" by operationId
operation_id = "v2_auxiliary",
responses(
(status = 200, content(
(AuxiliaryDetailsV2 = "application/json"),
(AuxiliaryDetailsV2 = "application/yaml")
)),
),
params(OutputParamsV2)
)]
pub(crate) async fn auxiliary(
Query(output): Query<OutputParamsV2>,
State(state): State<AppState>,
) -> Result<AuxiliaryDetailsResponse, RequestError> {
let output = output.output.unwrap_or_default();
Ok(output.to_response(state.static_information.auxiliary_data.clone()))
}
pub type AuxiliaryDetailsResponse = FormattedResponse<AuxiliaryDetailsV2>;
@@ -0,0 +1,17 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::http::api::v2::node::auxiliary::auxiliary;
use crate::node::http::state::AppState;
use axum::Router;
use axum::routing::get;
use nym_node_requests::routes::api::v2;
pub mod auxiliary;
#[derive(Debug, Clone, Copy)]
pub struct Config {}
pub(super) fn routes(_config: Config) -> Router<AppState> {
Router::new().route(v2::AUXILIARY, get(auxiliary))
}
@@ -27,7 +27,7 @@ pub(super) async fn default() -> Html<&'static str> {
<div>
<p> default page of the nym node - you can customize it by setting the 'assets' path under '[http]' section of your config. </p>
You can explore the REST API at <a href = "/api/v1/swagger/">/api/v1/swagger/</a>
You can explore the REST API at <a href = "/api/swagger/">/api/swagger/</a>
</div>
"#,
)
+11 -44
View File
@@ -2,22 +2,18 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::http::NymNodeHttpServer;
use crate::node::http::api::v1::lewes_protocol;
use crate::node::http::error::NymNodeHttpError;
use crate::node::http::state::AppState;
use axum::Router;
use axum::response::Redirect;
use axum::routing::get;
use nym_bin_common::bin_info_owned;
use nym_http_api_common::middleware::logging;
use nym_node_requests::api::SignedLewesProtocol;
use nym_node_requests::api::v1::authenticator::models::Authenticator;
use nym_node_requests::api::v1::gateway::models::{Bridges, Gateway};
use nym_node_requests::api::v1::ip_packet_router::models::IpPacketRouter;
use nym_node_requests::api::v1::mixnode::models::Mixnode;
use nym_node_requests::api::v1::network_requester::exit_policy::models::UsedExitPolicy;
use nym_node_requests::api::v1::network_requester::models::NetworkRequester;
use nym_node_requests::api::v1::node::models::{AuxiliaryDetails, HostSystem, NodeDescription};
use nym_node_requests::routes;
use std::net::SocketAddr;
use std::path::Path;
@@ -36,18 +32,12 @@ pub struct HttpServerConfig {
}
impl HttpServerConfig {
pub fn new(signed_lewes_protocol: SignedLewesProtocol) -> Self {
pub fn new() -> Self {
HttpServerConfig {
landing: Default::default(),
api: api::Config {
v1_config: api::v1::Config {
node: api::v1::node::Config {
build_information: bin_info_owned!(),
system_info: None,
roles: Default::default(),
description: Default::default(),
auxiliary_details: Default::default(),
},
node: api::v1::node::Config {},
metrics: Default::default(),
gateway: Default::default(),
mixnode: Default::default(),
@@ -55,9 +45,10 @@ impl HttpServerConfig {
network_requester: Default::default(),
ip_packet_router: Default::default(),
authenticator: Default::default(),
lewes_protocol: lewes_protocol::Config {
details: signed_lewes_protocol,
},
lewes_protocol: Default::default(),
},
v2_config: api::v2::Config {
node: api::v2::node::Config {},
},
},
}
@@ -69,24 +60,6 @@ impl HttpServerConfig {
self
}
#[must_use]
pub fn with_system_info(mut self, info: HostSystem) -> Self {
self.api.v1_config.node.system_info = Some(info);
self
}
#[must_use]
pub fn with_description(mut self, description: NodeDescription) -> Self {
self.api.v1_config.node.description = description;
self
}
#[must_use]
pub fn with_auxiliary_details(mut self, auxiliary_details: AuxiliaryDetails) -> Self {
self.api.v1_config.node.auxiliary_details = auxiliary_details;
self
}
#[must_use]
pub fn with_gateway_details(mut self, gateway: Gateway) -> Self {
self.api.v1_config.gateway.details = Some(gateway);
@@ -179,6 +152,10 @@ impl NymNodeRouter {
)
.merge(landing_page::routes(config.landing))
.nest(routes::API, api::routes(config.api))
// openapi must be merged at the outer router level (not nested) —
// SwaggerUi emits internal redirects that use absolute paths
// unaware of any `.nest()` prefix
.merge(api::openapi::route())
.layer(axum::middleware::from_fn(logging::log_request_info))
.with_state(state),
}
@@ -208,20 +185,10 @@ impl NymNodeRouter {
#[cfg(test)]
mod tests {
use super::*;
use nym_crypto::asymmetric::{ed25519, x25519};
use nym_node_requests::api::SignedData;
use nym_node_requests::api::v1::lewes_protocol::models::LewesProtocol;
use nym_test_utils::helpers::deterministic_rng;
use std::collections::BTreeMap;
#[test]
fn router_constructs_without_panic() {
let mut rng = deterministic_rng();
let signing = ed25519::KeyPair::new(&mut rng);
let x25519_pub: x25519::DHPublicKey = x25519::PrivateKey::new(&mut rng).public_key().into();
let lp = LewesProtocol::new(false, 0, 0, x25519_pub, BTreeMap::new());
let signed = SignedData::new(lp, signing.private_key()).unwrap();
let config = HttpServerConfig::new(signed);
let config = HttpServerConfig::new();
let _ = NymNodeRouter::new(config, AppState::dummy());
}
}
+32 -1
View File
@@ -4,9 +4,13 @@
use crate::node::http::state::load::CachedNodeLoad;
use crate::node::http::state::metrics::MetricsAppState;
use crate::node::key_rotation::active_keys::ActiveSphinxKeys;
use nym_bin_common::build_information::BinaryBuildInformationOwned;
use nym_credential_verification::UpgradeModeState;
use nym_crypto::asymmetric::ed25519;
use nym_node_metrics::NymNodeMetrics;
use nym_node_requests::api::SignedLewesProtocol;
use nym_node_requests::api::v1::node::models::{HostSystem, NodeDescription, NodeRoles};
use nym_node_requests::api::v2::node::models::AuxiliaryDetailsV2;
use nym_noise_keys::VersionedNoiseKeyV1;
use nym_verloc::measurements::SharedVerlocStats;
use std::net::IpAddr;
@@ -23,6 +27,14 @@ pub(crate) struct StaticNodeInformation {
pub(crate) x25519_versioned_noise_key: Option<VersionedNoiseKeyV1>,
pub(crate) ip_addresses: Vec<IpAddr>,
pub(crate) hostname: Option<String>,
// TODO: move other fields here too
pub(crate) build_information: BinaryBuildInformationOwned,
pub(crate) system_info: Option<HostSystem>,
pub(crate) roles: NodeRoles,
pub(crate) description: NodeDescription,
pub(crate) auxiliary_data: AuxiliaryDetailsV2,
pub(crate) lewes_protocol: SignedLewesProtocol,
}
#[derive(Clone)]
@@ -77,15 +89,34 @@ impl AppState {
#[cfg(test)]
pub(crate) fn dummy() -> Self {
use crate::node::key_rotation::key::SphinxPrivateKey;
use nym_crypto::asymmetric::x25519;
use rand::rngs::OsRng;
let ed25519_keys = ed25519::KeyPair::new(&mut OsRng);
let mut rng = nym_test_utils::helpers::deterministic_rng();
let ed25519_keys = ed25519::KeyPair::new(&mut rng);
let x25519_pub: x25519::DHPublicKey = x25519::PrivateKey::new(&mut rng).public_key().into();
let lp = nym_node_requests::api::v1::lewes_protocol::models::LewesProtocol::new(
false,
0,
0,
x25519_pub,
std::collections::BTreeMap::new(),
);
let signed =
nym_node_requests::api::SignedData::new(lp, ed25519_keys.private_key()).unwrap();
let attester_pk = *ed25519_keys.public_key();
let static_information = StaticNodeInformation {
ed25519_identity_keys: Arc::new(ed25519_keys),
x25519_versioned_noise_key: None,
ip_addresses: vec![],
hostname: None,
build_information: nym_bin_common::bin_info_owned!(),
system_info: None,
roles: Default::default(),
description: Default::default(),
auxiliary_data: Default::default(),
lewes_protocol: signed,
};
let active_sphinx = ActiveSphinxKeys::new_fresh(SphinxPrivateKey::new(&mut OsRng, 0));
@@ -145,11 +145,17 @@ impl OnUpdateMetricsHandler for PrometheusGlobalNodeMetricsRegistryUpdater {
.active_ingress_websocket_connections_count() as i64,
);
self.prometheus_wrapper.set(
NetworkActiveIngressWebSocketConnections,
NetworkActiveEgressMixnetConnections,
self.metrics
.network
.active_egress_mixnet_connections_count() as i64,
);
self.prometheus_wrapper.set(
NetworkIdleClosedIngressMixnetConnections,
self.metrics
.network
.idle_closed_ingress_mixnet_connections_count() as i64,
);
// # PROCESS
self.prometheus_wrapper.set(
+1
View File
@@ -14,6 +14,7 @@ pub(crate) mod legacy_packet_data;
pub(crate) mod mixnet_data_cleaner;
pub(crate) mod pending_egress_packets_updater;
pub(crate) mod prometheus_events_handler;
pub(crate) mod tokio_runtime_updater;
pub(crate) trait RegistrableHandler:
Downcast + OnStartMetricsHandler + OnUpdateMetricsHandler + Send + Sync + 'static
@@ -0,0 +1,105 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
//! Samples tokio runtime metrics (scheduling pressure, busy ratio) into the prometheus registry
//! on each aggregator update tick.
//!
//! `num_workers` / `alive_tasks` / `global_queue_depth` are always available. The per-worker
//! timing (`busy_ratio`, `worker_poll_count`) is only exposed by tokio when the binary is built
//! with `RUSTFLAGS="--cfg tokio_unstable"`; without that flag those two gauges are left at 0.
use crate::node::metrics::handler::{
MetricsHandler, OnStartMetricsHandler, OnUpdateMetricsHandler,
};
use async_trait::async_trait;
use nym_node_metrics::prometheus_wrapper::{
NymNodePrometheusMetrics, PROMETHEUS_METRICS, PrometheusMetric,
};
use tokio::runtime::Handle;
// unique marker type so the aggregator can key this handler (it has no real events)
pub struct TokioRuntimeData;
// a snapshot of cumulative worker-busy time, used to derive the busy ratio over the interval
// between two samples
#[cfg(tokio_unstable)]
#[derive(Clone, Copy)]
struct BusySample {
/// summed busy duration across all workers at the time of the sample
busy: std::time::Duration,
/// when the sample was taken
at: tokio::time::Instant,
}
pub struct TokioRuntimeMetricsUpdater {
prometheus_wrapper: &'static NymNodePrometheusMetrics,
// previous busy snapshot, for deriving the busy ratio
#[cfg(tokio_unstable)]
prev_busy: Option<BusySample>,
}
impl TokioRuntimeMetricsUpdater {
pub(crate) fn new() -> Self {
Self {
prometheus_wrapper: &PROMETHEUS_METRICS,
#[cfg(tokio_unstable)]
prev_busy: None,
}
}
}
#[async_trait]
impl OnStartMetricsHandler for TokioRuntimeMetricsUpdater {}
#[async_trait]
impl OnUpdateMetricsHandler for TokioRuntimeMetricsUpdater {
async fn on_update(&mut self) {
use PrometheusMetric::*;
let m = Handle::current().metrics();
self.prometheus_wrapper
.set(TokioRuntimeNumWorkers, m.num_workers() as i64);
self.prometheus_wrapper
.set(TokioRuntimeAliveTasks, m.num_alive_tasks() as i64);
self.prometheus_wrapper
.set(TokioRuntimeGlobalQueueDepth, m.global_queue_depth() as i64);
// left at their registered 0 unless built with `--cfg tokio_unstable`
#[cfg(tokio_unstable)]
{
let workers = m.num_workers();
let busy: std::time::Duration =
(0..workers).map(|w| m.worker_total_busy_duration(w)).sum();
let now = tokio::time::Instant::now();
if let Some(prev) = self.prev_busy {
let elapsed = now.duration_since(prev.at).as_secs_f64();
let ratio = if workers > 0 && elapsed > 0.0 {
busy.saturating_sub(prev.busy).as_secs_f64() / (elapsed * workers as f64)
} else {
0.0
};
self.prometheus_wrapper
.set_float(TokioRuntimeBusyRatio, ratio);
}
self.prev_busy = Some(BusySample { busy, at: now });
let polls: u64 = (0..workers).map(|w| m.worker_poll_count(w)).sum();
self.prometheus_wrapper
.set(TokioRuntimeWorkerPollCount, polls as i64);
}
}
}
#[async_trait]
impl MetricsHandler for TokioRuntimeMetricsUpdater {
type Events = TokioRuntimeData;
// SAFETY: this handler has no associated events; it only acts on the periodic `on_update`.
#[allow(clippy::panic)]
async fn handle_event(&mut self, _event: Self::Events) {
panic!(
"MetricsHandler::handle_event incorrectly called on TokioRuntimeMetricsUpdater - it has no events"
)
}
}
+51 -5
View File
@@ -4,7 +4,7 @@
use crate::node::key_rotation::active_keys::SphinxKeyGuard;
use crate::node::mixnet::shared::SharedData;
use futures::StreamExt;
use nym_mixnet_client::trace::{PacketTrace, TraceStage, Traced};
use nym_mixnet_client::metrics::{MixnetMetric, PacketTrace, Traced};
use nym_noise::connection::Connection;
use nym_noise::upgrade_noise_responder;
use nym_sphinx_forwarding::packet::MixPacket;
@@ -28,6 +28,12 @@ use tracing::{Span, debug, error, instrument, trace, warn};
/// How often (in packets) the stream-level span updates its packet count.
const SPAN_UPDATE_INTERVAL: u64 = 10_000;
/// Instant at which a connection idle since `last_activity` should be closed, or `None` if idle
/// reaping is disabled (`timeout` is zero).
fn idle_deadline(last_activity: Instant, timeout: Duration) -> Option<Instant> {
(!timeout.is_zero()).then(|| last_activity + timeout)
}
struct PendingReplayCheckPackets {
// map of rotation id used for packet creation to the packets (each carrying the latency
// trace started at receive, so the deferral wait is attributed to the ReplayCheck stage)
@@ -476,7 +482,7 @@ impl ConnectionHandler {
};
// close out the Unwrap stage (partial unwrap: shared secret + header MAC)
trace.record(TraceStage::Unwrap);
trace.record(MixnetMetric::Unwrap);
self.pending_packets.push(now, partially_unwrapped, trace);
// 2. check for packet replay
@@ -567,7 +573,7 @@ impl ConnectionHandler {
rotation_id,
"dropping replayed packet"
);
trace.record(TraceStage::ReplayCheck);
trace.record(MixnetMetric::ReplayCheck);
self.handle_unwrapped_packet(
now,
Err(PacketProcessingError::PacketReplay),
@@ -581,7 +587,7 @@ impl ConnectionHandler {
// finalise the (expensive) full unwrapping, then close out the ReplayCheck stage:
// it spans partial-unwrap -> deferral -> replay check -> finalise
let unwrapped_packet = packet.finalise_unwrapping();
trace.record(TraceStage::ReplayCheck);
trace.record(MixnetMetric::ReplayCheck);
self.handle_unwrapped_packet(now, unwrapped_packet, network_monitor_packet, trace)
.await;
}
@@ -678,7 +684,7 @@ impl ConnectionHandler {
let packet = packet.inner;
let unwrapped_packet = self.try_full_unwrap_packet(packet);
// no replay batching on this path: the Unwrap stage covers the full unwrapping
trace.record(TraceStage::Unwrap);
trace.record(MixnetMetric::Unwrap);
let is_network_monitor_packet = self.is_from_authorised_network_monitor_agent();
self.handle_unwrapped_packet(now, unwrapped_packet, is_network_monitor_packet, trace)
@@ -757,6 +763,8 @@ impl ConnectionHandler {
mut mixnet_connection: Framed<Connection<TcpStream>, NymCodec>,
) {
let mut packets_processed: u64 = 0;
// reset by every received packet; drives the idle-connection reaping below
let mut last_activity = Instant::now();
loop {
// make sure pending packets are not stuck in the queue if we don't get any more packets
// from this sender
@@ -765,6 +773,12 @@ impl ConnectionHandler {
.processing_config
.maximum_replay_detection_deferral,
);
// close the connection (freeing the task/socket) if no packets arrive for too long;
// ingress is read-only, so without this a silently-gone peer would linger forever
let idle_deadline = idle_deadline(
last_activity,
self.shared.processing_config.connection_idle_timeout,
);
tokio::select! {
biased;
@@ -778,6 +792,7 @@ impl ConnectionHandler {
maybe_framed_nym_packet = mixnet_connection.next() => {
match maybe_framed_nym_packet {
Some(Ok(packet)) => {
last_activity = Instant::now();
self.handle_received_nym_packet(packet).await;
packets_processed += 1;
if packets_processed.is_multiple_of(SPAN_UPDATE_INTERVAL) {
@@ -817,6 +832,25 @@ impl ConnectionHandler {
} => {
self.handle_pending_packets_batch(Instant::now()).await;
}
// 4. reap the connection if it has been idle for too long
_ = async move {
match idle_deadline {
Some(d) => tokio::time::sleep_until(d).await,
None => std::future::pending::<()>().await,
}
} => {
debug!(
event = "connection.idle_timeout",
remote_addr = %self.remote_address,
packets_processed,
idle_secs = self.shared.processing_config.connection_idle_timeout.as_secs(),
"closing idle ingress mixnet connection"
);
Span::current().record("exit_reason", "idle_timeout");
Span::current().record("packets_processed", packets_processed);
self.shared.metrics.network.ingress_mixnet_idle_closed();
break
}
}
}
@@ -887,6 +921,18 @@ mod tests {
assert!(pending.flush_deadline(Duration::from_millis(50)).is_none());
}
#[test]
fn idle_deadline_disabled_when_timeout_zero() {
assert!(idle_deadline(Instant::now(), Duration::ZERO).is_none());
}
#[test]
fn idle_deadline_is_last_activity_plus_timeout() {
let now = Instant::now();
let timeout = Duration::from_secs(300);
assert_eq!(idle_deadline(now, timeout), Some(now + timeout));
}
#[test]
fn flush_deadline_is_batch_start_plus_deferral() {
let key = PrivateKey::random();
+155 -54
View File
@@ -7,7 +7,9 @@ use nym_mixnet_client::SendWithoutResponse;
use nym_mixnet_client::forwarder::{
MixForwardingReceiver, MixForwardingSender, PacketToForward, mix_forwarding_channels,
};
use nym_mixnet_client::trace::{TraceStage, Traced};
use nym_mixnet_client::metrics::{
MixnetMetric, Traced, observe_delay_drain_batch_size, observe_drain_batch_size,
};
use nym_node_metrics::NymNodeMetrics;
use nym_nonexhaustive_delayqueue::{Expired, NonExhaustiveDelayQueue};
use nym_sphinx_forwarding::packet::MixPacket;
@@ -16,6 +18,35 @@ use std::io;
use tokio::time::Instant;
use tracing::{debug, error, trace, warn};
/// Max packets handled per `select!` wakeup, per drainable branch (ingress channel and expired
/// delay-queue items), before yielding back to the biased select so shutdown stays responsive.
/// Per-packet work is sub-µs to low-µs, so 256 bounds the worst-case stall to <~1ms.
const MAX_DRAIN_BATCH: usize = 256;
/// The node's single forward-hop egress engine - the last in-node stage of the mixnet pipeline.
///
/// **Where it sits.** Inbound packets are accepted by the mixnet listener and processed
/// per-connection by a `ConnectionHandler`: sphinx unwrap, replay check, and - for *forward* hops -
/// computation of the intended (Poisson) mix delay. The handler hands each one off as a
/// [`PacketToForward`] over the unbounded ingress-to-forwarder channel. This forwarder is the sole
/// consumer of that channel: every forward-hop packet in the node (plus acks) funnels through it.
/// Final-hop packets never reach here; they are delivered to local clients instead.
///
/// **What it does**, per packet:
/// 1. drops it if the [`RoutingFilter`] doesn't recognise the next hop;
/// 2. holds it in the delay queue until its target release instant (the mix delay), or forwards it
/// immediately when the delay is zero or has already elapsed;
/// 3. on release, forwards it to the next hop via the mixnet client (`C: SendWithoutResponse`),
/// which owns the per-connection egress TCP sockets.
///
/// **Design notes.** It runs as one dedicated task owning both the ingress channel and the delay
/// queue. Its [`run`](Self::run) loop wakes on either a new packet or a delay-queue release, then
/// services *both* branches every wakeup - draining queued ingress packets first (to bring the
/// delay queue current) and then releasing everything now due - so a burst on one branch can never
/// starve the other (the failure mode a biased "one branch per wakeup" select would have). Each
/// drain is bounded by [`MAX_DRAIN_BATCH`] so shutdown stays responsive. Along the way it stamps
/// the latency-trace stages it owns (`ForwarderQueue`, `DelayQueue`, `DelayQueueOverrun`), feeding
/// the `mixnet_packet_*` metrics family.
pub struct PacketForwarder<C, F> {
delay_queue: NonExhaustiveDelayQueue<Traced<MixPacket>>,
mixnet_client: C,
@@ -48,7 +79,6 @@ impl<C, F> PacketForwarder<C, F> {
fn forward_packet(&mut self, packet: Traced<MixPacket>)
where
C: SendWithoutResponse,
F: RoutingFilter,
{
let next_hop = packet.inner.next_hop_address();
@@ -77,25 +107,42 @@ impl<C, F> PacketForwarder<C, F> {
fn handle_done_delaying(&mut self, packet: Expired<Traced<MixPacket>>)
where
C: SendWithoutResponse,
F: RoutingFilter,
{
// how late beyond the target release the queue actually handed the packet back: the
// delay-queue's own scheduling/retrieval overhead (timer granularity + task wakeup)
let overrun = Instant::now().saturating_duration_since(packet.deadline());
let mut delayed_packet = packet.into_inner();
// close out the DelayQueue stage (the full wait: intended mix delay + overrun)
delayed_packet.record(TraceStage::DelayQueue);
delayed_packet.record_value(TraceStage::DelayQueueOverrun, overrun.as_secs_f64());
delayed_packet.record(MixnetMetric::DelayQueue);
delayed_packet.record_value(MixnetMetric::DelayQueueOverrun, overrun.as_secs_f64());
self.forward_packet(delayed_packet);
}
/// Drain every packet whose release deadline has already passed (a burst of simultaneous
/// releases), bounded by [`MAX_DRAIN_BATCH`]. `try_next_expired` never blocks, so this is a
/// no-op (returns 0) when nothing is due. Returns how many packets were released.
fn drain_expired(&mut self) -> usize
where
C: SendWithoutResponse,
{
let mut released = 0;
while released < MAX_DRAIN_BATCH {
let Some(expired) = self.delay_queue.try_next_expired() else {
break;
};
self.handle_done_delaying(expired);
released += 1;
}
released
}
fn handle_new_packet(&mut self, mut new_packet: PacketToForward)
where
C: SendWithoutResponse,
F: RoutingFilter,
{
// close out the ForwarderQueue stage (wait in the ingress -> forwarder channel)
new_packet.trace.record(TraceStage::ForwarderQueue);
new_packet.trace.record(MixnetMetric::ForwarderQueue);
let next_hop = new_packet.packet.next_hop();
@@ -119,23 +166,44 @@ impl<C, F> PacketForwarder<C, F> {
// in case of a zero delay packet, don't bother putting it in the delay queue,
// just forward it immediately
if let Some(instant) = delay_target {
// check if the delay has already expired, if so, don't bother putting it through
// the delay queue only to retrieve it immediately. Just forward it.
if instant.checked_duration_since(Instant::now()).is_none() {
// the target elapsed before we could even queue it: upstream overhead already
// ate the whole intended delay, so the overrun is now - target
let overrun = Instant::now().saturating_duration_since(instant);
traced.record_value(TraceStage::DelayQueueOverrun, overrun.as_secs_f64());
self.forward_packet(traced)
} else {
self.delay_queue.insert_at(traced, instant);
}
let Some(instant) = delay_target else {
self.forward_packet(traced);
return;
};
// check if the delay has already expired, if so, don't bother putting it through the delay
// queue only to retrieve it immediately. Just forward it.
if instant.checked_duration_since(Instant::now()).is_none() {
// the target elapsed before we could even queue it: upstream overhead already
// ate the whole intended delay, so the overrun is now - target
let overrun = Instant::now().saturating_duration_since(instant);
traced.record_value(MixnetMetric::DelayQueueOverrun, overrun.as_secs_f64());
self.forward_packet(traced);
} else {
self.forward_packet(traced)
self.delay_queue.insert_at(traced, instant);
}
}
/// Drain every packet currently queued in the ingress channel, bounded by [`MAX_DRAIN_BATCH`]
/// so the per-wakeup `select!`/waker/coop overhead is amortised across the burst rather than
/// paid per packet. `try_recv` never blocks. Returns how many packets were handled.
fn drain_ingress(&mut self) -> usize
where
C: SendWithoutResponse,
F: RoutingFilter,
{
let mut batch_size = 0;
while batch_size < MAX_DRAIN_BATCH {
// Err = channel empty (or closed, which is unreachable since we hold a sender)
let Ok(packet) = self.packet_receiver.try_recv() else {
break;
};
self.handle_new_packet(packet);
batch_size += 1;
}
batch_size
}
fn update_queue_len_metric(&self) {
self.metrics
.process
@@ -148,62 +216,95 @@ impl<C, F> PacketForwarder<C, F> {
.update_packet_forwarder_queue_size(channel_size)
}
/// Log the forwarder's queue depth at a severity reflecting how overloaded it is. Called
/// periodically (~every 1000 packets), not per packet.
fn log_queue_status(
&self,
channel_depth: usize,
packets_processed: u64,
last_drain_batch: usize,
) {
let delay_queue_depth = self.delay_queue.len();
match channel_depth {
n if n > 1000 => error!(
event = "forwarder.queue_overload",
channel_depth = n,
delay_queue_depth,
packets_processed,
last_drain_batch,
"there are currently {n} mix packets waiting to get forwarded - the node seems to be significantly overloaded!"
),
n if n > 500 => warn!(
event = "forwarder.queue_high",
channel_depth = n,
delay_queue_depth,
packets_processed,
last_drain_batch,
"there are currently {n} mix packets waiting to get forwarded - is the node overloaded?"
),
n => trace!(
channel_depth = n,
delay_queue_depth, packets_processed, last_drain_batch, "forwarder queue status"
),
}
}
pub async fn run(&mut self, shutdown_token: ShutdownToken)
where
C: SendWithoutResponse,
F: RoutingFilter,
{
let mut processed: u64 = 0;
let mut last_logged: u64 = 0;
trace!("starting PacketForwarder");
loop {
// packets handled this wakeup per branch; the select arm seeds the one it consumed
let mut ingress = 0usize;
let mut released = 0usize;
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
debug!("PacketForwarder: Received shutdown");
break;
}
new_packet = self.packet_receiver.next() => {
// impossible to panic: the struct holds a sender, so not all senders can drop
#[allow(clippy::unwrap_used)]
self.handle_new_packet(new_packet.unwrap());
ingress = 1;
}
delayed = self.delay_queue.next() => {
// SAFETY: `stream` implementation of `NonExhaustiveDelayQueue` never returns `None`
#[allow(clippy::unwrap_used)]
self.handle_done_delaying(delayed.unwrap());
}
new_packet = self.packet_receiver.next() => {
// this one is impossible to ever panic - the struct itself contains a sender
// and hence it can't happen that ALL senders are dropped
#[allow(clippy::unwrap_used)]
self.handle_new_packet(new_packet.unwrap());
let channel_len = self.packet_sender.len();
let delay_queue_len = self.delay_queue.len();
if processed.is_multiple_of(1000) {
match channel_len {
n if n > 1000 => error!(
event = "forwarder.queue_overload",
channel_depth = n,
delay_queue_depth = delay_queue_len,
packets_processed = processed,
"there are currently {n} mix packets waiting to get forwarded - the node seems to be significantly overloaded!"
),
n if n > 500 => warn!(
event = "forwarder.queue_high",
channel_depth = n,
delay_queue_depth = delay_queue_len,
packets_processed = processed,
"there are currently {n} mix packets waiting to get forwarded - is the node overloaded?"
),
n => trace!(
channel_depth = n,
delay_queue_depth = delay_queue_len,
packets_processed = processed,
"forwarder queue status"
),
}
}
self.update_channel_size_metric(channel_len);
processed += 1;
released = 1;
}
}
// update the metrics on either new packet being inserted or packet being removed
// service both branches every wakeup so neither can starve the other: drain queued
// ingress packets first (routing + insert/forward, bringing the delay queue current),
// then release everything now due
ingress += self.drain_ingress();
released += self.drain_expired();
if ingress > 0 {
observe_drain_batch_size(ingress);
processed += ingress as u64;
let channel_len = self.packet_sender.len();
// log roughly every 1000 packets; `processed` advances in batches, so use a
// crossing test rather than an exact modulo (which a batch could step over)
if processed - last_logged >= 1000 {
last_logged = processed;
self.log_queue_status(channel_len, processed, ingress);
}
self.update_channel_size_metric(channel_len);
}
if released > 0 {
observe_delay_drain_batch_size(released);
}
// update the metric on either a new packet being inserted or a packet being released
self.update_queue_len_metric();
}
trace!("PacketForwarder: Exiting");
+5 -1
View File
@@ -9,7 +9,7 @@ use crate::node::replay_protection::bloomfilter::ReplayProtectionBloomfilters;
use crate::node::routing_filter::network_filter::RoutableNetworkMonitors;
use nym_gateway::node::GatewayStorageError;
use nym_mixnet_client::forwarder::{MixForwardingSender, PacketToForward};
use nym_mixnet_client::trace::PacketTrace;
use nym_mixnet_client::metrics::PacketTrace;
use nym_node_metrics::NymNodeMetrics;
use nym_node_metrics::mixnet::PacketKind;
use nym_noise::config::NoiseConfig;
@@ -45,6 +45,9 @@ pub(crate) struct ProcessingConfig {
/// sample 1-in-N forwarded packets for per-stage latency tracing (0 disables)
pub(crate) egress_trace_sample_rate: u64,
/// close an ingress connection after this long with no received packets (0 disables)
pub(crate) connection_idle_timeout: Duration,
}
impl ProcessingConfig {
@@ -65,6 +68,7 @@ impl ProcessingConfig {
final_hop_processing_enabled: config.modes.expects_final_hop_traffic()
|| config.wireguard.enabled,
egress_trace_sample_rate: config.mixnet.debug.egress_trace_sample_rate,
connection_idle_timeout: config.mixnet.debug.connection_idle_timeout,
}
}
}
+56 -29
View File
@@ -31,6 +31,7 @@ use crate::node::metrics::handler::global_prometheus_updater::PrometheusGlobalNo
use crate::node::metrics::handler::legacy_packet_data::LegacyMixingStatsUpdater;
use crate::node::metrics::handler::mixnet_data_cleaner::MixnetMetricsCleaner;
use crate::node::metrics::handler::pending_egress_packets_updater::PendingEgressPacketsUpdater;
use crate::node::metrics::handler::tokio_runtime_updater::TokioRuntimeMetricsUpdater;
use crate::node::mixnet::SharedFinalHopData;
use crate::node::mixnet::packet_forwarding::PacketForwarder;
use crate::node::mixnet::shared::ProcessingConfig;
@@ -44,12 +45,10 @@ use crate::node::routing_filter::{OpenFilter, RoutingFilter};
use crate::node::shared_network::CachedNetwork;
use crate::node::shared_network::refresher::{NetworkRefresher, NetworkRefresherConfig};
use crate::node::shared_network::topology_provider::{CachedTopologyProvider, LocalGatewayNode};
use nym_bin_common::bin_info;
use nym_bin_common::{bin_info, bin_info_owned};
use nym_config::defaults::NymNetworkDetails;
use nym_credential_verification::UpgradeModeState;
use nym_crypto::asymmetric::{ed25519, x25519};
pub use nym_gateway::node::ActiveClientsStore;
pub use nym_gateway::node::GatewayStorage;
use nym_gateway::node::wireguard::PeerRegistrator;
use nym_gateway::node::{GatewayTasksBuilder, UpgradeModeCheckRequestSender};
use nym_kkt::key_utils::{
@@ -68,15 +67,18 @@ use nym_node_metrics::NymNodeMetrics;
use nym_node_metrics::events::MetricEventsSender;
use nym_node_requests::api::SignedData;
use nym_node_requests::api::v1::lewes_protocol::models::{LPHashFunction, LPKEM, LewesProtocol};
use nym_node_requests::api::v1::node::models::{AnnouncePorts, NodeDescription};
use nym_node_requests::api::v1::node::models::{AnnouncePorts, NodeDescription, NodeRoles};
use nym_noise::config::{NetworkMonitorAgentNode, NoiseConfig, NoiseNetworkView};
use nym_noise_keys::VersionedNoiseKeyV1;
use nym_sphinx_acknowledgements::AckKey;
use nym_sphinx_addressing::Recipient;
use nym_task::{ShutdownManager, ShutdownToken, ShutdownTracker};
use nym_validator_client::nyxd::AccountId;
use nym_validator_client::nyxd::contract_traits::PagedNetworkMonitorsQueryClient;
use nym_validator_client::nyxd::error::NyxdError;
use nym_validator_client::nyxd::nym_network_monitors_contract_common::AuthorisedNetworkMonitor;
use nym_validator_client::{QueryHttpRpcNyxdClient, UserAgent};
use nym_validator_client::signing::signer::OfflineSigner;
use nym_validator_client::{DirectSecp256k1HdWallet, QueryHttpRpcNyxdClient, UserAgent};
use nym_verloc::measurements::SharedVerlocStats;
use nym_verloc::{self, measurements::VerlocMeasurer};
use nym_wireguard::{WireguardGatewayData, peer_controller::PeerControlRequest};
@@ -94,6 +96,9 @@ use tokio_util::sync::WaitForCancellationFutureOwned;
use tracing::{debug, error, info, trace};
use zeroize::Zeroizing;
pub use nym_gateway::node::ActiveClientsStore;
pub use nym_gateway::node::GatewayStorage;
pub mod bonding_information;
pub mod description;
pub mod helpers;
@@ -891,12 +896,27 @@ impl NymNode {
.collect()
}
fn node_chain_address(&self) -> Result<AccountId, NymNodeError> {
let network_details = NymNetworkDetails::new_from_env();
// derive the address (annoyingly, this will derive our private keys that we will rederive
// when starting the gateway, but changing this behaviour requires too much refactoring)
let wallet = DirectSecp256k1HdWallet::checked_from_mnemonic(
&network_details.chain_details.bech32_account_prefix,
(**self.entry_gateway.mnemonic).clone(),
)
.map_err(NyxdError::from)?;
Ok(wallet.get_accounts()[0].address.clone())
}
pub(crate) async fn build_http_server(
&self,
shutdown: WaitForCancellationFutureOwned,
) -> Result<NymNodeHttpServer, NymNodeError> {
let auxiliary_details = api_requests::v1::node::models::AuxiliaryDetails {
let auxiliary_data = api_requests::v2::node::models::AuxiliaryDetailsV2 {
location: self.config.host.location,
address: self.node_chain_address()?.to_string(),
announce_ports: AnnouncePorts {
verloc_port: self.config.verloc.announce_port,
mix_port: self.config.mixnet.announce_port,
@@ -981,7 +1001,7 @@ impl NymNode {
let signed_lewes_protocol =
SignedData::new(lewes_protocol, self.ed25519_identity_keys.private_key()).unwrap();
let mut config = HttpServerConfig::new(signed_lewes_protocol)
let mut config = HttpServerConfig::new()
.with_landing_page_assets(self.config.http.landing_page_assets_path.as_ref())
.with_mixnode_details(mixnode_details)
.with_gateway_details(gateway_details)
@@ -989,28 +1009,16 @@ impl NymNode {
.with_ip_packet_router_details(ipr_details)
.with_authenticator_details(auth_details)
.with_used_exit_policy(exit_policy_details)
.with_description(self.description.clone())
.with_auxiliary_details(auxiliary_details)
.with_prometheus_bearer_token(self.config.http.access_token.clone());
if self.config.http.expose_system_info {
config = config.with_system_info(get_system_info(
let system_info = if self.config.http.expose_system_info {
Some(get_system_info(
self.config.http.expose_system_hardware,
self.config.http.expose_crypto_hardware,
))
}
if self.config.modes.mixnode {
config.api.v1_config.node.roles.mixnode_enabled = true;
}
if self.config.modes.entry {
config.api.v1_config.node.roles.gateway_enabled = true
}
if self.config.modes.exit {
config.api.v1_config.node.roles.network_requester_enabled = true;
config.api.v1_config.node.roles.ip_packet_router_enabled = true;
}
} else {
None
};
if let Some(path) = &self.config.gateway_tasks.storage_paths.bridge_client_params {
config = config.with_bridge_client_params_file(path);
@@ -1031,6 +1039,17 @@ impl NymNode {
x25519_versioned_noise_key,
ip_addresses: self.config.host.public_ips.clone(),
hostname: self.config.host.hostname.clone(),
build_information: bin_info_owned!(),
system_info,
roles: NodeRoles {
mixnode_enabled: self.config.modes.mixnode,
gateway_enabled: self.config.modes.entry,
network_requester_enabled: self.config.modes.exit,
ip_packet_router_enabled: self.config.modes.exit,
},
description: self.description.clone(),
auxiliary_data,
lewes_protocol: signed_lewes_protocol,
},
self.active_sphinx_keys()?.clone(),
self.metrics.clone(),
@@ -1151,6 +1170,14 @@ impl NymNode {
.global_prometheus_counters_update_rate,
);
// handler sampling tokio runtime scheduling metrics (run-queue depth, busy ratio) into
// the prometheus registry. run-queue depth is a transient gauge, so we sample at the base
// aggregator cadence (~5s) rather than the coarse 30s global-prometheus-counters rate.
metrics_aggregator.register_handler(
TokioRuntimeMetricsUpdater::new(),
self.config.metrics.debug.aggregator_update_rate,
);
// handler for handling prometheus metrics events
// metrics_aggregator.register_handler(PrometheusEventsHandler{}, None);
@@ -1270,10 +1297,9 @@ impl NymNode {
{
let processing_config = ProcessingConfig::new(&self.config);
// pre-register the per-stage packet-latency histograms so the whole mixnet_packet_* family
// is present on the prometheus endpoint at zero from boot (not just after the first
// sampled packet)
nym_mixnet_client::trace::register_stage_metrics();
// pre-register the whole mixnet_packet_* histogram family so it's present on the
// prometheus endpoint at zero from boot (not just after the first sampled packet)
nym_mixnet_client::metrics::register_all();
// we're ALWAYS listening for mixnet packets, either for forward or final hops (or both)
info!(
@@ -1289,6 +1315,8 @@ impl NymNode {
self.config.mixnet.debug.initial_connection_timeout,
self.config.mixnet.debug.maximum_connection_buffer_size,
self.config.mixnet.debug.use_legacy_packet_encoding,
self.config.mixnet.debug.connection_idle_timeout,
self.config.mixnet.debug.connection_write_timeout,
);
let mixnet_client = nym_mixnet_client::Client::new(
mixnet_client_config,
@@ -1304,7 +1332,6 @@ impl NymNode {
let mix_packet_sender = packet_forwarder.sender();
let shutdown_token = self.shutdown_token();
self.shutdown_tracker().try_spawn_named(
async move { packet_forwarder.run(shutdown_token).await },
"PacketForwarder",
+34 -34
View File
@@ -4927,7 +4927,7 @@ dependencies = [
[[package]]
name = "nym-api-requests"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"bs58",
"celes",
@@ -4967,7 +4967,7 @@ dependencies = [
[[package]]
name = "nym-bin-common"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"const-str",
"log",
@@ -4998,7 +4998,7 @@ dependencies = [
[[package]]
name = "nym-coconut-dkg-common"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"cosmwasm-schema",
"cosmwasm-std",
@@ -5011,7 +5011,7 @@ dependencies = [
[[package]]
name = "nym-compact-ecash"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"bincode",
"bs58",
@@ -5033,7 +5033,7 @@ dependencies = [
[[package]]
name = "nym-config"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"dirs 6.0.0",
"handlebars",
@@ -5047,7 +5047,7 @@ dependencies = [
[[package]]
name = "nym-contracts-common"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"bs58",
"cosmwasm-schema",
@@ -5061,7 +5061,7 @@ dependencies = [
[[package]]
name = "nym-credentials-interface"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"nym-bls12_381-fork",
"nym-compact-ecash",
@@ -5079,7 +5079,7 @@ dependencies = [
[[package]]
name = "nym-crypto"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"base64 0.22.1",
"bs58",
@@ -5101,7 +5101,7 @@ dependencies = [
[[package]]
name = "nym-ecash-contract-common"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"bs58",
"cosmwasm-schema",
@@ -5114,7 +5114,7 @@ dependencies = [
[[package]]
name = "nym-ecash-signer-check-types"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"nym-coconut-dkg-common",
"nym-crypto",
@@ -5129,14 +5129,14 @@ dependencies = [
[[package]]
name = "nym-ecash-time"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"time",
]
[[package]]
name = "nym-exit-policy"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"serde",
"serde_json",
@@ -5147,7 +5147,7 @@ dependencies = [
[[package]]
name = "nym-group-contract-common"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"cosmwasm-schema",
"cw-controllers",
@@ -5158,7 +5158,7 @@ dependencies = [
[[package]]
name = "nym-http-api-client"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"async-trait",
"bincode",
@@ -5190,7 +5190,7 @@ dependencies = [
[[package]]
name = "nym-http-api-client-macro"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"proc-macro-crate 3.3.0",
"proc-macro2",
@@ -5201,7 +5201,7 @@ dependencies = [
[[package]]
name = "nym-http-api-common"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"bincode",
"serde",
@@ -5211,7 +5211,7 @@ dependencies = [
[[package]]
name = "nym-kkt-ciphersuite"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"num_enum",
"semver",
@@ -5222,7 +5222,7 @@ dependencies = [
[[package]]
name = "nym-mixnet-contract-common"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"bs58",
"cosmwasm-schema",
@@ -5243,7 +5243,7 @@ dependencies = [
[[package]]
name = "nym-multisig-contract-common"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"cosmwasm-schema",
"cosmwasm-std",
@@ -5258,7 +5258,7 @@ dependencies = [
[[package]]
name = "nym-network-defaults"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"cargo_metadata 0.19.2",
"dotenvy",
@@ -5273,7 +5273,7 @@ dependencies = [
[[package]]
name = "nym-network-monitors-contract-common"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"cosmwasm-schema",
"cosmwasm-std",
@@ -5285,7 +5285,7 @@ dependencies = [
[[package]]
name = "nym-node-families-contract-common"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"cosmwasm-schema",
"cosmwasm-std",
@@ -5300,7 +5300,7 @@ dependencies = [
[[package]]
name = "nym-node-requests"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"async-trait",
"celes",
@@ -5327,7 +5327,7 @@ dependencies = [
[[package]]
name = "nym-noise-keys"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"nym-crypto",
"schemars",
@@ -5337,7 +5337,7 @@ dependencies = [
[[package]]
name = "nym-pemstore"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"pem",
"tracing",
@@ -5346,7 +5346,7 @@ dependencies = [
[[package]]
name = "nym-performance-contract-common"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"cosmwasm-schema",
"cosmwasm-std",
@@ -5359,7 +5359,7 @@ dependencies = [
[[package]]
name = "nym-serde-helpers"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"base64 0.22.1",
"bs58",
@@ -5370,7 +5370,7 @@ dependencies = [
[[package]]
name = "nym-store-cipher"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"aes-gcm",
"argon2",
@@ -5385,7 +5385,7 @@ dependencies = [
[[package]]
name = "nym-ticketbooks-merkle"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"nym-credentials-interface",
"nym-serde-helpers",
@@ -5399,7 +5399,7 @@ dependencies = [
[[package]]
name = "nym-types"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"base64 0.22.1",
"cosmrs",
@@ -5429,7 +5429,7 @@ dependencies = [
[[package]]
name = "nym-upgrade-mode-check"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"jwt-simple",
"nym-crypto",
@@ -5445,7 +5445,7 @@ dependencies = [
[[package]]
name = "nym-validator-client"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"async-trait",
"base64 0.22.1",
@@ -5496,7 +5496,7 @@ dependencies = [
[[package]]
name = "nym-vesting-contract-common"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"cosmwasm-schema",
"cosmwasm-std",
@@ -5542,7 +5542,7 @@ dependencies = [
[[package]]
name = "nym-wireguard-types"
version = "1.21.0"
version = "1.21.1"
dependencies = [
"base64 0.22.1",
"nym-crypto",
+7 -1
View File
@@ -1,12 +1,14 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_config::defaults::{mainnet, sandbox, DenomDetails, NymNetworkDetails};
use nym_config::defaults::{mainnet, DenomDetails, NymNetworkDetails};
use nym_types::{currency::DecCoin, error::TypesError};
use serde::{Deserialize, Serialize};
use std::{fmt, ops::Not, str::FromStr};
use strum_macros::EnumIter;
mod sandbox;
#[allow(clippy::upper_case_acronyms)]
#[cfg_attr(feature = "generate-ts", derive(ts_rs::TS))]
#[cfg_attr(
@@ -77,3 +79,7 @@ impl FromStr for Network {
}
}
}
fn parse_optional_str(raw: &str) -> Option<String> {
raw.is_empty().not().then(|| raw.into())
}
@@ -0,0 +1,75 @@
use super::parse_optional_str;
use nym_network_defaults::{ChainDetails, DenomDetails, NymContracts, ValidatorDetails};
// -- Chain details --
pub(crate) const NETWORK_NAME: &str = "sandbox";
pub(crate) const BECH32_PREFIX: &str = "n";
pub(crate) const MIX_DENOM: DenomDetails = DenomDetails::new("unym", "nym", 6);
pub(crate) const STAKE_DENOM: DenomDetails = DenomDetails::new("unyx", "nyx", 6);
// -- Contract addresses --
pub(crate) const MIXNET_CONTRACT_ADDRESS: &str =
"n1xr3rq8yvd7qplsw5yx90ftsr2zdhg4e9z60h5duusgxpv72hud3sjkxkav";
pub(crate) const VESTING_CONTRACT_ADDRESS: &str =
"n1unyuj8qnmygvzuex3dwmg9yzt9alhvyeat0uu0jedg2wj33efl5qackslz";
pub(crate) const ECASH_CONTRACT_ADDRESS: &str =
"n1v3vydvs2ued84yv3khqwtgldmgwn0elljsdh08dr5s2j9x4rc5fs9jlwz9";
pub(crate) const GROUP_CONTRACT_ADDRESS: &str =
"n1ewmwz97xm0h8rdk8sw7h9mwn866qkx9hl9zlmagqfkhuzvwk5hhq844ue9";
pub(crate) const MULTISIG_CONTRACT_ADDRESS: &str =
"n1tz0setr8vkh9udp8xyxgpqc89ns27k4d0jx2h942hr0ax63yjhmqz6xct8";
pub(crate) const COCONUT_DKG_CONTRACT_ADDRESS: &str =
"n1v3n2ly2dp3a9ng3ff6rh26yfkn0pc5hed7w2shc5u9ca5c865utqj5elvh";
// \/ TODO: this has to be updated once the contract is deployed
pub(crate) const PERFORMANCE_CONTRACT_ADDRESS: &str = "";
// /\ TODO: this has to be updated once the contract is deployed
pub const NETWORK_MONITORS_CONTRACT_ADDRESS: &str =
"n1x5krtvyqklj360x38v62ze42g8s8trfsfqzlv8c9296chcpvqadssqnem5";
// \/ TODO: this has to be updated once the contract is deployed
pub(crate) const NODE_FAMILIES_CONTRACT_ADDRESS: &str =
"n13clyapdqk5umyynp20kqwf59rxlwlp24yf2ltzasflhsdhrxq7fsahyr6z";
// /\ TODO: this has to be updated once the contract is deployed
// -- Constructor functions --
pub(crate) fn validators() -> Vec<ValidatorDetails> {
vec![ValidatorDetails::new(
"https://rpc.sandbox.nymtech.net",
Some("https://sandbox-nym-api1.nymtech.net/api"),
Some("wss://rpc.sandbox.nymtech.net/websocket"),
)]
}
pub(crate) fn network_details() -> nym_network_defaults::NymNetworkDetails {
nym_network_defaults::NymNetworkDetails {
network_name: NETWORK_NAME.into(),
chain_details: ChainDetails {
bech32_account_prefix: BECH32_PREFIX.to_string(),
mix_denom: MIX_DENOM.into(),
stake_denom: STAKE_DENOM.into(),
},
endpoints: validators(),
contracts: NymContracts {
mixnet_contract_address: parse_optional_str(MIXNET_CONTRACT_ADDRESS),
vesting_contract_address: parse_optional_str(VESTING_CONTRACT_ADDRESS),
performance_contract_address: parse_optional_str(PERFORMANCE_CONTRACT_ADDRESS),
network_monitors_contract_address: parse_optional_str(
NETWORK_MONITORS_CONTRACT_ADDRESS,
),
node_families_contract_address: parse_optional_str(NODE_FAMILIES_CONTRACT_ADDRESS),
ecash_contract_address: parse_optional_str(ECASH_CONTRACT_ADDRESS),
group_contract_address: parse_optional_str(GROUP_CONTRACT_ADDRESS),
multisig_contract_address: parse_optional_str(MULTISIG_CONTRACT_ADDRESS),
coconut_dkg_contract_address: parse_optional_str(COCONUT_DKG_CONTRACT_ADDRESS),
},
nym_vpn_api_url: None,
nym_vpn_api_urls: None,
nym_api_urls: None,
}
}