Compare commits

..

5 Commits

Author SHA1 Message Date
benedettadavico d1ba5acd72 mebbe 2026-03-06 07:53:44 +01:00
benedettadavico b7b238584d add file 2026-03-06 07:53:44 +01:00
benedettadavico 7ed5b5477b ? 2026-03-06 07:53:44 +01:00
benedettadavico 09506683ce test.. 2026-03-06 07:53:44 +01:00
benedettadavico 98f1480ae8 attempt at fix 2026-03-06 07:53:44 +01:00
100 changed files with 2021 additions and 3338 deletions
@@ -15,6 +15,9 @@ env:
jobs:
publish-dry-run:
runs-on: arc-linux-latest
timeout-minutes: 35
env:
RUSTUP_PERMIT_COPY_RENAME: 1
steps:
- name: Checkout repo
uses: actions/checkout@v6
@@ -59,20 +62,60 @@ jobs:
- name: Bump versions (local only)
run: |
cargo workspaces version custom ${{ inputs.version }} \
--allow-branch ${{ github.ref_name }} \
--no-git-commit \
--yes
- name: Preflight publish checks
run: |
python3 tools/internal/check_publish_preflight.py
# Dry run may show cascading dependency errors because packages aren't
# actually uploaded - these are expected and ignored. We check for real
# errors like packaging failures, missing metadata, or invalid Cargo.toml.
- name: Publish (dry run)
run: |
output=$(cargo workspaces publish --dry-run --allow-dirty 2>&1) || true
echo "$output"
set +e
publish_status=1
max_attempts=2
attempt=1
rm -f /tmp/publish-dry-run.log
# Check for real errors (not cascading dependency errors)
# Cascading errors mention "crates.io index", real errors mention "Cargo.toml"
echo "$output" | grep -i "Cargo.toml" && exit 1 || true
while [ "$attempt" -le "$max_attempts" ]; do
echo "Dry-run publish attempt ${attempt}/${max_attempts}"
cargo workspaces publish --dry-run --allow-dirty 2>&1 | tee /tmp/publish-dry-run.log
publish_status=${PIPESTATUS[0]}
if [ "$publish_status" -eq 0 ]; then
break
fi
# Retry once for interruption/runner issues.
if [ "$attempt" -lt "$max_attempts" ] && \
{ [ "$publish_status" -eq 130 ] || [ "$publish_status" -eq 137 ]; }; then
echo "Publish dry-run interrupted (exit ${publish_status}), retrying in 10s..."
sleep 10
attempt=$((attempt + 1))
continue
fi
break
done
set -e
if grep -Eiq \
"failed to verify manifest|failed to parse manifest|invalid Cargo.toml|error: package .* has no (description|license|repository)" \
/tmp/publish-dry-run.log; then
echo "Detected real packaging/manifest errors"
exit 1
fi
# In dry-run mode, non-zero publish status is expected due to
# dependency-cascade failures against crates.io index.
if [ "$publish_status" -ne 0 ]; then
echo "Dry-run publish returned non-zero (${publish_status}) but no real manifest blockers were detected."
fi
echo "Only expected dry-run dependency cascade errors detected (if any)."
# Show the list of packages published
- name: Show package versions
@@ -17,6 +17,8 @@ on:
jobs:
publish:
runs-on: arc-linux-latest
env:
RUSTUP_PERMIT_COPY_RENAME: 1
steps:
- name: Checkout repo
uses: actions/checkout@v6
+2
View File
@@ -17,6 +17,8 @@ on:
jobs:
publish:
runs-on: arc-linux-latest
env:
RUSTUP_PERMIT_COPY_RENAME: 1
steps:
- name: Checkout repo
uses: actions/checkout@v6
@@ -15,6 +15,8 @@ env:
jobs:
version-bump:
runs-on: arc-linux-latest
env:
RUSTUP_PERMIT_COPY_RENAME: 1
permissions:
contents: write
steps:
@@ -25,6 +25,10 @@ jobs:
- name: Install cargo-workspaces
run: cargo install cargo-workspaces
- name: Preflight publish checks
run: |
python3 tools/internal/check_publish_preflight.py
- name: Publish remaining crates
env:
CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }}
Generated
+1 -2
View File
@@ -6833,7 +6833,6 @@ dependencies = [
"nym-kkt-ciphersuite",
"nym-kkt-context",
"nym-pemstore",
"nym-test-utils",
"rand 0.9.2",
"rand_chacha 0.9.0",
"strum",
@@ -6876,7 +6875,7 @@ dependencies = [
[[package]]
name = "nym-lp"
version = "1.20.4"
version = "0.1.0"
dependencies = [
"anyhow",
"bs58",
+1 -3
View File
@@ -202,7 +202,7 @@ homepage = "https://nymtech.net"
documentation = "https://nymtech.net"
edition = "2024"
license = "Apache-2.0"
rust-version = "1.87.0"
rust-version = "1.85"
readme = "README.md"
version = "1.20.4"
@@ -448,10 +448,8 @@ nym-http-api-common = { version = "1.20.4", path = "common/http-api-common", def
nym-id = { version = "1.20.4", path = "common/nym-id" }
nym-ip-packet-client = { version = "1.20.4", path = "nym-ip-packet-client" }
nym-ip-packet-requests = { version = "1.20.4", path = "common/ip-packet-requests" }
nym-lp = { version = "1.20.4", path = "common/nym-lp" }
nym-kkt = { version = "0.1.0", path = "common/nym-kkt" }
nym-kkt-ciphersuite = { version = "1.20.4", path = "common/nym-kkt-ciphersuite" }
nym-kkt-context = { version = "1.20.4", path = "common/nym-kkt-context" }
nym-metrics = { version = "1.20.4", path = "common/nym-metrics" }
nym-mixnet-client = { version = "1.20.4", path = "common/client-libs/mixnet-client" }
nym-mixnet-contract-common = { version = "1.20.4", path = "common/cosmwasm-smart-contracts/mixnet-contract" }
@@ -23,7 +23,7 @@ use nym_api_requests::models::{
MixnodeCoreStatusResponse, NymNodeDescriptionV1, NymNodeDescriptionV2,
};
use nym_api_requests::nym_nodes::{
NodesByAddressesResponse, SemiSkimmedNodesWithMetadata, SkimmedNodeV1, SkimmedNodesWithMetadata,
NodesByAddressesResponse, SemiSkimmedNodesWithMetadata, SkimmedNode, SkimmedNodesWithMetadata,
};
use nym_coconut_dkg_common::types::EpochId;
use nym_http_api_client::UserAgent;
@@ -354,12 +354,12 @@ impl NymApiClient {
}
#[deprecated(note = "use get_all_basic_active_mixing_assigned_nodes instead")]
pub async fn get_basic_mixnodes(&self) -> Result<Vec<SkimmedNodeV1>, ValidatorClientError> {
pub async fn get_basic_mixnodes(&self) -> Result<Vec<SkimmedNode>, ValidatorClientError> {
Ok(self.nym_api.get_basic_mixnodes().await?.nodes)
}
#[deprecated(note = "use get_all_basic_entry_assigned_nodes instead")]
pub async fn get_basic_gateways(&self) -> Result<Vec<SkimmedNodeV1>, ValidatorClientError> {
pub async fn get_basic_gateways(&self) -> Result<Vec<SkimmedNode>, ValidatorClientError> {
Ok(self.nym_api.get_basic_gateways().await?.nodes)
}
@@ -372,7 +372,7 @@ impl NymApiClient {
#[deprecated(note = "use get_all_basic_entry_assigned_nodes_with_metadata instead")]
pub async fn get_all_basic_entry_assigned_nodes(
&self,
) -> Result<Vec<SkimmedNodeV1>, ValidatorClientError> {
) -> Result<Vec<SkimmedNode>, ValidatorClientError> {
self.get_all_basic_entry_assigned_nodes_with_metadata()
.await
.map(|res| res.nodes)
@@ -389,7 +389,7 @@ impl NymApiClient {
#[deprecated(note = "use get_all_basic_active_mixing_assigned_nodes_with_metadata instead")]
pub async fn get_all_basic_active_mixing_assigned_nodes(
&self,
) -> Result<Vec<SkimmedNodeV1>, ValidatorClientError> {
) -> Result<Vec<SkimmedNode>, ValidatorClientError> {
self.get_all_basic_active_mixing_assigned_nodes_with_metadata()
.await
.map(|res| res.nodes)
@@ -406,7 +406,7 @@ impl NymApiClient {
#[deprecated(note = "use get_all_basic_mixing_capable_nodes_with_metadata instead")]
pub async fn get_all_basic_mixing_capable_nodes(
&self,
) -> Result<Vec<SkimmedNodeV1>, ValidatorClientError> {
) -> Result<Vec<SkimmedNode>, ValidatorClientError> {
self.get_all_basic_mixing_capable_nodes_with_metadata()
.await
.map(|res| res.nodes)
@@ -420,7 +420,7 @@ impl NymApiClient {
/// retrieve basic information for all bonded nodes on the network
#[deprecated(note = "use get_all_basic_nodes_with_metadata instead")]
pub async fn get_all_basic_nodes(&self) -> Result<Vec<SkimmedNodeV1>, ValidatorClientError> {
pub async fn get_all_basic_nodes(&self) -> Result<Vec<SkimmedNode>, ValidatorClientError> {
self.get_all_basic_nodes_with_metadata()
.await
.map(|res| res.nodes)
@@ -4,7 +4,6 @@
use crate::nym_api::error::NymAPIError;
use crate::nym_api::routes::{ecash, CORE_STATUS_COUNT, SINCE_ARG};
use crate::nym_nodes::SkimmedNodesWithMetadata;
use crate::ValidatorClientError;
use async_trait::async_trait;
use nym_api_requests::ecash::models::{
AggregatedCoinIndicesSignatureResponse, AggregatedExpirationDateSignatureResponse,
@@ -21,14 +20,11 @@ use nym_api_requests::models::{
NymNodeDescriptionV1, NymNodeDescriptionV2, PerformanceHistoryResponse, RewardedSetResponse,
SignerInformationResponse,
};
use nym_api_requests::nym_nodes::{
NodesByAddressesRequestBody, NodesByAddressesResponse, PaginatedCachedNodesResponseV1,
PaginatedCachedNodesResponseV2,
};
use nym_api_requests::pagination::PaginatedResponse;
use nym_http_api_client::{ApiClient, NO_PARAMS};
use nym_mixnet_contract_common::{IdentityKeyRef, NodeId, NymNodeDetails};
use std::net::IpAddr;
use time::format_description::BorrowedFormatItem;
use time::Date;
use tracing::instrument;
pub use nym_api_requests::{
ecash::{
models::SpentCredentialsResponse, BlindSignRequestBody, BlindedSignatureResponse,
@@ -40,14 +36,17 @@ pub use nym_api_requests::{
MixnodeCoreStatusResponse, MixnodeStatusReportResponse, MixnodeStatusResponse,
MixnodeUptimeHistoryResponse, StakeSaturationResponse, UptimeResponse,
},
nym_nodes::{
CachedNodesResponse, NodesByAddressesRequestBody, NodesByAddressesResponse,
PaginatedCachedNodesResponseV1, PaginatedCachedNodesResponseV2, SemiSkimmedNodeV1,
SemiSkimmedNodeV3, SemiSkimmedNodesWithMetadata, SkimmedNodeV1,
},
nym_nodes::{CachedNodesResponse, SemiSkimmedNode, SemiSkimmedNodesWithMetadata, SkimmedNode},
NymNetworkDetailsResponse,
};
use nym_http_api_client::{ApiClient, NO_PARAMS};
use nym_mixnet_contract_common::{IdentityKeyRef, NodeId, NymNodeDetails};
use std::net::IpAddr;
use time::format_description::BorrowedFormatItem;
use time::Date;
use tracing::instrument;
use crate::ValidatorClientError;
pub use nym_coconut_dkg_common::types::EpochId;
pub mod error;
@@ -391,7 +390,7 @@ pub trait NymApiClientExt: ApiClient {
#[deprecated]
#[tracing::instrument(level = "debug", skip_all)]
async fn get_basic_mixnodes(&self) -> Result<CachedNodesResponse<SkimmedNodeV1>, NymAPIError> {
async fn get_basic_mixnodes(&self) -> Result<CachedNodesResponse<SkimmedNode>, NymAPIError> {
self.get_json(
&[
routes::V1_API_VERSION,
@@ -407,7 +406,7 @@ pub trait NymApiClientExt: ApiClient {
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_basic_gateways(&self) -> Result<CachedNodesResponse<SkimmedNodeV1>, NymAPIError> {
async fn get_basic_gateways(&self) -> Result<CachedNodesResponse<SkimmedNode>, NymAPIError> {
self.get_json(
&[
routes::V1_API_VERSION,
@@ -444,7 +443,7 @@ pub trait NymApiClientExt: ApiClient {
page: Option<u32>,
per_page: Option<u32>,
use_bincode: bool,
) -> Result<PaginatedCachedNodesResponseV1<SkimmedNodeV1>, NymAPIError> {
) -> Result<PaginatedCachedNodesResponseV1<SkimmedNode>, NymAPIError> {
let mut params = Vec::new();
if no_legacy {
@@ -486,7 +485,7 @@ pub trait NymApiClientExt: ApiClient {
page: Option<u32>,
per_page: Option<u32>,
use_bincode: bool,
) -> Result<PaginatedCachedNodesResponseV2<SkimmedNodeV1>, NymAPIError> {
) -> Result<PaginatedCachedNodesResponseV2<SkimmedNode>, NymAPIError> {
let mut params = Vec::new();
if no_legacy {
@@ -528,7 +527,7 @@ pub trait NymApiClientExt: ApiClient {
page: Option<u32>,
per_page: Option<u32>,
use_bincode: bool,
) -> Result<PaginatedCachedNodesResponseV1<SkimmedNodeV1>, NymAPIError> {
) -> Result<PaginatedCachedNodesResponseV1<SkimmedNode>, NymAPIError> {
let mut params = Vec::new();
if no_legacy {
@@ -570,7 +569,7 @@ pub trait NymApiClientExt: ApiClient {
page: Option<u32>,
per_page: Option<u32>,
use_bincode: bool,
) -> Result<PaginatedCachedNodesResponseV2<SkimmedNodeV1>, NymAPIError> {
) -> Result<PaginatedCachedNodesResponseV2<SkimmedNode>, NymAPIError> {
let mut params = Vec::new();
if no_legacy {
@@ -613,7 +612,7 @@ pub trait NymApiClientExt: ApiClient {
page: Option<u32>,
per_page: Option<u32>,
use_bincode: bool,
) -> Result<PaginatedCachedNodesResponseV1<SkimmedNodeV1>, NymAPIError> {
) -> Result<PaginatedCachedNodesResponseV1<SkimmedNode>, NymAPIError> {
let mut params = Vec::new();
if no_legacy {
@@ -655,7 +654,7 @@ pub trait NymApiClientExt: ApiClient {
page: Option<u32>,
per_page: Option<u32>,
use_bincode: bool,
) -> Result<PaginatedCachedNodesResponseV2<SkimmedNodeV1>, NymAPIError> {
) -> Result<PaginatedCachedNodesResponseV2<SkimmedNode>, NymAPIError> {
let mut params = Vec::new();
if no_legacy {
@@ -696,7 +695,7 @@ pub trait NymApiClientExt: ApiClient {
page: Option<u32>,
per_page: Option<u32>,
use_bincode: bool,
) -> Result<PaginatedCachedNodesResponseV1<SkimmedNodeV1>, NymAPIError> {
) -> Result<PaginatedCachedNodesResponseV1<SkimmedNode>, NymAPIError> {
let mut params = Vec::new();
if no_legacy {
@@ -734,7 +733,7 @@ pub trait NymApiClientExt: ApiClient {
page: Option<u32>,
per_page: Option<u32>,
use_bincode: bool,
) -> Result<PaginatedCachedNodesResponseV2<SkimmedNodeV1>, NymAPIError> {
) -> Result<PaginatedCachedNodesResponseV2<SkimmedNode>, NymAPIError> {
let mut params = Vec::new();
if no_legacy {
@@ -771,7 +770,7 @@ pub trait NymApiClientExt: ApiClient {
no_legacy: bool,
page: Option<u32>,
per_page: Option<u32>,
) -> Result<PaginatedCachedNodesResponseV2<SemiSkimmedNodeV1>, NymAPIError> {
) -> Result<PaginatedCachedNodesResponseV2<SemiSkimmedNode>, NymAPIError> {
let mut params = Vec::new();
if no_legacy {
@@ -798,21 +797,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_expanded_nodes_v3(
&self,
use_bincode: bool,
) -> Result<PaginatedCachedNodesResponseV2<SemiSkimmedNodeV3>, NymAPIError> {
let mut params = Vec::new();
if use_bincode {
params.push(("output", "bincode".to_string()))
}
self.get_response("/v3/unstable/nym-nodes/semi-skimmed", &params)
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_report(
@@ -3,7 +3,6 @@
pub const V1_API_VERSION: &str = "v1";
pub const V2_API_VERSION: &str = "v2";
pub const V3_API_VERSION: &str = "v3";
pub const MIXNODES: &str = "mixnodes";
pub const GATEWAYS: &str = "gateways";
pub const DESCRIBED: &str = "described";
+4 -2
View File
@@ -511,12 +511,14 @@ mod tests {
#[test]
fn test_key_conversion() {
let dalek_kp = KeyPair::new(&mut rand::thread_rng());
let dalek_kp = super::KeyPair::new(&mut rand::thread_rng());
let mut dalek_private_key_bytes = dalek_kp.private_key().as_bytes().to_owned();
libcrux_curve25519::clamp(&mut dalek_private_key_bytes);
let libcrux_private_key = DHPrivateKey::from_bytes(&dalek_private_key_bytes).unwrap();
let libcrux_private_key =
libcrux_psq::handshake::types::DHPrivateKey::from_bytes(&dalek_private_key_bytes)
.unwrap();
let libcrux_public_key = libcrux_private_key.to_public();
assert_eq!(libcrux_public_key.as_ref(), dalek_kp.public_key.as_bytes());
-1
View File
@@ -1401,7 +1401,6 @@ pub trait ApiClient: ApiClientCore {
/// 'get' data from the segment-defined path, e.g. `["api", "v1", "mixnodes"]`, with tuple
/// defined key-value parameters, e.g. `[("since", "12345")]`. Attempt to parse the response
/// into the provided type `T` based on the content type header
#[instrument(level = "debug", skip_all, fields(path=?path))]
async fn get_response<P, T, K, V>(
&self,
path: P,
+1
View File
@@ -1,5 +1,6 @@
[package]
name = "nym-kkt-ciphersuite"
description = "Nym KKT ciphersuite"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
+2 -3
View File
@@ -12,9 +12,9 @@ num_enum = { workspace = true }
strum = { workspace = true }
# internal
nym-crypto = { workspace = true, features = ["hashing"] }
nym-crypto = { path = "../crypto", features = ["hashing"] }
nym-kkt-ciphersuite = { workspace = true, features = ["digests"] }
nym-kkt-context = { workspace = true }
nym-kkt-context = { path = "../nym-kkt-context" }
nym-pemstore = { workspace = true }
libcrux-kem = { workspace = true }
@@ -30,7 +30,6 @@ libcrux-ml-kem = { workspace = true }
[dev-dependencies]
rand_chacha = "0.9.0"
anyhow = { workspace = true }
nym-test-utils = { workspace = true }
[lints]
+2 -3
View File
@@ -10,7 +10,6 @@ use crate::error::KKTError;
pub const MAX_PAYLOAD_LEN: usize = 1_000_000;
const CARRIER_KDF_INFO_TX: &str = "CARRIER_V1_KDF_TX";
const CARRIER_KDF_INFO_RX: &str = "CARRIER_V1_KDF_RX";
const CARRIER_KKT_AAD: &[u8] = b"kkt-carrier-v1";
#[derive(Zeroize, ZeroizeOnDrop)]
pub struct Carrier {
@@ -108,7 +107,7 @@ impl Carrier {
&self.tx_key,
plaintext,
&mut output_buffer,
CARRIER_KKT_AAD,
b"kkt-carrier-v1",
&as_nonce_bytes(self.tx_counter),
)?;
@@ -127,7 +126,7 @@ impl Carrier {
&self.rx_key,
&mut output_buffer,
ciphertext,
CARRIER_KKT_AAD,
b"kkt-carrier-v1",
&as_nonce_bytes(self.rx_counter),
)?;
+11 -20
View File
@@ -3,7 +3,6 @@
use crate::context::KKTStatus;
use nym_kkt_ciphersuite::error::KKTCiphersuiteError;
use nym_kkt_ciphersuite::{HashFunction, KEM};
use nym_kkt_context::KKTContextEncodingError;
use std::fmt::Debug;
use thiserror::Error;
@@ -16,40 +15,40 @@ pub enum KKTError {
#[error(transparent)]
MaskedByteError(#[from] MaskedByteError),
#[error("KEM mapping failure: {info}")]
#[error("KEM mapping failure: {}", info)]
KEMMapping { info: &'static str },
#[error("Insecure Encapsulation Key Hash Length")]
InsecureHashLen,
#[error("KKT Frame Decoding Error: {info}")]
#[error("KKT Frame Decoding Error: {}", info)]
FrameDecodingError { info: String },
#[error("KKT Frame Encoding Error: {info}")]
#[error("KKT Frame Encoding Error: {}", info)]
FrameEncodingError { info: String },
#[error("KKT Incompatibility Error: {info}")]
#[error("KKT Incompatibility Error: {}", info)]
IncompatibilityError { info: &'static str },
#[error("KKT Responder Flagged Error: {status}")]
#[error("KKT Responder Flagged Error: {}", status)]
ResponderFlaggedError { status: KKTStatus },
#[error("PSQ KEM Error: {info}")]
#[error("PSQ KEM Error: {}", info)]
KEMError { info: &'static str },
#[error("Local Function Input Error: {info}")]
#[error("Local Function Input Error: {}", info)]
FunctionInputError { info: &'static str },
#[error("{info}")]
#[error("{}", info)]
X25519Error { info: &'static str },
#[error("{info}")]
#[error("{}", info)]
AEADError { info: &'static str },
#[error("{info}")]
#[error("{}", info)]
DecodingError { info: &'static str },
#[error("{info}")]
#[error("{}", info)]
UnsupportedAlgorithm { info: &'static str },
#[error("Generic libcrux error")]
@@ -63,14 +62,6 @@ pub enum KKTError {
#[error("the received encapsulation key hash does not match the expected value")]
MismatchedKEMHash,
#[error(
"there are no known digests for initiator's KEM key with {kem} KEM and {hash_function} hash function"
)]
NoKnownKEMKeyDigests {
kem: KEM,
hash_function: HashFunction,
},
#[error(transparent)]
MalformedContext(#[from] KKTContextEncodingError),
}
+8 -1
View File
@@ -140,7 +140,14 @@ pub fn initiator_process(
},
};
Ok(KKTFrame::new(context, body, payload.unwrap_or_default()))
Ok(KKTFrame::new(
context,
body,
match payload {
Some(payload_vec) => payload_vec,
None => Vec::with_capacity(0),
},
))
}
pub fn initiator_ingest_response(
+57 -124
View File
@@ -16,6 +16,9 @@ pub use nym_kkt_context as context;
#[cfg(test)]
mod test {
use nym_kkt_ciphersuite::{Ciphersuite, HashFunction, HashLength, KEM, SignatureScheme};
use rand09::RngCore;
use crate::keys::KEMKeys;
use crate::{
initiator::KKTInitiator,
@@ -25,13 +28,9 @@ mod test {
},
responder::KKTResponder,
};
use nym_kkt_ciphersuite::{Ciphersuite, HashFunction, HashLength, KEM, SignatureScheme};
use nym_test_utils::helpers::deterministic_rng_09;
use rand09::RngCore;
use std::collections::BTreeMap;
#[test]
fn test_kkt_psq_e2e_one_way_encrypted_carrier() {
fn test_kkt_psq_e2e_encrypted_carrier() {
let mut rng = rand09::rng();
let mut payload: Vec<u8> = vec![0u8; 900_000];
@@ -47,6 +46,7 @@ mod test {
HashFunction::Shake256,
] {
// generate kem public keys
let responder_mlkem_keypair = generate_keypair_mlkem(&mut rng);
let responder_mceliece_keypair = generate_keypair_mceliece(&mut rng);
@@ -63,13 +63,24 @@ mod test {
HashLength::Default.value(),
responder_kem.mc_eliece_encapsulation_key().as_ref(),
);
let initiator_mlkem_keypair = generate_keypair_mlkem(&mut rng);
let initiator_mceliece_keypair = generate_keypair_mceliece(&mut rng);
let init_hashes = BTreeMap::new();
let _i_dir_hash_mlkem = hash_encapsulation_key(
hash_function,
HashLength::Default.value(),
initiator_mlkem_keypair.public_key().as_slice(),
);
let _i_dir_hash_mceliece = hash_encapsulation_key(
hash_function,
HashLength::Default.value(),
initiator_mceliece_keypair.pk.as_ref(),
);
let responder = KKTResponder::new(
&responder_x25519_keypair,
&responder_kem,
&init_hashes,
&[
HashFunction::Blake3,
HashFunction::SHA256,
@@ -113,6 +124,41 @@ mod test {
responder_kem.ml_kem768_encapsulation_key().as_slice(),
)
}
// Mutual - MlKem
{
let ciphersuite = Ciphersuite::resolve_ciphersuite(
KEM::MlKem768,
hash_function,
SignatureScheme::Ed25519,
None,
)
.unwrap();
let (mut initiator, request) = KKTInitiator::generate_one_way_request(
&mut rng,
ciphersuite,
&responder_x25519_keypair.pk,
&r_dir_hash_mlkem,
1u8,
Some(payload.clone()),
)
.unwrap();
let processed_request = responder.process_request(request, payload.len()).unwrap();
assert_eq!(processed_request.request_payload, payload);
// if we keep unverified keys, this should change
assert!(processed_request.remote_encapsulation_key.is_none());
let processed_response = initiator
.process_response(processed_request.response, 0)
.unwrap();
assert_eq!(
processed_response.encapsulation_key.as_bytes(),
responder_kem.ml_kem768_encapsulation_key().as_slice(),
)
}
// OneWay - McEliece
{
@@ -145,110 +191,7 @@ mod test {
responder_kem.mc_eliece_encapsulation_key().as_ref()
)
}
}
}
#[test]
fn test_kkt_psq_e2e_mutual_encrypted_carrier() {
let mut rng = deterministic_rng_09();
let mut payload: Vec<u8> = vec![0u8; 50000];
rng.fill_bytes(&mut payload);
// generate kem public keys
let initiator_mlkem_keypair = generate_keypair_mlkem(&mut rng);
let initiator_mceliece_keypair = generate_keypair_mceliece(&mut rng);
let responder_mlkem_keypair = generate_keypair_mlkem(&mut rng);
let responder_mceliece_keypair = generate_keypair_mceliece(&mut rng);
let responder_x25519_keypair = generate_lp_keypair_x25519(&mut rng);
let initiator_kem = KEMKeys::new(initiator_mceliece_keypair, initiator_mlkem_keypair);
let responder_kem = KEMKeys::new(responder_mceliece_keypair, responder_mlkem_keypair);
let init_hashes = initiator_kem.encapsulation_keys_digests();
let responder = KKTResponder::new(
&responder_x25519_keypair,
&responder_kem,
&init_hashes,
&[
HashFunction::Blake3,
HashFunction::SHA256,
HashFunction::Shake128,
HashFunction::Shake256,
],
&[SignatureScheme::Ed25519],
&[1],
)
.unwrap();
for hash_function in [
HashFunction::Blake3,
HashFunction::SHA256,
HashFunction::Shake128,
HashFunction::Shake256,
] {
let r_dir_hash_mlkem = hash_encapsulation_key(
hash_function,
HashLength::Default.value(),
responder_kem.ml_kem768_encapsulation_key().as_slice(),
);
let r_dir_hash_mceliece = hash_encapsulation_key(
hash_function,
HashLength::Default.value(),
responder_kem.mc_eliece_encapsulation_key().as_ref(),
);
// Mutual - MlKem
{
let ciphersuite = Ciphersuite::resolve_ciphersuite(
KEM::MlKem768,
hash_function,
SignatureScheme::Ed25519,
None,
)
.unwrap();
let (mut initiator, request) = KKTInitiator::generate_mutual_request(
&mut rng,
ciphersuite,
initiator_kem
.encoded_encapsulation_key(KEM::MlKem768)
.unwrap(),
&responder_x25519_keypair.pk,
&r_dir_hash_mlkem,
1u8,
Some(payload.clone()),
)
.unwrap();
let processed_request = responder.process_request(request, payload.len()).unwrap();
assert_eq!(processed_request.request_payload, payload);
assert_eq!(
processed_request
.remote_encapsulation_key
.unwrap()
.as_bytes(),
initiator_kem
.encapsulation_key(KEM::MlKem768)
.unwrap()
.as_bytes()
);
let processed_response = initiator
.process_response(processed_request.response, 0)
.unwrap();
assert_eq!(
processed_response.encapsulation_key.as_bytes(),
responder_kem.ml_kem768_encapsulation_key().as_slice(),
)
}
// Mutual - McEliece is not supported due to the key being too large
{
let ciphersuite = Ciphersuite::resolve_ciphersuite(
KEM::McEliece,
@@ -257,12 +200,9 @@ mod test {
None,
)
.unwrap();
let (mut initiator, request) = KKTInitiator::generate_mutual_request(
let (mut initiator, request) = KKTInitiator::generate_one_way_request(
&mut rng,
ciphersuite,
initiator_kem
.encoded_encapsulation_key(KEM::McEliece)
.unwrap(),
&responder_x25519_keypair.pk,
&r_dir_hash_mceliece,
1u8,
@@ -273,16 +213,9 @@ mod test {
let processed_request = responder.process_request(request, payload.len()).unwrap();
assert_eq!(processed_request.request_payload, payload);
assert_eq!(
processed_request
.remote_encapsulation_key
.unwrap()
.as_bytes(),
initiator_kem
.encapsulation_key(KEM::McEliece)
.unwrap()
.as_bytes()
);
// if we keep unverified keys, this should change
assert!(processed_request.remote_encapsulation_key.is_none());
let processed_response = initiator
.process_response(processed_request.response, 0)
+2 -2
View File
@@ -114,14 +114,14 @@ impl KKTRequestPlaintext {
}
pub(crate) fn to_bytes(&self) -> Vec<u8> {
let mut out = Vec::with_capacity(Self::SIZE);
let mut out = Vec::with_capacity(x25519::PUBLIC_KEY_LENGTH + MASKED_BYTE_LEN);
out.extend_from_slice(self.dh_pubkey.as_ref());
out.extend_from_slice(self.masked_version_bytes.as_slice());
out
}
pub(crate) fn try_from_bytes(b: &[u8]) -> Result<Self, KKTError> {
if b.len() != Self::SIZE {
if b.len() != x25519::PUBLIC_KEY_LENGTH + MASKED_BYTE_LEN {
return Err(KKTError::FrameDecodingError {
info: "the KKTRequest frame has invalid length".to_string(),
});
+5 -3
View File
@@ -4,13 +4,15 @@
//! Post-Quantum Re-Key Protocol
/// This module implements a stateless post-quantum re-keying protocol in one round-trip.
/// We currently support MlKem768.
/// We currently support MlKem768 and XWing.
///
/// This protocol is safe if it runs under a trusted secure channel.
///
/// Bandwidth costs:
/// Request (MlKem768): 1216 bytes
/// Response (MlKem768): 1088 bytes
/// Request (XWing): 1248 bytes
/// Response (XWing): 1120 bytes
use libcrux_kem::*;
use nym_crypto::hkdf::blake3::derive_key_blake3;
use nym_kkt_ciphersuite::{KEM, mceliece, ml_kem768, x25519, xwing};
@@ -58,7 +60,7 @@ impl RekeyInitiator {
///
/// Inputs:
/// rng: something that implements CryptoRng + RngCore
/// kem: a KEM algorithm (we currently support MlKem768 only)
/// kem: a KEM algorithm (we currently support MlKem768 and XWing)
///
/// Outputs:
/// RekeyInitiator: A struct which contains the decapsulation key, the salt and the kem algorithm in use.
@@ -169,7 +171,7 @@ where
Some(num) => match num {
// If message length is 1216 (32 + 1184) then the algorithm should be MlKem768
ml_kem768::PUBLIC_KEY_LENGTH => Algorithm::MlKem768,
// If message length is 1248 (32 + 1216) then the algorithm should be xwing
// If message length is 1248 (32 + 1216) then the algorithm should be MlKem768
xwing::PUBLIC_KEY_LENGTH => Algorithm::XWingKemDraft06,
// We don't support McEliece because the keys are massive.
// If this is a deal-breaker, users can start a new session with PSQ which can use McEliece.
+16 -26
View File
@@ -10,8 +10,7 @@ use crate::{
frame::KKTFrame,
};
use libcrux_psq::handshake::types::DHKeyPair;
use nym_kkt_ciphersuite::{Ciphersuite, HashFunction, KEM, KEMKeyDigests, SignatureScheme};
use std::collections::BTreeMap;
use nym_kkt_ciphersuite::{Ciphersuite, HashFunction, SignatureScheme};
/// Representation of a KKT Responder
pub struct KKTResponder<'a> {
@@ -21,9 +20,6 @@ pub struct KKTResponder<'a> {
/// KEM keys of this responder
kem_keys: &'a KEMKeys,
/// Digests of the initiator's kem key
expected_initiator_kem_digests: &'a BTreeMap<KEM, KEMKeyDigests>,
/// List of supported Hash Functions by this Responder
supported_hash_functions: Vec<HashFunction>,
@@ -38,7 +34,6 @@ impl<'a> KKTResponder<'a> {
pub fn new(
x25519_keypair: &'a DHKeyPair,
kem_keys: &'a KEMKeys,
expected_initiator_kem_digests: &'a BTreeMap<KEM, KEMKeyDigests>,
supported_hash_functions: &[HashFunction],
supported_signature_schemes: &[SignatureScheme],
supported_outer_protocol_versions: &[u8],
@@ -64,28 +59,12 @@ impl<'a> KKTResponder<'a> {
Ok(Self {
x25519_keypair,
kem_keys,
expected_initiator_kem_digests,
supported_hash_functions: supported_hash_functions.to_vec(),
supported_signature_schemes: supported_signature_schemes.to_vec(),
supported_outer_protocol_versions: supported_outer_protocol_versions.to_vec(),
})
}
/// Attempt to retrieve expected KEM key hash of the initiator based on the received `Ciphersuite`
pub(crate) fn expected_initiator_kem_digest(
&self,
ciphersuite: Ciphersuite,
) -> Result<&Vec<u8>, KKTError> {
let kem = ciphersuite.kem();
let hash_function = ciphersuite.hash_function();
self.expected_initiator_kem_digests
.get(&kem)
.ok_or(KKTError::NoKnownKEMKeyDigests { kem, hash_function })?
.get(&hash_function)
.ok_or(KKTError::NoKnownKEMKeyDigests { kem, hash_function })
}
fn check_ciphersuite_compatiblity(
&self,
remote_ciphersuite: Ciphersuite,
@@ -123,7 +102,6 @@ impl<'a> KKTResponder<'a> {
)?;
let remote_context = *processed_req.remote_context();
let remote_frame = processed_req.remote_frame;
let request_payload = remote_frame.payload().to_vec();
let mut carrier = processed_req.carrier;
@@ -133,8 +111,12 @@ impl<'a> KKTResponder<'a> {
let (local_context, remote_encapsulation_key) = match remote_context.mode() {
KKTMode::OneWay => responder_ingest_message(None, remote_frame)?,
KKTMode::Mutual => {
let digest = self.expected_initiator_kem_digest(remote_context.ciphersuite())?;
responder_ingest_message(Some(digest), remote_frame)?
// So we can either fetch the remote hash here using some async call to the directory,
// which might make registration hang or accept the sent key then verify later.
// If we choose to not accept, the response's status will be KKTStatus::UnverifiedKEMKey.
// The response would still contain the responder's encapsulation key.
responder_ingest_message(None, remote_frame)?
}
};
@@ -146,7 +128,7 @@ impl<'a> KKTResponder<'a> {
};
// for now the response payload is empty
let response_payload = Vec::new();
let response_payload = Vec::with_capacity(0);
let frame = KKTFrame::new(local_context, kem_key, response_payload);
@@ -180,6 +162,14 @@ pub fn responder_ingest_message(
own_context.update_status(KKTStatus::UnverifiedKEMKey);
// we don't store an unverified key
// changing the status notifies the initiator that we didn't
// we could still keep it here and then verify later...
// let received_encapsulation_key = EncapsulationKey::decode(
// own_context.ciphersuite().kem(),
// remote_frame.body_ref(),
// )?;
// Ok((own_context, Some(received_encapsulation_key)))
//
return Ok((own_context, None));
};
+7 -13
View File
@@ -1,14 +1,8 @@
[package]
name = "nym-lp"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
readme.workspace = true
version.workspace = true
version = "0.1.0"
edition = { workspace = true }
license = { workspace = true }
publish = false
[dependencies]
@@ -17,11 +11,11 @@ bs58 = { workspace = true }
bytes = { workspace = true }
tracing = { workspace = true }
rand09 = { workspace = true }
tls_codec = { workspace = true }
tls_codec = { workspace = true }
tokio = { workspace = true, features = ["net", "io-util"] }
nym-crypto = { workspace = true, features = ["hashing"] }
nym-kkt = { workspace = true }
nym-crypto = { path = "../crypto", features = ["hashing"] }
nym-kkt = { path = "../nym-kkt" }
nym-kkt-ciphersuite = { workspace = true }
# libcrux dependencies for PSQ (Post-Quantum PSK derivation)
@@ -34,7 +28,7 @@ zeroize = { workspace = true, features = ["zeroize_derive"] }
nym-test-utils = { workspace = true, optional = true }
[dev-dependencies]
criterion = { workspace = true, features = ["html_reports"] }
criterion = { version = "0.5", features = ["html_reports"] }
nym-test-utils = { workspace = true }
anyhow = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
+79
View File
@@ -0,0 +1,79 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! Configuration for LP protocol.
//!
//! LP security stack = KKT (key fetch) → PSQ (PQ PSK) → Noise (transport).
//! KEM algorithm selection affects only PSQ layer. Noise always uses X25519 DH.
//! Migration to PQ KEMs (MlKem768, XWing) requires only config change.
use nym_kkt::ciphersuite::KEM;
use serde::{Deserialize, Serialize};
use std::time::Duration;
/// Default PSK time-to-live (1 hour, matches psk.rs implementation).
pub const DEFAULT_PSK_TTL_SECS: u64 = 3600;
/// Configuration for LP protocol.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LpConfig {
/// KEM algorithm for PSQ key encapsulation.
/// Supported KEMs: MlKem768, McEliece
#[serde(with = "kem_serde")]
pub kem_algorithm: KEM,
/// PSK time-to-live in seconds.
pub psk_ttl_secs: u64,
/// Enable KKT for authenticated key distribution.
pub enable_kkt: bool,
}
impl Default for LpConfig {
fn default() -> Self {
Self {
kem_algorithm: KEM::MlKem768,
psk_ttl_secs: DEFAULT_PSK_TTL_SECS,
enable_kkt: true,
}
}
}
impl LpConfig {
/// Returns PSK TTL as Duration.
pub fn psk_ttl(&self) -> Duration {
Duration::from_secs(self.psk_ttl_secs)
}
}
mod kem_serde {
use nym_kkt::ciphersuite::KEM;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub fn serialize<S>(kem: &KEM, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match kem {
KEM::MlKem768 => "MlKem768",
KEM::McEliece => "McEliece",
KEM::X25519 => return Err(serde::ser::Error::custom("Unsupported KEM: X25519")),
KEM::XWing => return Err(serde::ser::Error::custom("Unsupported KEM: XWing")),
}
.serialize(serializer)
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<KEM, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
match s.as_str() {
"MlKem768" => Ok(KEM::MlKem768),
"McEliece" => Ok(KEM::McEliece),
"X25519" => Err(serde::de::Error::custom("Unsupported KEM: X25519")),
"XWing" => Err(serde::de::Error::custom("Unsupported KEM: XWing")),
_ => Err(serde::de::Error::custom(format!("Unknown KEM: {}", s))),
}
}
}
+3 -3
View File
@@ -65,12 +65,12 @@ pub enum LpError {
#[error("State machine not found for lp_id: {0}")]
StateMachineNotFound(LpReceiverIndex),
// /// Ed25519 to X25519 conversion error.
// #[error("Ed25519 key conversion error: {0}")]
// Ed25519RecoveryError(#[from] Ed25519RecoveryError),
#[error("attempted to create an LP responder without providing a valid KEM keys")]
ResponderWithMissingKEMKeys,
#[error("attempted to create an LP mutual initiator without providing a valid KEM key")]
PSQMutualInitiatorMissingKemKey,
#[error(
"there are no known digests for remote's KEM key with {kem} KEM and {hash_function} hash function"
)]
+10 -9
View File
@@ -11,6 +11,7 @@ pub mod replay;
pub mod session;
mod session_integration;
pub mod session_manager;
pub mod state_machine;
pub mod transport;
pub use error::LpError;
@@ -20,8 +21,9 @@ pub use nym_kkt_ciphersuite::{
#[cfg(any(feature = "mock", test))]
pub use replay::{ReceivingKeyCounterValidator, ReplayError};
pub use session::LpTransportSession;
pub use session::LpSession;
pub use session_manager::SessionManager;
pub use state_machine::LpStateMachine;
#[cfg(any(feature = "mock", test))]
use nym_test_utils::helpers::u64_seeded_rng_09;
@@ -37,8 +39,8 @@ use libcrux_psq::{Channel, IntoSession};
#[cfg(any(feature = "mock", test))]
pub struct SessionsMock {
pub initiator: LpTransportSession,
pub responder: LpTransportSession,
pub initiator: LpSession,
pub responder: LpSession,
}
#[cfg(any(feature = "mock", test))]
@@ -107,18 +109,17 @@ impl SessionsMock {
initiator_authenticator,
responder_ecdh_pk: resp_remote.x25519_public,
responder_pq_pk: Some(encapsulation_key),
initiator_pq_pk: None,
};
SessionsMock {
initiator: LpTransportSession::new(
initiator: LpSession::new(
initiator.into_session().unwrap(),
binding.clone(),
receiver_index,
1,
)
.unwrap(),
responder: LpTransportSession::new(
responder: LpSession::new(
responder.into_session().unwrap(),
binding,
receiver_index,
@@ -133,18 +134,18 @@ impl SessionsMock {
}
// we just need a dummy 'valid' session for simpler tests
pub fn mock_initiator() -> LpTransportSession {
pub fn mock_initiator() -> LpSession {
Self::mock_post_handshake(KEM::default()).initiator
}
}
#[cfg(any(feature = "mock", test))]
pub fn sessions_for_tests() -> (LpTransportSession, LpTransportSession) {
pub fn sessions_for_tests() -> (LpSession, LpSession) {
let sessions = SessionsMock::mock_post_handshake(KEM::default());
(sessions.initiator, sessions.responder)
}
#[cfg(any(feature = "mock", test))]
pub fn mock_session_for_test() -> LpTransportSession {
pub fn mock_session_for_test() -> LpSession {
SessionsMock::mock_initiator()
}
+2 -22
View File
@@ -1,8 +1,7 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
// SPDX-License-Identifier: Apache-2.0
use crate::LpError;
use nym_kkt::keys::EncapsulationKey;
use nym_kkt_ciphersuite::{Ciphersuite, KEM, KEMKeyDigests};
use std::collections::BTreeMap;
use std::fmt::Debug;
@@ -42,18 +41,6 @@ impl LpLocalPeer {
self
}
pub fn kem_key(&self, kem: KEM) -> Option<EncapsulationKey> {
self.kem_keypairs
.as_ref()
.and_then(|k| k.encapsulation_key(kem))
}
pub fn encoded_kem_key(&self, kem: KEM) -> Option<&[u8]> {
self.kem_keypairs
.as_ref()
.and_then(|k| k.encoded_encapsulation_key(kem))
}
pub fn x25519(&self) -> &Arc<DHKeyPair> {
&self.x25519
}
@@ -82,10 +69,7 @@ impl Debug for LpLocalPeer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LpLocalPeer")
.field("ciphersuite", &self.ciphersuite)
.field(
"x25519",
&bs58::encode(self.x25519.pk.as_ref()).into_string(),
)
.field("x25519", &self.x25519.pk)
.field("kem_keypairs", &self.kem_keypairs)
.finish()
}
@@ -143,10 +127,6 @@ impl LpRemotePeer {
.ok_or(LpError::NoKnownKEMKeyDigests { kem, hash_function })
.cloned()
}
pub fn kem_key_digests(&self) -> &BTreeMap<KEM, KEMKeyDigests> {
&self.expected_kem_key_digests
}
}
impl From<DHPublicKey> for LpRemotePeer {
+23 -27
View File
@@ -29,7 +29,7 @@ pub struct LpPeerConfig {
// Determine the hop id.
// Should be 0 if node_initiator is true
// Should be > 1 && < 16 if is_exit is true
// Should be > 1 if is_exit is true
hop_id: u8,
// Determine if the recipient should be an exit node
@@ -65,7 +65,6 @@ impl LpPeerConfig {
rng.random(),
)
}
/// Creates a new client to exit config.
/// Inputs:
/// hop_id: this value must be in the range (1..=15). This function returns an error if this is not the case.
@@ -80,7 +79,6 @@ impl LpPeerConfig {
{
Self::new(rng, hop_id, true, false, censorship_resistance)
}
/// Creates a new client to an intermediate node config.
/// Inputs:
/// hop_id: this value must be in the range (1..=14). This function returns an error if this is not the case.
@@ -132,7 +130,6 @@ impl LpPeerConfig {
rng.random(),
)
}
fn build(
hop_id: u8,
is_exit: bool,
@@ -150,7 +147,6 @@ impl LpPeerConfig {
seed,
}
}
fn build_checked(
hop_id: u8,
is_exit: bool,
@@ -202,37 +198,37 @@ impl LpPeerConfig {
}
pub fn serialize(&self) -> [u8; LP_PEER_CONFIG_SIZE] {
let mut output_bytes = [0u8; LP_PEER_CONFIG_SIZE];
output_bytes[0..4].copy_from_slice(&self.pack_config());
let mut output_bytes: [u8; LP_PEER_CONFIG_SIZE] = [0u8; LP_PEER_CONFIG_SIZE];
output_bytes[0..4].copy_from_slice(self.pack_config().as_slice());
output_bytes[4..].copy_from_slice(&self.seed);
output_bytes
}
pub fn deserialize(bytes: &[u8]) -> Result<Self, LpError> {
if bytes.len() != LP_PEER_CONFIG_SIZE {
return Err(LpError::DeserializationError(format!(
Err(LpError::DeserializationError(format!(
"Invalid Lp Config Length ({}), expected ({})",
bytes.len(),
LP_PEER_CONFIG_SIZE
)));
)))
} else {
let (hop_id, is_exit, node_initiator, censorship_resistance) =
Self::unpack_first_byte(bytes[0]);
let mut filler: [u8; FILLER_LEN] = [0u8; FILLER_LEN];
filler.copy_from_slice(&bytes[CONFIG_LEN..CONFIG_LEN + FILLER_LEN]);
let mut seed: [u8; SEED_LEN] = [0u8; SEED_LEN];
seed.copy_from_slice(&bytes[CONFIG_LEN + FILLER_LEN..LP_PEER_CONFIG_SIZE]);
Self::build_checked(
hop_id,
is_exit,
node_initiator,
censorship_resistance,
seed,
filler,
)
}
let (hop_id, is_exit, node_initiator, censorship_resistance) =
Self::unpack_first_byte(bytes[0]);
let mut filler = [0u8; FILLER_LEN];
filler.copy_from_slice(&bytes[CONFIG_LEN..CONFIG_LEN + FILLER_LEN]);
let mut seed = [0u8; SEED_LEN];
seed.copy_from_slice(&bytes[CONFIG_LEN + FILLER_LEN..LP_PEER_CONFIG_SIZE]);
Self::build_checked(
hop_id,
is_exit,
node_initiator,
censorship_resistance,
seed,
filler,
)
}
fn pack_config(&self) -> [u8; 4] {
+21 -210
View File
@@ -11,7 +11,7 @@ use crate::psq::{
};
use crate::session::PersistentSessionBinding;
use crate::transport::traits::LpHandshakeChannel;
use crate::{LpError, LpTransportSession};
use crate::{LpError, LpSession};
use libcrux_psq::handshake::RegistrationInitiator;
use libcrux_psq::handshake::builders::{
CiphersuiteBuilder, InitiatorCiphersuite, PrincipalBuilder,
@@ -24,31 +24,9 @@ use nym_kkt::message::{KKTRequest, KKTResponse};
use rand09::SeedableRng;
use tracing::debug;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandshakeMode {
// Client <> Entry
OneWayEntry,
// Client <> Exit
OneWayExit,
// Entry <> Exit
MutualInternode,
// in the future more variants will be supported (such as individual mix hops)
}
impl HandshakeMode {
pub fn is_mutual(&self) -> bool {
matches!(self, HandshakeMode::MutualInternode)
}
}
pub struct PSQHandshakeStateInitiator<'a, S> {
pub(super) inner_state: PSQHandshakeState<'a, S>,
pub(super) initiator_data: InitiatorData,
/// The mode of the handshake (mutual node-node, client-entry, entry-exit)
pub(super) mode: HandshakeMode,
}
pub(crate) fn build_psq_principal<R>(
@@ -99,25 +77,6 @@ impl<'a, S> PSQHandshakeStateInitiator<'a, S>
where
S: LpHandshakeChannel + Unpin,
{
fn lp_peer_config<R>(&self, rng: &mut R) -> Result<LpPeerConfig, LpError>
where
R: rand09::CryptoRng,
{
// for now we don't support censorship resistance flag
let censorship_resistance = false;
match self.mode {
HandshakeMode::OneWayEntry => Ok(LpPeerConfig::new_client_to_entry(
rng,
censorship_resistance,
)),
HandshakeMode::OneWayExit => {
LpPeerConfig::new_client_to_exit(rng, 1, censorship_resistance)
}
HandshakeMode::MutualInternode => LpPeerConfig::new_node_to_node(rng),
}
}
/// Attempt to send KKT request to begin the handshake
async fn send_kkt_request(&mut self, request: KKTRequest) -> Result<(), LpError> {
let kem = self.inner_state.local_peer.ciphersuite.kem();
@@ -144,7 +103,7 @@ where
Ok(resp.into())
}
pub async fn complete_handshake(self) -> Result<LpTransportSession, LpError>
pub async fn complete_handshake(self) -> Result<LpSession, LpError>
where
S: LpHandshakeChannel + Unpin,
{
@@ -152,10 +111,7 @@ where
self.complete_handshake_with_rng(&mut rng).await
}
pub async fn complete_handshake_with_rng<R>(
mut self,
rng: &mut R,
) -> Result<LpTransportSession, LpError>
pub async fn complete_handshake_with_rng<R>(mut self, rng: &mut R) -> Result<LpSession, LpError>
where
S: LpHandshakeChannel + Unpin,
R: rand09::CryptoRng,
@@ -163,7 +119,7 @@ where
let ciphersuite = self.inner_state.local_peer.ciphersuite();
let kem = ciphersuite.kem();
let lp_peer_config = self.lp_peer_config(rng)?;
let lp_peer_config = LpPeerConfig::new_client_to_entry(rng, false);
// 1. retrieve the expected kem key hash. if we don't know it,
let dir_hash = self
@@ -172,34 +128,16 @@ where
.expected_kem_key_hash(ciphersuite)?;
// 2. prepare and send KKT request
let (mut initiator, kkt_request) = if self.mode.is_mutual() {
// this has been verified when setting the mutual flag
let Some(local_encapsulation_key) = self.inner_state.local_peer.encoded_kem_key(kem)
else {
return Err(LpError::PSQMutualInitiatorMissingKemKey);
};
KKTInitiator::generate_mutual_request(
rng,
ciphersuite,
local_encapsulation_key,
self.initiator_data.remote_peer.x25519(),
&dir_hash,
self.initiator_data.protocol_version,
Some(Vec::from(lp_peer_config.serialize())),
)?
} else {
KKTInitiator::generate_one_way_request(
rng,
ciphersuite,
self.initiator_data.remote_peer.x25519(),
&dir_hash,
self.initiator_data.protocol_version,
Some(Vec::from(lp_peer_config.serialize())),
)?
};
let init_kem_key = self.inner_state.local_peer.kem_key(kem);
let (mut initiator, kkt_request) = KKTInitiator::generate_one_way_request(
rng,
ciphersuite,
self.initiator_data.remote_peer.x25519(),
&dir_hash,
self.initiator_data.protocol_version,
Some(Vec::from(lp_peer_config.serialize())),
)?;
// derive the receiver index from the request
// let receiver_index = kkt_request
debug!("sending KKT request");
self.send_kkt_request(kkt_request).await?;
@@ -216,7 +154,7 @@ where
let conn = self.inner_state.connection;
// note: the clone is cheap due to internal Arcs
let resp_encapsulation_key = response.encapsulation_key.clone();
let encapsulation_key = response.encapsulation_key.clone();
// build the PSQ initiator
let initiator_ciphersuite = build_psq_ciphersuite(
@@ -253,18 +191,17 @@ where
let initiator_authenticator = Authenticator::Dh(self.inner_state.local_peer.x25519().pk);
let receiver_index = lp_peer_config
.derive_receiver_index(&initiator_authenticator, &resp_encapsulation_key)?;
let receiver_index =
lp_peer_config.derive_receiver_index(&initiator_authenticator, &encapsulation_key)?;
let binding = PersistentSessionBinding {
initiator_authenticator,
responder_ecdh_pk: self.initiator_data.remote_peer.x25519_public,
responder_pq_pk: Some(resp_encapsulation_key),
initiator_pq_pk: init_kem_key,
responder_pq_pk: Some(encapsulation_key),
};
let psq_session = psq_initiator.into_session()?;
LpTransportSession::new(psq_session, binding, receiver_index, protocol)
LpSession::new(psq_session, binding, receiver_index, protocol)
}
}
@@ -281,7 +218,6 @@ mod tests {
use nym_test_utils::helpers::{DeterministicRng09Send, u64_seeded_rng_09};
use nym_test_utils::mocks::async_read_write::MockIOStream;
use nym_test_utils::traits::{Leak, Timeboxed};
use std::collections::BTreeMap;
#[tokio::test]
async fn initiator_test_plain() -> anyhow::Result<()> {
@@ -289,8 +225,6 @@ mod tests {
let conn_init = MockIOStream::default();
let conn_resp = conn_init.try_get_remote_handle();
let dir_hash_init = BTreeMap::new();
// leak the connections (JUST FOR THE PURPOSE OF THIS TEST!)
// so they'd get 'static lifetime
let conn_init = conn_init.leak();
@@ -304,8 +238,8 @@ mod tests {
resp.ciphersuite = ciphersuite;
let initiator_data = InitiatorData::new(1, resp_remote);
let handshake_init = PSQHandshakeState::new(conn_init, init)
.as_initiator(initiator_data, HandshakeMode::OneWayEntry)?;
let handshake_init =
PSQHandshakeState::new(conn_init, init).as_initiator(initiator_data);
let mut init_rng = DeterministicRng09Send::new(u64_seeded_rng_09(1));
@@ -330,7 +264,6 @@ mod tests {
let kkt_responder = KKTResponder::new(
responder_x25519_keypair,
resp_keys,
&dir_hash_init,
&supported_hash,
&supported_sigs,
&[1],
@@ -404,126 +337,4 @@ mod tests {
}
Ok(())
}
#[tokio::test]
async fn initiator_test_plain_mutual() -> anyhow::Result<()> {
for kem in KEM::iter() {
let conn_init = MockIOStream::default();
let conn_resp = conn_init.try_get_remote_handle();
// leak the connections (JUST FOR THE PURPOSE OF THIS TEST!)
// so they'd get 'static lifetime
let conn_init = conn_init.leak();
let conn_resp = conn_resp.leak();
let (mut init, mut resp) = mock_peers();
let resp_remote = resp.as_remote();
let init_remote = init.as_remote();
let dir_hash_init = init_remote.expected_kem_key_digests;
let ciphersuite = Ciphersuite::default().with_kem(kem);
init.ciphersuite = ciphersuite;
resp.ciphersuite = ciphersuite;
let initiator_data = InitiatorData::new(1, resp_remote);
let handshake_init = PSQHandshakeState::new(conn_init, init)
.as_initiator(initiator_data, HandshakeMode::MutualInternode)?;
let mut init_rng = DeterministicRng09Send::new(u64_seeded_rng_09(1));
let init_fut = tokio::spawn(async move {
handshake_init
.complete_handshake_with_rng(&mut init_rng)
.timeboxed()
.await
});
// responder:
let supported_sigs = [SignatureScheme::Ed25519];
let supported_hash = [
HashFunction::Blake3,
HashFunction::Shake256,
HashFunction::Shake128,
HashFunction::SHA256,
];
let resp_keys = resp.kem_keypairs.as_ref().unwrap();
let responder_x25519_keypair = resp.x25519();
let kkt_responder = KKTResponder::new(
responder_x25519_keypair,
resp_keys,
&dir_hash_init,
&supported_hash,
&supported_sigs,
&[1],
)?;
// 1. read KKT request
let raw_kkt_req: handshake_message::KKTRequest = conn_resp
.receive_handshake_message(
KKTRequest::size_excluding_payload(KKTMode::Mutual, kem) + LP_PEER_CONFIG_SIZE,
)
.timeboxed()
.await??;
let req = raw_kkt_req.into();
// 2. process
let processed_req = kkt_responder.process_request(req, LP_PEER_CONFIG_SIZE)?;
conn_resp
.send_handshake_message::<handshake_message::KKTResponse>(
processed_req.response.into(),
kem,
)
.timeboxed()
.await??;
// 3. read PSQ req
let responder_ciphersuite = responder::build_psq_ciphersuite(&resp, kem)?;
let mut responder =
responder::build_psq_principal(rand09::rng(), 1, responder_ciphersuite)?;
let response_len = psq_msg1_size(kem);
let msg: PSQMsg1 = conn_resp
.receive_handshake_message(response_len)
.timeboxed()
.await??;
responder.read_message(&msg, &mut []).unwrap();
// 4 send PSQ response
let mut buf = vec![0u8; PSQ_MSG2_SIZE];
let n = responder.write_message(&[], &mut buf).unwrap();
assert_eq!(n, buf.len());
let msg = PSQMsg2::new(buf);
conn_resp
.send_handshake_message(msg, kem)
.timeboxed()
.await??;
assert!(responder.is_handshake_finished());
let mut session_init = init_fut.await???;
let mut r_transport = responder.into_session().unwrap();
// test serialization, deserialization
let channel_i = session_init.active_transport();
let mut channel_r = r_transport.transport_channel().unwrap();
assert_eq!(channel_i.identifier(), channel_r.identifier());
let app_data_i = b"Derived session hey".as_slice();
let app_data_r = b"Derived session ho".as_slice();
let ct_i = encrypt_data(app_data_i, channel_i)?;
let pt_r = decrypt_data(&ct_i, &mut channel_r)?;
assert_eq!(app_data_i, pt_r);
let ct_r = encrypt_data(app_data_r, &mut channel_r)?;
let pt_i = decrypt_data(&ct_r, channel_i)?;
assert_eq!(app_data_r, pt_i);
}
Ok(())
}
}
+6 -297
View File
@@ -4,16 +4,13 @@
use crate::packet::version;
use crate::peer::{LpLocalPeer, LpRemotePeer};
use crate::transport::traits::LpHandshakeChannel;
use nym_kkt_ciphersuite::{HashFunction, IntoEnumIterator, KEM, KEMKeyDigests, SignatureScheme};
use std::collections::BTreeMap;
use nym_kkt_ciphersuite::{HashFunction, IntoEnumIterator, KEM, SignatureScheme};
pub(crate) mod handshake_message;
mod helpers;
pub mod initiator;
pub mod responder;
use crate::LpError;
use crate::psq::initiator::HandshakeMode;
pub use initiator::PSQHandshakeStateInitiator;
pub use responder::PSQHandshakeStateResponder;
@@ -71,19 +68,6 @@ pub struct ResponderData {
/// List of supported outer (LP) protocol version by this Responder
pub supported_outer_protocol_versions: Vec<u8>,
/// Expected KEM hashes of the initiator.
/// It is only expected to be populated for the mutual mode of the KKT.
/// Otherwise the map is empty.
pub initiator_kem_hashes: BTreeMap<KEM, KEMKeyDigests>,
}
impl ResponderData {
#[must_use]
pub fn with_initiator_kem_hashes(mut self, kem_hashes: BTreeMap<KEM, KEMKeyDigests>) -> Self {
self.initiator_kem_hashes = kem_hashes;
self
}
}
impl Default for ResponderData {
@@ -93,7 +77,6 @@ impl Default for ResponderData {
supported_hash_functions: HashFunction::iter().collect(),
supported_signature_schemes: SignatureScheme::iter().collect(),
supported_outer_protocol_versions: vec![version::CURRENT],
initiator_kem_hashes: Default::default(),
}
}
}
@@ -109,20 +92,11 @@ where
}
}
pub fn as_initiator(
self,
initiator_data: InitiatorData,
mode: HandshakeMode,
) -> Result<PSQHandshakeStateInitiator<'a, S>, LpError> {
if mode.is_mutual() && self.local_peer.kem_keypairs.is_none() {
return Err(LpError::PSQMutualInitiatorMissingKemKey);
}
Ok(PSQHandshakeStateInitiator {
pub fn as_initiator(self, initiator_data: InitiatorData) -> PSQHandshakeStateInitiator<'a, S> {
PSQHandshakeStateInitiator {
initiator_data,
inner_state: self,
mode,
})
}
}
pub fn as_responder(self, responder_data: ResponderData) -> PSQHandshakeStateResponder<'a, S> {
@@ -150,7 +124,6 @@ mod tests {
};
use nym_test_utils::mocks::async_read_write::MockIOStream;
use nym_test_utils::traits::{Leak, TimeboxedSpawnable};
use std::collections::BTreeMap;
use tokio::join;
#[tokio::test]
@@ -170,10 +143,8 @@ mod tests {
resp.ciphersuite = ciphersuite;
let resp_remote = resp.as_remote();
let handshake_init = PSQHandshakeState::new(conn_init, init).as_initiator(
InitiatorData::new(1, resp_remote),
HandshakeMode::OneWayEntry,
)?;
let handshake_init = PSQHandshakeState::new(conn_init, init)
.as_initiator(InitiatorData::new(1, resp_remote));
let handshake_resp =
PSQHandshakeState::new(conn_resp, resp).as_responder(ResponderData::default());
@@ -226,82 +197,6 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn e2e_psq_mutual_handshake() -> anyhow::Result<()> {
for kem in KEM::iter() {
let conn_init = MockIOStream::default();
let conn_resp = conn_init.try_get_remote_handle();
// leak the connections (JUST FOR THE PURPOSE OF THIS TEST!)
// so they'd get 'static lifetime
let conn_init = conn_init.leak();
let conn_resp = conn_resp.leak();
let ciphersuite = Ciphersuite::default().with_kem(kem);
let (mut init, mut resp) = mock_peers();
init.ciphersuite = ciphersuite;
resp.ciphersuite = ciphersuite;
let resp_remote = resp.as_remote();
let init_remote = init.as_remote();
let handshake_init = PSQHandshakeState::new(conn_init, init).as_initiator(
InitiatorData::new(1, resp_remote),
HandshakeMode::MutualInternode,
)?;
let handshake_resp = PSQHandshakeState::new(conn_resp, resp).as_responder(
ResponderData::default()
.with_initiator_kem_hashes(init_remote.expected_kem_key_digests),
);
let init_rng = DeterministicRng09Send::new(u64_seeded_rng_09(1));
let resp_rng = DeterministicRng09Send::new(u64_seeded_rng_09(2));
// similarly leak the rngs to get the static lifetimes
let init_rng = init_rng.leak();
let resp_rng = resp_rng.leak();
let init_fut = handshake_init
.complete_handshake_with_rng(init_rng)
.spawn_timeboxed();
let resp_fut = handshake_resp
.complete_handshake_with_rng(resp_rng)
.spawn_timeboxed();
let (session_init, session_resp) = join!(init_fut, resp_fut);
let mut session_init = session_init???;
let mut session_resp = session_resp???;
assert_eq!(session_init.receiver_index(), session_resp.receiver_index());
assert_eq!(
session_init.session_identifier(),
session_resp.session_identifier()
);
// test serialization, deserialization
let channel_i = session_init.active_transport();
let channel_r = session_resp.active_transport();
assert_eq!(channel_i.identifier(), channel_r.identifier());
let app_data_i = b"Derived session hey".as_slice();
let app_data_r = b"Derived session ho".as_slice();
let ct_i = encrypt_data(app_data_i, channel_i)?;
let pt_r = decrypt_data(&ct_i, channel_r)?;
assert_eq!(app_data_i, pt_r);
let ct_r = encrypt_data(app_data_r, channel_r)?;
let pt_i = decrypt_data(&ct_r, channel_i)?;
assert_eq!(app_data_r, pt_i);
}
Ok(())
}
// plain test without any wrappers
#[test]
fn e2e_test_plain() {
@@ -314,7 +209,6 @@ mod tests {
init.ciphersuite = Ciphersuite::default().with_kem(kem);
let resp_remote = resp.as_remote();
let dir_hash = resp_remote.expected_kem_key_hash(init.ciphersuite).unwrap();
let dir_hash_init = BTreeMap::new();
let resp_keys = resp.kem_keypairs.as_ref().unwrap();
let responder_x25519_keypair = resp.x25519();
@@ -329,7 +223,6 @@ mod tests {
let kkt_responder = KKTResponder::new(
responder_x25519_keypair,
resp_keys,
&dir_hash_init,
&supported_hash,
&supported_sigs,
&[protocol_version],
@@ -476,188 +369,4 @@ mod tests {
assert_eq!(app_data_r, pt_i);
}
}
#[test]
fn e2e_test_plain_mutual() {
let mut rng = deterministic_rng_09();
for kem in KEM::iter() {
// SETUP START:
let protocol_version = 1;
let (mut init, resp) = mock_peers();
init.ciphersuite = Ciphersuite::default().with_kem(kem);
let init_remote = init.as_remote();
let resp_remote = resp.as_remote();
let dir_hash_init = init_remote.expected_kem_key_digests.clone();
let dir_hash_resp = resp_remote.expected_kem_key_hash(init.ciphersuite).unwrap();
let resp_keys = resp.kem_keypairs.as_ref().unwrap();
let responder_x25519_keypair = resp.x25519();
let init_keys = init.kem_keypairs.as_ref().unwrap();
let init_kem = init_keys.encoded_encapsulation_key(kem).unwrap();
let supported_sigs = [SignatureScheme::Ed25519];
let supported_hash = [
HashFunction::Blake3,
HashFunction::Shake256,
HashFunction::Shake128,
HashFunction::SHA256,
];
let kkt_responder = KKTResponder::new(
responder_x25519_keypair,
resp_keys,
&dir_hash_init,
&supported_hash,
&supported_sigs,
&[protocol_version],
)
.unwrap();
// SETUP END
let lp_peer_config = LpPeerConfig::new_client_to_entry(&mut rng, false);
// OneWay - MlKem
let (mut initiator, request) = KKTInitiator::generate_mutual_request(
&mut rng,
init.ciphersuite,
init_kem,
&responder_x25519_keypair.pk,
&dir_hash_resp,
protocol_version,
Some(Vec::from(lp_peer_config.serialize())),
)
.unwrap();
let processed_req = kkt_responder
.process_request(request, LP_PEER_CONFIG_SIZE)
.unwrap();
let init_key = processed_req.remote_encapsulation_key.unwrap();
assert_eq!(init_key.as_bytes(), init_kem);
let response = initiator
.process_response(processed_req.response, 0)
.unwrap();
let encapsulation_key = response.encapsulation_key;
let mut payload_buf_responder = vec![0u8; 4096];
let mut payload_buf_initiator = vec![0u8; 4096];
let initiator_ciphersuite =
initiator::build_psq_ciphersuite(&init, &resp_remote, &encapsulation_key).unwrap();
let mut initiator = initiator::build_psq_principal(
rand09::rng(),
protocol_version,
initiator_ciphersuite,
)
.unwrap();
let responder_ciphersuite = responder::build_psq_ciphersuite(&resp, kem).unwrap();
let mut responder = responder::build_psq_principal(
rand09::rng(),
protocol_version,
responder_ciphersuite,
)
.unwrap();
// Send first message
let mut buf = vec![0u8; psq_msg1_size(kem)];
let len_i = initiator.write_message(&[], &mut buf).unwrap();
assert_eq!(len_i, buf.len());
// Read first message
let (_, _) = responder
.read_message(&buf, &mut payload_buf_responder)
.unwrap();
// Get the authenticator out here, so we can deserialize the session later.
let Some(initiator_authenticator) = responder.initiator_authenticator() else {
panic!("No initiator authenticator found")
};
// Respond
let mut buf = [0u8; PSQ_MSG2_SIZE];
let len_r = responder.write_message(&[], &mut buf).unwrap();
assert_eq!(len_r, buf.len());
// Finalize on registration initiator
let (len_i_deserialized, _) = initiator
.read_message(&buf, &mut payload_buf_initiator)
.unwrap();
// We read the same amount of data.
assert_eq!(len_r, len_i_deserialized);
// Ready for transport mode
assert!(initiator.is_handshake_finished());
assert!(responder.is_handshake_finished());
let i_transport = initiator.into_session().unwrap();
let r_transport = responder.into_session().unwrap();
// test serialization, deserialization
let mut session_storage = vec![0u8; 4096];
i_transport
.serialize(
&mut session_storage,
SessionBinding {
initiator_authenticator: &Authenticator::Dh(init.x25519().pk),
responder_ecdh_pk: &responder_x25519_keypair.pk,
responder_pq_pk: Some(encapsulation_key.as_pq_encapsulation_key()),
},
)
.unwrap();
let mut i_transport = Session::deserialize(
&session_storage,
SessionBinding {
initiator_authenticator: &Authenticator::Dh(init.x25519().pk),
responder_ecdh_pk: &responder_x25519_keypair.pk,
responder_pq_pk: Some(encapsulation_key.as_pq_encapsulation_key()),
},
)
.unwrap();
r_transport
.serialize(
&mut session_storage,
SessionBinding {
initiator_authenticator: &initiator_authenticator,
responder_ecdh_pk: &responder_x25519_keypair.pk,
responder_pq_pk: Some(encapsulation_key.as_pq_encapsulation_key()),
},
)
.unwrap();
let mut r_transport = Session::deserialize(
&session_storage,
SessionBinding {
initiator_authenticator: &initiator_authenticator,
responder_ecdh_pk: &responder_x25519_keypair.pk,
responder_pq_pk: Some(encapsulation_key.as_pq_encapsulation_key()),
},
)
.unwrap();
let mut channel_i = i_transport.transport_channel().unwrap();
let mut channel_r = r_transport.transport_channel().unwrap();
assert_eq!(channel_i.identifier(), channel_r.identifier());
let app_data_i = b"Derived session hey".as_slice();
let app_data_r = b"Derived session ho".as_slice();
let ct_i = encrypt_data(app_data_i, &mut channel_i).unwrap();
let pt_r = decrypt_data(&ct_i, &mut channel_r).unwrap();
assert_eq!(app_data_i, pt_r);
let ct_r = encrypt_data(app_data_r, &mut channel_r).unwrap();
let pt_i = decrypt_data(&ct_r, &mut channel_i).unwrap();
assert_eq!(app_data_r, pt_i);
}
}
}
+11 -155
View File
@@ -11,7 +11,7 @@ use crate::psq::{
};
use crate::session::PersistentSessionBinding;
use crate::transport::traits::{HandshakeMessage, LpHandshakeChannel};
use crate::{LpError, LpTransportSession};
use crate::{LpError, LpSession};
use libcrux_psq::handshake::Responder;
use libcrux_psq::handshake::builders::{
CiphersuiteBuilder, PrincipalBuilder, ResponderCiphersuite,
@@ -77,14 +77,12 @@ impl<'a, S> PSQHandshakeStateResponder<'a, S>
where
S: LpHandshakeChannel + Unpin,
{
async fn receive_kkt_request(&mut self, mode: KKTMode) -> Result<KKTRequest, LpError> {
let packet_len =
KKTRequest::size_excluding_payload(mode, self.inner_state.local_peer.ciphersuite.kem())
+ LP_PEER_CONFIG_SIZE;
// TODO: we have an issue here: if initiator sends us a KEM key of different type
// than our ciphersuite, we will fail to receive it.
// Surely this won't blow up in our faces later... right?
/// Attempt to receive a KKT request from a one-way client
async fn receive_one_way_kkt_request(&mut self) -> Result<KKTRequest, LpError> {
let packet_len = KKTRequest::size_excluding_payload(
KKTMode::OneWay,
self.inner_state.local_peer.ciphersuite.kem(),
) + LP_PEER_CONFIG_SIZE;
let req = self
.inner_state
@@ -95,16 +93,6 @@ where
Ok(req.into())
}
/// Attempt to receive a KKT request from a one-way client
async fn receive_one_way_kkt_request(&mut self) -> Result<KKTRequest, LpError> {
Self::receive_kkt_request(self, KKTMode::OneWay).await
}
/// Attempt to receive a KKT request from a mutual client
async fn receive_mutual_kkt_request(&mut self) -> Result<KKTRequest, LpError> {
Self::receive_kkt_request(self, KKTMode::Mutual).await
}
/// Attempt to process the received KKT request
fn process_kkt_request(&self, kkt_request: KKTRequest) -> Result<ProcessedKKTRequest, LpError> {
let kem_keys = &self
@@ -117,7 +105,6 @@ where
let processed_req = KKTResponder::new(
&self.inner_state.local_peer.x25519,
kem_keys,
&self.responder_data.initiator_kem_hashes,
&self.responder_data.supported_hash_functions,
&self.responder_data.supported_signature_schemes,
&self.responder_data.supported_outer_protocol_versions,
@@ -146,7 +133,7 @@ where
Ok(msg.into_bytes())
}
pub async fn complete_handshake(self) -> Result<LpTransportSession, LpError>
pub async fn complete_handshake(self) -> Result<LpSession, LpError>
where
S: LpHandshakeChannel + Unpin,
{
@@ -154,27 +141,17 @@ where
self.complete_handshake_with_rng(&mut rng).await
}
pub async fn complete_handshake_with_rng<R>(
mut self,
rng: &mut R,
) -> Result<LpTransportSession, LpError>
pub async fn complete_handshake_with_rng<R>(mut self, rng: &mut R) -> Result<LpSession, LpError>
where
S: LpHandshakeChannel + Unpin,
R: rand09::CryptoRng,
{
// 1. receive and process KKTRequest
let kkt_request = if self.responder_data.initiator_kem_hashes.is_empty() {
debug!("expecting one way KKT request");
self.receive_one_way_kkt_request().await?
} else {
debug!("expecting mutual KKT request");
self.receive_mutual_kkt_request().await?
};
let kkt_request = self.receive_one_way_kkt_request().await?;
debug!("received KKT request");
let processed_req = self.process_kkt_request(kkt_request)?;
let kem = processed_req.requested_kem;
let init_kem = processed_req.remote_encapsulation_key;
let lp_peer_config = LpPeerConfig::deserialize(&processed_req.request_payload)?;
@@ -228,11 +205,10 @@ where
initiator_authenticator,
responder_ecdh_pk: self.inner_state.local_peer.x25519().pk,
responder_pq_pk: Some(kem_key),
initiator_pq_pk: init_kem,
};
let psq_session = psq_responder.into_session()?;
LpTransportSession::new(
LpSession::new(
psq_session,
binding,
receiver_index,
@@ -372,124 +348,4 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn responder_test_plain_mutual() -> anyhow::Result<()> {
for kem in KEM::iter() {
let conn_init = MockIOStream::default();
let conn_resp = conn_init.try_get_remote_handle();
// SETUP START:
// leak the connections (JUST FOR THE PURPOSE OF THIS TEST!)
// so they'd get 'static lifetime
let conn_init = conn_init.leak();
let conn_resp = conn_resp.leak();
let (mut init, mut resp) = mock_peers();
let resp_remote = resp.as_remote();
let init_remote = init.as_remote();
let ciphersuite = Ciphersuite::default().with_kem(kem);
init.ciphersuite = ciphersuite;
resp.ciphersuite = ciphersuite;
let responder_data = ResponderData::default()
.with_initiator_kem_hashes(init_remote.expected_kem_key_digests);
let handshake_resp =
PSQHandshakeState::new(conn_resp, resp).as_responder(responder_data);
let mut resp_rng = DeterministicRng09Send::new(u64_seeded_rng_09(2));
let resp_fut = tokio::spawn(async move {
handshake_resp
.complete_handshake_with_rng(&mut resp_rng)
.timeboxed()
.await
});
// initiator:
let mut rng = deterministic_rng_09();
let dir_hash = resp_remote.expected_kem_key_hash(init.ciphersuite)?;
let lp_peer_config = LpPeerConfig::new_client_to_entry(&mut rng, false);
// Mutual - MlKem
let (mut initiator, request) = KKTInitiator::generate_mutual_request(
&mut rng,
init.ciphersuite,
init.encoded_kem_key(kem).unwrap(),
&resp_remote.x25519_public,
&dir_hash,
1,
Some(Vec::from(lp_peer_config.serialize())),
)?;
// 1. send kkt request
conn_init
.send_handshake_message::<handshake_message::KKTRequest>(request.into(), kem)
.timeboxed()
.await??;
// 2. receive KKT response
let response_len = KKTResponse::size_excluding_payload(kem);
let resp: handshake_message::KKTResponse = conn_init
.receive_handshake_message(response_len)
.timeboxed()
.await??;
let kkt_response = resp.into();
let response = initiator.process_response(kkt_response, 0)?;
let encapsulation_key = response.encapsulation_key;
let initiator_ciphersuite =
initiator::build_psq_ciphersuite(&init, &resp_remote, &encapsulation_key)?;
let mut initiator =
initiator::build_psq_principal(rand09::rng(), 1, initiator_ciphersuite)?;
// 3. send PSQ msg1
// Send first message
let mut buf = vec![0u8; psq_msg1_size(kem)];
let n = initiator.write_message(&[], &mut buf).unwrap();
assert_eq!(n, buf.len());
let msg = PSQMsg1::new(buf);
conn_init
.send_handshake_message(msg, kem)
.timeboxed()
.await??;
// 4. receive PSQ msg2
let msg: PSQMsg2 = conn_init
.receive_handshake_message(PSQ_MSG2_SIZE)
.timeboxed()
.await??;
initiator.read_message(&msg, &mut []).unwrap();
assert!(initiator.is_handshake_finished());
let mut session_resp = resp_fut.await???;
let mut i_transport = initiator.into_session().unwrap();
// test serialization, deserialization
let mut channel_i = i_transport.transport_channel().unwrap();
let channel_r = session_resp.active_transport();
assert_eq!(channel_i.identifier(), channel_r.identifier());
let app_data_i = b"Derived session hey".as_slice();
let app_data_r = b"Derived session ho".as_slice();
let ct_i = encrypt_data(app_data_i, &mut channel_i)?;
let pt_r = decrypt_data(&ct_i, channel_r)?;
assert_eq!(app_data_i, pt_r);
let ct_r = encrypt_data(app_data_r, channel_r)?;
let pt_i = decrypt_data(&ct_r, &mut channel_i)?;
assert_eq!(app_data_r, pt_i);
}
Ok(())
}
}
+8 -160
View File
@@ -9,7 +9,6 @@ use crate::codec::{decrypt_lp_packet, encrypt_lp_packet};
use crate::packet::{EncryptedLpPacket, LpHeader, LpMessage, LpPacket};
use crate::peer::{LpLocalPeer, LpRemotePeer};
use crate::peer_config::LpReceiverIndex;
use crate::psq::initiator::HandshakeMode;
use crate::psq::{
InitiatorData, PSQHandshakeState, PSQHandshakeStateInitiator, PSQHandshakeStateResponder,
ResponderData,
@@ -20,38 +19,15 @@ use crate::{LpError, replay::ReceivingKeyCounterValidator};
use libcrux_psq::handshake::types::{Authenticator, DHPublicKey};
use libcrux_psq::session::{Session, SessionBinding};
use nym_kkt::keys::EncapsulationKey;
use nym_kkt_ciphersuite::{KEM, KEMKeyDigests};
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
/// Represents inputs that drive the state machine transitions.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum LpInput {
/// Received an encrypted LP Packet from the network.
ReceivePacket(EncryptedLpPacket),
/// Application wants to send data (only valid in Transport state).
SendData(LpMessage),
}
/// Represents actions the state machine requests the environment to perform.
#[derive(Debug)]
pub enum LpAction {
/// Send an LP Packet over the network.
SendPacket(EncryptedLpPacket),
/// Deliver decrypted application data received from the peer.
DeliverData(LpMessage),
}
pub type SessionId = [u8; 32];
/// A session in the Lewes Protocol..
/// A session in the Lewes Protocol, handling connection state with Noise.
///
/// Sessions manage connection state, including LP replay protection.
/// Each session has a unique receiving index and sending index for connection identification.
pub struct LpTransportSession {
pub struct LpSession {
/// The underlying established session
psq_session: Session,
@@ -86,9 +62,6 @@ pub struct PersistentSessionBinding {
/// The responder's long term PQ-KEM public key (if any).
pub responder_pq_pk: Option<EncapsulationKey>,
/// The initiator's long term PQ-KEM public key (if any).
pub initiator_pq_pk: Option<EncapsulationKey>,
}
impl Debug for PersistentSessionBinding {
@@ -114,7 +87,7 @@ impl<'a> From<&'a PersistentSessionBinding> for SessionBinding<'a> {
}
}
impl Debug for LpTransportSession {
impl Debug for LpSession {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LpSession")
.field("session_id", &self.psq_session.identifier())
@@ -127,7 +100,7 @@ impl Debug for LpTransportSession {
}
}
impl LpTransportSession {
impl LpSession {
/// Creates a new session after completed KTT/PSQ exchange
pub fn new(
mut psq_session: Session,
@@ -140,7 +113,7 @@ impl LpTransportSession {
.transport_channel()
.map_err(|inner| LpError::TransportDerivationFailure { inner })?;
Ok(LpTransportSession {
Ok(LpSession {
psq_session,
session_binding,
active_transport: transport,
@@ -157,34 +130,12 @@ impl LpTransportSession {
local_peer: LpLocalPeer,
remote_peer: LpRemotePeer,
remote_protocol_version: u8,
mode: HandshakeMode,
) -> Result<PSQHandshakeStateInitiator<'_, S>, LpError>
) -> PSQHandshakeStateInitiator<'_, S>
where
S: LpHandshakeChannel + Unpin,
{
PSQHandshakeState::new(connection, local_peer).as_initiator(
InitiatorData::new(remote_protocol_version, remote_peer),
mode,
)
}
/// Helper function to create `PSQHandshakeState` for the handshake initiator for mutual KKT
pub fn psq_handshake_initiator_mutual_internode<S>(
connection: &'_ mut S,
local_peer: LpLocalPeer,
remote_peer: LpRemotePeer,
remote_protocol_version: u8,
) -> Result<PSQHandshakeStateInitiator<'_, S>, LpError>
where
S: LpHandshakeChannel + Unpin,
{
Self::psq_handshake_initiator(
connection,
local_peer,
remote_peer,
remote_protocol_version,
HandshakeMode::MutualInternode,
)
PSQHandshakeState::new(connection, local_peer)
.as_initiator(InitiatorData::new(remote_protocol_version, remote_peer))
}
/// Helper function to create `PSQHandshakeState` for the handshake responder
@@ -198,19 +149,6 @@ impl LpTransportSession {
PSQHandshakeState::new(connection, local_peer).as_responder(ResponderData::default())
}
/// Helper function to create `PSQHandshakeState` for the handshake responder for mutual KKT
pub fn psq_handshake_responder_mutual<S>(
connection: &'_ mut S,
local_peer: LpLocalPeer,
initiator_kem_hashes: BTreeMap<KEM, KEMKeyDigests>,
) -> PSQHandshakeStateResponder<'_, S>
where
S: LpHandshakeChannel + Unpin,
{
PSQHandshakeState::new(connection, local_peer)
.as_responder(ResponderData::default().with_initiator_kem_hashes(initiator_kem_hashes))
}
pub fn session_binding(&self) -> &PersistentSessionBinding {
&self.session_binding
}
@@ -334,41 +272,6 @@ impl LpTransportSession {
) -> Result<LpPacket, LpError> {
decrypt_lp_packet(packet, &mut self.active_transport)
}
/// Processes an input event and returns an action to perform.
pub fn process_input(&mut self, input: LpInput) -> Result<LpAction, LpError> {
match input {
LpInput::ReceivePacket(packet) => {
// Check if packet lp_id matches our session
if packet.outer_header().receiver_idx != self.receiver_index() {
return Err(LpError::UnknownSessionId(
packet.outer_header().receiver_idx,
));
}
let ctr = packet.outer_header().counter;
// 1. Check replay protection
self.receiving_counter_quick_check(ctr)?;
// 2. decrypt the packet and attempt to deliver data
let packet = self.decrypt_packet(packet)?;
// 3. Mark counter as received
self.receiving_counter_mark(ctr)?;
// 4. deliver the message
Ok(LpAction::DeliverData(packet.message))
}
LpInput::SendData(data) => {
// Encrypt and send application data
match self.encrypt_application_data(data) {
Ok(packet) => Ok(LpAction::SendPacket(packet)),
Err(e) => Err(e),
}
}
}
}
}
#[cfg(test)]
@@ -458,59 +361,4 @@ mod tests {
assert_eq!(packet_count.received, 2);
}
}
#[test]
fn test_state_machine_simplified_flow() {
for kem in KEM::iter() {
let mock_sessions = SessionsMock::mock_post_handshake(kem);
let receiver_index = mock_sessions.responder.receiver_index();
// Create state machines (already in Transport)
let mut initiator = mock_sessions.initiator;
let mut responder = mock_sessions.responder;
assert_eq!(
initiator.session_identifier(),
responder.session_identifier()
);
// --- Transport Phase ---
println!("--- Step 1: Initiator sends data ---");
let data_to_send_1 = LpMessage::new_opaque(b"hello responder".to_vec());
let init_actions_4 = initiator.process_input(LpInput::SendData(data_to_send_1.clone()));
let data_packet_1 = if let Ok(LpAction::SendPacket(packet)) = init_actions_4 {
packet.clone()
} else {
panic!("Initiator should send data packet");
};
assert_eq!(data_packet_1.outer_header().receiver_idx, receiver_index);
println!("--- Step 2: Responder receives data ---");
let resp_actions_5 = responder.process_input(LpInput::ReceivePacket(data_packet_1));
let resp_data_1 = if let Ok(LpAction::DeliverData(data)) = resp_actions_5 {
data
} else {
panic!("Responder should deliver data");
};
assert_eq!(resp_data_1, data_to_send_1);
println!("--- Step 3: Responder sends data ---");
let data_to_send_2 = LpMessage::new_opaque(b"hello initiator".to_vec());
let resp_actions_6 = responder.process_input(LpInput::SendData(data_to_send_2.clone()));
let data_packet_2 = if let Ok(LpAction::SendPacket(packet)) = resp_actions_6 {
packet.clone()
} else {
panic!("Responder should send data packet");
};
assert_eq!(data_packet_2.outer_header().receiver_idx, receiver_index);
println!("--- Step 4: Initiator receives data ---");
let init_actions_5 = initiator.process_input(LpInput::ReceivePacket(data_packet_2));
if let Ok(LpAction::DeliverData(data)) = init_actions_5 {
assert_eq!(data, data_to_send_2);
} else {
panic!("Initiator should deliver data");
}
}
}
}
+86 -18
View File
@@ -1,7 +1,7 @@
#[cfg(test)]
mod tests {
use crate::packet::{EncryptedLpPacket, LpMessage};
use crate::session::{LpAction, LpInput};
use crate::state_machine::{LpAction, LpInput, LpStateBare};
use crate::{LpError, SessionManager, SessionsMock};
use nym_kkt_ciphersuite::{IntoEnumIterator, KEM};
@@ -41,10 +41,10 @@ mod tests {
// 2. Create sessions using the pre-built Noise states
let peer_a_sm = session_manager_1
.insert_session(sessions.initiator)
.create_session_state_machine(sessions.initiator)
.unwrap();
let peer_b_sm = session_manager_2
.insert_session(sessions.responder)
.create_session_state_machine(sessions.responder)
.unwrap();
// 3. Send multiple encrypted messages both ways
@@ -62,6 +62,7 @@ mod tests {
let decrypted_payload = session_manager_2
.receive_packet(peer_b_sm, ciphertext_a)
.unwrap()
.unwrap()
.data();
assert_eq!(decrypted_payload.content, plaintext_a);
@@ -76,6 +77,7 @@ mod tests {
let decrypted_payload = session_manager_1
.receive_packet(peer_a_sm, ciphertext_b)
.unwrap()
.unwrap()
.data();
assert_eq!(decrypted_payload.content, plaintext_b);
}
@@ -129,24 +131,24 @@ mod tests {
let session2 = sessions.responder;
// 2. Create a session (using real noise state)
let _session = session_manager.insert_session(session1);
let _session = session_manager.create_session_state_machine(session1);
// 3. Try to get a non-existent session
let result = session_manager.session_exists(non_existent);
let result = session_manager.state_machine_exists(non_existent);
assert!(!result, "Non-existent session should return None");
// 4. Try to remove a non-existent session
let result = session_manager.remove_session(non_existent);
let result = session_manager.remove_state_machine(non_existent);
assert!(
!result,
"Remove session should not remove a non-existent session"
);
// 5. Create and immediately remove a session
let _temp_session = session_manager.insert_session(session2);
let _temp_session = session_manager.create_session_state_machine(session2);
assert!(
session_manager.remove_session(session_id),
session_manager.remove_state_machine(session_id),
"Should remove the session"
);
}
@@ -170,16 +172,26 @@ mod tests {
// 2. Create sessions state machines
session_manager_1
.insert_session(sessions.initiator)
.create_session_state_machine(sessions.initiator)
.unwrap();
session_manager_2
.insert_session(sessions.responder)
.create_session_state_machine(sessions.responder)
.unwrap();
assert_eq!(session_manager_1.session_count(), 1);
assert_eq!(session_manager_2.session_count(), 1);
assert!(session_manager_1.session_exists(session_id));
assert!(session_manager_2.session_exists(session_id));
assert!(session_manager_1.state_machine_exists(session_id));
assert!(session_manager_2.state_machine_exists(session_id));
// Verify initial states are Transport
assert_eq!(
session_manager_1.get_state(session_id).unwrap(),
LpStateBare::Transport
);
assert_eq!(
session_manager_2.get_state(session_id).unwrap(),
LpStateBare::Transport
);
// --- 3. Simulate Data Transfer via process_input ---
println!("Starting data transfer simulation via process_input...");
@@ -192,6 +204,7 @@ mod tests {
println!(" A sends to B");
let action_a_send = session_manager_1
.process_input(session_id, LpInput::SendData(plaintext_a_to_b.clone()))
.expect("A SendData should produce action")
.expect("A SendData failed");
let data_packet_a = action_a_send.ciphertext();
@@ -200,6 +213,7 @@ mod tests {
println!(" B receives from A");
let action_b_recv = session_manager_2
.process_input(session_id, LpInput::ReceivePacket(data_packet_a))
.expect("B ReceivePacket (data) should produce action")
.expect("B ReceivePacket (data) failed");
if let LpAction::DeliverData(data) = action_b_recv {
@@ -216,6 +230,7 @@ mod tests {
println!(" B sends to A");
let action_b_send = session_manager_2
.process_input(session_id, LpInput::SendData(plaintext_b_to_a.clone()))
.expect("B SendData should produce action")
.expect("B SendData failed");
let data_packet_b = action_b_send.ciphertext();
@@ -227,6 +242,7 @@ mod tests {
println!(" A receives from B");
let action_a_recv = session_manager_1
.process_input(session_id, LpInput::ReceivePacket(data_packet_b))
.expect("A ReceivePacket (data) should produce action")
.expect("A ReceivePacket (data) failed");
if let LpAction::DeliverData(data) = action_a_recv {
@@ -263,6 +279,7 @@ mod tests {
let action_send_n1 = session_manager_1
.process_input(session_id, LpInput::SendData(data_n_plus_1.clone()))
.unwrap()
.unwrap();
let packet_n1 = match action_send_n1 {
LpAction::SendPacket(p) => p,
@@ -271,6 +288,7 @@ mod tests {
let action_send_n = session_manager_1
.process_input(session_id, LpInput::SendData(data_n.clone()))
.unwrap()
.unwrap();
let packet_n = match action_send_n {
LpAction::SendPacket(p) => p,
@@ -282,6 +300,7 @@ mod tests {
println!(" B receives N+1");
let action_recv_n1 = session_manager_2
.process_input(session_id, LpInput::ReceivePacket(packet_n1))
.unwrap()
.unwrap();
match action_recv_n1 {
LpAction::DeliverData(d) => assert_eq!(d, data_n_plus_1, "Data N+1 mismatch"),
@@ -292,6 +311,7 @@ mod tests {
println!(" B receives N");
let action_recv_n = session_manager_2
.process_input(session_id, LpInput::ReceivePacket(packet_n))
.unwrap()
.unwrap();
match action_recv_n {
LpAction::DeliverData(d) => assert_eq!(d, data_n, "Data N mismatch"),
@@ -309,16 +329,64 @@ mod tests {
);
println!("Out-of-order test passed.");
// --- 6. Session Removal ---
assert!(session_manager_1.remove_session(session_id));
// --- 6. Close Test ---
println!("Testing close via process_input...");
// A closes
let action_a_close = session_manager_1
.process_input(session_id, LpInput::Close)
.expect("A Close should produce action")
.expect("A Close failed");
assert!(matches!(action_a_close, LpAction::ConnectionClosed));
assert_eq!(
session_manager_1.get_state(session_id).unwrap(),
LpStateBare::Closed
);
// Further actions on A fail
let send_after_close_a = session_manager_1.process_input(
session_id,
LpInput::SendData(LpMessage::new_opaque(b"fail".to_vec())),
);
assert!(send_after_close_a.is_err());
assert!(matches!(
send_after_close_a.err().unwrap(),
LpError::LpSessionClosed
));
// B closes
let action_b_close = session_manager_2
.process_input(session_id, LpInput::Close)
.expect("B Close should produce action")
.expect("B Close failed");
assert!(matches!(action_b_close, LpAction::ConnectionClosed));
assert_eq!(
session_manager_2.get_state(session_id).unwrap(),
LpStateBare::Closed
);
// Further actions on B fail
let send_after_close_b = session_manager_2.process_input(
session_id,
LpInput::SendData(LpMessage::new_opaque(b"fail".to_vec())),
);
assert!(send_after_close_b.is_err());
assert!(matches!(
send_after_close_b.err().unwrap(),
LpError::LpSessionClosed
));
println!("Close test passed.");
// --- 7. Session Removal ---
assert!(session_manager_1.remove_state_machine(session_id));
assert_eq!(session_manager_1.session_count(), 0);
assert!(!session_manager_1.session_exists(session_id));
assert!(!session_manager_1.state_machine_exists(session_id));
// B's session manager still has it until removed
assert!(session_manager_2.session_exists(session_id));
assert!(session_manager_2.remove_session(session_id));
assert!(session_manager_2.state_machine_exists(session_id));
assert!(session_manager_2.remove_state_machine(session_id));
assert_eq!(session_manager_2.session_count(), 0);
assert!(!session_manager_2.session_exists(session_id));
assert!(!session_manager_2.state_machine_exists(session_id));
println!("Session removal test passed.");
}
}
+64 -41
View File
@@ -8,26 +8,31 @@
use crate::packet::{EncryptedLpPacket, LpMessage};
use crate::peer_config::LpReceiverIndex;
use crate::{LpError, LpTransportSession};
use crate::state_machine::{LpAction, LpInput, LpStateBare};
use crate::{LpError, LpSession, LpStateMachine};
use std::collections::HashMap;
pub use crate::replay::validator::PacketCount;
use crate::session::{LpAction, LpInput};
/// Manages the lifecycle of Lewes Protocol sessions.
///
/// The SessionManager is responsible for creating, storing, and retrieving sessions
#[derive(Default)]
pub struct SessionManager {
/// Manages state machines directly, keyed by lp_id
sessions: HashMap<LpReceiverIndex, LpTransportSession>,
state_machines: HashMap<LpReceiverIndex, LpStateMachine>,
}
impl Default for SessionManager {
fn default() -> Self {
Self::new()
}
}
impl SessionManager {
/// Creates a new session manager with empty session storage.
pub fn new() -> Self {
Self {
sessions: HashMap::new(),
state_machines: HashMap::new(),
}
}
@@ -35,8 +40,8 @@ impl SessionManager {
&mut self,
lp_id: LpReceiverIndex,
input: LpInput,
) -> Result<LpAction, LpError> {
self.with_session_mut(lp_id, |sm| sm.process_input(input))?
) -> Result<Option<LpAction>, LpError> {
self.with_state_machine_mut(lp_id, |sm| sm.process_input(input).transpose())?
}
pub fn send_data(
@@ -44,39 +49,52 @@ impl SessionManager {
lp_id: LpReceiverIndex,
data: LpMessage,
) -> Result<LpAction, LpError> {
self.process_input(lp_id, LpInput::SendData(data))
self.process_input(lp_id, LpInput::SendData(data))?
.ok_or(LpError::NotInTransport)
}
pub fn receive_packet(
&mut self,
lp_id: LpReceiverIndex,
packet: EncryptedLpPacket,
) -> Result<LpAction, LpError> {
) -> Result<Option<LpAction>, LpError> {
self.process_input(lp_id, LpInput::ReceivePacket(packet))
}
pub fn closed(&self, lp_id: LpReceiverIndex) -> Result<bool, LpError> {
Ok(self.get_state(lp_id)? == LpStateBare::Closed)
}
pub fn transport(&self, lp_id: LpReceiverIndex) -> Result<bool, LpError> {
Ok(self.get_state(lp_id)? == LpStateBare::Transport)
}
#[cfg(test)]
fn get_session_id(&self, lp_id: LpReceiverIndex) -> Result<LpReceiverIndex, LpError> {
self.with_session(lp_id, |sm| sm.receiver_index())
fn get_state_machine_id(&self, lp_id: LpReceiverIndex) -> Result<LpReceiverIndex, LpError> {
self.with_state_machine(lp_id, |sm| sm.receiver_index())?
}
pub fn get_state(&self, lp_id: LpReceiverIndex) -> Result<LpStateBare, LpError> {
self.with_state_machine(lp_id, |sm| Ok(sm.bare_state()))?
}
pub fn current_packet_cnt(&self, lp_id: LpReceiverIndex) -> Result<PacketCount, LpError> {
self.with_session(lp_id, |sm| Ok(sm.current_packet_cnt()))?
self.with_state_machine(lp_id, |sm| Ok(sm.session()?.current_packet_cnt()))?
}
pub fn session_count(&self) -> usize {
self.sessions.len()
self.state_machines.len()
}
pub fn session_exists(&self, lp_id: LpReceiverIndex) -> bool {
self.sessions.contains_key(&lp_id)
pub fn state_machine_exists(&self, lp_id: LpReceiverIndex) -> bool {
self.state_machines.contains_key(&lp_id)
}
pub fn with_session<F, R>(&self, lp_id: LpReceiverIndex, f: F) -> Result<R, LpError>
pub fn with_state_machine<F, R>(&self, lp_id: LpReceiverIndex, f: F) -> Result<R, LpError>
where
F: FnOnce(&LpTransportSession) -> R,
F: FnOnce(&LpStateMachine) -> R,
{
if let Some(sm) = self.sessions.get(&lp_id) {
if let Some(sm) = self.state_machines.get(&lp_id) {
Ok(f(sm))
} else {
Err(LpError::StateMachineNotFound(lp_id))
@@ -84,34 +102,39 @@ impl SessionManager {
}
// For mutable access (like running process_input)
pub fn with_session_mut<F, R>(&mut self, lp_id: LpReceiverIndex, f: F) -> Result<R, LpError>
pub fn with_state_machine_mut<F, R>(
&mut self,
lp_id: LpReceiverIndex,
f: F,
) -> Result<R, LpError>
where
F: FnOnce(&mut LpTransportSession) -> R, // Closure takes mutable ref
F: FnOnce(&mut LpStateMachine) -> R, // Closure takes mutable ref
{
if let Some(sm) = self.sessions.get_mut(&lp_id) {
if let Some(sm) = self.state_machines.get_mut(&lp_id) {
Ok(f(sm))
} else {
Err(LpError::StateMachineNotFound(lp_id))
}
}
pub fn insert_session(
pub fn create_session_state_machine(
&mut self,
lp_session: LpTransportSession,
lp_session: LpSession,
) -> Result<LpReceiverIndex, LpError> {
let session_id = lp_session.receiver_index();
if self.sessions.contains_key(&session_id) {
if self.state_machines.contains_key(&session_id) {
return Err(LpError::DuplicateSessionId(session_id));
}
self.sessions.insert(session_id, lp_session);
let sm = LpStateMachine::new(lp_session);
self.state_machines.insert(session_id, sm);
Ok(session_id)
}
/// Method to remove a state machine
pub fn remove_session(&mut self, lp_id: LpReceiverIndex) -> bool {
let removed = self.sessions.remove(&lp_id);
pub fn remove_state_machine(&mut self, lp_id: LpReceiverIndex) -> bool {
let removed = self.state_machines.remove(&lp_id);
removed.is_some()
}
@@ -129,13 +152,13 @@ mod tests {
let local_session = mock_session_for_test();
let id = local_session.receiver_index();
let sm_1_id = manager.insert_session(local_session).unwrap();
let sm_1_id = manager.create_session_state_machine(local_session).unwrap();
assert_eq!(sm_1_id, id);
let retrieved = manager.session_exists(id);
let retrieved = manager.state_machine_exists(id);
assert!(retrieved);
let not_found = manager.session_exists(123);
let not_found = manager.state_machine_exists(123);
assert!(!not_found);
}
@@ -143,13 +166,13 @@ mod tests {
fn test_session_manager_remove() {
let mut manager = SessionManager::new();
let local_session = mock_session_for_test();
let sm_1_id = manager.insert_session(local_session).unwrap();
let sm_1_id = manager.create_session_state_machine(local_session).unwrap();
let removed = manager.remove_session(sm_1_id);
let removed = manager.remove_state_machine(sm_1_id);
assert!(removed);
assert_eq!(manager.session_count(), 0);
let removed_again = manager.remove_session(sm_1_id);
let removed_again = manager.remove_state_machine(sm_1_id);
assert!(!removed_again);
}
@@ -161,15 +184,15 @@ mod tests {
let session2 = SessionsMock::mock_seeded_post_handshake(124, kem).initiator;
let session3 = SessionsMock::mock_seeded_post_handshake(125, kem).initiator;
let sm_1 = manager.insert_session(session1).unwrap();
let sm_2 = manager.insert_session(session2).unwrap();
let sm_3 = manager.insert_session(session3).unwrap();
let sm_1 = manager.create_session_state_machine(session1).unwrap();
let sm_2 = manager.create_session_state_machine(session2).unwrap();
let sm_3 = manager.create_session_state_machine(session3).unwrap();
assert_eq!(manager.session_count(), 3);
let retrieved1 = manager.get_session_id(sm_1).unwrap();
let retrieved2 = manager.get_session_id(sm_2).unwrap();
let retrieved3 = manager.get_session_id(sm_3).unwrap();
let retrieved1 = manager.get_state_machine_id(sm_1).unwrap();
let retrieved2 = manager.get_state_machine_id(sm_2).unwrap();
let retrieved3 = manager.get_state_machine_id(sm_3).unwrap();
assert_eq!(retrieved1, sm_1);
assert_eq!(retrieved2, sm_2);
@@ -183,10 +206,10 @@ mod tests {
let sesion = mock_session_for_test();
let sm = manager.insert_session(sesion).unwrap();
let sm = manager.create_session_state_machine(sesion).unwrap();
assert_eq!(manager.session_count(), 1);
let retrieved = manager.get_session_id(sm);
let retrieved = manager.get_state_machine_id(sm);
assert!(retrieved.is_ok());
assert_eq!(retrieved.unwrap(), sm);
}
+352
View File
@@ -0,0 +1,352 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! Lewes Protocol State Machine for managing connection lifecycle.
//! State machine ensures protocol steps execute in correct order. Invalid transitions
//! return LpError, preventing protocol violations.
use crate::packet::EncryptedLpPacket;
use crate::packet::message::LpMessage;
use crate::peer_config::LpReceiverIndex;
use crate::session::SessionId;
use crate::{LpError, session::LpSession};
use std::mem;
#[derive(Debug)]
pub struct LpTransportState {
/// The underlying session in the transport state
session: Box<LpSession>,
}
/// Represents the possible states of the Lewes Protocol connection.
#[derive(Debug, Default)]
pub enum LpState {
/// Handshake complete, ready for data transport.
Transport(LpTransportState),
/// An error occurred, or the connection was intentionally closed.
Closed { reason: String },
/// Processing an input event.
#[default]
Processing,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LpStateBare {
Transport,
Closed,
Processing,
}
impl From<&LpState> for LpStateBare {
fn from(state: &LpState) -> Self {
match state {
LpState::Transport { .. } => LpStateBare::Transport,
LpState::Closed { .. } => LpStateBare::Closed,
LpState::Processing => LpStateBare::Processing,
}
}
}
/// Represents inputs that drive the state machine transitions.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum LpInput {
/// Received an encrypted LP Packet from the network.
ReceivePacket(EncryptedLpPacket),
/// Application wants to send data (only valid in Transport state).
SendData(LpMessage),
/// Close the connection.
Close,
}
/// Represents actions the state machine requests the environment to perform.
#[derive(Debug)]
pub enum LpAction {
/// Send an LP Packet over the network.
SendPacket(EncryptedLpPacket),
/// Deliver decrypted application data received from the peer.
DeliverData(LpMessage),
/// Inform the environment that the connection is closed.
ConnectionClosed,
}
/// The Lewes Protocol State Machine.
pub struct LpStateMachine {
pub state: LpState,
}
impl LpStateMachine {
pub fn bare_state(&self) -> LpStateBare {
LpStateBare::from(&self.state)
}
pub fn session_mut(&mut self) -> Result<&mut LpSession, LpError> {
match &mut self.state {
LpState::Transport(transport) => Ok(&mut transport.session),
LpState::Closed { .. } => Err(LpError::LpSessionClosed),
LpState::Processing => Err(LpError::LpSessionProcessing),
}
}
pub fn session(&self) -> Result<&LpSession, LpError> {
match &self.state {
LpState::Transport(transport) => Ok(&transport.session),
LpState::Closed { .. } => Err(LpError::LpSessionClosed),
LpState::Processing => Err(LpError::LpSessionProcessing),
}
}
/// Consume the state machine and return the session with ownership.
/// This is useful when the handshake is complete and you want to transfer
/// ownership of the session to the caller.
pub fn into_session(self) -> Result<LpSession, LpError> {
match self.state {
LpState::Transport(transport) => Ok(*transport.session),
LpState::Closed { .. } => Err(LpError::LpSessionClosed),
LpState::Processing => Err(LpError::LpSessionProcessing),
}
}
pub fn session_identifier(&self) -> Result<SessionId, LpError> {
Ok(*self.session()?.session_identifier())
}
pub fn receiver_index(&self) -> Result<LpReceiverIndex, LpError> {
Ok(self.session()?.receiver_index())
}
/// Creates a new state machine in `Transport` state post-KKT/PSQ handshake
pub fn new(session: LpSession) -> Self {
LpStateMachine {
state: LpState::Transport(LpTransportState {
session: Box::new(session),
}),
}
}
fn process_input_transport(
&mut self,
mut state: LpTransportState,
input: LpInput,
) -> (LpState, Option<Result<LpAction, LpError>>) {
let session = &mut state.session;
match input {
LpInput::ReceivePacket(packet) => {
// Check if packet lp_id matches our session
if packet.outer_header().receiver_idx != session.receiver_index() {
let result_action = Some(Err(LpError::UnknownSessionId(
packet.outer_header().receiver_idx,
)));
return (LpState::Transport(state), result_action);
}
let ctr = packet.outer_header().counter;
// 1. Check replay protection
if let Err(e) = session.receiving_counter_quick_check(ctr) {
return (LpState::Transport(state), Some(Err(e)));
}
// 2. decrypt the packet and attempt to deliver data
let packet = match session.decrypt_packet(packet) {
Ok(packet) => packet,
Err(e) => return (LpState::Transport(state), Some(Err(e))),
};
// 3. Mark counter as received
if let Err(e) = session.receiving_counter_mark(ctr) {
return (LpState::Transport(state), Some(Err(e)));
}
// 4. deliver the message
let message = packet.message;
let result_action = Some(Ok(LpAction::DeliverData(message)));
(LpState::Transport(state), result_action)
}
LpInput::SendData(data) => {
// Encrypt and send application data
let result_action = match self.prepare_data_packet(session, data) {
Ok(packet) => Some(Ok(LpAction::SendPacket(packet))),
Err(e) => {
// If prepare fails, should we close? Let's report error and stay Transport for now.
// Alternative: transition to Closed state.
Some(Err(e))
}
};
// Remain in transport state
(LpState::Transport(state), result_action)
}
// --- Close Transition ---
LpInput::Close => {
// Transition to Closed state
(
LpState::Closed {
reason: "Closed by user".to_string(),
},
Some(Ok(LpAction::ConnectionClosed)),
)
}
}
}
/// Processes an input event and returns a list of actions to perform.
pub fn process_input(&mut self, input: LpInput) -> Option<Result<LpAction, LpError>> {
// 1. Replace current state with a placeholder, taking ownership of the real current state.
let current_state = mem::take(&mut self.state);
let mut result_action: Option<Result<LpAction, LpError>> = None;
// 2. Match on the owned current_state. Each arm calculates and returns the NEXT state.
let next_state = match (current_state, input) {
// --- Transport State ---
(LpState::Transport(transport), input) => {
let (next_state, action) = self.process_input_transport(transport, input);
result_action = action;
next_state
}
// Ignore Close if already Closed
(closed_state @ LpState::Closed { .. }, LpInput::Close) => {
// result_action remains None
// Return the original closed state
closed_state
}
// Ignore ReceivePacket if Closed
(closed_state @ LpState::Closed { .. }, LpInput::ReceivePacket(_)) => {
result_action = Some(Err(LpError::LpSessionClosed));
closed_state
}
// Ignore SendData if Closed
(closed_state @ LpState::Closed { .. }, LpInput::SendData(_)) => {
result_action = Some(Err(LpError::LpSessionClosed));
closed_state
}
// Processing state should not be matched directly if using replace
(LpState::Processing, _) => {
// This case should ideally be unreachable if placeholder logic is correct
let err = LpError::Internal("Reached Processing state unexpectedly".to_string());
let reason = err.to_string();
result_action = Some(Err(err));
LpState::Closed { reason }
}
};
// 3. Put the calculated next state back into the machine.
self.state = next_state;
result_action // Return the determined action (or None)
}
// Helper to prepare an outgoing data packet
// Kept as it doesn't mutate self.state
fn prepare_data_packet(
&self,
session: &mut LpSession,
data: LpMessage,
) -> Result<EncryptedLpPacket, LpError> {
session.encrypt_application_data(data)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::SessionsMock;
use nym_kkt_ciphersuite::{IntoEnumIterator, KEM};
#[test]
fn test_state_machine_init() {
for kem in KEM::iter() {
let mock_sessions = SessionsMock::mock_post_handshake(kem);
let initiator_sm = LpStateMachine::new(mock_sessions.initiator);
assert!(matches!(initiator_sm.state, LpState::Transport { .. }));
let init_session = initiator_sm.session().unwrap();
let responder_sm = LpStateMachine::new(mock_sessions.responder);
assert!(matches!(responder_sm.state, LpState::Transport { .. }));
let resp_session = responder_sm.session().unwrap();
// Check both state machines use the same receiver_index
assert_eq!(init_session.receiver_index(), resp_session.receiver_index());
}
}
#[test]
fn test_state_machine_simplified_flow() {
for kem in KEM::iter() {
let mock_sessions = SessionsMock::mock_post_handshake(kem);
let receiver_index = mock_sessions.responder.receiver_index();
// Create state machines (already in Transport)
let mut initiator = LpStateMachine::new(mock_sessions.initiator);
let mut responder = LpStateMachine::new(mock_sessions.responder);
assert_eq!(
initiator.session_identifier().unwrap(),
responder.session_identifier().unwrap()
);
// --- Transport Phase ---
println!("--- Step 1: Initiator sends data ---");
let data_to_send_1 = LpMessage::new_opaque(b"hello responder".to_vec());
let init_actions_4 = initiator.process_input(LpInput::SendData(data_to_send_1.clone()));
let data_packet_1 = if let Some(Ok(LpAction::SendPacket(packet))) = init_actions_4 {
packet.clone()
} else {
panic!("Initiator should send data packet");
};
assert_eq!(data_packet_1.outer_header().receiver_idx, receiver_index);
println!("--- Step 2: Responder receives data ---");
let resp_actions_5 = responder.process_input(LpInput::ReceivePacket(data_packet_1));
let resp_data_1 = if let Some(Ok(LpAction::DeliverData(data))) = resp_actions_5 {
data
} else {
panic!("Responder should deliver data");
};
assert_eq!(resp_data_1, data_to_send_1);
println!("--- Step 3: Responder sends data ---");
let data_to_send_2 = LpMessage::new_opaque(b"hello initiator".to_vec());
let resp_actions_6 = responder.process_input(LpInput::SendData(data_to_send_2.clone()));
let data_packet_2 = if let Some(Ok(LpAction::SendPacket(packet))) = resp_actions_6 {
packet.clone()
} else {
panic!("Responder should send data packet");
};
assert_eq!(data_packet_2.outer_header().receiver_idx, receiver_index);
println!("--- Step 4: Initiator receives data ---");
let init_actions_5 = initiator.process_input(LpInput::ReceivePacket(data_packet_2));
if let Some(Ok(LpAction::DeliverData(data))) = init_actions_5 {
assert_eq!(data, data_to_send_2);
} else {
panic!("Initiator should deliver data");
}
// --- Close ---
println!("--- Step 5: Initiator closes ---");
let init_actions_6 = initiator.process_input(LpInput::Close);
assert!(matches!(
init_actions_6,
Some(Ok(LpAction::ConnectionClosed))
));
assert!(matches!(initiator.state, LpState::Closed { .. }));
println!("--- Step 6: Responder closes ---");
let resp_actions_7 = responder.process_input(LpInput::Close);
assert!(matches!(
resp_actions_7,
Some(Ok(LpAction::ConnectionClosed))
));
assert!(matches!(responder.state, LpState::Closed { .. }));
}
}
}
+1 -2
View File
@@ -79,11 +79,10 @@ async fn read_n_bytes_async_read<R>(reader: &mut R, n: usize) -> Result<Vec<u8>,
where
R: AsyncRead + Unpin,
{
let mut buf = vec![0u8; n];
if n > MAX_HANDSHAKE_PACKET_SIZE {
return Err(LpTransportError::PacketTooBig { size: n });
}
let mut buf = vec![0u8; n];
reader
.read_exact(&mut buf)
.await
+3 -3
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use ::serde::{Deserialize, Serialize};
use nym_api_requests::nym_nodes::SkimmedNodeV1;
use nym_api_requests::nym_nodes::SkimmedNode;
use nym_crypto::asymmetric::ed25519;
use nym_mixnet_contract_common::EpochId;
use nym_sphinx_addressing::nodes::NodeIdentity;
@@ -283,11 +283,11 @@ impl NymTopology {
serde_json::from_reader(file).map_err(Into::into)
}
pub fn add_skimmed_nodes(&mut self, nodes: &[SkimmedNodeV1]) {
pub fn add_skimmed_nodes(&mut self, nodes: &[SkimmedNode]) {
self.add_additional_nodes(nodes.iter())
}
pub fn with_skimmed_nodes(mut self, nodes: &[SkimmedNodeV1]) -> Self {
pub fn with_skimmed_nodes(mut self, nodes: &[SkimmedNode]) -> Self {
self.add_skimmed_nodes(nodes);
self
}
+3 -3
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use nym_api_requests::models::DeclaredRolesV1;
use nym_api_requests::nym_nodes::SkimmedNodeV1;
use nym_api_requests::nym_nodes::SkimmedNode;
use nym_crypto::asymmetric::{ed25519, x25519};
use nym_mixnet_contract_common::NodeId;
use nym_sphinx_addressing::nodes::NymNodeRoutingAddress;
@@ -146,10 +146,10 @@ impl<'a> From<&'a RoutingNode> for SphinxNode {
}
}
impl<'a> TryFrom<&'a SkimmedNodeV1> for RoutingNode {
impl<'a> TryFrom<&'a SkimmedNode> for RoutingNode {
type Error = RoutingNodeError;
fn try_from(value: &'a SkimmedNodeV1) -> Result<Self, Self::Error> {
fn try_from(value: &'a SkimmedNode) -> Result<Self, Self::Error> {
// IF YOU EVER ADD "performance" TO RoutingNode,
// MAKE SURE TO UPDATE THE LAZY IMPLEMENTATION OF
// `impl NodeDescriptionTopologyExt for NymNodeDescription`!!!
@@ -212,13 +212,13 @@ chmod +x network-tunnel-manager.sh
- [Deriving `Serialize` for `GatewayData`](https://github.com/nymtech/nym/pull/6314): Deriving `Serialize` for gateway data, that will be used by the diagnostic tool in the `vpn-client` repo
- [DNS static table pre-resolve](https://github.com/nymtech/nym/pull/6297): This PR adds pre-resolve stage that returns address if we have used static table previously. This ensures that we don't continually suffer the penalty of a lookup timeout, while also allowing for the possibility of going back to the default internal secure resolver if one or more nameservers becomes usable again at a future time.
- [DNS static table pre-resolve](https://github.com/nymtech/nym/pull/6297): This PR adds pre-resolve stage that returns addres if we have used static table previously. This ensures that we don't continually suffer the penalty of a lookup timeout, while also allowing for the possibility of going back to the default internal secure resolver if one or more nameservers becomes usable again at a future time.
- [Add `Copy+Clone` to `nym_api_provider::Config`](https://github.com/nymtech/nym/pull/6296): Add `Copy+Clone` to `nym_client_core::client::topology_control::nym_api_provider::Config`
- [LP Registration + Telescoping + Gateway Probe Localnet Mode](https://github.com/nymtech/nym/pull/6286): Combines LP registration protocol implementation, adds telescoping/nested sessions support, adds localnet mode for `gateway-probe` testing, integrates KKT & PSQ cryptographic primitives
- [Minor DNS improvements](https://github.com/nymtech/nym/pull/6283): Increase timeouts back to 10 seconds for overall lookup and 5 seconds per query, ignore unreliable test, remove JIT resolution in http client as it is at best not useful, and at worst increasing timeout
- [Minor DNS improvements](https://github.com/nymtech/nym/pull/6283): Increase timeouts back to 10 seconds for overall lookup and 5 seconds per query, gnore unreliable test, remove JIT resolution in http client as it is at best not useful, and at worst increasing timeout
- [HTTP client without default features](https://github.com/nymtech/nym/pull/6281): Fix compile issue caused when using the http client using `default-features=false`
@@ -250,7 +250,7 @@ chmod +x network-tunnel-manager.sh
- [Update nix to `v0.30.1`](https://github.com/nymtech/nym/pull/6316)
- [Remove repetitive words in comment](https://github.com/nymtech/nym/pull/6313)
- [Rremove repetitive words in comment](https://github.com/nymtech/nym/pull/6313)
- [Clippy fixes and use fixed rust version from `REQUIRED_RUSTC_VERSION`](https://github.com/nymtech/nym/pull/6295)
+2 -2
View File
@@ -105,9 +105,9 @@ pub struct GatewayTasksBuilder {
shutdown_tracker: ShutdownTracker,
// populated and cached as necessary
use_mock_ecash: bool,
// populated and cached as necessary
ecash_manager:
Option<Arc<dyn nym_credential_verification::ecash::traits::EcashManager + Send + Sync>>,
@@ -226,7 +226,7 @@ impl GatewayTasksBuilder {
> {
// Check if we should use mock ecash for testing
if self.use_mock_ecash {
warn!("Using MockEcashManager for testing (credentials NOT verified)");
warn!("Using MockEcashManager for LP testing (credentials NOT verified)");
let mock_manager = MockEcashManager::new(Box::new(self.storage.clone()));
return Ok(Arc::new(mock_manager)
as Arc<
+26 -18
View File
@@ -17,10 +17,9 @@ mod tests {
use nym_lp::peer::LpLocalPeer;
use nym_node::config::{LpConfig, LpDebug};
use nym_node::node::GatewayStorage;
use nym_node::node::lp::control::ingress::client_handler::LpClientConnectionHandler;
use nym_node::node::lp::error::LpHandlerError;
use nym_node::node::lp::state::{ActiveLpSessions, NestedConnectionsManager};
use nym_node::node::lp::{SharedLpClientControlState, SharedLpState};
use nym_node::node::lp::handler::LpConnectionHandler;
use nym_node::node::lp::{LpHandlerState, MixForwardingReceiver, mix_forwarding_channels};
use nym_node::wireguard::{PeerManager, PeerRegistrator};
use nym_registration_client::{LpClientError, LpRegistrationClient};
use nym_test_utils::helpers::{CryptoRng09, seeded_rng};
@@ -36,7 +35,7 @@ mod tests {
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::sync::mpsc::{Receiver, channel};
use tokio::sync::mpsc::Receiver;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::error;
@@ -121,7 +120,7 @@ mod tests {
enum SpawnedLpConnectionHandlerState {
NotCreated,
Ready {
handler: LpClientConnectionHandler<MockIOStream>,
handler: LpConnectionHandler<MockIOStream>,
},
Running {
handle: JoinHandle<Option<Result<(), LpHandlerError>>>,
@@ -131,8 +130,11 @@ mod tests {
struct Gateway {
base: Party,
lp_state: SharedLpClientControlState,
lp_state: LpHandlerState,
ip_pool: IpPool,
// might be used later for mixnet registration tests
#[allow(unused)]
mix_receiver: MixForwardingReceiver,
mock_peer_controller: SpawnedPeerController,
tasks_cancellation: CancellationToken,
@@ -208,6 +210,9 @@ mod tests {
let forward_semaphore =
Arc::new(Semaphore::new(lp_config.debug.max_concurrent_forwards));
// Create mix forwarding channel (unused in tests but required by struct)
let (mix_sender, mix_receiver) = mix_forwarding_channels();
// create wireguard data
let (wireguard_data, peer_request_rx) = Self::wireguard_data(&base);
@@ -217,9 +222,6 @@ mod tests {
let (mock_peer_controller, peer_controller_state) =
mock_peer_controller(peer_request_rx);
let (connection_ctrl_sender, _connection_manager_receiver) = channel(42);
let nested_connections_manager = NestedConnectionsManager::new(connection_ctrl_sender);
// registering particular responses for peer controller is up to given test
let ecash_verifier = Arc::new(ecash_verifier);
@@ -229,25 +231,31 @@ mod tests {
upgrade_mode_details,
);
let lp_state = SharedLpClientControlState {
let lp_state = LpHandlerState {
local_lp_peer: base.peer.clone(),
metrics: Default::default(),
// use default lp config (with enabled flag)
lp_config,
// TODO: might be needed later on for mixnet registration
outbound_mix_sender: mix_sender,
// we start with empty state
session_states: Arc::new(Default::default()),
forward_semaphore,
// handles for dealing with new peers
peer_registrator: Some(peer_registrator),
shared: SharedLpState {
metrics: Default::default(),
lp_config,
session_states: ActiveLpSessions::new(),
},
nested_connections_manager,
};
Ok(Gateway {
base,
lp_state,
ip_pool: Self::ip_pool(),
mix_receiver,
mock_peer_controller: SpawnedPeerController::Ready {
controller: mock_peer_controller,
},
@@ -267,7 +275,7 @@ mod tests {
};
self.lp_connection_handler = SpawnedLpConnectionHandlerState::Ready {
handler: LpClientConnectionHandler::new(
handler: LpConnectionHandler::new(
client_connection,
client_address,
self.lp_state.clone(),
@@ -295,7 +303,7 @@ mod tests {
}
fn spawn_lp_handler(&mut self) {
let SpawnedLpConnectionHandlerState::Ready { mut handler } = mem::replace(
let SpawnedLpConnectionHandlerState::Ready { handler } = mem::replace(
&mut self.lp_connection_handler,
SpawnedLpConnectionHandlerState::NotCreated,
) else {
@@ -6,7 +6,7 @@ use crate::models::{
HostInformationV1, IpPacketRouterDetailsV1, NetworkRequesterDetailsV1,
OffsetDateTimeJsonSchemaWrapper, WebSocketsV1, WireguardDetailsV1,
};
use crate::nym_nodes::{BasicEntryInformation, NodeRole, SemiSkimmedNodeV1, SkimmedNodeV1};
use crate::nym_nodes::{BasicEntryInformation, NodeRole, SemiSkimmedNode, SkimmedNode};
use nym_crypto::asymmetric::{ed25519, x25519};
use nym_mixnet_contract_common::reward_params::Performance;
use nym_mixnet_contract_common::NodeId;
@@ -76,7 +76,7 @@ impl NymNodeDescriptionV1 {
current_rotation_id: u32,
role: NodeRole,
performance: Performance,
) -> SkimmedNodeV1 {
) -> SkimmedNode {
let keys = &self.description.host_information.keys;
let entry = if self.description.declared_role.entry {
Some(self.entry_information())
@@ -84,7 +84,7 @@ impl NymNodeDescriptionV1 {
None
};
SkimmedNodeV1 {
SkimmedNode {
node_id: self.node_id,
ed25519_identity_pubkey: keys.ed25519,
ip_addresses: self.description.host_information.ip_address.clone(),
@@ -105,10 +105,10 @@ impl NymNodeDescriptionV1 {
current_rotation_id: u32,
role: NodeRole,
performance: Performance,
) -> SemiSkimmedNodeV1 {
) -> SemiSkimmedNode {
let skimmed_node = self.to_skimmed_node(current_rotation_id, role, performance);
SemiSkimmedNodeV1 {
SemiSkimmedNode {
basic: skimmed_node,
x25519_noise_versioned_key: self
.description
@@ -7,9 +7,7 @@ use crate::models::{
LewesProtocolDetailsV1, NetworkRequesterDetailsV1, NymNodeDataV1, NymNodeDescriptionV1,
OffsetDateTimeJsonSchemaWrapper, SphinxKeyV1, WebSocketsV1, WireguardDetailsV1,
};
use crate::nym_nodes::{
BasicEntryInformation, NodeRole, SemiSkimmedNodeV1, SemiSkimmedNodeV3, SkimmedNodeV1,
};
use crate::nym_nodes::{BasicEntryInformation, NodeRole, SemiSkimmedNode, SkimmedNode};
use nym_crypto::asymmetric::{ed25519, x25519};
use nym_mixnet_contract_common::reward_params::Performance;
use nym_mixnet_contract_common::NodeId;
@@ -94,7 +92,7 @@ impl NymNodeDescriptionV2 {
current_rotation_id: u32,
role: NodeRole,
performance: Performance,
) -> SkimmedNodeV1 {
) -> SkimmedNode {
let keys = &self.description.host_information.keys;
let entry = if self.description.declared_role.entry {
Some(self.entry_information())
@@ -102,7 +100,7 @@ impl NymNodeDescriptionV2 {
None
};
SkimmedNodeV1 {
SkimmedNode {
node_id: self.node_id,
ed25519_identity_pubkey: keys.ed25519,
ip_addresses: self.description.host_information.ip_address.clone(),
@@ -123,10 +121,10 @@ impl NymNodeDescriptionV2 {
current_rotation_id: u32,
role: NodeRole,
performance: Performance,
) -> SemiSkimmedNodeV1 {
) -> SemiSkimmedNode {
let skimmed_node = self.to_skimmed_node(current_rotation_id, role, performance);
SemiSkimmedNodeV1 {
SemiSkimmedNode {
basic: skimmed_node,
x25519_noise_versioned_key: self
.description
@@ -135,26 +133,6 @@ impl NymNodeDescriptionV2 {
.x25519_versioned_noise,
}
}
pub fn to_semi_skimmed_node_v3(
&self,
current_rotation_id: u32,
role: NodeRole,
performance: Performance,
) -> SemiSkimmedNodeV3 {
let skimmed_node = self.to_skimmed_node(current_rotation_id, role, performance);
SemiSkimmedNodeV3 {
basic: skimmed_node,
noise_key: self
.description
.host_information
.keys
.x25519_versioned_noise,
build_version: self.description.build_information.build_version.clone(),
lp: self.description.lewes_protocol.clone(),
}
}
}
// to whoever is thinking of modifying this struct.
+12 -28
View File
@@ -1,9 +1,7 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::models::{
DeclaredRolesV1, LewesProtocolDetailsV1, NymNodeDataV1, OffsetDateTimeJsonSchemaWrapper,
};
use crate::models::{DeclaredRolesV1, NymNodeDataV1, OffsetDateTimeJsonSchemaWrapper};
use crate::pagination::{PaginatedResponse, Pagination};
use nym_crypto::asymmetric::ed25519::serde_helpers::bs58_ed25519_pubkey;
use nym_crypto::asymmetric::x25519::serde_helpers::bs58_x25519_pubkey;
@@ -20,24 +18,24 @@ use utoipa::ToSchema;
#[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema, utoipa::ToSchema)]
pub struct SkimmedNodesWithMetadata {
pub nodes: Vec<SkimmedNodeV1>,
pub nodes: Vec<SkimmedNode>,
pub metadata: NodesResponseMetadata,
}
impl SkimmedNodesWithMetadata {
pub fn new(nodes: Vec<SkimmedNodeV1>, metadata: NodesResponseMetadata) -> Self {
pub fn new(nodes: Vec<SkimmedNode>, metadata: NodesResponseMetadata) -> Self {
SkimmedNodesWithMetadata { nodes, metadata }
}
}
#[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema, utoipa::ToSchema)]
pub struct SemiSkimmedNodesWithMetadata {
pub nodes: Vec<SemiSkimmedNodeV1>,
pub nodes: Vec<SemiSkimmedNode>,
pub metadata: NodesResponseMetadata,
}
impl SemiSkimmedNodesWithMetadata {
pub fn new(nodes: Vec<SemiSkimmedNodeV1>, metadata: NodesResponseMetadata) -> Self {
pub fn new(nodes: Vec<SemiSkimmedNode>, metadata: NodesResponseMetadata) -> Self {
SemiSkimmedNodesWithMetadata { nodes, metadata }
}
}
@@ -230,7 +228,9 @@ pub struct BasicEntryInformation {
// the bare minimum information needed to construct sphinx packets
#[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema, ToSchema)]
pub struct SkimmedNodeV1 {
pub struct SkimmedNode {
// in directory v3 all nodes (mixnodes AND gateways) will have a unique id
// but to keep structure consistent, introduce this field now
#[schema(value_type = u32)]
pub node_id: NodeId,
@@ -263,7 +263,7 @@ pub struct SkimmedNodeV1 {
pub performance: Performance,
}
impl SkimmedNodeV1 {
impl SkimmedNode {
pub fn get_mix_layer(&self) -> Option<u8> {
match self.role {
NodeRole::Mixnode { layer } => Some(layer),
@@ -275,8 +275,8 @@ impl SkimmedNodeV1 {
// an intermediate variant that exposes additional data such as noise keys but without
// the full fat of the self-described data
#[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema, ToSchema)]
pub struct SemiSkimmedNodeV1 {
pub basic: SkimmedNodeV1,
pub struct SemiSkimmedNode {
pub basic: SkimmedNode,
pub x25519_noise_versioned_key: Option<VersionedNoiseKeyV1>,
// pub location:
@@ -284,7 +284,7 @@ pub struct SemiSkimmedNodeV1 {
#[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema, ToSchema)]
pub struct FullFatNode {
pub expanded: SemiSkimmedNodeV1,
pub expanded: SemiSkimmedNode,
// kinda temporary for now to make as few changes as possible for now
pub self_described: Option<NymNodeDataV1>,
@@ -301,19 +301,3 @@ pub struct NodesByAddressesResponse {
#[schema(value_type = HashMap<String, Option<u32>>)]
pub existence: HashMap<IpAddr, Option<NodeId>>,
}
/// All the information required for sending packets between nodes (sphinx, noise, LP)
#[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema, ToSchema)]
pub struct SemiSkimmedNodeV3 {
/// Basic node information required for mixnet routing
pub basic: SkimmedNodeV1,
/// Noise key of the node
pub noise_key: Option<VersionedNoiseKeyV1>,
/// Build version of this node used as a hint in inferring the Ciphersuite compatibility
pub build_version: String,
/// Information required for establishing an LP connection
pub lp: Option<LewesProtocolDetailsV1>,
}
+1 -1
View File
@@ -127,7 +127,7 @@ impl DailyMerkleTree {
pub(crate) fn maybe_rebuild(&mut self) {
// every 1000 leaves, rebuild the tree to purge the history
// (I wish the API of the library allowed to do it without having to go through those extra steps...)
if !self.inserted_leaves.is_empty() && self.inserted_leaves.len().is_multiple_of(1000) {
if !self.inserted_leaves.is_empty() && self.inserted_leaves.len() % 1000 == 0 {
self.rebuild_without_history();
}
}
+1 -1
View File
@@ -803,7 +803,7 @@ impl EcashState {
merkle_entry.maybe_rebuild();
// toss a coin to check if we should clean memory of old merkle trees
if thread_rng().next_u32().is_multiple_of(10000) {
if thread_rng().next_u32() % 10000 == 0 {
let mut values_to_clean = Vec::new();
let cutoff = self.config.ticketbook_retention_cutoff();
info!("attempting to remove old issued ticketbooks. the cutoff is set to {cutoff}");
+1 -3
View File
@@ -10,7 +10,6 @@ use crate::support::http::openapi::ApiDoc;
use crate::support::http::state::AppState;
use crate::unstable_routes::v1::unstable_routes_v1;
use crate::unstable_routes::v2::unstable_routes_v2;
use crate::unstable_routes::v3::unstable_routes_v3;
use crate::{nym_nodes, status};
use anyhow::anyhow;
use axum::response::Redirect;
@@ -71,8 +70,7 @@ impl RouterBuilder {
Router::new()
.nest("/unstable", unstable_routes_v2())
.nest("/nym-nodes", nym_nodes::handlers::v2::routes()),
)
.nest("/v3", Router::new().nest("/unstable", unstable_routes_v3()));
);
Self {
unfinished_router: default_routes,
-1
View File
@@ -4,4 +4,3 @@
pub(crate) mod helpers;
pub(crate) mod v1;
pub(crate) mod v2;
pub(crate) mod v3;
@@ -5,7 +5,7 @@ use crate::node_status_api::models::{AxumErrorResponse, AxumResult};
use crate::support::http::state::AppState;
use crate::unstable_routes::v1::nym_nodes::helpers::NodesParamsWithRole;
use axum::extract::{Query, State};
use nym_api_requests::nym_nodes::{CachedNodesResponse, SemiSkimmedNodeV1};
use nym_api_requests::nym_nodes::{CachedNodesResponse, SemiSkimmedNode};
use nym_http_api_common::FormattedResponse;
#[utoipa::path(
@@ -22,6 +22,6 @@ use nym_http_api_common::FormattedResponse;
pub(crate) async fn nodes_expanded(
_state: State<AppState>,
_query_params: Query<NodesParamsWithRole>,
) -> AxumResult<FormattedResponse<CachedNodesResponse<SemiSkimmedNodeV1>>> {
) -> AxumResult<FormattedResponse<CachedNodesResponse<SemiSkimmedNode>>> {
Err(AxumErrorResponse::not_implemented())
}
@@ -11,7 +11,7 @@ use crate::unstable_routes::v1::nym_nodes::skimmed::{
PaginatedCachedNodesResponseSchema, PaginatedSkimmedNodes,
};
use axum::extract::{Query, State};
use nym_api_requests::nym_nodes::{CachedNodesResponse, NodeRoleQueryParam, SkimmedNodeV1};
use nym_api_requests::nym_nodes::{CachedNodesResponse, NodeRoleQueryParam, SkimmedNode};
use nym_http_api_common::FormattedResponse;
/// Deprecated query that gets ALL gateways
@@ -23,9 +23,9 @@ use nym_http_api_common::FormattedResponse;
context_path = "/v1/unstable/nym-nodes",
responses(
(status = 200, content(
(CachedNodesResponse<SkimmedNodeV1> = "application/json"),
(CachedNodesResponse<SkimmedNodeV1> = "application/yaml"),
(CachedNodesResponse<SkimmedNodeV1> = "application/bincode")
(CachedNodesResponse<SkimmedNode> = "application/json"),
(CachedNodesResponse<SkimmedNode> = "application/yaml"),
(CachedNodesResponse<SkimmedNode> = "application/bincode")
))
),
)]
@@ -34,7 +34,7 @@ use nym_http_api_common::FormattedResponse;
pub(crate) async fn deprecated_gateways_basic(
state: State<AppState>,
query_params: Query<NodesParams>,
) -> AxumResult<FormattedResponse<CachedNodesResponse<SkimmedNodeV1>>> {
) -> AxumResult<FormattedResponse<CachedNodesResponse<SkimmedNode>>> {
let output = query_params.output.unwrap_or_default();
// 1. call '/v1/unstable/skimmed/entry-gateways/all'
@@ -59,9 +59,9 @@ pub(crate) async fn deprecated_gateways_basic(
context_path = "/v1/unstable/nym-nodes",
responses(
(status = 200, content(
(CachedNodesResponse<SkimmedNodeV1> = "application/json"),
(CachedNodesResponse<SkimmedNodeV1> = "application/yaml"),
(CachedNodesResponse<SkimmedNodeV1> = "application/bincode")
(CachedNodesResponse<SkimmedNode> = "application/json"),
(CachedNodesResponse<SkimmedNode> = "application/yaml"),
(CachedNodesResponse<SkimmedNode> = "application/bincode")
))
),
)]
@@ -70,7 +70,7 @@ pub(crate) async fn deprecated_gateways_basic(
pub(crate) async fn deprecated_mixnodes_basic(
state: State<AppState>,
query_params: Query<NodesParams>,
) -> AxumResult<FormattedResponse<CachedNodesResponse<SkimmedNodeV1>>> {
) -> AxumResult<FormattedResponse<CachedNodesResponse<SkimmedNode>>> {
let output = query_params.output.unwrap_or_default();
// 1. call '/v1/unstable/nym-nodes/skimmed/mixnodes/active'
@@ -3,7 +3,7 @@
use crate::node_status_api::models::AxumResult;
use nym_api_requests::models::OffsetDateTimeJsonSchemaWrapper;
use nym_api_requests::nym_nodes::{PaginatedCachedNodesResponseV1, SkimmedNodeV1};
use nym_api_requests::nym_nodes::{PaginatedCachedNodesResponseV1, SkimmedNode};
use nym_api_requests::pagination::PaginatedResponse;
use nym_http_api_common::FormattedResponse;
use utoipa::ToSchema;
@@ -12,7 +12,7 @@ pub(crate) mod handlers;
pub(crate) mod helpers;
pub type PaginatedSkimmedNodes =
AxumResult<FormattedResponse<PaginatedCachedNodesResponseV1<SkimmedNodeV1>>>;
AxumResult<FormattedResponse<PaginatedCachedNodesResponseV1<SkimmedNode>>>;
pub(crate) use handlers::*;
@@ -21,6 +21,6 @@ pub(crate) use handlers::*;
#[schema(title = "PaginatedCachedNodesResponse")]
pub struct PaginatedCachedNodesResponseSchema {
pub refreshed_at: OffsetDateTimeJsonSchemaWrapper,
#[schema(value_type = SkimmedNodeV1)]
pub nodes: PaginatedResponse<SkimmedNodeV1>,
#[schema(value_type = SkimmedNode)]
pub nodes: PaginatedResponse<SkimmedNode>,
}
@@ -9,7 +9,7 @@ use axum::extract::{Query, State};
use nym_api_requests::models::{
NodeAnnotation, NymNodeDescriptionV2, OffsetDateTimeJsonSchemaWrapper,
};
use nym_api_requests::nym_nodes::{NodeRole, PaginatedCachedNodesResponseV2, SemiSkimmedNodeV1};
use nym_api_requests::nym_nodes::{NodeRole, PaginatedCachedNodesResponseV2, SemiSkimmedNode};
use nym_api_requests::pagination::PaginatedResponse;
use nym_http_api_common::FormattedResponse;
use nym_mixnet_contract_common::NodeId;
@@ -18,7 +18,7 @@ use std::collections::HashMap;
use utoipa::ToSchema;
pub type PaginatedSemiSkimmedNodes =
AxumResult<FormattedResponse<PaginatedCachedNodesResponseV2<SemiSkimmedNodeV1>>>;
AxumResult<FormattedResponse<PaginatedCachedNodesResponseV2<SemiSkimmedNode>>>;
//SW TODO : this is copied from skimmed nodes, surely we can do better than that
fn build_nym_nodes_response<'a, NI>(
@@ -27,7 +27,7 @@ fn build_nym_nodes_response<'a, NI>(
annotations: &HashMap<NodeId, NodeAnnotation>,
current_key_rotation: u32,
active_only: bool,
) -> Vec<SemiSkimmedNodeV1>
) -> Vec<SemiSkimmedNode>
where
NI: Iterator<Item = &'a NymNodeDescriptionV2> + 'a,
{
@@ -57,11 +57,11 @@ where
#[allow(dead_code)] // not dead, used in OpenAPI docs
#[derive(ToSchema)]
#[schema(title = "PaginatedCachedNodesExpandedV2ResponseSchema")]
pub struct PaginatedCachedNodesExpandedV2ResponseSchema {
#[schema(title = "PaginatedCachedNodesExpandedResponseSchema")]
pub struct PaginatedCachedNodesExpandedResponseSchema {
pub refreshed_at: OffsetDateTimeJsonSchemaWrapper,
#[schema(value_type = SemiSkimmedNodeV1)]
pub nodes: PaginatedResponse<SemiSkimmedNodeV1>,
#[schema(value_type = SemiSkimmedNode)]
pub nodes: PaginatedResponse<SemiSkimmedNode>,
}
/// Return all Nym Nodes and optionally legacy mixnodes/gateways (if `no-legacy` flag is not used)
@@ -75,9 +75,9 @@ pub struct PaginatedCachedNodesExpandedV2ResponseSchema {
context_path = "/v2/unstable/nym-nodes/semi-skimmed",
responses(
(status = 200, content(
(PaginatedCachedNodesExpandedV2ResponseSchema = "application/json"),
(PaginatedCachedNodesExpandedV2ResponseSchema = "application/yaml"),
(PaginatedCachedNodesExpandedV2ResponseSchema = "application/bincode")
(PaginatedCachedNodesExpandedResponseSchema = "application/json"),
(PaginatedCachedNodesExpandedResponseSchema = "application/yaml"),
(PaginatedCachedNodesExpandedResponseSchema = "application/bincode")
))
)
)]
@@ -9,7 +9,7 @@ use axum::extract::{Query, State};
use nym_api_requests::models::{
NodeAnnotation, NymNodeDescriptionV2, OffsetDateTimeJsonSchemaWrapper,
};
use nym_api_requests::nym_nodes::{NodeRole, PaginatedCachedNodesResponseV2, SkimmedNodeV1};
use nym_api_requests::nym_nodes::{NodeRole, PaginatedCachedNodesResponseV2, SkimmedNode};
use nym_http_api_common::Output;
use nym_mixnet_contract_common::{Interval, NodeId};
use nym_topology::CachedEpochRewardedSet;
@@ -23,7 +23,7 @@ fn build_nym_nodes_response<'a, NI>(
annotations: &HashMap<NodeId, NodeAnnotation>,
current_key_rotation: u32,
active_only: bool,
) -> Vec<SkimmedNodeV1>
) -> Vec<SkimmedNode>
where
NI: Iterator<Item = &'a NymNodeDescriptionV2> + 'a,
{
@@ -56,7 +56,7 @@ fn maybe_add_expires_header(
interval: Interval,
current_key_rotation: u32,
refreshed_at: OffsetDateTimeJsonSchemaWrapper,
nodes: Vec<SkimmedNodeV1>,
nodes: Vec<SkimmedNode>,
active_only: bool,
) -> PaginatedSkimmedNodes {
let base_response = output.to_response(
@@ -3,7 +3,7 @@
use crate::node_status_api::models::AxumResult;
use nym_api_requests::models::OffsetDateTimeJsonSchemaWrapper;
use nym_api_requests::nym_nodes::{PaginatedCachedNodesResponseV2, SkimmedNodeV1};
use nym_api_requests::nym_nodes::{PaginatedCachedNodesResponseV2, SkimmedNode};
use nym_api_requests::pagination::PaginatedResponse;
use nym_http_api_common::FormattedResponse;
use utoipa::ToSchema;
@@ -12,7 +12,7 @@ pub(crate) mod handlers;
pub(crate) mod helpers;
pub type PaginatedSkimmedNodes =
AxumResult<FormattedResponse<PaginatedCachedNodesResponseV2<SkimmedNodeV1>>>;
AxumResult<FormattedResponse<PaginatedCachedNodesResponseV2<SkimmedNode>>>;
pub(crate) use handlers::*;
@@ -21,6 +21,6 @@ pub(crate) use handlers::*;
#[schema(title = "PaginatedCachedNodesResponse")]
pub struct PaginatedCachedNodesResponseSchema {
pub refreshed_at: OffsetDateTimeJsonSchemaWrapper,
#[schema(value_type = SkimmedNodeV1)]
pub nodes: PaginatedResponse<SkimmedNodeV1>,
#[schema(value_type = SkimmedNode)]
pub nodes: PaginatedResponse<SkimmedNode>,
}
-11
View File
@@ -1,11 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::support::http::state::AppState;
use axum::Router;
pub(crate) mod nym_nodes;
pub(crate) fn unstable_routes_v3() -> Router<AppState> {
Router::new().nest("/nym-nodes", nym_nodes::routes())
}
@@ -1,19 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::support::http::state::AppState;
use crate::unstable_routes::v3::nym_nodes::semi_skimmed::nodes_expanded;
use axum::routing::get;
use axum::Router;
use tower_http::compression::CompressionLayer;
pub(crate) mod semi_skimmed;
pub(crate) fn routes() -> Router<AppState> {
Router::new()
.nest(
"/semi-skimmed",
Router::new().route("/", get(nodes_expanded)),
)
.layer(CompressionLayer::new())
}
@@ -1,108 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node_status_api::models::AxumResult;
use crate::support::http::state::AppState;
use crate::unstable_routes::helpers::refreshed_at;
use axum::extract::{Query, State};
use nym_api_requests::models::{
NodeAnnotation, NymNodeDescriptionV2, OffsetDateTimeJsonSchemaWrapper,
};
use nym_api_requests::nym_nodes::{NodeRole, PaginatedCachedNodesResponseV2, SemiSkimmedNodeV3};
use nym_api_requests::pagination::PaginatedResponse;
use nym_http_api_common::{FormattedResponse, OutputParams};
use nym_mixnet_contract_common::NodeId;
use nym_topology::CachedEpochRewardedSet;
use std::collections::HashMap;
use utoipa::ToSchema;
pub type PaginatedSemiSkimmedNodes =
AxumResult<FormattedResponse<PaginatedCachedNodesResponseV2<SemiSkimmedNodeV3>>>;
fn build_response<'a>(
rewarded_set: &CachedEpochRewardedSet,
nym_nodes: impl Iterator<Item = &'a NymNodeDescriptionV2>,
annotations: &HashMap<NodeId, NodeAnnotation>,
current_key_rotation: u32,
) -> Vec<SemiSkimmedNodeV3> {
let mut nodes = Vec::new();
for nym_node in nym_nodes {
let node_id = nym_node.node_id;
let role: NodeRole = rewarded_set.role(node_id).into();
// honestly, not sure under what exact circumstances this value could be missing,
// but in that case just use 0 performance
let annotation = annotations.get(&node_id).copied().unwrap_or_default();
nodes.push(nym_node.to_semi_skimmed_node_v3(
current_key_rotation,
role,
annotation.last_24h_performance,
));
}
nodes
}
#[allow(dead_code)] // not dead, used in OpenAPI docs
#[derive(ToSchema)]
#[schema(title = "PaginatedCachedNodesExpandedV3ResponseSchema")]
pub struct PaginatedCachedNodesExpandedV3ResponseSchema {
pub refreshed_at: OffsetDateTimeJsonSchemaWrapper,
#[schema(value_type = SemiSkimmedNodeV3)]
pub nodes: PaginatedResponse<SemiSkimmedNodeV3>,
}
/// Return all Nym Nodes that are currently bonded.
#[utoipa::path(
operation_id = "v3_nodes_expanded",
tag = "Unstable Nym Nodes v3",
get,
params(OutputParams),
path = "/semi-skimmed",
context_path = "/v3/unstable/nym-nodes",
responses(
(status = 200, content(
(PaginatedCachedNodesExpandedV3ResponseSchema = "application/json"),
(PaginatedCachedNodesExpandedV3ResponseSchema = "application/yaml"),
(PaginatedCachedNodesExpandedV3ResponseSchema = "application/bincode")
))
)
)]
pub(super) async fn nodes_expanded(
state: State<AppState>,
Query(output): Query<OutputParams>,
) -> PaginatedSemiSkimmedNodes {
// 1. grab all relevant described nym-nodes
let rewarded_set = state.rewarded_set().await?;
let describe_cache = state.describe_nodes_cache_data().await?;
let all_nym_nodes = describe_cache.all_nym_nodes();
let status_cache = &state.node_status_cache();
let annotations = status_cache.node_annotations().await?;
let contract_cache = state.nym_contract_cache();
let current_key_rotation = contract_cache.current_key_rotation_id().await?;
let interval = contract_cache.current_interval().await?;
let nodes = build_response(
&rewarded_set,
all_nym_nodes,
&annotations,
current_key_rotation,
);
// min of all caches
let refreshed_at = refreshed_at([
rewarded_set.timestamp(),
status_cache.cache_timestamp().await,
describe_cache.timestamp(),
]);
Ok(output.to_response(PaginatedCachedNodesResponseV2::new_full(
interval.current_epoch_absolute_id(),
current_key_rotation,
refreshed_at,
nodes,
)))
}
+8 -3
View File
@@ -172,11 +172,13 @@ pub async fn lp_registration_probe(
let mut lp_outcome = LpProbeResults::default();
// Generate X25519 keypair for this connection
// Generate Ed25519 keypair for this connection (X25519 will be derived internally by LP)
let mut rng09 = rand09::rngs::StdRng::from_os_rng();
let client_x25519_keypair = Arc::new(DHKeyPair::new(&mut rng09));
// Create LP registration client
// Step 0: Derive X25519 keys from Ed25519 for the gateways
// Create LP registration client (uses Ed25519 keys directly, derives X25519 internally)
let mut client = LpRegistrationClient::<TcpStream>::new_with_default_config(
client_x25519_keypair,
peer,
@@ -210,13 +212,16 @@ pub async fn lp_registration_probe(
let mut rng = rand::thread_rng();
let wg_keypair = nym_crypto::asymmetric::x25519::KeyPair::new(&mut rng);
// Convert gateway identity to ed25519 public key
let gateway_ed25519_pubkey = gateway_identity;
// Register using the new packet-per-connection API (returns GatewayData directly)
let ticket_type = TicketType::V1WireguardEntry;
let gateway_data = match client
.register_dvpn(
&mut rng09,
&wg_keypair,
&gateway_identity,
&gateway_ed25519_pubkey,
bandwidth_controller,
ticket_type,
)
@@ -10,7 +10,7 @@ use nym_crypto::asymmetric::{ed25519, x25519};
use nym_network_defaults::DEFAULT_NYM_NODE_HTTP_PORT;
use nym_node_requests::api::v1::node::models::NodeDescription;
use nym_validator_client::{
client::NymNodeDetails, models::NymNodeDescriptionV1, nym_api::SkimmedNodeV1,
client::NymNodeDetails, models::NymNodeDescriptionV1, nym_api::SkimmedNode,
};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
@@ -462,7 +462,7 @@ pub(crate) struct NymNodeInsertRecord {
impl NymNodeInsertRecord {
pub fn new(
skimmed_node: SkimmedNodeV1,
skimmed_node: SkimmedNode,
bond_info: Option<&NymNodeDetails>,
self_described: Option<&NymNodeDescriptionV1>,
) -> anyhow::Result<Self> {
@@ -503,7 +503,7 @@ impl NymNodeInsertRecord {
}
}
impl TryFrom<NymNodeDto> for SkimmedNodeV1 {
impl TryFrom<NymNodeDto> for SkimmedNode {
type Error = anyhow::Error;
fn try_from(other: NymNodeDto) -> Result<Self, Self::Error> {
@@ -517,7 +517,7 @@ impl TryFrom<NymNodeDto> for SkimmedNodeV1 {
None => None,
};
let skimmed_node = SkimmedNodeV1 {
let skimmed_node = SkimmedNode {
node_id,
ed25519_identity_pubkey: ed25519::PublicKey::from_base58_string(
other.ed25519_identity_pubkey,
@@ -10,7 +10,7 @@ use crate::{
utils::now_utc,
};
use anyhow::Result;
use nym_validator_client::nym_api::SkimmedNodeV1;
use nym_validator_client::nym_api::SkimmedNode;
pub(crate) async fn get_nodes_for_scraping(pool: &DbPool) -> Result<Vec<ScraperNodeInfo>> {
let mut nodes_to_scrape = Vec::new();
@@ -24,7 +24,7 @@ pub(crate) async fn get_nodes_for_scraping(pool: &DbPool) -> Result<Vec<ScraperN
nodes_dto.into_iter().filter_map(|node_dto| {
let node_id = node_dto.node_id;
let http_api_port = node_dto.http_api_port;
match SkimmedNodeV1::try_from(node_dto) {
match SkimmedNode::try_from(node_dto) {
Ok(node) => Some((node, http_api_port)),
Err(e) => {
tracing::error!("Failed to decode node_id={}: {}", node_id, e);
@@ -141,7 +141,7 @@ mod db_tests {
http_api_port: None,
};
let skimmed_node: nym_validator_client::nym_api::SkimmedNodeV1 =
let skimmed_node: nym_validator_client::nym_api::SkimmedNode =
nym_node_dto.try_into().unwrap();
assert_eq!(skimmed_node.node_id, 1);
@@ -174,7 +174,7 @@ fn test_nym_node_insert_record_new() {
let ed25519_pk = nym_crypto::asymmetric::ed25519::PublicKey::from_bytes(&[1; 32]).unwrap();
let x25519_pk = nym_crypto::asymmetric::x25519::PublicKey::from_bytes(&[2; 32]).unwrap();
let skimmed_node = nym_validator_client::nym_api::SkimmedNodeV1 {
let skimmed_node = nym_validator_client::nym_api::SkimmedNode {
node_id: 1,
ed25519_identity_pubkey: ed25519_pk,
ip_addresses: vec!["1.1.1.1".parse().unwrap()],
@@ -226,7 +226,7 @@ fn test_nym_node_insert_record_with_entry() {
let ed25519_pk = nym_crypto::asymmetric::ed25519::PublicKey::from_bytes(&[1; 32]).unwrap();
let x25519_pk = nym_crypto::asymmetric::x25519::PublicKey::from_bytes(&[2; 32]).unwrap();
let skimmed_node = nym_validator_client::nym_api::SkimmedNodeV1 {
let skimmed_node = nym_validator_client::nym_api::SkimmedNode {
node_id: 1,
ed25519_identity_pubkey: ed25519_pk,
ip_addresses: vec!["1.1.1.1".parse().unwrap()],
@@ -438,7 +438,7 @@ fn test_nym_node_dto_with_invalid_keys() {
http_api_port: None,
};
let result: Result<nym_validator_client::nym_api::SkimmedNodeV1, _> = nym_node_dto.try_into();
let result: Result<nym_validator_client::nym_api::SkimmedNode, _> = nym_node_dto.try_into();
assert!(result.is_err());
assert!(
result
@@ -476,7 +476,7 @@ fn test_nym_node_dto_with_invalid_performance() {
http_api_port: None,
};
let result: Result<nym_validator_client::nym_api::SkimmedNodeV1, _> = nym_node_dto.try_into();
let result: Result<nym_validator_client::nym_api::SkimmedNode, _> = nym_node_dto.try_into();
assert!(result.is_err());
assert!(
result
@@ -12,7 +12,7 @@ use nym_node_requests::api::v1::node::models::NodeDescription;
use nym_validator_client::{
client::NodeId,
models::{AuthenticatorDetailsV1, BinaryBuildInformationOwned, IpPacketRouterDetailsV1},
nym_api::SkimmedNodeV1,
nym_api::SkimmedNode,
nym_nodes::{BasicEntryInformation, NodeRole},
};
use serde::{Deserialize, Serialize};
@@ -139,7 +139,7 @@ impl DVpnGateway {
#[instrument(level = tracing::Level::INFO, name = "dvpn_gw_new", skip_all, fields(gateway_key = gateway.gateway_identity_key, node_id = skimmed_node.node_id))]
pub(crate) fn new(
gateway: Gateway,
skimmed_node: &SkimmedNodeV1,
skimmed_node: &SkimmedNode,
socks5_score: Option<&ScoreValue>,
) -> anyhow::Result<Self> {
let location = gateway
@@ -6,7 +6,7 @@ use nym_contracts_common::NaiveFloat;
use nym_crypto::asymmetric::ed25519::PublicKey;
use nym_mixnet_contract_common::NodeId;
use nym_node_status_client::auth::VerifiableRequest;
use nym_validator_client::nym_api::SkimmedNodeV1;
use nym_validator_client::nym_api::SkimmedNode;
use semver::Version;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, net::IpAddr, sync::Arc, time::Duration};
@@ -304,7 +304,7 @@ impl HttpCache {
Ok(records) => {
let mut nodes = HashMap::new();
for dto in records {
match SkimmedNodeV1::try_from(dto) {
match SkimmedNode::try_from(dto) {
Ok(skimmed_node) => {
let key =
skimmed_node.ed25519_identity_pubkey.to_base58_string();
@@ -671,7 +671,7 @@ async fn aggregate_node_info_from_db(
let skimmed_nodes = queries::get_all_nym_nodes(pool).await.map(|records| {
records
.into_iter()
.filter_map(|dto| SkimmedNodeV1::try_from(dto).ok())
.filter_map(|dto| SkimmedNode::try_from(dto).ok())
.map(|skimmed_node| (skimmed_node.node_id, skimmed_node))
.collect::<HashMap<_, _>>()
})?;
@@ -12,7 +12,7 @@ use moka::future::Cache;
use nym_network_defaults::NymNetworkDetails;
use nym_validator_client::{
QueryHttpRpcNyxdClient,
nym_nodes::{NodeRole, SkimmedNodeV1},
nym_nodes::{NodeRole, SkimmedNode},
};
use nym_validator_client::{
client::{NodeId, NymApiClientExt, NymNodeDetails},
@@ -308,7 +308,7 @@ impl Monitor {
fn prepare_nym_node_data(
&self,
skimmed_nodes: Vec<SkimmedNodeV1>,
skimmed_nodes: Vec<SkimmedNode>,
bonded_node_info: &HashMap<NodeId, NymNodeDetails>,
described_nodes: &HashMap<NodeId, NymNodeDescriptionV1>,
) -> Vec<NymNodeInsertRecord> {
@@ -336,7 +336,7 @@ impl Monitor {
async fn prepare_gateway_data(
&mut self,
described_gateways: &[&NymNodeDescriptionV1],
skimmed_gateways: &[SkimmedNodeV1],
skimmed_gateways: &[SkimmedNode],
bonded_nodes: &HashMap<NodeId, NymNodeDetails>,
) -> anyhow::Result<Vec<GatewayInsertRecord>> {
let mut gateway_records = Vec::new();
+3 -2
View File
@@ -13,6 +13,8 @@ license = "GPL-3.0"
publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
path = "src/lib.rs"
[dependencies]
async-trait = { workspace = true }
@@ -115,7 +117,7 @@ nym-network-requester = { path = "../service-providers/network-requester" }
nym-ip-packet-router = { path = "../service-providers/ip-packet-router" }
# LP dependencies
nym-lp = { workspace = true }
nym-lp = { path = "../common/nym-lp" }
nym-registration-common = { path = "../common/registration" }
bincode = { workspace = true }
@@ -139,7 +141,6 @@ harness = false
cargo_metadata = { workspace = true }
[dev-dependencies]
nym-lp = { workspace = true, features = ["mock"] }
criterion = { workspace = true, features = ["async_tokio"] }
nym-test-utils = { workspace = true }
+7 -37
View File
@@ -16,14 +16,7 @@ pub struct NetworkStats {
// the call stack
active_egress_mixnet_connections: Arc<AtomicUsize>,
// incoming LP control connections from clients
active_lp_ingress_client_connections: AtomicUsize,
// incoming LP control connections from nodes
active_lp_ingress_node_connections: AtomicUsize,
// outgoing LP control connections to nodes
active_lp_egress_node_connections: AtomicUsize,
active_lp_connections: AtomicUsize,
}
impl NetworkStats {
@@ -66,38 +59,15 @@ impl NetworkStats {
.load(Ordering::Relaxed)
}
pub fn new_ingress_lp_client_connection(&self) {
self.active_lp_ingress_client_connections
.fetch_add(1, Ordering::Relaxed);
pub fn new_lp_connection(&self) {
self.active_lp_connections.fetch_add(1, Ordering::Relaxed);
}
pub fn closed_ingress_lp_client_connection(&self) {
self.active_lp_ingress_client_connections
.fetch_sub(1, Ordering::Relaxed);
pub fn lp_connection_closed(&self) {
self.active_lp_connections.fetch_sub(1, Ordering::Relaxed);
}
pub fn new_ingress_lp_node_connection(&self) {
self.active_lp_ingress_node_connections
.fetch_add(1, Ordering::Relaxed);
}
pub fn closed_ingress_lp_node_connection(&self) {
self.active_lp_ingress_node_connections
.fetch_sub(1, Ordering::Relaxed);
}
pub fn new_egress_lp_node_connection(&self) {
self.active_lp_egress_node_connections
.fetch_add(1, Ordering::Relaxed);
}
pub fn closed_egress_lp_node_connection(&self) {
self.active_lp_egress_node_connections
.fetch_sub(1, Ordering::Relaxed);
}
pub fn active_lp_client_connections_count(&self) -> usize {
self.active_lp_ingress_client_connections
.load(Ordering::Relaxed)
pub fn active_lp_connections_count(&self) -> usize {
self.active_lp_connections.load(Ordering::Relaxed)
}
}
-150
View File
@@ -1,150 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::config::LpDebug;
use crate::node::lp::state::ActiveLpSessions;
use nym_metrics::inc_by;
use std::time::Duration;
use tracing::{debug, info};
/// Wrapper for state entries with timestamp tracking for cleanup
///
/// This wrapper adds `created_at` and `last_activity` timestamps to state entries,
/// enabling TTL-based cleanup of stale handshakes and sessions.
pub struct TimestampedState<T> {
/// The actual state (LpStateMachine or LpSession)
pub state: T,
/// When this state was created (never changes)
created_at: std::time::Instant,
/// Last activity timestamp (unix seconds, atomically updated)
///
/// For handshakes: never updated (use created_at for TTL)
/// For sessions: updated on every packet received
last_activity: std::sync::atomic::AtomicU64,
}
impl<T> TimestampedState<T> {
/// Create a new timestamped state
pub fn new(state: T) -> Self {
let now_instant = std::time::Instant::now();
let now_unix = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
Self {
state,
created_at: now_instant,
last_activity: std::sync::atomic::AtomicU64::new(now_unix),
}
}
/// Update last_activity timestamp (cheap, lock-free operation)
pub fn touch(&self) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
self.last_activity
.store(now, std::sync::atomic::Ordering::Relaxed);
}
/// Get age since creation
#[allow(dead_code)]
pub fn age(&self) -> Duration {
self.created_at.elapsed()
}
/// Get time since last activity
pub fn since_activity(&self) -> Duration {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let last = self
.last_activity
.load(std::sync::atomic::Ordering::Relaxed);
Duration::from_secs(now.saturating_sub(last))
}
}
pub(crate) struct CleanupTask {
session_states: ActiveLpSessions,
cfg: LpDebug,
shutdown: nym_task::ShutdownToken,
}
impl CleanupTask {
pub fn new(
session_states: ActiveLpSessions,
cfg: LpDebug,
shutdown: nym_task::ShutdownToken,
) -> Self {
CleanupTask {
session_states,
cfg,
shutdown,
}
}
fn perform_cleanup(&self) {
let session_ttl = self.cfg.session_ttl;
let start = std::time::Instant::now();
let mut ss_removed = 0u64;
// Remove stale sessions (based on time since last activity)
// Use shorter TTL for demoted (ReadOnlyTransport) sessions
self.session_states.sessions.retain(|_, timestamped| {
if timestamped.since_activity() > session_ttl {
ss_removed += 1;
false
} else {
true
}
});
if ss_removed > 0 {
let duration = start.elapsed();
info!(
"LP state cleanup: {ss_removed} sessions (took {:.3}s)",
duration.as_secs_f64()
);
// Track metrics
if ss_removed > 0 {
inc_by!("lp_states_cleanup_session_removed", ss_removed as i64);
}
}
}
/// Background loop for cleaning up stale state entries
///
/// Runs periodically to scan handshake_states and session_states maps,
/// removing entries that have exceeded their TTL.
///
/// Demoted sessions (ReadOnlyTransport) use shorter TTL since they
/// only need to drain in-flight packets after subsession promotion.
pub(crate) async fn run(&self) {
let interval = self.cfg.state_cleanup_interval;
let mut cleanup_interval = tokio::time::interval(interval);
loop {
tokio::select! {
biased;
_ = self.shutdown.cancelled() => {
debug!("LP state cleanup task: received shutdown signal");
break;
}
_ = cleanup_interval.tick() => {
self.perform_cleanup();
}
}
}
info!("LP state cleanup task shutdown complete");
}
}
@@ -1,168 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::lp::control::LpConnectionStats;
use crate::node::lp::directory::LpNodeDetails;
use crate::node::lp::error::LpHandlerError;
use crate::node::lp::forwarding::client_connection::NestedClientConnectionSender;
use crate::node::lp::state::SharedLpNodeControlState;
use nym_lp::LpTransportSession;
use nym_lp::peer_config::LpReceiverIndex;
use nym_lp::transport::{LpHandshakeChannel, LpTransportChannel};
use std::collections::HashMap;
use std::net::SocketAddr;
use tracing::{debug, warn};
pub(crate) type NestedNodeConnectionSender = ();
pub(crate) type NestedNodeConnectionReceiver = ();
pub(crate) type NestedNodeControlSender = ();
pub(crate) type NestedNodeControlReceiver = ();
/// Initial connection handler for an egress LP node before completing the KKT/PSQ handshake.
pub struct InitialLpEgressNodeConnectionHandler<S> {
stream: S,
remote_addr: SocketAddr,
responder_details: LpNodeDetails,
state: SharedLpNodeControlState,
stats: LpConnectionStats,
}
impl<S> InitialLpEgressNodeConnectionHandler<S>
where
S: LpHandshakeChannel + LpHandshakeChannel + Unpin,
{
pub(crate) fn new(
stream: S,
remote_addr: SocketAddr,
responder_details: LpNodeDetails,
state: SharedLpNodeControlState,
) -> Self {
Self {
stream,
remote_addr,
responder_details,
state,
stats: LpConnectionStats::new(),
}
}
pub(crate) async fn complete_initial_handshake(
mut self,
) -> Option<Result<LpTransportSession, LpHandlerError>> {
let remote = self.remote_addr;
if self.responder_details.kem_key_hashes.is_empty() {
return Some(Err(LpHandlerError::MissingNodeKEMKeyHashes {
node_ip: self.remote_addr.ip(),
node_id: self.responder_details.node_id,
}));
}
// 1. complete KKT/PSQ handshake before doing anything else.
// bail if it takes too long
let timeout = self.state.shared.lp_config.debug.handshake_ttl;
let stream = &mut self.stream;
let handshake_state = match LpTransportSession::psq_handshake_initiator_mutual_internode(
stream,
self.state.local_lp_peer.clone(),
self.responder_details.to_lp_peer(),
self.responder_details.supported_protocol,
) {
Ok(handshake_state) => handshake_state,
Err(err) => {
debug!("failed to initiate mutual KTT/PSQ handshake with {remote}: {err}");
self.stats.emit_lifecycle_node_metrics(false);
return None;
}
};
let session = match tokio::time::timeout(timeout, handshake_state.complete_handshake())
.await
{
Err(_timeout) => {
debug!("timed out attempting to complete mutual KTT/PSQ handshake with {remote}");
self.stats.emit_lifecycle_node_metrics(false);
return None;
}
Ok(Err(handshake_failure)) => {
debug!(
"failed to complete mutual KKT/PSQ handshake with {remote}: {handshake_failure}"
);
self.stats.emit_lifecycle_node_metrics(false);
return None;
}
Ok(Ok(session)) => session,
};
debug!(
"completed egress KKT/PSQ handshake with node {}: {remote}",
self.responder_details.node_id
);
// TODO: change return type into complete handler
Some(Ok(session))
}
}
pub(crate) struct NestedNodeConnectionHandler<S> {
/// Persistent connection to exit gateway for forwarding.
/// Currently, it uses raw TCP socket, later it will be wrapped with dedicated PSQ tunnel
exit_stream: S,
/// Socket address of the remote of the established stream
exit_address: SocketAddr,
/// Map of senders to each known client handle (based on the inner receiver index)
client_handles: HashMap<LpReceiverIndex, NestedClientConnectionSender>,
/// Channel for receiving requests that are to be forwarded into the exit stream
data_receiver: NestedNodeConnectionReceiver,
/// Channel for adding new client handle and handling control requests from `NestedConnectionsController`
control_receiver: NestedNodeControlReceiver,
// client_streams: HashMap<StreamId, LpClientStream>,
}
impl<S> NestedNodeConnectionHandler<S>
where
S: LpTransportChannel + Unpin,
{
/// Attempt to extract outer receiver index from the received message
/// (that is meant to be an `LpPacket`)
fn extract_receiver_index(&self, raw: &[u8]) -> Option<LpReceiverIndex> {
if raw.len() < 4 {
return None;
}
Some(LpReceiverIndex::from_le_bytes([
raw[0], raw[1], raw[2], raw[3],
]))
}
/// Attempt to forward received packet to the client that established the inner LP session
async fn handle_exit_packet(&self, packet: Vec<u8>) {
let Some(receiver_index) = self.extract_receiver_index(&packet) else {
warn!("{} has sent us an invalid LP packet", self.exit_address);
return;
};
let Some(client_handle) = self.client_handles.get(&receiver_index) else {
warn!(
"no client handle for receiver index {receiver_index} received from {}",
self.exit_address
);
return;
};
// client_handle.send(packet).await;
}
async fn run(&mut self) {
// loop {
// tokio::select! {
//
// }
// }
}
}
@@ -1,4 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
pub mod connection;
@@ -1,188 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::config::LpConfig;
use crate::error::NymNodeError;
use crate::node::lp::control::ingress::client_handler::LpClientConnectionHandler;
use crate::node::lp::control::ingress::node_handler::InitialLpIngressNodeConnectionHandler;
use crate::node::lp::directory::LpNodeDetails;
use crate::node::lp::state::{SharedLpClientControlState, SharedLpNodeControlState};
use nym_task::ShutdownTracker;
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tracing::{debug, error, info, trace, warn};
/// LP listener that accepts TCP connections on port 41264
pub struct LpControlListener {
/// Address to bind to
bind_address: SocketAddr,
/// Shared state for clients connection handlers
clients_handler_state: SharedLpClientControlState,
/// Shared state for nodes connection handlers
nodes_handler_state: SharedLpNodeControlState,
/// Shutdown coordination
shutdown: ShutdownTracker,
}
impl LpControlListener {
pub fn new(
bind_address: SocketAddr,
clients_handler_state: SharedLpClientControlState,
nodes_handler_state: SharedLpNodeControlState,
shutdown: ShutdownTracker,
) -> Self {
Self {
bind_address,
clients_handler_state,
nodes_handler_state,
shutdown,
}
}
fn lp_config(&self) -> LpConfig {
self.clients_handler_state.shared.lp_config
}
pub async fn run(&mut self) -> Result<(), NymNodeError> {
let bind_address = self.bind_address;
info!("Starting LP control listener on {bind_address}");
let listener = TcpListener::bind(bind_address).await.map_err(|source| {
error!("Failed to bind LP listener to {bind_address}: {source}",);
NymNodeError::LpBindFailure {
address: bind_address,
source,
}
})?;
let shutdown_token = self.shutdown.clone_shutdown_token();
loop {
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
trace!("LP listener: received shutdown signal");
break;
}
result = listener.accept() => {
match result {
Ok((stream, addr)) => self.handle_connection(stream, addr),
Err(e) => warn!("Failed to accept LP connection: {e}")
}
}
}
}
info!("LP listener shutdown complete");
Ok(())
}
fn handle_node_connection(
&self,
stream: tokio::net::TcpStream,
remote_addr: SocketAddr,
initiator_details: LpNodeDetails,
) {
debug!("Accepting LP node connection from {remote_addr}");
// Spawn handler task
let mut handler = InitialLpIngressNodeConnectionHandler::new(
stream,
remote_addr,
initiator_details,
self.nodes_handler_state.clone(),
);
self.shutdown.try_spawn_named_with_shutdown(
async move {
let metrics = handler.metrics().clone();
// Increment connection counter
metrics.network.new_ingress_lp_node_connection();
let result = handler.handle().await;
// Decrement connection counter
metrics.network.closed_ingress_lp_node_connection();
// Handler emits lifecycle metrics internally on success
// For errors, we need to emit them here since handler is consumed
if let Err(e) = result {
warn!("LP node handler error for {remote_addr}: {e}");
// Note: metrics are emitted in handle() for graceful path
// On error path, handle() returns early without emitting
// So we track errors here
}
},
&format!("LP_NODE::{remote_addr}"),
);
}
fn handle_client_connection(&self, stream: tokio::net::TcpStream, remote_addr: SocketAddr) {
// Check connection limit (only for clients, nodes must always be allowed regardless of the limit)
let active_connections = self.active_client_connections();
let max_connections = self.lp_config().debug.max_connections;
if active_connections >= max_connections {
warn!(
"LP connection limit exceeded ({active_connections}/{max_connections}), rejecting connection from {remote_addr}"
);
return;
}
debug!(
"Accepting LP client connection from {remote_addr} ({active_connections} active connections)"
);
// Spawn handler task
let mut handler =
LpClientConnectionHandler::new(stream, remote_addr, self.clients_handler_state.clone());
self.shutdown.try_spawn_named_with_shutdown(
async move {
// Increment connection counter
handler.metrics().network.new_ingress_lp_client_connection();
let result = handler.handle().await;
// Decrement connection counter
handler
.metrics()
.network
.closed_ingress_lp_client_connection();
// Handler emits lifecycle metrics internally on success
// For errors, we need to emit them here since handler is consumed
if let Err(e) = result {
warn!("LP client handler error for {remote_addr}: {e}");
// Note: metrics are emitted in handle() for graceful path
// On error path, handle() returns early without emitting
// So we track errors here
}
},
&format!("LP_CLIENT::{remote_addr}"),
);
}
fn handle_connection(&self, stream: tokio::net::TcpStream, remote_addr: SocketAddr) {
if let Some(initiator_details) = self
.nodes_handler_state
.nodes
.get_node_details(remote_addr.ip())
{
self.handle_node_connection(stream, remote_addr, initiator_details);
} else {
self.handle_client_connection(stream, remote_addr);
}
}
fn active_client_connections(&self) -> usize {
self.clients_handler_state
.shared
.metrics
.network
.active_lp_client_connections_count()
}
}
@@ -1,6 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod client_handler;
pub(crate) mod listener;
pub mod node_handler;
@@ -1,148 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::node::lp::control::LpConnectionStats;
use crate::node::lp::directory::LpNodeDetails;
use crate::node::lp::error::LpHandlerError;
use crate::node::lp::state::SharedLpNodeControlState;
use nym_lp::LpTransportSession;
use nym_lp::transport::{LpHandshakeChannel, LpTransportChannel};
use nym_metrics::inc;
use nym_node_metrics::NymNodeMetrics;
use nym_topology::NodeId;
use std::net::SocketAddr;
use tokio::net::TcpStream;
use tracing::debug;
/// Initial connection handler for an ingress LP node before completing the KKT/PSQ handshake.
pub struct InitialLpIngressNodeConnectionHandler<S = TcpStream> {
stream: S,
remote_addr: SocketAddr,
initiator_details: LpNodeDetails,
state: SharedLpNodeControlState,
stats: LpConnectionStats,
}
impl<S> InitialLpIngressNodeConnectionHandler<S>
where
S: LpHandshakeChannel + LpTransportChannel + Unpin,
{
pub fn new(
stream: S,
remote_addr: SocketAddr,
initiator_details: LpNodeDetails,
state: SharedLpNodeControlState,
) -> Self {
Self {
stream,
remote_addr,
initiator_details,
state,
stats: LpConnectionStats::new(),
}
}
pub(crate) fn metrics(&self) -> &NymNodeMetrics {
&self.state.shared.metrics
}
pub(crate) async fn complete_initial_handshake(
mut self,
) -> Option<Result<LpIngressNodeConnectionHandler<S>, LpHandlerError>> {
let remote = self.remote_addr;
if self.initiator_details.kem_key_hashes.is_empty() {
return Some(Err(LpHandlerError::MissingNodeKEMKeyHashes {
node_ip: self.remote_addr.ip(),
node_id: self.initiator_details.node_id,
}));
}
// 1. complete KKT/PSQ handshake before doing anything else.
// bail if it takes too long
let timeout = self.state.shared.lp_config.debug.handshake_ttl;
let local_peer = self.state.local_lp_peer.clone();
let stream = &mut self.stream;
let kem_hashes = self.initiator_details.kem_key_hashes.clone();
let session = match tokio::time::timeout(timeout, async move {
LpTransportSession::psq_handshake_responder_mutual(stream, local_peer, kem_hashes)
.complete_handshake()
.await
})
.await
{
Err(_timeout) => {
debug!("timed out attempting to complete mutual KTT/PSQ handshake with {remote}");
self.stats.emit_lifecycle_node_metrics(false);
return None;
}
Ok(Err(handshake_failure)) => {
debug!(
"failed to complete mutual KKT/PSQ handshake with {remote}: {handshake_failure}"
);
self.stats.emit_lifecycle_node_metrics(false);
return None;
}
Ok(Ok(session)) => session,
};
debug!(
"completed ingress KKT/PSQ handshake with node {}: {remote}",
self.initiator_details.node_id
);
Some(Ok(LpIngressNodeConnectionHandler {
stream: self.stream,
remote_addr: remote,
remote_node_id: self.initiator_details.node_id,
state: self.state,
stats: self.stats,
transport_session: session,
}))
}
pub async fn handle(mut self) -> Result<(), LpHandlerError> {
// Track total LP connections handled
inc!("lp_node_connections_total");
// attempt to complete initial handshake
let upgraded_handler = match self.complete_initial_handshake().await {
None => return Ok(()),
Some(handler_res) => handler_res?,
};
// continue handling the requests with the transport session
upgraded_handler.handle().await
}
}
/// Connection handler for an LP node after completing the KKT/PSQ handshake.
pub struct LpIngressNodeConnectionHandler<S = TcpStream> {
stream: S,
remote_addr: SocketAddr,
remote_node_id: NodeId,
state: SharedLpNodeControlState,
stats: LpConnectionStats,
transport_session: LpTransportSession,
// LOCAL receiver index to stream id
// client_streams: HashMap<ReceiverIndex, ClientStreamId>,
}
impl<S> LpIngressNodeConnectionHandler<S>
where
S: LpHandshakeChannel + LpTransportChannel + Unpin,
{
async fn handle(mut self) -> Result<(), LpHandlerError> {
// handle all the forwarding here
self.stats.emit_lifecycle_node_metrics(true);
Ok(())
}
pub(crate) fn transport_session(&self) -> &LpTransportSession {
&self.transport_session
}
}
-119
View File
@@ -1,119 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use nym_metrics::{add_histogram_obs, inc, inc_by};
pub mod egress;
pub mod ingress;
mod tests;
// Histogram buckets for LP operation duration (legacy - used by unused forwarding methods)
const LP_DURATION_BUCKETS: &[f64] = &[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0];
// Histogram buckets for LP connection lifecycle duration
// LP connections can be very short (registration only: ~1s) or very long (dVPN sessions: hours/days)
// Covers full range from seconds to 24 hours
const LP_CONNECTION_DURATION_BUCKETS: &[f64] = &[
1.0, // 1 second
5.0, // 5 seconds
10.0, // 10 seconds
30.0, // 30 seconds
60.0, // 1 minute
300.0, // 5 minutes
600.0, // 10 minutes
1800.0, // 30 minutes
3600.0, // 1 hour
7200.0, // 2 hours
14400.0, // 4 hours
28800.0, // 8 hours
43200.0, // 12 hours
86400.0, // 24 hours
];
/// Connection lifecycle statistics tracking
pub(crate) struct LpConnectionStats {
/// When the connection started
start_time: std::time::Instant,
/// Total bytes received (including protocol framing)
bytes_received: u64,
/// Total bytes sent (including protocol framing)
bytes_sent: u64,
}
impl LpConnectionStats {
fn new() -> Self {
Self {
start_time: std::time::Instant::now(),
bytes_received: 0,
bytes_sent: 0,
}
}
fn duration(&self) -> std::time::Duration {
self.start_time.elapsed()
}
fn record_bytes_received(&mut self, bytes: usize) {
self.bytes_received += bytes as u64;
}
fn record_bytes_sent(&mut self, bytes: usize) {
self.bytes_sent += bytes as u64;
}
/// Emit connection lifecycle metrics for a client connection
fn emit_lifecycle_client_metrics(&self, graceful: bool) {
// Track connection duration
let duration = self.duration().as_secs_f64();
add_histogram_obs!(
"lp_client_connection_duration_seconds",
duration,
LP_CONNECTION_DURATION_BUCKETS
);
// Track bytes transferred
inc_by!(
"lp_client_connection_bytes_received_total",
self.bytes_received as i64
);
inc_by!(
"lp_client_connection_bytes_sent_total",
self.bytes_sent as i64
);
// Track completion type
if graceful {
inc!("lp_client_connections_completed_gracefully");
} else {
inc!("lp_client_connections_completed_with_error");
}
}
/// Emit connection lifecycle metrics for a node connection
fn emit_lifecycle_node_metrics(&self, graceful: bool) {
// Track connection duration
let duration = self.duration().as_secs_f64();
add_histogram_obs!(
"lp_node_connection_duration_seconds",
duration,
LP_CONNECTION_DURATION_BUCKETS
);
// Track bytes transferred
inc_by!(
"lp_node_connection_bytes_received_total",
self.bytes_received as i64
);
inc_by!(
"lp_node_connection_bytes_sent_total",
self.bytes_sent as i64
);
// Track completion type
if graceful {
inc!("lp_node_connections_completed_gracefully");
} else {
inc!("lp_node_connections_completed_with_error");
}
}
}
-89
View File
@@ -1,89 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
#[cfg(test)]
mod tests {
use crate::node::lp::SharedLpState;
use crate::node::lp::control::egress::connection::InitialLpEgressNodeConnectionHandler;
use crate::node::lp::control::ingress::node_handler::InitialLpIngressNodeConnectionHandler;
use crate::node::lp::directory::LpNodeDetails;
use crate::node::lp::state::SharedLpNodeControlState;
use anyhow::Context;
use nym_lp::packet::version;
use nym_lp::peer::{LpLocalPeer, LpRemotePeer, mock_peers};
use nym_test_utils::helpers::seeded_rng;
use nym_test_utils::mocks::async_read_write::MockIOStream;
use nym_test_utils::traits::TimeboxedSpawnable;
use rand::RngCore;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
fn shared_node_state(peer: LpLocalPeer) -> SharedLpNodeControlState {
SharedLpNodeControlState {
local_lp_peer: peer,
nodes: Default::default(),
shared: SharedLpState {
metrics: Default::default(),
lp_config: Default::default(),
session_states: Default::default(),
},
}
}
fn lp_node_details(peer: LpRemotePeer) -> LpNodeDetails {
let key_bytes = peer.x25519().as_ref().try_into().unwrap();
let mut rng = seeded_rng(key_bytes);
LpNodeDetails::new(
rng.next_u32(),
peer.kem_key_digests().clone(),
peer.x25519().clone(),
version::CURRENT,
)
}
#[tokio::test]
async fn basic_node_to_node_handshake() -> anyhow::Result<()> {
nym_test_utils::helpers::setup_test_logger();
let (init, resp) = mock_peers();
let init_remote = init.as_remote();
let resp_remote = resp.as_remote();
let conn_init = MockIOStream::default();
let conn_resp = conn_init.try_get_remote_handle();
let init_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234);
let init_details = lp_node_details(init_remote);
let resp_details = lp_node_details(resp_remote);
let init_state = shared_node_state(init);
let resp_state = shared_node_state(resp);
let init_handler = InitialLpEgressNodeConnectionHandler::new(
conn_init,
init_addr,
resp_details,
init_state,
);
let resp_handler = InitialLpIngressNodeConnectionHandler::new(
conn_resp,
init_addr,
init_details,
resp_state,
);
let init_future = init_handler.complete_initial_handshake().spawn_timeboxed();
let resp_future = resp_handler.complete_initial_handshake().spawn_timeboxed();
let (init_result, resp_result) = tokio::join!(init_future, resp_future);
let init_result = init_result??.context("handshake failure")??;
let resp_result = resp_result??.context("handshake failure")??;
assert_eq!(
init_result.receiver_index(),
resp_result.transport_session().receiver_index()
);
Ok(())
}
}
-81
View File
@@ -1,81 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::error::NymNodeError;
use crate::node::lp::data::MAX_UDP_PACKET_SIZE;
use crate::node::lp::data::handler::LpDataHandler;
use crate::node::lp::state::SharedLpDataState;
use nym_metrics::inc;
use std::net::SocketAddr;
use tokio::net::UdpSocket;
use tracing::log::warn;
use tracing::{debug, error, info};
/// LP UDP listener that accepts TCP connections on port 51264 (by default)
pub struct LpDataListener {
/// Address to bind to
bind_address: SocketAddr,
/// State used for handling received requests
handler: LpDataHandler,
/// Shutdown token
shutdown: nym_task::ShutdownToken,
}
impl LpDataListener {
pub fn new(
bind_address: SocketAddr,
state: SharedLpDataState,
shutdown: nym_task::ShutdownToken,
) -> Self {
Self {
bind_address,
handler: LpDataHandler::new(state),
shutdown,
}
}
pub async fn run(&self) -> Result<(), NymNodeError> {
let bind_address = self.bind_address;
info!("Starting LP data listener on {bind_address}");
let socket = UdpSocket::bind(bind_address).await.map_err(|source| {
error!("Failed to bind LP data socket to {bind_address}: {source}");
NymNodeError::LpBindFailure {
address: bind_address,
source,
}
})?;
let mut buf = vec![0u8; MAX_UDP_PACKET_SIZE];
loop {
tokio::select! {
biased;
_ = self.shutdown.cancelled() => {
info!("LP data listener: received shutdown signal");
break;
}
result = socket.recv_from(&mut buf) => {
match result {
Ok((len, src_addr)) => {
// Process packet in place (no spawn - UDP is fast)
if let Err(e) = self.handler.handle_packet(&buf[..len], src_addr).await {
debug!("LP data packet error from {src_addr}: {e}");
inc!("lp_data_packet_errors");
}
}
Err(e) => {
warn!("LP data socket recv error: {e}");
inc!("lp_data_recv_errors");
}
}
}
}
}
info!("LP data handler shutdown complete");
Ok(())
}
}
-20
View File
@@ -1,20 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
/// Maximum UDP packet size we'll accept
/// Sphinx packets are typically ~2KB, LP overhead is ~50 bytes, so 4KB is plenty
const MAX_UDP_PACKET_SIZE: usize = 4096;
pub mod handler;
pub(crate) mod listener;
#[cfg(test)]
mod tests {
use super::*;
// Sphinx packets are typically around 2KB
// 4KB should be plenty with room to spare
const _: () = {
assert!(MAX_UDP_PACKET_SIZE >= 2048 + 100);
};
}
@@ -1,5 +1,5 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
//! LP Data Handler - UDP listener for LP data plane (port 51264)
//!
@@ -15,24 +15,90 @@
//! ```
//!
use super::LpHandlerState;
use crate::error::NymNodeError;
use crate::node::lp::error::LpHandlerError;
use crate::node::lp::state::SharedLpDataState;
use nym_lp::packet::OuterHeader;
use nym_metrics::inc;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
use tracing::*;
/// Maximum UDP packet size we'll accept
/// Sphinx packets are typically ~2KB, LP overhead is ~50 bytes, so 4KB is plenty
const MAX_UDP_PACKET_SIZE: usize = 4096;
/// LP Data Handler for UDP data plane
pub struct LpDataHandler {
/// State used for handling received requests
/// UDP socket for receiving LP-wrapped Sphinx packets
socket: Arc<UdpSocket>,
/// Shared state with TCP control plane
#[allow(dead_code)]
state: SharedLpDataState,
state: LpHandlerState,
/// Shutdown token
shutdown: nym_task::ShutdownToken,
}
impl LpDataHandler {
/// Create a new LP data handler
pub fn new(state: SharedLpDataState) -> Self {
Self { state }
pub async fn new(
bind_addr: SocketAddr,
state: LpHandlerState,
shutdown: nym_task::ShutdownToken,
) -> Result<Self, NymNodeError> {
let socket = UdpSocket::bind(bind_addr).await.map_err(|source| {
error!("Failed to bind LP data socket to {bind_addr}: {source}");
NymNodeError::LpBindFailure {
address: bind_addr,
source,
}
})?;
info!("LP data handler listening on UDP {bind_addr}");
Ok(Self {
socket: Arc::new(socket),
state,
shutdown,
})
}
/// Run the UDP packet receive loop
pub async fn run(self) -> Result<(), LpHandlerError> {
let mut buf = vec![0u8; MAX_UDP_PACKET_SIZE];
loop {
tokio::select! {
biased;
_ = self.shutdown.cancelled() => {
info!("LP data handler: received shutdown signal");
break;
}
result = self.socket.recv_from(&mut buf) => {
match result {
Ok((len, src_addr)) => {
// Process packet in place (no spawn - UDP is fast)
if let Err(e) = self.handle_packet(&buf[..len], src_addr).await {
debug!("LP data packet error from {src_addr}: {e}");
inc!("lp_data_packet_errors");
}
}
Err(e) => {
warn!("LP data socket recv error: {e}");
inc!("lp_data_recv_errors");
}
}
}
}
}
info!("LP data handler shutdown complete");
Ok(())
}
/// Handle a single UDP packet
@@ -49,7 +115,7 @@ impl LpDataHandler {
/// - Marking counter as used after successful decryption
///
/// This prevents replay attacks where captured packets are re-sent.
pub(crate) async fn handle_packet(
async fn handle_packet(
&self,
packet: &[u8],
src_addr: SocketAddr,
@@ -157,3 +223,15 @@ impl LpDataHandler {
// }
}
}
#[cfg(test)]
mod tests {
use super::*;
// Sphinx packets are typically around 2KB
// LP overhead is small (~50 bytes header + AEAD tag)
// 4KB should be plenty with room to spare
const _: () = {
assert!(MAX_UDP_PACKET_SIZE >= 2048 + 100);
};
}
-75
View File
@@ -1,75 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use arc_swap::ArcSwap;
use nym_lp::peer::{DHPublicKey, LpRemotePeer};
use nym_lp::{KEM, KEMKeyDigests};
use nym_topology::NodeId;
use std::collections::{BTreeMap, HashMap};
use std::net::IpAddr;
use std::ops::Deref;
use std::sync::Arc;
/// Wrapper around all known LP nodes
#[derive(Clone, Default)]
pub struct LpNodes {
// map between all available ip addresses of other nodes and their details
nodes: Arc<ArcSwap<HashMap<IpAddr, LpNodeDetails>>>,
}
impl LpNodes {
pub(crate) fn get_node_details(&self, node_ip: IpAddr) -> Option<LpNodeDetails> {
self.nodes.load().get(&node_ip).cloned()
}
pub(crate) fn get_node_id(&self, node_ip: IpAddr) -> Option<NodeId> {
self.nodes
.load()
.get(&node_ip)
.map(|details| details.node_id)
}
}
#[derive(Clone)]
pub(crate) struct LpNodeDetails {
inner: Arc<LpNodeDetailsInner>,
}
impl LpNodeDetails {
pub(crate) fn new(
node_id: NodeId,
kem_key_hashes: BTreeMap<KEM, KEMKeyDigests>,
x25519: DHPublicKey,
supported_protocol: u8,
) -> Self {
LpNodeDetails {
inner: Arc::new(LpNodeDetailsInner {
node_id,
kem_key_hashes,
x25519,
supported_protocol,
}),
}
}
}
impl Deref for LpNodeDetails {
type Target = LpNodeDetailsInner;
fn deref(&self) -> &Self::Target {
self.inner.deref()
}
}
pub(crate) struct LpNodeDetailsInner {
pub(crate) node_id: NodeId,
pub(crate) kem_key_hashes: BTreeMap<KEM, KEMKeyDigests>,
pub(crate) x25519: DHPublicKey,
pub(crate) supported_protocol: u8,
}
impl LpNodeDetailsInner {
pub(crate) fn to_lp_peer(&self) -> LpRemotePeer {
LpRemotePeer::new(self.x25519).with_key_digests(self.kem_key_hashes.clone())
}
}
+6 -17
View File
@@ -1,13 +1,12 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::node::lp::LpReceiverIndex;
use nym_lp::packet::message::LpMessageType;
use nym_lp::peer_config::LpReceiverIndex;
use nym_lp::session::LpAction;
use nym_lp::state_machine::LpAction;
use nym_lp::transport::LpTransportError;
use nym_lp::{LpError, packet::MalformedLpPacketError};
use nym_topology::NodeId;
use std::net::{IpAddr, SocketAddr};
use std::net::SocketAddr;
use thiserror::Error;
#[derive(Debug, Error)]
@@ -33,6 +32,9 @@ pub enum LpHandlerError {
received: LpReceiverIndex,
},
#[error("no action has been emitted from the LP State Machine")]
UnexpectedStateMachineHalt,
#[error("the state machine instructed an unexpected action: {action:?}")]
UnexpectedStateMachineAction { action: LpAction },
@@ -48,18 +50,9 @@ pub enum LpHandlerError {
#[error("timed out while attempting to send to/receive from the connection")]
ConnectionTimeout,
#[error("missing KEM key hashes for node {node_id} connected from {node_ip}")]
MissingNodeKEMKeyHashes { node_ip: IpAddr, node_id: NodeId },
#[error("data channel is not yet implemented")]
UnimplementedDataChannel,
#[error("{ip_addr} does not correspond to any known LP node")]
NotLpNode { ip_addr: IpAddr },
#[error("{0}")]
Internal(String),
#[error("{0}")]
Other(String),
}
@@ -74,10 +67,6 @@ impl LpHandlerError {
}
}
pub fn internal(msg: impl Into<String>) -> Self {
LpHandlerError::Internal(msg.into())
}
pub fn other(msg: impl Into<String>) -> Self {
LpHandlerError::Other(msg.into())
}
@@ -1,16 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::node::lp::control::egress::connection::NestedNodeConnectionSender;
use futures::channel::mpsc::UnboundedReceiver;
pub(crate) type NestedClientConnectionSender = ();
pub(crate) type NestedClientConnectionReceiver = UnboundedReceiver<Vec<u8>>;
pub(crate) struct NestedClientConnection {
// handle for sending into `NestedNodeConnectionHandler`
sender: NestedNodeConnectionSender,
// handle for receiving from `NestedNodeConnectionHandler`
receiver: NestedClientConnectionReceiver,
}
@@ -1,116 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::lp::control::egress::connection::NestedNodeControlSender;
use crate::node::lp::directory::LpNodes;
use crate::node::lp::error::LpHandlerError;
use crate::node::lp::forwarding::{
ConnectionControllerResponse, ConnectionHandlerResponse, ControllerResponse,
GetConnectionHandler, NestedConnectionControllerRequest,
};
use nym_topology::NodeId;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Notify;
use tracing::info;
pub const CONTROL_CHANNEL_SIZE: usize = 64;
pub(crate) enum NodeHandle {
Established(NestedNodeControlSender),
Pending(Arc<Notify>),
}
/// Keep track of connections to the exit gateway
pub struct NestedConnectionsController {
/// Handle channel for sending requests to this controller
sender: super::NodeConnectionControllerSender,
/// Channel for receiving requests in this controller
receiver: super::NodeConnectionControllerReceiver,
/// Map of all LP node ip addresses to their details (and ids)
lp_nodes: LpNodes,
/// Handles to the active nested node connections
nodes_handles: HashMap<NodeId, NodeHandle>,
/// Shutdown token
shutdown: nym_task::ShutdownToken,
}
impl NestedConnectionsController {
pub fn new(lp_nodes: LpNodes, shutdown: nym_task::ShutdownToken) -> Self {
let (sender, receiver) = tokio::sync::mpsc::channel(CONTROL_CHANNEL_SIZE);
Self {
sender,
receiver,
lp_nodes,
nodes_handles: HashMap::new(),
shutdown,
}
}
pub fn request_sender(&self) -> super::NodeConnectionControllerSender {
self.sender.clone()
}
async fn handle_get_connection_handler(
&mut self,
request: GetConnectionHandler,
) -> ConnectionHandlerResponse {
let ip = request.target_gateway_lp_address.ip();
let Some(node_id) = self.lp_nodes.get_node_id(ip) else {
return Err(LpHandlerError::NotLpNode { ip_addr: ip });
};
match self.nodes_handles.get(&node_id) {
Some(NodeHandle::Established(handle)) => {
todo!()
}
Some(NodeHandle::Pending(notify)) => {
Ok(ConnectionControllerResponse::Pending(notify.clone()))
}
None => {
let (res, notify) = ConnectionControllerResponse::new_pending();
self.nodes_handles
.insert(node_id, NodeHandle::Pending(notify.clone()));
// create a new connection and return a pending response
todo!();
return Ok(res);
}
}
}
async fn handle_request(&mut self, request: NestedConnectionControllerRequest) {
match request {
NestedConnectionControllerRequest::ConnectionHandler {
request,
response_tx,
} => {
response_tx
.send(self.handle_get_connection_handler(request).await)
.ok();
}
}
}
pub async fn run(&mut self) {
loop {
tokio::select! {
biased;
_ = self.shutdown.cancelled() => {
break;
}
Some(request) = self.receiver.recv() => {
self.handle_request(request).await;
}
}
}
info!("Nested connection controller shutdown complete");
}
}
@@ -1,73 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use super::{
ConnectionControllerResponse, ConnectionHandlerResponse, GetConnectionHandler,
NestedConnectionControllerRequest, NodeConnectionControllerSender,
};
use crate::node::lp::error::LpHandlerError;
use crate::node::lp::forwarding::client_connection::NestedClientConnection;
use nym_lp::peer_config::LpReceiverIndex;
use std::net::SocketAddr;
use tokio::sync::oneshot;
#[derive(Clone)]
pub struct NestedConnectionsManager {
sender: NodeConnectionControllerSender,
}
impl NestedConnectionsManager {
pub fn new(sender: NodeConnectionControllerSender) -> Self {
Self { sender }
}
async fn send_connection_handler_request(
&self,
request: GetConnectionHandler,
) -> Result<ConnectionHandlerResponse, LpHandlerError> {
let (response_tx, response_rx) = oneshot::channel();
self.sender
.send(NestedConnectionControllerRequest::ConnectionHandler {
request,
response_tx,
})
.await
.map_err(|_| LpHandlerError::internal("nested connection controller shut down"))?;
response_rx.await.map_err(|_| {
LpHandlerError::internal("nested connection controller hasn't send a response")
})
}
pub(crate) async fn get_connection_handle(
&self,
target_gateway_lp_address: SocketAddr,
inner_receiver_index: LpReceiverIndex,
) -> Result<NestedClientConnection, LpHandlerError> {
let request = GetConnectionHandler {
target_gateway_lp_address,
inner_receiver_index,
};
let notify = match self.send_connection_handler_request(request).await?? {
// if we have received a ready response, we can return the connection
ConnectionControllerResponse::Ready(conn) => return Ok(conn),
// otherwise we need to wait for the notification when it becomes available
ConnectionControllerResponse::Pending(notify) => notify,
};
// TODO: timeout
notify.notified().await;
match self.send_connection_handler_request(request).await?? {
// if we have received a ready response, we can return the connection
ConnectionControllerResponse::Ready(conn) => Ok(conn),
// otherwise we need to wait for the notification when it becomes available
ConnectionControllerResponse::Pending(_) => Err(LpHandlerError::internal(
"unavailable connection handler after successful notification",
)),
}
}
}
-58
View File
@@ -1,58 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::lp::error::LpHandlerError;
use crate::node::lp::forwarding::client_connection::NestedClientConnection;
use nym_lp::peer_config::LpReceiverIndex;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{Notify, oneshot};
pub mod client_connection;
pub mod controller;
pub mod manager;
pub type NodeConnectionControllerReceiver = Receiver<NestedConnectionControllerRequest>;
pub type NodeConnectionControllerSender = Sender<NestedConnectionControllerRequest>;
pub(crate) enum ConnectionControllerResponse<T> {
/// The response is immediately available
Ready(T),
/// The response is in the process of being resolved. It will be ready once the returned
/// notify resolves. At this point the caller should repeat the query
Pending(Arc<Notify>),
}
impl<T> ConnectionControllerResponse<T> {
pub fn new_pending() -> (Self, Arc<Notify>) {
let notify = Arc::new(Notify::new());
(
ConnectionControllerResponse::Pending(notify.clone()),
notify,
)
}
}
pub type ControllerResponse<T> = Result<ConnectionControllerResponse<T>, LpHandlerError>;
pub type ConnectionHandlerResponse = ControllerResponse<NestedClientConnection>;
pub enum NestedConnectionControllerRequest {
/// Attempt to retrieve or create a handle to an exit gateway connection.
/// If the connection doesn't exist, it will be established
ConnectionHandler {
request: GetConnectionHandler,
response_tx: oneshot::Sender<ConnectionHandlerResponse>,
},
}
#[derive(Copy, Clone)]
pub(crate) struct GetConnectionHandler {
/// Target gateway's LP address
pub target_gateway_lp_address: SocketAddr,
/// Receiver index on the inner packet
pub inner_receiver_index: LpReceiverIndex,
}
@@ -1,21 +1,16 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::lp::cleanup::TimestampedState;
use crate::node::lp::control::{LP_DURATION_BUCKETS, LpConnectionStats};
use super::{LpHandlerState, LpReceiverIndex, TimestampedState};
use crate::node::lp::error::LpHandlerError;
use crate::node::lp::forwarding::client_connection::NestedClientConnection;
use crate::node::lp::state::SharedLpClientControlState;
use dashmap::mapref::one::RefMut;
use nym_lp::packet::message::LpMessageType;
use nym_lp::packet::{EncryptedLpPacket, ForwardPacketData, LpMessage};
use nym_lp::peer_config::LpReceiverIndex;
use nym_lp::session::{LpAction, LpInput};
use nym_lp::state_machine::{LpAction, LpInput};
use nym_lp::transport::LpHandshakeChannel;
use nym_lp::transport::traits::LpTransportChannel;
use nym_lp::{LpTransportSession, packet::message::ExpectedResponseSize};
use nym_lp::{LpSession, LpStateMachine, packet::message::ExpectedResponseSize};
use nym_metrics::{add_histogram_obs, inc};
use nym_node_metrics::NymNodeMetrics;
use nym_registration_common::{LpRegistrationRequest, RegistrationStatus};
use std::net::SocketAddr;
use std::time::Duration;
@@ -23,16 +18,69 @@ use tokio::net::TcpStream;
use tokio::time::timeout;
use tracing::*;
// Histogram buckets for LP operation duration (legacy - used by unused forwarding methods)
const LP_DURATION_BUCKETS: &[f64] = &[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0];
// Timeout for forward I/O operations (send + receive on exit stream)
// Must be long enough to cover exit gateway processing time
const FORWARD_IO_TIMEOUT_SECS: u64 = 30;
pub struct LpClientConnectionHandler<S = TcpStream> {
// Histogram buckets for LP connection lifecycle duration
// LP connections can be very short (registration only: ~1s) or very long (dVPN sessions: hours/days)
// Covers full range from seconds to 24 hours
const LP_CONNECTION_DURATION_BUCKETS: &[f64] = &[
1.0, // 1 second
5.0, // 5 seconds
10.0, // 10 seconds
30.0, // 30 seconds
60.0, // 1 minute
300.0, // 5 minutes
600.0, // 10 minutes
1800.0, // 30 minutes
3600.0, // 1 hour
7200.0, // 2 hours
14400.0, // 4 hours
28800.0, // 8 hours
43200.0, // 12 hours
86400.0, // 24 hours
];
/// Connection lifecycle statistics tracking
struct ConnectionStats {
/// When the connection started
start_time: std::time::Instant,
/// Total bytes received (including protocol framing)
bytes_received: u64,
/// Total bytes sent (including protocol framing)
bytes_sent: u64,
}
impl ConnectionStats {
fn new() -> Self {
Self {
start_time: std::time::Instant::now(),
bytes_received: 0,
bytes_sent: 0,
}
}
fn record_bytes_received(&mut self, bytes: usize) {
self.bytes_received += bytes as u64;
}
fn record_bytes_sent(&mut self, bytes: usize) {
self.bytes_sent += bytes as u64;
}
}
pub struct LpConnectionHandler<S = TcpStream> {
stream: S,
remote_addr: SocketAddr,
state: SharedLpClientControlState,
stats: LpConnectionStats,
state: LpHandlerState,
stats: ConnectionStats,
// /// Flag indicating whether this is a connection from an entry gateway serving as a proxy
// forwarded_connection: bool,
/// Bound receiver_idx for this connection (set after first packet).
/// All subsequent packets on this connection must use this receiver_idx.
/// Set from ClientHello's proposed receiver_index, or from header for non-bootstrap packets.
@@ -42,43 +90,39 @@ pub struct LpClientConnectionHandler<S = TcpStream> {
/// Opened on first forward, reused for subsequent forwards, closed when client disconnects.
/// Tuple contains (stream, target_address) to verify subsequent forwards go to same exit.
exit_stream: Option<(S, SocketAddr)>,
/// Forwarding channel for sending requests to the exit gateway and receiving responses.
#[allow(dead_code)]
forwarding_channel: Option<NestedClientConnection>,
}
impl<S> LpClientConnectionHandler<S>
impl<S> LpConnectionHandler<S>
where
S: LpTransportChannel + LpHandshakeChannel + Unpin,
{
pub fn new(stream: S, remote_addr: SocketAddr, state: SharedLpClientControlState) -> Self {
pub fn new(
stream: S,
// forwarded_connection: bool,
remote_addr: SocketAddr,
state: LpHandlerState,
) -> Self {
Self {
stream,
remote_addr,
// forwarded_connection,
state,
stats: LpConnectionStats::new(),
stats: ConnectionStats::new(),
bound_receiver_idx: None,
exit_stream: None,
forwarding_channel: None,
}
}
pub(crate) fn metrics(&self) -> &NymNodeMetrics {
&self.state.shared.metrics
}
/// Get the mutable reference to the state machine associated with this client.
/// It is vital it's never held across await points or this might lead to a deadlock.
fn state_entry_mut(
&self,
) -> Result<RefMut<'_, LpReceiverIndex, TimestampedState<LpTransportSession>>, LpHandlerError>
{
) -> Result<RefMut<'_, LpReceiverIndex, TimestampedState<LpStateMachine>>, LpHandlerError> {
let receiver_index = self.bound_receiver_index()?;
self.state
.shared
.session_states
.get_state_entry_mut(receiver_index)
.get_mut(&receiver_index)
.ok_or_else(|| LpHandlerError::MissingLpSession { receiver_index })
}
/// AIDEV-NOTE: Stream-oriented packet loop
@@ -87,12 +131,11 @@ where
/// First packet binds the connection to a receiver_idx (session-affine).
/// Binding is set by handle_client_hello() from payload's receiver_index,
/// or by validate_or_set_binding() for non-bootstrap first packets.
pub async fn handle(&mut self) -> Result<(), LpHandlerError> {
let remote = self.remote_addr;
debug!("Handling LP connection from {remote}");
pub async fn handle(mut self) -> Result<(), LpHandlerError> {
debug!("Handling LP connection from {}", self.remote_addr);
// Track total LP connections handled
inc!("lp_client_connections_total");
inc!("lp_connections_total");
// ============================================================
// STREAM-ORIENTED PROCESSING: Loop until connection closes
@@ -101,24 +144,30 @@ where
// 1. complete KKT/PSQ handshake before doing anything else.
// bail if it takes too long
let timeout = self.state.shared.lp_config.debug.handshake_ttl;
let timeout = self.state.lp_config.debug.handshake_ttl;
let local_peer = self.state.local_lp_peer.clone();
let stream = &mut self.stream;
let session = match tokio::time::timeout(timeout, async move {
LpTransportSession::psq_handshake_responder(stream, local_peer)
LpSession::psq_handshake_responder(stream, local_peer)
.complete_handshake()
.await
})
.await
{
Err(_timeout) => {
debug!("timed out attempting to complete KTT/PSQ handshake with {remote}",);
debug!(
"timed out attempting to complete KTT/PSQ handshake with {}",
self.remote_addr
);
self.emit_lifecycle_metrics(false);
return Ok(());
}
Ok(Err(handshake_failure)) => {
debug!("failed to complete KKT/PSQ handshake with {remote}: {handshake_failure}",);
debug!(
"failed to complete KKT/PSQ handshake with {}: {handshake_failure}",
self.remote_addr
);
self.emit_lifecycle_metrics(false);
return Ok(());
}
@@ -127,7 +176,10 @@ where
let receiver_idx = session.receiver_index();
// 2. insert the state machine into the shared state
self.state.shared.session_states.insert_new_session(session);
let state_machine = LpStateMachine::new(session);
self.state
.session_states
.insert(receiver_idx, TimestampedState::new(state_machine));
self.bound_receiver_idx = Some(receiver_idx);
// 3. handle any new incoming packet
@@ -138,7 +190,7 @@ where
Err(err) => {
if err.is_connection_closed() {
// Graceful EOF - client closed connection
trace!("Connection closed by {remote} (EOF)");
trace!("Connection closed by {} (EOF)", self.remote_addr);
break;
} else {
inc!("lp_errors_receive_packet");
@@ -221,18 +273,28 @@ where
);
// Process packet through state machine
let action = state_machine.process_input(LpInput::ReceivePacket(encrypted_packet))?;
let action = state_machine
.process_input(LpInput::ReceivePacket(encrypted_packet))
.ok_or(LpHandlerError::UnexpectedStateMachineHalt)??;
drop(state_entry);
match action {
LpAction::SendPacket(response_packet) => {
self.send_serialised_packet(&response_packet).await
self.send_serialised_packet(&response_packet).await?;
Ok(())
}
LpAction::DeliverData(data) => {
// Decrypted application data - process as registration/forwarding
self.handle_decrypted_payload(receiver_index, data).await
}
other @ LpAction::ConnectionClosed => {
warn!(
"Unexpected action in transport from {}: {other:?}",
self.remote_addr
);
Err(LpHandlerError::UnexpectedStateMachineAction { action: other })
}
}
}
@@ -291,7 +353,9 @@ where
let wrapped_lp_data = LpMessage::new(response_kind, serialised_response);
// Process packet through state machine
let action = state_machine.process_input(LpInput::SendData(wrapped_lp_data))?;
let action = state_machine
.process_input(LpInput::SendData(wrapped_lp_data))
.ok_or(LpHandlerError::UnexpectedStateMachineHalt)??;
let packet = match action {
LpAction::SendPacket(packet) => packet,
@@ -569,16 +633,40 @@ where
/// Emit connection lifecycle metrics
fn emit_lifecycle_metrics(&self, graceful: bool) {
self.stats.emit_lifecycle_client_metrics(graceful);
use nym_metrics::inc_by;
// Track connection duration
let duration = self.stats.start_time.elapsed().as_secs_f64();
add_histogram_obs!(
"lp_connection_duration_seconds",
duration,
LP_CONNECTION_DURATION_BUCKETS
);
// Track bytes transferred
inc_by!(
"lp_connection_bytes_received_total",
self.stats.bytes_received as i64
);
inc_by!(
"lp_connection_bytes_sent_total",
self.stats.bytes_sent as i64
);
// Track completion type
if graceful {
inc!("lp_connections_completed_gracefully");
} else {
inc!("lp_connections_completed_with_error");
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::LpConfig;
use crate::config::lp::LpDebug;
use crate::node::lp::state::{ActiveLpSessions, SharedLpState};
use crate::node::lp::LpConfig;
use nym_lp::peer::{KEMKeys, LpLocalPeer, generate_keypair_mceliece, generate_keypair_mlkem};
use nym_lp::{Ciphersuite, SessionManager, sessions_for_tests};
use nym_test_utils::helpers::{deterministic_rng, deterministic_rng_09};
@@ -586,7 +674,7 @@ mod tests {
// ==================== Test Helpers ====================
/// Create a minimal test state for handler tests
async fn create_minimal_test_state() -> SharedLpClientControlState {
async fn create_minimal_test_state() -> LpHandlerState {
use nym_crypto::asymmetric::ed25519;
let mut rng = deterministic_rng();
@@ -602,6 +690,9 @@ mod tests {
lp_config.debug.max_concurrent_forwards,
));
// Create mix forwarding channel (unused in tests but required by struct)
let (mix_sender, _mix_receiver) = nym_mixnet_client::forwarder::mix_forwarding_channels();
let id_keys = Arc::new(ed25519::KeyPair::new(&mut rng));
let x_keys = Arc::new(id_keys.to_x25519().try_into().unwrap());
@@ -611,15 +702,14 @@ mod tests {
);
let lp_peer = LpLocalPeer::new(Ciphersuite::default(), x_keys).with_kem_keys(kem_keys);
SharedLpClientControlState {
LpHandlerState {
lp_config,
local_lp_peer: lp_peer,
metrics: nym_node_metrics::NymNodeMetrics::default(),
outbound_mix_sender: mix_sender,
session_states: Arc::new(dashmap::DashMap::new()),
peer_registrator: None,
forward_semaphore,
shared: SharedLpState {
lp_config,
metrics: NymNodeMetrics::default(),
session_states: ActiveLpSessions::new(),
},
}
}
@@ -634,8 +724,8 @@ mod tests {
let (init, resp) = sessions_for_tests();
let mut init_sm = SessionManager::new();
let mut resp_sm = SessionManager::new();
resp_sm.insert_session(resp).unwrap();
let id = init_sm.insert_session(init).unwrap();
resp_sm.create_session_state_machine(resp).unwrap();
let id = init_sm.create_session_state_machine(init).unwrap();
// Bind to localhost
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
@@ -645,12 +735,13 @@ mod tests {
let server_task = tokio::spawn(async move {
let (stream, remote_addr) = listener.accept().await.unwrap();
let state = create_minimal_test_state().await;
let mut handler = LpClientConnectionHandler::new(stream, remote_addr, state);
let mut handler = LpConnectionHandler::new(stream, remote_addr, state);
// Two-phase: receive raw bytes + header, then parse full packet
let packet = handler.receive_raw_packet().await?;
let header = packet.outer_header();
assert_eq!(packet.outer_header().receiver_idx, id);
let LpAction::DeliverData(data) = resp_sm.receive_packet(id, packet)? else {
let Some(LpAction::DeliverData(data)) = resp_sm.receive_packet(id, packet).unwrap()
else {
panic!("illegal state")
};
Ok::<_, LpHandlerError>((header, data))
@@ -690,8 +781,8 @@ mod tests {
let (init, resp) = sessions_for_tests();
let mut init_sm = SessionManager::new();
let mut resp_sm = SessionManager::new();
resp_sm.insert_session(resp).unwrap();
let id = init_sm.insert_session(init).unwrap();
resp_sm.create_session_state_machine(resp).unwrap();
let id = init_sm.create_session_state_machine(init).unwrap();
let server_task = tokio::spawn(async move {
let (mut stream, _) = listener.accept().await.unwrap();
@@ -720,7 +811,8 @@ mod tests {
.await
.unwrap();
let header = received.outer_header();
let LpAction::DeliverData(data) = init_sm.receive_packet(id, received).unwrap() else {
let Some(LpAction::DeliverData(data)) = init_sm.receive_packet(id, received).unwrap()
else {
panic!("illegal state")
};
+370 -120
View File
@@ -9,7 +9,7 @@
// ## Connection Metrics (via NetworkStats in nym-node-metrics)
// - active_lp_connections: Gauge tracking current active LP connections (incremented on accept, decremented on close)
//
// ## Handler Metrics (in client_handler)
// ## Handler Metrics (in handler.rs)
// - lp_connections_total: Counter for total LP connections handled
// - lp_client_hello_failed: Counter for ClientHello failures (timestamp validation, protocol errors)
// - lp_handshakes_success: Counter for successful handshake completions
@@ -46,7 +46,7 @@
// ## Error Categorization Metrics
// - lp_errors_wg_peer_registration: Counter for WireGuard peer registration failures
//
// ## Connection Lifecycle Metrics (in client_handler)
// ## Connection Lifecycle Metrics (in handler.rs)
// - lp_connection_duration_seconds: Histogram of connection duration from start to end (buckets: 1s to 24h)
// - lp_connection_bytes_received_total: Counter for total bytes received including protocol framing
// - lp_connection_bytes_sent_total: Counter for total bytes sent including protocol framing
@@ -58,7 +58,7 @@
// - lp_states_cleanup_session_removed: Counter for stale sessions removed by cleanup task
// - lp_states_cleanup_demoted_removed: Counter for demoted (read-only) sessions removed by cleanup task
//
// ## Subsession/Rekeying Metrics (in client_handler)
// ## Subsession/Rekeying Metrics (in handler.rs)
// - lp_subsession_kk2_sent: Counter for SubsessionKK2 responses sent (indicates client initiated rekeying)
// - lp_subsession_complete: Counter for successful subsession promotions
// - lp_subsession_receiver_index_collision: Counter for subsession receiver_index collisions
@@ -67,154 +67,404 @@
// To view metrics, the nym-metrics registry automatically collects all metrics.
// They can be exported via Prometheus format using the metrics endpoint.
use crate::config::LpConfig;
use crate::config::lp::LpConfig;
use crate::error::NymNodeError;
use crate::node::lp::cleanup::CleanupTask;
use crate::node::lp::control::ingress::listener::LpControlListener;
use crate::node::lp::data::listener::LpDataListener;
use dashmap::DashMap;
use nym_gateway::node::wireguard::PeerRegistrator;
use nym_lp::peer::LpLocalPeer;
use nym_lp::peer_config::LpReceiverIndex;
use nym_lp::state_machine::LpStateMachine;
use nym_mixnet_client::forwarder::MixForwardingSender;
use nym_node_metrics::NymNodeMetrics;
use nym_task::ShutdownTracker;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::sync::Semaphore;
use tracing::error;
use tracing::*;
use crate::node::lp::directory::LpNodes;
use crate::node::lp::forwarding::controller::NestedConnectionsController;
use crate::node::lp::forwarding::manager::NestedConnectionsManager;
use crate::node::lp::state::{ActiveLpSessions, SharedLpNodeControlState};
pub use nym_mixnet_client::forwarder::{MixForwardingReceiver, mix_forwarding_channels};
pub use state::{SharedLpClientControlState, SharedLpDataState, SharedLpState};
mod cleanup;
pub mod control;
mod data;
pub mod directory;
mod data_handler;
pub mod error;
pub mod forwarding;
pub mod handler;
mod registration;
pub mod state;
pub struct LpSetup {
control_listener: LpControlListener,
data_listener: LpDataListener,
cleanup_task: CleanupTask,
nested_connections_controller: NestedConnectionsController,
/// Wrapper for state entries with timestamp tracking for cleanup
///
/// This wrapper adds `created_at` and `last_activity` timestamps to state entries,
/// enabling TTL-based cleanup of stale handshakes and sessions.
pub struct TimestampedState<T> {
/// The actual state (LpStateMachine or LpSession)
pub state: T,
/// When this state was created (never changes)
created_at: std::time::Instant,
/// Last activity timestamp (unix seconds, atomically updated)
///
/// For handshakes: never updated (use created_at for TTL)
/// For sessions: updated on every packet received
last_activity: std::sync::atomic::AtomicU64,
}
impl<T> TimestampedState<T> {
/// Create a new timestamped state
pub fn new(state: T) -> Self {
let now_instant = std::time::Instant::now();
let now_unix = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
Self {
state,
created_at: now_instant,
last_activity: std::sync::atomic::AtomicU64::new(now_unix),
}
}
/// Update last_activity timestamp (cheap, lock-free operation)
pub fn touch(&self) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
self.last_activity
.store(now, std::sync::atomic::Ordering::Relaxed);
}
/// Get age since creation
#[allow(dead_code)]
pub fn age(&self) -> Duration {
self.created_at.elapsed()
}
/// Get time since last activity
pub fn since_activity(&self) -> Duration {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let last = self
.last_activity
.load(std::sync::atomic::Ordering::Relaxed);
Duration::from_secs(now.saturating_sub(last))
}
}
/// Shared state for LP connection handlers
#[derive(Clone)]
pub struct LpHandlerState {
/// Encapsulates all required key information of a local Lewes Protocol Peer.
pub local_lp_peer: LpLocalPeer,
/// Metrics collection
pub metrics: NymNodeMetrics,
/// Handle registering new wireguard peers
pub peer_registrator: Option<PeerRegistrator>,
/// LP configuration (for timestamp validation, etc.)
pub lp_config: LpConfig,
/// Channel for forwarding Sphinx packets into the mixnet
///
/// Used by the LP data handler (UDP:51264) to forward decrypted Sphinx packets
/// from LP clients into the mixnet for routing.
#[allow(dead_code)]
pub outbound_mix_sender: MixForwardingSender,
/// Established sessions keyed by session_id
///
/// Used after handshake completes (session_id is deterministically computed from
/// both parties' X25519 keys). Enables stateless transport - each packet lookup
/// by session_id, decrypt/process, respond.
///
/// Wrapped in TimestampedState for TTL-based cleanup of inactive sessions.
///
/// Sessions are stored as LpStateMachine (not LpSession) to enable
/// subsession/rekeying support. The state machine handles subsession initiation
/// (SubsessionKK1/KK2/Ready) during transport phase, allowing long-lived connections
/// to rekey without re-authentication.
pub session_states: Arc<DashMap<LpReceiverIndex, TimestampedState<LpStateMachine>>>,
/// Semaphore limiting concurrent forward connections
///
/// Prevents file descriptor exhaustion when forwarding LP packets during
/// telescope setup. When at capacity, forward requests return an error
/// so clients can choose a different gateway.
// Connection limiting (not pooling) chosen for forward requests.
//
// Why not connection pooling?
// 1. Forwarding is one-time per telescope setup (handshake only), not ongoing traffic.
// Once telescope is established, data flows directly through the tunnel.
// 2. Telescope targets are distributed across many different gateways - each client
// typically connects to a different exit gateway, so pooled connections would
// rarely be reused.
// 3. Connections already go out of scope after each request-response. FD exhaustion
// only happens from concurrent spikes, not accumulation.
// 4. A pool would accumulate one idle connection per unique destination, most of
// which would never be reused before TTL expiration.
//
// Why semaphore limiting is better:
// 1. Directly caps concurrent forward connections regardless of destination.
// 2. When at capacity, returns "busy" error - client can choose another gateway.
// This is better than silently queuing requests behind a pool.
// 3. Simple implementation: no TTL management, stale connection handling, or cleanup.
pub forward_semaphore: Arc<Semaphore>,
}
/// LP listener that accepts TCP connections on port 41264
pub struct LpListener {
/// Shared state for connection handlers
handler_state: LpHandlerState,
/// Shutdown coordination
shutdown: ShutdownTracker,
}
impl LpSetup {
pub async fn new(
local_lp_peer: LpLocalPeer,
lp_config: LpConfig,
metrics: NymNodeMetrics,
peer_registrator: Option<PeerRegistrator>,
network_nodes: LpNodes,
mix_packet_sender: MixForwardingSender,
shutdown: ShutdownTracker,
) -> Result<Self, NymNodeError> {
// TODO: this will require loading old states from disk in the future
let session_states = ActiveLpSessions::new();
let nested_connections_controller = NestedConnectionsController::new(
network_nodes.clone(),
shutdown.clone_shutdown_token(),
);
let shared_lp_state = SharedLpState {
metrics,
lp_config,
session_states: session_states.clone(),
};
let client_control_state = SharedLpClientControlState {
local_lp_peer: local_lp_peer.clone(),
peer_registrator,
nested_connections_manager: NestedConnectionsManager::new(
nested_connections_controller.request_sender(),
),
forward_semaphore: Arc::new(Semaphore::new(lp_config.debug.max_concurrent_forwards)),
shared: shared_lp_state.clone(),
};
let nodes_control_state = SharedLpNodeControlState {
local_lp_peer,
nodes: network_nodes,
shared: shared_lp_state.clone(),
};
let data_state = SharedLpDataState {
outbound_mix_sender: mix_packet_sender,
shared: shared_lp_state,
};
let control_listener = LpControlListener::new(
lp_config.control_bind_address,
client_control_state,
nodes_control_state,
shutdown.clone(),
);
let data_listener = LpDataListener::new(
lp_config.data_bind_address,
data_state,
shutdown.clone_shutdown_token(),
);
let cleanup_task = CleanupTask::new(
session_states,
lp_config.debug,
shutdown.clone_shutdown_token(),
);
Ok(LpSetup {
control_listener,
data_listener,
cleanup_task,
nested_connections_controller,
impl LpListener {
pub fn new(handler_state: LpHandlerState, shutdown: ShutdownTracker) -> Self {
Self {
handler_state,
shutdown,
})
}
}
pub fn start_tasks(mut self) {
// control listener
fn lp_config(&self) -> LpConfig {
self.handler_state.lp_config
}
pub async fn run(&mut self) -> Result<(), NymNodeError> {
let control_bind_address = self.lp_config().control_bind_address;
let data_bind_address = self.lp_config().data_bind_address;
let listener = TcpListener::bind(control_bind_address)
.await
.map_err(|source| {
error!("Failed to bind LP listener to {control_bind_address}: {source}",);
NymNodeError::LpBindFailure {
address: control_bind_address,
source,
}
})?;
let shutdown_token = self.shutdown.clone_shutdown_token();
self.shutdown.try_spawn_named(
// Spawn background task for state cleanup
let _cleanup_handle = self.spawn_state_cleanup_task();
// Spawn UDP data handler for LP data plane (port 51264)
let _data_handler_handle = self.spawn_data_handler().await?;
info!(
"LP listener started on {control_bind_address} (data handler on: {data_bind_address})",
);
loop {
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
trace!("LP listener: received shutdown signal");
break;
}
result = listener.accept() => {
match result {
Ok((stream, addr)) => self.handle_connection(stream, addr),
Err(e) => warn!("Failed to accept LP connection: {e}")
}
}
}
}
info!("LP listener shutdown complete");
Ok(())
}
fn handle_connection(&self, stream: tokio::net::TcpStream, remote_addr: SocketAddr) {
// Check connection limit
let active_connections = self.active_lp_connections();
let max_connections = self.lp_config().debug.max_connections;
if active_connections >= max_connections {
warn!(
"LP connection limit exceeded ({active_connections}/{max_connections}), rejecting connection from {remote_addr}"
);
return;
}
debug!(
"Accepting LP connection from {remote_addr} ({active_connections} active connections)"
);
// Increment connection counter
self.handler_state.metrics.network.new_lp_connection();
// Spawn handler task
let handler =
handler::LpConnectionHandler::new(stream, remote_addr, self.handler_state.clone());
let metrics = self.handler_state.metrics.clone();
self.shutdown.try_spawn_named_with_shutdown(
async move {
if let Err(err) = self.control_listener.run().await {
shutdown_token.cancel();
error!("LP control listener error: {err}");
let result = handler.handle().await;
// Handler emits lifecycle metrics internally on success
// For errors, we need to emit them here since handler is consumed
if let Err(e) = result {
warn!("LP handler error for {remote_addr}: {e}");
// Note: metrics are emitted in handle() for graceful path
// On error path, handle() returns early without emitting
// So we track errors here
}
// Decrement connection counter on exit
metrics.network.lp_connection_closed();
},
&format!("LP::{remote_addr}"),
);
}
/// Spawn the UDP data handler for LP data plane
///
/// The data handler listens on UDP port 51264 and processes LP-wrapped Sphinx packets
/// from registered clients. It decrypts the LP layer and forwards the Sphinx packets
/// into the mixnet.
async fn spawn_data_handler(&self) -> Result<tokio::task::JoinHandle<()>, NymNodeError> {
// Create data handler
let data_handler = data_handler::LpDataHandler::new(
self.lp_config().data_bind_address,
self.handler_state.clone(),
self.shutdown.clone_shutdown_token(),
)
.await?;
// Spawn data handler task
let handle = self.shutdown.try_spawn_named(
async move {
if let Err(e) = data_handler.run().await {
error!("LP data handler error: {e}");
}
},
"LP::LpControlListener",
"LP::DataHandler",
);
// Spawn the UDP data handler for LP data plane
// The data handler listens on UDP port 51264 and processes LP-wrapped Sphinx packets
// from registered clients. It decrypts the LP layer and forwards the Sphinx packets
let shutdown_token = self.shutdown.clone_shutdown_token();
self.shutdown.try_spawn_named(
async move {
if let Err(err) = self.data_listener.run().await {
shutdown_token.cancel();
error!("LP data listener error: {err}");
}
},
"LP::LpDataListener",
Ok(handle)
}
/// Spawn background task for cleaning up stale state entries
///
/// This task runs periodically (every `state_cleanup_interval_secs`) to remove:
/// - Handshake states older than `handshake_ttl_secs`
/// - Session states with no activity for `session_ttl_secs`
///
/// The task automatically stops when the shutdown signal is received.
fn spawn_state_cleanup_task(&self) -> tokio::task::JoinHandle<()> {
let session_states = Arc::clone(&self.handler_state.session_states);
let dbg_cfg = self.handler_state.lp_config.debug;
let handshake_ttl = dbg_cfg.handshake_ttl;
let session_ttl = dbg_cfg.session_ttl;
let interval = dbg_cfg.state_cleanup_interval;
let shutdown = self.shutdown.clone_shutdown_token();
let metrics = self.handler_state.metrics.clone();
info!(
"Starting LP state cleanup task (handshake_ttl={}s, session_ttl={}s, interval={}s)",
handshake_ttl.as_secs(),
session_ttl.as_secs(),
interval.as_secs()
);
// cleanup task
self.shutdown.try_spawn_named(
async move { self.cleanup_task.run().await },
"LP::CleanupTask",
);
cleanup_task::cleanup_loop(session_states, dbg_cfg, shutdown, metrics),
"LP::StateCleanup",
)
}
// nested connections controller
self.shutdown.try_spawn_named(
async move { self.nested_connections_controller.run().await },
"LP::NestedConnectionsController",
);
fn active_lp_connections(&self) -> usize {
self.handler_state
.metrics
.network
.active_lp_connections_count()
}
}
pub(crate) mod cleanup_task {
use crate::config::lp::LpDebug;
use crate::node::lp::{LpReceiverIndex, TimestampedState};
use dashmap::DashMap;
use nym_lp::LpStateMachine;
use nym_metrics::inc_by;
use nym_node_metrics::NymNodeMetrics;
use std::sync::Arc;
use tracing::{debug, info};
async fn perform_cleanup(
session_states: &Arc<DashMap<LpReceiverIndex, TimestampedState<LpStateMachine>>>,
cfg: LpDebug,
) {
let session_ttl = cfg.session_ttl;
let start = std::time::Instant::now();
let mut ss_removed = 0u64;
// Remove stale sessions (based on time since last activity)
// Use shorter TTL for demoted (ReadOnlyTransport) sessions
session_states.retain(|_, timestamped| {
if timestamped.since_activity() > session_ttl {
ss_removed += 1;
false
} else {
true
}
});
if ss_removed > 0 {
let duration = start.elapsed();
info!(
"LP state cleanup: {ss_removed} sessions (took {:.3}s)",
duration.as_secs_f64()
);
// Track metrics
if ss_removed > 0 {
inc_by!("lp_states_cleanup_session_removed", ss_removed as i64);
}
}
}
/// Background loop for cleaning up stale state entries
///
/// Runs periodically to scan handshake_states and session_states maps,
/// removing entries that have exceeded their TTL.
///
/// Demoted sessions (ReadOnlyTransport) use shorter TTL since they
/// only need to drain in-flight packets after subsession promotion.
pub(crate) async fn cleanup_loop(
session_states: Arc<DashMap<LpReceiverIndex, TimestampedState<LpStateMachine>>>,
cfg: LpDebug,
shutdown: nym_task::ShutdownToken,
_metrics: NymNodeMetrics,
) {
let interval = cfg.state_cleanup_interval;
let mut cleanup_interval = tokio::time::interval(interval);
loop {
tokio::select! {
biased;
_ = shutdown.cancelled() => {
debug!("LP state cleanup task: received shutdown signal");
break;
}
_ = cleanup_interval.tick() => {
perform_cleanup(&session_states, cfg).await;
}
}
}
info!("LP state cleanup task shutdown complete");
}
}
+2 -3
View File
@@ -1,8 +1,7 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::lp::state::SharedLpClientControlState;
use nym_lp::peer_config::LpReceiverIndex;
use crate::node::lp::{LpHandlerState, LpReceiverIndex};
use nym_metrics::{add_histogram_obs, inc};
use nym_registration_common::dvpn::{
LpDvpnRegistrationFinalisation, LpDvpnRegistrationInitialRequest,
@@ -29,7 +28,7 @@ const LP_REGISTRATION_DURATION_BUCKETS: &[f64] = &[
30.0, // 30s
];
impl SharedLpClientControlState {
impl LpHandlerState {
async fn process_dvpn_initial_registration(
&self,
sender: LpReceiverIndex,
-115
View File
@@ -1,115 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::config::LpConfig;
use crate::node::lp::cleanup::TimestampedState;
use crate::node::lp::directory::LpNodes;
use crate::node::lp::error::LpHandlerError;
use dashmap::DashMap;
use dashmap::mapref::one::RefMut;
use nym_gateway::node::wireguard::PeerRegistrator;
use nym_lp::LpTransportSession;
use nym_lp::peer::LpLocalPeer;
use nym_lp::peer_config::LpReceiverIndex;
use nym_mixnet_client::forwarder::MixForwardingSender;
use nym_node_metrics::NymNodeMetrics;
use std::sync::Arc;
use tokio::sync::Semaphore;
pub use crate::node::lp::forwarding::manager::NestedConnectionsManager;
/// Shared state for LP clients control connections
#[derive(Clone)]
pub struct SharedLpClientControlState {
/// Encapsulates all required key information of a local Lewes Protocol Peer.
pub local_lp_peer: LpLocalPeer,
/// Handle registering new wireguard peers
pub peer_registrator: Option<PeerRegistrator>,
/// Controller for obtaining handles to forwarding channels
pub nested_connections_manager: NestedConnectionsManager,
/// Semaphore limiting concurrent forward connections
///
/// Prevents file descriptor exhaustion when forwarding LP packets during
/// telescope setup. When at capacity, forward requests return an error
/// so clients can choose a different gateway.
// this is temporary until there is persistent KKT/PSQ session between nodes
// #[deprecated]
pub forward_semaphore: Arc<Semaphore>,
/// Common shared data
pub shared: SharedLpState,
}
/// Shared state for LP nodes control connections
#[derive(Clone)]
pub struct SharedLpNodeControlState {
/// Encapsulates all required key information of a local Lewes Protocol Peer.
pub local_lp_peer: LpLocalPeer,
/// Information about all known LP nodes
pub nodes: LpNodes,
/// Common shared data
pub shared: SharedLpState,
}
/// Shared state for LP data connections
#[derive(Clone)]
pub struct SharedLpDataState {
/// Channel for forwarding Sphinx packets into the mixnet
///
/// Used by the LP data handler (UDP:51264) to forward decrypted Sphinx packets
/// from LP clients into the mixnet for routing.
#[allow(dead_code)]
pub outbound_mix_sender: MixForwardingSender,
/// Common shared data
pub shared: SharedLpState,
}
/// Established sessions keyed by the receiver index
///
/// Wrapped in TimestampedState for TTL-based cleanup of inactive sessions.
#[derive(Clone, Default)]
pub struct ActiveLpSessions {
// TODO: this might require split between client and node sessions. TBD
pub(crate) sessions: Arc<DashMap<LpReceiverIndex, TimestampedState<LpTransportSession>>>,
}
impl ActiveLpSessions {
pub fn new() -> Self {
Self::default()
}
pub(crate) fn get_state_entry_mut(
&self,
receiver_index: LpReceiverIndex,
) -> Result<RefMut<'_, LpReceiverIndex, TimestampedState<LpTransportSession>>, LpHandlerError>
{
self.sessions
.get_mut(&receiver_index)
.ok_or_else(|| LpHandlerError::MissingLpSession { receiver_index })
}
pub(crate) fn insert_new_session(&self, session: LpTransportSession) {
let receiver_index = session.receiver_index();
self.sessions
.insert(receiver_index, TimestampedState::new(session));
}
}
/// Shared state for LP connection handlers
#[derive(Clone)]
pub struct SharedLpState {
/// Metrics collection
pub metrics: NymNodeMetrics,
/// LP configuration (for timestamp validation, etc.)
pub lp_config: LpConfig,
/// Currently active LP sessions
pub session_states: ActiveLpSessions,
}
+27 -27
View File
@@ -22,7 +22,7 @@ use crate::node::http::{HttpServerConfig, NymNodeHttpServer, NymNodeRouter};
use crate::node::key_rotation::active_keys::ActiveSphinxKeys;
use crate::node::key_rotation::controller::KeyRotationController;
use crate::node::key_rotation::manager::SphinxKeyManager;
use crate::node::lp::LpSetup;
use crate::node::lp::{LpHandlerState, LpListener};
use crate::node::metrics::aggregator::MetricsAggregator;
use crate::node::metrics::console_logger::ConsoleLogger;
use crate::node::metrics::handler::client_sessions::GatewaySessionStatsHandler;
@@ -80,11 +80,10 @@ use std::net::SocketAddr;
use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::{Semaphore, mpsc};
use tracing::{debug, info, trace};
use zeroize::Zeroizing;
use crate::node::lp::directory::LpNodes;
pub use nym_gateway::node::ActiveClientsStore;
pub use nym_gateway::node::GatewayStorage;
@@ -454,16 +453,16 @@ impl NymNode {
&config.storage_paths.keys.x25519_noise_storage_paths(),
)?;
trace!("attempting to store x25519 lp keypair");
trace!("attempting to x25519 lp keypair");
store_x25519_lp_keypair(
&x25519_lp_keys,
&config.storage_paths.keys.x25519_lp_key_paths(),
)?;
trace!("attempting to store mlkem768 keypair");
trace!("attempting to mlkem768 keypair");
store_mlkem768_keypair(&mlkem, &config.storage_paths.keys.mlkem768_key_paths())?;
trace!("attempting to store mceliece keypair");
trace!("attempting to mceliece keypair");
store_mceliece_keypair(&mceliece, &config.storage_paths.keys.mceliece_key_paths())?;
trace!("creating description file");
@@ -488,25 +487,28 @@ impl NymNode {
config.save()
}
pub async fn build_lp_tasks(
&self,
pub async fn build_lp_listener(
&mut self,
peer_registrator: Option<PeerRegistrator>,
mix_packet_sender: MixForwardingSender,
network_nodes: LpNodes,
) -> Result<LpSetup, NymNodeError> {
let lp_peer = LpLocalPeer::new(Ciphersuite::default(), self.x25519_lp_keys.clone())
.with_kem_keys(self.psq_kem_keys.clone());
LpSetup::new(
lp_peer,
self.config.lp,
self.metrics.clone(),
) -> Result<LpListener, NymNodeError> {
let handler_state = LpHandlerState {
local_lp_peer: LpLocalPeer::new(Ciphersuite::default(), self.x25519_lp_keys.clone())
.with_kem_keys(self.psq_kem_keys.clone()),
metrics: self.metrics.clone(),
peer_registrator,
network_nodes,
mix_packet_sender,
lp_config: self.config.lp,
outbound_mix_sender: mix_packet_sender,
session_states: Arc::new(dashmap::DashMap::new()),
forward_semaphore: Arc::new(Semaphore::new(
self.config.lp.debug.max_concurrent_forwards,
)),
};
Ok(LpListener::new(
handler_state,
self.shutdown_manager.shutdown_tracker().clone(),
)
.await
))
}
pub(crate) async fn new(config: Config) -> Result<Self, NymNodeError> {
@@ -686,7 +688,6 @@ impl NymNode {
async fn start_gateway_tasks(
&mut self,
cached_network: CachedNetwork,
lp_nodes: LpNodes,
metrics_sender: MetricEventsSender,
active_clients_store: ActiveClientsStore,
mix_packet_sender: MixForwardingSender,
@@ -769,10 +770,11 @@ impl NymNode {
"starting the LP listener on {} (data handler on: {})",
self.config.lp.control_bind_address, self.config.lp.data_bind_address,
);
let lp_tasks = self
.build_lp_tasks(wg_peer_registrator.clone(), mix_packet_sender, lp_nodes)
let mut lp_listener = self
.build_lp_listener(wg_peer_registrator.clone(), mix_packet_sender)
.await?;
lp_tasks.start_tasks();
self.shutdown_tracker()
.try_spawn_named(async move { lp_listener.run().await }, "LpListener");
} else {
info!("node not running in entry mode: the websocket and LP will remain closed");
}
@@ -1359,7 +1361,6 @@ impl NymNode {
let network_refresher = self.build_network_refresher().await?;
let active_clients_store = ActiveClientsStore::new();
let lp_nodes = network_refresher.lp_nodes();
let bloomfilters_manager = self.setup_replay_detection().await?;
@@ -1386,7 +1387,6 @@ impl NymNode {
self.start_gateway_tasks(
network_refresher.cached_network(),
lp_nodes,
metrics_sender,
active_clients_store,
mix_packet_sender,
+2 -11
View File
@@ -3,7 +3,6 @@
use crate::error::NymNodeError;
use crate::node::key_rotation::active_keys::ActiveSphinxKeys;
use crate::node::lp::directory::LpNodes;
use crate::node::routing_filter::network_filter::NetworkRoutingFilter;
use async_trait::async_trait;
use nym_crypto::asymmetric::ed25519;
@@ -21,7 +20,7 @@ use nym_topology::{
use nym_validator_client::ValidatorClientError;
use nym_validator_client::nym_api::NymApiClientExt;
use nym_validator_client::nym_nodes::{
NodesByAddressesResponse, SemiSkimmedNodeV1, SemiSkimmedNodesWithMetadata,
NodesByAddressesResponse, SemiSkimmedNode, SemiSkimmedNodesWithMetadata,
};
use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, SocketAddr};
@@ -201,7 +200,7 @@ impl CachedNetwork {
struct CachedNetworkInner {
rewarded_set: EpochRewardedSet,
topology_metadata: NymTopologyMetadata,
network_nodes: Vec<SemiSkimmedNodeV1>,
network_nodes: Vec<SemiSkimmedNode>,
}
pub struct NetworkRefresher {
@@ -213,7 +212,6 @@ pub struct NetworkRefresher {
network: CachedNetwork,
routing_filter: NetworkRoutingFilter,
noise_view: NoiseNetworkView,
lp_nodes: LpNodes,
}
impl NetworkRefresher {
@@ -242,7 +240,6 @@ impl NetworkRefresher {
network: CachedNetwork::new_empty(),
routing_filter: NetworkRoutingFilter::new_empty(testnet),
noise_view: NoiseNetworkView::new_empty(),
lp_nodes: Default::default(),
};
this.obtain_initial_network().await?;
@@ -338,8 +335,6 @@ impl NetworkRefresher {
.collect::<HashMap<_, _>>();
self.noise_view.swap_view(noise_nodes);
warn!("unimplemented LP nodes update");
let mut network_guard = self.network.inner.write().await;
network_guard.topology_metadata = metadata.to_topology_metadata();
network_guard.network_nodes = nodes;
@@ -378,10 +373,6 @@ impl NetworkRefresher {
self.noise_view.clone()
}
pub(crate) fn lp_nodes(&self) -> LpNodes {
self.lp_nodes.clone()
}
pub(crate) async fn run(&mut self) {
let mut full_refresh_interval = interval(self.full_refresh_interval);
full_refresh_interval.reset();
+2 -3
View File
@@ -64,6 +64,7 @@ impl LpBasedRegistrationClient {
tracing::debug!("Exit gateway LP address: {exit_address}");
// Generate fresh x25519 keypairs for LP registration
// TODO: persist them for the duration of the sessions
let entry_lp_keypair = Arc::new(DHKeyPair::new(&mut rand09::rng()));
let exit_lp_keypair = Arc::new(DHKeyPair::new(&mut rand09::rng()));
@@ -99,7 +100,7 @@ impl LpBasedRegistrationClient {
tracing::info!("Registering with exit gateway via entry forwarding");
let mut nested_session = NestedLpSession::new(
exit_address,
exit_lp_keypair.clone(),
exit_lp_keypair,
exit_peer,
exit_ciphersuite,
exit_lp_protocol,
@@ -152,8 +153,6 @@ impl LpBasedRegistrationClient {
Ok(RegistrationResult::Lp(Box::new(LpRegistrationResult {
entry_gateway_data,
exit_gateway_data,
entry_lp_keypair,
exit_lp_keypair,
bw_controller: self.bandwidth_controller,
})))
}
+68 -50
View File
@@ -5,16 +5,16 @@
use super::config::LpRegistrationConfig;
use super::error::{LpClientError, Result};
use crate::lp_client::helpers::{LpDataDeliverExt, LpDataSendExt, exponential_backoff_with_jitter};
use crate::lp_client::helpers::{LpDataDeliverExt, LpDataSendExt};
use crate::lp_client::nested_session::connection::NestedConnection;
use crate::lp_client::session_helpers::{extract_forwarded_response, prepare_send_packet};
use crate::lp_client::state_machine_helpers::{extract_forwarded_response, prepare_send_packet};
use nym_bandwidth_controller::{BandwidthTicketProvider, DEFAULT_TICKETS_TO_SPEND};
use nym_credentials_interface::TicketType;
use nym_crypto::asymmetric::{ed25519, x25519};
use nym_lp::LpTransportSession;
use nym_lp::LpSession;
use nym_lp::peer::{DHKeyPair, LpLocalPeer, LpRemotePeer};
use nym_lp::peer_config::LpReceiverIndex;
use nym_lp::psq::initiator::HandshakeMode;
use nym_lp::state_machine::LpStateMachine;
use nym_lp::transport::traits::LpTransportChannel;
use nym_lp::transport::{LpHandshakeChannel, LpTransportError};
use nym_lp::{Ciphersuite, packet::EncryptedLpPacket, packet::version};
@@ -24,12 +24,12 @@ use nym_registration_common::{
WireguardRegistrationData,
};
use nym_wireguard_types::PeerPublicKey;
use rand09::{CryptoRng, RngCore};
use rand09::{CryptoRng, Rng, RngCore};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
use tracing::{debug, warn};
use tracing::warn;
/// LP (Lewes Protocol) registration client for direct gateway connections.
///
@@ -58,9 +58,9 @@ pub struct LpRegistrationClient<S = TcpStream> {
/// Included in case we have to downgrade our version.
gateway_supported_lp_protocol_version: u8,
/// LP transport session
/// LP state machine for managing connection lifecycle.
/// Created during handshake initiation.
transport_session: Option<LpTransportSession>,
state_machine: Option<LpStateMachine>,
/// Configuration for timeouts and TCP parameters.
pub(crate) config: LpRegistrationConfig,
@@ -110,7 +110,7 @@ where
gateway_lp_peer,
gateway_lp_address,
gateway_supported_lp_protocol_version: lp_protocol,
transport_session: None,
state_machine: None,
config,
stream: None,
}
@@ -153,14 +153,14 @@ where
)
}
pub(crate) fn transport_session(&self) -> Result<&LpTransportSession> {
self.transport_session
pub(crate) fn state_machine(&self) -> Result<&LpStateMachine> {
self.state_machine
.as_ref()
.ok_or(LpClientError::IncompleteHandshake)
}
pub(crate) fn transport_session_mut(&mut self) -> Result<&mut LpTransportSession> {
self.transport_session
pub(crate) fn state_machine_mut(&mut self) -> Result<&mut LpStateMachine> {
self.state_machine
.as_mut()
.ok_or(LpClientError::IncompleteHandshake)
}
@@ -171,7 +171,7 @@ where
/// Returns whether the client has completed the handshake and is ready for registration.
pub fn is_handshake_complete(&self) -> bool {
self.transport_session.is_some()
self.state_machine.is_some()
}
/// Returns the gateway LP address this client is configured for.
@@ -392,18 +392,18 @@ where
let protocol_version = self.gateway_supported_lp_protocol_version;
let connection = self.stream_mut()?;
let session = LpTransportSession::psq_handshake_initiator(
// TODO:
let session = LpSession::psq_handshake_initiator(
connection,
local_peer,
remote_peer,
protocol_version,
HandshakeMode::OneWayEntry,
)?
)
.complete_handshake()
.await?;
// Store the state machine (with established session) for later use
self.transport_session = Some(session);
self.state_machine = Some(LpStateMachine::new(session));
Ok(())
}
@@ -461,7 +461,7 @@ where
let lp_data = request.to_lp_data()?;
// 4. Encrypt and prepare packet via state machine
let state_machine = self.transport_session_mut()?;
let state_machine = self.state_machine_mut()?;
let request_packet = prepare_send_packet(lp_data, state_machine)?;
// 5. Send initial request and receive response on persistent connection with timeout
@@ -473,7 +473,7 @@ where
.await?;
// 6. Decrypt via state machine (re-borrow)
let state_machine = self.transport_session_mut()?;
let state_machine = self.state_machine_mut()?;
let received_data = extract_forwarded_response(response_packet, state_machine)?;
// 7. Extract decrypted data and deserialise the response
@@ -505,7 +505,7 @@ where
/// sends the registration request, and receives the response
/// on the same underlying connection.
/// Do note that this method does **not** perform retries on network failures,
/// for that please use [`Self::handshake_and_register_with_retry`] instead
/// for that please use [`Self::register_with_retry`] instead
///
/// # Arguments
/// * `rng` - RNG instance for generating PSK
@@ -548,7 +548,7 @@ where
let lp_data = request.to_lp_data()?;
// 3. Encrypt and prepare packet via state machine
let state_machine = self.transport_session_mut()?;
let state_machine = self.state_machine_mut()?;
let request_packet = prepare_send_packet(lp_data, state_machine)?;
// 4. Send initial request and receive response on persistent connection with timeout
@@ -560,7 +560,7 @@ where
.await?;
// 5. Decrypt via state machine (re-borrow)
let state_machine = self.transport_session_mut()?;
let state_machine = self.state_machine_mut()?;
let received_data = extract_forwarded_response(response_packet, state_machine)?;
// 6. Extract decrypted data and deserialise the response
@@ -633,7 +633,7 @@ where
/// # Note
/// Unlike `register()`, this method handles the full flow including handshake.
/// Do NOT call `perform_handshake()` before this method.
pub async fn handshake_and_register_with_retry<R>(
pub async fn register_with_retry<R>(
&mut self,
rng: &mut R,
wg_keypair: &x25519::KeyPair,
@@ -647,44 +647,59 @@ where
{
tracing::debug!("Starting resilient registration (max_retries={max_retries})",);
// attempt to perform handshake with retries
let mut last_error = None;
for attempt in 0..=max_retries {
let attempt_display = attempt + 1;
debug!("registration attempt {attempt_display}");
if attempt > 0 {
// Clear any stale state before re-handshaking
self.transport_session = None;
self.close();
exponential_backoff_with_jitter(attempt).await
// Exponential backoff with jitter: 100ms, 200ms, 400ms, 800ms, 1600ms (capped)
let base_delay_ms = 100u64 * (1 << attempt.min(4));
let jitter_ms: u64 = rand09::rng().random_range(0..(base_delay_ms / 4 + 1));
let delay = std::time::Duration::from_millis(base_delay_ms + jitter_ms);
tracing::info!("Retrying registration (attempt {attempt_display}) after {delay:?}");
tokio::time::sleep(delay).await;
}
match self.perform_handshake().await {
Ok(_) => break,
Err(e) => {
// Ensure fresh connection and handshake for each attempt
// (On retry, the old connection/session may be dead)
if self.stream.is_none() || attempt > 0 {
// Clear any stale state before re-handshaking
self.close();
self.state_machine = None;
if let Err(e) = self.perform_handshake().await {
tracing::warn!("Handshake failed on attempt {attempt_display}: {e}");
last_error = Some(e);
continue;
}
}
match self
.register_dvpn(
rng,
wg_keypair,
gateway_identity,
bandwidth_controller,
ticket_type,
)
.await
{
Ok(data) => {
if attempt > 0 {
tracing::info!("Registration succeeded on retry attempt {attempt_display}");
}
return Ok(data);
}
Err(e) => {
tracing::warn!("Registration attempt {attempt_display} failed: {e}");
last_error = Some(e);
}
}
}
if self.transport_session.is_none() {
return Err(last_error.unwrap_or(LpClientError::RegistrationFailure {
message: "Registration failed after all retries".to_string(),
}));
}
self.register_dvpn(
rng,
wg_keypair,
gateway_identity,
bandwidth_controller,
ticket_type,
)
.await
.inspect_err(|e| tracing::warn!("Registration failed: {e}"))
Err(last_error.unwrap_or(LpClientError::RegistrationFailure {
message: "Registration failed after all retries".to_string(),
}))
}
/// Get the LP session ID (receiver_idx) for this client.
@@ -698,7 +713,10 @@ where
/// # Errors
/// Returns an error if handshake has not been completed.
pub fn session_id(&self) -> Result<LpReceiverIndex> {
Ok(self.transport_session()?.receiver_index())
self.state_machine()?
.session()
.map(|s| s.receiver_index())
.map_err(Into::into)
}
}
@@ -6,7 +6,7 @@
use nym_lp::LpError;
use nym_lp::packet::MalformedLpPacketError;
use nym_lp::packet::message::LpMessageType;
use nym_lp::session::LpAction;
use nym_lp::state_machine::LpAction;
use nym_lp::transport::LpTransportError;
use thiserror::Error;
@@ -33,6 +33,9 @@ pub enum LpClientError {
#[error(transparent)]
LpProtocolError(#[from] LpError),
#[error("no action has been emitted from the LP State Machine")]
UnexpectedStateMachineHalt,
#[error("the state machine instructed an unexpected action: {action:?}")]
UnexpectedStateMachineAction { action: LpAction },
@@ -7,11 +7,10 @@ use crate::LpClientError;
use nym_lp::packet::message::LpMessageType;
use nym_lp::packet::{ForwardPacketData, LpMessage};
use nym_lp::peer::LpRemotePeer;
use nym_lp::session::{LpAction, LpInput};
use nym_lp::state_machine::{LpAction, LpInput};
use nym_registration_common::{
LpRegistrationRequest, LpRegistrationResponse, NymNodeLPInformation,
};
use rand09::Rng;
pub(crate) trait LpDataSendExt {
fn to_lp_data(&self) -> Result<LpMessage, LpClientError>;
@@ -84,12 +83,3 @@ pub(crate) fn try_convert_forward_response(action: LpAction) -> Result<Vec<u8>,
pub(crate) fn to_lp_remote_peer(data: NymNodeLPInformation) -> LpRemotePeer {
LpRemotePeer::new(data.x25519).with_key_digests(data.expected_kem_key_hashes)
}
pub(crate) async fn exponential_backoff_with_jitter(attempt: u32) {
// Exponential backoff with jitter: 100ms, 200ms, 400ms, 800ms, 1600ms (capped)
let base_delay_ms = 100u64 * (1 << attempt.min(4));
let jitter_ms: u64 = rand09::rng().random_range(0..(base_delay_ms / 4 + 1));
let delay = std::time::Duration::from_millis(base_delay_ms + jitter_ms);
tracing::info!("Retrying registration after the following delay {delay:?}");
tokio::time::sleep(delay).await;
}
+1 -1
View File
@@ -36,7 +36,7 @@ mod config;
pub(crate) mod error;
pub(crate) mod helpers;
mod nested_session;
mod session_helpers;
mod state_machine_helpers;
pub use client::LpRegistrationClient;
pub use config::LpRegistrationConfig;
@@ -6,7 +6,7 @@ use crate::{LpClientError, LpRegistrationClient};
use bytes::{BufMut, BytesMut};
use nym_lp::KEM;
use nym_lp::packet::{EncryptedLpPacket, ForwardPacketData, message::ExpectedResponseSize};
use nym_lp::session::{LpAction, LpInput};
use nym_lp::state_machine::{LpAction, LpInput};
use nym_lp::transport::traits::{HandshakeMessage, LpTransportChannel};
use nym_lp::transport::{LpHandshakeChannel, LpTransportError};
use std::io;
@@ -67,9 +67,11 @@ impl<'a, S> NestedConnection<'a, S> {
let input = convert_forward_data(data)?;
// 2. Encrypt and prepare packet via state machine
let state_machine = self.outer_client.transport_session_mut()?;
let state_machine = self.outer_client.state_machine_mut()?;
let action = state_machine.process_input(input)?;
let action = state_machine
.process_input(input)
.ok_or(LpClientError::UnexpectedStateMachineHalt)??;
let forward_packet = match action {
LpAction::SendPacket(packet) => packet,
@@ -100,8 +102,10 @@ impl<'a, S> NestedConnection<'a, S> {
.map_err(|_| LpClientError::ConnectionTimeout)??;
// 2. Decrypt via state machine (re-borrow)
let state_machine = self.outer_client.transport_session_mut()?;
let action = state_machine.process_input(LpInput::ReceivePacket(response_packet))?;
let state_machine = self.outer_client.state_machine_mut()?;
let action = state_machine
.process_input(LpInput::ReceivePacket(response_packet))
.ok_or(LpClientError::UnexpectedStateMachineHalt)??;
// 3. Extract decrypted response data
let response_data = try_convert_forward_response(action)?;
@@ -20,25 +20,25 @@
use super::client::LpRegistrationClient;
use super::error::{LpClientError, Result};
use crate::lp_client::helpers::{LpDataDeliverExt, LpDataSendExt, exponential_backoff_with_jitter};
use crate::lp_client::session_helpers::{extract_forwarded_response, prepare_send_packet};
use crate::lp_client::helpers::{LpDataDeliverExt, LpDataSendExt};
use crate::lp_client::state_machine_helpers::{extract_forwarded_response, prepare_send_packet};
use nym_bandwidth_controller::{BandwidthTicketProvider, DEFAULT_TICKETS_TO_SPEND};
use nym_credentials_interface::TicketType;
use nym_crypto::asymmetric::{ed25519, x25519};
use nym_lp::packet::version;
use nym_lp::packet::{EncryptedLpPacket, LpMessage};
use nym_lp::peer::{DHKeyPair, LpLocalPeer, LpRemotePeer};
use nym_lp::psq::initiator::HandshakeMode;
use nym_lp::state_machine::LpStateMachine;
use nym_lp::transport::LpHandshakeChannel;
use nym_lp::transport::traits::LpTransportChannel;
use nym_lp::{Ciphersuite, KEM, LpTransportSession};
use nym_lp::{Ciphersuite, KEM, LpSession};
use nym_registration_common::dvpn::LpDvpnRegistrationResponseMessageContent;
use nym_registration_common::{
LpRegistrationRequest, LpRegistrationResponse, WireguardConfiguration,
WireguardRegistrationData,
};
use nym_wireguard_types::PeerPublicKey;
use rand09::{CryptoRng, RngCore};
use rand09::{CryptoRng, Rng, RngCore};
use std::net::SocketAddr;
use std::sync::Arc;
use tracing::{debug, warn};
@@ -79,8 +79,8 @@ pub struct NestedLpSession {
/// Included in case we have to downgrade our version.
gateway_supported_lp_protocol_version: u8,
/// LP transport session for exit gateway session (populated after handshake)
transport_session: Option<LpTransportSession>,
/// LP state machine for exit gateway session (populated after handshake)
state_machine: Option<LpStateMachine>,
}
impl NestedLpSession {
@@ -116,12 +116,12 @@ impl NestedLpSession {
lp_local_peer,
gateway_lp_peer,
gateway_supported_lp_protocol_version: lp_protocol,
transport_session: None,
state_machine: None,
}
}
fn state_machine_mut(&mut self) -> Result<&mut LpTransportSession> {
self.transport_session
fn state_machine_mut(&mut self) -> Result<&mut LpStateMachine> {
self.state_machine
.as_mut()
.ok_or(LpClientError::IncompleteHandshake)
}
@@ -181,18 +181,17 @@ impl NestedLpSession {
let remote_peer = self.gateway_lp_peer.clone();
let protocol_version = self.gateway_supported_lp_protocol_version;
let session = LpTransportSession::psq_handshake_initiator(
let session = LpSession::psq_handshake_initiator(
&mut nested_connection,
local_peer,
remote_peer,
protocol_version,
HandshakeMode::OneWayExit,
)?
)
.complete_handshake()
.await?;
// Store the state machine (with established session) for later use
self.transport_session = Some(session);
self.state_machine = Some(LpStateMachine::new(session));
debug!("completed nested handshake");
Ok(())
}
@@ -286,7 +285,7 @@ impl NestedLpSession {
LpDvpnRegistrationResponseMessageContent::RegistrationFailure(res) => {
let reason = res.error;
// the registration has failed
warn!("Gateway rejected registration: {reason}");
tracing::warn!("Gateway rejected registration: {reason}");
Err(LpClientError::RegistrationRejected { reason })
}
LpDvpnRegistrationResponseMessageContent::CompletedRegistration(res) => Ok(res.config),
@@ -381,7 +380,7 @@ impl NestedLpSession {
LpDvpnRegistrationResponseMessageContent::RegistrationFailure(res) => {
let reason = res.error;
// the registration has failed
warn!("Gateway rejected registration: {reason}");
tracing::warn!("Gateway rejected registration: {reason}");
return Err(LpClientError::RegistrationRejected { reason });
}
LpDvpnRegistrationResponseMessageContent::CompletedRegistration(res) => res.config,
@@ -436,7 +435,7 @@ impl NestedLpSession {
/// - Forwarding through entry gateway fails
/// - Response decryption/deserialization fails
/// - Gateway rejects the registration
pub async fn handshake_and_register_dvpn<S, R>(
pub(crate) async fn handshake_and_register_dvpn<S, R>(
&mut self,
outer_client: &mut LpRegistrationClient<S>,
rng: &mut R,
@@ -511,49 +510,60 @@ impl NestedLpSession {
max_retries
);
// attempt to perform handshake with retries
let mut last_error = None;
for attempt in 0..=max_retries {
let attempt_display = attempt + 1;
debug!("registration attempt {attempt_display}");
// Verify outer session is still usable before retry
if !outer_client.is_handshake_complete() {
return Err(LpClientError::Other(
"Outer session lost during retry - caller must re-establish entry gateway connection".to_string()
));
}
if attempt > 0 {
// Verify outer session is still usable before retry
if !outer_client.is_handshake_complete() {
return Err(LpClientError::Other(
"Outer session lost during retry - caller must re-establish entry gateway connection".to_string()
));
}
// Exponential backoff with jitter: 100ms, 200ms, 400ms, 800ms, 1600ms (capped)
let base_delay_ms = 100u64 * (1 << attempt.min(4));
let jitter_ms: u64 = rand09::rng().random_range(0..(base_delay_ms / 4 + 1));
let delay = std::time::Duration::from_millis(base_delay_ms + jitter_ms);
tracing::info!(
"Retrying exit registration (attempt {}) after {:?}",
attempt + 1,
delay
);
tokio::time::sleep(delay).await;
// Clear state machine before retry - handshake needs fresh start
self.transport_session = None;
exponential_backoff_with_jitter(attempt).await
self.state_machine = None;
}
match self.perform_handshake(outer_client).await {
Ok(_) => break,
match self
.handshake_and_register_dvpn(
outer_client,
rng,
wg_keypair,
gateway_identity,
bandwidth_controller,
ticket_type,
)
.await
{
Ok(data) => {
if attempt > 0 {
tracing::info!(
"Exit registration succeeded on retry attempt {}",
attempt + 1
);
}
return Ok(data);
}
Err(e) => {
warn!("Handshake failed on attempt {attempt_display}: {e}");
tracing::warn!("Exit registration attempt {} failed: {}", attempt + 1, e);
last_error = Some(e);
}
}
}
if self.transport_session.is_none() {
return Err(last_error.unwrap_or(LpClientError::RegistrationFailure {
message: "Exit Registration failed after all retries".to_string(),
}));
}
self.register_dvpn(
outer_client,
rng,
wg_keypair,
gateway_identity,
bandwidth_controller,
ticket_type,
)
.await
.inspect_err(|e| warn!("Exit Registration failed: {e}"))
Err(last_error.unwrap_or(LpClientError::RegistrationFailure {
message: "Exit Registration failed after all retries".to_string(),
}))
}
}
@@ -3,16 +3,18 @@
use crate::LpClientError;
use nym_lp::packet::LpMessage;
use nym_lp::session::{LpAction, LpInput};
use nym_lp::{LpTransportSession, packet::EncryptedLpPacket};
use nym_lp::state_machine::{LpAction, LpInput};
use nym_lp::{LpStateMachine, packet::EncryptedLpPacket};
/// Attempt to prepare the provided data for sending by wrapping it in appropriate `LpAction`,
/// and attempting to extract `EncryptedLpPacket` from the provided state machine.
pub(crate) fn prepare_send_packet(
data: LpMessage,
state_machine: &mut LpTransportSession,
state_machine: &mut LpStateMachine,
) -> Result<EncryptedLpPacket, LpClientError> {
let action = state_machine.process_input(LpInput::SendData(data))?;
let action = state_machine
.process_input(LpInput::SendData(data))
.ok_or(LpClientError::UnexpectedStateMachineHalt)??;
match action {
LpAction::SendPacket(packet) => Ok(packet),
@@ -24,9 +26,11 @@ pub(crate) fn prepare_send_packet(
/// using the provided state machine.
pub(crate) fn extract_forwarded_response(
response_packet: EncryptedLpPacket,
state_machine: &mut LpTransportSession,
state_machine: &mut LpStateMachine,
) -> Result<LpMessage, LpClientError> {
let action = state_machine.process_input(LpInput::ReceivePacket(response_packet))?;
let action = state_machine
.process_input(LpInput::ReceivePacket(response_packet))
.ok_or(LpClientError::UnexpectedStateMachineHalt)??;
match action {
LpAction::DeliverData(data) => Ok(data),
-12
View File
@@ -3,10 +3,8 @@
use nym_authenticator_client::{AuthClientMixnetListenerHandle, AuthenticatorClient};
use nym_bandwidth_controller::BandwidthTicketProvider;
use nym_lp::peer::DHKeyPair;
use nym_registration_common::{AssignedAddresses, WireguardConfiguration};
use nym_sdk::mixnet::{EventReceiver, MixnetClient};
use std::sync::Arc;
pub enum RegistrationResult {
Mixnet(Box<MixnetRegistrationResult>),
@@ -38,8 +36,6 @@ pub struct WireguardRegistrationResult {
/// # Fields
/// * `entry_gateway_data` - WireGuard configuration from entry gateway
/// * `exit_gateway_data` - WireGuard configuration from exit gateway
/// * `entry_lp_keypair` - x25519 keypair used on the entry LP channel (persist to resume a pre-established session)
/// * `exit_lp_keypair` - x25519 keypair used on the exit LP channel (persist to resume a pre-established session)
/// * `bw_controller` - Bandwidth ticket provider for credential management
pub struct LpRegistrationResult {
/// Gateway configuration data from entry gateway
@@ -48,14 +44,6 @@ pub struct LpRegistrationResult {
/// Gateway configuration data from exit gateway
pub exit_gateway_data: WireguardConfiguration,
/// x25519 keypair used on the entry channel.
/// the purpose of persisting those keys is to be able to resume the pre-established session
pub entry_lp_keypair: Arc<DHKeyPair>,
/// x25519 keypair used on the exit channel
/// the purpose of persisting those keys is to be able to resume the pre-established session
pub exit_lp_keypair: Arc<DHKeyPair>,
/// Bandwidth controller for credential management
pub bw_controller: Box<dyn BandwidthTicketProvider>,
}
+217
View File
@@ -0,0 +1,217 @@
#!/usr/bin/env python3
import json
import pathlib
import subprocess
import sys
from collections import defaultdict
def dependency_section(dep):
kind = dep.get("kind") or "normal"
section = {
"normal": "dependencies",
"dev": "dev-dependencies",
"build": "build-dependencies",
}.get(kind, f"{kind}-dependencies")
target = dep.get("target")
if target:
return f"target.{target}.{section}"
return section
def manifest_member(root, manifest_path):
manifest_parent = pathlib.Path(manifest_path).resolve().parent
try:
return str(manifest_parent.relative_to(root))
except ValueError:
return str(manifest_parent)
def publish_status(pkg):
publish = pkg.get("publish")
if publish is None:
return True, "publishable to crates.io"
if isinstance(publish, list):
if not publish:
return False, "publish disabled (`publish = false`)"
if "crates-io" in publish:
return True, "publishable to crates.io"
registries = ", ".join(publish)
return False, f"publish restricted to non-crates.io registries ({registries})"
return False, f"unrecognized `publish` setting: {publish!r}"
def main():
root = pathlib.Path(".").resolve()
metadata = json.loads(
subprocess.check_output(
["cargo", "metadata", "--no-deps", "--format-version", "1"],
text=True,
)
)
packages_by_id = {pkg["id"]: pkg for pkg in metadata["packages"]}
workspace_ids = set(metadata["workspace_members"])
workspace_packages = [
packages_by_id[pkg_id] for pkg_id in workspace_ids if pkg_id in packages_by_id
]
workspace_by_name = {pkg["name"]: pkg for pkg in workspace_packages}
workspace_dir_to_name = {
str(pathlib.Path(pkg["manifest_path"]).resolve().parent): pkg["name"]
for pkg in workspace_packages
}
package_info = {}
for pkg in workspace_packages:
name = pkg["name"]
member = manifest_member(root, pkg["manifest_path"])
explicitly_publishable, publish_reason = publish_status(pkg)
package_info[name] = {
"pkg": pkg,
"member": member,
"explicitly_publishable": explicitly_publishable,
"publish_reason": publish_reason,
}
direct_issues = defaultdict(set)
workspace_deps = defaultdict(list)
for name, info in package_info.items():
pkg = info["pkg"]
member = info["member"]
explicitly_publishable = info["explicitly_publishable"]
if not explicitly_publishable:
direct_issues[name].add(info["publish_reason"])
continue
for field in ("description", "license", "repository"):
value = pkg.get(field)
if not isinstance(value, str) or not value.strip():
direct_issues[name].add(f"missing required field '{field}'")
for dep in pkg.get("dependencies", []):
section = dependency_section(dep)
dep_name = dep["name"]
dep_source = dep.get("source")
dep_workspace_name = workspace_by_name.get(dep_name, {}).get("name")
dep_path = dep.get("path")
if dep_workspace_name is None and dep_path:
dep_workspace_name = workspace_dir_to_name.get(
str(pathlib.Path(dep_path).resolve())
)
if dep_path and dep.get("req") in ("*", ""):
direct_issues[name].add(
f"{section}: path dependency '{dep_name}' has no explicit version ({dep_path})"
)
if dep_workspace_name:
dep_info = package_info[dep_workspace_name]
if not dep_info["explicitly_publishable"]:
direct_issues[name].add(
f"{section}: depends on non-publishable workspace crate '{dep_workspace_name}' ({dep_info['publish_reason']})"
)
continue
workspace_deps[name].append((dep_workspace_name, section))
continue
if dep_source and not dep_source.startswith("registry+"):
direct_issues[name].add(
f"{section}: non-registry dependency '{dep_name}' from '{dep_source}'"
)
effective_issues = {}
def collect_effective_issues(crate_name, stack):
cached = effective_issues.get(crate_name)
if cached is not None:
return cached
issues = set(direct_issues.get(crate_name, set()))
stack = stack | {crate_name}
for dep_name, dep_section in workspace_deps.get(crate_name, []):
dep_info = package_info[dep_name]
if not dep_info["explicitly_publishable"]:
issues.add(
f"{dep_section}: depends on non-publishable workspace crate '{dep_name}' ({dep_info['publish_reason']})"
)
continue
if dep_name in stack:
continue
dep_issues = collect_effective_issues(dep_name, stack)
if dep_issues:
issues.add(
f"{dep_section}: depends on blocked workspace crate '{dep_name}'"
)
effective_issues[crate_name] = issues
return issues
for crate_name in package_info:
collect_effective_issues(crate_name, set())
publish_targets = sorted(
name for name, info in package_info.items() if info["explicitly_publishable"]
)
root_blockers = sorted(
name
for name in publish_targets
if direct_issues.get(name)
)
transitive_blocked = sorted(
name
for name in publish_targets
if not direct_issues.get(name) and effective_issues.get(name)
)
disabled_by_config = sorted(
name for name, info in package_info.items() if not info["explicitly_publishable"]
)
print("Publishability preflight report:")
print(f"- workspace crates inspected: {len(package_info)}")
print(f"- crates configured for crates.io publish: {len(publish_targets)}")
print(f"- root blockers (direct issues): {len(root_blockers)}")
print(f"- downstream blocked crates (transitive): {len(transitive_blocked)}")
print(f"- crates excluded by config (publish = false / restricted): {len(disabled_by_config)}")
if root_blockers:
print("\nAction required: root blockers")
for crate_name in root_blockers:
info = package_info[crate_name]
print(f"- {crate_name} ({info['member']})")
for issue in sorted(direct_issues[crate_name]):
print(f" - {issue}")
if transitive_blocked:
print("\nDownstream blocked crates")
print("- These crates have no direct issue; they are blocked by dependencies listed below.")
for crate_name in transitive_blocked:
info = package_info[crate_name]
blockers = set()
for dep_name, dep_section in workspace_deps.get(crate_name, []):
dep_info = package_info[dep_name]
if not dep_info["explicitly_publishable"] or effective_issues.get(dep_name):
blockers.add(f"{dep_name} via {dep_section}")
print(f"- {crate_name} ({info['member']})")
for blocker in sorted(blockers):
print(f" - blocked by {blocker}")
if root_blockers or transitive_blocked:
print("\nPreflight checks failed:")
print(f"- {len(root_blockers) + len(transitive_blocked)} crate(s) configured for crates.io publish are blocked.")
sys.exit(1)
print("\nPreflight checks passed.")
if __name__ == "__main__":
main()
+1 -1
View File
@@ -421,7 +421,7 @@ impl SpeedtestClient {
/// This is the primary method for sending data through the mixnet via the LP transport.
/// Requires `init_lp_session()` to be called first to establish the LP cryptographic session.
///
/// # Data Flow (see gateway/src/node/lp_listener/handler)
/// # Data Flow (see gateway/src/node/lp_listener/data_handler.rs)
/// ```text
/// LP Client → UDP:51264 → LP Data Handler → Mixnet Entry
/// LP(Sphinx) decrypt LP forward Sphinx
+2 -2
View File
@@ -7,7 +7,7 @@
use anyhow::{anyhow, bail, Context, Result};
use nym_api_requests::models::{LPHashFunction, LPKEM};
use nym_api_requests::nym_nodes::SkimmedNodeV1;
use nym_api_requests::nym_nodes::SkimmedNode;
use nym_crypto::asymmetric::ed25519;
use nym_http_api_client::UserAgent;
use nym_kkt_ciphersuite::{Ciphersuite, KEMKeyDigests, SignatureScheme, KEM};
@@ -189,7 +189,7 @@ impl SpeedtestTopology {
}
/// Extract gateway info for LP connections from a SkimmedNode
fn gateway_info_from_skimmed(node: &SkimmedNodeV1) -> Result<GatewayInfo> {
fn gateway_info_from_skimmed(node: &SkimmedNode) -> Result<GatewayInfo> {
todo!("insufficient information to convert into GatewayInfo")
// let first_ip = node
// .ip_addresses