Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0b97d2bf93 | |||
| 16928a48a9 | |||
| 24891adddf | |||
| 6a42a8dd49 | |||
| bc0c2e5d19 | |||
| 225178f95a | |||
| 32cfb3fff8 | |||
| f62a74a6af | |||
| fe9275274c |
Generated
+1
@@ -6833,6 +6833,7 @@ dependencies = [
|
||||
"nym-kkt-ciphersuite",
|
||||
"nym-kkt-context",
|
||||
"nym-pemstore",
|
||||
"nym-test-utils",
|
||||
"rand 0.9.2",
|
||||
"rand_chacha 0.9.0",
|
||||
"strum",
|
||||
|
||||
@@ -30,6 +30,7 @@ libcrux-ml-kem = { workspace = true }
|
||||
[dev-dependencies]
|
||||
rand_chacha = "0.9.0"
|
||||
anyhow = { workspace = true }
|
||||
nym-test-utils = { workspace = true }
|
||||
|
||||
|
||||
[lints]
|
||||
|
||||
+122
-59
@@ -16,10 +16,6 @@ pub use nym_kkt_context as context;
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use nym_kkt_ciphersuite::{Ciphersuite, HashFunction, HashLength, KEM, SignatureScheme};
|
||||
use rand09::RngCore;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use crate::keys::KEMKeys;
|
||||
use crate::{
|
||||
initiator::KKTInitiator,
|
||||
@@ -29,9 +25,13 @@ mod test {
|
||||
},
|
||||
responder::KKTResponder,
|
||||
};
|
||||
use nym_kkt_ciphersuite::{Ciphersuite, HashFunction, HashLength, KEM, SignatureScheme};
|
||||
use nym_test_utils::helpers::deterministic_rng_09;
|
||||
use rand09::RngCore;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
#[test]
|
||||
fn test_kkt_psq_e2e_encrypted_carrier() {
|
||||
fn test_kkt_psq_e2e_one_way_encrypted_carrier() {
|
||||
let mut rng = rand09::rng();
|
||||
|
||||
let mut payload: Vec<u8> = vec![0u8; 900_000];
|
||||
@@ -47,7 +47,6 @@ mod test {
|
||||
HashFunction::Shake256,
|
||||
] {
|
||||
// generate kem public keys
|
||||
|
||||
let responder_mlkem_keypair = generate_keypair_mlkem(&mut rng);
|
||||
let responder_mceliece_keypair = generate_keypair_mceliece(&mut rng);
|
||||
|
||||
@@ -64,20 +63,6 @@ mod test {
|
||||
HashLength::Default.value(),
|
||||
responder_kem.mc_eliece_encapsulation_key().as_ref(),
|
||||
);
|
||||
let initiator_mlkem_keypair = generate_keypair_mlkem(&mut rng);
|
||||
let initiator_mceliece_keypair = generate_keypair_mceliece(&mut rng);
|
||||
|
||||
let _i_dir_hash_mlkem = hash_encapsulation_key(
|
||||
hash_function,
|
||||
HashLength::Default.value(),
|
||||
initiator_mlkem_keypair.public_key().as_slice(),
|
||||
);
|
||||
|
||||
let _i_dir_hash_mceliece = hash_encapsulation_key(
|
||||
hash_function,
|
||||
HashLength::Default.value(),
|
||||
initiator_mceliece_keypair.pk.as_ref(),
|
||||
);
|
||||
|
||||
let init_hashes = BTreeMap::new();
|
||||
|
||||
@@ -128,41 +113,6 @@ mod test {
|
||||
responder_kem.ml_kem768_encapsulation_key().as_slice(),
|
||||
)
|
||||
}
|
||||
// Mutual - MlKem
|
||||
{
|
||||
let ciphersuite = Ciphersuite::resolve_ciphersuite(
|
||||
KEM::MlKem768,
|
||||
hash_function,
|
||||
SignatureScheme::Ed25519,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
let (mut initiator, request) = KKTInitiator::generate_one_way_request(
|
||||
&mut rng,
|
||||
ciphersuite,
|
||||
&responder_x25519_keypair.pk,
|
||||
&r_dir_hash_mlkem,
|
||||
1u8,
|
||||
Some(payload.clone()),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let processed_request = responder.process_request(request, payload.len()).unwrap();
|
||||
|
||||
assert_eq!(processed_request.request_payload, payload);
|
||||
|
||||
// if we keep unverified keys, this should change
|
||||
assert!(processed_request.remote_encapsulation_key.is_none());
|
||||
|
||||
let processed_response = initiator
|
||||
.process_response(processed_request.response, 0)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
processed_response.encapsulation_key.as_bytes(),
|
||||
responder_kem.ml_kem768_encapsulation_key().as_slice(),
|
||||
)
|
||||
}
|
||||
|
||||
// OneWay - McEliece
|
||||
{
|
||||
@@ -195,7 +145,110 @@ mod test {
|
||||
responder_kem.mc_eliece_encapsulation_key().as_ref()
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_kkt_psq_e2e_mutual_encrypted_carrier() {
|
||||
let mut rng = deterministic_rng_09();
|
||||
|
||||
let mut payload: Vec<u8> = vec![0u8; 50000];
|
||||
rng.fill_bytes(&mut payload);
|
||||
|
||||
// generate kem public keys
|
||||
let initiator_mlkem_keypair = generate_keypair_mlkem(&mut rng);
|
||||
let initiator_mceliece_keypair = generate_keypair_mceliece(&mut rng);
|
||||
|
||||
let responder_mlkem_keypair = generate_keypair_mlkem(&mut rng);
|
||||
let responder_mceliece_keypair = generate_keypair_mceliece(&mut rng);
|
||||
|
||||
let responder_x25519_keypair = generate_lp_keypair_x25519(&mut rng);
|
||||
|
||||
let initiator_kem = KEMKeys::new(initiator_mceliece_keypair, initiator_mlkem_keypair);
|
||||
let responder_kem = KEMKeys::new(responder_mceliece_keypair, responder_mlkem_keypair);
|
||||
|
||||
let init_hashes = initiator_kem.encapsulation_keys_digests();
|
||||
|
||||
let responder = KKTResponder::new(
|
||||
&responder_x25519_keypair,
|
||||
&responder_kem,
|
||||
&init_hashes,
|
||||
&[
|
||||
HashFunction::Blake3,
|
||||
HashFunction::SHA256,
|
||||
HashFunction::Shake128,
|
||||
HashFunction::Shake256,
|
||||
],
|
||||
&[SignatureScheme::Ed25519],
|
||||
&[1],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
for hash_function in [
|
||||
HashFunction::Blake3,
|
||||
HashFunction::SHA256,
|
||||
HashFunction::Shake128,
|
||||
HashFunction::Shake256,
|
||||
] {
|
||||
let r_dir_hash_mlkem = hash_encapsulation_key(
|
||||
hash_function,
|
||||
HashLength::Default.value(),
|
||||
responder_kem.ml_kem768_encapsulation_key().as_slice(),
|
||||
);
|
||||
|
||||
let r_dir_hash_mceliece = hash_encapsulation_key(
|
||||
hash_function,
|
||||
HashLength::Default.value(),
|
||||
responder_kem.mc_eliece_encapsulation_key().as_ref(),
|
||||
);
|
||||
|
||||
// Mutual - MlKem
|
||||
{
|
||||
let ciphersuite = Ciphersuite::resolve_ciphersuite(
|
||||
KEM::MlKem768,
|
||||
hash_function,
|
||||
SignatureScheme::Ed25519,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
let (mut initiator, request) = KKTInitiator::generate_mutual_request(
|
||||
&mut rng,
|
||||
ciphersuite,
|
||||
initiator_kem
|
||||
.encoded_encapsulation_key(KEM::MlKem768)
|
||||
.unwrap(),
|
||||
&responder_x25519_keypair.pk,
|
||||
&r_dir_hash_mlkem,
|
||||
1u8,
|
||||
Some(payload.clone()),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let processed_request = responder.process_request(request, payload.len()).unwrap();
|
||||
|
||||
assert_eq!(processed_request.request_payload, payload);
|
||||
assert_eq!(
|
||||
processed_request
|
||||
.remote_encapsulation_key
|
||||
.unwrap()
|
||||
.as_bytes(),
|
||||
initiator_kem
|
||||
.encapsulation_key(KEM::MlKem768)
|
||||
.unwrap()
|
||||
.as_bytes()
|
||||
);
|
||||
|
||||
let processed_response = initiator
|
||||
.process_response(processed_request.response, 0)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
processed_response.encapsulation_key.as_bytes(),
|
||||
responder_kem.ml_kem768_encapsulation_key().as_slice(),
|
||||
)
|
||||
}
|
||||
|
||||
// Mutual - McEliece is not supported due to the key being too large
|
||||
{
|
||||
let ciphersuite = Ciphersuite::resolve_ciphersuite(
|
||||
KEM::McEliece,
|
||||
@@ -204,9 +257,12 @@ mod test {
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
let (mut initiator, request) = KKTInitiator::generate_one_way_request(
|
||||
let (mut initiator, request) = KKTInitiator::generate_mutual_request(
|
||||
&mut rng,
|
||||
ciphersuite,
|
||||
initiator_kem
|
||||
.encoded_encapsulation_key(KEM::McEliece)
|
||||
.unwrap(),
|
||||
&responder_x25519_keypair.pk,
|
||||
&r_dir_hash_mceliece,
|
||||
1u8,
|
||||
@@ -217,9 +273,16 @@ mod test {
|
||||
let processed_request = responder.process_request(request, payload.len()).unwrap();
|
||||
|
||||
assert_eq!(processed_request.request_payload, payload);
|
||||
|
||||
// if we keep unverified keys, this should change
|
||||
assert!(processed_request.remote_encapsulation_key.is_none());
|
||||
assert_eq!(
|
||||
processed_request
|
||||
.remote_encapsulation_key
|
||||
.unwrap()
|
||||
.as_bytes(),
|
||||
initiator_kem
|
||||
.encapsulation_key(KEM::McEliece)
|
||||
.unwrap()
|
||||
.as_bytes()
|
||||
);
|
||||
|
||||
let processed_response = initiator
|
||||
.process_response(processed_request.response, 0)
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::LpError;
|
||||
use nym_kkt::keys::EncapsulationKey;
|
||||
@@ -143,6 +143,10 @@ impl LpRemotePeer {
|
||||
.ok_or(LpError::NoKnownKEMKeyDigests { kem, hash_function })
|
||||
.cloned()
|
||||
}
|
||||
|
||||
pub fn kem_key_digests(&self) -> &BTreeMap<KEM, KEMKeyDigests> {
|
||||
&self.expected_kem_key_digests
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DHPublicKey> for LpRemotePeer {
|
||||
|
||||
@@ -65,6 +65,7 @@ impl LpPeerConfig {
|
||||
rng.random(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Creates a new client to exit config.
|
||||
/// Inputs:
|
||||
/// hop_id: this value must be in the range (1..=15). This function returns an error if this is not the case.
|
||||
@@ -79,6 +80,7 @@ impl LpPeerConfig {
|
||||
{
|
||||
Self::new(rng, hop_id, true, false, censorship_resistance)
|
||||
}
|
||||
|
||||
/// Creates a new client to an intermediate node config.
|
||||
/// Inputs:
|
||||
/// hop_id: this value must be in the range (1..=14). This function returns an error if this is not the case.
|
||||
@@ -130,6 +132,7 @@ impl LpPeerConfig {
|
||||
rng.random(),
|
||||
)
|
||||
}
|
||||
|
||||
fn build(
|
||||
hop_id: u8,
|
||||
is_exit: bool,
|
||||
@@ -147,6 +150,7 @@ impl LpPeerConfig {
|
||||
seed,
|
||||
}
|
||||
}
|
||||
|
||||
fn build_checked(
|
||||
hop_id: u8,
|
||||
is_exit: bool,
|
||||
@@ -203,6 +207,7 @@ impl LpPeerConfig {
|
||||
output_bytes[4..].copy_from_slice(&self.seed);
|
||||
output_bytes
|
||||
}
|
||||
|
||||
pub fn deserialize(bytes: &[u8]) -> Result<Self, LpError> {
|
||||
if bytes.len() != LP_PEER_CONFIG_SIZE {
|
||||
return Err(LpError::DeserializationError(format!(
|
||||
|
||||
@@ -24,10 +24,31 @@ use nym_kkt::message::{KKTRequest, KKTResponse};
|
||||
use rand09::SeedableRng;
|
||||
use tracing::debug;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum HandshakeMode {
|
||||
// Client <> Entry
|
||||
OneWayEntry,
|
||||
|
||||
// Client <> Exit
|
||||
OneWayExit,
|
||||
|
||||
// Entry <> Exit
|
||||
MutualInternode,
|
||||
// in the future more variants will be supported (such as individual mix hops)
|
||||
}
|
||||
|
||||
impl HandshakeMode {
|
||||
pub fn is_mutual(&self) -> bool {
|
||||
matches!(self, HandshakeMode::MutualInternode)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PSQHandshakeStateInitiator<'a, S> {
|
||||
pub(super) inner_state: PSQHandshakeState<'a, S>,
|
||||
pub(super) initiator_data: InitiatorData,
|
||||
pub(super) mutual: bool,
|
||||
|
||||
/// The mode of the handshake (mutual node-node, client-entry, entry-exit)
|
||||
pub(super) mode: HandshakeMode,
|
||||
}
|
||||
|
||||
pub(crate) fn build_psq_principal<R>(
|
||||
@@ -78,13 +99,23 @@ impl<'a, S> PSQHandshakeStateInitiator<'a, S>
|
||||
where
|
||||
S: LpHandshakeChannel + Unpin,
|
||||
{
|
||||
pub fn set_mutual_kkt(mut self) -> Result<Self, LpError> {
|
||||
if self.inner_state.local_peer.kem_keypairs.is_none() {
|
||||
return Err(LpError::PSQMutualInitiatorMissingKemKey);
|
||||
}
|
||||
fn lp_peer_config<R>(&self, rng: &mut R) -> Result<LpPeerConfig, LpError>
|
||||
where
|
||||
R: rand09::CryptoRng,
|
||||
{
|
||||
// for now we don't support censorship resistance flag
|
||||
let censorship_resistance = false;
|
||||
|
||||
self.mutual = true;
|
||||
Ok(self)
|
||||
match self.mode {
|
||||
HandshakeMode::OneWayEntry => Ok(LpPeerConfig::new_client_to_entry(
|
||||
rng,
|
||||
censorship_resistance,
|
||||
)),
|
||||
HandshakeMode::OneWayExit => {
|
||||
LpPeerConfig::new_client_to_exit(rng, 1, censorship_resistance)
|
||||
}
|
||||
HandshakeMode::MutualInternode => LpPeerConfig::new_node_to_node(rng),
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempt to send KKT request to begin the handshake
|
||||
@@ -132,7 +163,7 @@ where
|
||||
let ciphersuite = self.inner_state.local_peer.ciphersuite();
|
||||
let kem = ciphersuite.kem();
|
||||
|
||||
let lp_peer_config = LpPeerConfig::new_client_to_entry(rng, false);
|
||||
let lp_peer_config = self.lp_peer_config(rng)?;
|
||||
|
||||
// 1. retrieve the expected kem key hash. if we don't know it,
|
||||
let dir_hash = self
|
||||
@@ -141,7 +172,7 @@ where
|
||||
.expected_kem_key_hash(ciphersuite)?;
|
||||
|
||||
// 2. prepare and send KKT request
|
||||
let (mut initiator, kkt_request) = if self.mutual {
|
||||
let (mut initiator, kkt_request) = if self.mode.is_mutual() {
|
||||
// this has been verified when setting the mutual flag
|
||||
let Some(local_encapsulation_key) = self.inner_state.local_peer.encoded_kem_key(kem)
|
||||
else {
|
||||
@@ -273,8 +304,8 @@ mod tests {
|
||||
resp.ciphersuite = ciphersuite;
|
||||
let initiator_data = InitiatorData::new(1, resp_remote);
|
||||
|
||||
let handshake_init =
|
||||
PSQHandshakeState::new(conn_init, init).as_initiator(initiator_data);
|
||||
let handshake_init = PSQHandshakeState::new(conn_init, init)
|
||||
.as_initiator(initiator_data, HandshakeMode::OneWayEntry)?;
|
||||
|
||||
let mut init_rng = DeterministicRng09Send::new(u64_seeded_rng_09(1));
|
||||
|
||||
@@ -396,8 +427,7 @@ mod tests {
|
||||
let initiator_data = InitiatorData::new(1, resp_remote);
|
||||
|
||||
let handshake_init = PSQHandshakeState::new(conn_init, init)
|
||||
.as_initiator(initiator_data)
|
||||
.set_mutual_kkt()?;
|
||||
.as_initiator(initiator_data, HandshakeMode::MutualInternode)?;
|
||||
|
||||
let mut init_rng = DeterministicRng09Send::new(u64_seeded_rng_09(1));
|
||||
|
||||
|
||||
@@ -12,6 +12,8 @@ mod helpers;
|
||||
pub mod initiator;
|
||||
pub mod responder;
|
||||
|
||||
use crate::LpError;
|
||||
use crate::psq::initiator::HandshakeMode;
|
||||
pub use initiator::PSQHandshakeStateInitiator;
|
||||
pub use responder::PSQHandshakeStateResponder;
|
||||
|
||||
@@ -107,12 +109,20 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub fn as_initiator(self, initiator_data: InitiatorData) -> PSQHandshakeStateInitiator<'a, S> {
|
||||
PSQHandshakeStateInitiator {
|
||||
pub fn as_initiator(
|
||||
self,
|
||||
initiator_data: InitiatorData,
|
||||
mode: HandshakeMode,
|
||||
) -> Result<PSQHandshakeStateInitiator<'a, S>, LpError> {
|
||||
if mode.is_mutual() && self.local_peer.kem_keypairs.is_none() {
|
||||
return Err(LpError::PSQMutualInitiatorMissingKemKey);
|
||||
}
|
||||
|
||||
Ok(PSQHandshakeStateInitiator {
|
||||
initiator_data,
|
||||
inner_state: self,
|
||||
mutual: false,
|
||||
}
|
||||
mode,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn as_responder(self, responder_data: ResponderData) -> PSQHandshakeStateResponder<'a, S> {
|
||||
@@ -160,8 +170,10 @@ mod tests {
|
||||
resp.ciphersuite = ciphersuite;
|
||||
let resp_remote = resp.as_remote();
|
||||
|
||||
let handshake_init = PSQHandshakeState::new(conn_init, init)
|
||||
.as_initiator(InitiatorData::new(1, resp_remote));
|
||||
let handshake_init = PSQHandshakeState::new(conn_init, init).as_initiator(
|
||||
InitiatorData::new(1, resp_remote),
|
||||
HandshakeMode::OneWayEntry,
|
||||
)?;
|
||||
let handshake_resp =
|
||||
PSQHandshakeState::new(conn_resp, resp).as_responder(ResponderData::default());
|
||||
|
||||
@@ -232,9 +244,10 @@ mod tests {
|
||||
let resp_remote = resp.as_remote();
|
||||
let init_remote = init.as_remote();
|
||||
|
||||
let handshake_init = PSQHandshakeState::new(conn_init, init)
|
||||
.as_initiator(InitiatorData::new(1, resp_remote))
|
||||
.set_mutual_kkt()?;
|
||||
let handshake_init = PSQHandshakeState::new(conn_init, init).as_initiator(
|
||||
InitiatorData::new(1, resp_remote),
|
||||
HandshakeMode::MutualInternode,
|
||||
)?;
|
||||
let handshake_resp = PSQHandshakeState::new(conn_resp, resp).as_responder(
|
||||
ResponderData::default()
|
||||
.with_initiator_kem_hashes(init_remote.expected_kem_key_digests),
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::codec::{decrypt_lp_packet, encrypt_lp_packet};
|
||||
use crate::packet::{EncryptedLpPacket, LpHeader, LpMessage, LpPacket};
|
||||
use crate::peer::{LpLocalPeer, LpRemotePeer};
|
||||
use crate::peer_config::LpReceiverIndex;
|
||||
use crate::psq::initiator::HandshakeMode;
|
||||
use crate::psq::{
|
||||
InitiatorData, PSQHandshakeState, PSQHandshakeStateInitiator, PSQHandshakeStateResponder,
|
||||
ResponderData,
|
||||
@@ -19,6 +20,8 @@ use crate::{LpError, replay::ReceivingKeyCounterValidator};
|
||||
use libcrux_psq::handshake::types::{Authenticator, DHPublicKey};
|
||||
use libcrux_psq::session::{Session, SessionBinding};
|
||||
use nym_kkt::keys::EncapsulationKey;
|
||||
use nym_kkt_ciphersuite::{KEM, KEMKeyDigests};
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt::{Debug, Formatter};
|
||||
|
||||
/// Represents inputs that drive the state machine transitions.
|
||||
@@ -154,12 +157,34 @@ impl LpTransportSession {
|
||||
local_peer: LpLocalPeer,
|
||||
remote_peer: LpRemotePeer,
|
||||
remote_protocol_version: u8,
|
||||
) -> PSQHandshakeStateInitiator<'_, S>
|
||||
mode: HandshakeMode,
|
||||
) -> Result<PSQHandshakeStateInitiator<'_, S>, LpError>
|
||||
where
|
||||
S: LpHandshakeChannel + Unpin,
|
||||
{
|
||||
PSQHandshakeState::new(connection, local_peer)
|
||||
.as_initiator(InitiatorData::new(remote_protocol_version, remote_peer))
|
||||
PSQHandshakeState::new(connection, local_peer).as_initiator(
|
||||
InitiatorData::new(remote_protocol_version, remote_peer),
|
||||
mode,
|
||||
)
|
||||
}
|
||||
|
||||
/// Helper function to create `PSQHandshakeState` for the handshake initiator for mutual KKT
|
||||
pub fn psq_handshake_initiator_mutual_internode<S>(
|
||||
connection: &'_ mut S,
|
||||
local_peer: LpLocalPeer,
|
||||
remote_peer: LpRemotePeer,
|
||||
remote_protocol_version: u8,
|
||||
) -> Result<PSQHandshakeStateInitiator<'_, S>, LpError>
|
||||
where
|
||||
S: LpHandshakeChannel + Unpin,
|
||||
{
|
||||
Self::psq_handshake_initiator(
|
||||
connection,
|
||||
local_peer,
|
||||
remote_peer,
|
||||
remote_protocol_version,
|
||||
HandshakeMode::MutualInternode,
|
||||
)
|
||||
}
|
||||
|
||||
/// Helper function to create `PSQHandshakeState` for the handshake responder
|
||||
@@ -173,6 +198,19 @@ impl LpTransportSession {
|
||||
PSQHandshakeState::new(connection, local_peer).as_responder(ResponderData::default())
|
||||
}
|
||||
|
||||
/// Helper function to create `PSQHandshakeState` for the handshake responder for mutual KKT
|
||||
pub fn psq_handshake_responder_mutual<S>(
|
||||
connection: &'_ mut S,
|
||||
local_peer: LpLocalPeer,
|
||||
initiator_kem_hashes: BTreeMap<KEM, KEMKeyDigests>,
|
||||
) -> PSQHandshakeStateResponder<'_, S>
|
||||
where
|
||||
S: LpHandshakeChannel + Unpin,
|
||||
{
|
||||
PSQHandshakeState::new(connection, local_peer)
|
||||
.as_responder(ResponderData::default().with_initiator_kem_hashes(initiator_kem_hashes))
|
||||
}
|
||||
|
||||
pub fn session_binding(&self) -> &PersistentSessionBinding {
|
||||
&self.session_binding
|
||||
}
|
||||
|
||||
@@ -17,9 +17,10 @@ mod tests {
|
||||
use nym_lp::peer::LpLocalPeer;
|
||||
use nym_node::config::{LpConfig, LpDebug};
|
||||
use nym_node::node::GatewayStorage;
|
||||
use nym_node::node::lp::control::handler::LpConnectionHandler;
|
||||
use nym_node::node::lp::control::ingress::client_handler::LpClientConnectionHandler;
|
||||
use nym_node::node::lp::error::LpHandlerError;
|
||||
use nym_node::node::lp::{SharedLpControlState, SharedLpState};
|
||||
use nym_node::node::lp::state::{ActiveLpSessions, NestedConnectionsManager};
|
||||
use nym_node::node::lp::{SharedLpClientControlState, SharedLpState};
|
||||
use nym_node::wireguard::{PeerManager, PeerRegistrator};
|
||||
use nym_registration_client::{LpClientError, LpRegistrationClient};
|
||||
use nym_test_utils::helpers::{CryptoRng09, seeded_rng};
|
||||
@@ -35,7 +36,7 @@ mod tests {
|
||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio::sync::mpsc::{Receiver, channel};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::error;
|
||||
@@ -120,7 +121,7 @@ mod tests {
|
||||
enum SpawnedLpConnectionHandlerState {
|
||||
NotCreated,
|
||||
Ready {
|
||||
handler: LpConnectionHandler<MockIOStream>,
|
||||
handler: LpClientConnectionHandler<MockIOStream>,
|
||||
},
|
||||
Running {
|
||||
handle: JoinHandle<Option<Result<(), LpHandlerError>>>,
|
||||
@@ -130,7 +131,7 @@ mod tests {
|
||||
|
||||
struct Gateway {
|
||||
base: Party,
|
||||
lp_state: SharedLpControlState,
|
||||
lp_state: SharedLpClientControlState,
|
||||
ip_pool: IpPool,
|
||||
mock_peer_controller: SpawnedPeerController,
|
||||
|
||||
@@ -216,6 +217,9 @@ mod tests {
|
||||
let (mock_peer_controller, peer_controller_state) =
|
||||
mock_peer_controller(peer_request_rx);
|
||||
|
||||
let (connection_ctrl_sender, _connection_manager_receiver) = channel(42);
|
||||
let nested_connections_manager = NestedConnectionsManager::new(connection_ctrl_sender);
|
||||
|
||||
// registering particular responses for peer controller is up to given test
|
||||
let ecash_verifier = Arc::new(ecash_verifier);
|
||||
|
||||
@@ -225,7 +229,7 @@ mod tests {
|
||||
upgrade_mode_details,
|
||||
);
|
||||
|
||||
let lp_state = SharedLpControlState {
|
||||
let lp_state = SharedLpClientControlState {
|
||||
local_lp_peer: base.peer.clone(),
|
||||
|
||||
forward_semaphore,
|
||||
@@ -235,8 +239,9 @@ mod tests {
|
||||
shared: SharedLpState {
|
||||
metrics: Default::default(),
|
||||
lp_config,
|
||||
session_states: Arc::new(Default::default()),
|
||||
session_states: ActiveLpSessions::new(),
|
||||
},
|
||||
nested_connections_manager,
|
||||
};
|
||||
|
||||
Ok(Gateway {
|
||||
@@ -262,7 +267,7 @@ mod tests {
|
||||
};
|
||||
|
||||
self.lp_connection_handler = SpawnedLpConnectionHandlerState::Ready {
|
||||
handler: LpConnectionHandler::new(
|
||||
handler: LpClientConnectionHandler::new(
|
||||
client_connection,
|
||||
client_address,
|
||||
self.lp_state.clone(),
|
||||
@@ -290,7 +295,7 @@ mod tests {
|
||||
}
|
||||
|
||||
fn spawn_lp_handler(&mut self) {
|
||||
let SpawnedLpConnectionHandlerState::Ready { handler } = mem::replace(
|
||||
let SpawnedLpConnectionHandlerState::Ready { mut handler } = mem::replace(
|
||||
&mut self.lp_connection_handler,
|
||||
SpawnedLpConnectionHandlerState::NotCreated,
|
||||
) else {
|
||||
|
||||
@@ -139,6 +139,7 @@ harness = false
|
||||
cargo_metadata = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
nym-lp = { workspace = true, features = ["mock"] }
|
||||
criterion = { workspace = true, features = ["async_tokio"] }
|
||||
nym-test-utils = { workspace = true }
|
||||
|
||||
|
||||
@@ -16,7 +16,14 @@ pub struct NetworkStats {
|
||||
// the call stack
|
||||
active_egress_mixnet_connections: Arc<AtomicUsize>,
|
||||
|
||||
active_lp_connections: AtomicUsize,
|
||||
// incoming LP control connections from clients
|
||||
active_lp_ingress_client_connections: AtomicUsize,
|
||||
|
||||
// incoming LP control connections from nodes
|
||||
active_lp_ingress_node_connections: AtomicUsize,
|
||||
|
||||
// outgoing LP control connections to nodes
|
||||
active_lp_egress_node_connections: AtomicUsize,
|
||||
}
|
||||
|
||||
impl NetworkStats {
|
||||
@@ -59,15 +66,38 @@ impl NetworkStats {
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn new_lp_connection(&self) {
|
||||
self.active_lp_connections.fetch_add(1, Ordering::Relaxed);
|
||||
pub fn new_ingress_lp_client_connection(&self) {
|
||||
self.active_lp_ingress_client_connections
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn lp_connection_closed(&self) {
|
||||
self.active_lp_connections.fetch_sub(1, Ordering::Relaxed);
|
||||
pub fn closed_ingress_lp_client_connection(&self) {
|
||||
self.active_lp_ingress_client_connections
|
||||
.fetch_sub(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn active_lp_connections_count(&self) -> usize {
|
||||
self.active_lp_connections.load(Ordering::Relaxed)
|
||||
pub fn new_ingress_lp_node_connection(&self) {
|
||||
self.active_lp_ingress_node_connections
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn closed_ingress_lp_node_connection(&self) {
|
||||
self.active_lp_ingress_node_connections
|
||||
.fetch_sub(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn new_egress_lp_node_connection(&self) {
|
||||
self.active_lp_egress_node_connections
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn closed_egress_lp_node_connection(&self) {
|
||||
self.active_lp_egress_node_connections
|
||||
.fetch_sub(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn active_lp_client_connections_count(&self) -> usize {
|
||||
self.active_lp_ingress_client_connections
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,11 +2,8 @@
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::config::LpDebug;
|
||||
use dashmap::DashMap;
|
||||
use nym_lp::LpTransportSession;
|
||||
use nym_lp::peer_config::LpReceiverIndex;
|
||||
use crate::node::lp::state::ActiveLpSessions;
|
||||
use nym_metrics::inc_by;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tracing::{debug, info};
|
||||
|
||||
@@ -74,14 +71,14 @@ impl<T> TimestampedState<T> {
|
||||
}
|
||||
|
||||
pub(crate) struct CleanupTask {
|
||||
session_states: Arc<DashMap<LpReceiverIndex, TimestampedState<LpTransportSession>>>,
|
||||
session_states: ActiveLpSessions,
|
||||
cfg: LpDebug,
|
||||
shutdown: nym_task::ShutdownToken,
|
||||
}
|
||||
|
||||
impl CleanupTask {
|
||||
pub fn new(
|
||||
session_states: Arc<DashMap<LpReceiverIndex, TimestampedState<LpTransportSession>>>,
|
||||
session_states: ActiveLpSessions,
|
||||
cfg: LpDebug,
|
||||
shutdown: nym_task::ShutdownToken,
|
||||
) -> Self {
|
||||
@@ -100,7 +97,7 @@ impl CleanupTask {
|
||||
|
||||
// Remove stale sessions (based on time since last activity)
|
||||
// Use shorter TTL for demoted (ReadOnlyTransport) sessions
|
||||
self.session_states.retain(|_, timestamped| {
|
||||
self.session_states.sessions.retain(|_, timestamped| {
|
||||
if timestamped.since_activity() > session_ttl {
|
||||
ss_removed += 1;
|
||||
false
|
||||
|
||||
@@ -0,0 +1,168 @@
|
||||
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::node::lp::control::LpConnectionStats;
|
||||
use crate::node::lp::directory::LpNodeDetails;
|
||||
use crate::node::lp::error::LpHandlerError;
|
||||
use crate::node::lp::forwarding::client_connection::NestedClientConnectionSender;
|
||||
use crate::node::lp::state::SharedLpNodeControlState;
|
||||
use nym_lp::LpTransportSession;
|
||||
use nym_lp::peer_config::LpReceiverIndex;
|
||||
use nym_lp::transport::{LpHandshakeChannel, LpTransportChannel};
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
pub(crate) type NestedNodeConnectionSender = ();
|
||||
pub(crate) type NestedNodeConnectionReceiver = ();
|
||||
|
||||
pub(crate) type NestedNodeControlSender = ();
|
||||
pub(crate) type NestedNodeControlReceiver = ();
|
||||
|
||||
/// Initial connection handler for an egress LP node before completing the KKT/PSQ handshake.
|
||||
|
||||
pub struct InitialLpEgressNodeConnectionHandler<S> {
|
||||
stream: S,
|
||||
remote_addr: SocketAddr,
|
||||
responder_details: LpNodeDetails,
|
||||
|
||||
state: SharedLpNodeControlState,
|
||||
stats: LpConnectionStats,
|
||||
}
|
||||
|
||||
impl<S> InitialLpEgressNodeConnectionHandler<S>
|
||||
where
|
||||
S: LpHandshakeChannel + LpHandshakeChannel + Unpin,
|
||||
{
|
||||
pub(crate) fn new(
|
||||
stream: S,
|
||||
remote_addr: SocketAddr,
|
||||
responder_details: LpNodeDetails,
|
||||
state: SharedLpNodeControlState,
|
||||
) -> Self {
|
||||
Self {
|
||||
stream,
|
||||
remote_addr,
|
||||
responder_details,
|
||||
state,
|
||||
stats: LpConnectionStats::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn complete_initial_handshake(
|
||||
mut self,
|
||||
) -> Option<Result<LpTransportSession, LpHandlerError>> {
|
||||
let remote = self.remote_addr;
|
||||
|
||||
if self.responder_details.kem_key_hashes.is_empty() {
|
||||
return Some(Err(LpHandlerError::MissingNodeKEMKeyHashes {
|
||||
node_ip: self.remote_addr.ip(),
|
||||
node_id: self.responder_details.node_id,
|
||||
}));
|
||||
}
|
||||
|
||||
// 1. complete KKT/PSQ handshake before doing anything else.
|
||||
// bail if it takes too long
|
||||
let timeout = self.state.shared.lp_config.debug.handshake_ttl;
|
||||
let stream = &mut self.stream;
|
||||
|
||||
let handshake_state = match LpTransportSession::psq_handshake_initiator_mutual_internode(
|
||||
stream,
|
||||
self.state.local_lp_peer.clone(),
|
||||
self.responder_details.to_lp_peer(),
|
||||
self.responder_details.supported_protocol,
|
||||
) {
|
||||
Ok(handshake_state) => handshake_state,
|
||||
Err(err) => {
|
||||
debug!("failed to initiate mutual KTT/PSQ handshake with {remote}: {err}");
|
||||
self.stats.emit_lifecycle_node_metrics(false);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let session = match tokio::time::timeout(timeout, handshake_state.complete_handshake())
|
||||
.await
|
||||
{
|
||||
Err(_timeout) => {
|
||||
debug!("timed out attempting to complete mutual KTT/PSQ handshake with {remote}");
|
||||
self.stats.emit_lifecycle_node_metrics(false);
|
||||
return None;
|
||||
}
|
||||
Ok(Err(handshake_failure)) => {
|
||||
debug!(
|
||||
"failed to complete mutual KKT/PSQ handshake with {remote}: {handshake_failure}"
|
||||
);
|
||||
self.stats.emit_lifecycle_node_metrics(false);
|
||||
return None;
|
||||
}
|
||||
Ok(Ok(session)) => session,
|
||||
};
|
||||
|
||||
debug!(
|
||||
"completed egress KKT/PSQ handshake with node {}: {remote}",
|
||||
self.responder_details.node_id
|
||||
);
|
||||
|
||||
// TODO: change return type into complete handler
|
||||
Some(Ok(session))
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct NestedNodeConnectionHandler<S> {
|
||||
/// Persistent connection to exit gateway for forwarding.
|
||||
/// Currently, it uses raw TCP socket, later it will be wrapped with dedicated PSQ tunnel
|
||||
exit_stream: S,
|
||||
|
||||
/// Socket address of the remote of the established stream
|
||||
exit_address: SocketAddr,
|
||||
|
||||
/// Map of senders to each known client handle (based on the inner receiver index)
|
||||
client_handles: HashMap<LpReceiverIndex, NestedClientConnectionSender>,
|
||||
|
||||
/// Channel for receiving requests that are to be forwarded into the exit stream
|
||||
data_receiver: NestedNodeConnectionReceiver,
|
||||
|
||||
/// Channel for adding new client handle and handling control requests from `NestedConnectionsController`
|
||||
control_receiver: NestedNodeControlReceiver,
|
||||
// client_streams: HashMap<StreamId, LpClientStream>,
|
||||
}
|
||||
|
||||
impl<S> NestedNodeConnectionHandler<S>
|
||||
where
|
||||
S: LpTransportChannel + Unpin,
|
||||
{
|
||||
/// Attempt to extract outer receiver index from the received message
|
||||
/// (that is meant to be an `LpPacket`)
|
||||
fn extract_receiver_index(&self, raw: &[u8]) -> Option<LpReceiverIndex> {
|
||||
if raw.len() < 4 {
|
||||
return None;
|
||||
}
|
||||
Some(LpReceiverIndex::from_le_bytes([
|
||||
raw[0], raw[1], raw[2], raw[3],
|
||||
]))
|
||||
}
|
||||
|
||||
/// Attempt to forward received packet to the client that established the inner LP session
|
||||
async fn handle_exit_packet(&self, packet: Vec<u8>) {
|
||||
let Some(receiver_index) = self.extract_receiver_index(&packet) else {
|
||||
warn!("{} has sent us an invalid LP packet", self.exit_address);
|
||||
return;
|
||||
};
|
||||
let Some(client_handle) = self.client_handles.get(&receiver_index) else {
|
||||
warn!(
|
||||
"no client handle for receiver index {receiver_index} received from {}",
|
||||
self.exit_address
|
||||
);
|
||||
return;
|
||||
};
|
||||
// client_handle.send(packet).await;
|
||||
}
|
||||
|
||||
async fn run(&mut self) {
|
||||
// loop {
|
||||
// tokio::select! {
|
||||
//
|
||||
// }
|
||||
// }
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
pub mod connection;
|
||||
+37
-116
@@ -1,9 +1,11 @@
|
||||
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::node::lp::cleanup::TimestampedState;
|
||||
use crate::node::lp::control::{LP_DURATION_BUCKETS, LpConnectionStats};
|
||||
use crate::node::lp::error::LpHandlerError;
|
||||
use crate::node::lp::state::SharedLpControlState;
|
||||
use crate::node::lp::forwarding::client_connection::NestedClientConnection;
|
||||
use crate::node::lp::state::SharedLpClientControlState;
|
||||
use dashmap::mapref::one::RefMut;
|
||||
use nym_lp::packet::message::LpMessageType;
|
||||
use nym_lp::packet::{EncryptedLpPacket, ForwardPacketData, LpMessage};
|
||||
@@ -12,7 +14,8 @@ use nym_lp::session::{LpAction, LpInput};
|
||||
use nym_lp::transport::LpHandshakeChannel;
|
||||
use nym_lp::transport::traits::LpTransportChannel;
|
||||
use nym_lp::{LpTransportSession, packet::message::ExpectedResponseSize};
|
||||
use nym_metrics::{add_histogram_obs, inc, inc_by};
|
||||
use nym_metrics::{add_histogram_obs, inc};
|
||||
use nym_node_metrics::NymNodeMetrics;
|
||||
use nym_registration_common::{LpRegistrationRequest, RegistrationStatus};
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Duration;
|
||||
@@ -20,69 +23,16 @@ use tokio::net::TcpStream;
|
||||
use tokio::time::timeout;
|
||||
use tracing::*;
|
||||
|
||||
// Histogram buckets for LP operation duration (legacy - used by unused forwarding methods)
|
||||
const LP_DURATION_BUCKETS: &[f64] = &[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0];
|
||||
|
||||
// Timeout for forward I/O operations (send + receive on exit stream)
|
||||
// Must be long enough to cover exit gateway processing time
|
||||
const FORWARD_IO_TIMEOUT_SECS: u64 = 30;
|
||||
|
||||
// Histogram buckets for LP connection lifecycle duration
|
||||
// LP connections can be very short (registration only: ~1s) or very long (dVPN sessions: hours/days)
|
||||
// Covers full range from seconds to 24 hours
|
||||
const LP_CONNECTION_DURATION_BUCKETS: &[f64] = &[
|
||||
1.0, // 1 second
|
||||
5.0, // 5 seconds
|
||||
10.0, // 10 seconds
|
||||
30.0, // 30 seconds
|
||||
60.0, // 1 minute
|
||||
300.0, // 5 minutes
|
||||
600.0, // 10 minutes
|
||||
1800.0, // 30 minutes
|
||||
3600.0, // 1 hour
|
||||
7200.0, // 2 hours
|
||||
14400.0, // 4 hours
|
||||
28800.0, // 8 hours
|
||||
43200.0, // 12 hours
|
||||
86400.0, // 24 hours
|
||||
];
|
||||
|
||||
/// Connection lifecycle statistics tracking
|
||||
struct ConnectionStats {
|
||||
/// When the connection started
|
||||
start_time: std::time::Instant,
|
||||
/// Total bytes received (including protocol framing)
|
||||
bytes_received: u64,
|
||||
/// Total bytes sent (including protocol framing)
|
||||
bytes_sent: u64,
|
||||
}
|
||||
|
||||
impl ConnectionStats {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
start_time: std::time::Instant::now(),
|
||||
bytes_received: 0,
|
||||
bytes_sent: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn record_bytes_received(&mut self, bytes: usize) {
|
||||
self.bytes_received += bytes as u64;
|
||||
}
|
||||
|
||||
fn record_bytes_sent(&mut self, bytes: usize) {
|
||||
self.bytes_sent += bytes as u64;
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LpConnectionHandler<S = TcpStream> {
|
||||
pub struct LpClientConnectionHandler<S = TcpStream> {
|
||||
stream: S,
|
||||
remote_addr: SocketAddr,
|
||||
state: SharedLpControlState,
|
||||
stats: ConnectionStats,
|
||||
state: SharedLpClientControlState,
|
||||
stats: LpConnectionStats,
|
||||
|
||||
// /// Flag indicating whether this is a connection from an entry gateway serving as a proxy
|
||||
// forwarded_connection: bool,
|
||||
/// Bound receiver_idx for this connection (set after first packet).
|
||||
/// All subsequent packets on this connection must use this receiver_idx.
|
||||
/// Set from ClientHello's proposed receiver_index, or from header for non-bootstrap packets.
|
||||
@@ -92,29 +42,32 @@ pub struct LpConnectionHandler<S = TcpStream> {
|
||||
/// Opened on first forward, reused for subsequent forwards, closed when client disconnects.
|
||||
/// Tuple contains (stream, target_address) to verify subsequent forwards go to same exit.
|
||||
exit_stream: Option<(S, SocketAddr)>,
|
||||
|
||||
/// Forwarding channel for sending requests to the exit gateway and receiving responses.
|
||||
#[allow(dead_code)]
|
||||
forwarding_channel: Option<NestedClientConnection>,
|
||||
}
|
||||
|
||||
impl<S> LpConnectionHandler<S>
|
||||
impl<S> LpClientConnectionHandler<S>
|
||||
where
|
||||
S: LpTransportChannel + LpHandshakeChannel + Unpin,
|
||||
{
|
||||
pub fn new(
|
||||
stream: S,
|
||||
// forwarded_connection: bool,
|
||||
remote_addr: SocketAddr,
|
||||
state: SharedLpControlState,
|
||||
) -> Self {
|
||||
pub fn new(stream: S, remote_addr: SocketAddr, state: SharedLpClientControlState) -> Self {
|
||||
Self {
|
||||
stream,
|
||||
remote_addr,
|
||||
// forwarded_connection,
|
||||
state,
|
||||
stats: ConnectionStats::new(),
|
||||
stats: LpConnectionStats::new(),
|
||||
bound_receiver_idx: None,
|
||||
exit_stream: None,
|
||||
forwarding_channel: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn metrics(&self) -> &NymNodeMetrics {
|
||||
&self.state.shared.metrics
|
||||
}
|
||||
|
||||
/// Get the mutable reference to the state machine associated with this client.
|
||||
/// It is vital it's never held across await points or this might lead to a deadlock.
|
||||
fn state_entry_mut(
|
||||
@@ -125,8 +78,7 @@ where
|
||||
self.state
|
||||
.shared
|
||||
.session_states
|
||||
.get_mut(&receiver_index)
|
||||
.ok_or_else(|| LpHandlerError::MissingLpSession { receiver_index })
|
||||
.get_state_entry_mut(receiver_index)
|
||||
}
|
||||
|
||||
/// AIDEV-NOTE: Stream-oriented packet loop
|
||||
@@ -135,11 +87,12 @@ where
|
||||
/// First packet binds the connection to a receiver_idx (session-affine).
|
||||
/// Binding is set by handle_client_hello() from payload's receiver_index,
|
||||
/// or by validate_or_set_binding() for non-bootstrap first packets.
|
||||
pub async fn handle(mut self) -> Result<(), LpHandlerError> {
|
||||
debug!("Handling LP connection from {}", self.remote_addr);
|
||||
pub async fn handle(&mut self) -> Result<(), LpHandlerError> {
|
||||
let remote = self.remote_addr;
|
||||
debug!("Handling LP connection from {remote}");
|
||||
|
||||
// Track total LP connections handled
|
||||
inc!("lp_connections_total");
|
||||
inc!("lp_client_connections_total");
|
||||
|
||||
// ============================================================
|
||||
// STREAM-ORIENTED PROCESSING: Loop until connection closes
|
||||
@@ -160,18 +113,12 @@ where
|
||||
.await
|
||||
{
|
||||
Err(_timeout) => {
|
||||
debug!(
|
||||
"timed out attempting to complete KTT/PSQ handshake with {}",
|
||||
self.remote_addr
|
||||
);
|
||||
debug!("timed out attempting to complete KTT/PSQ handshake with {remote}",);
|
||||
self.emit_lifecycle_metrics(false);
|
||||
return Ok(());
|
||||
}
|
||||
Ok(Err(handshake_failure)) => {
|
||||
debug!(
|
||||
"failed to complete KKT/PSQ handshake with {}: {handshake_failure}",
|
||||
self.remote_addr
|
||||
);
|
||||
debug!("failed to complete KKT/PSQ handshake with {remote}: {handshake_failure}",);
|
||||
self.emit_lifecycle_metrics(false);
|
||||
return Ok(());
|
||||
}
|
||||
@@ -180,10 +127,7 @@ where
|
||||
let receiver_idx = session.receiver_index();
|
||||
|
||||
// 2. insert the state machine into the shared state
|
||||
self.state
|
||||
.shared
|
||||
.session_states
|
||||
.insert(receiver_idx, TimestampedState::new(session));
|
||||
self.state.shared.session_states.insert_new_session(session);
|
||||
self.bound_receiver_idx = Some(receiver_idx);
|
||||
|
||||
// 3. handle any new incoming packet
|
||||
@@ -194,7 +138,7 @@ where
|
||||
Err(err) => {
|
||||
if err.is_connection_closed() {
|
||||
// Graceful EOF - client closed connection
|
||||
trace!("Connection closed by {} (EOF)", self.remote_addr);
|
||||
trace!("Connection closed by {remote} (EOF)");
|
||||
break;
|
||||
} else {
|
||||
inc!("lp_errors_receive_packet");
|
||||
@@ -625,30 +569,7 @@ where
|
||||
|
||||
/// Emit connection lifecycle metrics
|
||||
fn emit_lifecycle_metrics(&self, graceful: bool) {
|
||||
// Track connection duration
|
||||
let duration = self.stats.start_time.elapsed().as_secs_f64();
|
||||
add_histogram_obs!(
|
||||
"lp_connection_duration_seconds",
|
||||
duration,
|
||||
LP_CONNECTION_DURATION_BUCKETS
|
||||
);
|
||||
|
||||
// Track bytes transferred
|
||||
inc_by!(
|
||||
"lp_connection_bytes_received_total",
|
||||
self.stats.bytes_received as i64
|
||||
);
|
||||
inc_by!(
|
||||
"lp_connection_bytes_sent_total",
|
||||
self.stats.bytes_sent as i64
|
||||
);
|
||||
|
||||
// Track completion type
|
||||
if graceful {
|
||||
inc!("lp_connections_completed_gracefully");
|
||||
} else {
|
||||
inc!("lp_connections_completed_with_error");
|
||||
}
|
||||
self.stats.emit_lifecycle_client_metrics(graceful);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -657,7 +578,7 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::config::LpConfig;
|
||||
use crate::config::lp::LpDebug;
|
||||
use crate::node::lp::state::SharedLpState;
|
||||
use crate::node::lp::state::{ActiveLpSessions, SharedLpState};
|
||||
use nym_lp::peer::{KEMKeys, LpLocalPeer, generate_keypair_mceliece, generate_keypair_mlkem};
|
||||
use nym_lp::{Ciphersuite, SessionManager, sessions_for_tests};
|
||||
use nym_test_utils::helpers::{deterministic_rng, deterministic_rng_09};
|
||||
@@ -665,7 +586,7 @@ mod tests {
|
||||
// ==================== Test Helpers ====================
|
||||
|
||||
/// Create a minimal test state for handler tests
|
||||
async fn create_minimal_test_state() -> SharedLpControlState {
|
||||
async fn create_minimal_test_state() -> SharedLpClientControlState {
|
||||
use nym_crypto::asymmetric::ed25519;
|
||||
|
||||
let mut rng = deterministic_rng();
|
||||
@@ -690,14 +611,14 @@ mod tests {
|
||||
);
|
||||
let lp_peer = LpLocalPeer::new(Ciphersuite::default(), x_keys).with_kem_keys(kem_keys);
|
||||
|
||||
SharedLpControlState {
|
||||
SharedLpClientControlState {
|
||||
local_lp_peer: lp_peer,
|
||||
peer_registrator: None,
|
||||
forward_semaphore,
|
||||
shared: SharedLpState {
|
||||
lp_config,
|
||||
metrics: nym_node_metrics::NymNodeMetrics::default(),
|
||||
session_states: Arc::new(dashmap::DashMap::new()),
|
||||
metrics: NymNodeMetrics::default(),
|
||||
session_states: ActiveLpSessions::new(),
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -724,7 +645,7 @@ mod tests {
|
||||
let server_task = tokio::spawn(async move {
|
||||
let (stream, remote_addr) = listener.accept().await.unwrap();
|
||||
let state = create_minimal_test_state().await;
|
||||
let mut handler = LpConnectionHandler::new(stream, remote_addr, state);
|
||||
let mut handler = LpClientConnectionHandler::new(stream, remote_addr, state);
|
||||
// Two-phase: receive raw bytes + header, then parse full packet
|
||||
let packet = handler.receive_raw_packet().await?;
|
||||
let header = packet.outer_header();
|
||||
@@ -0,0 +1,188 @@
|
||||
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::config::LpConfig;
|
||||
use crate::error::NymNodeError;
|
||||
use crate::node::lp::control::ingress::client_handler::LpClientConnectionHandler;
|
||||
use crate::node::lp::control::ingress::node_handler::InitialLpIngressNodeConnectionHandler;
|
||||
use crate::node::lp::directory::LpNodeDetails;
|
||||
use crate::node::lp::state::{SharedLpClientControlState, SharedLpNodeControlState};
|
||||
use nym_task::ShutdownTracker;
|
||||
use std::net::SocketAddr;
|
||||
use tokio::net::TcpListener;
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
|
||||
/// LP listener that accepts TCP connections on port 41264
|
||||
pub struct LpControlListener {
|
||||
/// Address to bind to
|
||||
bind_address: SocketAddr,
|
||||
|
||||
/// Shared state for clients connection handlers
|
||||
clients_handler_state: SharedLpClientControlState,
|
||||
|
||||
/// Shared state for nodes connection handlers
|
||||
nodes_handler_state: SharedLpNodeControlState,
|
||||
|
||||
/// Shutdown coordination
|
||||
shutdown: ShutdownTracker,
|
||||
}
|
||||
|
||||
impl LpControlListener {
|
||||
pub fn new(
|
||||
bind_address: SocketAddr,
|
||||
clients_handler_state: SharedLpClientControlState,
|
||||
nodes_handler_state: SharedLpNodeControlState,
|
||||
shutdown: ShutdownTracker,
|
||||
) -> Self {
|
||||
Self {
|
||||
bind_address,
|
||||
clients_handler_state,
|
||||
nodes_handler_state,
|
||||
shutdown,
|
||||
}
|
||||
}
|
||||
|
||||
fn lp_config(&self) -> LpConfig {
|
||||
self.clients_handler_state.shared.lp_config
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) -> Result<(), NymNodeError> {
|
||||
let bind_address = self.bind_address;
|
||||
info!("Starting LP control listener on {bind_address}");
|
||||
|
||||
let listener = TcpListener::bind(bind_address).await.map_err(|source| {
|
||||
error!("Failed to bind LP listener to {bind_address}: {source}",);
|
||||
NymNodeError::LpBindFailure {
|
||||
address: bind_address,
|
||||
source,
|
||||
}
|
||||
})?;
|
||||
|
||||
let shutdown_token = self.shutdown.clone_shutdown_token();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown_token.cancelled() => {
|
||||
trace!("LP listener: received shutdown signal");
|
||||
break;
|
||||
}
|
||||
|
||||
result = listener.accept() => {
|
||||
match result {
|
||||
Ok((stream, addr)) => self.handle_connection(stream, addr),
|
||||
Err(e) => warn!("Failed to accept LP connection: {e}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("LP listener shutdown complete");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_node_connection(
|
||||
&self,
|
||||
stream: tokio::net::TcpStream,
|
||||
remote_addr: SocketAddr,
|
||||
initiator_details: LpNodeDetails,
|
||||
) {
|
||||
debug!("Accepting LP node connection from {remote_addr}");
|
||||
|
||||
// Spawn handler task
|
||||
let mut handler = InitialLpIngressNodeConnectionHandler::new(
|
||||
stream,
|
||||
remote_addr,
|
||||
initiator_details,
|
||||
self.nodes_handler_state.clone(),
|
||||
);
|
||||
|
||||
self.shutdown.try_spawn_named_with_shutdown(
|
||||
async move {
|
||||
let metrics = handler.metrics().clone();
|
||||
|
||||
// Increment connection counter
|
||||
metrics.network.new_ingress_lp_node_connection();
|
||||
|
||||
let result = handler.handle().await;
|
||||
|
||||
// Decrement connection counter
|
||||
metrics.network.closed_ingress_lp_node_connection();
|
||||
|
||||
// Handler emits lifecycle metrics internally on success
|
||||
// For errors, we need to emit them here since handler is consumed
|
||||
if let Err(e) = result {
|
||||
warn!("LP node handler error for {remote_addr}: {e}");
|
||||
// Note: metrics are emitted in handle() for graceful path
|
||||
// On error path, handle() returns early without emitting
|
||||
// So we track errors here
|
||||
}
|
||||
},
|
||||
&format!("LP_NODE::{remote_addr}"),
|
||||
);
|
||||
}
|
||||
|
||||
fn handle_client_connection(&self, stream: tokio::net::TcpStream, remote_addr: SocketAddr) {
|
||||
// Check connection limit (only for clients, nodes must always be allowed regardless of the limit)
|
||||
let active_connections = self.active_client_connections();
|
||||
let max_connections = self.lp_config().debug.max_connections;
|
||||
if active_connections >= max_connections {
|
||||
warn!(
|
||||
"LP connection limit exceeded ({active_connections}/{max_connections}), rejecting connection from {remote_addr}"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
debug!(
|
||||
"Accepting LP client connection from {remote_addr} ({active_connections} active connections)"
|
||||
);
|
||||
|
||||
// Spawn handler task
|
||||
let mut handler =
|
||||
LpClientConnectionHandler::new(stream, remote_addr, self.clients_handler_state.clone());
|
||||
|
||||
self.shutdown.try_spawn_named_with_shutdown(
|
||||
async move {
|
||||
// Increment connection counter
|
||||
handler.metrics().network.new_ingress_lp_client_connection();
|
||||
|
||||
let result = handler.handle().await;
|
||||
// Decrement connection counter
|
||||
handler
|
||||
.metrics()
|
||||
.network
|
||||
.closed_ingress_lp_client_connection();
|
||||
|
||||
// Handler emits lifecycle metrics internally on success
|
||||
// For errors, we need to emit them here since handler is consumed
|
||||
if let Err(e) = result {
|
||||
warn!("LP client handler error for {remote_addr}: {e}");
|
||||
// Note: metrics are emitted in handle() for graceful path
|
||||
// On error path, handle() returns early without emitting
|
||||
// So we track errors here
|
||||
}
|
||||
},
|
||||
&format!("LP_CLIENT::{remote_addr}"),
|
||||
);
|
||||
}
|
||||
|
||||
fn handle_connection(&self, stream: tokio::net::TcpStream, remote_addr: SocketAddr) {
|
||||
if let Some(initiator_details) = self
|
||||
.nodes_handler_state
|
||||
.nodes
|
||||
.get_node_details(remote_addr.ip())
|
||||
{
|
||||
self.handle_node_connection(stream, remote_addr, initiator_details);
|
||||
} else {
|
||||
self.handle_client_connection(stream, remote_addr);
|
||||
}
|
||||
}
|
||||
|
||||
fn active_client_connections(&self) -> usize {
|
||||
self.clients_handler_state
|
||||
.shared
|
||||
.metrics
|
||||
.network
|
||||
.active_lp_client_connections_count()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
pub mod client_handler;
|
||||
pub(crate) mod listener;
|
||||
pub mod node_handler;
|
||||
@@ -0,0 +1,148 @@
|
||||
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::node::lp::control::LpConnectionStats;
|
||||
use crate::node::lp::directory::LpNodeDetails;
|
||||
use crate::node::lp::error::LpHandlerError;
|
||||
use crate::node::lp::state::SharedLpNodeControlState;
|
||||
use nym_lp::LpTransportSession;
|
||||
use nym_lp::transport::{LpHandshakeChannel, LpTransportChannel};
|
||||
use nym_metrics::inc;
|
||||
use nym_node_metrics::NymNodeMetrics;
|
||||
use nym_topology::NodeId;
|
||||
use std::net::SocketAddr;
|
||||
use tokio::net::TcpStream;
|
||||
use tracing::debug;
|
||||
|
||||
/// Initial connection handler for an ingress LP node before completing the KKT/PSQ handshake.
|
||||
pub struct InitialLpIngressNodeConnectionHandler<S = TcpStream> {
|
||||
stream: S,
|
||||
remote_addr: SocketAddr,
|
||||
initiator_details: LpNodeDetails,
|
||||
|
||||
state: SharedLpNodeControlState,
|
||||
stats: LpConnectionStats,
|
||||
}
|
||||
|
||||
impl<S> InitialLpIngressNodeConnectionHandler<S>
|
||||
where
|
||||
S: LpHandshakeChannel + LpTransportChannel + Unpin,
|
||||
{
|
||||
pub fn new(
|
||||
stream: S,
|
||||
remote_addr: SocketAddr,
|
||||
initiator_details: LpNodeDetails,
|
||||
state: SharedLpNodeControlState,
|
||||
) -> Self {
|
||||
Self {
|
||||
stream,
|
||||
remote_addr,
|
||||
initiator_details,
|
||||
state,
|
||||
stats: LpConnectionStats::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn metrics(&self) -> &NymNodeMetrics {
|
||||
&self.state.shared.metrics
|
||||
}
|
||||
|
||||
pub(crate) async fn complete_initial_handshake(
|
||||
mut self,
|
||||
) -> Option<Result<LpIngressNodeConnectionHandler<S>, LpHandlerError>> {
|
||||
let remote = self.remote_addr;
|
||||
|
||||
if self.initiator_details.kem_key_hashes.is_empty() {
|
||||
return Some(Err(LpHandlerError::MissingNodeKEMKeyHashes {
|
||||
node_ip: self.remote_addr.ip(),
|
||||
node_id: self.initiator_details.node_id,
|
||||
}));
|
||||
}
|
||||
|
||||
// 1. complete KKT/PSQ handshake before doing anything else.
|
||||
// bail if it takes too long
|
||||
let timeout = self.state.shared.lp_config.debug.handshake_ttl;
|
||||
let local_peer = self.state.local_lp_peer.clone();
|
||||
let stream = &mut self.stream;
|
||||
let kem_hashes = self.initiator_details.kem_key_hashes.clone();
|
||||
|
||||
let session = match tokio::time::timeout(timeout, async move {
|
||||
LpTransportSession::psq_handshake_responder_mutual(stream, local_peer, kem_hashes)
|
||||
.complete_handshake()
|
||||
.await
|
||||
})
|
||||
.await
|
||||
{
|
||||
Err(_timeout) => {
|
||||
debug!("timed out attempting to complete mutual KTT/PSQ handshake with {remote}");
|
||||
self.stats.emit_lifecycle_node_metrics(false);
|
||||
return None;
|
||||
}
|
||||
Ok(Err(handshake_failure)) => {
|
||||
debug!(
|
||||
"failed to complete mutual KKT/PSQ handshake with {remote}: {handshake_failure}"
|
||||
);
|
||||
self.stats.emit_lifecycle_node_metrics(false);
|
||||
return None;
|
||||
}
|
||||
Ok(Ok(session)) => session,
|
||||
};
|
||||
|
||||
debug!(
|
||||
"completed ingress KKT/PSQ handshake with node {}: {remote}",
|
||||
self.initiator_details.node_id
|
||||
);
|
||||
|
||||
Some(Ok(LpIngressNodeConnectionHandler {
|
||||
stream: self.stream,
|
||||
remote_addr: remote,
|
||||
remote_node_id: self.initiator_details.node_id,
|
||||
state: self.state,
|
||||
stats: self.stats,
|
||||
transport_session: session,
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn handle(mut self) -> Result<(), LpHandlerError> {
|
||||
// Track total LP connections handled
|
||||
inc!("lp_node_connections_total");
|
||||
|
||||
// attempt to complete initial handshake
|
||||
let upgraded_handler = match self.complete_initial_handshake().await {
|
||||
None => return Ok(()),
|
||||
Some(handler_res) => handler_res?,
|
||||
};
|
||||
|
||||
// continue handling the requests with the transport session
|
||||
upgraded_handler.handle().await
|
||||
}
|
||||
}
|
||||
|
||||
/// Connection handler for an LP node after completing the KKT/PSQ handshake.
|
||||
pub struct LpIngressNodeConnectionHandler<S = TcpStream> {
|
||||
stream: S,
|
||||
remote_addr: SocketAddr,
|
||||
remote_node_id: NodeId,
|
||||
|
||||
state: SharedLpNodeControlState,
|
||||
stats: LpConnectionStats,
|
||||
transport_session: LpTransportSession,
|
||||
// LOCAL receiver index to stream id
|
||||
// client_streams: HashMap<ReceiverIndex, ClientStreamId>,
|
||||
}
|
||||
|
||||
impl<S> LpIngressNodeConnectionHandler<S>
|
||||
where
|
||||
S: LpHandshakeChannel + LpTransportChannel + Unpin,
|
||||
{
|
||||
async fn handle(mut self) -> Result<(), LpHandlerError> {
|
||||
// handle all the forwarding here
|
||||
|
||||
self.stats.emit_lifecycle_node_metrics(true);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn transport_session(&self) -> &LpTransportSession {
|
||||
&self.transport_session
|
||||
}
|
||||
}
|
||||
@@ -1,130 +0,0 @@
|
||||
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::config::LpConfig;
|
||||
use crate::error::NymNodeError;
|
||||
use crate::node::lp::control::handler::LpConnectionHandler;
|
||||
use crate::node::lp::state::SharedLpControlState;
|
||||
use nym_task::ShutdownTracker;
|
||||
use std::net::SocketAddr;
|
||||
use tokio::net::TcpListener;
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
|
||||
/// LP listener that accepts TCP connections on port 41264
|
||||
pub struct LpControlListener {
|
||||
/// Address to bind to
|
||||
bind_address: SocketAddr,
|
||||
|
||||
/// Shared state for connection handlers
|
||||
handler_state: SharedLpControlState,
|
||||
|
||||
/// Shutdown coordination
|
||||
shutdown: ShutdownTracker,
|
||||
}
|
||||
|
||||
impl LpControlListener {
|
||||
pub fn new(
|
||||
bind_address: SocketAddr,
|
||||
handler_state: SharedLpControlState,
|
||||
shutdown: ShutdownTracker,
|
||||
) -> Self {
|
||||
Self {
|
||||
bind_address,
|
||||
handler_state,
|
||||
shutdown,
|
||||
}
|
||||
}
|
||||
|
||||
fn lp_config(&self) -> LpConfig {
|
||||
self.handler_state.shared.lp_config
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) -> Result<(), NymNodeError> {
|
||||
let bind_address = self.bind_address;
|
||||
info!("Starting LP control listener on {bind_address}");
|
||||
|
||||
let listener = TcpListener::bind(bind_address).await.map_err(|source| {
|
||||
error!("Failed to bind LP listener to {bind_address}: {source}",);
|
||||
NymNodeError::LpBindFailure {
|
||||
address: bind_address,
|
||||
source,
|
||||
}
|
||||
})?;
|
||||
|
||||
let shutdown_token = self.shutdown.clone_shutdown_token();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown_token.cancelled() => {
|
||||
trace!("LP listener: received shutdown signal");
|
||||
break;
|
||||
}
|
||||
|
||||
result = listener.accept() => {
|
||||
match result {
|
||||
Ok((stream, addr)) => self.handle_connection(stream, addr),
|
||||
Err(e) => warn!("Failed to accept LP connection: {e}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("LP listener shutdown complete");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_connection(&self, stream: tokio::net::TcpStream, remote_addr: SocketAddr) {
|
||||
// Check connection limit
|
||||
let active_connections = self.active_lp_connections();
|
||||
let max_connections = self.lp_config().debug.max_connections;
|
||||
if active_connections >= max_connections {
|
||||
warn!(
|
||||
"LP connection limit exceeded ({active_connections}/{max_connections}), rejecting connection from {remote_addr}"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
debug!(
|
||||
"Accepting LP connection from {remote_addr} ({active_connections} active connections)"
|
||||
);
|
||||
|
||||
// Increment connection counter
|
||||
self.handler_state
|
||||
.shared
|
||||
.metrics
|
||||
.network
|
||||
.new_lp_connection();
|
||||
|
||||
// Spawn handler task
|
||||
let handler = LpConnectionHandler::new(stream, remote_addr, self.handler_state.clone());
|
||||
|
||||
let metrics = self.handler_state.shared.metrics.clone();
|
||||
self.shutdown.try_spawn_named_with_shutdown(
|
||||
async move {
|
||||
let result = handler.handle().await;
|
||||
|
||||
// Handler emits lifecycle metrics internally on success
|
||||
// For errors, we need to emit them here since handler is consumed
|
||||
if let Err(e) = result {
|
||||
warn!("LP handler error for {remote_addr}: {e}");
|
||||
// Note: metrics are emitted in handle() for graceful path
|
||||
// On error path, handle() returns early without emitting
|
||||
// So we track errors here
|
||||
}
|
||||
|
||||
// Decrement connection counter on exit
|
||||
metrics.network.lp_connection_closed();
|
||||
},
|
||||
&format!("LP::{remote_addr}"),
|
||||
);
|
||||
}
|
||||
|
||||
fn active_lp_connections(&self) -> usize {
|
||||
self.handler_state
|
||||
.shared
|
||||
.metrics
|
||||
.network
|
||||
.active_lp_connections_count()
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,119 @@
|
||||
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
pub mod handler;
|
||||
pub(crate) mod listener;
|
||||
use nym_metrics::{add_histogram_obs, inc, inc_by};
|
||||
|
||||
pub mod egress;
|
||||
pub mod ingress;
|
||||
mod tests;
|
||||
|
||||
// Histogram buckets for LP operation duration (legacy - used by unused forwarding methods)
|
||||
const LP_DURATION_BUCKETS: &[f64] = &[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0];
|
||||
|
||||
// Histogram buckets for LP connection lifecycle duration
|
||||
// LP connections can be very short (registration only: ~1s) or very long (dVPN sessions: hours/days)
|
||||
// Covers full range from seconds to 24 hours
|
||||
const LP_CONNECTION_DURATION_BUCKETS: &[f64] = &[
|
||||
1.0, // 1 second
|
||||
5.0, // 5 seconds
|
||||
10.0, // 10 seconds
|
||||
30.0, // 30 seconds
|
||||
60.0, // 1 minute
|
||||
300.0, // 5 minutes
|
||||
600.0, // 10 minutes
|
||||
1800.0, // 30 minutes
|
||||
3600.0, // 1 hour
|
||||
7200.0, // 2 hours
|
||||
14400.0, // 4 hours
|
||||
28800.0, // 8 hours
|
||||
43200.0, // 12 hours
|
||||
86400.0, // 24 hours
|
||||
];
|
||||
|
||||
/// Connection lifecycle statistics tracking
|
||||
pub(crate) struct LpConnectionStats {
|
||||
/// When the connection started
|
||||
start_time: std::time::Instant,
|
||||
/// Total bytes received (including protocol framing)
|
||||
bytes_received: u64,
|
||||
/// Total bytes sent (including protocol framing)
|
||||
bytes_sent: u64,
|
||||
}
|
||||
|
||||
impl LpConnectionStats {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
start_time: std::time::Instant::now(),
|
||||
bytes_received: 0,
|
||||
bytes_sent: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn duration(&self) -> std::time::Duration {
|
||||
self.start_time.elapsed()
|
||||
}
|
||||
|
||||
fn record_bytes_received(&mut self, bytes: usize) {
|
||||
self.bytes_received += bytes as u64;
|
||||
}
|
||||
|
||||
fn record_bytes_sent(&mut self, bytes: usize) {
|
||||
self.bytes_sent += bytes as u64;
|
||||
}
|
||||
|
||||
/// Emit connection lifecycle metrics for a client connection
|
||||
fn emit_lifecycle_client_metrics(&self, graceful: bool) {
|
||||
// Track connection duration
|
||||
let duration = self.duration().as_secs_f64();
|
||||
add_histogram_obs!(
|
||||
"lp_client_connection_duration_seconds",
|
||||
duration,
|
||||
LP_CONNECTION_DURATION_BUCKETS
|
||||
);
|
||||
|
||||
// Track bytes transferred
|
||||
inc_by!(
|
||||
"lp_client_connection_bytes_received_total",
|
||||
self.bytes_received as i64
|
||||
);
|
||||
inc_by!(
|
||||
"lp_client_connection_bytes_sent_total",
|
||||
self.bytes_sent as i64
|
||||
);
|
||||
|
||||
// Track completion type
|
||||
if graceful {
|
||||
inc!("lp_client_connections_completed_gracefully");
|
||||
} else {
|
||||
inc!("lp_client_connections_completed_with_error");
|
||||
}
|
||||
}
|
||||
|
||||
/// Emit connection lifecycle metrics for a node connection
|
||||
fn emit_lifecycle_node_metrics(&self, graceful: bool) {
|
||||
// Track connection duration
|
||||
let duration = self.duration().as_secs_f64();
|
||||
add_histogram_obs!(
|
||||
"lp_node_connection_duration_seconds",
|
||||
duration,
|
||||
LP_CONNECTION_DURATION_BUCKETS
|
||||
);
|
||||
|
||||
// Track bytes transferred
|
||||
inc_by!(
|
||||
"lp_node_connection_bytes_received_total",
|
||||
self.bytes_received as i64
|
||||
);
|
||||
inc_by!(
|
||||
"lp_node_connection_bytes_sent_total",
|
||||
self.bytes_sent as i64
|
||||
);
|
||||
|
||||
// Track completion type
|
||||
if graceful {
|
||||
inc!("lp_node_connections_completed_gracefully");
|
||||
} else {
|
||||
inc!("lp_node_connections_completed_with_error");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,89 @@
|
||||
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::node::lp::SharedLpState;
|
||||
use crate::node::lp::control::egress::connection::InitialLpEgressNodeConnectionHandler;
|
||||
use crate::node::lp::control::ingress::node_handler::InitialLpIngressNodeConnectionHandler;
|
||||
use crate::node::lp::directory::LpNodeDetails;
|
||||
use crate::node::lp::state::SharedLpNodeControlState;
|
||||
use anyhow::Context;
|
||||
use nym_lp::packet::version;
|
||||
use nym_lp::peer::{LpLocalPeer, LpRemotePeer, mock_peers};
|
||||
use nym_test_utils::helpers::seeded_rng;
|
||||
use nym_test_utils::mocks::async_read_write::MockIOStream;
|
||||
use nym_test_utils::traits::TimeboxedSpawnable;
|
||||
use rand::RngCore;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
|
||||
fn shared_node_state(peer: LpLocalPeer) -> SharedLpNodeControlState {
|
||||
SharedLpNodeControlState {
|
||||
local_lp_peer: peer,
|
||||
nodes: Default::default(),
|
||||
shared: SharedLpState {
|
||||
metrics: Default::default(),
|
||||
lp_config: Default::default(),
|
||||
session_states: Default::default(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn lp_node_details(peer: LpRemotePeer) -> LpNodeDetails {
|
||||
let key_bytes = peer.x25519().as_ref().try_into().unwrap();
|
||||
let mut rng = seeded_rng(key_bytes);
|
||||
LpNodeDetails::new(
|
||||
rng.next_u32(),
|
||||
peer.kem_key_digests().clone(),
|
||||
peer.x25519().clone(),
|
||||
version::CURRENT,
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn basic_node_to_node_handshake() -> anyhow::Result<()> {
|
||||
nym_test_utils::helpers::setup_test_logger();
|
||||
|
||||
let (init, resp) = mock_peers();
|
||||
let init_remote = init.as_remote();
|
||||
let resp_remote = resp.as_remote();
|
||||
|
||||
let conn_init = MockIOStream::default();
|
||||
let conn_resp = conn_init.try_get_remote_handle();
|
||||
|
||||
let init_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234);
|
||||
let init_details = lp_node_details(init_remote);
|
||||
let resp_details = lp_node_details(resp_remote);
|
||||
|
||||
let init_state = shared_node_state(init);
|
||||
let resp_state = shared_node_state(resp);
|
||||
|
||||
let init_handler = InitialLpEgressNodeConnectionHandler::new(
|
||||
conn_init,
|
||||
init_addr,
|
||||
resp_details,
|
||||
init_state,
|
||||
);
|
||||
|
||||
let resp_handler = InitialLpIngressNodeConnectionHandler::new(
|
||||
conn_resp,
|
||||
init_addr,
|
||||
init_details,
|
||||
resp_state,
|
||||
);
|
||||
|
||||
let init_future = init_handler.complete_initial_handshake().spawn_timeboxed();
|
||||
let resp_future = resp_handler.complete_initial_handshake().spawn_timeboxed();
|
||||
|
||||
let (init_result, resp_result) = tokio::join!(init_future, resp_future);
|
||||
let init_result = init_result??.context("handshake failure")??;
|
||||
let resp_result = resp_result??.context("handshake failure")??;
|
||||
|
||||
assert_eq!(
|
||||
init_result.receiver_index(),
|
||||
resp_result.transport_session().receiver_index()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,75 @@
|
||||
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use nym_lp::peer::{DHPublicKey, LpRemotePeer};
|
||||
use nym_lp::{KEM, KEMKeyDigests};
|
||||
use nym_topology::NodeId;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::net::IpAddr;
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Wrapper around all known LP nodes
|
||||
#[derive(Clone, Default)]
|
||||
pub struct LpNodes {
|
||||
// map between all available ip addresses of other nodes and their details
|
||||
nodes: Arc<ArcSwap<HashMap<IpAddr, LpNodeDetails>>>,
|
||||
}
|
||||
|
||||
impl LpNodes {
|
||||
pub(crate) fn get_node_details(&self, node_ip: IpAddr) -> Option<LpNodeDetails> {
|
||||
self.nodes.load().get(&node_ip).cloned()
|
||||
}
|
||||
|
||||
pub(crate) fn get_node_id(&self, node_ip: IpAddr) -> Option<NodeId> {
|
||||
self.nodes
|
||||
.load()
|
||||
.get(&node_ip)
|
||||
.map(|details| details.node_id)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct LpNodeDetails {
|
||||
inner: Arc<LpNodeDetailsInner>,
|
||||
}
|
||||
|
||||
impl LpNodeDetails {
|
||||
pub(crate) fn new(
|
||||
node_id: NodeId,
|
||||
kem_key_hashes: BTreeMap<KEM, KEMKeyDigests>,
|
||||
x25519: DHPublicKey,
|
||||
supported_protocol: u8,
|
||||
) -> Self {
|
||||
LpNodeDetails {
|
||||
inner: Arc::new(LpNodeDetailsInner {
|
||||
node_id,
|
||||
kem_key_hashes,
|
||||
x25519,
|
||||
supported_protocol,
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for LpNodeDetails {
|
||||
type Target = LpNodeDetailsInner;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.inner.deref()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct LpNodeDetailsInner {
|
||||
pub(crate) node_id: NodeId,
|
||||
pub(crate) kem_key_hashes: BTreeMap<KEM, KEMKeyDigests>,
|
||||
pub(crate) x25519: DHPublicKey,
|
||||
pub(crate) supported_protocol: u8,
|
||||
}
|
||||
|
||||
impl LpNodeDetailsInner {
|
||||
pub(crate) fn to_lp_peer(&self) -> LpRemotePeer {
|
||||
LpRemotePeer::new(self.x25519).with_key_digests(self.kem_key_hashes.clone())
|
||||
}
|
||||
}
|
||||
@@ -6,7 +6,8 @@ use nym_lp::peer_config::LpReceiverIndex;
|
||||
use nym_lp::session::LpAction;
|
||||
use nym_lp::transport::LpTransportError;
|
||||
use nym_lp::{LpError, packet::MalformedLpPacketError};
|
||||
use std::net::SocketAddr;
|
||||
use nym_topology::NodeId;
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
@@ -47,9 +48,18 @@ pub enum LpHandlerError {
|
||||
#[error("timed out while attempting to send to/receive from the connection")]
|
||||
ConnectionTimeout,
|
||||
|
||||
#[error("missing KEM key hashes for node {node_id} connected from {node_ip}")]
|
||||
MissingNodeKEMKeyHashes { node_ip: IpAddr, node_id: NodeId },
|
||||
|
||||
#[error("data channel is not yet implemented")]
|
||||
UnimplementedDataChannel,
|
||||
|
||||
#[error("{ip_addr} does not correspond to any known LP node")]
|
||||
NotLpNode { ip_addr: IpAddr },
|
||||
|
||||
#[error("{0}")]
|
||||
Internal(String),
|
||||
|
||||
#[error("{0}")]
|
||||
Other(String),
|
||||
}
|
||||
@@ -64,6 +74,10 @@ impl LpHandlerError {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn internal(msg: impl Into<String>) -> Self {
|
||||
LpHandlerError::Internal(msg.into())
|
||||
}
|
||||
|
||||
pub fn other(msg: impl Into<String>) -> Self {
|
||||
LpHandlerError::Other(msg.into())
|
||||
}
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::node::lp::control::egress::connection::NestedNodeConnectionSender;
|
||||
use futures::channel::mpsc::UnboundedReceiver;
|
||||
|
||||
pub(crate) type NestedClientConnectionSender = ();
|
||||
pub(crate) type NestedClientConnectionReceiver = UnboundedReceiver<Vec<u8>>;
|
||||
|
||||
pub(crate) struct NestedClientConnection {
|
||||
// handle for sending into `NestedNodeConnectionHandler`
|
||||
sender: NestedNodeConnectionSender,
|
||||
|
||||
// handle for receiving from `NestedNodeConnectionHandler`
|
||||
receiver: NestedClientConnectionReceiver,
|
||||
}
|
||||
@@ -0,0 +1,116 @@
|
||||
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::node::lp::control::egress::connection::NestedNodeControlSender;
|
||||
use crate::node::lp::directory::LpNodes;
|
||||
use crate::node::lp::error::LpHandlerError;
|
||||
use crate::node::lp::forwarding::{
|
||||
ConnectionControllerResponse, ConnectionHandlerResponse, ControllerResponse,
|
||||
GetConnectionHandler, NestedConnectionControllerRequest,
|
||||
};
|
||||
use nym_topology::NodeId;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Notify;
|
||||
use tracing::info;
|
||||
|
||||
pub const CONTROL_CHANNEL_SIZE: usize = 64;
|
||||
|
||||
pub(crate) enum NodeHandle {
|
||||
Established(NestedNodeControlSender),
|
||||
Pending(Arc<Notify>),
|
||||
}
|
||||
|
||||
/// Keep track of connections to the exit gateway
|
||||
pub struct NestedConnectionsController {
|
||||
/// Handle channel for sending requests to this controller
|
||||
sender: super::NodeConnectionControllerSender,
|
||||
|
||||
/// Channel for receiving requests in this controller
|
||||
receiver: super::NodeConnectionControllerReceiver,
|
||||
|
||||
/// Map of all LP node ip addresses to their details (and ids)
|
||||
lp_nodes: LpNodes,
|
||||
|
||||
/// Handles to the active nested node connections
|
||||
nodes_handles: HashMap<NodeId, NodeHandle>,
|
||||
|
||||
/// Shutdown token
|
||||
shutdown: nym_task::ShutdownToken,
|
||||
}
|
||||
|
||||
impl NestedConnectionsController {
|
||||
pub fn new(lp_nodes: LpNodes, shutdown: nym_task::ShutdownToken) -> Self {
|
||||
let (sender, receiver) = tokio::sync::mpsc::channel(CONTROL_CHANNEL_SIZE);
|
||||
|
||||
Self {
|
||||
sender,
|
||||
receiver,
|
||||
lp_nodes,
|
||||
nodes_handles: HashMap::new(),
|
||||
shutdown,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn request_sender(&self) -> super::NodeConnectionControllerSender {
|
||||
self.sender.clone()
|
||||
}
|
||||
|
||||
async fn handle_get_connection_handler(
|
||||
&mut self,
|
||||
request: GetConnectionHandler,
|
||||
) -> ConnectionHandlerResponse {
|
||||
let ip = request.target_gateway_lp_address.ip();
|
||||
|
||||
let Some(node_id) = self.lp_nodes.get_node_id(ip) else {
|
||||
return Err(LpHandlerError::NotLpNode { ip_addr: ip });
|
||||
};
|
||||
|
||||
match self.nodes_handles.get(&node_id) {
|
||||
Some(NodeHandle::Established(handle)) => {
|
||||
todo!()
|
||||
}
|
||||
Some(NodeHandle::Pending(notify)) => {
|
||||
Ok(ConnectionControllerResponse::Pending(notify.clone()))
|
||||
}
|
||||
None => {
|
||||
let (res, notify) = ConnectionControllerResponse::new_pending();
|
||||
self.nodes_handles
|
||||
.insert(node_id, NodeHandle::Pending(notify.clone()));
|
||||
|
||||
// create a new connection and return a pending response
|
||||
todo!();
|
||||
return Ok(res);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_request(&mut self, request: NestedConnectionControllerRequest) {
|
||||
match request {
|
||||
NestedConnectionControllerRequest::ConnectionHandler {
|
||||
request,
|
||||
response_tx,
|
||||
} => {
|
||||
response_tx
|
||||
.send(self.handle_get_connection_handler(request).await)
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) {
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = self.shutdown.cancelled() => {
|
||||
break;
|
||||
}
|
||||
Some(request) = self.receiver.recv() => {
|
||||
self.handle_request(request).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("Nested connection controller shutdown complete");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use super::{
|
||||
ConnectionControllerResponse, ConnectionHandlerResponse, GetConnectionHandler,
|
||||
NestedConnectionControllerRequest, NodeConnectionControllerSender,
|
||||
};
|
||||
use crate::node::lp::error::LpHandlerError;
|
||||
use crate::node::lp::forwarding::client_connection::NestedClientConnection;
|
||||
use nym_lp::peer_config::LpReceiverIndex;
|
||||
use std::net::SocketAddr;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct NestedConnectionsManager {
|
||||
sender: NodeConnectionControllerSender,
|
||||
}
|
||||
|
||||
impl NestedConnectionsManager {
|
||||
pub fn new(sender: NodeConnectionControllerSender) -> Self {
|
||||
Self { sender }
|
||||
}
|
||||
|
||||
async fn send_connection_handler_request(
|
||||
&self,
|
||||
request: GetConnectionHandler,
|
||||
) -> Result<ConnectionHandlerResponse, LpHandlerError> {
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
self.sender
|
||||
.send(NestedConnectionControllerRequest::ConnectionHandler {
|
||||
request,
|
||||
response_tx,
|
||||
})
|
||||
.await
|
||||
.map_err(|_| LpHandlerError::internal("nested connection controller shut down"))?;
|
||||
|
||||
response_rx.await.map_err(|_| {
|
||||
LpHandlerError::internal("nested connection controller hasn't send a response")
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn get_connection_handle(
|
||||
&self,
|
||||
target_gateway_lp_address: SocketAddr,
|
||||
inner_receiver_index: LpReceiverIndex,
|
||||
) -> Result<NestedClientConnection, LpHandlerError> {
|
||||
let request = GetConnectionHandler {
|
||||
target_gateway_lp_address,
|
||||
inner_receiver_index,
|
||||
};
|
||||
|
||||
let notify = match self.send_connection_handler_request(request).await?? {
|
||||
// if we have received a ready response, we can return the connection
|
||||
ConnectionControllerResponse::Ready(conn) => return Ok(conn),
|
||||
|
||||
// otherwise we need to wait for the notification when it becomes available
|
||||
ConnectionControllerResponse::Pending(notify) => notify,
|
||||
};
|
||||
|
||||
// TODO: timeout
|
||||
notify.notified().await;
|
||||
|
||||
match self.send_connection_handler_request(request).await?? {
|
||||
// if we have received a ready response, we can return the connection
|
||||
ConnectionControllerResponse::Ready(conn) => Ok(conn),
|
||||
|
||||
// otherwise we need to wait for the notification when it becomes available
|
||||
ConnectionControllerResponse::Pending(_) => Err(LpHandlerError::internal(
|
||||
"unavailable connection handler after successful notification",
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,58 @@
|
||||
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::node::lp::error::LpHandlerError;
|
||||
use crate::node::lp::forwarding::client_connection::NestedClientConnection;
|
||||
use nym_lp::peer_config::LpReceiverIndex;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc::{Receiver, Sender};
|
||||
use tokio::sync::{Notify, oneshot};
|
||||
|
||||
pub mod client_connection;
|
||||
pub mod controller;
|
||||
pub mod manager;
|
||||
|
||||
pub type NodeConnectionControllerReceiver = Receiver<NestedConnectionControllerRequest>;
|
||||
pub type NodeConnectionControllerSender = Sender<NestedConnectionControllerRequest>;
|
||||
|
||||
pub(crate) enum ConnectionControllerResponse<T> {
|
||||
/// The response is immediately available
|
||||
Ready(T),
|
||||
|
||||
/// The response is in the process of being resolved. It will be ready once the returned
|
||||
/// notify resolves. At this point the caller should repeat the query
|
||||
Pending(Arc<Notify>),
|
||||
}
|
||||
|
||||
impl<T> ConnectionControllerResponse<T> {
|
||||
pub fn new_pending() -> (Self, Arc<Notify>) {
|
||||
let notify = Arc::new(Notify::new());
|
||||
(
|
||||
ConnectionControllerResponse::Pending(notify.clone()),
|
||||
notify,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub type ControllerResponse<T> = Result<ConnectionControllerResponse<T>, LpHandlerError>;
|
||||
|
||||
pub type ConnectionHandlerResponse = ControllerResponse<NestedClientConnection>;
|
||||
|
||||
pub enum NestedConnectionControllerRequest {
|
||||
/// Attempt to retrieve or create a handle to an exit gateway connection.
|
||||
/// If the connection doesn't exist, it will be established
|
||||
ConnectionHandler {
|
||||
request: GetConnectionHandler,
|
||||
response_tx: oneshot::Sender<ConnectionHandlerResponse>,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
pub(crate) struct GetConnectionHandler {
|
||||
/// Target gateway's LP address
|
||||
pub target_gateway_lp_address: SocketAddr,
|
||||
|
||||
/// Receiver index on the inner packet
|
||||
pub inner_receiver_index: LpReceiverIndex,
|
||||
}
|
||||
+39
-10
@@ -9,7 +9,7 @@
|
||||
// ## Connection Metrics (via NetworkStats in nym-node-metrics)
|
||||
// - active_lp_connections: Gauge tracking current active LP connections (incremented on accept, decremented on close)
|
||||
//
|
||||
// ## Handler Metrics (in handler.rs)
|
||||
// ## Handler Metrics (in client_handler)
|
||||
// - lp_connections_total: Counter for total LP connections handled
|
||||
// - lp_client_hello_failed: Counter for ClientHello failures (timestamp validation, protocol errors)
|
||||
// - lp_handshakes_success: Counter for successful handshake completions
|
||||
@@ -46,7 +46,7 @@
|
||||
// ## Error Categorization Metrics
|
||||
// - lp_errors_wg_peer_registration: Counter for WireGuard peer registration failures
|
||||
//
|
||||
// ## Connection Lifecycle Metrics (in handler.rs)
|
||||
// ## Connection Lifecycle Metrics (in client_handler)
|
||||
// - lp_connection_duration_seconds: Histogram of connection duration from start to end (buckets: 1s to 24h)
|
||||
// - lp_connection_bytes_received_total: Counter for total bytes received including protocol framing
|
||||
// - lp_connection_bytes_sent_total: Counter for total bytes sent including protocol framing
|
||||
@@ -58,7 +58,7 @@
|
||||
// - lp_states_cleanup_session_removed: Counter for stale sessions removed by cleanup task
|
||||
// - lp_states_cleanup_demoted_removed: Counter for demoted (read-only) sessions removed by cleanup task
|
||||
//
|
||||
// ## Subsession/Rekeying Metrics (in handler.rs)
|
||||
// ## Subsession/Rekeying Metrics (in client_handler)
|
||||
// - lp_subsession_kk2_sent: Counter for SubsessionKK2 responses sent (indicates client initiated rekeying)
|
||||
// - lp_subsession_complete: Counter for successful subsession promotions
|
||||
// - lp_subsession_receiver_index_collision: Counter for subsession receiver_index collisions
|
||||
@@ -70,9 +70,8 @@
|
||||
use crate::config::LpConfig;
|
||||
use crate::error::NymNodeError;
|
||||
use crate::node::lp::cleanup::CleanupTask;
|
||||
use crate::node::lp::control::listener::LpControlListener;
|
||||
use crate::node::lp::control::ingress::listener::LpControlListener;
|
||||
use crate::node::lp::data::listener::LpDataListener;
|
||||
use dashmap::DashMap;
|
||||
use nym_gateway::node::wireguard::PeerRegistrator;
|
||||
use nym_lp::peer::LpLocalPeer;
|
||||
use nym_mixnet_client::forwarder::MixForwardingSender;
|
||||
@@ -82,13 +81,19 @@ use std::sync::Arc;
|
||||
use tokio::sync::Semaphore;
|
||||
use tracing::error;
|
||||
|
||||
use crate::node::lp::directory::LpNodes;
|
||||
use crate::node::lp::forwarding::controller::NestedConnectionsController;
|
||||
use crate::node::lp::forwarding::manager::NestedConnectionsManager;
|
||||
use crate::node::lp::state::{ActiveLpSessions, SharedLpNodeControlState};
|
||||
pub use nym_mixnet_client::forwarder::{MixForwardingReceiver, mix_forwarding_channels};
|
||||
pub use state::{SharedLpControlState, SharedLpDataState, SharedLpState};
|
||||
pub use state::{SharedLpClientControlState, SharedLpDataState, SharedLpState};
|
||||
|
||||
mod cleanup;
|
||||
pub mod control;
|
||||
mod data;
|
||||
pub mod directory;
|
||||
pub mod error;
|
||||
pub mod forwarding;
|
||||
mod registration;
|
||||
pub mod state;
|
||||
|
||||
@@ -96,6 +101,7 @@ pub struct LpSetup {
|
||||
control_listener: LpControlListener,
|
||||
data_listener: LpDataListener,
|
||||
cleanup_task: CleanupTask,
|
||||
nested_connections_controller: NestedConnectionsController,
|
||||
|
||||
/// Shutdown coordination
|
||||
shutdown: ShutdownTracker,
|
||||
@@ -107,11 +113,17 @@ impl LpSetup {
|
||||
lp_config: LpConfig,
|
||||
metrics: NymNodeMetrics,
|
||||
peer_registrator: Option<PeerRegistrator>,
|
||||
network_nodes: LpNodes,
|
||||
mix_packet_sender: MixForwardingSender,
|
||||
shutdown: ShutdownTracker,
|
||||
) -> Result<Self, NymNodeError> {
|
||||
// TODO: this will require loading old states from disk in the future
|
||||
let session_states = Arc::new(DashMap::new());
|
||||
let session_states = ActiveLpSessions::new();
|
||||
|
||||
let nested_connections_controller = NestedConnectionsController::new(
|
||||
network_nodes.clone(),
|
||||
shutdown.clone_shutdown_token(),
|
||||
);
|
||||
|
||||
let shared_lp_state = SharedLpState {
|
||||
metrics,
|
||||
@@ -119,13 +131,22 @@ impl LpSetup {
|
||||
session_states: session_states.clone(),
|
||||
};
|
||||
|
||||
let control_state = SharedLpControlState {
|
||||
local_lp_peer,
|
||||
let client_control_state = SharedLpClientControlState {
|
||||
local_lp_peer: local_lp_peer.clone(),
|
||||
peer_registrator,
|
||||
nested_connections_manager: NestedConnectionsManager::new(
|
||||
nested_connections_controller.request_sender(),
|
||||
),
|
||||
forward_semaphore: Arc::new(Semaphore::new(lp_config.debug.max_concurrent_forwards)),
|
||||
shared: shared_lp_state.clone(),
|
||||
};
|
||||
|
||||
let nodes_control_state = SharedLpNodeControlState {
|
||||
local_lp_peer,
|
||||
nodes: network_nodes,
|
||||
shared: shared_lp_state.clone(),
|
||||
};
|
||||
|
||||
let data_state = SharedLpDataState {
|
||||
outbound_mix_sender: mix_packet_sender,
|
||||
shared: shared_lp_state,
|
||||
@@ -133,7 +154,8 @@ impl LpSetup {
|
||||
|
||||
let control_listener = LpControlListener::new(
|
||||
lp_config.control_bind_address,
|
||||
control_state,
|
||||
client_control_state,
|
||||
nodes_control_state,
|
||||
shutdown.clone(),
|
||||
);
|
||||
let data_listener = LpDataListener::new(
|
||||
@@ -151,6 +173,7 @@ impl LpSetup {
|
||||
control_listener,
|
||||
data_listener,
|
||||
cleanup_task,
|
||||
nested_connections_controller,
|
||||
shutdown,
|
||||
})
|
||||
}
|
||||
@@ -187,5 +210,11 @@ impl LpSetup {
|
||||
async move { self.cleanup_task.run().await },
|
||||
"LP::CleanupTask",
|
||||
);
|
||||
|
||||
// nested connections controller
|
||||
self.shutdown.try_spawn_named(
|
||||
async move { self.nested_connections_controller.run().await },
|
||||
"LP::NestedConnectionsController",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::node::lp::state::SharedLpControlState;
|
||||
use crate::node::lp::state::SharedLpClientControlState;
|
||||
use nym_lp::peer_config::LpReceiverIndex;
|
||||
use nym_metrics::{add_histogram_obs, inc};
|
||||
use nym_registration_common::dvpn::{
|
||||
@@ -29,7 +29,7 @@ const LP_REGISTRATION_DURATION_BUCKETS: &[f64] = &[
|
||||
30.0, // 30s
|
||||
];
|
||||
|
||||
impl SharedLpControlState {
|
||||
impl SharedLpClientControlState {
|
||||
async fn process_dvpn_initial_registration(
|
||||
&self,
|
||||
sender: LpReceiverIndex,
|
||||
|
||||
@@ -3,7 +3,10 @@
|
||||
|
||||
use crate::config::LpConfig;
|
||||
use crate::node::lp::cleanup::TimestampedState;
|
||||
use crate::node::lp::directory::LpNodes;
|
||||
use crate::node::lp::error::LpHandlerError;
|
||||
use dashmap::DashMap;
|
||||
use dashmap::mapref::one::RefMut;
|
||||
use nym_gateway::node::wireguard::PeerRegistrator;
|
||||
use nym_lp::LpTransportSession;
|
||||
use nym_lp::peer::LpLocalPeer;
|
||||
@@ -13,27 +16,46 @@ use nym_node_metrics::NymNodeMetrics;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
/// Shared state for LP control connections
|
||||
pub use crate::node::lp::forwarding::manager::NestedConnectionsManager;
|
||||
|
||||
/// Shared state for LP clients control connections
|
||||
#[derive(Clone)]
|
||||
pub struct SharedLpControlState {
|
||||
pub struct SharedLpClientControlState {
|
||||
/// Encapsulates all required key information of a local Lewes Protocol Peer.
|
||||
pub local_lp_peer: LpLocalPeer,
|
||||
|
||||
/// Handle registering new wireguard peers
|
||||
pub peer_registrator: Option<PeerRegistrator>,
|
||||
|
||||
/// Controller for obtaining handles to forwarding channels
|
||||
pub nested_connections_manager: NestedConnectionsManager,
|
||||
|
||||
/// Semaphore limiting concurrent forward connections
|
||||
///
|
||||
/// Prevents file descriptor exhaustion when forwarding LP packets during
|
||||
/// telescope setup. When at capacity, forward requests return an error
|
||||
/// so clients can choose a different gateway.
|
||||
// this is temporary until there is persistent KKT/PSQ session between nodes
|
||||
// #[deprecated]
|
||||
pub forward_semaphore: Arc<Semaphore>,
|
||||
|
||||
/// Common shared data
|
||||
pub shared: SharedLpState,
|
||||
}
|
||||
|
||||
/// Shared state for LP nodes control connections
|
||||
#[derive(Clone)]
|
||||
pub struct SharedLpNodeControlState {
|
||||
/// Encapsulates all required key information of a local Lewes Protocol Peer.
|
||||
pub local_lp_peer: LpLocalPeer,
|
||||
|
||||
/// Information about all known LP nodes
|
||||
pub nodes: LpNodes,
|
||||
|
||||
/// Common shared data
|
||||
pub shared: SharedLpState,
|
||||
}
|
||||
|
||||
/// Shared state for LP data connections
|
||||
#[derive(Clone)]
|
||||
pub struct SharedLpDataState {
|
||||
@@ -48,6 +70,37 @@ pub struct SharedLpDataState {
|
||||
pub shared: SharedLpState,
|
||||
}
|
||||
|
||||
/// Established sessions keyed by the receiver index
|
||||
///
|
||||
/// Wrapped in TimestampedState for TTL-based cleanup of inactive sessions.
|
||||
#[derive(Clone, Default)]
|
||||
pub struct ActiveLpSessions {
|
||||
// TODO: this might require split between client and node sessions. TBD
|
||||
pub(crate) sessions: Arc<DashMap<LpReceiverIndex, TimestampedState<LpTransportSession>>>,
|
||||
}
|
||||
|
||||
impl ActiveLpSessions {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub(crate) fn get_state_entry_mut(
|
||||
&self,
|
||||
receiver_index: LpReceiverIndex,
|
||||
) -> Result<RefMut<'_, LpReceiverIndex, TimestampedState<LpTransportSession>>, LpHandlerError>
|
||||
{
|
||||
self.sessions
|
||||
.get_mut(&receiver_index)
|
||||
.ok_or_else(|| LpHandlerError::MissingLpSession { receiver_index })
|
||||
}
|
||||
|
||||
pub(crate) fn insert_new_session(&self, session: LpTransportSession) {
|
||||
let receiver_index = session.receiver_index();
|
||||
self.sessions
|
||||
.insert(receiver_index, TimestampedState::new(session));
|
||||
}
|
||||
}
|
||||
|
||||
/// Shared state for LP connection handlers
|
||||
#[derive(Clone)]
|
||||
pub struct SharedLpState {
|
||||
@@ -57,8 +110,6 @@ pub struct SharedLpState {
|
||||
/// LP configuration (for timestamp validation, etc.)
|
||||
pub lp_config: LpConfig,
|
||||
|
||||
/// Established sessions keyed by receiver index
|
||||
///
|
||||
/// Wrapped in TimestampedState for TTL-based cleanup of inactive sessions.
|
||||
pub session_states: Arc<DashMap<LpReceiverIndex, TimestampedState<LpTransportSession>>>,
|
||||
/// Currently active LP sessions
|
||||
pub session_states: ActiveLpSessions,
|
||||
}
|
||||
|
||||
@@ -84,6 +84,7 @@ use tokio::sync::mpsc;
|
||||
use tracing::{debug, info, trace};
|
||||
use zeroize::Zeroizing;
|
||||
|
||||
use crate::node::lp::directory::LpNodes;
|
||||
pub use nym_gateway::node::ActiveClientsStore;
|
||||
pub use nym_gateway::node::GatewayStorage;
|
||||
|
||||
@@ -491,6 +492,7 @@ impl NymNode {
|
||||
&self,
|
||||
peer_registrator: Option<PeerRegistrator>,
|
||||
mix_packet_sender: MixForwardingSender,
|
||||
network_nodes: LpNodes,
|
||||
) -> Result<LpSetup, NymNodeError> {
|
||||
let lp_peer = LpLocalPeer::new(Ciphersuite::default(), self.x25519_lp_keys.clone())
|
||||
.with_kem_keys(self.psq_kem_keys.clone());
|
||||
@@ -500,6 +502,7 @@ impl NymNode {
|
||||
self.config.lp,
|
||||
self.metrics.clone(),
|
||||
peer_registrator,
|
||||
network_nodes,
|
||||
mix_packet_sender,
|
||||
self.shutdown_manager.shutdown_tracker().clone(),
|
||||
)
|
||||
@@ -683,6 +686,7 @@ impl NymNode {
|
||||
async fn start_gateway_tasks(
|
||||
&mut self,
|
||||
cached_network: CachedNetwork,
|
||||
lp_nodes: LpNodes,
|
||||
metrics_sender: MetricEventsSender,
|
||||
active_clients_store: ActiveClientsStore,
|
||||
mix_packet_sender: MixForwardingSender,
|
||||
@@ -766,7 +770,7 @@ impl NymNode {
|
||||
self.config.lp.control_bind_address, self.config.lp.data_bind_address,
|
||||
);
|
||||
let lp_tasks = self
|
||||
.build_lp_tasks(wg_peer_registrator.clone(), mix_packet_sender)
|
||||
.build_lp_tasks(wg_peer_registrator.clone(), mix_packet_sender, lp_nodes)
|
||||
.await?;
|
||||
lp_tasks.start_tasks();
|
||||
} else {
|
||||
@@ -1355,6 +1359,7 @@ impl NymNode {
|
||||
|
||||
let network_refresher = self.build_network_refresher().await?;
|
||||
let active_clients_store = ActiveClientsStore::new();
|
||||
let lp_nodes = network_refresher.lp_nodes();
|
||||
|
||||
let bloomfilters_manager = self.setup_replay_detection().await?;
|
||||
|
||||
@@ -1381,6 +1386,7 @@ impl NymNode {
|
||||
|
||||
self.start_gateway_tasks(
|
||||
network_refresher.cached_network(),
|
||||
lp_nodes,
|
||||
metrics_sender,
|
||||
active_clients_store,
|
||||
mix_packet_sender,
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
use crate::error::NymNodeError;
|
||||
use crate::node::key_rotation::active_keys::ActiveSphinxKeys;
|
||||
use crate::node::lp::directory::LpNodes;
|
||||
use crate::node::routing_filter::network_filter::NetworkRoutingFilter;
|
||||
use async_trait::async_trait;
|
||||
use nym_crypto::asymmetric::ed25519;
|
||||
@@ -212,6 +213,7 @@ pub struct NetworkRefresher {
|
||||
network: CachedNetwork,
|
||||
routing_filter: NetworkRoutingFilter,
|
||||
noise_view: NoiseNetworkView,
|
||||
lp_nodes: LpNodes,
|
||||
}
|
||||
|
||||
impl NetworkRefresher {
|
||||
@@ -240,6 +242,7 @@ impl NetworkRefresher {
|
||||
network: CachedNetwork::new_empty(),
|
||||
routing_filter: NetworkRoutingFilter::new_empty(testnet),
|
||||
noise_view: NoiseNetworkView::new_empty(),
|
||||
lp_nodes: Default::default(),
|
||||
};
|
||||
|
||||
this.obtain_initial_network().await?;
|
||||
@@ -335,6 +338,8 @@ impl NetworkRefresher {
|
||||
.collect::<HashMap<_, _>>();
|
||||
self.noise_view.swap_view(noise_nodes);
|
||||
|
||||
warn!("unimplemented LP nodes update");
|
||||
|
||||
let mut network_guard = self.network.inner.write().await;
|
||||
network_guard.topology_metadata = metadata.to_topology_metadata();
|
||||
network_guard.network_nodes = nodes;
|
||||
@@ -373,6 +378,10 @@ impl NetworkRefresher {
|
||||
self.noise_view.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn lp_nodes(&self) -> LpNodes {
|
||||
self.lp_nodes.clone()
|
||||
}
|
||||
|
||||
pub(crate) async fn run(&mut self) {
|
||||
let mut full_refresh_interval = interval(self.full_refresh_interval);
|
||||
full_refresh_interval.reset();
|
||||
|
||||
@@ -14,6 +14,7 @@ use nym_crypto::asymmetric::{ed25519, x25519};
|
||||
use nym_lp::LpTransportSession;
|
||||
use nym_lp::peer::{DHKeyPair, LpLocalPeer, LpRemotePeer};
|
||||
use nym_lp::peer_config::LpReceiverIndex;
|
||||
use nym_lp::psq::initiator::HandshakeMode;
|
||||
use nym_lp::transport::traits::LpTransportChannel;
|
||||
use nym_lp::transport::{LpHandshakeChannel, LpTransportError};
|
||||
use nym_lp::{Ciphersuite, packet::EncryptedLpPacket, packet::version};
|
||||
@@ -396,7 +397,8 @@ where
|
||||
local_peer,
|
||||
remote_peer,
|
||||
protocol_version,
|
||||
)
|
||||
HandshakeMode::OneWayEntry,
|
||||
)?
|
||||
.complete_handshake()
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -28,6 +28,7 @@ use nym_crypto::asymmetric::{ed25519, x25519};
|
||||
use nym_lp::packet::version;
|
||||
use nym_lp::packet::{EncryptedLpPacket, LpMessage};
|
||||
use nym_lp::peer::{DHKeyPair, LpLocalPeer, LpRemotePeer};
|
||||
use nym_lp::psq::initiator::HandshakeMode;
|
||||
use nym_lp::transport::LpHandshakeChannel;
|
||||
use nym_lp::transport::traits::LpTransportChannel;
|
||||
use nym_lp::{Ciphersuite, KEM, LpTransportSession};
|
||||
@@ -185,7 +186,8 @@ impl NestedLpSession {
|
||||
local_peer,
|
||||
remote_peer,
|
||||
protocol_version,
|
||||
)
|
||||
HandshakeMode::OneWayExit,
|
||||
)?
|
||||
.complete_handshake()
|
||||
.await?;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user