Compare commits

...

9 Commits

Author SHA1 Message Date
Jędrzej Stuczyński 0b97d2bf93 wip 2026-03-06 15:37:49 +00:00
Jędrzej Stuczyński 16928a48a9 bugfix: setting correct LpPeerConfig during handshake 2026-03-06 10:18:45 +00:00
Jędrzej Stuczyński 24891adddf additional wiring of nested connections control 2026-03-05 16:07:50 +00:00
Jędrzej Stuczyński 6a42a8dd49 chore: add unit test for mutual KKT 2026-03-05 10:56:26 +00:00
Jędrzej Stuczyński bc0c2e5d19 basic node<>node KKT 2026-03-05 10:38:13 +00:00
Jędrzej Stuczyński 225178f95a reorganise control structure 2026-03-04 16:58:49 +00:00
Jędrzej Stuczyński 32cfb3fff8 basic node LP handler 2026-03-04 15:59:40 +00:00
Jędrzej Stuczyński f62a74a6af wip 2026-03-04 13:48:11 +00:00
Jędrzej Stuczyński fe9275274c scaffolding persistent gateway connections 2026-03-04 13:47:53 +00:00
34 changed files with 1525 additions and 378 deletions
Generated
+1
View File
@@ -6833,6 +6833,7 @@ dependencies = [
"nym-kkt-ciphersuite",
"nym-kkt-context",
"nym-pemstore",
"nym-test-utils",
"rand 0.9.2",
"rand_chacha 0.9.0",
"strum",
+1
View File
@@ -30,6 +30,7 @@ libcrux-ml-kem = { workspace = true }
[dev-dependencies]
rand_chacha = "0.9.0"
anyhow = { workspace = true }
nym-test-utils = { workspace = true }
[lints]
+122 -59
View File
@@ -16,10 +16,6 @@ pub use nym_kkt_context as context;
#[cfg(test)]
mod test {
use nym_kkt_ciphersuite::{Ciphersuite, HashFunction, HashLength, KEM, SignatureScheme};
use rand09::RngCore;
use std::collections::BTreeMap;
use crate::keys::KEMKeys;
use crate::{
initiator::KKTInitiator,
@@ -29,9 +25,13 @@ mod test {
},
responder::KKTResponder,
};
use nym_kkt_ciphersuite::{Ciphersuite, HashFunction, HashLength, KEM, SignatureScheme};
use nym_test_utils::helpers::deterministic_rng_09;
use rand09::RngCore;
use std::collections::BTreeMap;
#[test]
fn test_kkt_psq_e2e_encrypted_carrier() {
fn test_kkt_psq_e2e_one_way_encrypted_carrier() {
let mut rng = rand09::rng();
let mut payload: Vec<u8> = vec![0u8; 900_000];
@@ -47,7 +47,6 @@ mod test {
HashFunction::Shake256,
] {
// generate kem public keys
let responder_mlkem_keypair = generate_keypair_mlkem(&mut rng);
let responder_mceliece_keypair = generate_keypair_mceliece(&mut rng);
@@ -64,20 +63,6 @@ mod test {
HashLength::Default.value(),
responder_kem.mc_eliece_encapsulation_key().as_ref(),
);
let initiator_mlkem_keypair = generate_keypair_mlkem(&mut rng);
let initiator_mceliece_keypair = generate_keypair_mceliece(&mut rng);
let _i_dir_hash_mlkem = hash_encapsulation_key(
hash_function,
HashLength::Default.value(),
initiator_mlkem_keypair.public_key().as_slice(),
);
let _i_dir_hash_mceliece = hash_encapsulation_key(
hash_function,
HashLength::Default.value(),
initiator_mceliece_keypair.pk.as_ref(),
);
let init_hashes = BTreeMap::new();
@@ -128,41 +113,6 @@ mod test {
responder_kem.ml_kem768_encapsulation_key().as_slice(),
)
}
// Mutual - MlKem
{
let ciphersuite = Ciphersuite::resolve_ciphersuite(
KEM::MlKem768,
hash_function,
SignatureScheme::Ed25519,
None,
)
.unwrap();
let (mut initiator, request) = KKTInitiator::generate_one_way_request(
&mut rng,
ciphersuite,
&responder_x25519_keypair.pk,
&r_dir_hash_mlkem,
1u8,
Some(payload.clone()),
)
.unwrap();
let processed_request = responder.process_request(request, payload.len()).unwrap();
assert_eq!(processed_request.request_payload, payload);
// if we keep unverified keys, this should change
assert!(processed_request.remote_encapsulation_key.is_none());
let processed_response = initiator
.process_response(processed_request.response, 0)
.unwrap();
assert_eq!(
processed_response.encapsulation_key.as_bytes(),
responder_kem.ml_kem768_encapsulation_key().as_slice(),
)
}
// OneWay - McEliece
{
@@ -195,7 +145,110 @@ mod test {
responder_kem.mc_eliece_encapsulation_key().as_ref()
)
}
}
}
#[test]
fn test_kkt_psq_e2e_mutual_encrypted_carrier() {
let mut rng = deterministic_rng_09();
let mut payload: Vec<u8> = vec![0u8; 50000];
rng.fill_bytes(&mut payload);
// generate kem public keys
let initiator_mlkem_keypair = generate_keypair_mlkem(&mut rng);
let initiator_mceliece_keypair = generate_keypair_mceliece(&mut rng);
let responder_mlkem_keypair = generate_keypair_mlkem(&mut rng);
let responder_mceliece_keypair = generate_keypair_mceliece(&mut rng);
let responder_x25519_keypair = generate_lp_keypair_x25519(&mut rng);
let initiator_kem = KEMKeys::new(initiator_mceliece_keypair, initiator_mlkem_keypair);
let responder_kem = KEMKeys::new(responder_mceliece_keypair, responder_mlkem_keypair);
let init_hashes = initiator_kem.encapsulation_keys_digests();
let responder = KKTResponder::new(
&responder_x25519_keypair,
&responder_kem,
&init_hashes,
&[
HashFunction::Blake3,
HashFunction::SHA256,
HashFunction::Shake128,
HashFunction::Shake256,
],
&[SignatureScheme::Ed25519],
&[1],
)
.unwrap();
for hash_function in [
HashFunction::Blake3,
HashFunction::SHA256,
HashFunction::Shake128,
HashFunction::Shake256,
] {
let r_dir_hash_mlkem = hash_encapsulation_key(
hash_function,
HashLength::Default.value(),
responder_kem.ml_kem768_encapsulation_key().as_slice(),
);
let r_dir_hash_mceliece = hash_encapsulation_key(
hash_function,
HashLength::Default.value(),
responder_kem.mc_eliece_encapsulation_key().as_ref(),
);
// Mutual - MlKem
{
let ciphersuite = Ciphersuite::resolve_ciphersuite(
KEM::MlKem768,
hash_function,
SignatureScheme::Ed25519,
None,
)
.unwrap();
let (mut initiator, request) = KKTInitiator::generate_mutual_request(
&mut rng,
ciphersuite,
initiator_kem
.encoded_encapsulation_key(KEM::MlKem768)
.unwrap(),
&responder_x25519_keypair.pk,
&r_dir_hash_mlkem,
1u8,
Some(payload.clone()),
)
.unwrap();
let processed_request = responder.process_request(request, payload.len()).unwrap();
assert_eq!(processed_request.request_payload, payload);
assert_eq!(
processed_request
.remote_encapsulation_key
.unwrap()
.as_bytes(),
initiator_kem
.encapsulation_key(KEM::MlKem768)
.unwrap()
.as_bytes()
);
let processed_response = initiator
.process_response(processed_request.response, 0)
.unwrap();
assert_eq!(
processed_response.encapsulation_key.as_bytes(),
responder_kem.ml_kem768_encapsulation_key().as_slice(),
)
}
// Mutual - McEliece is not supported due to the key being too large
{
let ciphersuite = Ciphersuite::resolve_ciphersuite(
KEM::McEliece,
@@ -204,9 +257,12 @@ mod test {
None,
)
.unwrap();
let (mut initiator, request) = KKTInitiator::generate_one_way_request(
let (mut initiator, request) = KKTInitiator::generate_mutual_request(
&mut rng,
ciphersuite,
initiator_kem
.encoded_encapsulation_key(KEM::McEliece)
.unwrap(),
&responder_x25519_keypair.pk,
&r_dir_hash_mceliece,
1u8,
@@ -217,9 +273,16 @@ mod test {
let processed_request = responder.process_request(request, payload.len()).unwrap();
assert_eq!(processed_request.request_payload, payload);
// if we keep unverified keys, this should change
assert!(processed_request.remote_encapsulation_key.is_none());
assert_eq!(
processed_request
.remote_encapsulation_key
.unwrap()
.as_bytes(),
initiator_kem
.encapsulation_key(KEM::McEliece)
.unwrap()
.as_bytes()
);
let processed_response = initiator
.process_response(processed_request.response, 0)
+5 -1
View File
@@ -1,5 +1,5 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
// SPDX-License-Identifier: GPL-3.0-only
use crate::LpError;
use nym_kkt::keys::EncapsulationKey;
@@ -143,6 +143,10 @@ impl LpRemotePeer {
.ok_or(LpError::NoKnownKEMKeyDigests { kem, hash_function })
.cloned()
}
pub fn kem_key_digests(&self) -> &BTreeMap<KEM, KEMKeyDigests> {
&self.expected_kem_key_digests
}
}
impl From<DHPublicKey> for LpRemotePeer {
+5
View File
@@ -65,6 +65,7 @@ impl LpPeerConfig {
rng.random(),
)
}
/// Creates a new client to exit config.
/// Inputs:
/// hop_id: this value must be in the range (1..=15). This function returns an error if this is not the case.
@@ -79,6 +80,7 @@ impl LpPeerConfig {
{
Self::new(rng, hop_id, true, false, censorship_resistance)
}
/// Creates a new client to an intermediate node config.
/// Inputs:
/// hop_id: this value must be in the range (1..=14). This function returns an error if this is not the case.
@@ -130,6 +132,7 @@ impl LpPeerConfig {
rng.random(),
)
}
fn build(
hop_id: u8,
is_exit: bool,
@@ -147,6 +150,7 @@ impl LpPeerConfig {
seed,
}
}
fn build_checked(
hop_id: u8,
is_exit: bool,
@@ -203,6 +207,7 @@ impl LpPeerConfig {
output_bytes[4..].copy_from_slice(&self.seed);
output_bytes
}
pub fn deserialize(bytes: &[u8]) -> Result<Self, LpError> {
if bytes.len() != LP_PEER_CONFIG_SIZE {
return Err(LpError::DeserializationError(format!(
+43 -13
View File
@@ -24,10 +24,31 @@ use nym_kkt::message::{KKTRequest, KKTResponse};
use rand09::SeedableRng;
use tracing::debug;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandshakeMode {
// Client <> Entry
OneWayEntry,
// Client <> Exit
OneWayExit,
// Entry <> Exit
MutualInternode,
// in the future more variants will be supported (such as individual mix hops)
}
impl HandshakeMode {
pub fn is_mutual(&self) -> bool {
matches!(self, HandshakeMode::MutualInternode)
}
}
pub struct PSQHandshakeStateInitiator<'a, S> {
pub(super) inner_state: PSQHandshakeState<'a, S>,
pub(super) initiator_data: InitiatorData,
pub(super) mutual: bool,
/// The mode of the handshake (mutual node-node, client-entry, entry-exit)
pub(super) mode: HandshakeMode,
}
pub(crate) fn build_psq_principal<R>(
@@ -78,13 +99,23 @@ impl<'a, S> PSQHandshakeStateInitiator<'a, S>
where
S: LpHandshakeChannel + Unpin,
{
pub fn set_mutual_kkt(mut self) -> Result<Self, LpError> {
if self.inner_state.local_peer.kem_keypairs.is_none() {
return Err(LpError::PSQMutualInitiatorMissingKemKey);
}
fn lp_peer_config<R>(&self, rng: &mut R) -> Result<LpPeerConfig, LpError>
where
R: rand09::CryptoRng,
{
// for now we don't support censorship resistance flag
let censorship_resistance = false;
self.mutual = true;
Ok(self)
match self.mode {
HandshakeMode::OneWayEntry => Ok(LpPeerConfig::new_client_to_entry(
rng,
censorship_resistance,
)),
HandshakeMode::OneWayExit => {
LpPeerConfig::new_client_to_exit(rng, 1, censorship_resistance)
}
HandshakeMode::MutualInternode => LpPeerConfig::new_node_to_node(rng),
}
}
/// Attempt to send KKT request to begin the handshake
@@ -132,7 +163,7 @@ where
let ciphersuite = self.inner_state.local_peer.ciphersuite();
let kem = ciphersuite.kem();
let lp_peer_config = LpPeerConfig::new_client_to_entry(rng, false);
let lp_peer_config = self.lp_peer_config(rng)?;
// 1. retrieve the expected kem key hash. if we don't know it,
let dir_hash = self
@@ -141,7 +172,7 @@ where
.expected_kem_key_hash(ciphersuite)?;
// 2. prepare and send KKT request
let (mut initiator, kkt_request) = if self.mutual {
let (mut initiator, kkt_request) = if self.mode.is_mutual() {
// this has been verified when setting the mutual flag
let Some(local_encapsulation_key) = self.inner_state.local_peer.encoded_kem_key(kem)
else {
@@ -273,8 +304,8 @@ mod tests {
resp.ciphersuite = ciphersuite;
let initiator_data = InitiatorData::new(1, resp_remote);
let handshake_init =
PSQHandshakeState::new(conn_init, init).as_initiator(initiator_data);
let handshake_init = PSQHandshakeState::new(conn_init, init)
.as_initiator(initiator_data, HandshakeMode::OneWayEntry)?;
let mut init_rng = DeterministicRng09Send::new(u64_seeded_rng_09(1));
@@ -396,8 +427,7 @@ mod tests {
let initiator_data = InitiatorData::new(1, resp_remote);
let handshake_init = PSQHandshakeState::new(conn_init, init)
.as_initiator(initiator_data)
.set_mutual_kkt()?;
.as_initiator(initiator_data, HandshakeMode::MutualInternode)?;
let mut init_rng = DeterministicRng09Send::new(u64_seeded_rng_09(1));
+22 -9
View File
@@ -12,6 +12,8 @@ mod helpers;
pub mod initiator;
pub mod responder;
use crate::LpError;
use crate::psq::initiator::HandshakeMode;
pub use initiator::PSQHandshakeStateInitiator;
pub use responder::PSQHandshakeStateResponder;
@@ -107,12 +109,20 @@ where
}
}
pub fn as_initiator(self, initiator_data: InitiatorData) -> PSQHandshakeStateInitiator<'a, S> {
PSQHandshakeStateInitiator {
pub fn as_initiator(
self,
initiator_data: InitiatorData,
mode: HandshakeMode,
) -> Result<PSQHandshakeStateInitiator<'a, S>, LpError> {
if mode.is_mutual() && self.local_peer.kem_keypairs.is_none() {
return Err(LpError::PSQMutualInitiatorMissingKemKey);
}
Ok(PSQHandshakeStateInitiator {
initiator_data,
inner_state: self,
mutual: false,
}
mode,
})
}
pub fn as_responder(self, responder_data: ResponderData) -> PSQHandshakeStateResponder<'a, S> {
@@ -160,8 +170,10 @@ mod tests {
resp.ciphersuite = ciphersuite;
let resp_remote = resp.as_remote();
let handshake_init = PSQHandshakeState::new(conn_init, init)
.as_initiator(InitiatorData::new(1, resp_remote));
let handshake_init = PSQHandshakeState::new(conn_init, init).as_initiator(
InitiatorData::new(1, resp_remote),
HandshakeMode::OneWayEntry,
)?;
let handshake_resp =
PSQHandshakeState::new(conn_resp, resp).as_responder(ResponderData::default());
@@ -232,9 +244,10 @@ mod tests {
let resp_remote = resp.as_remote();
let init_remote = init.as_remote();
let handshake_init = PSQHandshakeState::new(conn_init, init)
.as_initiator(InitiatorData::new(1, resp_remote))
.set_mutual_kkt()?;
let handshake_init = PSQHandshakeState::new(conn_init, init).as_initiator(
InitiatorData::new(1, resp_remote),
HandshakeMode::MutualInternode,
)?;
let handshake_resp = PSQHandshakeState::new(conn_resp, resp).as_responder(
ResponderData::default()
.with_initiator_kem_hashes(init_remote.expected_kem_key_digests),
+41 -3
View File
@@ -9,6 +9,7 @@ use crate::codec::{decrypt_lp_packet, encrypt_lp_packet};
use crate::packet::{EncryptedLpPacket, LpHeader, LpMessage, LpPacket};
use crate::peer::{LpLocalPeer, LpRemotePeer};
use crate::peer_config::LpReceiverIndex;
use crate::psq::initiator::HandshakeMode;
use crate::psq::{
InitiatorData, PSQHandshakeState, PSQHandshakeStateInitiator, PSQHandshakeStateResponder,
ResponderData,
@@ -19,6 +20,8 @@ use crate::{LpError, replay::ReceivingKeyCounterValidator};
use libcrux_psq::handshake::types::{Authenticator, DHPublicKey};
use libcrux_psq::session::{Session, SessionBinding};
use nym_kkt::keys::EncapsulationKey;
use nym_kkt_ciphersuite::{KEM, KEMKeyDigests};
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
/// Represents inputs that drive the state machine transitions.
@@ -154,12 +157,34 @@ impl LpTransportSession {
local_peer: LpLocalPeer,
remote_peer: LpRemotePeer,
remote_protocol_version: u8,
) -> PSQHandshakeStateInitiator<'_, S>
mode: HandshakeMode,
) -> Result<PSQHandshakeStateInitiator<'_, S>, LpError>
where
S: LpHandshakeChannel + Unpin,
{
PSQHandshakeState::new(connection, local_peer)
.as_initiator(InitiatorData::new(remote_protocol_version, remote_peer))
PSQHandshakeState::new(connection, local_peer).as_initiator(
InitiatorData::new(remote_protocol_version, remote_peer),
mode,
)
}
/// Helper function to create `PSQHandshakeState` for the handshake initiator for mutual KKT
pub fn psq_handshake_initiator_mutual_internode<S>(
connection: &'_ mut S,
local_peer: LpLocalPeer,
remote_peer: LpRemotePeer,
remote_protocol_version: u8,
) -> Result<PSQHandshakeStateInitiator<'_, S>, LpError>
where
S: LpHandshakeChannel + Unpin,
{
Self::psq_handshake_initiator(
connection,
local_peer,
remote_peer,
remote_protocol_version,
HandshakeMode::MutualInternode,
)
}
/// Helper function to create `PSQHandshakeState` for the handshake responder
@@ -173,6 +198,19 @@ impl LpTransportSession {
PSQHandshakeState::new(connection, local_peer).as_responder(ResponderData::default())
}
/// Helper function to create `PSQHandshakeState` for the handshake responder for mutual KKT
pub fn psq_handshake_responder_mutual<S>(
connection: &'_ mut S,
local_peer: LpLocalPeer,
initiator_kem_hashes: BTreeMap<KEM, KEMKeyDigests>,
) -> PSQHandshakeStateResponder<'_, S>
where
S: LpHandshakeChannel + Unpin,
{
PSQHandshakeState::new(connection, local_peer)
.as_responder(ResponderData::default().with_initiator_kem_hashes(initiator_kem_hashes))
}
pub fn session_binding(&self) -> &PersistentSessionBinding {
&self.session_binding
}
+14 -9
View File
@@ -17,9 +17,10 @@ mod tests {
use nym_lp::peer::LpLocalPeer;
use nym_node::config::{LpConfig, LpDebug};
use nym_node::node::GatewayStorage;
use nym_node::node::lp::control::handler::LpConnectionHandler;
use nym_node::node::lp::control::ingress::client_handler::LpClientConnectionHandler;
use nym_node::node::lp::error::LpHandlerError;
use nym_node::node::lp::{SharedLpControlState, SharedLpState};
use nym_node::node::lp::state::{ActiveLpSessions, NestedConnectionsManager};
use nym_node::node::lp::{SharedLpClientControlState, SharedLpState};
use nym_node::wireguard::{PeerManager, PeerRegistrator};
use nym_registration_client::{LpClientError, LpRegistrationClient};
use nym_test_utils::helpers::{CryptoRng09, seeded_rng};
@@ -35,7 +36,7 @@ mod tests {
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::{Receiver, channel};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::error;
@@ -120,7 +121,7 @@ mod tests {
enum SpawnedLpConnectionHandlerState {
NotCreated,
Ready {
handler: LpConnectionHandler<MockIOStream>,
handler: LpClientConnectionHandler<MockIOStream>,
},
Running {
handle: JoinHandle<Option<Result<(), LpHandlerError>>>,
@@ -130,7 +131,7 @@ mod tests {
struct Gateway {
base: Party,
lp_state: SharedLpControlState,
lp_state: SharedLpClientControlState,
ip_pool: IpPool,
mock_peer_controller: SpawnedPeerController,
@@ -216,6 +217,9 @@ mod tests {
let (mock_peer_controller, peer_controller_state) =
mock_peer_controller(peer_request_rx);
let (connection_ctrl_sender, _connection_manager_receiver) = channel(42);
let nested_connections_manager = NestedConnectionsManager::new(connection_ctrl_sender);
// registering particular responses for peer controller is up to given test
let ecash_verifier = Arc::new(ecash_verifier);
@@ -225,7 +229,7 @@ mod tests {
upgrade_mode_details,
);
let lp_state = SharedLpControlState {
let lp_state = SharedLpClientControlState {
local_lp_peer: base.peer.clone(),
forward_semaphore,
@@ -235,8 +239,9 @@ mod tests {
shared: SharedLpState {
metrics: Default::default(),
lp_config,
session_states: Arc::new(Default::default()),
session_states: ActiveLpSessions::new(),
},
nested_connections_manager,
};
Ok(Gateway {
@@ -262,7 +267,7 @@ mod tests {
};
self.lp_connection_handler = SpawnedLpConnectionHandlerState::Ready {
handler: LpConnectionHandler::new(
handler: LpClientConnectionHandler::new(
client_connection,
client_address,
self.lp_state.clone(),
@@ -290,7 +295,7 @@ mod tests {
}
fn spawn_lp_handler(&mut self) {
let SpawnedLpConnectionHandlerState::Ready { handler } = mem::replace(
let SpawnedLpConnectionHandlerState::Ready { mut handler } = mem::replace(
&mut self.lp_connection_handler,
SpawnedLpConnectionHandlerState::NotCreated,
) else {
+1
View File
@@ -139,6 +139,7 @@ harness = false
cargo_metadata = { workspace = true }
[dev-dependencies]
nym-lp = { workspace = true, features = ["mock"] }
criterion = { workspace = true, features = ["async_tokio"] }
nym-test-utils = { workspace = true }
+37 -7
View File
@@ -16,7 +16,14 @@ pub struct NetworkStats {
// the call stack
active_egress_mixnet_connections: Arc<AtomicUsize>,
active_lp_connections: AtomicUsize,
// incoming LP control connections from clients
active_lp_ingress_client_connections: AtomicUsize,
// incoming LP control connections from nodes
active_lp_ingress_node_connections: AtomicUsize,
// outgoing LP control connections to nodes
active_lp_egress_node_connections: AtomicUsize,
}
impl NetworkStats {
@@ -59,15 +66,38 @@ impl NetworkStats {
.load(Ordering::Relaxed)
}
pub fn new_lp_connection(&self) {
self.active_lp_connections.fetch_add(1, Ordering::Relaxed);
pub fn new_ingress_lp_client_connection(&self) {
self.active_lp_ingress_client_connections
.fetch_add(1, Ordering::Relaxed);
}
pub fn lp_connection_closed(&self) {
self.active_lp_connections.fetch_sub(1, Ordering::Relaxed);
pub fn closed_ingress_lp_client_connection(&self) {
self.active_lp_ingress_client_connections
.fetch_sub(1, Ordering::Relaxed);
}
pub fn active_lp_connections_count(&self) -> usize {
self.active_lp_connections.load(Ordering::Relaxed)
pub fn new_ingress_lp_node_connection(&self) {
self.active_lp_ingress_node_connections
.fetch_add(1, Ordering::Relaxed);
}
pub fn closed_ingress_lp_node_connection(&self) {
self.active_lp_ingress_node_connections
.fetch_sub(1, Ordering::Relaxed);
}
pub fn new_egress_lp_node_connection(&self) {
self.active_lp_egress_node_connections
.fetch_add(1, Ordering::Relaxed);
}
pub fn closed_egress_lp_node_connection(&self) {
self.active_lp_egress_node_connections
.fetch_sub(1, Ordering::Relaxed);
}
pub fn active_lp_client_connections_count(&self) -> usize {
self.active_lp_ingress_client_connections
.load(Ordering::Relaxed)
}
}
+4 -7
View File
@@ -2,11 +2,8 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::config::LpDebug;
use dashmap::DashMap;
use nym_lp::LpTransportSession;
use nym_lp::peer_config::LpReceiverIndex;
use crate::node::lp::state::ActiveLpSessions;
use nym_metrics::inc_by;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, info};
@@ -74,14 +71,14 @@ impl<T> TimestampedState<T> {
}
pub(crate) struct CleanupTask {
session_states: Arc<DashMap<LpReceiverIndex, TimestampedState<LpTransportSession>>>,
session_states: ActiveLpSessions,
cfg: LpDebug,
shutdown: nym_task::ShutdownToken,
}
impl CleanupTask {
pub fn new(
session_states: Arc<DashMap<LpReceiverIndex, TimestampedState<LpTransportSession>>>,
session_states: ActiveLpSessions,
cfg: LpDebug,
shutdown: nym_task::ShutdownToken,
) -> Self {
@@ -100,7 +97,7 @@ impl CleanupTask {
// Remove stale sessions (based on time since last activity)
// Use shorter TTL for demoted (ReadOnlyTransport) sessions
self.session_states.retain(|_, timestamped| {
self.session_states.sessions.retain(|_, timestamped| {
if timestamped.since_activity() > session_ttl {
ss_removed += 1;
false
@@ -0,0 +1,168 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::lp::control::LpConnectionStats;
use crate::node::lp::directory::LpNodeDetails;
use crate::node::lp::error::LpHandlerError;
use crate::node::lp::forwarding::client_connection::NestedClientConnectionSender;
use crate::node::lp::state::SharedLpNodeControlState;
use nym_lp::LpTransportSession;
use nym_lp::peer_config::LpReceiverIndex;
use nym_lp::transport::{LpHandshakeChannel, LpTransportChannel};
use std::collections::HashMap;
use std::net::SocketAddr;
use tracing::{debug, warn};
pub(crate) type NestedNodeConnectionSender = ();
pub(crate) type NestedNodeConnectionReceiver = ();
pub(crate) type NestedNodeControlSender = ();
pub(crate) type NestedNodeControlReceiver = ();
/// Initial connection handler for an egress LP node before completing the KKT/PSQ handshake.
pub struct InitialLpEgressNodeConnectionHandler<S> {
stream: S,
remote_addr: SocketAddr,
responder_details: LpNodeDetails,
state: SharedLpNodeControlState,
stats: LpConnectionStats,
}
impl<S> InitialLpEgressNodeConnectionHandler<S>
where
S: LpHandshakeChannel + LpHandshakeChannel + Unpin,
{
pub(crate) fn new(
stream: S,
remote_addr: SocketAddr,
responder_details: LpNodeDetails,
state: SharedLpNodeControlState,
) -> Self {
Self {
stream,
remote_addr,
responder_details,
state,
stats: LpConnectionStats::new(),
}
}
pub(crate) async fn complete_initial_handshake(
mut self,
) -> Option<Result<LpTransportSession, LpHandlerError>> {
let remote = self.remote_addr;
if self.responder_details.kem_key_hashes.is_empty() {
return Some(Err(LpHandlerError::MissingNodeKEMKeyHashes {
node_ip: self.remote_addr.ip(),
node_id: self.responder_details.node_id,
}));
}
// 1. complete KKT/PSQ handshake before doing anything else.
// bail if it takes too long
let timeout = self.state.shared.lp_config.debug.handshake_ttl;
let stream = &mut self.stream;
let handshake_state = match LpTransportSession::psq_handshake_initiator_mutual_internode(
stream,
self.state.local_lp_peer.clone(),
self.responder_details.to_lp_peer(),
self.responder_details.supported_protocol,
) {
Ok(handshake_state) => handshake_state,
Err(err) => {
debug!("failed to initiate mutual KTT/PSQ handshake with {remote}: {err}");
self.stats.emit_lifecycle_node_metrics(false);
return None;
}
};
let session = match tokio::time::timeout(timeout, handshake_state.complete_handshake())
.await
{
Err(_timeout) => {
debug!("timed out attempting to complete mutual KTT/PSQ handshake with {remote}");
self.stats.emit_lifecycle_node_metrics(false);
return None;
}
Ok(Err(handshake_failure)) => {
debug!(
"failed to complete mutual KKT/PSQ handshake with {remote}: {handshake_failure}"
);
self.stats.emit_lifecycle_node_metrics(false);
return None;
}
Ok(Ok(session)) => session,
};
debug!(
"completed egress KKT/PSQ handshake with node {}: {remote}",
self.responder_details.node_id
);
// TODO: change return type into complete handler
Some(Ok(session))
}
}
pub(crate) struct NestedNodeConnectionHandler<S> {
/// Persistent connection to exit gateway for forwarding.
/// Currently, it uses raw TCP socket, later it will be wrapped with dedicated PSQ tunnel
exit_stream: S,
/// Socket address of the remote of the established stream
exit_address: SocketAddr,
/// Map of senders to each known client handle (based on the inner receiver index)
client_handles: HashMap<LpReceiverIndex, NestedClientConnectionSender>,
/// Channel for receiving requests that are to be forwarded into the exit stream
data_receiver: NestedNodeConnectionReceiver,
/// Channel for adding new client handle and handling control requests from `NestedConnectionsController`
control_receiver: NestedNodeControlReceiver,
// client_streams: HashMap<StreamId, LpClientStream>,
}
impl<S> NestedNodeConnectionHandler<S>
where
S: LpTransportChannel + Unpin,
{
/// Attempt to extract outer receiver index from the received message
/// (that is meant to be an `LpPacket`)
fn extract_receiver_index(&self, raw: &[u8]) -> Option<LpReceiverIndex> {
if raw.len() < 4 {
return None;
}
Some(LpReceiverIndex::from_le_bytes([
raw[0], raw[1], raw[2], raw[3],
]))
}
/// Attempt to forward received packet to the client that established the inner LP session
async fn handle_exit_packet(&self, packet: Vec<u8>) {
let Some(receiver_index) = self.extract_receiver_index(&packet) else {
warn!("{} has sent us an invalid LP packet", self.exit_address);
return;
};
let Some(client_handle) = self.client_handles.get(&receiver_index) else {
warn!(
"no client handle for receiver index {receiver_index} received from {}",
self.exit_address
);
return;
};
// client_handle.send(packet).await;
}
async fn run(&mut self) {
// loop {
// tokio::select! {
//
// }
// }
}
}
@@ -0,0 +1,4 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
pub mod connection;
@@ -1,9 +1,11 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
// SPDX-License-Identifier: Apache-2.0
use crate::node::lp::cleanup::TimestampedState;
use crate::node::lp::control::{LP_DURATION_BUCKETS, LpConnectionStats};
use crate::node::lp::error::LpHandlerError;
use crate::node::lp::state::SharedLpControlState;
use crate::node::lp::forwarding::client_connection::NestedClientConnection;
use crate::node::lp::state::SharedLpClientControlState;
use dashmap::mapref::one::RefMut;
use nym_lp::packet::message::LpMessageType;
use nym_lp::packet::{EncryptedLpPacket, ForwardPacketData, LpMessage};
@@ -12,7 +14,8 @@ use nym_lp::session::{LpAction, LpInput};
use nym_lp::transport::LpHandshakeChannel;
use nym_lp::transport::traits::LpTransportChannel;
use nym_lp::{LpTransportSession, packet::message::ExpectedResponseSize};
use nym_metrics::{add_histogram_obs, inc, inc_by};
use nym_metrics::{add_histogram_obs, inc};
use nym_node_metrics::NymNodeMetrics;
use nym_registration_common::{LpRegistrationRequest, RegistrationStatus};
use std::net::SocketAddr;
use std::time::Duration;
@@ -20,69 +23,16 @@ use tokio::net::TcpStream;
use tokio::time::timeout;
use tracing::*;
// Histogram buckets for LP operation duration (legacy - used by unused forwarding methods)
const LP_DURATION_BUCKETS: &[f64] = &[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0];
// Timeout for forward I/O operations (send + receive on exit stream)
// Must be long enough to cover exit gateway processing time
const FORWARD_IO_TIMEOUT_SECS: u64 = 30;
// Histogram buckets for LP connection lifecycle duration
// LP connections can be very short (registration only: ~1s) or very long (dVPN sessions: hours/days)
// Covers full range from seconds to 24 hours
const LP_CONNECTION_DURATION_BUCKETS: &[f64] = &[
1.0, // 1 second
5.0, // 5 seconds
10.0, // 10 seconds
30.0, // 30 seconds
60.0, // 1 minute
300.0, // 5 minutes
600.0, // 10 minutes
1800.0, // 30 minutes
3600.0, // 1 hour
7200.0, // 2 hours
14400.0, // 4 hours
28800.0, // 8 hours
43200.0, // 12 hours
86400.0, // 24 hours
];
/// Connection lifecycle statistics tracking
struct ConnectionStats {
/// When the connection started
start_time: std::time::Instant,
/// Total bytes received (including protocol framing)
bytes_received: u64,
/// Total bytes sent (including protocol framing)
bytes_sent: u64,
}
impl ConnectionStats {
fn new() -> Self {
Self {
start_time: std::time::Instant::now(),
bytes_received: 0,
bytes_sent: 0,
}
}
fn record_bytes_received(&mut self, bytes: usize) {
self.bytes_received += bytes as u64;
}
fn record_bytes_sent(&mut self, bytes: usize) {
self.bytes_sent += bytes as u64;
}
}
pub struct LpConnectionHandler<S = TcpStream> {
pub struct LpClientConnectionHandler<S = TcpStream> {
stream: S,
remote_addr: SocketAddr,
state: SharedLpControlState,
stats: ConnectionStats,
state: SharedLpClientControlState,
stats: LpConnectionStats,
// /// Flag indicating whether this is a connection from an entry gateway serving as a proxy
// forwarded_connection: bool,
/// Bound receiver_idx for this connection (set after first packet).
/// All subsequent packets on this connection must use this receiver_idx.
/// Set from ClientHello's proposed receiver_index, or from header for non-bootstrap packets.
@@ -92,29 +42,32 @@ pub struct LpConnectionHandler<S = TcpStream> {
/// Opened on first forward, reused for subsequent forwards, closed when client disconnects.
/// Tuple contains (stream, target_address) to verify subsequent forwards go to same exit.
exit_stream: Option<(S, SocketAddr)>,
/// Forwarding channel for sending requests to the exit gateway and receiving responses.
#[allow(dead_code)]
forwarding_channel: Option<NestedClientConnection>,
}
impl<S> LpConnectionHandler<S>
impl<S> LpClientConnectionHandler<S>
where
S: LpTransportChannel + LpHandshakeChannel + Unpin,
{
pub fn new(
stream: S,
// forwarded_connection: bool,
remote_addr: SocketAddr,
state: SharedLpControlState,
) -> Self {
pub fn new(stream: S, remote_addr: SocketAddr, state: SharedLpClientControlState) -> Self {
Self {
stream,
remote_addr,
// forwarded_connection,
state,
stats: ConnectionStats::new(),
stats: LpConnectionStats::new(),
bound_receiver_idx: None,
exit_stream: None,
forwarding_channel: None,
}
}
pub(crate) fn metrics(&self) -> &NymNodeMetrics {
&self.state.shared.metrics
}
/// Get the mutable reference to the state machine associated with this client.
/// It is vital it's never held across await points or this might lead to a deadlock.
fn state_entry_mut(
@@ -125,8 +78,7 @@ where
self.state
.shared
.session_states
.get_mut(&receiver_index)
.ok_or_else(|| LpHandlerError::MissingLpSession { receiver_index })
.get_state_entry_mut(receiver_index)
}
/// AIDEV-NOTE: Stream-oriented packet loop
@@ -135,11 +87,12 @@ where
/// First packet binds the connection to a receiver_idx (session-affine).
/// Binding is set by handle_client_hello() from payload's receiver_index,
/// or by validate_or_set_binding() for non-bootstrap first packets.
pub async fn handle(mut self) -> Result<(), LpHandlerError> {
debug!("Handling LP connection from {}", self.remote_addr);
pub async fn handle(&mut self) -> Result<(), LpHandlerError> {
let remote = self.remote_addr;
debug!("Handling LP connection from {remote}");
// Track total LP connections handled
inc!("lp_connections_total");
inc!("lp_client_connections_total");
// ============================================================
// STREAM-ORIENTED PROCESSING: Loop until connection closes
@@ -160,18 +113,12 @@ where
.await
{
Err(_timeout) => {
debug!(
"timed out attempting to complete KTT/PSQ handshake with {}",
self.remote_addr
);
debug!("timed out attempting to complete KTT/PSQ handshake with {remote}",);
self.emit_lifecycle_metrics(false);
return Ok(());
}
Ok(Err(handshake_failure)) => {
debug!(
"failed to complete KKT/PSQ handshake with {}: {handshake_failure}",
self.remote_addr
);
debug!("failed to complete KKT/PSQ handshake with {remote}: {handshake_failure}",);
self.emit_lifecycle_metrics(false);
return Ok(());
}
@@ -180,10 +127,7 @@ where
let receiver_idx = session.receiver_index();
// 2. insert the state machine into the shared state
self.state
.shared
.session_states
.insert(receiver_idx, TimestampedState::new(session));
self.state.shared.session_states.insert_new_session(session);
self.bound_receiver_idx = Some(receiver_idx);
// 3. handle any new incoming packet
@@ -194,7 +138,7 @@ where
Err(err) => {
if err.is_connection_closed() {
// Graceful EOF - client closed connection
trace!("Connection closed by {} (EOF)", self.remote_addr);
trace!("Connection closed by {remote} (EOF)");
break;
} else {
inc!("lp_errors_receive_packet");
@@ -625,30 +569,7 @@ where
/// Emit connection lifecycle metrics
fn emit_lifecycle_metrics(&self, graceful: bool) {
// Track connection duration
let duration = self.stats.start_time.elapsed().as_secs_f64();
add_histogram_obs!(
"lp_connection_duration_seconds",
duration,
LP_CONNECTION_DURATION_BUCKETS
);
// Track bytes transferred
inc_by!(
"lp_connection_bytes_received_total",
self.stats.bytes_received as i64
);
inc_by!(
"lp_connection_bytes_sent_total",
self.stats.bytes_sent as i64
);
// Track completion type
if graceful {
inc!("lp_connections_completed_gracefully");
} else {
inc!("lp_connections_completed_with_error");
}
self.stats.emit_lifecycle_client_metrics(graceful);
}
}
@@ -657,7 +578,7 @@ mod tests {
use super::*;
use crate::config::LpConfig;
use crate::config::lp::LpDebug;
use crate::node::lp::state::SharedLpState;
use crate::node::lp::state::{ActiveLpSessions, SharedLpState};
use nym_lp::peer::{KEMKeys, LpLocalPeer, generate_keypair_mceliece, generate_keypair_mlkem};
use nym_lp::{Ciphersuite, SessionManager, sessions_for_tests};
use nym_test_utils::helpers::{deterministic_rng, deterministic_rng_09};
@@ -665,7 +586,7 @@ mod tests {
// ==================== Test Helpers ====================
/// Create a minimal test state for handler tests
async fn create_minimal_test_state() -> SharedLpControlState {
async fn create_minimal_test_state() -> SharedLpClientControlState {
use nym_crypto::asymmetric::ed25519;
let mut rng = deterministic_rng();
@@ -690,14 +611,14 @@ mod tests {
);
let lp_peer = LpLocalPeer::new(Ciphersuite::default(), x_keys).with_kem_keys(kem_keys);
SharedLpControlState {
SharedLpClientControlState {
local_lp_peer: lp_peer,
peer_registrator: None,
forward_semaphore,
shared: SharedLpState {
lp_config,
metrics: nym_node_metrics::NymNodeMetrics::default(),
session_states: Arc::new(dashmap::DashMap::new()),
metrics: NymNodeMetrics::default(),
session_states: ActiveLpSessions::new(),
},
}
}
@@ -724,7 +645,7 @@ mod tests {
let server_task = tokio::spawn(async move {
let (stream, remote_addr) = listener.accept().await.unwrap();
let state = create_minimal_test_state().await;
let mut handler = LpConnectionHandler::new(stream, remote_addr, state);
let mut handler = LpClientConnectionHandler::new(stream, remote_addr, state);
// Two-phase: receive raw bytes + header, then parse full packet
let packet = handler.receive_raw_packet().await?;
let header = packet.outer_header();
@@ -0,0 +1,188 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::config::LpConfig;
use crate::error::NymNodeError;
use crate::node::lp::control::ingress::client_handler::LpClientConnectionHandler;
use crate::node::lp::control::ingress::node_handler::InitialLpIngressNodeConnectionHandler;
use crate::node::lp::directory::LpNodeDetails;
use crate::node::lp::state::{SharedLpClientControlState, SharedLpNodeControlState};
use nym_task::ShutdownTracker;
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tracing::{debug, error, info, trace, warn};
/// LP listener that accepts TCP connections on port 41264
pub struct LpControlListener {
/// Address to bind to
bind_address: SocketAddr,
/// Shared state for clients connection handlers
clients_handler_state: SharedLpClientControlState,
/// Shared state for nodes connection handlers
nodes_handler_state: SharedLpNodeControlState,
/// Shutdown coordination
shutdown: ShutdownTracker,
}
impl LpControlListener {
pub fn new(
bind_address: SocketAddr,
clients_handler_state: SharedLpClientControlState,
nodes_handler_state: SharedLpNodeControlState,
shutdown: ShutdownTracker,
) -> Self {
Self {
bind_address,
clients_handler_state,
nodes_handler_state,
shutdown,
}
}
fn lp_config(&self) -> LpConfig {
self.clients_handler_state.shared.lp_config
}
pub async fn run(&mut self) -> Result<(), NymNodeError> {
let bind_address = self.bind_address;
info!("Starting LP control listener on {bind_address}");
let listener = TcpListener::bind(bind_address).await.map_err(|source| {
error!("Failed to bind LP listener to {bind_address}: {source}",);
NymNodeError::LpBindFailure {
address: bind_address,
source,
}
})?;
let shutdown_token = self.shutdown.clone_shutdown_token();
loop {
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
trace!("LP listener: received shutdown signal");
break;
}
result = listener.accept() => {
match result {
Ok((stream, addr)) => self.handle_connection(stream, addr),
Err(e) => warn!("Failed to accept LP connection: {e}")
}
}
}
}
info!("LP listener shutdown complete");
Ok(())
}
fn handle_node_connection(
&self,
stream: tokio::net::TcpStream,
remote_addr: SocketAddr,
initiator_details: LpNodeDetails,
) {
debug!("Accepting LP node connection from {remote_addr}");
// Spawn handler task
let mut handler = InitialLpIngressNodeConnectionHandler::new(
stream,
remote_addr,
initiator_details,
self.nodes_handler_state.clone(),
);
self.shutdown.try_spawn_named_with_shutdown(
async move {
let metrics = handler.metrics().clone();
// Increment connection counter
metrics.network.new_ingress_lp_node_connection();
let result = handler.handle().await;
// Decrement connection counter
metrics.network.closed_ingress_lp_node_connection();
// Handler emits lifecycle metrics internally on success
// For errors, we need to emit them here since handler is consumed
if let Err(e) = result {
warn!("LP node handler error for {remote_addr}: {e}");
// Note: metrics are emitted in handle() for graceful path
// On error path, handle() returns early without emitting
// So we track errors here
}
},
&format!("LP_NODE::{remote_addr}"),
);
}
fn handle_client_connection(&self, stream: tokio::net::TcpStream, remote_addr: SocketAddr) {
// Check connection limit (only for clients, nodes must always be allowed regardless of the limit)
let active_connections = self.active_client_connections();
let max_connections = self.lp_config().debug.max_connections;
if active_connections >= max_connections {
warn!(
"LP connection limit exceeded ({active_connections}/{max_connections}), rejecting connection from {remote_addr}"
);
return;
}
debug!(
"Accepting LP client connection from {remote_addr} ({active_connections} active connections)"
);
// Spawn handler task
let mut handler =
LpClientConnectionHandler::new(stream, remote_addr, self.clients_handler_state.clone());
self.shutdown.try_spawn_named_with_shutdown(
async move {
// Increment connection counter
handler.metrics().network.new_ingress_lp_client_connection();
let result = handler.handle().await;
// Decrement connection counter
handler
.metrics()
.network
.closed_ingress_lp_client_connection();
// Handler emits lifecycle metrics internally on success
// For errors, we need to emit them here since handler is consumed
if let Err(e) = result {
warn!("LP client handler error for {remote_addr}: {e}");
// Note: metrics are emitted in handle() for graceful path
// On error path, handle() returns early without emitting
// So we track errors here
}
},
&format!("LP_CLIENT::{remote_addr}"),
);
}
fn handle_connection(&self, stream: tokio::net::TcpStream, remote_addr: SocketAddr) {
if let Some(initiator_details) = self
.nodes_handler_state
.nodes
.get_node_details(remote_addr.ip())
{
self.handle_node_connection(stream, remote_addr, initiator_details);
} else {
self.handle_client_connection(stream, remote_addr);
}
}
fn active_client_connections(&self) -> usize {
self.clients_handler_state
.shared
.metrics
.network
.active_lp_client_connections_count()
}
}
@@ -0,0 +1,6 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod client_handler;
pub(crate) mod listener;
pub mod node_handler;
@@ -0,0 +1,148 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::node::lp::control::LpConnectionStats;
use crate::node::lp::directory::LpNodeDetails;
use crate::node::lp::error::LpHandlerError;
use crate::node::lp::state::SharedLpNodeControlState;
use nym_lp::LpTransportSession;
use nym_lp::transport::{LpHandshakeChannel, LpTransportChannel};
use nym_metrics::inc;
use nym_node_metrics::NymNodeMetrics;
use nym_topology::NodeId;
use std::net::SocketAddr;
use tokio::net::TcpStream;
use tracing::debug;
/// Initial connection handler for an ingress LP node before completing the KKT/PSQ handshake.
pub struct InitialLpIngressNodeConnectionHandler<S = TcpStream> {
stream: S,
remote_addr: SocketAddr,
initiator_details: LpNodeDetails,
state: SharedLpNodeControlState,
stats: LpConnectionStats,
}
impl<S> InitialLpIngressNodeConnectionHandler<S>
where
S: LpHandshakeChannel + LpTransportChannel + Unpin,
{
pub fn new(
stream: S,
remote_addr: SocketAddr,
initiator_details: LpNodeDetails,
state: SharedLpNodeControlState,
) -> Self {
Self {
stream,
remote_addr,
initiator_details,
state,
stats: LpConnectionStats::new(),
}
}
pub(crate) fn metrics(&self) -> &NymNodeMetrics {
&self.state.shared.metrics
}
pub(crate) async fn complete_initial_handshake(
mut self,
) -> Option<Result<LpIngressNodeConnectionHandler<S>, LpHandlerError>> {
let remote = self.remote_addr;
if self.initiator_details.kem_key_hashes.is_empty() {
return Some(Err(LpHandlerError::MissingNodeKEMKeyHashes {
node_ip: self.remote_addr.ip(),
node_id: self.initiator_details.node_id,
}));
}
// 1. complete KKT/PSQ handshake before doing anything else.
// bail if it takes too long
let timeout = self.state.shared.lp_config.debug.handshake_ttl;
let local_peer = self.state.local_lp_peer.clone();
let stream = &mut self.stream;
let kem_hashes = self.initiator_details.kem_key_hashes.clone();
let session = match tokio::time::timeout(timeout, async move {
LpTransportSession::psq_handshake_responder_mutual(stream, local_peer, kem_hashes)
.complete_handshake()
.await
})
.await
{
Err(_timeout) => {
debug!("timed out attempting to complete mutual KTT/PSQ handshake with {remote}");
self.stats.emit_lifecycle_node_metrics(false);
return None;
}
Ok(Err(handshake_failure)) => {
debug!(
"failed to complete mutual KKT/PSQ handshake with {remote}: {handshake_failure}"
);
self.stats.emit_lifecycle_node_metrics(false);
return None;
}
Ok(Ok(session)) => session,
};
debug!(
"completed ingress KKT/PSQ handshake with node {}: {remote}",
self.initiator_details.node_id
);
Some(Ok(LpIngressNodeConnectionHandler {
stream: self.stream,
remote_addr: remote,
remote_node_id: self.initiator_details.node_id,
state: self.state,
stats: self.stats,
transport_session: session,
}))
}
pub async fn handle(mut self) -> Result<(), LpHandlerError> {
// Track total LP connections handled
inc!("lp_node_connections_total");
// attempt to complete initial handshake
let upgraded_handler = match self.complete_initial_handshake().await {
None => return Ok(()),
Some(handler_res) => handler_res?,
};
// continue handling the requests with the transport session
upgraded_handler.handle().await
}
}
/// Connection handler for an LP node after completing the KKT/PSQ handshake.
pub struct LpIngressNodeConnectionHandler<S = TcpStream> {
stream: S,
remote_addr: SocketAddr,
remote_node_id: NodeId,
state: SharedLpNodeControlState,
stats: LpConnectionStats,
transport_session: LpTransportSession,
// LOCAL receiver index to stream id
// client_streams: HashMap<ReceiverIndex, ClientStreamId>,
}
impl<S> LpIngressNodeConnectionHandler<S>
where
S: LpHandshakeChannel + LpTransportChannel + Unpin,
{
async fn handle(mut self) -> Result<(), LpHandlerError> {
// handle all the forwarding here
self.stats.emit_lifecycle_node_metrics(true);
Ok(())
}
pub(crate) fn transport_session(&self) -> &LpTransportSession {
&self.transport_session
}
}
-130
View File
@@ -1,130 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::config::LpConfig;
use crate::error::NymNodeError;
use crate::node::lp::control::handler::LpConnectionHandler;
use crate::node::lp::state::SharedLpControlState;
use nym_task::ShutdownTracker;
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tracing::{debug, error, info, trace, warn};
/// LP listener that accepts TCP connections on port 41264
pub struct LpControlListener {
/// Address to bind to
bind_address: SocketAddr,
/// Shared state for connection handlers
handler_state: SharedLpControlState,
/// Shutdown coordination
shutdown: ShutdownTracker,
}
impl LpControlListener {
pub fn new(
bind_address: SocketAddr,
handler_state: SharedLpControlState,
shutdown: ShutdownTracker,
) -> Self {
Self {
bind_address,
handler_state,
shutdown,
}
}
fn lp_config(&self) -> LpConfig {
self.handler_state.shared.lp_config
}
pub async fn run(&mut self) -> Result<(), NymNodeError> {
let bind_address = self.bind_address;
info!("Starting LP control listener on {bind_address}");
let listener = TcpListener::bind(bind_address).await.map_err(|source| {
error!("Failed to bind LP listener to {bind_address}: {source}",);
NymNodeError::LpBindFailure {
address: bind_address,
source,
}
})?;
let shutdown_token = self.shutdown.clone_shutdown_token();
loop {
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
trace!("LP listener: received shutdown signal");
break;
}
result = listener.accept() => {
match result {
Ok((stream, addr)) => self.handle_connection(stream, addr),
Err(e) => warn!("Failed to accept LP connection: {e}")
}
}
}
}
info!("LP listener shutdown complete");
Ok(())
}
fn handle_connection(&self, stream: tokio::net::TcpStream, remote_addr: SocketAddr) {
// Check connection limit
let active_connections = self.active_lp_connections();
let max_connections = self.lp_config().debug.max_connections;
if active_connections >= max_connections {
warn!(
"LP connection limit exceeded ({active_connections}/{max_connections}), rejecting connection from {remote_addr}"
);
return;
}
debug!(
"Accepting LP connection from {remote_addr} ({active_connections} active connections)"
);
// Increment connection counter
self.handler_state
.shared
.metrics
.network
.new_lp_connection();
// Spawn handler task
let handler = LpConnectionHandler::new(stream, remote_addr, self.handler_state.clone());
let metrics = self.handler_state.shared.metrics.clone();
self.shutdown.try_spawn_named_with_shutdown(
async move {
let result = handler.handle().await;
// Handler emits lifecycle metrics internally on success
// For errors, we need to emit them here since handler is consumed
if let Err(e) = result {
warn!("LP handler error for {remote_addr}: {e}");
// Note: metrics are emitted in handle() for graceful path
// On error path, handle() returns early without emitting
// So we track errors here
}
// Decrement connection counter on exit
metrics.network.lp_connection_closed();
},
&format!("LP::{remote_addr}"),
);
}
fn active_lp_connections(&self) -> usize {
self.handler_state
.shared
.metrics
.network
.active_lp_connections_count()
}
}
+116 -2
View File
@@ -1,5 +1,119 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
pub mod handler;
pub(crate) mod listener;
use nym_metrics::{add_histogram_obs, inc, inc_by};
pub mod egress;
pub mod ingress;
mod tests;
// Histogram buckets for LP operation duration (legacy - used by unused forwarding methods)
const LP_DURATION_BUCKETS: &[f64] = &[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0];
// Histogram buckets for LP connection lifecycle duration
// LP connections can be very short (registration only: ~1s) or very long (dVPN sessions: hours/days)
// Covers full range from seconds to 24 hours
const LP_CONNECTION_DURATION_BUCKETS: &[f64] = &[
1.0, // 1 second
5.0, // 5 seconds
10.0, // 10 seconds
30.0, // 30 seconds
60.0, // 1 minute
300.0, // 5 minutes
600.0, // 10 minutes
1800.0, // 30 minutes
3600.0, // 1 hour
7200.0, // 2 hours
14400.0, // 4 hours
28800.0, // 8 hours
43200.0, // 12 hours
86400.0, // 24 hours
];
/// Connection lifecycle statistics tracking
pub(crate) struct LpConnectionStats {
/// When the connection started
start_time: std::time::Instant,
/// Total bytes received (including protocol framing)
bytes_received: u64,
/// Total bytes sent (including protocol framing)
bytes_sent: u64,
}
impl LpConnectionStats {
fn new() -> Self {
Self {
start_time: std::time::Instant::now(),
bytes_received: 0,
bytes_sent: 0,
}
}
fn duration(&self) -> std::time::Duration {
self.start_time.elapsed()
}
fn record_bytes_received(&mut self, bytes: usize) {
self.bytes_received += bytes as u64;
}
fn record_bytes_sent(&mut self, bytes: usize) {
self.bytes_sent += bytes as u64;
}
/// Emit connection lifecycle metrics for a client connection
fn emit_lifecycle_client_metrics(&self, graceful: bool) {
// Track connection duration
let duration = self.duration().as_secs_f64();
add_histogram_obs!(
"lp_client_connection_duration_seconds",
duration,
LP_CONNECTION_DURATION_BUCKETS
);
// Track bytes transferred
inc_by!(
"lp_client_connection_bytes_received_total",
self.bytes_received as i64
);
inc_by!(
"lp_client_connection_bytes_sent_total",
self.bytes_sent as i64
);
// Track completion type
if graceful {
inc!("lp_client_connections_completed_gracefully");
} else {
inc!("lp_client_connections_completed_with_error");
}
}
/// Emit connection lifecycle metrics for a node connection
fn emit_lifecycle_node_metrics(&self, graceful: bool) {
// Track connection duration
let duration = self.duration().as_secs_f64();
add_histogram_obs!(
"lp_node_connection_duration_seconds",
duration,
LP_CONNECTION_DURATION_BUCKETS
);
// Track bytes transferred
inc_by!(
"lp_node_connection_bytes_received_total",
self.bytes_received as i64
);
inc_by!(
"lp_node_connection_bytes_sent_total",
self.bytes_sent as i64
);
// Track completion type
if graceful {
inc!("lp_node_connections_completed_gracefully");
} else {
inc!("lp_node_connections_completed_with_error");
}
}
}
+89
View File
@@ -0,0 +1,89 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
#[cfg(test)]
mod tests {
use crate::node::lp::SharedLpState;
use crate::node::lp::control::egress::connection::InitialLpEgressNodeConnectionHandler;
use crate::node::lp::control::ingress::node_handler::InitialLpIngressNodeConnectionHandler;
use crate::node::lp::directory::LpNodeDetails;
use crate::node::lp::state::SharedLpNodeControlState;
use anyhow::Context;
use nym_lp::packet::version;
use nym_lp::peer::{LpLocalPeer, LpRemotePeer, mock_peers};
use nym_test_utils::helpers::seeded_rng;
use nym_test_utils::mocks::async_read_write::MockIOStream;
use nym_test_utils::traits::TimeboxedSpawnable;
use rand::RngCore;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
fn shared_node_state(peer: LpLocalPeer) -> SharedLpNodeControlState {
SharedLpNodeControlState {
local_lp_peer: peer,
nodes: Default::default(),
shared: SharedLpState {
metrics: Default::default(),
lp_config: Default::default(),
session_states: Default::default(),
},
}
}
fn lp_node_details(peer: LpRemotePeer) -> LpNodeDetails {
let key_bytes = peer.x25519().as_ref().try_into().unwrap();
let mut rng = seeded_rng(key_bytes);
LpNodeDetails::new(
rng.next_u32(),
peer.kem_key_digests().clone(),
peer.x25519().clone(),
version::CURRENT,
)
}
#[tokio::test]
async fn basic_node_to_node_handshake() -> anyhow::Result<()> {
nym_test_utils::helpers::setup_test_logger();
let (init, resp) = mock_peers();
let init_remote = init.as_remote();
let resp_remote = resp.as_remote();
let conn_init = MockIOStream::default();
let conn_resp = conn_init.try_get_remote_handle();
let init_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234);
let init_details = lp_node_details(init_remote);
let resp_details = lp_node_details(resp_remote);
let init_state = shared_node_state(init);
let resp_state = shared_node_state(resp);
let init_handler = InitialLpEgressNodeConnectionHandler::new(
conn_init,
init_addr,
resp_details,
init_state,
);
let resp_handler = InitialLpIngressNodeConnectionHandler::new(
conn_resp,
init_addr,
init_details,
resp_state,
);
let init_future = init_handler.complete_initial_handshake().spawn_timeboxed();
let resp_future = resp_handler.complete_initial_handshake().spawn_timeboxed();
let (init_result, resp_result) = tokio::join!(init_future, resp_future);
let init_result = init_result??.context("handshake failure")??;
let resp_result = resp_result??.context("handshake failure")??;
assert_eq!(
init_result.receiver_index(),
resp_result.transport_session().receiver_index()
);
Ok(())
}
}
+75
View File
@@ -0,0 +1,75 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use arc_swap::ArcSwap;
use nym_lp::peer::{DHPublicKey, LpRemotePeer};
use nym_lp::{KEM, KEMKeyDigests};
use nym_topology::NodeId;
use std::collections::{BTreeMap, HashMap};
use std::net::IpAddr;
use std::ops::Deref;
use std::sync::Arc;
/// Wrapper around all known LP nodes
#[derive(Clone, Default)]
pub struct LpNodes {
// map between all available ip addresses of other nodes and their details
nodes: Arc<ArcSwap<HashMap<IpAddr, LpNodeDetails>>>,
}
impl LpNodes {
pub(crate) fn get_node_details(&self, node_ip: IpAddr) -> Option<LpNodeDetails> {
self.nodes.load().get(&node_ip).cloned()
}
pub(crate) fn get_node_id(&self, node_ip: IpAddr) -> Option<NodeId> {
self.nodes
.load()
.get(&node_ip)
.map(|details| details.node_id)
}
}
#[derive(Clone)]
pub(crate) struct LpNodeDetails {
inner: Arc<LpNodeDetailsInner>,
}
impl LpNodeDetails {
pub(crate) fn new(
node_id: NodeId,
kem_key_hashes: BTreeMap<KEM, KEMKeyDigests>,
x25519: DHPublicKey,
supported_protocol: u8,
) -> Self {
LpNodeDetails {
inner: Arc::new(LpNodeDetailsInner {
node_id,
kem_key_hashes,
x25519,
supported_protocol,
}),
}
}
}
impl Deref for LpNodeDetails {
type Target = LpNodeDetailsInner;
fn deref(&self) -> &Self::Target {
self.inner.deref()
}
}
pub(crate) struct LpNodeDetailsInner {
pub(crate) node_id: NodeId,
pub(crate) kem_key_hashes: BTreeMap<KEM, KEMKeyDigests>,
pub(crate) x25519: DHPublicKey,
pub(crate) supported_protocol: u8,
}
impl LpNodeDetailsInner {
pub(crate) fn to_lp_peer(&self) -> LpRemotePeer {
LpRemotePeer::new(self.x25519).with_key_digests(self.kem_key_hashes.clone())
}
}
+15 -1
View File
@@ -6,7 +6,8 @@ use nym_lp::peer_config::LpReceiverIndex;
use nym_lp::session::LpAction;
use nym_lp::transport::LpTransportError;
use nym_lp::{LpError, packet::MalformedLpPacketError};
use std::net::SocketAddr;
use nym_topology::NodeId;
use std::net::{IpAddr, SocketAddr};
use thiserror::Error;
#[derive(Debug, Error)]
@@ -47,9 +48,18 @@ pub enum LpHandlerError {
#[error("timed out while attempting to send to/receive from the connection")]
ConnectionTimeout,
#[error("missing KEM key hashes for node {node_id} connected from {node_ip}")]
MissingNodeKEMKeyHashes { node_ip: IpAddr, node_id: NodeId },
#[error("data channel is not yet implemented")]
UnimplementedDataChannel,
#[error("{ip_addr} does not correspond to any known LP node")]
NotLpNode { ip_addr: IpAddr },
#[error("{0}")]
Internal(String),
#[error("{0}")]
Other(String),
}
@@ -64,6 +74,10 @@ impl LpHandlerError {
}
}
pub fn internal(msg: impl Into<String>) -> Self {
LpHandlerError::Internal(msg.into())
}
pub fn other(msg: impl Into<String>) -> Self {
LpHandlerError::Other(msg.into())
}
@@ -0,0 +1,16 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::node::lp::control::egress::connection::NestedNodeConnectionSender;
use futures::channel::mpsc::UnboundedReceiver;
pub(crate) type NestedClientConnectionSender = ();
pub(crate) type NestedClientConnectionReceiver = UnboundedReceiver<Vec<u8>>;
pub(crate) struct NestedClientConnection {
// handle for sending into `NestedNodeConnectionHandler`
sender: NestedNodeConnectionSender,
// handle for receiving from `NestedNodeConnectionHandler`
receiver: NestedClientConnectionReceiver,
}
@@ -0,0 +1,116 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::lp::control::egress::connection::NestedNodeControlSender;
use crate::node::lp::directory::LpNodes;
use crate::node::lp::error::LpHandlerError;
use crate::node::lp::forwarding::{
ConnectionControllerResponse, ConnectionHandlerResponse, ControllerResponse,
GetConnectionHandler, NestedConnectionControllerRequest,
};
use nym_topology::NodeId;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Notify;
use tracing::info;
pub const CONTROL_CHANNEL_SIZE: usize = 64;
pub(crate) enum NodeHandle {
Established(NestedNodeControlSender),
Pending(Arc<Notify>),
}
/// Keep track of connections to the exit gateway
pub struct NestedConnectionsController {
/// Handle channel for sending requests to this controller
sender: super::NodeConnectionControllerSender,
/// Channel for receiving requests in this controller
receiver: super::NodeConnectionControllerReceiver,
/// Map of all LP node ip addresses to their details (and ids)
lp_nodes: LpNodes,
/// Handles to the active nested node connections
nodes_handles: HashMap<NodeId, NodeHandle>,
/// Shutdown token
shutdown: nym_task::ShutdownToken,
}
impl NestedConnectionsController {
pub fn new(lp_nodes: LpNodes, shutdown: nym_task::ShutdownToken) -> Self {
let (sender, receiver) = tokio::sync::mpsc::channel(CONTROL_CHANNEL_SIZE);
Self {
sender,
receiver,
lp_nodes,
nodes_handles: HashMap::new(),
shutdown,
}
}
pub fn request_sender(&self) -> super::NodeConnectionControllerSender {
self.sender.clone()
}
async fn handle_get_connection_handler(
&mut self,
request: GetConnectionHandler,
) -> ConnectionHandlerResponse {
let ip = request.target_gateway_lp_address.ip();
let Some(node_id) = self.lp_nodes.get_node_id(ip) else {
return Err(LpHandlerError::NotLpNode { ip_addr: ip });
};
match self.nodes_handles.get(&node_id) {
Some(NodeHandle::Established(handle)) => {
todo!()
}
Some(NodeHandle::Pending(notify)) => {
Ok(ConnectionControllerResponse::Pending(notify.clone()))
}
None => {
let (res, notify) = ConnectionControllerResponse::new_pending();
self.nodes_handles
.insert(node_id, NodeHandle::Pending(notify.clone()));
// create a new connection and return a pending response
todo!();
return Ok(res);
}
}
}
async fn handle_request(&mut self, request: NestedConnectionControllerRequest) {
match request {
NestedConnectionControllerRequest::ConnectionHandler {
request,
response_tx,
} => {
response_tx
.send(self.handle_get_connection_handler(request).await)
.ok();
}
}
}
pub async fn run(&mut self) {
loop {
tokio::select! {
biased;
_ = self.shutdown.cancelled() => {
break;
}
Some(request) = self.receiver.recv() => {
self.handle_request(request).await;
}
}
}
info!("Nested connection controller shutdown complete");
}
}
@@ -0,0 +1,73 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use super::{
ConnectionControllerResponse, ConnectionHandlerResponse, GetConnectionHandler,
NestedConnectionControllerRequest, NodeConnectionControllerSender,
};
use crate::node::lp::error::LpHandlerError;
use crate::node::lp::forwarding::client_connection::NestedClientConnection;
use nym_lp::peer_config::LpReceiverIndex;
use std::net::SocketAddr;
use tokio::sync::oneshot;
#[derive(Clone)]
pub struct NestedConnectionsManager {
sender: NodeConnectionControllerSender,
}
impl NestedConnectionsManager {
pub fn new(sender: NodeConnectionControllerSender) -> Self {
Self { sender }
}
async fn send_connection_handler_request(
&self,
request: GetConnectionHandler,
) -> Result<ConnectionHandlerResponse, LpHandlerError> {
let (response_tx, response_rx) = oneshot::channel();
self.sender
.send(NestedConnectionControllerRequest::ConnectionHandler {
request,
response_tx,
})
.await
.map_err(|_| LpHandlerError::internal("nested connection controller shut down"))?;
response_rx.await.map_err(|_| {
LpHandlerError::internal("nested connection controller hasn't send a response")
})
}
pub(crate) async fn get_connection_handle(
&self,
target_gateway_lp_address: SocketAddr,
inner_receiver_index: LpReceiverIndex,
) -> Result<NestedClientConnection, LpHandlerError> {
let request = GetConnectionHandler {
target_gateway_lp_address,
inner_receiver_index,
};
let notify = match self.send_connection_handler_request(request).await?? {
// if we have received a ready response, we can return the connection
ConnectionControllerResponse::Ready(conn) => return Ok(conn),
// otherwise we need to wait for the notification when it becomes available
ConnectionControllerResponse::Pending(notify) => notify,
};
// TODO: timeout
notify.notified().await;
match self.send_connection_handler_request(request).await?? {
// if we have received a ready response, we can return the connection
ConnectionControllerResponse::Ready(conn) => Ok(conn),
// otherwise we need to wait for the notification when it becomes available
ConnectionControllerResponse::Pending(_) => Err(LpHandlerError::internal(
"unavailable connection handler after successful notification",
)),
}
}
}
+58
View File
@@ -0,0 +1,58 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::lp::error::LpHandlerError;
use crate::node::lp::forwarding::client_connection::NestedClientConnection;
use nym_lp::peer_config::LpReceiverIndex;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{Notify, oneshot};
pub mod client_connection;
pub mod controller;
pub mod manager;
pub type NodeConnectionControllerReceiver = Receiver<NestedConnectionControllerRequest>;
pub type NodeConnectionControllerSender = Sender<NestedConnectionControllerRequest>;
pub(crate) enum ConnectionControllerResponse<T> {
/// The response is immediately available
Ready(T),
/// The response is in the process of being resolved. It will be ready once the returned
/// notify resolves. At this point the caller should repeat the query
Pending(Arc<Notify>),
}
impl<T> ConnectionControllerResponse<T> {
pub fn new_pending() -> (Self, Arc<Notify>) {
let notify = Arc::new(Notify::new());
(
ConnectionControllerResponse::Pending(notify.clone()),
notify,
)
}
}
pub type ControllerResponse<T> = Result<ConnectionControllerResponse<T>, LpHandlerError>;
pub type ConnectionHandlerResponse = ControllerResponse<NestedClientConnection>;
pub enum NestedConnectionControllerRequest {
/// Attempt to retrieve or create a handle to an exit gateway connection.
/// If the connection doesn't exist, it will be established
ConnectionHandler {
request: GetConnectionHandler,
response_tx: oneshot::Sender<ConnectionHandlerResponse>,
},
}
#[derive(Copy, Clone)]
pub(crate) struct GetConnectionHandler {
/// Target gateway's LP address
pub target_gateway_lp_address: SocketAddr,
/// Receiver index on the inner packet
pub inner_receiver_index: LpReceiverIndex,
}
+39 -10
View File
@@ -9,7 +9,7 @@
// ## Connection Metrics (via NetworkStats in nym-node-metrics)
// - active_lp_connections: Gauge tracking current active LP connections (incremented on accept, decremented on close)
//
// ## Handler Metrics (in handler.rs)
// ## Handler Metrics (in client_handler)
// - lp_connections_total: Counter for total LP connections handled
// - lp_client_hello_failed: Counter for ClientHello failures (timestamp validation, protocol errors)
// - lp_handshakes_success: Counter for successful handshake completions
@@ -46,7 +46,7 @@
// ## Error Categorization Metrics
// - lp_errors_wg_peer_registration: Counter for WireGuard peer registration failures
//
// ## Connection Lifecycle Metrics (in handler.rs)
// ## Connection Lifecycle Metrics (in client_handler)
// - lp_connection_duration_seconds: Histogram of connection duration from start to end (buckets: 1s to 24h)
// - lp_connection_bytes_received_total: Counter for total bytes received including protocol framing
// - lp_connection_bytes_sent_total: Counter for total bytes sent including protocol framing
@@ -58,7 +58,7 @@
// - lp_states_cleanup_session_removed: Counter for stale sessions removed by cleanup task
// - lp_states_cleanup_demoted_removed: Counter for demoted (read-only) sessions removed by cleanup task
//
// ## Subsession/Rekeying Metrics (in handler.rs)
// ## Subsession/Rekeying Metrics (in client_handler)
// - lp_subsession_kk2_sent: Counter for SubsessionKK2 responses sent (indicates client initiated rekeying)
// - lp_subsession_complete: Counter for successful subsession promotions
// - lp_subsession_receiver_index_collision: Counter for subsession receiver_index collisions
@@ -70,9 +70,8 @@
use crate::config::LpConfig;
use crate::error::NymNodeError;
use crate::node::lp::cleanup::CleanupTask;
use crate::node::lp::control::listener::LpControlListener;
use crate::node::lp::control::ingress::listener::LpControlListener;
use crate::node::lp::data::listener::LpDataListener;
use dashmap::DashMap;
use nym_gateway::node::wireguard::PeerRegistrator;
use nym_lp::peer::LpLocalPeer;
use nym_mixnet_client::forwarder::MixForwardingSender;
@@ -82,13 +81,19 @@ use std::sync::Arc;
use tokio::sync::Semaphore;
use tracing::error;
use crate::node::lp::directory::LpNodes;
use crate::node::lp::forwarding::controller::NestedConnectionsController;
use crate::node::lp::forwarding::manager::NestedConnectionsManager;
use crate::node::lp::state::{ActiveLpSessions, SharedLpNodeControlState};
pub use nym_mixnet_client::forwarder::{MixForwardingReceiver, mix_forwarding_channels};
pub use state::{SharedLpControlState, SharedLpDataState, SharedLpState};
pub use state::{SharedLpClientControlState, SharedLpDataState, SharedLpState};
mod cleanup;
pub mod control;
mod data;
pub mod directory;
pub mod error;
pub mod forwarding;
mod registration;
pub mod state;
@@ -96,6 +101,7 @@ pub struct LpSetup {
control_listener: LpControlListener,
data_listener: LpDataListener,
cleanup_task: CleanupTask,
nested_connections_controller: NestedConnectionsController,
/// Shutdown coordination
shutdown: ShutdownTracker,
@@ -107,11 +113,17 @@ impl LpSetup {
lp_config: LpConfig,
metrics: NymNodeMetrics,
peer_registrator: Option<PeerRegistrator>,
network_nodes: LpNodes,
mix_packet_sender: MixForwardingSender,
shutdown: ShutdownTracker,
) -> Result<Self, NymNodeError> {
// TODO: this will require loading old states from disk in the future
let session_states = Arc::new(DashMap::new());
let session_states = ActiveLpSessions::new();
let nested_connections_controller = NestedConnectionsController::new(
network_nodes.clone(),
shutdown.clone_shutdown_token(),
);
let shared_lp_state = SharedLpState {
metrics,
@@ -119,13 +131,22 @@ impl LpSetup {
session_states: session_states.clone(),
};
let control_state = SharedLpControlState {
local_lp_peer,
let client_control_state = SharedLpClientControlState {
local_lp_peer: local_lp_peer.clone(),
peer_registrator,
nested_connections_manager: NestedConnectionsManager::new(
nested_connections_controller.request_sender(),
),
forward_semaphore: Arc::new(Semaphore::new(lp_config.debug.max_concurrent_forwards)),
shared: shared_lp_state.clone(),
};
let nodes_control_state = SharedLpNodeControlState {
local_lp_peer,
nodes: network_nodes,
shared: shared_lp_state.clone(),
};
let data_state = SharedLpDataState {
outbound_mix_sender: mix_packet_sender,
shared: shared_lp_state,
@@ -133,7 +154,8 @@ impl LpSetup {
let control_listener = LpControlListener::new(
lp_config.control_bind_address,
control_state,
client_control_state,
nodes_control_state,
shutdown.clone(),
);
let data_listener = LpDataListener::new(
@@ -151,6 +173,7 @@ impl LpSetup {
control_listener,
data_listener,
cleanup_task,
nested_connections_controller,
shutdown,
})
}
@@ -187,5 +210,11 @@ impl LpSetup {
async move { self.cleanup_task.run().await },
"LP::CleanupTask",
);
// nested connections controller
self.shutdown.try_spawn_named(
async move { self.nested_connections_controller.run().await },
"LP::NestedConnectionsController",
);
}
}
+2 -2
View File
@@ -1,7 +1,7 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::lp::state::SharedLpControlState;
use crate::node::lp::state::SharedLpClientControlState;
use nym_lp::peer_config::LpReceiverIndex;
use nym_metrics::{add_histogram_obs, inc};
use nym_registration_common::dvpn::{
@@ -29,7 +29,7 @@ const LP_REGISTRATION_DURATION_BUCKETS: &[f64] = &[
30.0, // 30s
];
impl SharedLpControlState {
impl SharedLpClientControlState {
async fn process_dvpn_initial_registration(
&self,
sender: LpReceiverIndex,
+57 -6
View File
@@ -3,7 +3,10 @@
use crate::config::LpConfig;
use crate::node::lp::cleanup::TimestampedState;
use crate::node::lp::directory::LpNodes;
use crate::node::lp::error::LpHandlerError;
use dashmap::DashMap;
use dashmap::mapref::one::RefMut;
use nym_gateway::node::wireguard::PeerRegistrator;
use nym_lp::LpTransportSession;
use nym_lp::peer::LpLocalPeer;
@@ -13,27 +16,46 @@ use nym_node_metrics::NymNodeMetrics;
use std::sync::Arc;
use tokio::sync::Semaphore;
/// Shared state for LP control connections
pub use crate::node::lp::forwarding::manager::NestedConnectionsManager;
/// Shared state for LP clients control connections
#[derive(Clone)]
pub struct SharedLpControlState {
pub struct SharedLpClientControlState {
/// Encapsulates all required key information of a local Lewes Protocol Peer.
pub local_lp_peer: LpLocalPeer,
/// Handle registering new wireguard peers
pub peer_registrator: Option<PeerRegistrator>,
/// Controller for obtaining handles to forwarding channels
pub nested_connections_manager: NestedConnectionsManager,
/// Semaphore limiting concurrent forward connections
///
/// Prevents file descriptor exhaustion when forwarding LP packets during
/// telescope setup. When at capacity, forward requests return an error
/// so clients can choose a different gateway.
// this is temporary until there is persistent KKT/PSQ session between nodes
// #[deprecated]
pub forward_semaphore: Arc<Semaphore>,
/// Common shared data
pub shared: SharedLpState,
}
/// Shared state for LP nodes control connections
#[derive(Clone)]
pub struct SharedLpNodeControlState {
/// Encapsulates all required key information of a local Lewes Protocol Peer.
pub local_lp_peer: LpLocalPeer,
/// Information about all known LP nodes
pub nodes: LpNodes,
/// Common shared data
pub shared: SharedLpState,
}
/// Shared state for LP data connections
#[derive(Clone)]
pub struct SharedLpDataState {
@@ -48,6 +70,37 @@ pub struct SharedLpDataState {
pub shared: SharedLpState,
}
/// Established sessions keyed by the receiver index
///
/// Wrapped in TimestampedState for TTL-based cleanup of inactive sessions.
#[derive(Clone, Default)]
pub struct ActiveLpSessions {
// TODO: this might require split between client and node sessions. TBD
pub(crate) sessions: Arc<DashMap<LpReceiverIndex, TimestampedState<LpTransportSession>>>,
}
impl ActiveLpSessions {
pub fn new() -> Self {
Self::default()
}
pub(crate) fn get_state_entry_mut(
&self,
receiver_index: LpReceiverIndex,
) -> Result<RefMut<'_, LpReceiverIndex, TimestampedState<LpTransportSession>>, LpHandlerError>
{
self.sessions
.get_mut(&receiver_index)
.ok_or_else(|| LpHandlerError::MissingLpSession { receiver_index })
}
pub(crate) fn insert_new_session(&self, session: LpTransportSession) {
let receiver_index = session.receiver_index();
self.sessions
.insert(receiver_index, TimestampedState::new(session));
}
}
/// Shared state for LP connection handlers
#[derive(Clone)]
pub struct SharedLpState {
@@ -57,8 +110,6 @@ pub struct SharedLpState {
/// LP configuration (for timestamp validation, etc.)
pub lp_config: LpConfig,
/// Established sessions keyed by receiver index
///
/// Wrapped in TimestampedState for TTL-based cleanup of inactive sessions.
pub session_states: Arc<DashMap<LpReceiverIndex, TimestampedState<LpTransportSession>>>,
/// Currently active LP sessions
pub session_states: ActiveLpSessions,
}
+7 -1
View File
@@ -84,6 +84,7 @@ use tokio::sync::mpsc;
use tracing::{debug, info, trace};
use zeroize::Zeroizing;
use crate::node::lp::directory::LpNodes;
pub use nym_gateway::node::ActiveClientsStore;
pub use nym_gateway::node::GatewayStorage;
@@ -491,6 +492,7 @@ impl NymNode {
&self,
peer_registrator: Option<PeerRegistrator>,
mix_packet_sender: MixForwardingSender,
network_nodes: LpNodes,
) -> Result<LpSetup, NymNodeError> {
let lp_peer = LpLocalPeer::new(Ciphersuite::default(), self.x25519_lp_keys.clone())
.with_kem_keys(self.psq_kem_keys.clone());
@@ -500,6 +502,7 @@ impl NymNode {
self.config.lp,
self.metrics.clone(),
peer_registrator,
network_nodes,
mix_packet_sender,
self.shutdown_manager.shutdown_tracker().clone(),
)
@@ -683,6 +686,7 @@ impl NymNode {
async fn start_gateway_tasks(
&mut self,
cached_network: CachedNetwork,
lp_nodes: LpNodes,
metrics_sender: MetricEventsSender,
active_clients_store: ActiveClientsStore,
mix_packet_sender: MixForwardingSender,
@@ -766,7 +770,7 @@ impl NymNode {
self.config.lp.control_bind_address, self.config.lp.data_bind_address,
);
let lp_tasks = self
.build_lp_tasks(wg_peer_registrator.clone(), mix_packet_sender)
.build_lp_tasks(wg_peer_registrator.clone(), mix_packet_sender, lp_nodes)
.await?;
lp_tasks.start_tasks();
} else {
@@ -1355,6 +1359,7 @@ impl NymNode {
let network_refresher = self.build_network_refresher().await?;
let active_clients_store = ActiveClientsStore::new();
let lp_nodes = network_refresher.lp_nodes();
let bloomfilters_manager = self.setup_replay_detection().await?;
@@ -1381,6 +1386,7 @@ impl NymNode {
self.start_gateway_tasks(
network_refresher.cached_network(),
lp_nodes,
metrics_sender,
active_clients_store,
mix_packet_sender,
+9
View File
@@ -3,6 +3,7 @@
use crate::error::NymNodeError;
use crate::node::key_rotation::active_keys::ActiveSphinxKeys;
use crate::node::lp::directory::LpNodes;
use crate::node::routing_filter::network_filter::NetworkRoutingFilter;
use async_trait::async_trait;
use nym_crypto::asymmetric::ed25519;
@@ -212,6 +213,7 @@ pub struct NetworkRefresher {
network: CachedNetwork,
routing_filter: NetworkRoutingFilter,
noise_view: NoiseNetworkView,
lp_nodes: LpNodes,
}
impl NetworkRefresher {
@@ -240,6 +242,7 @@ impl NetworkRefresher {
network: CachedNetwork::new_empty(),
routing_filter: NetworkRoutingFilter::new_empty(testnet),
noise_view: NoiseNetworkView::new_empty(),
lp_nodes: Default::default(),
};
this.obtain_initial_network().await?;
@@ -335,6 +338,8 @@ impl NetworkRefresher {
.collect::<HashMap<_, _>>();
self.noise_view.swap_view(noise_nodes);
warn!("unimplemented LP nodes update");
let mut network_guard = self.network.inner.write().await;
network_guard.topology_metadata = metadata.to_topology_metadata();
network_guard.network_nodes = nodes;
@@ -373,6 +378,10 @@ impl NetworkRefresher {
self.noise_view.clone()
}
pub(crate) fn lp_nodes(&self) -> LpNodes {
self.lp_nodes.clone()
}
pub(crate) async fn run(&mut self) {
let mut full_refresh_interval = interval(self.full_refresh_interval);
full_refresh_interval.reset();
@@ -14,6 +14,7 @@ use nym_crypto::asymmetric::{ed25519, x25519};
use nym_lp::LpTransportSession;
use nym_lp::peer::{DHKeyPair, LpLocalPeer, LpRemotePeer};
use nym_lp::peer_config::LpReceiverIndex;
use nym_lp::psq::initiator::HandshakeMode;
use nym_lp::transport::traits::LpTransportChannel;
use nym_lp::transport::{LpHandshakeChannel, LpTransportError};
use nym_lp::{Ciphersuite, packet::EncryptedLpPacket, packet::version};
@@ -396,7 +397,8 @@ where
local_peer,
remote_peer,
protocol_version,
)
HandshakeMode::OneWayEntry,
)?
.complete_handshake()
.await?;
@@ -28,6 +28,7 @@ use nym_crypto::asymmetric::{ed25519, x25519};
use nym_lp::packet::version;
use nym_lp::packet::{EncryptedLpPacket, LpMessage};
use nym_lp::peer::{DHKeyPair, LpLocalPeer, LpRemotePeer};
use nym_lp::psq::initiator::HandshakeMode;
use nym_lp::transport::LpHandshakeChannel;
use nym_lp::transport::traits::LpTransportChannel;
use nym_lp::{Ciphersuite, KEM, LpTransportSession};
@@ -185,7 +186,8 @@ impl NestedLpSession {
local_peer,
remote_peer,
protocol_version,
)
HandshakeMode::OneWayExit,
)?
.complete_handshake()
.await?;