LP: mixnet reg fixes (#6356)

* removed x25519 key used within LP mixnet registration

* use Vec<u8> rather than BytesMut for LpAction::DeliverData

* introduced an explicit kind prefix for raw data sent and received within LP

* review nits
This commit is contained in:
Jędrzej Stuczyński
2026-01-23 13:21:52 +00:00
committed by GitHub
parent a63a1e745e
commit e2be2b0b34
20 changed files with 558 additions and 535 deletions
Generated
+1
View File
@@ -7520,6 +7520,7 @@ dependencies = [
"nym-ip-packet-requests",
"nym-kkt-ciphersuite",
"nym-sphinx",
"nym-test-utils",
"nym-wireguard-types",
"serde",
"time",
+6 -1
View File
@@ -17,7 +17,7 @@ use crate::psk::{
};
use crate::replay::ReceivingKeyCounterValidator;
use crate::{LpError, LpMessage, LpPacket};
use nym_crypto::asymmetric::x25519;
use nym_crypto::asymmetric::{ed25519, x25519};
use nym_kkt::KKT_RESPONSE_AAD;
use nym_kkt::ciphersuite::{DecapsulationKey, EncapsulationKey};
use nym_kkt::context::KKTContext;
@@ -298,6 +298,11 @@ impl LpSession {
*self.local_peer.x25519.public_key()
}
/// Returns the remote ed25519 public key.
pub fn remote_ed25519_public(&self) -> ed25519::PublicKey {
self.remote_peer.ed25519_public
}
/// Returns the remote X25519 public key.
///
/// Used for tie-breaking in simultaneous subsession initiation.
+21 -25
View File
@@ -734,13 +734,13 @@ mod tests {
// Remove unused imports if SessionManager methods are no longer direct dependencies
// use crate::noise_protocol::{create_noise_state, create_noise_state_responder};
use crate::peer::mock_peers;
use crate::state_machine::LpData;
use crate::{
// Bring in state machine types
state_machine::{LpAction, LpInput, LpStateBare},
// message::LpMessage, // LpMessage likely still needed for LpInput/LpAction
// packet::{LpHeader, LpPacket, TRAILER_LEN}, // LpPacket needed for LpAction/LpInput
};
use bytes::Bytes;
// Use Bytes for SendData input
// Keep helper function for creating test packets if needed,
@@ -1058,13 +1058,13 @@ mod tests {
// --- 5. Simulate Data Transfer via process_input ---
println!("Starting data transfer simulation via process_input...");
let plaintext_a_to_b = b"Hello from A via process_input!";
let plaintext_b_to_a = b"Hello from B via process_input!";
let plaintext_a_to_b = LpData::new_opaque(b"Hello from A via process_input!".to_vec());
let plaintext_b_to_a = LpData::new_opaque(b"Hello from B via process_input!".to_vec());
// --- A sends to B ---
println!(" A sends to B");
let action_a_send = session_manager_1
.process_input(receiver_index, LpInput::SendData(plaintext_a_to_b.to_vec()))
.process_input(receiver_index, LpInput::SendData(plaintext_a_to_b.clone()))
.expect("A SendData should produce action")
.expect("A SendData failed");
@@ -1087,14 +1087,10 @@ mod tests {
.expect("B ReceivePacket (data) failed");
if let LpAction::DeliverData(data) = action_b_recv {
assert_eq!(
data,
Bytes::copy_from_slice(plaintext_a_to_b),
"Decrypted data mismatch A->B"
);
assert_eq!(data, plaintext_a_to_b, "Decrypted data mismatch A->B");
println!(
" B successfully decrypted: {:?}",
String::from_utf8_lossy(&data)
String::from_utf8_lossy(&data.content)
);
} else {
panic!("B ReceivePacket did not produce DeliverData");
@@ -1103,7 +1099,7 @@ mod tests {
// --- B sends to A ---
println!(" B sends to A");
let action_b_send = session_manager_2
.process_input(receiver_index, LpInput::SendData(plaintext_b_to_a.to_vec()))
.process_input(receiver_index, LpInput::SendData(plaintext_b_to_a.clone()))
.expect("B SendData should produce action")
.expect("B SendData failed");
@@ -1128,14 +1124,10 @@ mod tests {
.expect("A ReceivePacket (data) failed");
if let LpAction::DeliverData(data) = action_a_recv {
assert_eq!(
data,
Bytes::copy_from_slice(plaintext_b_to_a),
"Decrypted data mismatch B->A"
);
assert_eq!(data, plaintext_b_to_a, "Decrypted data mismatch B->A");
println!(
" A successfully decrypted: {:?}",
String::from_utf8_lossy(&data)
String::from_utf8_lossy(&data.content)
);
} else {
panic!("A ReceivePacket did not produce DeliverData");
@@ -1160,11 +1152,11 @@ mod tests {
println!("Testing out-of-order reception via process_input...");
// A prepares N+1 then N
let data_n_plus_1 = Bytes::from_static(b"Message N+1");
let data_n = Bytes::from_static(b"Message N");
let data_n_plus_1 = LpData::new_opaque(b"Message N+1".to_vec());
let data_n = LpData::new_opaque(b"Message N".to_vec());
let action_send_n1 = session_manager_1
.process_input(receiver_index, LpInput::SendData(data_n_plus_1.to_vec()))
.process_input(receiver_index, LpInput::SendData(data_n_plus_1.clone()))
.unwrap()
.unwrap();
let packet_n1 = match action_send_n1 {
@@ -1173,7 +1165,7 @@ mod tests {
};
let action_send_n = session_manager_1
.process_input(receiver_index, LpInput::SendData(data_n.to_vec()))
.process_input(receiver_index, LpInput::SendData(data_n.clone()))
.unwrap()
.unwrap();
let packet_n = match action_send_n {
@@ -1230,8 +1222,10 @@ mod tests {
);
// Further actions on A fail
let send_after_close_a =
session_manager_1.process_input(receiver_index, LpInput::SendData(b"fail".to_vec()));
let send_after_close_a = session_manager_1.process_input(
receiver_index,
LpInput::SendData(LpData::new_opaque(b"fail".to_vec())),
);
assert!(send_after_close_a.is_err());
assert!(matches!(
send_after_close_a.err().unwrap(),
@@ -1250,8 +1244,10 @@ mod tests {
);
// Further actions on B fail
let send_after_close_b =
session_manager_2.process_input(receiver_index, LpInput::SendData(b"fail".to_vec()));
let send_after_close_b = session_manager_2.process_input(
receiver_index,
LpInput::SendData(LpData::new_opaque(b"fail".to_vec())),
);
assert!(send_after_close_b.is_err());
assert!(matches!(
send_after_close_b.err().unwrap(),
+116 -21
View File
@@ -21,7 +21,8 @@ use crate::{
packet::LpPacket,
session::{LpSession, SubsessionHandshake},
};
use bytes::BytesMut;
use bytes::{Buf, Bytes};
use num_enum::{IntoPrimitive, TryFromPrimitive};
use std::mem;
use tracing::debug;
@@ -97,7 +98,7 @@ pub enum LpInput {
/// Received an LP Packet from the network.
ReceivePacket(LpPacket),
/// Application wants to send data (only valid in Transport state).
SendData(Vec<u8>), // Using Bytes for efficiency
SendData(LpData),
/// Close the connection.
Close,
/// Initiate a subsession handshake (only valid in Transport state).
@@ -111,7 +112,7 @@ pub enum LpAction {
/// Send an LP Packet over the network.
SendPacket(LpPacket),
/// Deliver decrypted application data received from the peer.
DeliverData(BytesMut),
DeliverData(LpData),
/// Inform the environment that KKT exchange completed successfully.
KKTComplete,
/// Inform the environment that the handshake is complete.
@@ -134,6 +135,75 @@ pub enum LpAction {
},
}
/// Represent application data being sent in Transport mode
#[derive(Debug, Clone, PartialEq)]
pub struct LpData {
pub kind: LpDataKind,
pub content: Bytes,
}
impl AsRef<[u8]> for LpData {
fn as_ref(&self) -> &[u8] {
&self.content
}
}
impl LpData {
pub fn new(kind: LpDataKind, content: impl Into<Bytes>) -> Self {
Self {
kind,
content: content.into(),
}
}
pub fn new_opaque(content: impl Into<Bytes>) -> Self {
Self::new(LpDataKind::Opaque, content)
}
pub fn new_registration(data: impl Into<Bytes>) -> Self {
Self::new(LpDataKind::Registration, data)
}
pub fn new_forward(data: impl Into<Bytes>) -> Self {
Self::new(LpDataKind::Forward, data)
}
pub fn to_vec(self) -> Vec<u8> {
self.into()
}
}
impl From<LpData> for Vec<u8> {
fn from(data: LpData) -> Self {
let mut out = Vec::with_capacity(data.content.len() + 1);
out.push(data.kind as u8);
out.extend_from_slice(data.content.as_ref());
out
}
}
impl TryFrom<Vec<u8>> for LpData {
type Error = LpError;
fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
let kind = LpDataKind::try_from(value[0]).map_err(|_| {
LpError::DeserializationError(format!("unknown data type: {}", value[0]))
})?;
let mut content = Bytes::from(value);
content.advance(1);
Ok(LpData::new(kind, content))
}
}
/// Represent kind of application data being sent in Transport mode
#[derive(Clone, Copy, PartialEq, Eq, Debug, IntoPrimitive, TryFromPrimitive)]
#[repr(u8)]
pub enum LpDataKind {
Opaque = 0,
Registration = 1,
Forward = 2,
}
/// The Lewes Protocol State Machine.
pub struct LpStateMachine {
pub state: LpState,
@@ -551,8 +621,17 @@ impl LpStateMachine {
LpState::Transport { session }
} else {
// 4. Deliver data
result_action = Some(Ok(LpAction::DeliverData(BytesMut::from(plaintext.as_slice()))));
LpState::Transport { session }
match plaintext.try_into() {
Ok(data) => {
result_action = Some(Ok(LpAction::DeliverData(data)));
LpState::Transport { session }
},
Err(e) => {
let reason = e.to_string();
result_action = Some(Err(e));
LpState::Closed { reason }
}
}
}
}
Err(e) => {
@@ -586,7 +665,7 @@ impl LpStateMachine {
}
(LpState::Transport { session }, LpInput::SendData(data)) => {
// Encrypt and send application data
match self.prepare_data_packet(&session, &data) {
match self.prepare_data_packet(&session, data) {
Ok(packet) => result_action = Some(Ok(LpAction::SendPacket(packet))),
Err(e) => {
// If prepare fails, should we close? Let's report error and stay Transport for now.
@@ -823,8 +902,16 @@ impl LpStateMachine {
result_action = Some(Err(e));
LpState::SubsessionHandshaking { session, subsession }
} else {
result_action = Some(Ok(LpAction::DeliverData(BytesMut::from(plaintext.as_slice()))));
LpState::SubsessionHandshaking { session, subsession }
match plaintext.try_into() {
Ok(data) => {
result_action = Some(Ok(LpAction::DeliverData(data)));
LpState::SubsessionHandshaking { session, subsession }
}
Err(err) => {
result_action = Some(Err(err));
LpState::SubsessionHandshaking { session, subsession }
}
}
}
}
Err(e) => {
@@ -890,7 +977,7 @@ impl LpStateMachine {
// Parent can still send data during subsession handshake
(LpState::SubsessionHandshaking { session, subsession }, LpInput::SendData(data)) => {
match self.prepare_data_packet(&session, &data) {
match self.prepare_data_packet(&session, data) {
Ok(packet) => result_action = Some(Ok(LpAction::SendPacket(packet))),
Err(e) => {
result_action = Some(Err(e.into()));
@@ -932,8 +1019,16 @@ impl LpStateMachine {
result_action = Some(Err(e));
LpState::ReadOnlyTransport { session }
} else {
result_action = Some(Ok(LpAction::DeliverData(BytesMut::from(plaintext.as_slice()))));
LpState::ReadOnlyTransport { session }
match plaintext.try_into() {
Ok(data) => {
result_action = Some(Ok(LpAction::DeliverData(data)));
LpState::ReadOnlyTransport { session }
}
Err(err) => {
result_action = Some(Err(err));
LpState::ReadOnlyTransport { session }
}
}
}
}
Err(e) => {
@@ -1037,9 +1132,9 @@ impl LpStateMachine {
fn prepare_data_packet(
&self,
session: &LpSession,
data: &[u8],
data: LpData,
) -> Result<LpPacket, NoiseError> {
let encrypted_message = session.encrypt_data(data)?;
let encrypted_message = session.encrypt_data(Vec::<u8>::from(data).as_ref())?;
session
.next_packet(encrypted_message)
.map_err(|e| NoiseError::Other(e.to_string())) // Improve error conversion?
@@ -1050,7 +1145,6 @@ impl LpStateMachine {
mod tests {
use super::*;
use crate::peer::mock_peers;
use bytes::Bytes;
#[test]
fn test_state_machine_init() {
@@ -1233,8 +1327,8 @@ mod tests {
// --- Transport Phase ---
println!("--- Step 8: Initiator sends data ---");
let data_to_send_1 = b"hello responder";
let init_actions_4 = initiator.process_input(LpInput::SendData(data_to_send_1.to_vec()));
let data_to_send_1 = LpData::new_opaque(b"hello responder".to_vec());
let init_actions_4 = initiator.process_input(LpInput::SendData(data_to_send_1.clone()));
let data_packet_1 = if let Some(Ok(LpAction::SendPacket(packet))) = init_actions_4 {
packet.clone()
} else {
@@ -1249,11 +1343,11 @@ mod tests {
} else {
panic!("Responder should deliver data");
};
assert_eq!(resp_data_1, Bytes::copy_from_slice(data_to_send_1));
assert_eq!(resp_data_1, data_to_send_1);
println!("--- Step 10: Responder sends data ---");
let data_to_send_2 = b"hello initiator";
let resp_actions_6 = responder.process_input(LpInput::SendData(data_to_send_2.to_vec()));
let data_to_send_2 = LpData::new_opaque(b"hello initiator".to_vec());
let resp_actions_6 = responder.process_input(LpInput::SendData(data_to_send_2.clone()));
let data_packet_2 = if let Some(Ok(LpAction::SendPacket(packet))) = resp_actions_6 {
packet.clone()
} else {
@@ -1264,7 +1358,7 @@ mod tests {
println!("--- Step 11: Initiator receives data ---");
let init_actions_5 = initiator.process_input(LpInput::ReceivePacket(data_packet_2));
if let Some(Ok(LpAction::DeliverData(data))) = init_actions_5 {
assert_eq!(data, Bytes::copy_from_slice(data_to_send_2));
assert_eq!(data, data_to_send_2);
} else {
panic!("Initiator should deliver data");
}
@@ -1414,7 +1508,8 @@ mod tests {
assert!(matches!(initiator.state, LpState::KKTExchange { .. }));
// Try SendData during KKT exchange (should be rejected)
let send_action = initiator.process_input(LpInput::SendData(vec![1, 2, 3]));
let send_data = LpData::new_opaque(vec![1, 2, 3]);
let send_action = initiator.process_input(LpInput::SendData(send_data));
assert!(matches!(
send_action,
Some(Err(LpError::InvalidStateTransition { .. }))
+1
View File
@@ -28,3 +28,4 @@ nym-kkt-ciphersuite = { path = "../nym-kkt-ciphersuite" }
[dev-dependencies]
bincode.workspace = true
time.workspace = true
nym-test-utils = { workspace = true }
+2 -1
View File
@@ -14,7 +14,8 @@ mod lp_messages;
mod serialisation;
pub use lp_messages::{
LpGatewayData, LpRegistrationRequest, LpRegistrationResponse, RegistrationMode,
LpDvpnRegistrationRequest, LpMixnetGatewayData, LpMixnetRegistrationRequest,
LpRegistrationData, LpRegistrationRequest, LpRegistrationResponse, RegistrationMode,
};
use nym_kkt_ciphersuite::{KEM, KEMKeyDigests};
pub use serialisation::BincodeError;
+59 -86
View File
@@ -3,17 +3,45 @@
//! LP (Lewes Protocol) registration message types shared between client and gateway.
use nym_credentials_interface::{CredentialSpendingData, TicketType};
use serde::{Deserialize, Serialize};
use std::net::IpAddr;
use crate::GatewayData;
use crate::serialisation::{BincodeError, BincodeOptions, lp_bincode_serializer};
use nym_credentials_interface::{CredentialSpendingData, TicketType};
use nym_crypto::asymmetric::ed25519;
use serde::{Deserialize, Serialize};
/// Registration request sent by client after LP handshake
/// Aligned with existing authenticator registration flow
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LpRegistrationRequest {
/// Mode specific registration data
pub registration_data: LpRegistrationData,
/// Unix timestamp for replay protection
pub timestamp: u64,
}
impl LpRegistrationRequest {
pub fn mode(&self) -> RegistrationMode {
match self.registration_data {
LpRegistrationData::Dvpn { .. } => RegistrationMode::Dvpn,
LpRegistrationData::Mixnet { .. } => RegistrationMode::Mixnet,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LpRegistrationData {
/// dVPN mode - register as WireGuard peer (most common)
Dvpn {
data: Box<LpDvpnRegistrationRequest>,
},
/// Mixnet mode - register for mixnet routing via IPR
Mixnet { data: LpMixnetRegistrationRequest },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LpDvpnRegistrationRequest {
/// Client's WireGuard public key (for dVPN mode)
pub wg_public_key: nym_wireguard_types::PeerPublicKey,
@@ -22,15 +50,14 @@ pub struct LpRegistrationRequest {
/// Ticket type for bandwidth allocation
pub ticket_type: TicketType,
}
/// Registration mode
pub mode: RegistrationMode,
/// Client's IP address (for tracking/metrics)
pub client_ip: IpAddr,
/// Unix timestamp for replay protection
pub timestamp: u64,
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LpMixnetRegistrationRequest {
/// Client's ed25519 public key (identity)
///
/// Used to derive DestinationAddressBytes for ActiveClientsStore lookup.
pub client_ed25519_pubkey: ed25519::PublicKey,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -39,22 +66,7 @@ pub enum RegistrationMode {
Dvpn,
/// Mixnet mode - register for mixnet routing via IPR
///
/// Client provides identity and encryption keys for nym address derivation.
/// Gateway stores client in ActiveClientsStore for SURB reply delivery.
Mixnet {
/// Client's ed25519 public key (identity)
///
/// Used to derive DestinationAddressBytes for ActiveClientsStore lookup.
/// Must match the key used in LP handshake for authentication.
client_ed25519_pubkey: [u8; 32],
/// Client's x25519 public key (encryption)
///
/// Used for SURB reply encryption. Combined with ed25519 identity
/// and gateway identity to form the full nym Recipient address.
client_x25519_pubkey: [u8; 32],
},
Mixnet,
}
/// Gateway data for mixnet mode registration
@@ -62,16 +74,12 @@ pub enum RegistrationMode {
/// Contains the gateway's identity and sphinx key needed for the client
/// to construct its full nym Recipient address.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LpGatewayData {
pub struct LpMixnetGatewayData {
/// Gateway's ed25519 identity public key
///
/// Forms part of the client's nym Recipient address.
pub gateway_identity: [u8; 32],
/// Gateway's x25519 sphinx public key
///
/// Used by the client for Sphinx packet construction.
pub gateway_sphinx_key: [u8; 32],
pub gateway_identity: ed25519::PublicKey,
// TODO: what we really need in here is the address of internal IPR
}
/// Registration response from gateway
@@ -92,7 +100,7 @@ pub struct LpRegistrationResponse {
///
/// Contains gateway identity and sphinx key needed for nym address construction.
/// Only populated for Mixnet mode registrations.
pub lp_gateway_data: Option<LpGatewayData>,
pub lp_gateway_data: Option<LpMixnetGatewayData>,
/// Allocated bandwidth in bytes
pub allocated_bandwidth: i64,
@@ -104,14 +112,15 @@ impl LpRegistrationRequest {
wg_public_key: nym_wireguard_types::PeerPublicKey,
credential: CredentialSpendingData,
ticket_type: TicketType,
client_ip: IpAddr,
) -> Self {
Self {
wg_public_key,
credential,
ticket_type,
mode: RegistrationMode::Dvpn,
client_ip,
registration_data: LpRegistrationData::Dvpn {
data: Box::new(LpDvpnRegistrationRequest {
wg_public_key,
credential,
ticket_type,
}),
},
#[allow(clippy::expect_used)]
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
@@ -154,7 +163,7 @@ impl LpRegistrationResponse {
}
/// Create a success response for mixnet mode with LpGatewayData
pub fn success_mixnet(allocated_bandwidth: i64, lp_gateway_data: LpGatewayData) -> Self {
pub fn success_mixnet(allocated_bandwidth: i64, lp_gateway_data: LpMixnetGatewayData) -> Self {
Self {
success: true,
error: None,
@@ -189,8 +198,8 @@ impl LpRegistrationResponse {
#[cfg(test)]
mod tests {
use super::*;
use nym_test_utils::helpers::deterministic_rng;
use std::net::Ipv4Addr;
// ==================== Helper Functions ====================
fn create_test_gateway_data() -> GatewayData {
@@ -242,49 +251,14 @@ mod tests {
assert!(response.gateway_data.is_none());
assert_eq!(response.allocated_bandwidth, 0);
}
// ==================== RegistrationMode Tests ====================
#[test]
fn test_registration_mode_serialize_dvpn() {
let mode = RegistrationMode::Dvpn;
let serialized = bincode::serialize(&mode).expect("Failed to serialize mode");
let deserialized: RegistrationMode =
bincode::deserialize(&serialized).expect("Failed to deserialize mode");
assert!(matches!(deserialized, RegistrationMode::Dvpn));
}
#[test]
fn test_registration_mode_serialize_mixnet() {
let client_ed25519_pubkey = [99u8; 32];
let client_x25519_pubkey = [88u8; 32];
let mode = RegistrationMode::Mixnet {
client_ed25519_pubkey,
client_x25519_pubkey,
};
let serialized = bincode::serialize(&mode).expect("Failed to serialize mode");
let deserialized: RegistrationMode =
bincode::deserialize(&serialized).expect("Failed to deserialize mode");
match deserialized {
RegistrationMode::Mixnet {
client_ed25519_pubkey: ed25519,
client_x25519_pubkey: x25519,
} => {
assert_eq!(ed25519, client_ed25519_pubkey);
assert_eq!(x25519, client_x25519_pubkey);
}
_ => panic!("Expected Mixnet mode"),
}
}
#[test]
fn test_lp_registration_response_success_mixnet() {
let lp_gateway_data = LpGatewayData {
gateway_identity: [1u8; 32],
gateway_sphinx_key: [2u8; 32],
let mut rng = deterministic_rng();
let valid_key = ed25519::KeyPair::new(&mut rng);
let lp_gateway_data = LpMixnetGatewayData {
gateway_identity: *valid_key.public_key(),
};
let allocated_bandwidth = 500_000_000;
@@ -299,7 +273,6 @@ mod tests {
let gw_data = response
.lp_gateway_data
.expect("LpGatewayData should be present");
assert_eq!(gw_data.gateway_identity, [1u8; 32]);
assert_eq!(gw_data.gateway_sphinx_key, [2u8; 32]);
assert_eq!(gw_data.gateway_identity, *valid_key.public_key());
}
}
+1 -1
View File
@@ -191,7 +191,7 @@ impl LpDataHandler {
match action {
LpAction::DeliverData(data) => {
// Decrypted application data - forward as Sphinx packet
self.forward_sphinx_packet(&data).await?;
self.forward_sphinx_packet(&data.content).await?;
inc!("lp_data_packets_forwarded");
Ok(())
}
+86 -108
View File
@@ -1,18 +1,18 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use super::messages::LpRegistrationRequest;
use super::registration::process_registration;
use super::LpHandlerState;
use crate::error::GatewayError;
use nym_crypto::asymmetric::{ed25519, x25519};
use nym_lp::state_machine::{LpAction, LpInput};
use nym_lp::state_machine::{LpAction, LpData, LpDataKind, LpInput};
use nym_lp::{
codec::OuterAeadKey, message::ForwardPacketData, packet::LpHeader, LpMessage, LpPacket,
OuterHeader,
};
use nym_lp_transport::traits::LpTransport;
use nym_metrics::{add_histogram_obs, inc};
use nym_registration_common::LpRegistrationRequest;
use std::net::SocketAddr;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@@ -535,16 +535,12 @@ where
})?
.map_err(|e| GatewayError::LpProtocolError(format!("State machine error: {}", e)))?;
let lp_session = state_machine.session().map_err(|e| {
GatewayError::LpProtocolError(format!("Session unavailable after processing: {}", e))
})?;
// Get outer key before releasing borrow
let outer_key = state_machine
.session()
.map_err(|e| {
GatewayError::LpProtocolError(format!(
"Session unavailable after processing: {}",
e
))
})?
.outer_aead_key();
let outer_key = lp_session.outer_aead_key();
drop(state_entry);
match action {
@@ -562,8 +558,7 @@ where
}
LpAction::DeliverData(data) => {
// Decrypted application data - process as registration/forwarding
self.handle_decrypted_payload(receiver_idx, data.to_vec())
.await
self.handle_decrypted_payload(receiver_idx, data).await
}
LpAction::SubsessionComplete {
packet: ready_packet,
@@ -597,38 +592,41 @@ where
async fn handle_decrypted_payload(
&mut self,
receiver_idx: u32,
decrypted_bytes: Vec<u8>,
decrypted_data: LpData,
) -> Result<(), GatewayError> {
let remote = self.remote_addr;
// Try to deserialize as LpRegistrationRequest first (most common case after handshake)
if let Ok(request) = LpRegistrationRequest::try_deserialise(&decrypted_bytes) {
debug!(
"LP registration request from {remote} (receiver_idx={receiver_idx}): mode={:?}",
request.mode
);
return self
.handle_registration_request(receiver_idx, request)
.await;
}
let bytes = decrypted_data.content;
match decrypted_data.kind {
LpDataKind::Registration => {
let request = LpRegistrationRequest::try_deserialise(&bytes).map_err(|err| {
GatewayError::LpProtocolError(format!("malformed LpRegistrationRequest: {err}"))
})?;
// Try to deserialize as ForwardPacketData (entry gateway forwarding to exit)
if let Ok(forward_data) = ForwardPacketData::decode(&decrypted_bytes) {
debug!(
"LP forward request from {remote} (receiver_idx={receiver_idx}) to {}",
forward_data.target_lp_address
);
return self
.handle_forwarding_request(receiver_idx, forward_data)
.await;
}
debug!(
"LP registration request from {remote} (receiver_idx={receiver_idx}): mode={:?}",
request.mode());
// Neither registration nor forwarding - unknown payload type
warn!("Unknown transport payload type from {remote} (receiver_idx={receiver_idx})");
inc!("lp_errors_unknown_payload_type");
Err(GatewayError::LpProtocolError(
"Unknown transport payload type (not registration or forwarding)".to_string(),
))
self.handle_registration_request(receiver_idx, request)
.await
}
LpDataKind::Forward => {
let forward_data = ForwardPacketData::decode(&bytes).map_err(|err| {
GatewayError::LpProtocolError(format!("malformed ForwardPacketData: {err}"))
})?;
self.handle_forwarding_request(receiver_idx, forward_data)
.await
}
LpDataKind::Opaque => {
// Neither registration nor forwarding - unknown payload type
warn!("Unknown transport payload type from {remote} (receiver_idx={receiver_idx}). dropping {} bytes", bytes.len());
inc!("lp_errors_unknown_payload_type");
Err(GatewayError::LpProtocolError(
"Unknown transport payload type (not registration or forwarding)".to_string(),
))
}
}
}
/// Handle subsession completion - promote subsession to new session
@@ -699,6 +697,48 @@ where
Ok(())
}
/// Attempt to wrap and send specified response back to the client
async fn send_response_packet(
&mut self,
receiver_idx: u32,
serialised_response: Vec<u8>,
response_kind: LpDataKind,
) -> Result<(), GatewayError> {
let session_entry = self
.state
.session_states
.get(&receiver_idx)
.ok_or_else(|| {
GatewayError::LpProtocolError(format!("Session not found: {receiver_idx}"))
})?;
// Access session via state machine for subsession support
let session = session_entry
.value()
.state
.session()
.map_err(|e| GatewayError::LpProtocolError(format!("Session error: {e}")))?;
let wrapped_lp_data = LpData::new(response_kind, serialised_response);
let data_bytes = wrapped_lp_data.to_vec();
let encrypted_message = session.encrypt_data(&data_bytes).map_err(|e| {
GatewayError::LpProtocolError(format!("Failed to encrypt response: {e}"))
})?;
let response_packet = session.next_packet(encrypted_message).map_err(|e| {
GatewayError::LpProtocolError(format!("Failed to create response packet: {e}"))
})?;
let outer_key = session.outer_aead_key();
drop(session_entry);
// Send response (encrypted with outer AEAD)
self.send_lp_packet(response_packet, outer_key.as_ref())
.await?;
Ok(())
}
/// Handle registration request on an established session
async fn handle_registration_request(
&mut self,
@@ -707,43 +747,11 @@ where
) -> Result<(), GatewayError> {
// Process registration (might modify state)
let response = process_registration(request, &self.state).await;
let response_bytes = response.serialise().map_err(|e| {
GatewayError::LpProtocolError(format!("Failed to serialize response: {e}"))
})?;
// Acquire session lock for encryption and get outer AEAD key
let (response_packet, outer_key) = {
let session_entry = self
.state
.session_states
.get(&receiver_idx)
.ok_or_else(|| {
GatewayError::LpProtocolError(format!("Session not found: {}", receiver_idx))
})?;
// Access session via state machine for subsession support
let session = session_entry
.value()
.state
.session()
.map_err(|e| GatewayError::LpProtocolError(format!("Session error: {}", e)))?;
// Serialize and encrypt response
let response_bytes = response.serialise().map_err(|e| {
GatewayError::LpProtocolError(format!("Failed to serialize response: {}", e))
})?;
let encrypted_message = session.encrypt_data(&response_bytes).map_err(|e| {
GatewayError::LpProtocolError(format!("Failed to encrypt response: {}", e))
})?;
let packet = session.next_packet(encrypted_message).map_err(|e| {
GatewayError::LpProtocolError(format!("Failed to create response packet: {}", e))
})?;
// Get outer AEAD key for packet encryption
let outer_key = session.outer_aead_key();
(packet, outer_key)
};
// Send response (encrypted with outer AEAD)
self.send_lp_packet(response_packet, outer_key.as_ref())
self.send_response_packet(receiver_idx, response_bytes, LpDataKind::Registration)
.await?;
if response.success {
@@ -767,40 +775,10 @@ where
receiver_idx: u32,
forward_data: ForwardPacketData,
) -> Result<(), GatewayError> {
// Forward the packet to the target gateway
// Forward the packet to the target gateway and retrieve its response
let response_bytes = self.handle_forward_packet(forward_data).await?;
// Encrypt response for client and get outer AEAD key
let (response_packet, outer_key) = {
let session_entry = self
.state
.session_states
.get(&receiver_idx)
.ok_or_else(|| {
GatewayError::LpProtocolError(format!("Session not found: {}", receiver_idx))
})?;
// Access session via state machine for subsession support
let session = session_entry
.value()
.state
.session()
.map_err(|e| GatewayError::LpProtocolError(format!("Session error: {}", e)))?;
let encrypted_message = session.encrypt_data(&response_bytes).map_err(|e| {
GatewayError::LpProtocolError(format!("Failed to encrypt forward response: {}", e))
})?;
let packet = session.next_packet(encrypted_message).map_err(|e| {
GatewayError::LpProtocolError(format!("Failed to create response packet: {}", e))
})?;
// Get outer AEAD key for packet encryption
let outer_key = session.outer_aead_key();
(packet, outer_key)
};
// Send encrypted response to client (encrypted with outer AEAD)
self.send_lp_packet(response_packet, outer_key.as_ref())
self.send_response_packet(receiver_idx, response_bytes, LpDataKind::Forward)
.await?;
debug!(
-10
View File
@@ -1,10 +0,0 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
//! LP registration message types.
//!
//! Re-exports shared message types from nym-registration-common.
pub use nym_registration_common::{
LpGatewayData, LpRegistrationRequest, LpRegistrationResponse, RegistrationMode,
};
+1 -2
View File
@@ -78,7 +78,6 @@ pub use nym_mixnet_client::forwarder::{
};
use nym_node_metrics::NymNodeMetrics;
use nym_task::ShutdownTracker;
pub use nym_wireguard::{PeerControlRequest, WireguardGatewayData};
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;
@@ -87,10 +86,10 @@ use tokio::sync::{mpsc, Semaphore};
use tracing::*;
pub use nym_lp::peer::LpLocalPeer;
pub use nym_wireguard::{PeerControlRequest, WireguardGatewayData};
mod data_handler;
pub mod handler;
mod messages;
mod registration;
/// Configuration for LP listener
+117 -159
View File
@@ -1,9 +1,6 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use super::messages::{
LpGatewayData, LpRegistrationRequest, LpRegistrationResponse, RegistrationMode,
};
use super::LpHandlerState;
use crate::error::GatewayError;
use crate::node::client_handling::websocket::message_receiver::IsActive;
@@ -16,12 +13,14 @@ use nym_credential_verification::{
ClientBandwidth, CredentialVerifier,
};
use nym_credentials_interface::CredentialSpendingData;
use nym_crypto::asymmetric::ed25519;
use nym_gateway_requests::models::CredentialSpendingRequest;
use nym_gateway_storage::models::PersistedBandwidth;
use nym_gateway_storage::traits::BandwidthGatewayStorage;
use nym_metrics::{add_histogram_obs, inc, inc_by};
use nym_registration_common::GatewayData;
use nym_registration_common::{
GatewayData, LpDvpnRegistrationRequest, LpMixnetGatewayData, LpMixnetRegistrationRequest,
LpRegistrationData, LpRegistrationRequest, LpRegistrationResponse,
};
use nym_wireguard::PeerControlRequest;
use std::sync::Arc;
use time::OffsetDateTime;
@@ -195,12 +194,121 @@ async fn check_existing_registration(
))
}
async fn process_dvpn_registration(
request: Box<LpDvpnRegistrationRequest>,
state: &LpHandlerState,
) -> LpRegistrationResponse {
// Track dVPN registration attempts
inc!("lp_registration_dvpn_attempts");
// Check for idempotent re-registration (same WG key already registered)
// This allows clients to retry registration after network failures
// without wasting credentials
let wg_key_str = request.wg_public_key.to_string();
if let Some(existing_response) = check_existing_registration(&wg_key_str, state).await {
info!("LP dVPN re-registration for existing peer {wg_key_str} (idempotent)",);
inc!("lp_registration_dvpn_idempotent");
return existing_response;
}
// Register as WireGuard peer first to get client_id
let (gateway_data, client_id) = match register_wg_peer(
request.wg_public_key.inner().as_ref(),
request.ticket_type,
state,
)
.await
{
Ok(result) => result,
Err(e) => {
error!("LP WireGuard peer registration failed: {e}");
inc!("lp_registration_dvpn_failed");
inc!("lp_errors_wg_peer_registration");
return LpRegistrationResponse::error(format!(
"WireGuard peer registration failed: {e}",
));
}
};
// Verify credential with CredentialVerifier (handles double-spend, storage, etc.)
let allocated_bandwidth =
match credential_verification(state.ecash_verifier.clone(), request.credential, client_id)
.await
{
Ok(bandwidth) => bandwidth,
Err(e) => {
// Credential verification failed, remove the peer
warn!("LP credential verification failed for client {client_id}: {e}",);
inc!("lp_registration_dvpn_failed");
if let Err(remove_err) = state
.storage
.remove_wireguard_peer(&request.wg_public_key.to_string())
.await
{
error!(
"Failed to remove peer after credential verification failure: {remove_err}"
);
}
return LpRegistrationResponse::error(format!(
"Credential verification failed: {e}",
));
}
};
info!("LP dVPN registration successful (client_id: {client_id})");
inc!("lp_registration_dvpn_success");
LpRegistrationResponse::success(allocated_bandwidth, gateway_data)
}
async fn process_mixnet_registration(
request: LpMixnetRegistrationRequest,
state: &LpHandlerState,
) -> LpRegistrationResponse {
let session_id = rand::random::<u32>();
// Track mixnet registration attempts
inc!("lp_registration_mixnet_attempts");
// Derive destination address for ActiveClientsStore lookup
let client_identity = request.client_ed25519_pubkey;
let client_address = client_identity.derive_destination_address();
info!("LP Mixnet registration for client {client_identity}, session {session_id}");
warn!("unimplemented: LP mixnet registration initial bandwidth allocation");
// (the old implementation was wrong - it wasn't creating correct db entries)
// Create channels for client message delivery
let (mix_sender, _mix_receiver) = mpsc::unbounded();
let (is_active_request_sender, _is_active_request_receiver) =
mpsc::unbounded::<oneshot::Sender<IsActive>>();
// Insert client into ActiveClientsStore for SURB reply delivery
if !state.active_clients_store.insert_remote(
client_address,
mix_sender,
is_active_request_sender,
OffsetDateTime::now_utc(),
) {
warn!("LP Mixnet registration failed: client {client_identity} already registered",);
inc!("lp_registration_mixnet_failed");
return LpRegistrationResponse::error("Client already registered".to_string());
}
// Get gateway identity and derive sphinx key
let gateway_identity = *state.local_lp_peer.ed25519().public_key();
info!("LP Mixnet registration successful (client: {client_identity})",);
inc!("lp_registration_mixnet_success");
LpRegistrationResponse::success_mixnet(0, LpMixnetGatewayData { gateway_identity })
}
/// Process an LP registration request
pub async fn process_registration(
request: LpRegistrationRequest,
state: &LpHandlerState,
) -> LpRegistrationResponse {
let session_id = rand::random::<u32>();
let registration_start = std::time::Instant::now();
// Track total registration attempts
@@ -214,159 +322,9 @@ pub async fn process_registration(
}
// 2. Process based on mode
let result = match request.mode {
RegistrationMode::Dvpn => {
// Track dVPN registration attempts
inc!("lp_registration_dvpn_attempts");
// Check for idempotent re-registration (same WG key already registered)
// This allows clients to retry registration after network failures
// without wasting credentials
let wg_key_str = request.wg_public_key.to_string();
if let Some(existing_response) = check_existing_registration(&wg_key_str, state).await {
info!("LP dVPN re-registration for existing peer {wg_key_str} (idempotent)",);
inc!("lp_registration_dvpn_idempotent");
return existing_response;
}
// Register as WireGuard peer first to get client_id
let (gateway_data, client_id) = match register_wg_peer(
request.wg_public_key.inner().as_ref(),
request.ticket_type,
state,
)
.await
{
Ok(result) => result,
Err(e) => {
error!("LP WireGuard peer registration failed: {e}");
inc!("lp_registration_dvpn_failed");
inc!("lp_errors_wg_peer_registration");
return LpRegistrationResponse::error(format!(
"WireGuard peer registration failed: {e}",
));
}
};
// Verify credential with CredentialVerifier (handles double-spend, storage, etc.)
let allocated_bandwidth = match credential_verification(
state.ecash_verifier.clone(),
request.credential,
client_id,
)
.await
{
Ok(bandwidth) => bandwidth,
Err(e) => {
// Credential verification failed, remove the peer
warn!("LP credential verification failed for client {client_id}: {e}",);
inc!("lp_registration_dvpn_failed");
if let Err(remove_err) = state
.storage
.remove_wireguard_peer(&request.wg_public_key.to_string())
.await
{
error!("Failed to remove peer after credential verification failure: {remove_err}");
}
return LpRegistrationResponse::error(format!(
"Credential verification failed: {e}",
));
}
};
info!("LP dVPN registration successful (client_id: {})", client_id);
inc!("lp_registration_dvpn_success");
LpRegistrationResponse::success(allocated_bandwidth, gateway_data)
}
RegistrationMode::Mixnet {
client_ed25519_pubkey,
client_x25519_pubkey: _,
} => {
// Track mixnet registration attempts
inc!("lp_registration_mixnet_attempts");
// Parse client's ed25519 public key
let client_identity = match ed25519::PublicKey::from_bytes(&client_ed25519_pubkey) {
Ok(key) => key,
Err(e) => {
warn!("LP Mixnet registration failed: invalid ed25519 key: {e}");
inc!("lp_registration_mixnet_failed");
return LpRegistrationResponse::error(format!(
"Invalid client ed25519 key: {e}",
));
}
};
// Derive destination address for ActiveClientsStore lookup
let client_address = client_identity.derive_destination_address();
// Generate client_id for credential verification (first 8 bytes of ed25519 key)
#[allow(clippy::expect_used)]
let client_id = i64::from_be_bytes(
client_ed25519_pubkey[0..8]
.try_into()
.expect("This cannot fail, since the key is 32 bytes long"),
);
info!("LP Mixnet registration for client {client_identity}, session {session_id}",);
// Verify credential with CredentialVerifier
let allocated_bandwidth = match credential_verification(
state.ecash_verifier.clone(),
request.credential,
client_id,
)
.await
{
Ok(bandwidth) => bandwidth,
Err(e) => {
warn!("LP Mixnet credential verification failed for client {client_identity}: {e}");
inc!("lp_registration_mixnet_failed");
return LpRegistrationResponse::error(format!(
"Credential verification failed: {e}"
));
}
};
// Create channels for client message delivery
let (mix_sender, _mix_receiver) = mpsc::unbounded();
let (is_active_request_sender, _is_active_request_receiver) =
mpsc::unbounded::<oneshot::Sender<IsActive>>();
// Insert client into ActiveClientsStore for SURB reply delivery
if !state.active_clients_store.insert_remote(
client_address,
mix_sender,
is_active_request_sender,
OffsetDateTime::now_utc(),
) {
warn!("LP Mixnet registration failed: client {client_identity} already registered",);
inc!("lp_registration_mixnet_failed");
return LpRegistrationResponse::error("Client already registered".to_string());
}
// Get gateway identity and derive sphinx key
let ed25519_key = state.local_lp_peer.ed25519().public_key();
let gateway_identity = ed25519_key.to_bytes();
warn!("TEMPORARY ed25519 -> x25519 conversion");
#[allow(clippy::expect_used)]
let gateway_sphinx_key = ed25519_key
.to_x25519()
.expect("valid ed25519 key should convert to x25519")
.to_bytes();
info!("LP Mixnet registration successful (client: {client_identity})",);
inc!("lp_registration_mixnet_success");
LpRegistrationResponse::success_mixnet(
allocated_bandwidth,
LpGatewayData {
gateway_identity,
gateway_sphinx_key,
},
)
}
let result = match request.registration_data {
LpRegistrationData::Dvpn { data } => process_dvpn_registration(data, state).await,
LpRegistrationData::Mixnet { data } => process_mixnet_registration(data, state).await,
};
// Track registration duration
+1 -1
View File
@@ -529,6 +529,7 @@ mod tests {
#[tokio::test]
async fn test_basic_lp_exit_registration() -> anyhow::Result<()> {
// nym_test_utils::helpers::setup_test_logger();
// initialise random, but deterministic, keys, addresses, etc. for the parties
let mut client_rng = u64_seeded_rng(0);
let mut entry_rng = u64_seeded_rng(1);
@@ -652,7 +653,6 @@ mod tests {
exit.base.peer.ed25519().public_key(),
&client_data.ticket_provider,
TicketType::V1WireguardExit,
client_data.base.socket_addr.ip(),
)
.timeboxed()
.await??;
-2
View File
@@ -1329,7 +1329,6 @@ where
&exit_wg_keypair,
credential,
TicketType::V1WireguardExit,
exit_ip,
)
.await
{
@@ -1347,7 +1346,6 @@ where
&exit_gateway_pubkey,
bandwidth_controller,
TicketType::V1WireguardExit,
exit_ip,
)
.await
{
-1
View File
@@ -235,7 +235,6 @@ impl RegistrationClient {
&self.config.exit.node.identity,
&*self.bandwidth_controller,
TicketType::V1WireguardExit,
self.config.exit.node.ip_address,
)
.await
.map_err(|source| RegistrationClientError::ExitGatewayRegisterLp {
+17 -49
View File
@@ -5,6 +5,10 @@
use super::config::LpConfig;
use super::error::{LpClientError, Result};
use crate::lp_client::helpers::{
convert_forward_data, convert_registration_request, try_convert_forward_response,
try_convert_registration_response,
};
use bytes::BytesMut;
use nym_bandwidth_controller::{BandwidthTicketProvider, DEFAULT_TICKETS_TO_SPEND};
use nym_credentials_interface::{CredentialSpendingData, TicketType};
@@ -13,9 +17,9 @@ use nym_lp::LpPacket;
use nym_lp::codec::{OuterAeadKey, parse_lp_packet, serialize_lp_packet};
use nym_lp::message::ForwardPacketData;
use nym_lp::peer::{LpLocalPeer, LpRemotePeer};
use nym_lp::state_machine::{LpAction, LpInput, LpStateMachine};
use nym_lp::state_machine::{LpAction, LpData, LpInput, LpStateMachine};
use nym_lp_transport::traits::LpTransport;
use nym_registration_common::{GatewayData, LpRegistrationRequest, LpRegistrationResponse};
use nym_registration_common::{GatewayData, LpRegistrationRequest};
use nym_wireguard_types::PeerPublicKey;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
@@ -740,20 +744,12 @@ where
// 1. Build registration request
let wg_public_key = PeerPublicKey::new(wg_keypair.public_key().to_bytes().into());
let request =
LpRegistrationRequest::new_dvpn(wg_public_key, credential, ticket_type, self.client_ip);
let request = LpRegistrationRequest::new_dvpn(wg_public_key, credential, ticket_type);
tracing::trace!("Built registration request: {:?}", request);
// 2. Serialize the request
let request_bytes = request.serialise().map_err(|e| {
LpClientError::SendRegistrationRequest(format!("Failed to serialize request: {e}"))
})?;
tracing::debug!(
"Sending registration request ({} bytes)",
request_bytes.len()
);
let input = convert_registration_request(request)?;
// 3. Encrypt and prepare packet via state machine (scoped borrow)
let (request_packet, send_key, recv_key) = {
@@ -762,7 +758,7 @@ where
})?;
let action = state_machine
.process_input(LpInput::SendData(request_bytes))
.process_input(input)
.ok_or_else(|| LpClientError::transport("State machine returned no action"))?
.map_err(|e| {
LpClientError::SendRegistrationRequest(format!(
@@ -821,29 +817,15 @@ where
))
})?;
// 7. Extract decrypted data
let response_data = match action {
LpAction::DeliverData(data) => data,
other => {
return Err(LpClientError::Transport(format!(
"Unexpected action when receiving registration response: {other:?}"
)));
}
};
// 8. Deserialize the response
let response = LpRegistrationResponse::try_deserialise(&response_data).map_err(|e| {
LpClientError::ReceiveRegistrationResponse(format!(
"Failed to deserialize registration response: {e}",
))
})?;
// 6. Extract decrypted data and deserialise the response
let response = try_convert_registration_response(action)?;
tracing::debug!(
"Received registration response: success={}",
response.success,
);
// 9. Validate and extract GatewayData
// 7. Validate and extract GatewayData
if !response.success {
let error_msg = response
.error
@@ -1022,14 +1004,8 @@ where
target_lp_address: target_address.clone(),
inner_packet_bytes,
};
// 2. Serialize the ForwardPacketData
let forward_data_bytes = forward_data.to_bytes();
tracing::trace!(
"Serialized ForwardPacketData ({} bytes)",
forward_data_bytes.len()
);
let input = convert_forward_data(forward_data);
// 3. Encrypt and prepare packet via state machine (scoped borrow)
let (forward_packet, send_key, recv_key) = {
@@ -1038,7 +1014,7 @@ where
})?;
let action = state_machine
.process_input(LpInput::SendData(forward_data_bytes))
.process_input(input)
.ok_or_else(|| LpClientError::transport("State machine returned no action"))?
.map_err(|e| {
LpClientError::Transport(format!("Failed to encrypt ForwardPacket: {e}"))
@@ -1094,15 +1070,7 @@ where
})?;
// 7. Extract decrypted response data
let response_data = match action {
LpAction::DeliverData(data) => data,
other => {
return Err(LpClientError::Transport(format!(
"Unexpected action when receiving forward response: {:?}",
other
)));
}
};
let response_data = try_convert_forward_response(action)?;
tracing::debug!(
"Successfully received forward response from {} ({} bytes)",
@@ -1110,7 +1078,7 @@ where
response_data.len()
);
Ok(response_data.to_vec())
Ok(response_data)
}
/// Wrap data in an LP packet for UDP transmission to the data plane (port 51264).
@@ -1148,7 +1116,7 @@ where
// Process data through state machine to create LP packet
let action = state_machine
.process_input(LpInput::SendData(data.to_vec()))
.process_input(LpInput::SendData(LpData::new_opaque(data.to_vec())))
.ok_or_else(|| LpClientError::transport("State machine returned no action"))?
.map_err(|e| LpClientError::Transport(format!("Failed to encrypt data: {e}")))?;
@@ -0,0 +1,83 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::LpClientError;
use nym_lp::message::ForwardPacketData;
use nym_lp::state_machine::{LpAction, LpData, LpDataKind, LpInput};
use nym_registration_common::{LpRegistrationRequest, LpRegistrationResponse};
pub(crate) fn convert_registration_request(
request: LpRegistrationRequest,
) -> Result<LpInput, LpClientError> {
let request_bytes = request.serialise().map_err(|e| {
LpClientError::SendRegistrationRequest(format!("Failed to serialize request: {e}"))
})?;
tracing::debug!(
"Sending registration request ({} bytes)",
request_bytes.len()
);
let data = LpData::new_registration(request_bytes);
Ok(LpInput::SendData(data))
}
pub(crate) fn try_convert_registration_response(
action: LpAction,
) -> Result<LpRegistrationResponse, LpClientError> {
let response_data = match action {
LpAction::DeliverData(data) => data,
other => {
return Err(LpClientError::Transport(format!(
"Unexpected action when receiving registration response: {other:?}"
)));
}
};
if response_data.kind != LpDataKind::Registration {
return Err(LpClientError::Transport(format!(
"did not receive a valid registration response. got {:?} instead",
response_data.kind
)));
}
let response =
LpRegistrationResponse::try_deserialise(&response_data.content).map_err(|e| {
LpClientError::Transport(format!("Failed to deserialize registration response: {e}",))
})?;
Ok(response)
}
pub(crate) fn convert_forward_data(request: ForwardPacketData) -> LpInput {
let request_bytes = request.to_bytes();
tracing::trace!(
"Sending forward packet data request ({} bytes)",
request_bytes.len()
);
let data = LpData::new_forward(request_bytes);
LpInput::SendData(data)
}
pub(crate) fn try_convert_forward_response(action: LpAction) -> Result<Vec<u8>, LpClientError> {
let response_data = match action {
LpAction::DeliverData(data) => data,
other => {
return Err(LpClientError::Transport(format!(
"Unexpected action when receiving forward response: {:?}",
other
)));
}
};
if response_data.kind != LpDataKind::Forward {
return Err(LpClientError::Transport(format!(
"did not receive a valid foreward response. got {:?} instead",
response_data.kind
)));
}
Ok(response_data.content.into())
}
@@ -34,6 +34,7 @@
mod client;
mod config;
pub(crate) mod error;
pub(crate) mod helpers;
mod nested_session;
pub use client::LpRegistrationClient;
@@ -20,6 +20,7 @@
use super::client::LpRegistrationClient;
use super::error::{LpClientError, Result};
use crate::lp_client::helpers::{convert_registration_request, try_convert_registration_response};
use bytes::BytesMut;
use nym_bandwidth_controller::BandwidthTicketProvider;
use nym_credentials_interface::TicketType;
@@ -29,9 +30,8 @@ use nym_lp::peer::{LpLocalPeer, LpRemotePeer};
use nym_lp::state_machine::{LpAction, LpInput, LpStateMachine};
use nym_lp::{LpMessage, LpPacket};
use nym_lp_transport::traits::LpTransport;
use nym_registration_common::{GatewayData, LpRegistrationRequest, LpRegistrationResponse};
use nym_registration_common::{GatewayData, LpRegistrationRequest};
use nym_wireguard_types::PeerPublicKey;
use std::net::IpAddr;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
@@ -293,7 +293,6 @@ impl NestedLpSession {
wg_keypair: &x25519::KeyPair,
credential: nym_credentials_interface::CredentialSpendingData,
ticket_type: TicketType,
client_ip: IpAddr,
) -> Result<GatewayData>
where
S: LpTransport + Unpin,
@@ -312,24 +311,17 @@ impl NestedLpSession {
// Step 3: Build registration request (credential already provided)
let wg_public_key = PeerPublicKey::new(wg_keypair.public_key().to_bytes().into());
let request =
LpRegistrationRequest::new_dvpn(wg_public_key, credential, ticket_type, client_ip);
let request = LpRegistrationRequest::new_dvpn(wg_public_key, credential, ticket_type);
tracing::trace!("Built registration request: {:?}", request);
// Step 4: Serialize the request
let request_bytes = request.serialise().map_err(|e| {
LpClientError::Transport(format!("Failed to serialize registration request: {}", e))
})?;
tracing::debug!(
"Sending registration request to exit gateway via forwarding ({} bytes)",
request_bytes.len()
);
let input = convert_registration_request(request)?;
tracing::debug!("Sending registration request to exit gateway via forwarding",);
// Step 5: Encrypt and prepare packet via state machine
let action = state_machine
.process_input(LpInput::SendData(request_bytes))
.process_input(input)
.ok_or_else(|| {
LpClientError::Transport("State machine returned no action".to_string())
})?
@@ -374,36 +366,20 @@ impl NestedLpSession {
LpClientError::Transport(format!("Failed to decrypt registration response: {}", e))
})?;
// Step 9: Extract decrypted data
let response_data = match action {
LpAction::DeliverData(data) => data,
other => {
return Err(LpClientError::Transport(format!(
"Unexpected action when receiving registration response: {:?}",
other
)));
}
};
// Step 10: Deserialize the response
let response = LpRegistrationResponse::try_deserialise(&response_data).map_err(|e| {
LpClientError::Transport(format!(
"Failed to deserialize registration response: {}",
e
))
})?;
// Step 9: Extract decrypted data and deserialise the response
let response = try_convert_registration_response(action)?;
tracing::debug!(
"Received registration response from exit: success={}",
response.success,
);
// Step 11: Validate and extract GatewayData
// Step 10: Validate and extract GatewayData
if !response.success {
let error_msg = response
.error
.unwrap_or_else(|| "Unknown error".to_string());
tracing::warn!("Exit gateway rejected registration: {}", error_msg);
tracing::warn!("Exit gateway rejected registration: {error_msg}");
return Err(LpClientError::RegistrationRejected { reason: error_msg });
}
@@ -456,7 +432,6 @@ impl NestedLpSession {
gateway_identity: &ed25519::PublicKey,
bandwidth_controller: &dyn BandwidthTicketProvider,
ticket_type: TicketType,
client_ip: IpAddr,
) -> Result<GatewayData>
where
S: LpTransport + Unpin,
@@ -486,24 +461,17 @@ impl NestedLpSession {
// Step 4: Build registration request
let wg_public_key = PeerPublicKey::new(wg_keypair.public_key().to_bytes().into());
let request =
LpRegistrationRequest::new_dvpn(wg_public_key, credential, ticket_type, client_ip);
let request = LpRegistrationRequest::new_dvpn(wg_public_key, credential, ticket_type);
tracing::trace!("Built registration request: {:?}", request);
// Step 5: Serialize the request
let request_bytes = request.serialise().map_err(|e| {
LpClientError::Transport(format!("Failed to serialize registration request: {}", e))
})?;
tracing::debug!(
"Sending registration request to exit gateway via forwarding ({} bytes)",
request_bytes.len()
);
let input = convert_registration_request(request)?;
tracing::debug!("Sending registration request to exit gateway via forwarding");
// Step 6: Encrypt and prepare packet via state machine
let action = state_machine
.process_input(LpInput::SendData(request_bytes))
.process_input(input)
.ok_or_else(|| {
LpClientError::Transport("State machine returned no action".to_string())
})?
@@ -548,31 +516,15 @@ impl NestedLpSession {
LpClientError::Transport(format!("Failed to decrypt registration response: {}", e))
})?;
// Step 10: Extract decrypted data
let response_data = match action {
LpAction::DeliverData(data) => data,
other => {
return Err(LpClientError::Transport(format!(
"Unexpected action when receiving registration response: {:?}",
other
)));
}
};
// Step 11: Deserialize the response
let response = LpRegistrationResponse::try_deserialise(&response_data).map_err(|e| {
LpClientError::Transport(format!(
"Failed to deserialize registration response: {}",
e
))
})?;
// Step 10: Extract decrypted data and deserialise the response
let response = try_convert_registration_response(action)?;
tracing::debug!(
"Received registration response from exit: success={}",
response.success,
);
// Step 12: Validate and extract GatewayData
// Step 11: Validate and extract GatewayData
if !response.success {
let error_msg = response
.error
@@ -632,7 +584,6 @@ impl NestedLpSession {
gateway_identity: &ed25519::PublicKey,
bandwidth_controller: &dyn BandwidthTicketProvider,
ticket_type: TicketType,
client_ip: IpAddr,
max_retries: u32,
) -> Result<GatewayData>
where
@@ -687,7 +638,6 @@ impl NestedLpSession {
wg_keypair,
credential.clone(),
ticket_type,
client_ip,
)
.await
{
+28 -1
View File
@@ -1,10 +1,13 @@
use std::time::Duration;
use nym_client_core::client::base_client::ClientState;
use nym_socks5_client_core::config::Socks5;
use nym_sphinx::addressing::clients::Recipient;
use nym_task::connections::LaneQueueLengths;
use nym_task::ShutdownTracker;
use tokio::sync::RwLockReadGuard;
use nym_topology::NymTopology;
use nym_topology::{NymRouteProvider, NymTopology, NymTopologyError};
use crate::mixnet::client::MixnetClientBuilder;
use crate::Result;
@@ -85,4 +88,28 @@ impl Socks5MixnetClient {
pub async fn disconnect(self) {
self.task_handle.shutdown().await;
}
/// Gets the current route provider if topology is available.
/// Returns `None` if topology is empty/not yet fetched.
async fn read_current_route_provider(&self) -> Option<RwLockReadGuard<'_, NymRouteProvider>> {
self.client_state
.topology_accessor
.current_route_provider()
.await
}
/// Wait for topology to become available, with a timeout.
/// Returns `Ok(())` when topology is ready, or `Err` if timeout is reached.
pub async fn wait_for_topology(&self, timeout: Duration) -> Result<(), NymTopologyError> {
let deadline = tokio::time::Instant::now() + timeout;
loop {
if self.read_current_route_provider().await.is_some() {
return Ok(());
}
if tokio::time::Instant::now() >= deadline {
return Err(NymTopologyError::EmptyNetworkTopology);
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}