chore: split up lp listener (#6507)

* chore: split up lp listener

* rename 'build_lp'
This commit is contained in:
Jędrzej Stuczyński
2026-03-03 13:59:48 +00:00
committed by GitHub
parent a450b6f984
commit 2cc9b05520
14 changed files with 606 additions and 525 deletions
+9 -22
View File
@@ -17,9 +17,9 @@ mod tests {
use nym_lp::peer::LpLocalPeer;
use nym_node::config::{LpConfig, LpDebug};
use nym_node::node::GatewayStorage;
use nym_node::node::lp::control::handler::LpConnectionHandler;
use nym_node::node::lp::error::LpHandlerError;
use nym_node::node::lp::handler::LpConnectionHandler;
use nym_node::node::lp::{LpHandlerState, MixForwardingReceiver, mix_forwarding_channels};
use nym_node::node::lp::{SharedLpControlState, SharedLpState};
use nym_node::wireguard::{PeerManager, PeerRegistrator};
use nym_registration_client::{LpClientError, LpRegistrationClient};
use nym_test_utils::helpers::{CryptoRng09, seeded_rng};
@@ -130,11 +130,8 @@ mod tests {
struct Gateway {
base: Party,
lp_state: LpHandlerState,
lp_state: SharedLpControlState,
ip_pool: IpPool,
// might be used later for mixnet registration tests
#[allow(unused)]
mix_receiver: MixForwardingReceiver,
mock_peer_controller: SpawnedPeerController,
tasks_cancellation: CancellationToken,
@@ -210,9 +207,6 @@ mod tests {
let forward_semaphore =
Arc::new(Semaphore::new(lp_config.debug.max_concurrent_forwards));
// Create mix forwarding channel (unused in tests but required by struct)
let (mix_sender, mix_receiver) = mix_forwarding_channels();
// create wireguard data
let (wireguard_data, peer_request_rx) = Self::wireguard_data(&base);
@@ -231,31 +225,24 @@ mod tests {
upgrade_mode_details,
);
let lp_state = LpHandlerState {
let lp_state = SharedLpControlState {
local_lp_peer: base.peer.clone(),
metrics: Default::default(),
// use default lp config (with enabled flag)
lp_config,
// TODO: might be needed later on for mixnet registration
outbound_mix_sender: mix_sender,
// we start with empty state
session_states: Arc::new(Default::default()),
forward_semaphore,
// handles for dealing with new peers
peer_registrator: Some(peer_registrator),
shared: SharedLpState {
metrics: Default::default(),
lp_config,
session_states: Arc::new(Default::default()),
},
};
Ok(Gateway {
base,
lp_state,
ip_pool: Self::ip_pool(),
mix_receiver,
mock_peer_controller: SpawnedPeerController::Ready {
controller: mock_peer_controller,
},
+153
View File
@@ -0,0 +1,153 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::config::LpDebug;
use dashmap::DashMap;
use nym_lp::LpStateMachine;
use nym_lp::peer_config::LpReceiverIndex;
use nym_metrics::inc_by;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, info};
/// A value bundled with the timestamps needed for TTL-based eviction.
///
/// Tracks when the entry was created and when it was last active so that a
/// cleanup pass can identify stale handshakes and sessions.
pub struct TimestampedState<T> {
    /// The wrapped state (LpStateMachine or LpSession)
    pub state: T,

    /// Creation time; set once in `new` and never modified afterwards
    created_at: std::time::Instant,

    /// Unix timestamp (whole seconds) of the most recent activity, updated atomically
    ///
    /// For handshakes: never updated (use created_at for TTL)
    /// For sessions: updated on every packet received
    last_activity: std::sync::atomic::AtomicU64,
}

impl<T> TimestampedState<T> {
    /// Wall-clock time as whole seconds since the unix epoch.
    ///
    /// Yields `0` when the system clock reports a time before the epoch.
    fn unix_now_secs() -> u64 {
        match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
            Ok(since_epoch) => since_epoch.as_secs(),
            Err(_) => 0,
        }
    }

    /// Wrap `state`, stamping both the creation time and the last activity with "now".
    pub fn new(state: T) -> Self {
        let created_at = std::time::Instant::now();
        let last_activity = std::sync::atomic::AtomicU64::new(Self::unix_now_secs());

        Self {
            state,
            created_at,
            last_activity,
        }
    }

    /// Record activity by storing the current unix time (single relaxed atomic store).
    pub fn touch(&self) {
        let now_secs = Self::unix_now_secs();
        self.last_activity
            .store(now_secs, std::sync::atomic::Ordering::Relaxed);
    }

    /// Time elapsed since this entry was created.
    #[allow(dead_code)]
    pub fn age(&self) -> Duration {
        self.created_at.elapsed()
    }

    /// Time elapsed since the last recorded activity, at whole-second resolution.
    ///
    /// Saturates to zero if the stored timestamp is ahead of the current
    /// wall-clock time.
    pub fn since_activity(&self) -> Duration {
        let now_secs = Self::unix_now_secs();
        let last_secs = self
            .last_activity
            .load(std::sync::atomic::Ordering::Relaxed);
        let idle_secs = now_secs.saturating_sub(last_secs);

        Duration::from_secs(idle_secs)
    }
}
/// Background task that periodically evicts idle LP sessions.
pub(crate) struct CleanupTask {
    /// Session map to scan; entries idle for longer than `cfg.session_ttl` are removed
    session_states: Arc<DashMap<LpReceiverIndex, TimestampedState<LpStateMachine>>>,

    /// Source of `session_ttl` and `state_cleanup_interval`
    cfg: LpDebug,

    /// Cancellation of this token stops [`CleanupTask::run`]
    shutdown: nym_task::ShutdownToken,
}

impl CleanupTask {
    /// Create a cleanup task over `session_states`; nothing runs until
    /// [`CleanupTask::run`] is awaited.
    pub fn new(
        session_states: Arc<DashMap<LpReceiverIndex, TimestampedState<LpStateMachine>>>,
        cfg: LpDebug,
        shutdown: nym_task::ShutdownToken,
    ) -> Self {
        CleanupTask {
            session_states,
            cfg,
            shutdown,
        }
    }

    /// Run a single cleanup pass over the session map.
    ///
    /// Every entry whose time since last activity exceeds `session_ttl` is
    /// removed. The same TTL is applied to all entries. When at least one
    /// entry was removed, the pass is logged and the
    /// `lp_states_cleanup_session_removed` counter is increased by the
    /// number of removed entries.
    fn perform_cleanup(&self) {
        let session_ttl = self.cfg.session_ttl;

        let start = std::time::Instant::now();
        let mut ss_removed = 0u64;

        // Remove stale sessions (based on time since last activity)
        self.session_states.retain(|_, timestamped| {
            let keep = timestamped.since_activity() <= session_ttl;
            if !keep {
                ss_removed += 1;
            }
            keep
        });

        // nothing was evicted: skip logging and metrics
        if ss_removed == 0 {
            return;
        }

        let duration = start.elapsed();
        info!(
            "LP state cleanup: {ss_removed} sessions (took {:.3}s)",
            duration.as_secs_f64()
        );

        // Track metrics
        // the cast cannot truncate in practice: the count is bounded by the
        // number of entries held in the in-memory map
        inc_by!("lp_states_cleanup_session_removed", ss_removed as i64);
    }

    /// Background loop for cleaning up stale session entries
    ///
    /// Calls [`CleanupTask::perform_cleanup`] on every tick of a timer with
    /// period `state_cleanup_interval`, until the shutdown token is cancelled.
    /// Shutdown is checked first on every iteration (`biased` select).
    ///
    /// # Panics
    /// `tokio::time::interval` panics if `state_cleanup_interval` is zero.
    pub(crate) async fn run(&self) {
        let interval = self.cfg.state_cleanup_interval;
        // the first tick of a tokio interval completes immediately, so the
        // first cleanup pass happens right after the task starts
        let mut cleanup_interval = tokio::time::interval(interval);

        loop {
            tokio::select! {
                biased;
                _ = self.shutdown.cancelled() => {
                    debug!("LP state cleanup task: received shutdown signal");
                    break;
                }
                _ = cleanup_interval.tick() => {
                    self.perform_cleanup();
                }
            }
        }

        info!("LP state cleanup task shutdown complete");
    }
}
@@ -1,10 +1,12 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::{LpHandlerState, LpReceiverIndex, TimestampedState};
use crate::node::lp::cleanup::TimestampedState;
use crate::node::lp::error::LpHandlerError;
use crate::node::lp::state::SharedLpControlState;
use dashmap::mapref::one::RefMut;
use nym_lp::packet::{EncryptedLpPacket, ForwardPacketData};
use nym_lp::peer_config::LpReceiverIndex;
use nym_lp::state_machine::{LpAction, LpData, LpDataKind, LpInput};
use nym_lp::transport::LpHandshakeChannel;
use nym_lp::transport::traits::LpTransportChannel;
@@ -75,7 +77,7 @@ impl ConnectionStats {
pub struct LpConnectionHandler<S = TcpStream> {
stream: S,
remote_addr: SocketAddr,
state: LpHandlerState,
state: SharedLpControlState,
stats: ConnectionStats,
// /// Flag indicating whether this is a connection from an entry gateway serving as a proxy
@@ -99,7 +101,7 @@ where
stream: S,
// forwarded_connection: bool,
remote_addr: SocketAddr,
state: LpHandlerState,
state: SharedLpControlState,
) -> Self {
Self {
stream,
@@ -119,6 +121,7 @@ where
) -> Result<RefMut<'_, LpReceiverIndex, TimestampedState<LpStateMachine>>, LpHandlerError> {
let receiver_index = self.bound_receiver_index()?;
self.state
.shared
.session_states
.get_mut(&receiver_index)
.ok_or_else(|| LpHandlerError::MissingLpSession { receiver_index })
@@ -143,7 +146,7 @@ where
// 1. complete KKT/PSQ handshake before doing anything else.
// bail if it takes too long
let timeout = self.state.lp_config.debug.handshake_ttl;
let timeout = self.state.shared.lp_config.debug.handshake_ttl;
let local_peer = self.state.local_lp_peer.clone();
let stream = &mut self.stream;
@@ -177,6 +180,7 @@ where
// 2. insert the state machine into the shared state
let state_machine = LpStateMachine::new(session);
self.state
.shared
.session_states
.insert(receiver_idx, TimestampedState::new(state_machine));
self.bound_receiver_idx = Some(receiver_idx);
@@ -663,8 +667,9 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::config::LpConfig;
use crate::config::lp::LpDebug;
use crate::node::lp::LpConfig;
use crate::node::lp::state::SharedLpState;
use nym_lp::peer::{KEMKeys, LpLocalPeer, generate_keypair_mceliece, generate_keypair_mlkem};
use nym_lp::{Ciphersuite, SessionManager, sessions_for_tests};
use nym_test_utils::helpers::{deterministic_rng, deterministic_rng_09};
@@ -672,7 +677,7 @@ mod tests {
// ==================== Test Helpers ====================
/// Create a minimal test state for handler tests
async fn create_minimal_test_state() -> LpHandlerState {
async fn create_minimal_test_state() -> SharedLpControlState {
use nym_crypto::asymmetric::ed25519;
let mut rng = deterministic_rng();
@@ -688,9 +693,6 @@ mod tests {
lp_config.debug.max_concurrent_forwards,
));
// Create mix forwarding channel (unused in tests but required by struct)
let (mix_sender, _mix_receiver) = nym_mixnet_client::forwarder::mix_forwarding_channels();
let id_keys = Arc::new(ed25519::KeyPair::new(&mut rng));
let x_keys = Arc::new(id_keys.to_x25519().try_into().unwrap());
@@ -700,14 +702,15 @@ mod tests {
);
let lp_peer = LpLocalPeer::new(Ciphersuite::default(), x_keys).with_kem_keys(kem_keys);
LpHandlerState {
lp_config,
SharedLpControlState {
local_lp_peer: lp_peer,
metrics: nym_node_metrics::NymNodeMetrics::default(),
outbound_mix_sender: mix_sender,
session_states: Arc::new(dashmap::DashMap::new()),
peer_registrator: None,
forward_semaphore,
shared: SharedLpState {
lp_config,
metrics: nym_node_metrics::NymNodeMetrics::default(),
session_states: Arc::new(dashmap::DashMap::new()),
},
}
}
+130
View File
@@ -0,0 +1,130 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::config::LpConfig;
use crate::error::NymNodeError;
use crate::node::lp::control::handler::LpConnectionHandler;
use crate::node::lp::state::SharedLpControlState;
use nym_task::ShutdownTracker;
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tracing::{debug, error, info, trace, warn};
/// LP listener that accepts TCP connections on port 41264
pub struct LpControlListener {
/// Address to bind to
bind_address: SocketAddr,
/// Shared state for connection handlers
handler_state: SharedLpControlState,
/// Shutdown coordination
shutdown: ShutdownTracker,
}
impl LpControlListener {
pub fn new(
bind_address: SocketAddr,
handler_state: SharedLpControlState,
shutdown: ShutdownTracker,
) -> Self {
Self {
bind_address,
handler_state,
shutdown,
}
}
fn lp_config(&self) -> LpConfig {
self.handler_state.shared.lp_config
}
pub async fn run(&mut self) -> Result<(), NymNodeError> {
let bind_address = self.bind_address;
info!("Starting LP control listener on {bind_address}");
let listener = TcpListener::bind(bind_address).await.map_err(|source| {
error!("Failed to bind LP listener to {bind_address}: {source}",);
NymNodeError::LpBindFailure {
address: bind_address,
source,
}
})?;
let shutdown_token = self.shutdown.clone_shutdown_token();
loop {
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
trace!("LP listener: received shutdown signal");
break;
}
result = listener.accept() => {
match result {
Ok((stream, addr)) => self.handle_connection(stream, addr),
Err(e) => warn!("Failed to accept LP connection: {e}")
}
}
}
}
info!("LP listener shutdown complete");
Ok(())
}
fn handle_connection(&self, stream: tokio::net::TcpStream, remote_addr: SocketAddr) {
// Check connection limit
let active_connections = self.active_lp_connections();
let max_connections = self.lp_config().debug.max_connections;
if active_connections >= max_connections {
warn!(
"LP connection limit exceeded ({active_connections}/{max_connections}), rejecting connection from {remote_addr}"
);
return;
}
debug!(
"Accepting LP connection from {remote_addr} ({active_connections} active connections)"
);
// Increment connection counter
self.handler_state
.shared
.metrics
.network
.new_lp_connection();
// Spawn handler task
let handler = LpConnectionHandler::new(stream, remote_addr, self.handler_state.clone());
let metrics = self.handler_state.shared.metrics.clone();
self.shutdown.try_spawn_named_with_shutdown(
async move {
let result = handler.handle().await;
// Handler emits lifecycle metrics internally on success
// For errors, we need to emit them here since handler is consumed
if let Err(e) = result {
warn!("LP handler error for {remote_addr}: {e}");
// Note: metrics are emitted in handle() for graceful path
// On error path, handle() returns early without emitting
// So we track errors here
}
// Decrement connection counter on exit
metrics.network.lp_connection_closed();
},
&format!("LP::{remote_addr}"),
);
}
fn active_lp_connections(&self) -> usize {
self.handler_state
.shared
.metrics
.network
.active_lp_connections_count()
}
}
+5
View File
@@ -0,0 +1,5 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
pub mod handler;
pub(crate) mod listener;
@@ -1,5 +1,5 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! LP Data Handler - UDP listener for LP data plane (port 51264)
//!
@@ -15,90 +15,24 @@
//! ```
//!
use super::LpHandlerState;
use crate::error::NymNodeError;
use crate::node::lp::error::LpHandlerError;
use crate::node::lp::state::SharedLpDataState;
use nym_lp::packet::OuterHeader;
use nym_metrics::inc;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
use tracing::*;
/// Maximum UDP packet size we'll accept
/// Sphinx packets are typically ~2KB, LP overhead is ~50 bytes, so 4KB is plenty
const MAX_UDP_PACKET_SIZE: usize = 4096;
/// LP Data Handler for UDP data plane
pub struct LpDataHandler {
/// UDP socket for receiving LP-wrapped Sphinx packets
socket: Arc<UdpSocket>,
/// Shared state with TCP control plane
/// State used for handling received requests
#[allow(dead_code)]
state: LpHandlerState,
/// Shutdown token
shutdown: nym_task::ShutdownToken,
state: SharedLpDataState,
}
impl LpDataHandler {
/// Create a new LP data handler
pub async fn new(
bind_addr: SocketAddr,
state: LpHandlerState,
shutdown: nym_task::ShutdownToken,
) -> Result<Self, NymNodeError> {
let socket = UdpSocket::bind(bind_addr).await.map_err(|source| {
error!("Failed to bind LP data socket to {bind_addr}: {source}");
NymNodeError::LpBindFailure {
address: bind_addr,
source,
}
})?;
info!("LP data handler listening on UDP {bind_addr}");
Ok(Self {
socket: Arc::new(socket),
state,
shutdown,
})
}
/// Run the UDP packet receive loop
pub async fn run(self) -> Result<(), LpHandlerError> {
let mut buf = vec![0u8; MAX_UDP_PACKET_SIZE];
loop {
tokio::select! {
biased;
_ = self.shutdown.cancelled() => {
info!("LP data handler: received shutdown signal");
break;
}
result = self.socket.recv_from(&mut buf) => {
match result {
Ok((len, src_addr)) => {
// Process packet in place (no spawn - UDP is fast)
if let Err(e) = self.handle_packet(&buf[..len], src_addr).await {
debug!("LP data packet error from {src_addr}: {e}");
inc!("lp_data_packet_errors");
}
}
Err(e) => {
warn!("LP data socket recv error: {e}");
inc!("lp_data_recv_errors");
}
}
}
}
}
info!("LP data handler shutdown complete");
Ok(())
pub fn new(state: SharedLpDataState) -> Self {
Self { state }
}
/// Handle a single UDP packet
@@ -115,7 +49,7 @@ impl LpDataHandler {
/// - Marking counter as used after successful decryption
///
/// This prevents replay attacks where captured packets are re-sent.
async fn handle_packet(
pub(crate) async fn handle_packet(
&self,
packet: &[u8],
src_addr: SocketAddr,
@@ -223,15 +157,3 @@ impl LpDataHandler {
// }
}
}
#[cfg(test)]
mod tests {
use super::*;
// Sphinx packets are typically around 2KB
// LP overhead is small (~50 bytes header + AEAD tag)
// 4KB should be plenty with room to spare
const _: () = {
assert!(MAX_UDP_PACKET_SIZE >= 2048 + 100);
};
}
+81
View File
@@ -0,0 +1,81 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::error::NymNodeError;
use crate::node::lp::data::MAX_UDP_PACKET_SIZE;
use crate::node::lp::data::handler::LpDataHandler;
use crate::node::lp::state::SharedLpDataState;
use nym_metrics::inc;
use std::net::SocketAddr;
use tokio::net::UdpSocket;
use tracing::log::warn;
use tracing::{debug, error, info};
/// UDP listener for the LP data plane (port 51264 by default).
///
/// Receives datagrams on `bind_address` and passes each one to the
/// [`LpDataHandler`].
pub struct LpDataListener {
    /// Address the UDP socket binds to
    bind_address: SocketAddr,

    /// Handler invoked for every received datagram
    handler: LpDataHandler,

    /// Cancellation of this token stops the receive loop
    shutdown: nym_task::ShutdownToken,
}

impl LpDataListener {
    /// Assemble a listener around a handler built from `state`; the socket is
    /// only bound once [`Self::run`] is called.
    pub fn new(
        bind_address: SocketAddr,
        state: SharedLpDataState,
        shutdown: nym_task::ShutdownToken,
    ) -> Self {
        let handler = LpDataHandler::new(state);

        LpDataListener {
            bind_address,
            handler,
            shutdown,
        }
    }

    /// Bind the UDP socket and process datagrams until shutdown is signalled.
    ///
    /// Receive errors and per-packet handling errors are logged and counted,
    /// and do not stop the loop.
    ///
    /// # Errors
    /// Returns [`NymNodeError::LpBindFailure`] if the socket cannot be bound.
    pub async fn run(&self) -> Result<(), NymNodeError> {
        let bind_address = self.bind_address;
        info!("Starting LP data listener on {bind_address}");

        let socket = match UdpSocket::bind(bind_address).await {
            Ok(bound) => bound,
            Err(source) => {
                error!("Failed to bind LP data socket to {bind_address}: {source}");
                return Err(NymNodeError::LpBindFailure {
                    address: bind_address,
                    source,
                });
            }
        };

        // single receive buffer reused for every datagram
        let mut buf = vec![0u8; MAX_UDP_PACKET_SIZE];

        loop {
            tokio::select! {
                // shutdown takes priority over pending datagrams
                biased;

                _ = self.shutdown.cancelled() => {
                    info!("LP data listener: received shutdown signal");
                    break;
                }

                result = socket.recv_from(&mut buf) => {
                    let (len, src_addr) = match result {
                        Ok(received) => received,
                        Err(e) => {
                            warn!("LP data socket recv error: {e}");
                            inc!("lp_data_recv_errors");
                            continue;
                        }
                    };

                    // Process packet in place (no spawn - UDP is fast)
                    let datagram = &buf[..len];
                    if let Err(e) = self.handler.handle_packet(datagram, src_addr).await {
                        debug!("LP data packet error from {src_addr}: {e}");
                        inc!("lp_data_packet_errors");
                    }
                }
            }
        }

        info!("LP data handler shutdown complete");
        Ok(())
    }
}
+20
View File
@@ -0,0 +1,20 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
/// Maximum UDP packet size we'll accept, in bytes
///
/// Sphinx packets are typically ~2KB, LP overhead is ~50 bytes, so 4KB is plenty.
/// A compile-time assertion in this module's tests checks that the value stays
/// at or above 2048 + 100 bytes.
const MAX_UDP_PACKET_SIZE: usize = 4096;
pub mod handler;
pub(crate) mod listener;
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time guard: the limit must fit a typical ~2KB Sphinx packet
    // plus some headroom; 4KB leaves room to spare
    const _: () = assert!(MAX_UDP_PACKET_SIZE >= 2048 + 100);
}
+1 -1
View File
@@ -1,7 +1,7 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::node::lp::LpReceiverIndex;
use nym_lp::peer_config::LpReceiverIndex;
use nym_lp::state_machine::{LpAction, LpDataKind};
use nym_lp::transport::LpTransportError;
use nym_lp::{LpError, packet::MalformedLpPacketError};
+94 -373
View File
@@ -67,404 +67,125 @@
// To view metrics, the nym-metrics registry automatically collects all metrics.
// They can be exported via Prometheus format using the metrics endpoint.
use crate::config::lp::LpConfig;
use crate::config::LpConfig;
use crate::error::NymNodeError;
use crate::node::lp::cleanup::CleanupTask;
use crate::node::lp::control::listener::LpControlListener;
use crate::node::lp::data::listener::LpDataListener;
use dashmap::DashMap;
use nym_gateway::node::wireguard::PeerRegistrator;
use nym_lp::peer::LpLocalPeer;
use nym_lp::peer_config::LpReceiverIndex;
use nym_lp::state_machine::LpStateMachine;
use nym_mixnet_client::forwarder::MixForwardingSender;
use nym_node_metrics::NymNodeMetrics;
use nym_task::ShutdownTracker;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::sync::Semaphore;
use tracing::*;
use tracing::error;
pub use nym_mixnet_client::forwarder::{MixForwardingReceiver, mix_forwarding_channels};
pub use state::{SharedLpControlState, SharedLpDataState, SharedLpState};
mod data_handler;
mod cleanup;
pub mod control;
mod data;
pub mod error;
pub mod handler;
mod registration;
pub mod state;
/// Wrapper for state entries with timestamp tracking for cleanup
///
/// This wrapper adds `created_at` and `last_activity` timestamps to state entries,
/// enabling TTL-based cleanup of stale handshakes and sessions.
pub struct TimestampedState<T> {
/// The actual state (LpStateMachine or LpSession)
pub state: T,
/// When this state was created (never changes)
created_at: std::time::Instant,
/// Last activity timestamp (unix seconds, atomically updated)
///
/// For handshakes: never updated (use created_at for TTL)
/// For sessions: updated on every packet received
last_activity: std::sync::atomic::AtomicU64,
}
impl<T> TimestampedState<T> {
/// Create a new timestamped state
pub fn new(state: T) -> Self {
let now_instant = std::time::Instant::now();
let now_unix = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
Self {
state,
created_at: now_instant,
last_activity: std::sync::atomic::AtomicU64::new(now_unix),
}
}
/// Update last_activity timestamp (cheap, lock-free operation)
pub fn touch(&self) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
self.last_activity
.store(now, std::sync::atomic::Ordering::Relaxed);
}
/// Get age since creation
#[allow(dead_code)]
pub fn age(&self) -> Duration {
self.created_at.elapsed()
}
/// Get time since last activity
pub fn since_activity(&self) -> Duration {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let last = self
.last_activity
.load(std::sync::atomic::Ordering::Relaxed);
Duration::from_secs(now.saturating_sub(last))
}
}
/// Shared state for LP connection handlers
#[derive(Clone)]
pub struct LpHandlerState {
/// Encapsulates all required key information of a local Lewes Protocol Peer.
pub local_lp_peer: LpLocalPeer,
/// Metrics collection
pub metrics: NymNodeMetrics,
/// Handle registering new wireguard peers
pub peer_registrator: Option<PeerRegistrator>,
/// LP configuration (for timestamp validation, etc.)
pub lp_config: LpConfig,
/// Channel for forwarding Sphinx packets into the mixnet
///
/// Used by the LP data handler (UDP:51264) to forward decrypted Sphinx packets
/// from LP clients into the mixnet for routing.
#[allow(dead_code)]
pub outbound_mix_sender: MixForwardingSender,
/// Established sessions keyed by session_id
///
/// Used after handshake completes (session_id is deterministically computed from
/// both parties' X25519 keys). Enables stateless transport - each packet lookup
/// by session_id, decrypt/process, respond.
///
/// Wrapped in TimestampedState for TTL-based cleanup of inactive sessions.
///
/// Sessions are stored as LpStateMachine (not LpSession) to enable
/// subsession/rekeying support. The state machine handles subsession initiation
/// (SubsessionKK1/KK2/Ready) during transport phase, allowing long-lived connections
/// to rekey without re-authentication.
pub session_states: Arc<DashMap<LpReceiverIndex, TimestampedState<LpStateMachine>>>,
/// Semaphore limiting concurrent forward connections
///
/// Prevents file descriptor exhaustion when forwarding LP packets during
/// telescope setup. When at capacity, forward requests return an error
/// so clients can choose a different gateway.
// Connection limiting (not pooling) chosen for forward requests.
//
// Why not connection pooling?
// 1. Forwarding is one-time per telescope setup (handshake only), not ongoing traffic.
// Once telescope is established, data flows directly through the tunnel.
// 2. Telescope targets are distributed across many different gateways - each client
// typically connects to a different exit gateway, so pooled connections would
// rarely be reused.
// 3. Connections already go out of scope after each request-response. FD exhaustion
// only happens from concurrent spikes, not accumulation.
// 4. A pool would accumulate one idle connection per unique destination, most of
// which would never be reused before TTL expiration.
//
// Why semaphore limiting is better:
// 1. Directly caps concurrent forward connections regardless of destination.
// 2. When at capacity, returns "busy" error - client can choose another gateway.
// This is better than silently queuing requests behind a pool.
// 3. Simple implementation: no TTL management, stale connection handling, or cleanup.
pub forward_semaphore: Arc<Semaphore>,
}
/// LP listener that accepts TCP connections on port 41264
pub struct LpListener {
/// Shared state for connection handlers
handler_state: LpHandlerState,
pub struct LpSetup {
control_listener: LpControlListener,
data_listener: LpDataListener,
cleanup_task: CleanupTask,
/// Shutdown coordination
shutdown: ShutdownTracker,
}
impl LpListener {
pub fn new(handler_state: LpHandlerState, shutdown: ShutdownTracker) -> Self {
Self {
handler_state,
impl LpSetup {
pub async fn new(
local_lp_peer: LpLocalPeer,
lp_config: LpConfig,
metrics: NymNodeMetrics,
peer_registrator: Option<PeerRegistrator>,
mix_packet_sender: MixForwardingSender,
shutdown: ShutdownTracker,
) -> Result<Self, NymNodeError> {
// TODO: this will require loading old states from disk in the future
let session_states = Arc::new(DashMap::new());
let shared_lp_state = SharedLpState {
metrics,
lp_config,
session_states: session_states.clone(),
};
let control_state = SharedLpControlState {
local_lp_peer,
peer_registrator,
forward_semaphore: Arc::new(Semaphore::new(lp_config.debug.max_concurrent_forwards)),
shared: shared_lp_state.clone(),
};
let data_state = SharedLpDataState {
outbound_mix_sender: mix_packet_sender,
shared: shared_lp_state,
};
let control_listener = LpControlListener::new(
lp_config.control_bind_address,
control_state,
shutdown.clone(),
);
let data_listener = LpDataListener::new(
lp_config.data_bind_address,
data_state,
shutdown.clone_shutdown_token(),
);
let cleanup_task = CleanupTask::new(
session_states,
lp_config.debug,
shutdown.clone_shutdown_token(),
);
Ok(LpSetup {
control_listener,
data_listener,
cleanup_task,
shutdown,
}
})
}
fn lp_config(&self) -> LpConfig {
self.handler_state.lp_config
}
pub async fn run(&mut self) -> Result<(), NymNodeError> {
let control_bind_address = self.lp_config().control_bind_address;
let data_bind_address = self.lp_config().data_bind_address;
let listener = TcpListener::bind(control_bind_address)
.await
.map_err(|source| {
error!("Failed to bind LP listener to {control_bind_address}: {source}",);
NymNodeError::LpBindFailure {
address: control_bind_address,
source,
}
})?;
pub fn start_tasks(mut self) {
// control listener
let shutdown_token = self.shutdown.clone_shutdown_token();
// Spawn background task for state cleanup
let _cleanup_handle = self.spawn_state_cleanup_task();
// Spawn UDP data handler for LP data plane (port 51264)
let _data_handler_handle = self.spawn_data_handler().await?;
info!(
"LP listener started on {control_bind_address} (data handler on: {data_bind_address})",
);
loop {
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
trace!("LP listener: received shutdown signal");
break;
}
result = listener.accept() => {
match result {
Ok((stream, addr)) => self.handle_connection(stream, addr),
Err(e) => warn!("Failed to accept LP connection: {e}")
}
}
}
}
info!("LP listener shutdown complete");
Ok(())
}
fn handle_connection(&self, stream: tokio::net::TcpStream, remote_addr: SocketAddr) {
// Check connection limit
let active_connections = self.active_lp_connections();
let max_connections = self.lp_config().debug.max_connections;
if active_connections >= max_connections {
warn!(
"LP connection limit exceeded ({active_connections}/{max_connections}), rejecting connection from {remote_addr}"
);
return;
}
debug!(
"Accepting LP connection from {remote_addr} ({active_connections} active connections)"
);
// Increment connection counter
self.handler_state.metrics.network.new_lp_connection();
// Spawn handler task
let handler =
handler::LpConnectionHandler::new(stream, remote_addr, self.handler_state.clone());
let metrics = self.handler_state.metrics.clone();
self.shutdown.try_spawn_named_with_shutdown(
async move {
let result = handler.handle().await;
// Handler emits lifecycle metrics internally on success
// For errors, we need to emit them here since handler is consumed
if let Err(e) = result {
warn!("LP handler error for {remote_addr}: {e}");
// Note: metrics are emitted in handle() for graceful path
// On error path, handle() returns early without emitting
// So we track errors here
}
// Decrement connection counter on exit
metrics.network.lp_connection_closed();
},
&format!("LP::{remote_addr}"),
);
}
/// Spawn the UDP data handler for LP data plane
///
/// The data handler listens on UDP port 51264 and processes LP-wrapped Sphinx packets
/// from registered clients. It decrypts the LP layer and forwards the Sphinx packets
/// into the mixnet.
async fn spawn_data_handler(&self) -> Result<tokio::task::JoinHandle<()>, NymNodeError> {
// Create data handler
let data_handler = data_handler::LpDataHandler::new(
self.lp_config().data_bind_address,
self.handler_state.clone(),
self.shutdown.clone_shutdown_token(),
)
.await?;
// Spawn data handler task
let handle = self.shutdown.try_spawn_named(
async move {
if let Err(e) = data_handler.run().await {
error!("LP data handler error: {e}");
}
},
"LP::DataHandler",
);
Ok(handle)
}
/// Spawn background task for cleaning up stale state entries
///
/// This task runs periodically (every `state_cleanup_interval_secs`) to remove:
/// - Handshake states older than `handshake_ttl_secs`
/// - Session states with no activity for `session_ttl_secs`
///
/// The task automatically stops when the shutdown signal is received.
fn spawn_state_cleanup_task(&self) -> tokio::task::JoinHandle<()> {
let session_states = Arc::clone(&self.handler_state.session_states);
let dbg_cfg = self.handler_state.lp_config.debug;
let handshake_ttl = dbg_cfg.handshake_ttl;
let session_ttl = dbg_cfg.session_ttl;
let interval = dbg_cfg.state_cleanup_interval;
let shutdown = self.shutdown.clone_shutdown_token();
let metrics = self.handler_state.metrics.clone();
info!(
"Starting LP state cleanup task (handshake_ttl={}s, session_ttl={}s, interval={}s)",
handshake_ttl.as_secs(),
session_ttl.as_secs(),
interval.as_secs()
);
self.shutdown.try_spawn_named(
cleanup_task::cleanup_loop(session_states, dbg_cfg, shutdown, metrics),
"LP::StateCleanup",
)
}
async move {
if let Err(err) = self.control_listener.run().await {
shutdown_token.cancel();
error!("LP control listener error: {err}");
}
},
"LP::LpControlListener",
);
fn active_lp_connections(&self) -> usize {
self.handler_state
.metrics
.network
.active_lp_connections_count()
}
}
pub(crate) mod cleanup_task {
use crate::config::lp::LpDebug;
use crate::node::lp::{LpReceiverIndex, TimestampedState};
use dashmap::DashMap;
use nym_lp::LpStateMachine;
use nym_metrics::inc_by;
use nym_node_metrics::NymNodeMetrics;
use std::sync::Arc;
use tracing::{debug, info};
async fn perform_cleanup(
session_states: &Arc<DashMap<LpReceiverIndex, TimestampedState<LpStateMachine>>>,
cfg: LpDebug,
) {
let session_ttl = cfg.session_ttl;
let start = std::time::Instant::now();
let mut ss_removed = 0u64;
// Remove stale sessions (based on time since last activity)
// Use shorter TTL for demoted (ReadOnlyTransport) sessions
session_states.retain(|_, timestamped| {
if timestamped.since_activity() > session_ttl {
ss_removed += 1;
false
} else {
true
}
});
if ss_removed > 0 {
let duration = start.elapsed();
info!(
"LP state cleanup: {ss_removed} sessions (took {:.3}s)",
duration.as_secs_f64()
);
// Track metrics
if ss_removed > 0 {
inc_by!("lp_states_cleanup_session_removed", ss_removed as i64);
}
}
}
/// Background loop for cleaning up stale state entries
///
/// Runs `perform_cleanup` on the `session_states` map every
/// `cfg.state_cleanup_interval`, until `shutdown` is cancelled.
///
/// The `select!` below is `biased`, so a pending shutdown signal is always
/// handled before a pending interval tick.
///
/// `_metrics` is accepted but not used by this function.
///
/// NOTE(review): this comment used to mention scanning `handshake_states` and a
/// shorter TTL for demoted (ReadOnlyTransport) sessions; neither is visible in
/// this loop, which only passes `session_states` to `perform_cleanup`.
pub(crate) async fn cleanup_loop(
session_states: Arc<DashMap<LpReceiverIndex, TimestampedState<LpStateMachine>>>,
cfg: LpDebug,
shutdown: nym_task::ShutdownToken,
_metrics: NymNodeMetrics,
) {
let interval = cfg.state_cleanup_interval;
// per the tokio documentation, the first tick of an `interval` completes immediately
let mut cleanup_interval = tokio::time::interval(interval);
loop {
tokio::select! {
// poll the branches in declaration order: shutdown first, then the tick
biased;
_ = shutdown.cancelled() => {
debug!("LP state cleanup task: received shutdown signal");
break;
}
_ = cleanup_interval.tick() => {
perform_cleanup(&session_states, cfg).await;
}
}
}
info!("LP state cleanup task shutdown complete");
// Spawn the UDP data handler for the LP data plane.
// The data handler listens on UDP port 51264 and processes LP-wrapped Sphinx packets
// from registered clients. It decrypts the LP layer and forwards the Sphinx packets
// into the mixnet.
let shutdown_token = self.shutdown.clone_shutdown_token();
self.shutdown.try_spawn_named(
async move {
if let Err(err) = self.data_listener.run().await {
shutdown_token.cancel();
error!("LP data listener error: {err}");
}
},
"LP::LpDataListener",
);
// cleanup task
self.shutdown.try_spawn_named(
async move { self.cleanup_task.run().await },
"LP::CleanupTask",
);
}
}
+3 -2
View File
@@ -1,7 +1,8 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::lp::{LpHandlerState, LpReceiverIndex};
use crate::node::lp::state::SharedLpControlState;
use nym_lp::peer_config::LpReceiverIndex;
use nym_metrics::{add_histogram_obs, inc};
use nym_registration_common::dvpn::{
LpDvpnRegistrationFinalisation, LpDvpnRegistrationInitialRequest,
@@ -28,7 +29,7 @@ const LP_REGISTRATION_DURATION_BUCKETS: &[f64] = &[
30.0, // 30s
];
impl LpHandlerState {
impl SharedLpControlState {
async fn process_dvpn_initial_registration(
&self,
sender: LpReceiverIndex,
+64
View File
@@ -0,0 +1,64 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::config::LpConfig;
use crate::node::lp::cleanup::TimestampedState;
use dashmap::DashMap;
use nym_gateway::node::wireguard::PeerRegistrator;
use nym_lp::LpStateMachine;
use nym_lp::peer::LpLocalPeer;
use nym_lp::peer_config::LpReceiverIndex;
use nym_mixnet_client::forwarder::MixForwardingSender;
use nym_node_metrics::NymNodeMetrics;
use std::sync::Arc;
use tokio::sync::Semaphore;
/// Shared state for LP control connections
///
/// Combines the control-specific members (local peer keys, optional wireguard
/// peer registration and the forwarding limiter) with the common [`SharedLpState`].
/// The struct derives `Clone`; the `Arc`-wrapped semaphore is shared between clones.
#[derive(Clone)]
pub struct SharedLpControlState {
/// Encapsulates all required key information of a local Lewes Protocol Peer.
pub local_lp_peer: LpLocalPeer,
/// Handle registering new wireguard peers
///
/// NOTE(review): presumably `None` when wireguard registration is not available
/// on this node — confirm against the code constructing this state.
pub peer_registrator: Option<PeerRegistrator>,
/// Semaphore limiting concurrent forward connections
///
/// Prevents file descriptor exhaustion when forwarding LP packets during
/// telescope setup. When at capacity, forward requests return an error
/// so clients can choose a different gateway.
// this is temporary until there is persistent KKT/PSQ session between nodes
pub forward_semaphore: Arc<Semaphore>,
/// Common shared data
pub shared: SharedLpState,
}
/// Shared state for LP data connections
///
/// Combines the outbound mixnet channel with the common [`SharedLpState`].
#[derive(Clone)]
pub struct SharedLpDataState {
/// Channel for forwarding Sphinx packets into the mixnet
///
/// Used by the LP data handler (UDP:51264) to forward decrypted Sphinx packets
/// from LP clients into the mixnet for routing.
// NOTE(review): the `dead_code` allowance suggests nothing reads this field yet —
// confirm, and drop the attribute once the data handler uses the sender.
#[allow(dead_code)]
pub outbound_mix_sender: MixForwardingSender,
/// Common shared data
pub shared: SharedLpState,
}
/// Shared state for LP connection handlers
///
/// Embedded as the `shared` field of both `SharedLpControlState` and
/// `SharedLpDataState`. The struct derives `Clone`; `session_states` sits behind
/// an `Arc`, so clones refer to the same session map.
#[derive(Clone)]
pub struct SharedLpState {
/// Metrics collection
pub metrics: NymNodeMetrics,
/// LP configuration (for timestamp validation, etc.)
pub lp_config: LpConfig,
/// Established sessions keyed by receiver index
///
/// Wrapped in TimestampedState for TTL-based cleanup of inactive sessions.
pub session_states: Arc<DashMap<LpReceiverIndex, TimestampedState<LpStateMachine>>>,
}
+18 -24
View File
@@ -22,7 +22,7 @@ use crate::node::http::{HttpServerConfig, NymNodeHttpServer, NymNodeRouter};
use crate::node::key_rotation::active_keys::ActiveSphinxKeys;
use crate::node::key_rotation::controller::KeyRotationController;
use crate::node::key_rotation::manager::SphinxKeyManager;
use crate::node::lp::{LpHandlerState, LpListener};
use crate::node::lp::LpSetup;
use crate::node::metrics::aggregator::MetricsAggregator;
use crate::node::metrics::console_logger::ConsoleLogger;
use crate::node::metrics::handler::client_sessions::GatewaySessionStatsHandler;
@@ -80,7 +80,7 @@ use std::net::SocketAddr;
use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;
use tokio::sync::{Semaphore, mpsc};
use tokio::sync::mpsc;
use tracing::{debug, info, trace};
use zeroize::Zeroizing;
@@ -487,28 +487,23 @@ impl NymNode {
config.save()
}
pub async fn build_lp_listener(
&mut self,
pub async fn build_lp_tasks(
&self,
peer_registrator: Option<PeerRegistrator>,
mix_packet_sender: MixForwardingSender,
) -> Result<LpListener, NymNodeError> {
let handler_state = LpHandlerState {
local_lp_peer: LpLocalPeer::new(Ciphersuite::default(), self.x25519_lp_keys.clone())
.with_kem_keys(self.psq_kem_keys.clone()),
metrics: self.metrics.clone(),
peer_registrator,
lp_config: self.config.lp,
outbound_mix_sender: mix_packet_sender,
session_states: Arc::new(dashmap::DashMap::new()),
forward_semaphore: Arc::new(Semaphore::new(
self.config.lp.debug.max_concurrent_forwards,
)),
};
) -> Result<LpSetup, NymNodeError> {
let lp_peer = LpLocalPeer::new(Ciphersuite::default(), self.x25519_lp_keys.clone())
.with_kem_keys(self.psq_kem_keys.clone());
Ok(LpListener::new(
handler_state,
LpSetup::new(
lp_peer,
self.config.lp,
self.metrics.clone(),
peer_registrator,
mix_packet_sender,
self.shutdown_manager.shutdown_tracker().clone(),
))
)
.await
}
pub(crate) async fn new(config: Config) -> Result<Self, NymNodeError> {
@@ -770,11 +765,10 @@ impl NymNode {
"starting the LP listener on {} (data handler on: {})",
self.config.lp.control_bind_address, self.config.lp.data_bind_address,
);
let mut lp_listener = self
.build_lp_listener(wg_peer_registrator.clone(), mix_packet_sender)
let lp_tasks = self
.build_lp_tasks(wg_peer_registrator.clone(), mix_packet_sender)
.await?;
self.shutdown_tracker()
.try_spawn_named(async move { lp_listener.run().await }, "LpListener");
lp_tasks.start_tasks();
} else {
info!("node not running in entry mode: the websocket and LP will remain closed");
}
+1 -1
View File
@@ -421,7 +421,7 @@ impl SpeedtestClient {
/// This is the primary method for sending data through the mixnet via the LP transport.
/// Requires `init_lp_session()` to be called first to establish the LP cryptographic session.
///
/// # Data Flow (see gateway/src/node/lp_listener/data_handler.rs)
/// # Data Flow (see gateway/src/node/lp_listener/handler)
/// ```text
/// LP Client → UDP:51264 → LP Data Handler → Mixnet Entry
/// LP(Sphinx) decrypt LP forward Sphinx