Max/sdk stream wrapper (#6320)

* Replace MixnetStream with LP framing
- Replace custom header with LpFrameHeader
- Added sequence number for message ordering

* IPR: support LP Stream-framed client connections
- Detect and route LP Stream frames in mixnet_listener
- Wrap inline responses in LP Stream frames
- Thread stream_id to ConnectedClientHandler for TUN responses

* sdk: add ipr_wrapper module with IpMixStream
- IpMixStream wraps MixnetStream for IPR tunnel over mixnet
- LP Stream framing handled automatically by MixnetStream
- Gateway discovery, connect handshake, IP packet send/receive

* sdk: remove superseded stream_wrapper module

* Trim obvious comments, add architecture.md stub

* sdk: add missing deps and fix warnings

* Cut down architecture diagram until finished with rest of the code, leaving stubs

* sdk: refactor IpMixStream, extract shared helpers

- Extract gateway discovery and connect response parsing
- Add recv() to MixnetStream, remove 64KB read buffer
- Simplify IpMixStream constructor

* Fix SphinxStream renames missed during rebase

* Add IpPacketResponse::from_bytes() for stream-based deserialization

* Clean up ip_packet_client: delete stale connect.rs, take raw bytes not ReconstructedMessage

* Clippy

* Delete unused ip_packet_client modules

- Remove helpers.rs (ICMP utilities moved to example)
- Remove error.rs (errors consolidated into sdk/error.rs)
- Remove README.md
- Update module root to only export discovery + listener

* Simplify listener, IpMixStream, and network_env

- Collapse IprListener struct into standalone handle_ipr_response()
- Move check_ipr_message_version() into listener.rs
- Remove IpMixStream test module (moved to example)
- Remove parse_network() and commented-out Sandbox arms
- Return Result from find_workspace_root() instead of panicking
- Add IprTunnelDisconnected and WorkspaceRootNotFound error variants

* Refactor IPR stream handling and document seq conventions
- Inline stream_id tracking (remove current_stream_id field)
- Re-export encode_stream_frame from clients module
- Document seq=0 reservation for inline control responses
- Document data-path counter starting at 1 with skip-on-wrap

* Add ipr_tunnel example for integration testing
- ICMP ping through IPR with --gateway flag for targeting specific exits
- Move pnet_packet from dependencies to dev-dependencies

* Add message reordering to stream router
- Buffer out-of-order messages per-stream using BTreeMap
- Drain contiguous sequences individually to preserve message boundaries
- Drop duplicate/old sequence numbers with a warning
- Remove dead_code allow on StreamFrame::sequence_num

* Clean up comments and fill architecture.md
- Remove separator line comments
- Update stale comments about ordering not being implemented
- Remove collapsible_if allows, use let-else instead
- Fill in architecture.md data flow and connection lifecycle

* Simplify ipr_tunnel example to minimal smoke test
- Single ping instead of multi-ping loop
- Remove identifier and PING_COUNT
- Collapse ICMP helpers into single build_icmp_ping function

* Add dual-stack IPv6 ping and rename gateway → ipr
- Rename --gateway flag to --ipr and new_with_gateway() to new_with_ipr()
- Add ICMPv6 ping to ipr_tunnel example for dual-stack smoke test
- Tighten echo reply validation (protocol field check, diagnostic output)
- Document IP allocation (subnets, static vs dynamic, client keying) in architecture.md
- Promote LP Stream Open handshake log to INFO

* Tweak subnet comment in docs

* Don't stop IPR listener on decode failure
- Change break to continue so garbage packets can't kill the listener
- Remaining valid packets in the bundle are still processed

* Fix license headers and use workspace dep for pnet_packet
- Switch GPL-3.0 to Apache-2.0 on all SDK library files
- Add missing license headers to 7 files
- Use workspace version for pnet_packet dependency

* Document IP pool isolation from WG/LP dVPN pool
- IPR uses 10.0.0.0/16 on nymtun, WG uses 10.1.0.0/16 on nymwg
- Reference constants.rs as source of truth

* Remove network_env.rs and simplify IpMixStream API
  - Default to mainnet via setup_env(None) instead of requiring env param
  - Remove NetworkEnvironment enum and workspace root detection
  - Remove WorkspaceRootNotFound error variant
  - Update ipr_tunnel example to match new signatures

* Use weighted random selection for IPR gateway discovery
  - Replace max_by_key with choose_weighted biased by performance score
  - Prevents all clients converging on a single highest-performing IPR

* Cap stream reorder buffer to prevent unbounded memory growth
- Add MAX_REORDER_BUFFER (256) to limit per-stream pending messages:
	- buffer overflows = skip ahead to lowest buffered seq and drain
	- protects against malicious senders that deliberately skip sequence numbers

* Extract shared IPR response helpers into nym-ip-packet-requests
  - Add response_helpers module with version check, connect response
    parsing, and control response dispatch
  - SDK ip_packet_client now delegates to shared module
  - Monorepo nym-ip-packet-client uses shared version check and
    connect response parsing
  - Fix doc comment attributing fork to nym-vpn-client

* Extract ICMP test helpers into nym-ip-packet-requests
  - Add icmp_utils module behind test-utils feature flag
  - Move build_icmp_ping, build_icmpv6_ping, is_echo_reply_v4/v6 from
    example
  - Update ipr_tunnel example to use shared helpers

* Add protocol v9 LP-framed transport marker

- Add v9 module (re-exports v8, VERSION=9)
- Accept v9 requests and responses in IPR
- Switch SDK IpMixStream to send v9

* Log protocol version in dynamic connect requests

* Remove KCP from IPR and fix unwrap_or_default in SDK
- Remove all KCP session management from ip-packet-router (replaced by
  LP Stream framing)
- Drop nym-kcp dependency and KcpError variant from IPR
- Replace unwrap_or_default with ok_or(Error::NoNymAPIUrl) in
  IpMixStream::new()

* Add v9 protocol wrapper constructors and enforce version/transport
consistency
- Add v9::new_connect_request(), new_data_request(),
  new_ip_packet_response() to centralise version stamping
- Replace manual protocol.version overrides in SDK and IPR with v9
  wrapper calls
- Bump nym-ip-packet-client current re-export from v8 to v9
- Enforce LP Stream frames must carry v9+ payloads, non-stream must be
  v8 or lower

* Filter IPR exit nodes by minimum v9-compatible release version
- Define MIN_RELEASE_VERSION (1.30.0) in ip-packet-requests/v9 alongside protocol constants
- Add semver-based filtering in SDK gateway discovery to skip nodes below v9 threshold
- Add semver dependency to ip-packet-requests and nym-sdk

* Use numeric version comparison for transport/version enforcement
- Compare version as u8 instead of enum equality so future v10+ is handled correctly
- Remove unused `use super::*` import left over from KCP test removal
This commit is contained in:
mfahampshire
2026-03-27 20:35:26 +00:00
committed by GitHub
parent cc799b69d3
commit c07ef0253d
35 changed files with 1338 additions and 873 deletions
Generated
+7 -1
View File
@@ -6957,12 +6957,15 @@ dependencies = [
"nym-crypto",
"nym-service-provider-requests-common",
"nym-sphinx",
"pnet_packet",
"rand 0.8.5",
"semver 1.0.27",
"serde",
"thiserror 2.0.12",
"time",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
@@ -6985,7 +6988,7 @@ dependencies = [
"nym-exit-policy",
"nym-id",
"nym-ip-packet-requests",
"nym-kcp",
"nym-lp",
"nym-network-defaults",
"nym-network-requester",
"nym-sdk",
@@ -7792,6 +7795,7 @@ dependencies = [
"nym-crypto",
"nym-gateway-requests",
"nym-http-api-client",
"nym-ip-packet-requests",
"nym-lp",
"nym-network-defaults",
"nym-ordered-buffer",
@@ -7805,8 +7809,10 @@ dependencies = [
"nym-topology",
"nym-validator-client",
"parking_lot",
"pnet_packet",
"rand 0.8.5",
"reqwest 0.13.1",
"semver 1.0.27",
"serde",
"tap",
"tempfile",
+6
View File
@@ -10,7 +10,11 @@ license.workspace = true
description = "Codec, signing functionality, and different version definitions for IP packet request and responses"
[features]
test-utils = ["pnet_packet"]
[dependencies]
pnet_packet = { workspace = true, optional = true }
bincode = { workspace = true }
bytes = { workspace = true }
nym-bin-common = { workspace = true }
@@ -18,8 +22,10 @@ nym-crypto = { workspace = true }
nym-service-provider-requests-common = { workspace = true }
nym-sphinx = { workspace = true }
rand = { workspace = true }
semver = { workspace = true }
serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
time = { workspace = true }
tokio = { workspace = true, features = ["time"] }
tokio-util = { workspace = true, features = ["codec"] }
tracing = { workspace = true }
+117
View File
@@ -0,0 +1,117 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
// Extracted from sdk/rust/nym-sdk/examples/ipr_tunnel.rs
//! ICMP/ICMPv6 packet construction and reply detection helpers for testing
//! IPR connectivity. Gated behind the `test-utils` feature.
use std::net::{Ipv4Addr, Ipv6Addr};
use pnet_packet::Packet;
use pnet_packet::icmp::echo_reply::EchoReplyPacket;
use pnet_packet::icmp::echo_request::MutableEchoRequestPacket;
use pnet_packet::icmp::{IcmpPacket, IcmpTypes};
use pnet_packet::icmpv6::Icmpv6Types;
use pnet_packet::ipv4::{Ipv4Flags, MutableIpv4Packet};
use pnet_packet::ipv6::MutableIpv6Packet;
/// Build a complete IPv4 ICMP echo request packet.
pub fn build_icmp_ping(src: Ipv4Addr, dst: Ipv4Addr, seq: u16) -> Option<Vec<u8>> {
let mut echo = MutableEchoRequestPacket::owned(vec![0u8; 64])?;
echo.set_icmp_type(IcmpTypes::EchoRequest);
echo.set_icmp_code(pnet_packet::icmp::IcmpCode::new(0));
echo.set_sequence_number(seq);
let cksum = pnet_packet::icmp::checksum(&IcmpPacket::new(echo.packet())?);
echo.set_checksum(cksum);
let total_len = 20 + echo.packet().len();
let mut ip = MutableIpv4Packet::owned(vec![0u8; total_len])?;
ip.set_version(4);
ip.set_header_length(5);
ip.set_total_length(total_len as u16);
ip.set_ttl(64);
ip.set_next_level_protocol(pnet_packet::ip::IpNextHeaderProtocols::Icmp);
ip.set_source(src);
ip.set_destination(dst);
ip.set_flags(Ipv4Flags::DontFragment);
ip.set_payload(echo.packet());
let mut buf = ip.consume_to_immutable().packet().to_vec();
let cksum = ipv4_checksum(&buf);
buf[10] = (cksum >> 8) as u8;
buf[11] = cksum as u8;
Some(buf)
}
/// Build a complete IPv6 ICMPv6 echo request packet.
pub fn build_icmpv6_ping(src: Ipv6Addr, dst: Ipv6Addr, seq: u16) -> Option<Vec<u8>> {
let mut echo =
pnet_packet::icmpv6::echo_request::MutableEchoRequestPacket::owned(vec![0u8; 64])?;
echo.set_icmpv6_type(Icmpv6Types::EchoRequest);
echo.set_icmpv6_code(pnet_packet::icmpv6::Icmpv6Code::new(0));
echo.set_sequence_number(seq);
let cksum = pnet_packet::icmpv6::checksum(
&pnet_packet::icmpv6::Icmpv6Packet::new(echo.packet())?,
&src,
&dst,
);
echo.set_checksum(cksum);
let payload_len = echo.packet().len();
let mut ip = MutableIpv6Packet::owned(vec![0u8; 40 + payload_len])?;
ip.set_version(6);
ip.set_payload_length(payload_len as u16);
ip.set_next_header(pnet_packet::ip::IpNextHeaderProtocols::Icmpv6);
ip.set_hop_limit(64);
ip.set_source(src);
ip.set_destination(dst);
ip.set_payload(echo.packet());
Some(ip.consume_to_immutable().packet().to_vec())
}
/// Check if a raw packet is an IPv4 ICMP echo reply destined to `expected_dst`.
pub fn is_echo_reply_v4(data: &[u8], expected_dst: Ipv4Addr) -> bool {
let Some(ip) = pnet_packet::ipv4::Ipv4Packet::new(data) else {
return false;
};
if ip.get_destination() != expected_dst {
return false;
}
if ip.get_next_level_protocol() != pnet_packet::ip::IpNextHeaderProtocols::Icmp {
return false;
}
let Some(reply) = EchoReplyPacket::new(ip.payload()) else {
return false;
};
reply.get_icmp_type() == IcmpTypes::EchoReply
}
/// Check if a raw packet is an IPv6 ICMPv6 echo reply destined to `expected_dst`.
pub fn is_echo_reply_v6(data: &[u8], expected_dst: Ipv6Addr) -> bool {
let Some(ip) = pnet_packet::ipv6::Ipv6Packet::new(data) else {
return false;
};
if ip.get_destination() != expected_dst {
return false;
}
if ip.get_next_header() != pnet_packet::ip::IpNextHeaderProtocols::Icmpv6 {
return false;
}
let Some(reply) = pnet_packet::icmpv6::echo_reply::EchoReplyPacket::new(ip.payload()) else {
return false;
};
reply.get_icmpv6_type() == Icmpv6Types::EchoReply
}
fn ipv4_checksum(header: &[u8]) -> u16 {
let mut sum = 0u32;
for i in (0..20).step_by(2) {
sum += ((header[i] as u32) << 8) | header[i + 1] as u32;
}
while (sum >> 16) > 0 {
sum = (sum & 0xFFFF) + (sum >> 16);
}
!sum as u16
}
+6
View File
@@ -3,10 +3,14 @@ use std::fmt::{Display, Formatter};
use std::net::{Ipv4Addr, Ipv6Addr};
pub mod codec;
#[cfg(feature = "test-utils")]
pub mod icmp_utils;
pub mod response_helpers;
pub mod sign;
pub mod v6;
pub mod v7;
pub mod v8;
pub mod v9;
// version 3: initial version
// version 4: IPv6 support
@@ -14,6 +18,8 @@ pub mod v8;
// version 6: Increase the available IPs
// version 7: Add signature support (for the future)
// version 8: Anonymous sends
// version 9: LP-framed transport (SphinxStream)
// response_helpers: shared IPR response parsing (nym-ip-packet-client + nym-sdk)
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct IpPair {
@@ -0,0 +1,134 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use bytes::{Bytes, BytesMut};
use tokio_util::codec::Decoder;
use tracing::{error, info, warn};
use crate::{
IpPair,
codec::MultiIpPacketCodec,
v8::response::{
ConnectResponseReply, ControlResponse, InfoLevel, IpPacketResponse, IpPacketResponseData,
},
};
#[derive(Debug, thiserror::Error)]
pub enum IprResponseError {
#[error("no version byte in message")]
NoVersionByte,
#[error("version mismatch: received v{received}, expected v{expected}")]
VersionMismatch { expected: u8, received: u8 },
#[error("expected control response, got {0:?}")]
UnexpectedResponse(IpPacketResponseData),
#[error("connect denied: {0:?}")]
ConnectDenied(crate::v8::response::ConnectFailureReason),
}
pub enum MixnetMessageOutcome {
IpPackets(Vec<Bytes>),
Disconnect,
}
// Extracted from:
// nym-ip-packet-client/src/helpers.rs — check_ipr_message_version()
// sdk/rust/nym-sdk/src/ip_packet_client/listener.rs — check_ipr_message_version()
/// Check that the first byte of an IPR message matches the expected protocol version.
pub fn check_ipr_message_version(data: &[u8], expected: u8) -> Result<(), IprResponseError> {
let version = data.first().ok_or(IprResponseError::NoVersionByte)?;
if *version != expected {
return Err(IprResponseError::VersionMismatch {
expected,
received: *version,
});
}
Ok(())
}
// Extracted from:
// nym-ip-packet-client/src/connect.rs — handle_connect_response() + handle_ip_packet_router_response()
// sdk/rust/nym-sdk/src/ip_packet_client/discovery.rs — parse_connect_response()
/// Parse an IPR connect response, returning allocated IPs on success.
pub fn parse_connect_response(response: IpPacketResponse) -> Result<IpPair, IprResponseError> {
let control_response = match response.data {
IpPacketResponseData::Control(c) => c,
other => return Err(IprResponseError::UnexpectedResponse(other)),
};
match *control_response {
ControlResponse::Connect(connect_resp) => match connect_resp.reply {
ConnectResponseReply::Success(success) => Ok(success.ips),
ConnectResponseReply::Failure(reason) => Err(IprResponseError::ConnectDenied(reason)),
},
_ => Err(IprResponseError::UnexpectedResponse(
IpPacketResponseData::Control(control_response),
)),
}
}
// Extracted from:
// nym-ip-packet-client/src/listener.rs — IprListener::handle_reconstructed_message()
// sdk/rust/nym-sdk/src/ip_packet_client/listener.rs — handle_ipr_response()
/// Parse raw IPR response bytes into an outcome.
///
/// Logs non-fatal conditions (unknown control messages, deserialization
/// failures) and returns `None` for them.
pub fn handle_ipr_response(data: &[u8]) -> Option<MixnetMessageOutcome> {
match IpPacketResponse::from_bytes(data) {
Ok(response) => match response.data {
IpPacketResponseData::Data(data_response) => {
let mut codec = MultiIpPacketCodec::new();
let mut buf = BytesMut::from(data_response.ip_packet.as_ref());
let mut packets = Vec::new();
loop {
match codec.decode(&mut buf) {
Ok(Some(packet)) => packets.push(packet.into_bytes()),
Ok(None) => break,
Err(e) => {
warn!("Failed to decode bundled IP packet: {e}");
break;
}
}
}
Some(MixnetMessageOutcome::IpPackets(packets))
}
IpPacketResponseData::Control(control_response) => match *control_response {
ControlResponse::Connect(_) => {
info!("Received connect response when already connected - ignoring");
None
}
ControlResponse::Disconnect(_) | ControlResponse::UnrequestedDisconnect(_) => {
info!("Received disconnect from IPR");
Some(MixnetMessageOutcome::Disconnect)
}
ControlResponse::Pong(_) => {
info!("Received pong response");
None
}
ControlResponse::Health(_) => {
info!("Received health response");
None
}
ControlResponse::Info(info_resp) => {
let msg = format!(
"Received info response from the mixnet: {}",
info_resp.reply
);
match info_resp.level {
InfoLevel::Info => info!("{msg}"),
InfoLevel::Warn => warn!("{msg}"),
InfoLevel::Error => error!("{msg}"),
}
None
}
},
},
Err(err) => {
warn!("Failed to deserialize IPR response: {err}");
None
}
}
}
+6 -2
View File
@@ -179,11 +179,15 @@ impl IpPacketResponse {
make_bincode_serializer().serialize(self)
}
pub fn from_bytes(data: &[u8]) -> Result<Self, bincode::Error> {
use bincode::Options;
make_bincode_serializer().deserialize(data)
}
pub fn from_reconstructed_message(
message: &nym_sphinx::receiver::ReconstructedMessage,
) -> Result<Self, bincode::Error> {
use bincode::Options;
make_bincode_serializer().deserialize(&message.message)
Self::from_bytes(&message.message)
}
}
+34
View File
@@ -0,0 +1,34 @@
pub const VERSION: u8 = 9;
/// Minimum nym-node release version that supports v9 (LP Stream framing).
/// Nodes running older versions will not understand LP-wrapped packets.
pub const MIN_RELEASE_VERSION: semver::Version = semver::Version::new(1, 30, 0);
// v9 uses the same wire format as v8. The version bump indicates
// the message was sent with LP framing (SphinxStream).
//
// Types are re-exported for deserialization/matching. Use the wrapper
// constructors below to create correctly-versioned packets — never
// manually set `protocol.version` or `response.version`.
pub use super::v8::{request, response};
/// Create a v9 connect request (version byte set to 9).
pub fn new_connect_request(buffer_timeout: Option<u64>) -> (request::IpPacketRequest, u64) {
let (mut req, id) = request::IpPacketRequest::new_connect_request(buffer_timeout);
req.protocol.version = VERSION;
(req, id)
}
/// Create a v9 data request (version byte set to 9).
pub fn new_data_request(data: bytes::Bytes) -> request::IpPacketRequest {
let mut req = request::IpPacketRequest::new_data_request(data);
req.protocol.version = VERSION;
req
}
/// Create a v9 IP packet response (version byte set to 9).
pub fn new_ip_packet_response(ip_packet: bytes::Bytes) -> response::IpPacketResponse {
let mut resp = response::IpPacketResponse::new_ip_packet(ip_packet);
resp.version = VERSION;
resp
}
+11 -34
View File
@@ -11,14 +11,10 @@ use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
use nym_ip_packet_requests::response_helpers::{self, IprResponseError};
use crate::{
current::{
request::IpPacketRequest,
response::{
ConnectResponse, ConnectResponseReply, ControlResponse, IpPacketResponse,
IpPacketResponseData,
},
},
current::{request::IpPacketRequest, response::IpPacketResponse},
error::{Error, Result},
helpers::check_ipr_message_version,
};
@@ -101,32 +97,6 @@ impl IprClientConnect {
Ok(request_id)
}
async fn handle_connect_response(&self, response: ConnectResponse) -> Result<IpPair> {
debug!("Handling dynamic connect response");
match response.reply {
ConnectResponseReply::Success(r) => Ok(r.ips),
ConnectResponseReply::Failure(reason) => Err(Error::ConnectRequestDenied { reason }),
}
}
async fn handle_ip_packet_router_response(&self, response: IpPacketResponse) -> Result<IpPair> {
let control_response = match response.data {
IpPacketResponseData::Control(control_response) => control_response,
_ => {
error!("Received non-control response while waiting for connect response");
return Err(Error::UnexpectedConnectResponse);
}
};
match *control_response {
ControlResponse::Connect(resp) => self.handle_connect_response(resp).await,
response => {
error!("Unexpected response: {response:?}");
Err(Error::UnexpectedConnectResponse)
}
}
}
async fn listen_for_connect_response(&mut self, request_id: u64) -> Result<IpPair> {
// Connecting is basically synchronous from the perspective of the mixnet client, so it's safe
// to just grab ahold of the mutex and keep it until we get the response.
@@ -173,7 +143,14 @@ impl IprClientConnect {
if response.id() == Some(request_id) {
tracing::debug!("Got response with matching id");
return self.handle_ip_packet_router_response(response).await;
// Replaces local handle_ip_packet_router_response() + handle_connect_response()
return response_helpers::parse_connect_response(response)
.map_err(|e| match e {
IprResponseError::ConnectDenied(reason) => {
Error::ConnectRequestDenied { reason }
}
_ => Error::UnexpectedConnectResponse,
});
}
}
}
+15 -21
View File
@@ -1,30 +1,24 @@
// Copyright 2023-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use std::cmp::Ordering;
use nym_ip_packet_requests::response_helpers::IprResponseError;
use nym_sdk::mixnet::ReconstructedMessage;
use crate::{Error, current::VERSION as CURRENT_VERSION, error::Result};
use crate::{current::VERSION as CURRENT_VERSION, error::Result};
pub(crate) fn check_ipr_message_version(message: &ReconstructedMessage) -> Result<()> {
// Assuming it's a IPR message, it will have a version as its first byte
if let Some(version) = message.message.first() {
match version.cmp(&CURRENT_VERSION) {
Ordering::Greater => Err(Error::ReceivedResponseWithNewVersion {
expected: CURRENT_VERSION,
received: *version,
}),
Ordering::Less => Err(Error::ReceivedResponseWithOldVersion {
expected: CURRENT_VERSION,
received: *version,
}),
Ordering::Equal => {
// We're good
Ok(())
}
nym_ip_packet_requests::response_helpers::check_ipr_message_version(
&message.message,
CURRENT_VERSION,
)
.map_err(|e| match e {
IprResponseError::NoVersionByte => crate::Error::NoVersionInMessage,
IprResponseError::VersionMismatch { expected, received } if received < expected => {
crate::Error::ReceivedResponseWithOldVersion { expected, received }
}
} else {
Err(Error::NoVersionInMessage)
}
IprResponseError::VersionMismatch { expected, received } => {
crate::Error::ReceivedResponseWithNewVersion { expected, received }
}
_ => crate::Error::NoVersionInMessage,
})
}
+1 -1
View File
@@ -11,4 +11,4 @@ pub use error::Error;
pub use listener::{IprListener, MixnetMessageOutcome};
// Re-export the currently used version
pub use nym_ip_packet_requests::v8 as current;
pub use nym_ip_packet_requests::v9 as current;
+5
View File
@@ -65,6 +65,9 @@ url = { workspace = true }
toml = { workspace = true }
tempfile = { workspace = true }
nym-ip-packet-requests = { workspace = true }
semver = { workspace = true }
# tcpproxy dependencies
clap = { workspace = true, features = ["derive"] }
anyhow.workspace = true
@@ -94,6 +97,8 @@ tokio-stream = { workspace = true }
tokio-util = { workspace = true, features = ["codec"] }
parking_lot = { workspace = true }
hex = { workspace = true }
pnet_packet = { workspace = true }
nym-ip-packet-requests = { workspace = true, features = ["test-utils"] }
[features]
libp2p-vanilla = []
+92
View File
@@ -0,0 +1,92 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
// Smoke test for IpMixStream: connect to an IPR, send a ping, check we get a reply.
// Tests both IPv4 and IPv6 paths.
//
// Usage:
// cargo run --example ipr_tunnel
// cargo run --example ipr_tunnel -- --ipr <IPR_ADDRESS>
//
// e.g. cargo run --example ipr_tunnel -- --ipr 6B6iuWX4bQP4GVA4Yq7XmZencaaGw6BaPY6xJWYSwsbF.6g6LRx1fgU2Q2A4ZPKonYHtfBARh1GPMe1LtXk6vpRR8@q2A2cbooyC16YJzvdYaSMH9X3cSiieZNtfBr8cE8Fi1
use std::net::{Ipv4Addr, Ipv6Addr};
use std::time::Duration;
use nym_ip_packet_requests::codec::MultiIpPacketCodec;
use nym_ip_packet_requests::icmp_utils::{
build_icmp_ping, build_icmpv6_ping, is_echo_reply_v4, is_echo_reply_v6,
};
use nym_sdk::ipr_wrapper::IpMixStream;
const PING4_TARGET: Ipv4Addr = Ipv4Addr::new(8, 8, 8, 8);
const PING6_TARGET: Ipv6Addr = Ipv6Addr::new(0x2001, 0x4860, 0x4860, 0, 0, 0, 0, 0x8888);
#[tokio::main]
async fn main() -> anyhow::Result<()> {
nym_bin_common::logging::setup_tracing_logger();
let args: Vec<String> = std::env::args().collect();
let ipr_addr = args
.iter()
.position(|a| a == "--ipr")
.and_then(|i| args.get(i + 1));
let mut tunnel = if let Some(addr) = ipr_addr {
let recipient = addr.parse().expect("invalid IPR address");
IpMixStream::new_with_ipr(recipient).await?
} else {
IpMixStream::new().await?
};
let ips = tunnel.allocated_ips();
let src4 = ips.ipv4;
let src6 = ips.ipv6;
println!("Tunnel up — IPv4: {src4}, IPv6: {src6}");
// Send IPv4 ping (ICMP seq=0, unrelated to LP Stream sequence numbers)
let pkt4 = build_icmp_ping(src4, PING4_TARGET, 0).expect("failed to build ICMP packet");
let bundled = MultiIpPacketCodec::bundle_one_packet(pkt4.into());
tunnel.send_ip_packet(&bundled).await?;
println!("Sent ping → {PING4_TARGET}");
// Send IPv6 ping
let pkt6 = build_icmpv6_ping(src6, PING6_TARGET, 0).expect("failed to build ICMPv6 packet");
let bundled = MultiIpPacketCodec::bundle_one_packet(pkt6.into());
tunnel.send_ip_packet(&bundled).await?;
println!("Sent ping → {PING6_TARGET}");
let mut got_v4 = false;
let mut got_v6 = false;
let deadline = tokio::time::sleep(Duration::from_secs(30));
tokio::pin!(deadline);
loop {
tokio::select! {
_ = &mut deadline => {
if !got_v4 { println!("FAIL — no IPv4 reply within 30s"); }
if !got_v6 { println!("FAIL — no IPv6 reply within 30s"); }
break;
}
result = tunnel.handle_incoming() => {
for pkt in result? {
if !got_v4 && is_echo_reply_v4(&pkt, src4) {
println!("OK — got IPv4 echo reply");
got_v4 = true;
}
if !got_v6 && is_echo_reply_v6(&pkt, src6) {
println!("OK — got IPv6 echo reply");
got_v6 = true;
}
if got_v4 && got_v6 {
tunnel.disconnect().await;
return Ok(());
}
}
}
}
}
tunnel.disconnect().await;
Ok(())
}
+36
View File
@@ -1,3 +1,8 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_ip_packet_requests::v8::response::{ConnectFailureReason, IpPacketResponseData};
use nym_validator_client::nym_api::error::NymAPIError;
use nym_validator_client::nyxd::error::NyxdError;
use std::path::PathBuf;
@@ -108,6 +113,37 @@ pub enum Error {
#[error("Stream subsystem failed to initialise: reconstructed_receiver unavailable")]
StreamInitFailure,
#[error("client not connected")]
IprStreamClientNotConnected,
#[error("listening for connection response timed out")]
IPRConnectResponseTimeout,
#[error("stream closed")]
IPRClientStreamClosed,
#[error("expected control response, got {0:?}")]
UnexpectedResponseType(IpPacketResponseData),
#[error("connect denied: {0:?}")]
ConnectDenied(ConnectFailureReason),
#[allow(clippy::result_large_err)]
#[error("api directory error: {0}")]
GatewayDirectoryError(#[from] NymAPIError),
#[error("did not receive Nym API URL")]
NoNymAPIUrl,
#[error("no available gateway")]
NoGatewayAvailable,
#[error("tunnel disconnected by IPR")]
IprTunnelDisconnected,
#[error("message version check failed: {0}")]
IPRMessageVersionCheckFailed(String),
}
impl Error {
+17
View File
@@ -0,0 +1,17 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! IPR helpers for [`IpMixStream`](crate::ipr_wrapper::IpMixStream).
//!
//! - **Discovery** (`discovery`): queries the Nym API for IPR-enabled exit gateways.
//! - **Response handling** (`listener`): thin wrappers around
//! [`nym_ip_packet_requests::response_helpers`] that add version checking
//! and error mapping for SDK use.
pub mod discovery;
pub mod listener;
pub use listener::{handle_ipr_response, MixnetMessageOutcome};
// Re-export the currently used version
pub use nym_ip_packet_requests::v9 as current;
@@ -0,0 +1,121 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! IPR gateway discovery — find and rank IPR-enabled exit gateways via the Nym API.
use std::collections::HashMap;
use nym_crypto::asymmetric::ed25519;
use nym_ip_packet_requests::v9;
use nym_network_defaults::ApiUrl;
use nym_sphinx::addressing::clients::Recipient;
use nym_validator_client::nym_api::NymApiClientExt;
use tracing::{debug, error, info};
use rand::seq::SliceRandom;
use crate::Error;
#[derive(Clone)]
pub struct IprWithPerformance {
pub address: Recipient,
pub identity: ed25519::PublicKey,
pub performance: u8,
}
#[allow(clippy::result_large_err)]
pub fn create_nym_api_client(
nym_api_urls: Vec<ApiUrl>,
) -> Result<nym_http_api_client::Client, Error> {
let user_agent = format!("nym-sdk/{}", env!("CARGO_PKG_VERSION"));
let urls = nym_api_urls
.into_iter()
.map(|url| url.url.parse())
.collect::<Result<Vec<nym_http_api_client::Url>, _>>()
.map_err(|err| {
error!("malformed nym-api url: {err}");
Error::NoNymAPIUrl
})?;
if urls.is_empty() {
return Err(Error::NoNymAPIUrl);
}
let client = nym_http_api_client::ClientBuilder::new_with_urls(urls)?
.with_user_agent(user_agent)
.build()?;
Ok(client)
}
pub async fn retrieve_exit_nodes_with_performance(
client: nym_http_api_client::Client,
) -> Result<Vec<IprWithPerformance>, Error> {
let all_nodes = client
.get_all_described_nodes_v2()
.await?
.into_iter()
.map(|described| (described.ed25519_identity_key(), described))
.collect::<HashMap<_, _>>();
let exit_gateways = client.get_all_basic_nodes_with_metadata().await?.nodes;
let mut described = Vec::new();
for exit in exit_gateways {
let Some(node) = all_nodes.get(&exit.ed25519_identity_pubkey) else {
continue;
};
// Only select nodes running a version that supports v9 (LP Stream framing)
let Ok(node_version) = semver::Version::parse(node.version()) else {
debug!(
"Skipping node {}: unable to parse version '{}'",
exit.ed25519_identity_pubkey,
node.version()
);
continue;
};
if node_version < v9::MIN_RELEASE_VERSION {
debug!(
"Skipping node {}: version {} < minimum {}",
exit.ed25519_identity_pubkey,
node_version,
v9::MIN_RELEASE_VERSION
);
continue;
}
if let Some(ipr_info) = node.description.ip_packet_router.clone() {
if let Ok(parsed_address) = ipr_info.address.parse() {
described.push(IprWithPerformance {
address: parsed_address,
identity: exit.ed25519_identity_pubkey,
performance: exit.performance.round_to_integer(),
})
}
}
}
Ok(described)
}
/// Select the highest-performance IPR gateway from the directory.
pub async fn get_best_ipr(client: nym_http_api_client::Client) -> Result<Recipient, Error> {
let nodes = retrieve_exit_nodes_with_performance(client).await?;
info!("Found {} Exit Gateways", nodes.len());
let selected_ipr = nodes
.choose_weighted(&mut rand::thread_rng(), |gw| gw.performance as f64)
.map_err(|_| Error::NoGatewayAvailable)?;
let ipr_address = selected_ipr.address;
info!(
"Using IPR: {} (Gateway: {}, Performance: {:?})",
ipr_address, selected_ipr.identity, selected_ipr.performance
);
Ok(ipr_address)
}
@@ -0,0 +1,20 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub use nym_ip_packet_requests::response_helpers::MixnetMessageOutcome;
use nym_ip_packet_requests::response_helpers;
use crate::ip_packet_client::current::VERSION as CURRENT_VERSION;
/// Check that the first byte of an IPR message matches the expected protocol version.
pub(crate) fn check_ipr_message_version(data: &[u8]) -> Result<(), crate::Error> {
response_helpers::check_ipr_message_version(data, CURRENT_VERSION)
.map_err(|e| crate::Error::IPRMessageVersionCheckFailed(e.to_string()))
}
/// Parse raw IPR response bytes into an outcome.
pub fn handle_ipr_response(data: &[u8]) -> Result<Option<MixnetMessageOutcome>, crate::Error> {
check_ipr_message_version(data)?;
Ok(response_helpers::handle_ipr_response(data))
}
+13
View File
@@ -0,0 +1,13 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! High-level IPR (IP Packet Router) stream wrapper.
//!
//! [`IpMixStream`] tunnels IP packets through the Nym mixnet to an exit
//! gateway running an IP Packet Router. Both requests and responses are
//! wrapped in LP Stream frames for type-safe detection at the IPR and
//! dispatch by the client's stream router.
mod ip_mix_stream;
pub use ip_mix_stream::IpMixStream;
@@ -0,0 +1,80 @@
# IpMixStream Architecture
## Overview
`IpMixStream` tunnels IP packets through the Nym mixnet to an IP Packet Router
(IPR) exit gateway. It provides a high-level API over a single `MixnetStream`,
which handles LP Stream framing and Sphinx packet transport automatically.
## Data Flow
```text
Client IPR
| |
|-- IpPacketRequest (connect) ---- LP Stream ----->|
|<--- IpPacketResponse (ips) ---- LP Stream (s=0) -|
| |
|-- IpPacketRequest (data) ------- LP Stream ----->| -> TUN -> internet
|<--- IpPacketResponse (data) --- LP Stream (s=1+) | <- TUN <- internet
```
## Layer Stack
```text
IpMixStream IPR protocol (connect, data, disconnect)
MixnetStream AsyncRead + AsyncWrite, LP Stream framing, seq numbers
Stream Router Dispatches inbound messages by stream_id
MixnetClient Sphinx packet encryption, SURB management
Mixnet Entry GW -> Mix1 -> Mix2 -> Mix3 -> Exit GW
```
## LP Stream Framing
All messages between client and IPR are wrapped in LP Stream frames:
- **Client -> IPR**: `MixnetStream.write()` wraps each write in an LP Stream
Data frame (stream_id, sequence number, payload). The IPR detects
`LpFrameKind::Stream` and strips the header before processing.
- **IPR -> Client**: Both inline responses (connect handshake, pong) and async
TUN responses are wrapped in LP Stream frames with the same stream_id. The
client's stream router dispatches by stream_id to the correct `MixnetStream`.
## IP Allocation
The IPR owns a subnet (`10.0.0.0/16` for IPv4, `fc00::/112` for IPv6) and
allocates addresses to clients on connect. Two modes are supported:
- **Dynamic** (used by `IpMixStream`): The IPR picks a random unused `IpPair`
from the pool (up to 100 retries). `IpMixStream` uses this path.
- **Static**: The client requests specific IPs; the IPR checks availability.
On dynamic connect:
1. IPR calls `find_new_ip()` — random selection from the subnet
2. Registers the client in dual `HashMap<Ipv4Addr, Client>` / `HashMap<Ipv6Addr, Client>`
maps (keyed by allocated IP). The client identity (anonymous sender tag
for v8, Nym address for v6/v7) is stored inside the `ConnectedClient` value.
3. Returns `DynamicConnectSuccess { ips }` to the client
The client uses the allocated IPv4/IPv6 as its source address when constructing
packets. When the TUN device receives a response destined for that IP, the IPR
looks up which client owns it and forwards the packet back through the mixnet.
Reserved addresses: `10.0.0.1` / `fc00::1` (TUN gateway, never allocated).
> **Note — pool isolation from LP dVPN:** This pool (`10.0.0.0/16`, `fc00::/112`
> on `nymtun`) is separate from the authenticator/LP dVPN WireGuard pool
> (`10.1.0.0/16`, `fc01::/112` on `nymwg`). The second octet differs (`10.0` vs
> `10.1`) and each uses its own TUN interface. Both sets of constants live in
> `common/network-defaults/src/constants.rs` (`mixnet_vpn` vs `wireguard` modules).
> The IPR addresses are hardcoded; the WG addresses are configurable but default to
> the non-overlapping range.
## Connection Lifecycle
1. `IpMixStream::new()` discovers the best IPR via Nym API
2. Opens a `MixnetStream` to the IPR (`client.open_stream()`)
3. Sends a dynamic connect request, IPR allocates an `IpPair`
4. Ready for `send_ip_packet()` / `handle_incoming()` loop
5. `disconnect()` shuts down the mixnet client
@@ -0,0 +1,192 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::ip_packet_client::{
discovery::{create_nym_api_client, get_best_ipr},
handle_ipr_response,
listener::check_ipr_message_version,
MixnetMessageOutcome,
};
use crate::mixnet::{MixnetClient, MixnetStream, Recipient};
use crate::Error;
use bytes::Bytes;
use nym_ip_packet_requests::response_helpers;
use nym_ip_packet_requests::{
v9::{self, response::IpPacketResponse},
IpPair,
};
use nym_network_defaults::NymNetworkDetails;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tracing::{debug, info};
const IPR_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
/// A bidirectional tunnel for sending and receiving IP packets through the mixnet.
///
/// Wraps a [`MixnetStream`] (opened to an IPR exit gateway) and provides a
/// high-level API for the IPR protocol. The underlying `MixnetStream` handles
/// LP Stream framing and stream multiplexing automatically.
///
/// # Data flow
///
/// ```text
/// IpMixStream.send_ip_packet(bytes)
/// → IpPacketRequest::to_bytes() → MixnetStream.write()
/// → LP Stream frame (stream_id, seq, Data)
/// → Sphinx packets → mixnet → IPR
///
/// IPR processes request → TUN → internet → response
/// → IPR wraps in LP Stream frame → Sphinx → mixnet → client
/// → stream router dispatches by stream_id
/// → MixnetStream.recv() → IpPacketResponse bytes
/// → handle_ipr_response() → extract IP packets
/// ```
pub struct IpMixStream {
stream: MixnetStream,
client: MixnetClient,
allocated_ips: IpPair,
connected: bool,
}
impl IpMixStream {
/// Discover the best IPR, connect through the mixnet, and establish the IP tunnel.
///
/// Returns a ready-to-use tunnel with allocated IP addresses.
pub async fn new() -> Result<Self, Error> {
let network_defaults = NymNetworkDetails::new_mainnet();
let api_client =
create_nym_api_client(network_defaults.nym_api_urls.ok_or(Error::NoNymAPIUrl)?)?;
let ipr_address = get_best_ipr(api_client).await?;
Self::new_with_ipr(ipr_address).await
}
/// Connect to a specific IPR address.
///
/// Use this when you already know the IPR `Recipient` address (e.g. for
/// testing against a specific exit node). For automatic discovery, use
/// [`IpMixStream::new`] instead.
pub async fn new_with_ipr(ipr_address: Recipient) -> Result<Self, Error> {
nym_network_defaults::setup_env(None::<&str>);
let mut client = MixnetClient::connect_new().await?;
let mut stream = client.open_stream(ipr_address, Some(10)).await?;
info!("Connecting to IP packet router at {ipr_address}");
let allocated_ips = Self::connect_tunnel(&mut stream).await?;
info!(
"Connected — IPv4: {}, IPv6: {}",
allocated_ips.ipv4, allocated_ips.ipv6
);
Ok(Self {
stream,
client,
allocated_ips,
connected: true,
})
}
pub fn nym_address(&self) -> &Recipient {
self.client.nym_address()
}
pub fn allocated_ips(&self) -> &IpPair {
&self.allocated_ips
}
pub fn is_connected(&self) -> bool {
self.connected
}
/// Check that the tunnel is connected, returning an error if not.
pub fn check_connected(&self) -> Result<(), Error> {
if self.connected {
Ok(())
} else {
Err(Error::IprStreamClientNotConnected)
}
}
async fn connect_tunnel(stream: &mut MixnetStream) -> Result<IpPair, Error> {
let (request, request_id) = v9::new_connect_request(None);
debug!("Sending connect request with ID: {}", request_id);
let request_bytes = request.to_bytes()?;
stream
.write_all(&request_bytes)
.await
.map_err(|_| Error::MessageSendingFailure)?;
let timeout = tokio::time::sleep(IPR_CONNECT_TIMEOUT);
tokio::pin!(timeout);
loop {
tokio::select! {
_ = &mut timeout => {
return Err(Error::IPRConnectResponseTimeout);
}
result = stream.recv() => {
let data = result.ok_or(Error::IPRClientStreamClosed)?;
check_ipr_message_version(&data)?;
if let Ok(response) = IpPacketResponse::from_bytes(&data) {
if response.id() == Some(request_id) {
return response_helpers::parse_connect_response(response)
.map_err(|e| match e {
response_helpers::IprResponseError::ConnectDenied(r) => Error::ConnectDenied(r),
response_helpers::IprResponseError::UnexpectedResponse(d) => Error::UnexpectedResponseType(d),
other => Error::IPRMessageVersionCheckFailed(other.to_string()),
});
}
}
}
}
}
}
/// Send an IP packet through the tunnel.
pub async fn send_ip_packet(&mut self, packet: &[u8]) -> Result<(), Error> {
self.check_connected()?;
let request = v9::new_data_request(packet.to_vec().into());
let request_bytes = request.to_bytes()?;
self.stream
.write_all(&request_bytes)
.await
.map_err(|_| Error::MessageSendingFailure)
}
/// Handle incoming messages from the mixnet.
///
/// Reads from the underlying `MixnetStream`, parses IPR responses, and
/// extracts IP packets. Returns an empty vec on timeout (10 s).
pub async fn handle_incoming(&mut self) -> Result<Vec<Bytes>, Error> {
let data = match tokio::time::timeout(Duration::from_secs(10), self.stream.recv()).await {
Err(_) => return Ok(Vec::new()),
Ok(None) => {
self.connected = false;
return Err(Error::IPRClientStreamClosed);
}
Ok(Some(data)) => data,
};
match handle_ipr_response(&data) {
Ok(Some(MixnetMessageOutcome::IpPackets(packets))) => {
debug!("Extracted {} IP packets", packets.len());
Ok(packets)
}
Ok(Some(MixnetMessageOutcome::Disconnect)) => {
info!("Received disconnect");
self.connected = false;
Err(Error::IprTunnelDisconnected)
}
Ok(None) => Ok(Vec::new()),
Err(e) => Err(e),
}
}
/// Disconnect from the Mixnet. Disconnected clients cannot be reconnected.
pub async fn disconnect(self) {
debug!("Disconnecting");
self.client.disconnect().await;
debug!("Disconnected");
}
}
+6
View File
@@ -1,13 +1,19 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! Rust SDK for the Nym platform
//!
//! The main component currently is [`mixnet`].
//! [`tcp_proxy`] is probably a good place to start for anyone wanting to integrate with existing app code and read/write from a socket.
//! [`client_pool`] is a configurable client pool.
//! [`ipr_wrapper`] tunnels IP packets through the mixnet to an IPR exit gateway.
mod error;
pub mod bandwidth;
pub mod client_pool;
pub mod ip_packet_client;
pub mod ipr_wrapper;
pub mod mixnet;
pub mod tcp_proxy;
@@ -1,3 +1,6 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! Per-stream handle implementing `AsyncRead + AsyncWrite`.
use std::pin::Pin;
@@ -111,6 +114,17 @@ impl MixnetStream {
self.id
}
/// Receive a single message payload directly from the stream channel.
///
/// Returns `None` on EOF (channel closed). Drains any leftover from
/// a prior `AsyncRead` call first.
pub async fn recv(&mut self) -> Option<Vec<u8>> {
if !self.read_buf.is_empty() {
return Some(self.read_buf.split().to_vec());
}
self.inbound_rx.recv().await
}
/// Wrap `data` in the appropriate `InputMessage` for this stream's destination.
fn make_input_message(&self, data: Vec<u8>) -> InputMessage {
match &self.destination {
+98 -15
View File
@@ -1,3 +1,6 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! Stream multiplexing for `MixnetClient`.
//!
//! A [`MixnetStream`] is a byte channel (`AsyncRead + AsyncWrite`) to a
@@ -9,12 +12,12 @@
//! stream's channel (or to the listener for `Open` messages).
mod mixnet_stream;
mod protocol;
pub(crate) mod protocol;
pub use mixnet_stream::MixnetStream;
pub use protocol::StreamId;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
@@ -47,11 +50,25 @@ pub(crate) const DEFAULT_STREAM_IDLE_TIMEOUT: Duration = Duration::from_secs(30
const MAX_CLEANUP_INTERVAL: Duration = Duration::from_secs(10);
/// Per-stream state stored in the routing table.
///
/// Reorder buffer uses the same BTreeMap pattern as `OrderedMessageBuffer`
/// (`common/socks5/ordered-buffer/`) but drains per-message instead of
/// concatenating, so `recv()` preserves message boundaries.
struct StreamEntry {
sender: mpsc::UnboundedSender<Vec<u8>>,
last_activity: Instant,
next_seq: u32,
pending: BTreeMap<u32, Vec<u8>>,
}
/// Maximum number of out-of-order messages buffered per stream before we
/// skip ahead. Without this cap, a malicious sender that deliberately skips
/// a sequence number (e.g. never sends seq 1) could cause the buffer to
/// grow indefinitely while the drain loop waits for the missing seq.
/// The idle timeout only reaps *inactive* streams, so an actively-sending
/// attacker would bypass it.
const MAX_REORDER_BUFFER: usize = 256;
/// The shared stream routing table.
///
/// Wraps the map of active streams behind an async mutex with focused
@@ -76,6 +93,8 @@ impl StreamMap {
StreamEntry {
sender: tx,
last_activity: Instant::now(),
next_seq: 0,
pending: BTreeMap::new(),
},
);
rx
@@ -95,17 +114,49 @@ impl StreamMap {
});
}
/// Dispatch data to a stream's channel. Updates `last_activity` on
/// success. Removes the entry if the receiver has been dropped.
async fn send_to_stream(&self, stream_id: &StreamId, data: Vec<u8>) {
/// Buffer a message and flush any contiguous sequence to the channel.
/// Updates `last_activity` on success; removes the entry if the
/// receiver has been dropped.
async fn send_to_stream(&self, stream_id: &StreamId, seq: u32, data: Vec<u8>) {
let mut map = self.inner.lock().await;
let should_remove = if let Some(entry) = map.get_mut(stream_id) {
if entry.sender.send(data).is_err() {
true
if seq < entry.next_seq {
warn!(
"Stream {stream_id}: dropping old seq {seq} (expected >= {})",
entry.next_seq
);
} else {
entry.last_activity = Instant::now();
false
entry.pending.insert(seq, data);
}
// If the buffer has grown too large, skip ahead to the lowest
// buffered seq so we don't accumulate unbounded memory.
if entry.pending.len() > MAX_REORDER_BUFFER {
if let Some(&lowest) = entry.pending.keys().next() {
warn!(
"Stream {stream_id}: reorder buffer overflow ({} pending), \
skipping seq {} -> {lowest}",
entry.pending.len(),
entry.next_seq
);
entry.next_seq = lowest;
}
}
// Drain contiguous messages
let mut failed = false;
while let Some(msg) = entry.pending.remove(&entry.next_seq) {
if entry.sender.send(msg).is_err() {
failed = true;
break;
}
entry.next_seq += 1;
}
if !failed {
entry.last_activity = Instant::now();
}
failed
} else {
false
};
@@ -242,7 +293,7 @@ async fn run_router(
}
SphinxStreamMsgType::Data => {
streams
.send_to_stream(&stream_id, frame.data.to_vec())
.send_to_stream(&stream_id, frame.sequence_num, frame.data.to_vec())
.await;
}
}
@@ -295,10 +346,9 @@ pub(crate) async fn open_stream(
let stream_id = StreamId::random();
let rx = streams.register_stream(stream_id).await;
// Currently hardcoded as we don't have message ordering in place *yet* in the SDK
// streams - when it *is* added then it will set the receiver's expected starting seq
// number. Gives us the ability down the road to e.g. pick up a dropped stream from
// where it left off.
// Open message with seq=0. The receiver's reorder buffer starts at
// next_seq=0 so this could later carry an initial seq to resume a
// dropped stream from where it left off.
let wire = encode_stream_message(&stream_id, SphinxStreamMsgType::Open, 0, &[]);
let msg = InputMessage::new_anonymous(
recipient,
@@ -379,7 +429,7 @@ mod tests {
tokio::time::advance(Duration::from_secs(8)).await;
// Activity on the stream resets its timer
map.send_to_stream(&id, vec![1, 2, 3]).await;
map.send_to_stream(&id, 0, vec![1, 2, 3]).await;
// Advance past the original timeout, but only 5s since last activity
tokio::time::advance(Duration::from_secs(5)).await;
@@ -405,4 +455,37 @@ mod tests {
assert_eq!(map.inner.lock().await.len(), 1);
}
#[tokio::test]
async fn out_of_order_messages_delivered_in_sequence() {
let map = StreamMap::new();
let id = StreamId::random();
let mut rx = map.register_stream(id).await;
// Send seq 2, 0, 1 out of order
map.send_to_stream(&id, 2, vec![20]).await;
map.send_to_stream(&id, 0, vec![0]).await;
// seq 0 should be delivered now, but 2 is buffered (gap at 1)
assert_eq!(rx.recv().await.unwrap(), vec![0]);
// Fill the gap — both 1 and 2 should flush
map.send_to_stream(&id, 1, vec![10]).await;
assert_eq!(rx.recv().await.unwrap(), vec![10]);
assert_eq!(rx.recv().await.unwrap(), vec![20]);
}
#[tokio::test]
async fn duplicate_seq_is_dropped() {
let map = StreamMap::new();
let id = StreamId::random();
let mut rx = map.register_stream(id).await;
map.send_to_stream(&id, 0, vec![0]).await;
map.send_to_stream(&id, 0, vec![99]).await; // duplicate, dropped
map.send_to_stream(&id, 1, vec![1]).await;
assert_eq!(rx.recv().await.unwrap(), vec![0]);
assert_eq!(rx.recv().await.unwrap(), vec![1]);
}
}
@@ -1,3 +1,6 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! Wire protocol for stream multiplexing.
//!
//! Every message between streams carries an LP frame header prepended to
@@ -53,7 +56,6 @@ impl fmt::Display for StreamId {
pub struct StreamFrame<'a> {
pub stream_id: StreamId,
pub msg_type: SphinxStreamMsgType,
#[allow(dead_code)] // will be used for reordering
pub sequence_num: u32,
pub data: &'a [u8],
}
@@ -26,7 +26,7 @@ nym-crypto = { workspace = true }
nym-exit-policy = { workspace = true }
nym-id = { workspace = true }
nym-ip-packet-requests = { workspace = true }
nym-kcp = { path = "../../common/nym-kcp" } # TODO MAX add to workspace dependencies
nym-lp = { workspace = true }
nym-network-defaults = { workspace = true }
nym-network-requester = { path = "../network-requester" }
nym-sdk = { workspace = true }
@@ -1,6 +1,7 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use bytes::{Bytes, BytesMut};
@@ -9,6 +10,10 @@ use nym_ip_packet_requests::{
v6::response::IpPacketResponse as IpPacketResponseV6,
v7::response::IpPacketResponse as IpPacketResponseV7,
v8::response::IpPacketResponse as IpPacketResponseV8,
v9,
};
use nym_lp::packet::frame::{
LpFrame, LpFrameHeader, SphinxStreamFrameAttributes, SphinxStreamMsgType,
};
use nym_sdk::mixnet::{
InputMessage, MixnetClientSender, MixnetMessageSender, MixnetMessageSinkTranslator,
@@ -26,6 +31,19 @@ use crate::{
messages::ClientVersion,
};
/// Encode a payload inside an LP Stream frame: `[LpFrameHeader][payload]`.
pub(crate) fn encode_stream_frame(stream_id: u64, sequence_num: u32, payload: Vec<u8>) -> Vec<u8> {
let attrs = SphinxStreamFrameAttributes {
stream_id,
msg_type: SphinxStreamMsgType::Data,
sequence_num,
};
let frame = LpFrame::new_stream(attrs, payload);
let mut buf = BytesMut::with_capacity(LpFrameHeader::SIZE + frame.content.len());
frame.encode(&mut buf);
buf.to_vec()
}
// Data flow
// Out: mixnet_listener -> decode -> handle_packet -> write_to_tun
// In: tun_listener -> [connected_client_handler -> encode] -> mixnet_sender
@@ -64,6 +82,7 @@ impl ConnectedClientHandler {
buffer_timeout: Duration,
client_version: ClientVersion,
mixnet_client_sender: MixnetClientSender,
stream_id: Option<u64>,
) -> (
mpsc::UnboundedSender<Vec<u8>>,
oneshot::Sender<()>,
@@ -71,6 +90,9 @@ impl ConnectedClientHandler {
) {
log::debug!("Starting connected client handler for: {client_id}");
log::debug!("client version: {client_version:?}");
if let Some(sid) = stream_id {
log::debug!("LP Stream mode: stream_id={sid:#018x}");
}
let (close_tx, close_rx) = oneshot::channel();
let (forward_from_tun_tx, forward_from_tun_rx) = mpsc::unbounded_channel();
@@ -86,6 +108,13 @@ impl ConnectedClientHandler {
let input_message_creator = ToIprDataResponse {
send_to: client_id.clone(),
client_version,
stream_id,
// Start at 1 — seq=0 is reserved for inline control responses
// (connect handshake, pong, health) sent by
// handle_stream_response() in mixnet_listener.rs.
// The skip-on-wrap logic in to_input_message() ensures we never
// emit seq=0 from the data path either.
next_response_seq: AtomicU32::new(1),
};
let connected_client_handler = ConnectedClientHandler {
@@ -189,15 +218,21 @@ fn create_ip_packet_response(
ClientVersion::V6 => IpPacketResponseV6::new_ip_packet(packets).to_bytes(),
ClientVersion::V7 => IpPacketResponseV7::new_ip_packet(packets).to_bytes(),
ClientVersion::V8 => IpPacketResponseV8::new_ip_packet(packets).to_bytes(),
ClientVersion::V9 => v9::new_ip_packet_response(packets).to_bytes(),
}
}
// This struct is used by the sink to translate the the bundled IP packets into a IPR packet
// responses that can be sent to the mixnet.
#[derive(Clone, Debug)]
// This struct is used by the sink to translate the bundled IP packets into IPR packet
// responses that can be sent to the mixnet. When `stream_id` is set, responses are
// wrapped in LP Stream frames so the client's stream router can dispatch them.
#[derive(Debug)]
struct ToIprDataResponse {
send_to: ConnectedClientId,
client_version: ClientVersion,
/// When Some, wrap responses in LP Stream frames with this stream_id.
stream_id: Option<u64>,
/// Sequence number for LP Stream response frames.
next_response_seq: AtomicU32,
}
impl MixnetMessageSinkTranslator for ToIprDataResponse {
@@ -205,12 +240,24 @@ impl MixnetMessageSinkTranslator for ToIprDataResponse {
&self,
bundled_ip_packets: &[u8],
) -> std::result::Result<InputMessage, nym_sdk::Error> {
// Create a IPR packet response that the recipient can understand
let response_packet = create_ip_packet_response(bundled_ip_packets, self.client_version)?;
// Wrap the response packet in a mixnet input message
// Optionally wrap in LP Stream frame for stream-mode clients.
// Seq counter starts at 1 and skips 0 on wrap-around because seq=0
// is used by handle_stream_response() for inline control responses
// (see comment in mixnet_listener.rs for the full story).
let final_packet = if let Some(stream_id) = self.stream_id {
let mut seq = self.next_response_seq.fetch_add(1, Ordering::Relaxed);
if seq == 0 {
seq = self.next_response_seq.fetch_add(1, Ordering::Relaxed);
}
encode_stream_frame(stream_id, seq, response_packet)
} else {
response_packet
};
let input_message =
crate::util::create_message::create_input_message(&self.send_to, response_packet)
crate::util::create_message::create_input_message(&self.send_to, final_packet)
.with_max_retransmissions(0);
Ok(input_message)
@@ -282,6 +329,8 @@ mod tests {
let bytes_to_input_message = ToIprDataResponse {
send_to: client_id.clone(),
client_version,
stream_id: None,
next_response_seq: AtomicU32::new(0),
};
let mixnet_ip_packet_sender = MixnetMessageSink::new_with_custom_translator(
@@ -6,7 +6,7 @@ mod connected_client_handler;
mod connected_clients;
pub(crate) use client_id::ConnectedClientId;
pub(crate) use connected_client_handler::ConnectedClientHandler;
pub(crate) use connected_client_handler::{ConnectedClientHandler, encode_stream_frame};
pub(crate) use connected_clients::{
ConnectEvent, ConnectedClientEvent, ConnectedClients, DisconnectEvent,
};
@@ -117,8 +117,8 @@ pub enum IpPacketRouterError {
#[error("failed to deserialize protocol: {source}")]
FailedToDeserializeProtocol { source: ProtocolError },
#[error("KCP protocol error: {0}")]
KcpError(String),
#[error("{0}")]
Other(String),
}
pub type Result<T> = std::result::Result<T, IpPacketRouterError>;
@@ -180,7 +180,6 @@ impl IpPacketRouter {
mixnet_client,
shutdown_token: self.shutdown.clone_shutdown_token(),
connected_clients,
kcp_session_manager: crate::kcp_session_manager::KcpSessionManager::new(),
};
log::info!("The address of this client is: {self_address}");
@@ -1,504 +0,0 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
#![allow(unused)]
//! KCP Session Manager for LP clients at the exit gateway.
//!
//! This module sits between Sphinx unwrapping and IPR message processing.
//! It maintains per-client KCP state (keyed by conv_id from KCP packets),
//! reassembles KCP fragments into complete messages, and wraps responses
//! in KCP for SURB replies.
//!
//! # Architecture
//!
//! ```text
//! Mixnet → [Sphinx unwrap] → [KCP Session Manager] → [IPR message handling]
//! ↓
//! KCP sessions per conv_id
//! ↓
//! Reassemble fragments → DataRequest
//! ```
//!
//! # Design Notes
//!
//! - Conv ID is extracted from the first 4 bytes of KCP packet data
//! - SURBs are stored per conv_id for sending replies
//! - Pattern follows `nym-lp-node::Node` from lewes-protocol
use bytes::BytesMut;
use nym_kcp::driver::KcpDriver;
use nym_kcp::packet::KcpPacket;
use nym_kcp::session::KcpSession;
use nym_sphinx::anonymous_replies::ReplySurb;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};
use crate::error::IpPacketRouterError;
/// Default session timeout (5 minutes, matching IPR client timeout)
const SESSION_TIMEOUT: Duration = Duration::from_secs(300);
/// Maximum concurrent KCP sessions per exit gateway
const MAX_SESSIONS: usize = 10000;
/// State for a single KCP session
struct KcpSessionState {
driver: KcpDriver,
/// SURBs for sending replies back to this client
surbs: VecDeque<ReplySurb>,
/// Last activity timestamp
last_activity: Instant,
/// The sender tag associated with this session (for logging/debugging)
sender_tag: Option<AnonymousSenderTag>,
}
impl KcpSessionState {
fn new(conv_id: u32) -> Self {
let session = KcpSession::new(conv_id);
Self {
driver: KcpDriver::new(session),
surbs: VecDeque::new(),
last_activity: Instant::now(),
sender_tag: None,
}
}
fn touch(&mut self) {
self.last_activity = Instant::now();
}
fn is_expired(&self, timeout: Duration) -> bool {
self.last_activity.elapsed() > timeout
}
fn add_surbs(&mut self, surbs: Vec<ReplySurb>) {
self.surbs.extend(surbs);
}
fn take_surb(&mut self) -> Option<ReplySurb> {
self.surbs.pop_front()
}
fn surb_count(&self) -> usize {
self.surbs.len()
}
}
/// KCP Session Manager maintains per-client KCP state for LP clients.
///
/// It intercepts incoming Sphinx payloads containing KCP data, extracts KCP frames,
/// reassembles them into complete messages, and returns the assembled data for IPR processing.
///
/// Sessions are keyed by `conv_id` (first 4 bytes of KCP packet), which is derived
/// by clients from their local and remote addresses.
pub struct KcpSessionManager {
/// KCP sessions keyed by conv_id (from KCP packet header)
sessions: HashMap<u32, KcpSessionState>,
/// Session timeout duration
timeout: Duration,
/// Maximum number of sessions
max_sessions: usize,
}
/// Result of processing incoming KCP data from a client
pub(crate) struct KcpProcessingResult {
/// The conv_id extracted from the KCP packet
pub(crate) conversation_id: u32,
/// Vector of decoded KCP packets (for inspection/logging)
pub(crate) decoded_packets: Vec<KcpPacket>,
/// Vector of complete reassembled messages ready for IPR processing
pub(crate) reassembled_messages: Vec<Vec<u8>>,
}
impl Default for KcpSessionManager {
fn default() -> Self {
Self::new()
}
}
impl KcpSessionManager {
/// Create a new KCP Session Manager with default settings
pub fn new() -> Self {
Self {
sessions: HashMap::new(),
timeout: SESSION_TIMEOUT,
max_sessions: MAX_SESSIONS,
}
}
/// Create a new KCP Session Manager with custom settings
pub fn with_config(timeout: Duration, max_sessions: usize) -> Self {
Self {
sessions: HashMap::new(),
timeout,
max_sessions,
}
}
/// Process incoming KCP data from a client.
///
/// Takes raw KCP-encoded data (from a RepliableMessage). The conv_id is extracted
/// from the first 4 bytes of the KCP data.
///
/// # Arguments
/// * `kcp_data` - Raw KCP packet data (conv_id is in first 4 bytes)
/// * `reply_surbs` - SURBs attached to the message for sending replies
/// * `sender_tag` - The anonymous sender tag (for logging/association)
/// * `current_time_ms` - Current time in milliseconds for KCP timing
///
/// # Returns
/// A tuple containing:
/// - The conv_id extracted from the KCP packet
/// - A vector of decoded KCP packets (for inspection/logging)
/// - A vector of complete reassembled messages ready for IPR processing
pub fn process_incoming(
&mut self,
kcp_data: &[u8],
reply_surbs: Vec<ReplySurb>,
sender_tag: Option<AnonymousSenderTag>,
current_time_ms: u64,
) -> Result<KcpProcessingResult, IpPacketRouterError> {
if kcp_data.len() < 4 {
return Err(IpPacketRouterError::KcpError(
"KCP data too short to contain conv_id".to_string(),
));
}
// Extract conv_id from first 4 bytes of KCP packet
let conv_id = u32::from_le_bytes([kcp_data[0], kcp_data[1], kcp_data[2], kcp_data[3]]);
// Get or create session
self.ensure_session(conv_id, sender_tag)?;
let session = self
.sessions
.get_mut(&conv_id)
.ok_or_else(|| IpPacketRouterError::KcpError("Session not found".to_string()))?;
session.touch();
// Store SURBs for later replies
session.add_surbs(reply_surbs);
// Input the KCP data and get decoded packets
let decoded_packets = match session.driver.input(kcp_data) {
Ok(pkts) => pkts,
Err(e) => {
log::warn!("KCP input error for conv_id {}: {}", conv_id, e);
return Err(IpPacketRouterError::KcpError(e.to_string()));
}
};
// Update KCP state machine
session.driver.update(current_time_ms);
// Collect any complete messages
let incoming_messages: Vec<Vec<u8>> = session
.driver
.fetch_incoming()
.into_iter()
.map(|buf| buf.to_vec())
.collect();
Ok(KcpProcessingResult {
conversation_id: conv_id,
decoded_packets,
reassembled_messages: incoming_messages,
})
}
/// Wrap outgoing data in KCP for sending via SURB.
///
/// # Arguments
/// * `conv_id` - The conversation ID
/// * `data` - The data to wrap in KCP
/// * `current_time_ms` - Current time in milliseconds for KCP timing
///
/// # Returns
/// KCP-encoded packets ready to send
pub fn wrap_response(
&mut self,
conv_id: u32,
data: &[u8],
current_time_ms: u64,
) -> Result<Vec<u8>, IpPacketRouterError> {
let session = self
.sessions
.get_mut(&conv_id)
.ok_or_else(|| IpPacketRouterError::KcpError("No session for conv_id".to_string()))?;
session.touch();
// Queue the data for sending
session.driver.send(data);
// Update to generate outgoing packets
session.driver.update(current_time_ms);
// Fetch outgoing KCP packets and encode
let packets = session.driver.fetch_outgoing();
let mut buf = BytesMut::new();
for pkt in packets {
pkt.encode(&mut buf);
}
Ok(buf.to_vec())
}
/// Take a SURB for sending a reply to a client.
///
/// # Arguments
/// * `conv_id` - The conversation ID
///
/// # Returns
/// A SURB if available, None otherwise
pub fn take_surb(&mut self, conv_id: u32) -> Option<ReplySurb> {
self.sessions.get_mut(&conv_id)?.take_surb()
}
/// Get the number of available SURBs for a session
pub fn surb_count(&self, conv_id: u32) -> usize {
self.sessions
.get(&conv_id)
.map(|s| s.surb_count())
.unwrap_or(0)
}
/// Get the sender_tag associated with a session.
///
/// Returns None if the session doesn't exist or has no sender_tag.
pub fn get_sender_tag(&self, conv_id: u32) -> Option<AnonymousSenderTag> {
self.sessions.get(&conv_id)?.sender_tag
}
/// Fetch any pending outgoing KCP packets for a specific session.
///
/// This is used to send immediate ACKs after receiving packets,
/// rather than waiting for the periodic tick.
pub fn fetch_outgoing_for_conv(
&mut self,
conv_id: u32,
current_time_ms: u64,
) -> Option<Vec<u8>> {
let session = self.sessions.get_mut(&conv_id)?;
session.driver.update(current_time_ms);
let packets = session.driver.fetch_outgoing();
if packets.is_empty() {
return None;
}
let mut buf = BytesMut::new();
for pkt in packets {
pkt.encode(&mut buf);
}
Some(buf.to_vec())
}
/// Periodic update for all sessions.
///
/// This should be called periodically (e.g., every 10-100ms) to:
/// - Drive KCP state machines (retransmissions, etc.)
/// - Clean up expired sessions
///
/// Returns a list of (conv_id, outgoing_data) pairs for any sessions
/// that have pending outgoing packets.
pub fn tick(&mut self, current_time_ms: u64) -> Vec<(u32, Vec<u8>)> {
let mut outgoing = Vec::new();
for (&conv_id, session) in self.sessions.iter_mut() {
session.driver.update(current_time_ms);
let packets = session.driver.fetch_outgoing();
if !packets.is_empty() {
let mut buf = BytesMut::new();
for pkt in packets {
pkt.encode(&mut buf);
}
outgoing.push((conv_id, buf.to_vec()));
}
}
// Clean up expired sessions
self.cleanup_expired();
outgoing
}
/// Remove expired sessions.
pub fn cleanup_expired(&mut self) {
let timeout = self.timeout;
self.sessions.retain(|conv_id, state| {
let expired = state.is_expired(timeout);
if expired {
log::debug!("Removing expired KCP session for conv_id {}", conv_id);
}
!expired
});
}
/// Get the number of active sessions
pub fn session_count(&self) -> usize {
self.sessions.len()
}
/// Check if a session exists for the given conv_id
pub fn has_session(&self, conv_id: u32) -> bool {
self.sessions.contains_key(&conv_id)
}
/// Ensure a session exists for the given conv_id, creating one if needed
fn ensure_session(
&mut self,
conv_id: u32,
sender_tag: Option<AnonymousSenderTag>,
) -> Result<(), IpPacketRouterError> {
if self.sessions.contains_key(&conv_id) {
// Update sender_tag if provided
if let Some(tag) = sender_tag
&& let Some(session) = self.sessions.get_mut(&conv_id)
{
session.sender_tag = Some(tag);
}
return Ok(());
}
// Check session limit
if self.sessions.len() >= self.max_sessions {
// Try to clean up expired sessions first
self.cleanup_expired();
// Still at limit?
if self.sessions.len() >= self.max_sessions {
return Err(IpPacketRouterError::KcpError(
"Maximum KCP sessions reached".to_string(),
));
}
}
log::debug!("Creating new KCP session for conv_id {}", conv_id);
let mut state = KcpSessionState::new(conv_id);
state.sender_tag = sender_tag;
self.sessions.insert(conv_id, state);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_session_creation() {
let mut manager = KcpSessionManager::new();
assert!(!manager.has_session(12345));
assert_eq!(manager.session_count(), 0);
// Create a minimal KCP packet (just conv_id)
let conv_id: u32 = 12345;
let mut kcp_data = conv_id.to_le_bytes().to_vec();
// Add minimal header padding to make it look like a packet
kcp_data.extend_from_slice(&[0u8; 21]); // KCP header is 25 bytes total
// Processing data should create a session
let result = manager.process_incoming(&kcp_data, vec![], None, 0);
// May error due to invalid KCP packet, but session should be created
let _ = result;
assert!(manager.has_session(conv_id));
assert_eq!(manager.session_count(), 1);
}
#[test]
fn test_session_expiry() {
let mut manager = KcpSessionManager::with_config(Duration::from_millis(10), 100);
let conv_id: u32 = 99999;
// Create session directly
manager.ensure_session(conv_id, None).unwrap();
assert!(manager.has_session(conv_id));
// Wait for expiry
std::thread::sleep(Duration::from_millis(20));
manager.cleanup_expired();
assert!(!manager.has_session(conv_id));
}
#[test]
fn test_max_sessions_limit() {
let mut manager = KcpSessionManager::with_config(Duration::from_secs(300), 2);
manager.ensure_session(1, None).unwrap();
manager.ensure_session(2, None).unwrap();
assert_eq!(manager.session_count(), 2);
// Third session should fail
let result = manager.ensure_session(3, None);
assert!(result.is_err());
}
#[test]
fn test_kcp_roundtrip() {
use nym_kcp::driver::KcpDriver;
use nym_kcp::session::KcpSession;
let mut manager = KcpSessionManager::new();
let conv_id: u32 = 42424242;
// Create a "client" KCP session to send data
let client_session = KcpSession::new(conv_id);
let mut client_driver = KcpDriver::new(client_session);
// Client sends a message
let message = b"Hello, IPR via KCP!";
client_driver.send(message);
client_driver.update(100);
// Get the KCP packets from the client
let outgoing = client_driver.fetch_outgoing();
assert!(!outgoing.is_empty(), "Client should produce KCP packets");
// Encode packets
let mut kcp_data = BytesMut::new();
for pkt in outgoing {
pkt.encode(&mut kcp_data);
}
// Feed to the session manager
let res = manager
.process_incoming(&kcp_data, vec![], None, 100)
.expect("process_incoming should succeed");
// Verify conv_id was extracted correctly
assert_eq!(res.conversation_id, conv_id);
// Should have received the complete message
assert_eq!(res.reassembled_messages.len(), 1);
assert_eq!(res.reassembled_messages[0], message);
}
#[test]
fn test_surb_storage() {
let mut manager = KcpSessionManager::new();
let conv_id: u32 = 11111;
// Create session
manager.ensure_session(conv_id, None).unwrap();
// Initially no SURBs
assert_eq!(manager.surb_count(conv_id), 0);
assert!(manager.take_surb(conv_id).is_none());
// Note: We can't easily create ReplySurbs in tests without complex setup,
// but the storage mechanism is tested via the session state
}
}
@@ -17,7 +17,6 @@ pub(crate) mod non_linux_dummy;
mod clients;
mod constants;
mod ip_packet_router;
mod kcp_session_manager;
mod mixnet_client;
mod mixnet_listener;
mod tun_listener;
@@ -9,6 +9,7 @@ pub(crate) enum ClientVersion {
V6,
V7,
V8,
V9,
}
impl ClientVersion {
@@ -17,6 +18,7 @@ impl ClientVersion {
ClientVersion::V6 => 6,
ClientVersion::V7 => 7,
ClientVersion::V8 => 8,
ClientVersion::V9 => 9,
}
}
}
@@ -4,6 +4,7 @@
mod v6;
mod v7;
mod v8;
mod v9;
use nym_ip_packet_requests::{
IpPair, v6::request::IpPacketRequest as IpPacketRequestV6,
@@ -129,6 +130,16 @@ impl TryFrom<&ReconstructedMessage> for IpPacketRequest {
.ok_or(IpPacketRouterError::MissingSenderTag)?;
Ok(IpPacketRequest::from((request_v8, sender_tag)))
}
9 => {
let request_v8 = IpPacketRequestV8::from_reconstructed_message(reconstructed)
.map_err(
|source| IpPacketRouterError::FailedToDeserializeTaggedPacket { source },
)?;
let sender_tag = reconstructed
.sender_tag
.ok_or(IpPacketRouterError::MissingSenderTag)?;
Ok(v9::convert(request_v8, sender_tag))
}
_ => {
log::info!("Received packet with invalid version: v{request_version}");
Err(IpPacketRouterError::InvalidPacketVersion(request_version))
@@ -137,6 +148,21 @@ impl TryFrom<&ReconstructedMessage> for IpPacketRequest {
}
}
impl IpPacketRequest {
pub(crate) fn version(&self) -> ClientVersion {
match self {
IpPacketRequest::Data(r) => r.version,
IpPacketRequest::Control(c) => match c {
ControlRequest::StaticConnect(r) => r.version,
ControlRequest::DynamicConnect(r) => r.version,
ControlRequest::Disconnect(r) => r.version,
ControlRequest::Ping(r) => r.version,
ControlRequest::Health(r) => r.version,
},
}
}
}
impl fmt::Display for IpPacketRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
@@ -0,0 +1,27 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use nym_ip_packet_requests::v8::request::{
IpPacketRequest as IpPacketRequestV8, IpPacketRequestData as IpPacketRequestDataV8,
};
use nym_sdk::mixnet::AnonymousSenderTag;
use super::{ClientVersion, IpPacketRequest};
// v9 uses the same wire format as v8, so we reuse the v8 deserialization
// and just tag the result with ClientVersion::V9.
//
// We cannot implement From<(IpPacketRequestV8, AnonymousSenderTag)> again
// because v8.rs already has that impl (same concrete types).
pub(crate) fn convert(
request: IpPacketRequestV8,
sender_tag: AnonymousSenderTag,
) -> IpPacketRequest {
let version = ClientVersion::V9;
match request.data {
IpPacketRequestDataV8::Data(inner) => IpPacketRequest::Data((inner, version).into()),
IpPacketRequestDataV8::Control(inner) => {
IpPacketRequest::Control((*inner, sender_tag, version).into())
}
}
}
@@ -130,6 +130,7 @@ impl VersionedResponse {
ClientVersion::V6 => IpPacketResponseV6::try_from(self)?.to_bytes(),
ClientVersion::V7 => IpPacketResponseV7::try_from(self)?.to_bytes(),
ClientVersion::V8 => IpPacketResponseV8::try_from(self)?.to_bytes(),
ClientVersion::V9 => IpPacketResponseV8::try_from(self)?.to_bytes(),
}
.map_err(|err| IpPacketRouterError::FailedToSerializeResponsePacket { source: err })
}
@@ -6,7 +6,6 @@ use crate::{
config::Config,
constants::DISCONNECT_TIMER_INTERVAL,
error::{IpPacketRouterError, Result},
kcp_session_manager::KcpSessionManager,
messages::{
ClientVersion,
request::{
@@ -24,6 +23,7 @@ use crate::{
};
use futures::StreamExt;
use nym_ip_packet_requests::codec::MultiIpPacketCodec;
use nym_lp::packet::frame::{LpFrameHeader, LpFrameKind, SphinxStreamFrameAttributes};
use nym_sdk::mixnet::MixnetMessageSender;
use nym_sphinx::receiver::ReconstructedMessage;
use nym_task::ShutdownToken;
@@ -31,9 +31,6 @@ use std::{net::SocketAddr, time::Duration};
use tokio::io::AsyncWriteExt;
use tokio_util::codec::FramedRead;
/// KCP tick interval for session updates (retransmissions, ACKs, cleanup)
const KCP_TICK_INTERVAL: Duration = Duration::from_millis(10);
#[cfg(not(target_os = "linux"))]
type TunDevice = crate::non_linux_dummy::DummyDevice;
@@ -60,45 +57,6 @@ pub(crate) struct MixnetListener {
// The map of connected clients that the mixnet listener keeps track of. It monitors
// activity and disconnects clients that have been inactive for too long.
pub(crate) connected_clients: ConnectedClients,
// KCP session manager for LP clients sending KCP-wrapped messages
pub(crate) kcp_session_manager: KcpSessionManager,
}
/// Check if a message payload appears to be KCP-wrapped.
///
/// KCP packets have a 25-byte header with the command byte at position 4.
/// Valid KCP commands are: Push(81), Ack(82), Wask(83), Wins(84).
///
/// This is distinguishable from IPR protocol messages which have:
/// - Version byte at position 0: 6, 7, or 8
/// - ServiceProviderType at position 1: 0, 1, or 2 (for v8+)
///
/// We use a two-step heuristic:
/// 1. Exclude messages that look like IPR protocol headers
/// 2. Check if byte 4 contains a valid KCP command (81-84)
///
/// See: `Protocol::try_from` in service-provider-requests-common for header format.
fn is_kcp_message(data: &[u8]) -> bool {
// Need at least 25 bytes for KCP header
if data.len() < 25 {
return false;
}
// First, check if this looks like an IPR protocol message.
// IPR messages have: byte 0 = version (6-8), byte 1 = ServiceProviderType (0-2 for v8+)
// See: IpPacketRequest::try_from in messages/request.rs
let version_byte = data[0];
let service_type_byte = data[1];
if (6..=8).contains(&version_byte) && service_type_byte <= 2 {
// This matches IPR protocol header pattern - not a KCP message
return false;
}
// Check KCP command byte at position 4
let cmd = data[4];
// Valid KCP commands: Push=81, Ack=82, Wask=83, Wins=84
(81..=84).contains(&cmd)
}
// #[cfg(target_os = "linux")]
@@ -172,11 +130,19 @@ impl MixnetListener {
let decoder = MultiIpPacketCodec::new();
let mut framed_reader = FramedRead::new(data_request.ip_packets.as_ref(), decoder);
while let Some(Ok(packet)) = framed_reader.next().await {
let result = self
.handle_packet(packet.as_bytes(), data_request.version)
.await;
responses.push(result);
while let Some(result) = framed_reader.next().await {
match result {
Ok(packet) => {
let result = self
.handle_packet(packet.as_bytes(), data_request.version)
.await;
responses.push(result);
}
Err(e) => {
log::warn!("Failed to decode bundled IP packet: {e}");
continue;
}
}
}
Ok(responses)
@@ -187,6 +153,7 @@ impl MixnetListener {
async fn on_static_connect_request(
&mut self,
connect_request: StaticConnectRequest,
stream_id: Option<u64>,
) -> PacketHandleResult {
log::info!(
"Received static connect request from {}",
@@ -209,6 +176,31 @@ impl MixnetListener {
let is_client_id_taken = self.connected_clients.is_client_connected(&sent_by);
let response = match (is_ip_taken, is_client_id_taken) {
(true, true) if stream_id.is_some() => {
// Stream-mode reconnect: tear down the old handler (which has a
// stale stream_id) and create a fresh one for the new stream.
log::info!("Stream-mode client reconnecting, replacing handler");
self.connected_clients.disconnect_client(&sent_by);
let (forward_from_tun_tx, close_tx, handle) = ConnectedClientHandler::start(
sent_by.clone(),
buffer_timeout,
version,
self.mixnet_client.split_sender(),
stream_id,
);
self.connected_clients.connect(
requested_ips,
sent_by.clone(),
forward_from_tun_tx,
close_tx,
handle,
);
Response::StaticConnect {
request_id,
reply: StaticConnectResponse::Success,
}
}
(true, true) => {
log::info!("Connecting an already connected client");
if self
@@ -233,6 +225,7 @@ impl MixnetListener {
buffer_timeout,
version,
self.mixnet_client.split_sender(),
stream_id,
);
// Register the new client in the set of connected clients
@@ -274,9 +267,11 @@ impl MixnetListener {
fn on_dynamic_connect_request(
&mut self,
connect_request: DynamicConnectRequest,
stream_id: Option<u64>,
) -> PacketHandleResult {
log::info!(
"Received dynamic connect request from {}",
"Received v{} dynamic connect request from {}",
connect_request.version.into_u8(),
connect_request.sent_by
);
@@ -289,15 +284,22 @@ impl MixnetListener {
.unwrap_or(nym_ip_packet_requests::codec::BUFFER_TIMEOUT);
if let Some(ips) = self.connected_clients.lookup_ip_from_client_id(&reply_to) {
log::debug!("Reconnecting to the previous session");
return Ok(Some(VersionedResponse {
version,
reply_to,
response: Response::DynamicConnect {
request_id,
reply: DynamicConnectSuccess { ips }.into(),
},
}));
if stream_id.is_some() {
// Stream-mode reconnect: tear down old handler (stale stream_id)
// and create a fresh one below with the new stream.
log::info!("Stream-mode client reconnecting, replacing handler");
self.connected_clients.disconnect_client(&reply_to);
} else {
log::debug!("Reconnecting to the previous session");
return Ok(Some(VersionedResponse {
version,
reply_to,
response: Response::DynamicConnect {
request_id,
reply: DynamicConnectSuccess { ips }.into(),
},
}));
}
}
let Some(new_ips) = self.connected_clients.find_new_ip() else {
@@ -318,6 +320,7 @@ impl MixnetListener {
buffer_timeout,
version,
self.mixnet_client.split_sender(),
stream_id,
);
// Register the new client in the set of connected clients
@@ -403,10 +406,14 @@ impl MixnetListener {
}))
}
async fn on_control_request(&mut self, control_request: ControlRequest) -> PacketHandleResult {
async fn on_control_request(
&mut self,
control_request: ControlRequest,
stream_id: Option<u64>,
) -> PacketHandleResult {
match control_request {
ControlRequest::StaticConnect(r) => self.on_static_connect_request(r).await,
ControlRequest::DynamicConnect(r) => self.on_dynamic_connect_request(r),
ControlRequest::StaticConnect(r) => self.on_static_connect_request(r, stream_id).await,
ControlRequest::DynamicConnect(r) => self.on_dynamic_connect_request(r, stream_id),
ControlRequest::Disconnect(r) => self.on_disconnect_request(r),
ControlRequest::Ping(r) => self.on_ping_request(r),
ControlRequest::Health(r) => self.on_health_request(r),
@@ -436,127 +443,106 @@ impl MixnetListener {
.unwrap_or("missing".to_owned())
);
// Check if this is a KCP-wrapped message from an LP client
if is_kcp_message(&reconstructed.message) {
return self.on_kcp_message(reconstructed).await;
// Check if this is an LP Stream frame
if reconstructed.message.len() >= LpFrameHeader::SIZE
&& let Ok(header) = LpFrameHeader::parse(&reconstructed.message)
&& header.kind == LpFrameKind::SphinxStream
{
return self.on_stream_frame(reconstructed, header).await;
}
// Regular IPR protocol message (websocket clients)
self.on_ipr_message(reconstructed).await
self.on_ipr_message(reconstructed, None).await
}
/// Handle KCP-wrapped messages from LP clients.
/// Handle LP Stream-framed messages.
///
/// LP clients send: KCP(IpPacketRequest)
/// We unwrap the KCP layer, reassemble fragments, then process the inner IpPacketRequest.
/// Responses are wrapped in KCP and sent directly via the sender_tag reply mechanism,
/// rather than being returned for standard handling.
async fn on_kcp_message(
/// Parses stream attributes, processes the inner IPR payload, and handles
/// responses inline (wrapped in LP Stream frames).
async fn on_stream_frame(
&mut self,
reconstructed: ReconstructedMessage,
header: LpFrameHeader,
) -> Result<Vec<PacketHandleResult>> {
let current_time_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
// Process the KCP data through the session manager.
// AIDEV-NOTE: Reply mechanism for LP clients:
// 1. LP clients MUST send SURBs (in RepliableMessage) when connecting/sending data
// 2. The SDK's client-core layer (ReceivedMessagesBuffer + ReplyController) automatically
// extracts SURBs from incoming RepliableMessages and stores them keyed by sender_tag
// 3. When we call InputMessage::new_reply(sender_tag, ...), the SDK looks up stored SURBs
// 4. The vec![] here is for KcpSessionManager's internal SURB storage (currently unused)
// since the SDK handles SURB storage at a lower layer
// 5. If replies fail, check that LP client is sending SURBs in its messages
let processing_result = self
.kcp_session_manager
.process_incoming(
&reconstructed.message,
vec![], // SDK handles SURB extraction/storage automatically
reconstructed.sender_tag,
current_time_ms,
)
.inspect_err(|e| {
log::warn!("KCP processing error: {e}");
})?;
let conv_id = processing_result.conversation_id;
log::debug!(
"KCP conv_id={}: received {} packets, {} complete messages",
processing_result.conversation_id,
processing_result.decoded_packets.len(),
processing_result.reassembled_messages.len()
"Received LP Stream frame ({} bytes)",
reconstructed.message.len()
);
// Process each reassembled message as an IpPacketRequest
for message_data in processing_result.reassembled_messages {
// Create a synthetic ReconstructedMessage for the inner payload
let inner_reconstructed = ReconstructedMessage {
message: message_data,
sender_tag: reconstructed.sender_tag,
};
let attrs = SphinxStreamFrameAttributes::parse(&header.frame_attributes).map_err(|e| {
IpPacketRouterError::Other(format!("Invalid stream frame attributes: {e}"))
})?;
match self.on_ipr_message(inner_reconstructed).await {
Ok(results) => {
// Handle responses by wrapping in KCP and sending directly
for result in results {
// false positive: this if can't be collapsed due to `response` being moved
// between calls
#[allow(clippy::collapsible_if)]
if let Ok(Some(response)) = result {
if let Err(e) = self
.handle_kcp_response(conv_id, response, current_time_ms)
.await
{
log::warn!(
"Error sending KCP-wrapped response for conv_id={conv_id}: {e}",
);
}
}
let stream_id = attrs.stream_id;
log::debug!(
"LP Stream: stream_id={stream_id:#018x}, msg_type={:?}, seq={}",
attrs.msg_type,
attrs.sequence_num
);
let payload = &reconstructed.message[LpFrameHeader::SIZE..];
// Open frames may carry an empty payload (stream handshake).
if payload.is_empty() {
log::info!("LP Stream: new stream opened (stream_id={stream_id:#018x})");
return Ok(vec![]);
}
let inner_reconstructed = ReconstructedMessage {
message: payload.to_vec(),
sender_tag: reconstructed.sender_tag,
};
match self
.on_ipr_message(inner_reconstructed, Some(stream_id))
.await
{
Ok(results) => {
for result in results {
let Ok(Some(response)) = result else { continue };
if let Err(e) = self.handle_stream_response(stream_id, response).await {
log::warn!(
"Error sending LP Stream response for stream_id={stream_id:#018x}: {e}"
);
}
}
Err(e) => {
log::warn!("Error processing KCP inner message: {}", e);
// Continue processing other messages
}
}
Err(e) => {
log::warn!("Error processing LP Stream inner message: {e}");
}
}
// Return empty - we handled responses directly above
Ok(vec![])
}
/// Wrap a response in KCP and send it via the mixnet reply mechanism.
/// Wrap a response in an LP Stream frame and send it via the mixnet.
///
/// This is used for LP clients that communicate via KCP-wrapped messages.
async fn handle_kcp_response(
/// Used for inline control responses (connect handshake, pong, health)
/// that are sent directly from the message handler — outside the
/// `ConnectedClientHandler` which owns the data-path sequence counter.
///
/// # Sequence numbering
///
/// These inline responses always use **seq=0**. The data-path counter in
/// `ConnectedClientHandler` starts at 1 and skips 0 on wrap-around, so
/// the two paths never collide.
///
/// Limitation: if multiple inline responses are sent on the same stream
/// (e.g. connect + later pong), they share seq=0. The client's reorder
/// buffer will see the second as a duplicate and drop it. In practice
/// this is fine because control responses are rare and idempotent, but
/// if it becomes a problem, give inline responses their own counter.
async fn handle_stream_response(
&mut self,
conv_id: u32,
stream_id: u64,
response: VersionedResponse,
current_time_ms: u64,
) -> Result<()> {
let reply_to = response.reply_to.clone();
// Serialize the response
let response_bytes = response.try_into_bytes()?;
// Wrap in KCP
let kcp_wrapped =
self.kcp_session_manager
.wrap_response(conv_id, &response_bytes, current_time_ms)?;
let wrapped = crate::clients::encode_stream_frame(stream_id, 0, response_bytes);
log::debug!(
"KCP conv_id={}: wrapped {} byte response into {} bytes",
conv_id,
response_bytes.len(),
kcp_wrapped.len()
);
// Send via mixnet using the sender_tag reply mechanism
let input_message =
crate::util::create_message::create_input_message(&reply_to, kcp_wrapped);
let input_message = crate::util::create_message::create_input_message(&reply_to, wrapped);
self.mixnet_client.send(input_message).await.map_err(|err| {
IpPacketRouterError::FailedToSendPacketToMixnet {
@@ -566,9 +552,21 @@ impl MixnetListener {
}
/// Handle regular IPR protocol messages (from websocket clients).
///
/// `stream_id` is `Some` when processing a payload extracted from an LP
/// Stream frame, so that connect handlers can thread the id to the
/// `ConnectedClientHandler` for LP-wrapping TUN responses.
///
/// # Version / transport enforcement
///
/// - LP Stream frames (`stream_id` is `Some`) **must** carry v9+ payloads.
/// - Non-stream messages (`stream_id` is `None`) **must** be v8 or lower.
///
/// Messages that violate these rules are dropped.
async fn on_ipr_message(
&mut self,
reconstructed: ReconstructedMessage,
stream_id: Option<u64>,
) -> Result<Vec<PacketHandleResult>> {
// First deserialize the request
let request = match IpPacketRequest::try_from(&reconstructed) {
@@ -579,11 +577,27 @@ impl MixnetListener {
req => req,
}?;
// Enforce version/transport consistency:
// - LP Stream frames must carry v9+ payloads
// - Non-stream messages must be v8 or lower
let version_num = request.version().into_u8();
if stream_id.is_some() && version_num < 9 {
log::warn!("LP Stream frame contains v{version_num} payload, expected v9+; dropping",);
return Ok(vec![]);
}
if stream_id.is_none() && version_num >= 9 {
log::warn!("Non-stream message claims v{version_num}, expected v8 or lower; dropping",);
return Ok(vec![]);
}
log::debug!("Received request: {request}");
match request {
IpPacketRequest::Data(request) => self.on_data_request(request).await,
IpPacketRequest::Control(request) => Ok(vec![self.on_control_request(request).await]),
IpPacketRequest::Control(request) => {
Ok(vec![self.on_control_request(request, stream_id).await])
}
}
}
@@ -640,45 +654,8 @@ impl MixnetListener {
}
}
/// Handle KCP session tick - drives retransmissions, ACKs, and cleanup.
///
/// Sends any pending outgoing KCP packets (ACKs, retransmissions) via the
/// sender_tag reply mechanism.
async fn handle_kcp_tick(&mut self) {
let current_time_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
// Tick all KCP sessions - generates ACKs, retransmissions, etc.
let outgoing = self.kcp_session_manager.tick(current_time_ms);
// Send any pending outgoing KCP protocol packets
for (conv_id, data) in outgoing {
// Get the sender_tag for this session to reply via mixnet
let Some(sender_tag) = self.kcp_session_manager.get_sender_tag(conv_id) else {
log::warn!(
"KCP tick: conv_id={} has {} bytes but no sender_tag, dropping",
conv_id,
data.len()
);
continue;
};
log::trace!("KCP tick: conv_id={} sending {} bytes", conv_id, data.len());
let reply_to = crate::clients::ConnectedClientId::AnonymousSenderTag(sender_tag);
let input_message = crate::util::create_message::create_input_message(&reply_to, data);
if let Err(e) = self.mixnet_client.send(input_message).await {
log::warn!("KCP tick: failed to send for conv_id={}: {}", conv_id, e);
}
}
}
pub(crate) async fn run(mut self) -> Result<()> {
let mut disconnect_timer = tokio::time::interval(DISCONNECT_TIMER_INTERVAL);
let mut kcp_tick_timer = tokio::time::interval(KCP_TICK_INTERVAL);
loop {
tokio::select! {
@@ -690,9 +667,6 @@ impl MixnetListener {
_ = disconnect_timer.tick() => {
self.handle_disconnect_timer().await;
},
_ = kcp_tick_timer.tick() => {
self.handle_kcp_tick().await;
},
msg = self.mixnet_client.next() => {
if let Some(msg) = msg {
match self.on_reconstructed_message(msg).await {
@@ -719,98 +693,31 @@ pub(crate) type PacketHandleResult = Result<Option<VersionedResponse>>;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_is_kcp_message_rejects_ipr_protocol() {
// IPR v8 message: version=8, service_provider_type=1 (IpPacketRouter)
// Even if byte 4 happens to be a valid KCP command, we should reject it
let mut ipr_message = vec![0u8; 30];
ipr_message[0] = 8; // version
ipr_message[1] = 1; // ServiceProviderType::IpPacketRouter
ipr_message[4] = 81; // This would be KCP Push command, but should be ignored
fn test_lp_stream_frame_detected() {
use bytes::BytesMut;
use nym_lp::packet::frame::{
LpFrameHeader, LpFrameKind, SphinxStreamFrameAttributes, SphinxStreamMsgType,
};
assert!(
!is_kcp_message(&ipr_message),
"IPR v8 message should not be detected as KCP"
);
let attrs = SphinxStreamFrameAttributes {
stream_id: 0x1234,
msg_type: SphinxStreamMsgType::Data,
sequence_num: 42,
};
let frame = nym_lp::packet::frame::LpFrame::new_stream(attrs, vec![8, 1, 0]); // fake IPR payload
let mut buf = BytesMut::new();
frame.encode(&mut buf);
// IPR v6 message
ipr_message[0] = 6;
ipr_message[1] = 0; // v6 doesn't use service_provider_type but byte could be 0
assert!(
!is_kcp_message(&ipr_message),
"IPR v6 message should not be detected as KCP"
);
let header = LpFrameHeader::parse(&buf).unwrap();
assert_eq!(header.kind, LpFrameKind::SphinxStream);
// IPR v7 message
ipr_message[0] = 7;
ipr_message[1] = 2; // Authenticator type
assert!(
!is_kcp_message(&ipr_message),
"IPR v7 message should not be detected as KCP"
);
}
let parsed_attrs = SphinxStreamFrameAttributes::parse(&header.frame_attributes).unwrap();
assert_eq!(parsed_attrs.stream_id, 0x1234);
assert_eq!(parsed_attrs.msg_type, SphinxStreamMsgType::Data);
assert_eq!(parsed_attrs.sequence_num, 42);
#[test]
fn test_is_kcp_message_accepts_kcp() {
// Valid KCP message: conv_id in bytes 0-3, cmd=Push(81) at byte 4
// First bytes are conv_id (little-endian u32), so they won't look like IPR version
let mut kcp_message = vec![0u8; 30];
kcp_message[0] = 0x12; // conv_id byte 0 (not 6-8, so not IPR version)
kcp_message[1] = 0x34; // conv_id byte 1
kcp_message[2] = 0x56; // conv_id byte 2
kcp_message[3] = 0x78; // conv_id byte 3
kcp_message[4] = 81; // KCP Push command
assert!(
is_kcp_message(&kcp_message),
"Valid KCP message should be detected"
);
// Test all valid KCP commands
for cmd in [81u8, 82, 83, 84] {
kcp_message[4] = cmd;
assert!(
is_kcp_message(&kcp_message),
"KCP command {} should be accepted",
cmd
);
}
}
#[test]
fn test_is_kcp_message_rejects_short_messages() {
// Less than 25 bytes should be rejected
let short_message = vec![0u8; 24];
assert!(
!is_kcp_message(&short_message),
"Short message should not be detected as KCP"
);
let empty_message: Vec<u8> = vec![];
assert!(
!is_kcp_message(&empty_message),
"Empty message should not be detected as KCP"
);
}
#[test]
fn test_is_kcp_message_rejects_invalid_kcp_command() {
// Message with invalid KCP command at byte 4
let mut message = vec![0u8; 30];
message[0] = 0x12; // Not IPR version
message[4] = 80; // Invalid KCP command (valid are 81-84)
assert!(
!is_kcp_message(&message),
"Invalid KCP command should be rejected"
);
message[4] = 85; // Also invalid
assert!(
!is_kcp_message(&message),
"Invalid KCP command 85 should be rejected"
);
// Content is everything after the header
assert_eq!(&buf[LpFrameHeader::SIZE..], &[8, 1, 0]);
}
}