removed redundant LP states (#6509)

This commit is contained in:
Jędrzej Stuczyński
2026-03-03 13:58:47 +00:00
committed by GitHub
parent 5093450004
commit 05b6f5e282
16 changed files with 211 additions and 551 deletions
+15 -6
View File
@@ -48,7 +48,7 @@ pub(crate) fn encrypt_lp_packet(
) -> Result<EncryptedLpPacket, LpError> {
let mut plaintext = BytesMut::with_capacity(InnerHeader::SIZE + packet.message().len());
packet.header().inner.encode(&mut plaintext);
packet.message().encode_content(&mut plaintext);
packet.message().encode(&mut plaintext);
let ciphertext = encrypt_data(plaintext.as_ref(), transport)?;
@@ -67,7 +67,7 @@ pub(crate) fn decrypt_lp_packet(
let inner_header = InnerHeader::parse(&plaintext)?;
let payload = &plaintext[InnerHeader::SIZE..];
let message = LpMessage::decode_content(payload, inner_header.message_type)?;
let message = LpMessage::decode(payload)?;
Ok(LpPacket::new(
LpHeader {
@@ -82,7 +82,7 @@ pub(crate) fn decrypt_lp_packet(
mod tests {
use crate::LpError;
use crate::codec::{decrypt_data, decrypt_lp_packet, encrypt_data, encrypt_lp_packet};
use crate::packet::{EncryptedLpPacket, LpHeader, LpMessage, LpPacket, MessageType};
use crate::packet::{EncryptedLpPacket, LpHeader, LpMessage, LpPacket};
use crate::peer::mock_peers;
use crate::psq::initiator::{build_psq_ciphersuite, build_psq_principal};
use crate::psq::{PSQ_MSG2_SIZE, psq_msg1_size, responder};
@@ -259,7 +259,10 @@ mod tests {
let (mut init_transport, mut resp_transport) = mock_transport();
// happy path
let packet = LpPacket::new(LpHeader::new(123, 0, 1, MessageType::Busy), LpMessage::Busy);
let packet = LpPacket::new(
LpHeader::new(123, 0, 1),
LpMessage::new_opaque(b"foomp".to_vec()),
);
let ciphertext = encrypt_lp_packet(packet.clone(), &mut init_transport).unwrap();
assert_eq!(packet.header().outer, ciphertext.outer_header());
@@ -268,7 +271,10 @@ mod tests {
assert_eq!(packet, plaintext);
// incomplete ciphertext
let packet = LpPacket::new(LpHeader::new(123, 1, 1, MessageType::Busy), LpMessage::Busy);
let packet = LpPacket::new(
LpHeader::new(123, 1, 1),
LpMessage::new_opaque(b"foomp".to_vec()),
);
let ciphertext2 = encrypt_lp_packet(packet, &mut init_transport).unwrap();
let l = ciphertext2.ciphertext().len();
let malformed_content = ciphertext2.ciphertext()[..l - 1].to_vec();
@@ -277,7 +283,10 @@ mod tests {
assert!(matches!(dec_err, LpError::PSQSessionFailure { .. }));
// too small buffer
let packet = LpPacket::new(LpHeader::new(123, 1, 1, MessageType::Busy), LpMessage::Busy);
let packet = LpPacket::new(
LpHeader::new(123, 1, 1),
LpMessage::new_opaque(b"foomp".to_vec()),
);
let ciphertext3 = encrypt_lp_packet(packet, &mut resp_transport).unwrap();
let malformed = EncryptedLpPacket::new(ciphertext3.outer_header(), vec![]);
let dec_err = decrypt_lp_packet(malformed, &mut init_transport).unwrap_err();
+4 -4
View File
@@ -11,8 +11,8 @@ pub enum MalformedLpPacketError {
#[error("provided insufficient data to fully deserialise the struct")]
InsufficientData,
#[error("{0} is not a valid MessageType")]
InvalidMessageType(u32),
#[error("{0} is not a valid LpDataKind")]
InvalidLpDataKind(u16),
#[error("invalid payload size: expected {expected}, got {actual}")]
InvalidPayloadSize { expected: usize, actual: usize },
@@ -27,7 +27,7 @@ pub enum MalformedLpPacketError {
}
impl MalformedLpPacketError {
pub fn invalid_message_type(message_type: u32) -> Self {
MalformedLpPacketError::InvalidMessageType(message_type)
pub fn invalid_data_kind(message_type: u16) -> Self {
MalformedLpPacketError::InvalidLpDataKind(message_type)
}
}
+2 -19
View File
@@ -1,11 +1,9 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::packet::message::MessageType;
use crate::packet::version;
use crate::{packet::error::MalformedLpPacketError, peer_config::LpReceiverIndex};
use bytes::{BufMut, BytesMut};
// use nym_lp::peer_config::LpReceiverIndex;
use tracing::warn;
/// Outer header (12 bytes) - always cleartext, used for routing.
@@ -58,11 +56,10 @@ impl OuterHeader {
pub struct InnerHeader {
pub protocol_version: u8,
pub reserved: [u8; 3],
pub message_type: MessageType,
}
impl InnerHeader {
pub const SIZE: usize = 8; // protocol_version(1) + reserved(3) + message_type(4)
pub const SIZE: usize = 4; // protocol_version(1) + reserved(3)
pub fn encode(&self, dst: &mut BytesMut) {
// protocol version
@@ -70,9 +67,6 @@ impl InnerHeader {
// reserved
dst.put_slice(&self.reserved);
// message type
dst.put_slice(&(self.message_type as u32).to_le_bytes());
}
pub fn parse(src: &[u8]) -> Result<Self, MalformedLpPacketError> {
@@ -104,14 +98,9 @@ impl InnerHeader {
warn!("received non-zero reserved bytes. got: {reserved:?}");
}
let msg_type_raw = u32::from_le_bytes([src[4], src[5], src[6], src[7]]);
let message_type = MessageType::from_u32(msg_type_raw)
.ok_or_else(|| MalformedLpPacketError::invalid_message_type(msg_type_raw))?;
Ok(InnerHeader {
protocol_version,
reserved,
message_type,
})
}
}
@@ -129,12 +118,7 @@ pub struct LpHeader {
}
impl LpHeader {
pub fn new(
receiver_idx: LpReceiverIndex,
counter: u64,
protocol_version: u8,
message_type: MessageType,
) -> Self {
pub fn new(receiver_idx: LpReceiverIndex, counter: u64, protocol_version: u8) -> Self {
Self {
outer: OuterHeader {
receiver_idx,
@@ -143,7 +127,6 @@ impl LpHeader {
inner: InnerHeader {
protocol_version,
reserved: [0u8; 3],
message_type,
},
}
}
+102 -348
View File
@@ -2,109 +2,119 @@
// SPDX-License-Identifier: Apache-2.0
use crate::packet::error::MalformedLpPacketError;
use bytes::{BufMut, BytesMut};
use bytes::{BufMut, Bytes, BytesMut};
use num_enum::{IntoPrimitive, TryFromPrimitive};
use std::fmt;
use std::fmt::Display;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
#[derive(Debug, Copy, Clone, PartialEq, Eq, IntoPrimitive, TryFromPrimitive)]
#[repr(u32)]
pub enum MessageType {
/// The party is busy
Busy = 0x0000,
/// Encrypted payload
EncryptedData = 0x0001,
/// Receiver should forward this message via telescoping
ForwardPacket = 0x0002,
/// Receiver index collision - client should retry with new index
Collision = 0x0003,
/// Acknowledgment - gateway confirms receipt of message
Ack = 0x0004,
/// General error
Error = 0x00FF,
#[derive(Debug, Clone, PartialEq)]
pub struct LpMessageHeader {
pub kind: LpMessageType,
pub message_attributes: [u8; 14],
}
impl MessageType {
pub(crate) fn from_u32(value: u32) -> Option<Self> {
MessageType::try_from(value).ok()
}
impl LpMessageHeader {
pub const SIZE: usize = 16; // message_kind(2) + message_attributes(14)
pub fn to_u32(&self) -> u32 {
u32::from(*self)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ApplicationData(pub Vec<u8>);
impl ApplicationData {
pub fn new(bytes: Vec<u8>) -> Self {
Self(bytes)
}
fn len(&self) -> usize {
self.0.len()
}
fn encode(&self, dst: &mut BytesMut) {
dst.put_slice(&self.0);
}
fn decode(bytes: &[u8]) -> Result<Self, MalformedLpPacketError> {
Ok(ApplicationData(bytes.to_vec()))
}
}
/// General human-readable error message
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ErrorPacketData {
pub message: String,
}
impl ErrorPacketData {
pub fn new(message: impl Into<String>) -> Self {
ErrorPacketData {
message: message.into(),
pub fn new(kind: LpMessageType, message_attributes: [u8; 14]) -> Self {
Self {
kind,
message_attributes,
}
}
fn len(&self) -> usize {
// length-encoding + message
4 + self.message.len()
}
fn encode(&self, dst: &mut BytesMut) {
dst.put_u32_le(self.message.len() as u32);
dst.put_slice(self.message.as_bytes());
}
fn decode(bytes: &[u8]) -> Result<Self, MalformedLpPacketError> {
if bytes.len() < 4 {
return Err(MalformedLpPacketError::DeserialisationFailure(format!(
"Too few bytes to deserialise ErrorPacketData. got {}",
bytes.len()
)));
pub fn new_no_attributes(kind: LpMessageType) -> Self {
Self {
kind,
message_attributes: [0; 14],
}
let message_len = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize;
if bytes[4..].len() != message_len {
return Err(MalformedLpPacketError::DeserialisationFailure(format!(
"Wrong number of bytes to deserialise ErrorPacketData. got {}. Expected {}",
bytes.len(),
4 + message_len
)));
}
let message = String::from_utf8_lossy(&bytes[4..]).to_string();
Ok(ErrorPacketData { message })
}
/// Encode directly into a BytesMut buffer
pub fn encode(&self, dst: &mut BytesMut) {
dst.put_u16_le(self.kind as u16);
dst.put_slice(&self.message_attributes);
}
pub fn parse(src: &[u8]) -> Result<Self, MalformedLpPacketError> {
if src.len() < Self::SIZE {
return Err(MalformedLpPacketError::InsufficientData);
}
let raw_kind = u16::from_le_bytes([src[0], src[1]]);
let kind = LpMessageType::try_from(raw_kind)
.map_err(|_| MalformedLpPacketError::invalid_data_kind(raw_kind))?;
#[allow(clippy::unwrap_used)]
let message_attributes = src[2..16].try_into().unwrap();
Ok(Self {
kind,
message_attributes,
})
}
}
/// Represent application data being sent in Transport mode
#[derive(Debug, Clone, PartialEq)]
pub struct LpMessage {
pub header: LpMessageHeader,
pub content: Bytes,
}
impl AsRef<[u8]> for LpMessage {
fn as_ref(&self) -> &[u8] {
&self.content
}
}
impl LpMessage {
pub fn new(kind: LpMessageType, content: impl Into<Bytes>) -> Self {
Self {
header: LpMessageHeader::new_no_attributes(kind),
content: content.into(),
}
}
pub fn encode(&self, dst: &mut BytesMut) {
self.header.encode(dst);
dst.put_slice(&self.content);
}
pub fn decode(src: &[u8]) -> Result<Self, MalformedLpPacketError> {
let header = LpMessageHeader::parse(src)?;
let content = src[LpMessageHeader::SIZE..].to_vec().into();
Ok(Self { header, content })
}
pub fn kind(&self) -> LpMessageType {
self.header.kind
}
pub fn new_opaque(content: impl Into<Bytes>) -> Self {
Self::new(LpMessageType::Opaque, content)
}
pub fn new_registration(data: impl Into<Bytes>) -> Self {
Self::new(LpMessageType::Registration, data)
}
pub fn new_forward(data: impl Into<Bytes>) -> Self {
Self::new(LpMessageType::Forward, data)
}
pub(crate) fn len(&self) -> usize {
LpMessageHeader::SIZE + self.content.len()
}
}
/// Represent kind of application data being sent in Transport mode
#[derive(Clone, Copy, PartialEq, Eq, Debug, IntoPrimitive, TryFromPrimitive)]
#[repr(u16)]
pub enum LpMessageType {
Opaque = 0,
Registration = 1,
Forward = 2,
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -163,24 +173,6 @@ impl ForwardPacketData {
}
}
fn len(&self) -> usize {
// 1 byte length of target lp address type
// +
// {4,16} target_lp_address IPv{4,6}
// +
// 2 bytes target_lp_address port
// +
// 4 bytes for expected response size
// +
// 4 bytes of length of inner packet bytes
// +
// inner_packet_bytes.len()
match self.target_lp_address {
SocketAddr::V4(_) => 1 + 4 + 2 + 4 + 4 + self.inner_packet_bytes.len(),
SocketAddr::V6(_) => 1 + 16 + 2 + 4 + 4 + self.inner_packet_bytes.len(),
}
}
// 0 || [4B ipv4] || [2B port] || [4B res size] || [4B plen] || payload
// 1 || [16B ipv6] || [2B port] || [4B res size] || [4B plen] || payload
fn encode(&self, dst: &mut BytesMut) {
@@ -261,241 +253,3 @@ impl ForwardPacketData {
})
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LpMessage {
/// The party is busy
Busy,
/// Application payload is being sent
ApplicationData(ApplicationData),
/// Receiver should forward this message via telescoping
ForwardPacket(ForwardPacketData),
/// Receiver index collision - client should retry with new receiver_index
Collision,
/// Acknowledgment - gateway confirms receipt of message
Ack,
/// An error has occurred
Error(ErrorPacketData),
}
impl From<ApplicationData> for LpMessage {
fn from(value: ApplicationData) -> Self {
LpMessage::ApplicationData(value)
}
}
impl From<ForwardPacketData> for LpMessage {
fn from(value: ForwardPacketData) -> Self {
LpMessage::ForwardPacket(value)
}
}
impl Display for LpMessage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
LpMessage::Busy => write!(f, "Busy"),
LpMessage::ApplicationData(_) => write!(f, "EncryptedData"),
LpMessage::ForwardPacket(_) => write!(f, "ForwardPacket"),
LpMessage::Collision => write!(f, "Collision"),
LpMessage::Ack => write!(f, "Ack"),
LpMessage::Error(_) => write!(f, "Error"),
}
}
}
impl LpMessage {
#[deprecated(note = "is it actually needed?")]
pub fn payload(&self) -> &[u8] {
match self {
LpMessage::Busy => &[],
LpMessage::ApplicationData(payload) => payload.0.as_slice(),
LpMessage::ForwardPacket(_) => &[], // Structured data, serialized in encode_content
LpMessage::Collision => &[],
LpMessage::Ack => &[],
LpMessage::Error(_) => &[], // Structured data, serialized in encode_content (?)
}
}
#[deprecated(note = "is it actually needed?")]
pub fn is_empty(&self) -> bool {
match self {
LpMessage::Busy => true,
LpMessage::ApplicationData(payload) => payload.0.is_empty(),
LpMessage::ForwardPacket(_) => false, // Always has data
LpMessage::Collision => true,
LpMessage::Ack => true,
LpMessage::Error(_) => false,
}
}
pub fn len(&self) -> usize {
match self {
LpMessage::Busy => 0,
LpMessage::ApplicationData(payload) => payload.len(),
LpMessage::ForwardPacket(payload) => payload.len(),
LpMessage::Collision => 0,
LpMessage::Ack => 0,
LpMessage::Error(payload) => payload.len(),
}
}
pub fn typ(&self) -> MessageType {
match self {
LpMessage::Busy => MessageType::Busy,
LpMessage::ApplicationData(_) => MessageType::EncryptedData,
LpMessage::ForwardPacket(_) => MessageType::ForwardPacket,
LpMessage::Collision => MessageType::Collision,
LpMessage::Ack => MessageType::Ack,
LpMessage::Error(_) => MessageType::Error,
}
}
pub fn encode_content(&self, dst: &mut BytesMut) {
match self {
LpMessage::Busy => { /* No content */ }
LpMessage::ApplicationData(payload) => payload.encode(dst),
LpMessage::ForwardPacket(data) => data.encode(dst),
LpMessage::Collision => { /* No content */ }
LpMessage::Ack => { /* No content */ }
LpMessage::Error(data) => data.encode(dst),
}
}
/// Parse message from its type and content bytes.
///
/// Used when decrypting outer-encrypted packets where the message type
/// was encrypted along with the content.
pub fn decode_content(
content: &[u8],
message_type: MessageType,
) -> Result<Self, MalformedLpPacketError> {
match message_type {
MessageType::Busy => {
content.ensure_empty()?;
Ok(LpMessage::Busy)
}
MessageType::EncryptedData => Ok(LpMessage::ApplicationData(ApplicationData::decode(
content,
)?)),
MessageType::ForwardPacket => Ok(LpMessage::ForwardPacket(ForwardPacketData::decode(
content,
)?)),
MessageType::Collision => {
content.ensure_empty()?;
Ok(LpMessage::Collision)
}
MessageType::Ack => {
content.ensure_empty()?;
Ok(LpMessage::Ack)
}
MessageType::Error => Ok(LpMessage::Error(ErrorPacketData::decode(content)?)),
}
}
}
/// Helper trait for improving readability to return error if bytes content is not empty
trait EnsureEmptyContent {
fn ensure_empty(&self) -> Result<(), MalformedLpPacketError>;
}
impl EnsureEmptyContent for &[u8] {
fn ensure_empty(&self) -> Result<(), MalformedLpPacketError> {
if !self.is_empty() {
return Err(MalformedLpPacketError::InvalidPayloadSize {
expected: 0,
actual: self.len(),
});
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::packet::{InnerHeader, LpHeader, LpPacket, OuterHeader};
#[test]
fn encoding() {
let message = LpMessage::ApplicationData(ApplicationData(vec![11u8; 124]));
let resp_header = LpHeader {
outer: OuterHeader {
receiver_idx: 456,
counter: 123,
},
inner: InnerHeader {
protocol_version: 1,
reserved: [0u8; 3],
message_type: MessageType::EncryptedData,
},
};
let packet = LpPacket {
header: resp_header,
message,
};
// Just print packet for debug, will be captured in test output
println!("{packet:?}");
// Verify message type
assert!(matches!(packet.message.typ(), MessageType::EncryptedData));
// Verify correct data in message
match &packet.message {
LpMessage::ApplicationData(data) => {
assert_eq!(*data, ApplicationData(vec![11u8; 124]));
}
_ => panic!("Wrong message type"),
}
}
#[test]
fn forward_message_encoding() {
let msg1 = ForwardPacketData {
target_lp_address: "1.2.3.4:5678".parse().unwrap(),
expected_response_size: ExpectedResponseSize::Transport,
inner_packet_bytes: vec![],
};
let msg2 = ForwardPacketData {
target_lp_address: "1.2.3.4:5678".parse().unwrap(),
expected_response_size: ExpectedResponseSize::Handshake(250),
inner_packet_bytes: vec![42u8; 64],
};
let msg3 = ForwardPacketData {
target_lp_address: "[2001:db8::1]:8080".parse().unwrap(),
expected_response_size: ExpectedResponseSize::Transport,
inner_packet_bytes: vec![],
};
let msg4 = ForwardPacketData {
target_lp_address: "[2001:db8::1]:8080".parse().unwrap(),
expected_response_size: ExpectedResponseSize::Handshake(250),
inner_packet_bytes: vec![42u8; 64],
};
let b = msg1.to_bytes();
let msg1_r = ForwardPacketData::decode(&b).unwrap();
assert_eq!(msg1_r, msg1);
let b = msg2.to_bytes();
let msg2_r = ForwardPacketData::decode(&b).unwrap();
assert_eq!(msg2_r, msg2);
let b = msg3.to_bytes();
let msg3_r = ForwardPacketData::decode(&b).unwrap();
assert_eq!(msg3_r, msg3);
let b = msg4.to_bytes();
let msg4_r = ForwardPacketData::decode(&b).unwrap();
assert_eq!(msg4_r, msg4);
}
}
+3 -9
View File
@@ -7,7 +7,7 @@ use std::fmt::{Debug, Formatter};
pub use error::MalformedLpPacketError;
pub use header::{InnerHeader, LpHeader, OuterHeader};
pub use message::{ApplicationData, ForwardPacketData, LpMessage, MessageType};
pub use message::{ForwardPacketData, LpMessage};
pub mod error;
pub mod header;
@@ -78,7 +78,7 @@ impl EncryptedLpPacket {
}
}
#[derive(Clone, PartialEq, Eq)]
#[derive(Clone, PartialEq)]
pub struct LpPacket {
pub(crate) header: LpHeader,
pub(crate) message: LpMessage,
@@ -95,10 +95,6 @@ impl LpPacket {
Self { header, message }
}
pub fn typ(&self) -> MessageType {
self.message.typ()
}
pub fn message(&self) -> &LpMessage {
&self.message
}
@@ -119,8 +115,6 @@ impl LpPacket {
pub(crate) fn dbg_encode(&self, dst: &mut BytesMut) {
self.header.dbg_encode(dst);
dst.put_slice(&(self.message.typ() as u16).to_le_bytes());
self.message.encode_content(dst);
self.message.encode(dst)
}
}
+4 -9
View File
@@ -6,7 +6,7 @@
//! This module implements session management functionality, including replay protection
use crate::codec::{decrypt_lp_packet, encrypt_lp_packet};
use crate::packet::{ApplicationData, EncryptedLpPacket, LpHeader, LpMessage, LpPacket};
use crate::packet::{EncryptedLpPacket, LpHeader, LpMessage, LpPacket};
use crate::peer::{LpLocalPeer, LpRemotePeer};
use crate::peer_config::LpReceiverIndex;
use crate::psq::{
@@ -174,12 +174,7 @@ impl LpSession {
pub fn next_packet(&mut self, message: LpMessage) -> Result<LpPacket, LpError> {
let counter = self.next_counter();
let header = LpHeader::new(
self.receiver_index(),
counter,
self.protocol_version,
message.typ(),
);
let header = LpHeader::new(self.receiver_index(), counter, self.protocol_version);
let packet = LpPacket::new(header, message);
Ok(packet)
}
@@ -255,9 +250,9 @@ impl LpSession {
/// * `Err(LpError)` if the session is not in transport mode or encryption fails.
pub(crate) fn encrypt_application_data(
&mut self,
data: Vec<u8>,
data: LpMessage,
) -> Result<EncryptedLpPacket, LpError> {
let packet = self.next_packet(LpMessage::ApplicationData(ApplicationData::new(data)))?;
let packet = self.next_packet(data)?;
encrypt_lp_packet(packet, &mut self.active_transport)
}
+14 -12
View File
@@ -1,7 +1,7 @@
#[cfg(test)]
mod tests {
use crate::packet::EncryptedLpPacket;
use crate::state_machine::{LpAction, LpData, LpInput, LpStateBare};
use crate::packet::{EncryptedLpPacket, LpMessage};
use crate::state_machine::{LpAction, LpInput, LpStateBare};
use crate::{LpError, SessionManager, SessionsMock};
use nym_kkt_ciphersuite::{IntoEnumIterator, KEM};
@@ -9,7 +9,7 @@ mod tests {
trait ActionExtract {
fn ciphertext(self) -> EncryptedLpPacket;
fn data(self) -> LpData;
fn data(self) -> LpMessage;
}
impl ActionExtract for LpAction {
@@ -21,7 +21,7 @@ mod tests {
}
}
fn data(self) -> LpData {
fn data(self) -> LpMessage {
if let LpAction::DeliverData(data) = self {
data
} else {
@@ -54,7 +54,7 @@ mod tests {
// --- A sends to B ---
let plaintext_a = format!("A->B Message {i}").into_bytes();
let ciphertext_a = session_manager_1
.send_data(peer_a_sm, LpData::new_opaque(plaintext_a.clone()))
.send_data(peer_a_sm, LpMessage::new_opaque(plaintext_a.clone()))
.unwrap()
.ciphertext();
@@ -69,7 +69,7 @@ mod tests {
// --- B sends to A ---
let plaintext_b = format!("B->A Message {i}").into_bytes();
let ciphertext_b = session_manager_2
.send_data(peer_b_sm, LpData::new_opaque(plaintext_b.clone()))
.send_data(peer_b_sm, LpMessage::new_opaque(plaintext_b.clone()))
.unwrap()
.ciphertext();
@@ -195,8 +195,10 @@ mod tests {
// --- 3. Simulate Data Transfer via process_input ---
println!("Starting data transfer simulation via process_input...");
let plaintext_a_to_b = LpData::new_opaque(b"Hello from A via process_input!".to_vec());
let plaintext_b_to_a = LpData::new_opaque(b"Hello from B via process_input!".to_vec());
let plaintext_a_to_b =
LpMessage::new_opaque(b"Hello from A via process_input!".to_vec());
let plaintext_b_to_a =
LpMessage::new_opaque(b"Hello from B via process_input!".to_vec());
// --- A sends to B ---
println!(" A sends to B");
@@ -272,8 +274,8 @@ mod tests {
println!("Testing out-of-order reception via process_input...");
// A prepares N+1 then N
let data_n_plus_1 = LpData::new_opaque(b"Message N+1".to_vec());
let data_n = LpData::new_opaque(b"Message N".to_vec());
let data_n_plus_1 = LpMessage::new_opaque(b"Message N+1".to_vec());
let data_n = LpMessage::new_opaque(b"Message N".to_vec());
let action_send_n1 = session_manager_1
.process_input(session_id, LpInput::SendData(data_n_plus_1.clone()))
@@ -344,7 +346,7 @@ mod tests {
// Further actions on A fail
let send_after_close_a = session_manager_1.process_input(
session_id,
LpInput::SendData(LpData::new_opaque(b"fail".to_vec())),
LpInput::SendData(LpMessage::new_opaque(b"fail".to_vec())),
);
assert!(send_after_close_a.is_err());
assert!(matches!(
@@ -366,7 +368,7 @@ mod tests {
// Further actions on B fail
let send_after_close_b = session_manager_2.process_input(
session_id,
LpInput::SendData(LpData::new_opaque(b"fail".to_vec())),
LpInput::SendData(LpMessage::new_opaque(b"fail".to_vec())),
);
assert!(send_after_close_b.is_err());
assert!(matches!(
+7 -3
View File
@@ -6,9 +6,9 @@
//! This module implements session lifecycle management functionality, handling
//! creation, retrieval, and storage of sessions.
use crate::packet::EncryptedLpPacket;
use crate::packet::{EncryptedLpPacket, LpMessage};
use crate::peer_config::LpReceiverIndex;
use crate::state_machine::{LpAction, LpData, LpInput, LpStateBare};
use crate::state_machine::{LpAction, LpInput, LpStateBare};
use crate::{LpError, LpSession, LpStateMachine};
use std::collections::HashMap;
@@ -44,7 +44,11 @@ impl SessionManager {
self.with_state_machine_mut(lp_id, |sm| sm.process_input(input).transpose())?
}
pub fn send_data(&mut self, lp_id: LpReceiverIndex, data: LpData) -> Result<LpAction, LpError> {
pub fn send_data(
&mut self,
lp_id: LpReceiverIndex,
data: LpMessage,
) -> Result<LpAction, LpError> {
self.process_input(lp_id, LpInput::SendData(data))?
.ok_or(LpError::NotInTransport)
}
+12 -103
View File
@@ -5,12 +5,11 @@
//! State machine ensures protocol steps execute in correct order. Invalid transitions
//! return LpError, preventing protocol violations.
use crate::packet::{EncryptedLpPacket, LpMessage};
use crate::packet::EncryptedLpPacket;
use crate::packet::message::LpMessage;
use crate::peer_config::LpReceiverIndex;
use crate::session::SessionId;
use crate::{LpError, session::LpSession};
use bytes::{Buf, Bytes};
use num_enum::{IntoPrimitive, TryFromPrimitive};
use std::mem;
#[derive(Debug)]
@@ -58,7 +57,7 @@ pub enum LpInput {
ReceivePacket(EncryptedLpPacket),
/// Application wants to send data (only valid in Transport state).
SendData(LpData),
SendData(LpMessage),
/// Close the connection.
Close,
@@ -71,81 +70,12 @@ pub enum LpAction {
SendPacket(EncryptedLpPacket),
/// Deliver decrypted application data received from the peer.
DeliverData(LpData),
DeliverData(LpMessage),
/// Inform the environment that the connection is closed.
ConnectionClosed,
}
/// Represent application data being sent in Transport mode
#[derive(Debug, Clone, PartialEq)]
pub struct LpData {
pub kind: LpDataKind,
pub content: Bytes,
}
impl AsRef<[u8]> for LpData {
fn as_ref(&self) -> &[u8] {
&self.content
}
}
impl LpData {
pub fn new(kind: LpDataKind, content: impl Into<Bytes>) -> Self {
Self {
kind,
content: content.into(),
}
}
pub fn new_opaque(content: impl Into<Bytes>) -> Self {
Self::new(LpDataKind::Opaque, content)
}
pub fn new_registration(data: impl Into<Bytes>) -> Self {
Self::new(LpDataKind::Registration, data)
}
pub fn new_forward(data: impl Into<Bytes>) -> Self {
Self::new(LpDataKind::Forward, data)
}
pub fn to_vec(self) -> Vec<u8> {
self.into()
}
}
impl From<LpData> for Vec<u8> {
fn from(data: LpData) -> Self {
let mut out = Vec::with_capacity(data.content.len() + 1);
out.push(data.kind as u8);
out.extend_from_slice(data.content.as_ref());
out
}
}
impl TryFrom<Vec<u8>> for LpData {
type Error = LpError;
fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
let kind = LpDataKind::try_from(value[0]).map_err(|_| {
LpError::DeserializationError(format!("unknown data type: {}", value[0]))
})?;
let mut content = Bytes::from(value);
content.advance(1);
Ok(LpData::new(kind, content))
}
}
/// Represent kind of application data being sent in Transport mode
#[derive(Clone, Copy, PartialEq, Eq, Debug, IntoPrimitive, TryFromPrimitive)]
#[repr(u8)]
pub enum LpDataKind {
Opaque = 0,
Registration = 1,
Forward = 2,
}
/// The Lewes Protocol State Machine.
pub struct LpStateMachine {
pub state: LpState,
@@ -234,31 +164,10 @@ impl LpStateMachine {
return (LpState::Transport(state), Some(Err(e)));
}
// Check message type
match packet.into_message() {
// Normal encrypted data
LpMessage::ApplicationData(payload) => {
// Deliver data
match payload.0.try_into() {
Ok(data) => {
let result_action = Some(Ok(LpAction::DeliverData(data)));
(LpState::Transport(state), result_action)
}
Err(e) => {
let reason = e.to_string();
(LpState::Closed { reason }, Some(Err(e)))
}
}
}
other => {
// Unexpected message type in Transport state
let err = LpError::InvalidStateTransition {
state: "Transport".to_string(),
input: format!("Unexpected message type: {other}"),
};
(LpState::Transport(state), Some(Err(err)))
}
}
// 4. deliver the message
let message = packet.message;
let result_action = Some(Ok(LpAction::DeliverData(message)));
(LpState::Transport(state), result_action)
}
LpInput::SendData(data) => {
// Encrypt and send application data
@@ -339,9 +248,9 @@ impl LpStateMachine {
fn prepare_data_packet(
&self,
session: &mut LpSession,
data: LpData,
data: LpMessage,
) -> Result<EncryptedLpPacket, LpError> {
session.encrypt_application_data(data.to_vec())
session.encrypt_application_data(data)
}
}
@@ -386,7 +295,7 @@ mod tests {
// --- Transport Phase ---
println!("--- Step 1: Initiator sends data ---");
let data_to_send_1 = LpData::new_opaque(b"hello responder".to_vec());
let data_to_send_1 = LpMessage::new_opaque(b"hello responder".to_vec());
let init_actions_4 = initiator.process_input(LpInput::SendData(data_to_send_1.clone()));
let data_packet_1 = if let Some(Ok(LpAction::SendPacket(packet))) = init_actions_4 {
packet.clone()
@@ -405,7 +314,7 @@ mod tests {
assert_eq!(resp_data_1, data_to_send_1);
println!("--- Step 3: Responder sends data ---");
let data_to_send_2 = LpData::new_opaque(b"hello initiator".to_vec());
let data_to_send_2 = LpMessage::new_opaque(b"hello initiator".to_vec());
let resp_actions_6 = responder.process_input(LpInput::SendData(data_to_send_2.clone()));
let data_packet_2 = if let Some(Ok(LpAction::SendPacket(packet))) = resp_actions_6 {
packet.clone()
+2 -1
View File
@@ -393,10 +393,11 @@ mod tests {
#[tokio::test]
async fn test_basic_lp_entry_registration() -> anyhow::Result<()> {
// nym_test_utils::helpers::setup_test_logger();
for kem in KEM::iter() {
let ciphersuite = Ciphersuite::default().with_kem(kem);
// nym_test_utils::helpers::setup_test_logger();
// initialise random, but deterministic, keys, addresses, etc. for the parties
let mut client_rng = u64_seeded_rng_09(0);
let mut gateway_rng = u64_seeded_rng_09(1);
+3 -2
View File
@@ -2,7 +2,8 @@
// SPDX-License-Identifier: Apache-2.0
use crate::node::lp::LpReceiverIndex;
use nym_lp::state_machine::{LpAction, LpDataKind};
use nym_lp::packet::message::LpMessageType;
use nym_lp::state_machine::LpAction;
use nym_lp::transport::LpTransportError;
use nym_lp::{LpError, packet::MalformedLpPacketError};
use std::net::SocketAddr;
@@ -44,7 +45,7 @@ pub enum LpHandlerError {
MalformedLpPacket(#[from] MalformedLpPacketError),
#[error("received payload type of an unexpected type: {typ:?}")]
UnexpectedLpPayload { typ: LpDataKind },
UnexpectedLpPayload { typ: LpMessageType },
#[error("timed out while attempting to send to/receive from the connection")]
ConnectionTimeout,
+15 -13
View File
@@ -4,8 +4,9 @@
use super::{LpHandlerState, LpReceiverIndex, TimestampedState};
use crate::node::lp::error::LpHandlerError;
use dashmap::mapref::one::RefMut;
use nym_lp::packet::{EncryptedLpPacket, ForwardPacketData};
use nym_lp::state_machine::{LpAction, LpData, LpDataKind, LpInput};
use nym_lp::packet::message::LpMessageType;
use nym_lp::packet::{EncryptedLpPacket, ForwardPacketData, LpMessage};
use nym_lp::state_machine::{LpAction, LpInput};
use nym_lp::transport::LpHandshakeChannel;
use nym_lp::transport::traits::LpTransportChannel;
use nym_lp::{LpSession, LpStateMachine, packet::message::ExpectedResponseSize};
@@ -301,13 +302,14 @@ where
async fn handle_decrypted_payload(
&mut self,
receiver_idx: LpReceiverIndex,
decrypted_data: LpData,
decrypted_data: LpMessage,
) -> Result<(), LpHandlerError> {
let remote = self.remote_addr;
let header = decrypted_data.header;
let bytes = decrypted_data.content;
match decrypted_data.kind {
LpDataKind::Registration => {
match header.kind {
LpMessageType::Registration => {
let request = LpRegistrationRequest::try_deserialise(&bytes)
.map_err(|source| LpHandlerError::MalformedRegistrationRequest { source })?;
@@ -319,13 +321,13 @@ where
self.handle_registration_request(receiver_idx, request)
.await
}
LpDataKind::Forward => {
LpMessageType::Forward => {
let forward_data = ForwardPacketData::decode(&bytes)?;
self.handle_forwarding_request(receiver_idx, forward_data)
.await
}
typ @ LpDataKind::Opaque => {
typ @ LpMessageType::Opaque => {
// Neither registration nor forwarding - unknown payload type
warn!(
"Unknown transport payload type from {remote} (receiver_idx={receiver_idx}). dropping {} bytes",
@@ -341,14 +343,14 @@ where
async fn send_response_packet(
&mut self,
serialised_response: Vec<u8>,
response_kind: LpDataKind,
response_kind: LpMessageType,
) -> Result<(), LpHandlerError> {
let mut state_entry = self.state_entry_mut()?;
// Access session via state machine for subsession support
let state_machine = &mut state_entry.value_mut().state;
let wrapped_lp_data = LpData::new(response_kind, serialised_response);
let wrapped_lp_data = LpMessage::new(response_kind, serialised_response);
// Process packet through state machine
let action = state_machine
@@ -378,7 +380,7 @@ where
.serialise()
.map_err(|source| LpHandlerError::MalformedRegistrationRequest { source })?;
self.send_response_packet(response_bytes, LpDataKind::Registration)
self.send_response_packet(response_bytes, LpMessageType::Registration)
.await?;
match response.status {
@@ -415,7 +417,7 @@ where
// Forward the packet to the target gateway and retrieve its response
let response_bytes = self.handle_forward_packet(forward_data).await?;
self.send_response_packet(response_bytes, LpDataKind::Forward)
self.send_response_packet(response_bytes, LpMessageType::Forward)
.await?;
debug!(
@@ -750,7 +752,7 @@ mod tests {
// Send a valid packet from client side
let LpAction::SendPacket(packet) = init_sm
.send_data(id, LpData::new_opaque(b"foomp".to_vec()))
.send_data(id, LpMessage::new_opaque(b"foomp".to_vec()))
.unwrap()
else {
panic!("illegal state")
@@ -786,7 +788,7 @@ mod tests {
let (mut stream, _) = listener.accept().await.unwrap();
let LpAction::SendPacket(packet) = resp_sm
.send_data(id, LpData::new_opaque(b"foomp".to_vec()))
.send_data(id, LpMessage::new_opaque(b"foomp".to_vec()))
.unwrap()
else {
panic!("illegal state")
@@ -5,7 +5,8 @@
use nym_lp::LpError;
use nym_lp::packet::MalformedLpPacketError;
use nym_lp::state_machine::{LpAction, LpDataKind};
use nym_lp::packet::message::LpMessageType;
use nym_lp::state_machine::LpAction;
use nym_lp::transport::LpTransportError;
use thiserror::Error;
@@ -45,7 +46,7 @@ pub enum LpClientError {
MalformedLpPacket(#[from] MalformedLpPacketError),
#[error("received payload type of an unexpected type: {typ:?}")]
UnexpectedLpPayload { typ: LpDataKind },
UnexpectedLpPayload { typ: LpMessageType },
#[error("timed out while attempting to finish the KKT/PSQ handshake")]
HandshakeTimeout,
@@ -4,23 +4,24 @@
#![allow(dead_code)]
use crate::LpClientError;
use nym_lp::packet::ForwardPacketData;
use nym_lp::packet::message::LpMessageType;
use nym_lp::packet::{ForwardPacketData, LpMessage};
use nym_lp::peer::LpRemotePeer;
use nym_lp::state_machine::{LpAction, LpData, LpDataKind, LpInput};
use nym_lp::state_machine::{LpAction, LpInput};
use nym_registration_common::{
LpRegistrationRequest, LpRegistrationResponse, NymNodeLPInformation,
};
pub(crate) trait LpDataSendExt {
fn to_lp_data(&self) -> Result<LpData, LpClientError>;
fn to_lp_data(&self) -> Result<LpMessage, LpClientError>;
}
pub(crate) trait LpDataDeliverExt: Sized {
fn from_lp_data(data: LpData) -> Result<Self, LpClientError>;
fn from_lp_data(data: LpMessage) -> Result<Self, LpClientError>;
}
impl LpDataSendExt for LpRegistrationRequest {
fn to_lp_data(&self) -> Result<LpData, LpClientError> {
fn to_lp_data(&self) -> Result<LpMessage, LpClientError> {
let request_bytes = self.serialise().map_err(|e| {
LpClientError::SendRegistrationRequest(format!("Failed to serialize request: {e}"))
})?;
@@ -30,14 +31,14 @@ impl LpDataSendExt for LpRegistrationRequest {
request_bytes.len()
);
Ok(LpData::new_registration(request_bytes))
Ok(LpMessage::new_registration(request_bytes))
}
}
impl LpDataDeliverExt for LpRegistrationResponse {
fn from_lp_data(data: LpData) -> Result<Self, LpClientError> {
if data.kind != LpDataKind::Registration {
return Err(LpClientError::UnexpectedLpPayload { typ: data.kind });
fn from_lp_data(data: LpMessage) -> Result<Self, LpClientError> {
if data.kind() != LpMessageType::Registration {
return Err(LpClientError::UnexpectedLpPayload { typ: data.kind() });
}
let response = LpRegistrationResponse::try_deserialise(&data.content)
@@ -48,7 +49,7 @@ impl LpDataDeliverExt for LpRegistrationResponse {
}
impl LpDataSendExt for ForwardPacketData {
fn to_lp_data(&self) -> Result<LpData, LpClientError> {
fn to_lp_data(&self) -> Result<LpMessage, LpClientError> {
let request_bytes = self.to_bytes();
tracing::trace!(
@@ -56,7 +57,7 @@ impl LpDataSendExt for ForwardPacketData {
request_bytes.len()
);
Ok(LpData::new_forward(request_bytes))
Ok(LpMessage::new_forward(request_bytes))
}
}
@@ -70,9 +71,9 @@ pub(crate) fn try_convert_forward_response(action: LpAction) -> Result<Vec<u8>,
action => return Err(LpClientError::UnexpectedStateMachineAction { action }),
};
if response_data.kind != LpDataKind::Forward {
if response_data.kind() != LpMessageType::Forward {
return Err(LpClientError::UnexpectedLpPayload {
typ: response_data.kind,
typ: response_data.kind(),
});
}
@@ -25,10 +25,10 @@ use crate::lp_client::state_machine_helpers::{extract_forwarded_response, prepar
use nym_bandwidth_controller::{BandwidthTicketProvider, DEFAULT_TICKETS_TO_SPEND};
use nym_credentials_interface::TicketType;
use nym_crypto::asymmetric::{ed25519, x25519};
use nym_lp::packet::EncryptedLpPacket;
use nym_lp::packet::version;
use nym_lp::packet::{EncryptedLpPacket, LpMessage};
use nym_lp::peer::{DHKeyPair, LpLocalPeer, LpRemotePeer};
use nym_lp::state_machine::{LpData, LpStateMachine};
use nym_lp::state_machine::LpStateMachine;
use nym_lp::transport::LpHandshakeChannel;
use nym_lp::transport::traits::LpTransportChannel;
use nym_lp::{Ciphersuite, KEM, LpSession};
@@ -128,14 +128,17 @@ impl NestedLpSession {
/// Attempt to wrap the provided `LpData` into a `EncryptedLpPacket`
/// using the inner state machine.
fn prepare_transport_packet(&mut self, data: LpData) -> Result<EncryptedLpPacket> {
fn prepare_transport_packet(&mut self, data: LpMessage) -> Result<EncryptedLpPacket> {
let state_machine = self.state_machine_mut()?;
prepare_send_packet(data, state_machine)
}
/// Attempt to recover received `LpData` from the received `EncryptedLpPacket`
/// using the inner state machine.
fn extract_forwarded_response(&mut self, response_packet: EncryptedLpPacket) -> Result<LpData> {
fn extract_forwarded_response(
&mut self,
response_packet: EncryptedLpPacket,
) -> Result<LpMessage> {
let state_machine = self.state_machine_mut()?;
extract_forwarded_response(response_packet, state_machine)
}
@@ -2,13 +2,14 @@
// SPDX-License-Identifier: Apache-2.0
use crate::LpClientError;
use nym_lp::state_machine::{LpAction, LpData, LpInput};
use nym_lp::packet::LpMessage;
use nym_lp::state_machine::{LpAction, LpInput};
use nym_lp::{LpStateMachine, packet::EncryptedLpPacket};
/// Attempt to prepare the provided data for sending by wrapping it in appropriate `LpAction`,
/// and attempting to extract `EncryptedLpPacket` from the provided state machine.
pub(crate) fn prepare_send_packet(
data: LpData,
data: LpMessage,
state_machine: &mut LpStateMachine,
) -> Result<EncryptedLpPacket, LpClientError> {
let action = state_machine
@@ -26,7 +27,7 @@ pub(crate) fn prepare_send_packet(
pub(crate) fn extract_forwarded_response(
response_packet: EncryptedLpPacket,
state_machine: &mut LpStateMachine,
) -> Result<LpData, LpClientError> {
) -> Result<LpMessage, LpClientError> {
let action = state_machine
.process_input(LpInput::ReceivePacket(response_packet))
.ok_or(LpClientError::UnexpectedStateMachineHalt)??;