nym/nym-node/nym-node-metrics/src/network.rs
Jędrzej Stuczyński 8a93bce32f feat: additional mixnet improvements and metrics (#6874)
* wip

* batch processing of forward packets

* tmp: additional metrics for remote node

* fixed incorrect prometheus metric registration

* unified runtime metrics

* unify mixnet client metrics

* packet forwarding cleanup

* add batching for emptying the delay queue

* cleanup client io loop

* feat(nym-node): reap idle mixnet connections (ingress + egress)

Close mixnet connections that sit with no traffic past a configurable idle period (mixnet.debug.connection_idle_timeout, default 5min, 0 disables) to bound lingering tokio tasks/sockets.

Ingress handle_stream is read-only, so a silently-gone peer (NAT drop, crash without FIN, half-open) never triggers FIN/RST and the task would block on .next() forever; a new idle select arm closes it (the post-loop replay flush still runs, so nothing is stranded). Egress run_io_loop gets the symmetric arm keyed on last_send; on close EvictOnDrop clears the cache entry and the next packet transparently reconnects.

Adds a cumulative nym_node_network_idle_closed_ingress_mixnet_connections counter; egress reaping is observed via the existing active-egress gauge plus an exit_reason=idle_timeout log.

* downgrade sysinfo

* refactor(nym-node): split PacketForwarder into router + delay-queue tasks

Split the single PacketForwarder task into two concurrently-scheduled tasks connected by a bounded handoff channel, so intake and delayed-release no longer block each other.

PacketRouter (router.rs) is the intake task: sole consumer of the ingress channel, it applies the routing filter and either forwards zero/already-elapsed-delay packets directly or hands delayed ones to the delay task. Its per-packet work is sub-µs, so new packets no longer wait behind delayed-release processing (collapses the ForwarderQueue tail).

DelayForwarder (delay.rs) owns the NonExhaustiveDelayQueue exclusively (it can't be shared by reference). Its run loop services BOTH branches on every wakeup - draining pending inserts first to bring the queue current, then flushing everything now due - so the biased select can't let releases and inserts starve each other, and a freshly-arrived-but-already-due packet releases in the same pass (marginally improving DelayQueueOverrun).

The mixnet client is shared as Arc<C>; handoff-channel overflow is dropped as an egress drop rather than blocking, keeping intake decoupled from release.

* feat(nym-node): bound egress flush with a write timeout

Cap how long a single egress batch flush may block on a congested peer socket (mixnet.debug.connection_write_timeout, default 500ms, 0 disables), so a slow peer can no longer back this connection's egress queue up into the multi-second range - the root of the EgressQueue and SocketWrite tails.

A single timeout is treated as transient congestion: the un-fed tail of the batch is abandoned but the connection is retained. This is sound because NoiseStream::poll_write encrypts and buffers each frame synchronously, so a cancelled flush leaves the noise transport nonce-consistent and a later flush resumes the byte stream in order; a momentary spike therefore costs no re-handshake. Only MAX_CONSECUTIVE_WRITE_TIMEOUTS (3) timeouts in a row, i.e. a persistently congested peer, tear the connection down (it reconnects on the next packet); a successful flush resets the counter.

Buffer-size tuning (maximum_connection_buffer_size) deliberately left for live data.

* revert PacketForwarder split in favour of a single task that clears both channels on wake
2026-06-12 10:31:54 +01:00
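
The consecutive-write-timeout policy described in the commit message above can be sketched as a small counter. This is a minimal illustration under stated assumptions, not the nym-node implementation: `WriteTimeoutTracker`, `FlushOutcome` and `Action` are hypothetical names introduced here; only `MAX_CONSECUTIVE_WRITE_TIMEOUTS` and its value of 3 come from the commit message.

```rust
/// Number of back-to-back write timeouts tolerated before teardown
/// (the commit message gives 3 for MAX_CONSECUTIVE_WRITE_TIMEOUTS).
const MAX_CONSECUTIVE_WRITE_TIMEOUTS: u32 = 3;

/// Outcome of a single egress batch flush (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FlushOutcome {
    Flushed,
    TimedOut,
}

/// What the io loop should do after a flush (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Action {
    KeepConnection,
    TearDown,
}

/// Tracks consecutive write timeouts for one egress connection (hypothetical type).
#[derive(Debug, Default)]
struct WriteTimeoutTracker {
    consecutive_timeouts: u32,
}

impl WriteTimeoutTracker {
    /// Records the outcome of one flush and decides whether to keep the connection.
    fn record(&mut self, outcome: FlushOutcome) -> Action {
        match outcome {
            // a successful flush resets the counter
            FlushOutcome::Flushed => {
                self.consecutive_timeouts = 0;
                Action::KeepConnection
            }
            // a timeout is transient unless it is the third in a row
            FlushOutcome::TimedOut => {
                self.consecutive_timeouts += 1;
                if self.consecutive_timeouts >= MAX_CONSECUTIVE_WRITE_TIMEOUTS {
                    Action::TearDown
                } else {
                    Action::KeepConnection
                }
            }
        }
    }
}
```

With this shape, two timeouts followed by a successful flush leave the connection up with the counter back at zero, and only three timeouts with no success in between produce `Action::TearDown`.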


// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Default)]
pub struct NetworkStats {
    // for now just experiment with basic data, we could always extend it
    active_ingress_mixnet_connections: AtomicUsize,

    active_ingress_websocket_connections: AtomicUsize,

    // the reason for additional `Arc` on this one is that the handler wasn't
    // designed with metrics in mind and this single counter has been woven through
    // the call stack
    active_egress_mixnet_connections: Arc<AtomicUsize>,

    // incoming LP control connections from clients
    active_lp_ingress_client_connections: AtomicUsize,

    // incoming LP control connections from nodes
    active_lp_ingress_node_connections: AtomicUsize,

    // outgoing LP control connections to nodes
    active_lp_egress_node_connections: AtomicUsize,

    // cumulative count of ingress mixnet connections closed due to the idle timeout
    idle_closed_ingress_mixnet_connections: AtomicUsize,
}

impl NetworkStats {
    pub fn new_active_ingress_mixnet_client(&self) {
        self.active_ingress_mixnet_connections
            .fetch_add(1, Ordering::Relaxed);
    }

    pub fn disconnected_ingress_mixnet_client(&self) {
        self.active_ingress_mixnet_connections
            .fetch_sub(1, Ordering::Relaxed);
    }

    pub fn ingress_mixnet_idle_closed(&self) {
        self.idle_closed_ingress_mixnet_connections
            .fetch_add(1, Ordering::Relaxed);
    }

    pub fn idle_closed_ingress_mixnet_connections_count(&self) -> usize {
        self.idle_closed_ingress_mixnet_connections
            .load(Ordering::Relaxed)
    }

    pub fn new_ingress_websocket_client(&self) {
        self.active_ingress_websocket_connections
            .fetch_add(1, Ordering::Relaxed);
    }

    pub fn disconnected_ingress_websocket_client(&self) {
        self.active_ingress_websocket_connections
            .fetch_sub(1, Ordering::Relaxed);
    }

    pub fn active_ingress_mixnet_connections_count(&self) -> usize {
        self.active_ingress_mixnet_connections
            .load(Ordering::Relaxed)
    }

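    pub fn active_ingress_websocket_connections_count(&self) -> usize {
        // `Relaxed` matches every other access to this counter; a metrics read
        // does not need to order any other memory operations
        self.active_ingress_websocket_connections
            .load(Ordering::Relaxed)
    }
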
    pub fn active_egress_mixnet_connections_counter(&self) -> Arc<AtomicUsize> {
        self.active_egress_mixnet_connections.clone()
    }

    pub fn active_egress_mixnet_connections_count(&self) -> usize {
        self.active_egress_mixnet_connections
            .load(Ordering::Relaxed)
    }

    pub fn new_ingress_lp_client_connection(&self) {
        self.active_lp_ingress_client_connections
            .fetch_add(1, Ordering::Relaxed);
    }

    pub fn closed_ingress_lp_client_connection(&self) {
        self.active_lp_ingress_client_connections
            .fetch_sub(1, Ordering::Relaxed);
    }

    pub fn new_ingress_lp_node_connection(&self) {
        self.active_lp_ingress_node_connections
            .fetch_add(1, Ordering::Relaxed);
    }

    pub fn closed_ingress_lp_node_connection(&self) {
        self.active_lp_ingress_node_connections
            .fetch_sub(1, Ordering::Relaxed);
    }

    pub fn new_egress_lp_node_connection(&self) {
        self.active_lp_egress_node_connections
            .fetch_add(1, Ordering::Relaxed);
    }

    pub fn closed_egress_lp_node_connection(&self) {
        self.active_lp_egress_node_connections
            .fetch_sub(1, Ordering::Relaxed);
    }

    pub fn active_lp_client_connections_count(&self) -> usize {
        self.active_lp_ingress_client_connections
            .load(Ordering::Relaxed)
    }
}
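
The gauges above rely on every `fetch_add` being matched by exactly one `fetch_sub`, and `active_egress_mixnet_connections_counter()` hands out the shared `Arc<AtomicUsize>` so that code further down the call stack can do that bookkeeping itself. One way a caller might keep such a gauge balanced is an RAII guard that increments on creation and decrements on drop. The sketch below is an assumption for illustration only: `ActiveConnectionGuard` and `demo` are hypothetical names and are not part of `nym-node-metrics`.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical RAII guard: bumps a shared gauge when created and
/// decrements it again when dropped.
struct ActiveConnectionGuard {
    counter: Arc<AtomicUsize>,
}

impl ActiveConnectionGuard {
    fn new(counter: Arc<AtomicUsize>) -> Self {
        counter.fetch_add(1, Ordering::Relaxed);
        ActiveConnectionGuard { counter }
    }
}

impl Drop for ActiveConnectionGuard {
    fn drop(&mut self) {
        self.counter.fetch_sub(1, Ordering::Relaxed);
    }
}

/// Simulates two overlapping connections and returns the gauge value
/// while both are open, after the second closes, and after both close.
fn demo() -> (usize, usize, usize) {
    // stands in for the Arc returned by `active_egress_mixnet_connections_counter()`
    let gauge = Arc::new(AtomicUsize::new(0));

    let first = ActiveConnectionGuard::new(Arc::clone(&gauge));
    let while_both_open = {
        let _second = ActiveConnectionGuard::new(Arc::clone(&gauge));
        gauge.load(Ordering::Relaxed)
        // `_second` is dropped here, decrementing the gauge
    };
    let after_second_closed = gauge.load(Ordering::Relaxed);

    drop(first);
    let after_all_closed = gauge.load(Ordering::Relaxed);

    (while_both_open, after_second_closed, after_all_closed)
}
```

Creating the guard at the top of a connection task means the gauge is decremented on every exit path, including early returns, without a separate call at each one.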