Compare commits

...

145 Commits

Author SHA1 Message Date
mfahampshire 0f8a8ddf7e Trim obvious comments, add architecture.md stub 2026-03-17 15:56:04 +00:00
mfahampshire 3c92ce60ca sdk: remove superseded stream_wrapper module 2026-03-17 15:29:34 +00:00
mfahampshire 846dbba363 sdk: add ipr_wrapper module with IpMixStream
- IpMixStream wraps MixnetStream for IPR tunnel over mixnet
- LP Stream framing handled automatically by MixnetStream
- Gateway discovery, connect handshake, IP packet send/receive
2026-03-17 15:29:07 +00:00
mfahampshire 94ab9d5466 IPR: support LP Stream-framed client connections
- Detect and route LP Stream frames in mixnet_listener
- Wrap inline responses in LP Stream frames
- Thread stream_id to ConnectedClientHandler for TUN responses
2026-03-17 15:28:39 +00:00
mfahampshire c78d942383 Replace MixnetStream with LP framing
- Replace custom header with LpFrameHeader
- Added sequence number for message ordering
2026-03-17 12:03:12 +00:00
mfahampshire 0b6166d20e Add LpFrameKind::Stream variant with StreamFrameAttributes
- Define LP wire format for stream multiplexing
- Handle new variant in entry gateway match arm
2026-03-17 12:02:12 +00:00
mfahampshire 6384467526 Reset rebase contamination: restore develop state for shared code
Mass-reset ~50 files that were accidentally modified during rebase
(PollSender/InputMessageCodec/&mut self changes from old experimental
commits). Disable stream_wrapper module (will be rebuilt on MixnetStream
+ LP frame envelope). Remove IpMixStream refs from ip_packet_client
helpers temporarily.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-16 16:15:40 +00:00
mfahampshire fdd3823585 clean 2026-03-16 15:35:30 +00:00
mfahampshire 892a3bd826 add scratch notes to gitignore 2026-03-16 15:35:29 +00:00
mfahampshire 59ff7d6588 comment 2026-03-16 15:35:29 +00:00
mfahampshire 20c4553bca clippy 2026-03-16 15:35:29 +00:00
mfahampshire 4c38481c36 Fix env discovery 2026-03-16 15:35:29 +00:00
mfahampshire 07680db2c7 mod ignore 2026-03-16 15:35:29 +00:00
mfahampshire 59cbce50f7 mod logging to with poisoning retry 2026-03-16 15:35:29 +00:00
mfahampshire ac13ddbda8 remove unnecessary logging from unit tests 2026-03-16 15:35:29 +00:00
mfahampshire 67803930b6 dont always run dns ping tests 2026-03-16 15:35:29 +00:00
mfahampshire 7052e2e902 fix ipmixstream new() 2026-03-16 15:35:28 +00:00
mfahampshire cccfa76336 clippy 2026-03-16 15:35:28 +00:00
mfahampshire a946336e67 add missing network env 2026-03-16 15:35:28 +00:00
mfahampshire e5836bc1cb fmt 2026-03-16 15:35:28 +00:00
mfahampshire f12108a7db clippy 2026-03-16 15:35:28 +00:00
mfahampshire 70bdbce23f Clippy 2026-03-16 15:35:28 +00:00
mfahampshire e6f9b551ed clippy 2026-03-16 15:35:28 +00:00
mfahampshire fcfa0b604e rustfmt 2026-03-16 15:35:28 +00:00
mfahampshire 8b086e0239 Fix useragent 2026-03-16 15:35:27 +00:00
mfahampshire 6c76834b6c Example code 2026-03-16 15:35:27 +00:00
mfahampshire 071589237b Add network_envs 2026-03-16 15:35:27 +00:00
mfahampshire 771ee10ba2 Update inline examples 2026-03-16 15:35:27 +00:00
mfahampshire 33ce05a3df Add bootstrap network config 2026-03-16 15:35:27 +00:00
mfahampshire 73016ed687 Docs first pass 2026-03-16 15:35:27 +00:00
mfahampshire 8a5205ac4c Include err for no surb tag or peer 2026-03-16 15:35:27 +00:00
mfahampshire aaa7e317bf Fix Mixstream::new() with new configurable network 2026-03-16 15:35:27 +00:00
mfahampshire f28c49e9d6 Update docs + make network configurable 2026-03-16 15:35:27 +00:00
mfahampshire e2ceaf48ed new message borrow 2026-03-16 15:35:27 +00:00
mfahampshire 3e2137a33e First pass rework to bytes in bytes out 2026-03-16 15:35:26 +00:00
mfahampshire 984fa065e3 remove .expect()s and add some encode and decode tests 2026-03-16 15:35:26 +00:00
mfahampshire da46ea7485 remove unused connction type enum 2026-03-16 15:35:26 +00:00
mfahampshire b1bc359806 Fix double copy + deserialisation -> none loop 2026-03-16 15:35:26 +00:00
mfahampshire b338644620 Switch frm default bincode in nymsphinx 2026-03-16 15:35:26 +00:00
mfahampshire 1ec0bf868b use make_bincode_serializer instead of bincode default in client-core 2026-03-16 15:35:26 +00:00
mfahampshire 07842661b9 properly fail on version checks 2026-03-16 15:35:26 +00:00
mfahampshire 0cd4dd5747 Removed unneccesary panics with self.peer_surb_tag 2026-03-16 15:35:26 +00:00
Jędrzej Stuczyński abdd960b20 removed dependency on nym-gateway-directory 2026-03-16 15:35:25 +00:00
mfahampshire db2f3bff05 Use workspace import for mixtcp rustls 2026-03-16 15:35:14 +00:00
mfahampshire be56c79106 remove commented out code 2026-03-16 15:34:54 +00:00
mfahampshire 3ccfbee834 add doc info for other sdk modules 2026-03-16 15:34:53 +00:00
mfahampshire 942ab3c8e8 follow convention for to_v2_bytes 2026-03-16 15:34:53 +00:00
mfahampshire 9ec937dd30 fix comment and duplication in root cargo 2026-03-16 15:34:53 +00:00
mfahampshire 6ccc4a988a use workspace base64 version 2026-03-16 15:34:44 +00:00
mfahampshire 27890eb1a3 remove external patch 2026-03-16 15:34:43 +00:00
mfahampshire fa327a1b2a add license to mixtcp cargo 2026-03-16 15:34:43 +00:00
mfahampshire cea66c1237 edition matches workspace 2026-03-16 15:34:43 +00:00
mfahampshire 757a89c5d7 clippy 2026-03-16 15:34:43 +00:00
mfahampshire 1e3f531e15 remove last nym vpn api deps 2026-03-16 15:34:43 +00:00
mfahampshire 7cc33d8df7 remove nymvpnapi - always use http fallback 2026-03-16 15:34:43 +00:00
mfahampshire 1bd0bfeee1 temp before big mod 2026-03-16 15:34:43 +00:00
mfahampshire f297af2a8c cont removing unnecessary types 2026-03-16 15:34:43 +00:00
mfahampshire d9190e5899 remove unused 2026-03-16 15:34:43 +00:00
mfahampshire a562812ad9 added stream module to mixnet readme 2026-03-16 15:34:43 +00:00
mfahampshire 7368692629 remove external dep on nymvpn repo in sdk 2026-03-16 15:34:42 +00:00
mfahampshire c185f485a7 lint 2026-03-16 15:34:42 +00:00
mfahampshire 6930968e88 lock 2026-03-16 15:34:42 +00:00
mfahampshire 8294191913 remove old commented out imports 2026-03-16 15:34:42 +00:00
mfahampshire 9b2fb45270 temp get rid of logging for ci again 2026-03-16 15:34:42 +00:00
mfahampshire cb8747abb8 temp get rid of logging for ci 2026-03-16 15:34:42 +00:00
mfahampshire 47d37d8aed clippy 2026-03-16 15:34:42 +00:00
mfahampshire d452932b18 clippy 2026-03-16 15:34:42 +00:00
mfahampshire 702dfdc927 clippy warnings: remove 2026-03-16 15:34:30 +00:00
mfahampshire 18e8dfe394 Fix FFI shared lib 2026-03-16 15:34:29 +00:00
mfahampshire 0208a84b77 Mod to mixnet client mutability from traits elsewhere 2026-03-16 15:34:29 +00:00
mfahampshire 7105bbf4b4 Add RwLock to wasm client helper 2026-03-16 15:34:11 +00:00
mfahampshire 39692502df remove accidental import from merge 2026-03-16 15:34:10 +00:00
mfahampshire fcefa079b0 reintroduce import 2026-03-16 15:34:10 +00:00
mfahampshire 371422f27b lint 2026-03-16 15:34:10 +00:00
mfahampshire 5541f242ff smol mixtcp readme 2026-03-16 15:34:10 +00:00
mfahampshire 348e93dd70 rename smolmix - mixtcp 2026-03-16 15:34:10 +00:00
mfahampshire 7f8b7eea8c strip down commenting that is triggering compiler err 2026-03-16 15:34:10 +00:00
mfahampshire 8760c40d46 info -> debug logging for serialised bytes written by stream_wrapper 2026-03-16 15:34:10 +00:00
mfahampshire 8ae4b8fee2 Move files to examples + split examples apart 2026-03-16 15:34:10 +00:00
mfahampshire 4f4885fe50 Remove unused imports 2026-03-16 15:34:09 +00:00
mfahampshire bc52db53b7 remove comments and unused imports 2026-03-16 15:34:09 +00:00
mfahampshire 08d49a6f2e remove unwraps in place of error types 2026-03-16 15:34:09 +00:00
mfahampshire 6f53192dbf deprecate notice for tcpproxy module 2026-03-16 15:34:09 +00:00
mfahampshire b5afb77f19 Clean up unused imports 2026-03-16 15:34:09 +00:00
mfahampshire 29714dea76 Fix gw directory api change in ipr wrapper 2026-03-16 15:34:09 +00:00
mfahampshire 8fd9cee189 almost sorted new version gw dir 2026-03-16 15:34:09 +00:00
mfahampshire 2b4a11e273 linting 2026-03-16 15:34:09 +00:00
mfahampshire a58b32703c add missed stuff from rebase 2026-03-16 15:34:09 +00:00
mfahampshire de80b4ce48 Made explicit error types 2026-03-16 15:34:08 +00:00
mfahampshire 85a3b25be9 Fix logging in tests 2026-03-16 15:33:44 +00:00
mfahampshire 708bd71a56 framing > byte buffer 2026-03-16 15:33:44 +00:00
mfahampshire 40b886e0bd Fix inverted buffer slice logic 2026-03-16 15:33:44 +00:00
mfahampshire 23c1c4bdac Tests + getting reuable client in new() for speedup 2026-03-16 15:33:44 +00:00
mfahampshire 2dd8707725 rough first reqwest client poc 2026-03-16 15:33:44 +00:00
mfahampshire 0bb3c4b2bf remove clunky old examples in place of unit tests 2026-03-16 15:33:43 +00:00
mfahampshire 72e8180abe TLS first version 2026-03-16 15:33:43 +00:00
mfahampshire 2d5b1d577c update readme with new logging 2026-03-16 15:33:43 +00:00
mfahampshire b5e45040ca change logging for nym provider 2026-03-16 15:33:43 +00:00
mfahampshire e420081512 remove old note 2026-03-16 15:33:43 +00:00
mfahampshire 0da4ee985b smolmix device + example 2026-03-16 15:33:43 +00:00
mfahampshire 6d8cacc900 commenting 2026-03-16 15:33:43 +00:00
mfahampshire 49543fcd98 export extra types from ipmixstream 2026-03-16 15:33:43 +00:00
mfahampshire 7b80716c9a split ipmixstream + tests 2026-03-16 15:33:43 +00:00
mfahampshire a4a48c60ae err handling on surb send between split 2026-03-16 15:33:42 +00:00
mfahampshire e027b5a1fe removed IpMixSocket; was a bit unnecessary given connection logic 2026-03-16 15:33:42 +00:00
mfahampshire 723df5584e Remove unnecessary MixnetClient from IpSocket: streamlining 2026-03-16 15:33:42 +00:00
mfahampshire 2ca5155748 more comments 2026-03-16 15:33:42 +00:00
mfahampshire 4f0cc58a11 commenting 2026-03-16 15:33:42 +00:00
mfahampshire 2ccdfedd65 commenting format change + comment out logging in test 2026-03-16 15:33:42 +00:00
mfahampshire d7ddb7592c comment out logging in test 2026-03-16 15:33:42 +00:00
mfahampshire 7371ce3e36 * got ipr pings working with stream_wrapper_ipr
* updated stream_wrapper with debug methods
2026-03-16 15:33:42 +00:00
mfahampshire cd7bb9931e pull in + mod nym-gateway 2026-03-16 15:33:24 +00:00
mfahampshire b77dbdd87e * pulled in helpers from various files
* added readme to explain
2026-03-16 15:33:23 +00:00
mfahampshire 83dcf3fd13 got ipr wrapper connected 2026-03-16 15:33:23 +00:00
mfahampshire a5c6e9d0e2 mod ip_packet_client 2026-03-16 15:33:23 +00:00
mfahampshire a417411184 out of dependency hell 2026-03-16 15:33:23 +00:00
mfahampshire 24d5e4aba9 removed circular dep from gateway-directory 2026-03-16 15:33:23 +00:00
mfahampshire 6cb2fc8445 before directory modification 2026-03-16 15:33:23 +00:00
mfahampshire 4ea2c3beb3 temp commit: got gateway dir dependency working, moving on to vpn-api-client 2026-03-16 15:33:23 +00:00
mfahampshire be8c1191f3 commit before messing with reexport stuff 2026-03-16 15:32:53 +00:00
mfahampshire d969979c8c reorg 2026-03-16 15:32:53 +00:00
mfahampshire c6fd3c8527 added surbs to split r/w + some streamlining + comments + tests 2026-03-16 15:32:53 +00:00
mfahampshire 6ac4d93909 adding surbs + anon reply functionality 2026-03-16 15:32:53 +00:00
mfahampshire 197a7eaec8 make inputmessage anonymous type over simple 2026-03-16 15:32:53 +00:00
mfahampshire f598ee2916 first full pass @ stream + split wrappers 2026-03-16 15:32:53 +00:00
mfahampshire b2fa6cdf8f temp 2026-03-16 15:32:53 +00:00
mfahampshire 97dbef155d initial pass streamwrapper 2026-03-16 15:32:52 +00:00
durch 9dbd91d93e Address part of PR comments 2026-03-16 15:32:52 +00:00
durch 7914cbdbb7 fmt 2026-03-16 15:32:52 +00:00
durch 99febfb3aa Log decoding error 2026-03-16 15:32:52 +00:00
durch 2b00188983 Cleanup prints 2026-03-16 15:32:52 +00:00
durch 82f270329f Update IPR sig 2026-03-16 15:32:52 +00:00
mfahampshire 3cb17e76bd tweaks to tcpproxy example 2026-03-16 15:32:52 +00:00
durch 7b2f8a4ed1 WASM changes 2026-03-16 15:32:40 +00:00
durch 438e745cb3 AsyncWrite 2026-03-16 15:32:40 +00:00
mfahampshire 674fd511f4 remove double asyncwrite 2026-03-16 15:32:40 +00:00
durch 66d85a7c0d Use tokio AsyncRead 2026-03-16 15:32:39 +00:00
durch d12a5d754a ReconstructedMessageCodec 2026-03-16 15:32:21 +00:00
Drazen 3a78d62240 InputMessageCodec, Serde for MixPacket 2026-03-16 15:32:21 +00:00
mfahampshire 5e651b55fc minor changes with new files / fixes 2026-03-16 15:32:05 +00:00
Drazen 8a6bf4a03d Use Sink always 2026-03-16 15:32:04 +00:00
mfahampshire 6a2f1a67ed temp 2026-03-16 15:31:39 +00:00
Drazen d56ab91a2e Switch to PollSender 2026-03-16 15:31:33 +00:00
durch 8f670f467b AsyncRead for MixnetClient 2026-03-16 15:31:08 +00:00
durch d013168823 serde for ReconstructedMessage 2026-03-16 15:29:35 +00:00
39 changed files with 10229 additions and 1374 deletions
+1
View File
@@ -46,6 +46,7 @@ storybook-static
**/.DS_Store
cpu-cycles/libcpucycles/build
foxyfox.env
scratch.txt
.next
ppa-private-key.b64
Generated
+1408 -1247
View File
File diff suppressed because it is too large Load Diff
+63
View File
@@ -103,6 +103,13 @@ impl LpFrame {
Self::new(LpFrameKind::Forward, data)
}
pub fn new_stream(attrs: StreamFrameAttributes, content: impl Into<Bytes>) -> Self {
Self {
header: LpFrameHeader::new(LpFrameKind::Stream, attrs.encode()),
content: content.into(),
}
}
pub(crate) fn len(&self) -> usize {
LpFrameHeader::SIZE + self.content.len()
}
@@ -115,6 +122,62 @@ pub enum LpFrameKind {
Opaque = 0,
Registration = 1,
Forward = 2,
Stream = 3,
}
/// Message type within a `LpFrameKind::Stream` frame.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum StreamMsgType {
/// Open a new stream. Content is optional initial data.
Open = 0,
/// Data on an existing stream.
Data = 1,
}
/// Parsed form of the 14-byte `frame_attributes` for `LpFrameKind::Stream`.
///
/// Wire layout (big-endian):
/// ```text
/// [0..8 ) stream_id : u64
/// [8 ) msg_type : u8 (0 = Open, 1 = Data)
/// [9..13) sequence_num : u32
/// [13 ) reserved : u8
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamFrameAttributes {
pub stream_id: u64,
pub msg_type: StreamMsgType,
pub sequence_num: u32,
}
impl StreamFrameAttributes {
pub fn encode(&self) -> [u8; 14] {
let mut buf = [0u8; 14];
buf[0..8].copy_from_slice(&self.stream_id.to_be_bytes());
buf[8] = self.msg_type as u8;
buf[9..13].copy_from_slice(&self.sequence_num.to_be_bytes());
buf
}
pub fn parse(attrs: &[u8; 14]) -> Result<Self, MalformedLpPacketError> {
let stream_id = u64::from_be_bytes(attrs[0..8].try_into().unwrap());
let msg_type = match attrs[8] {
0 => StreamMsgType::Open,
1 => StreamMsgType::Data,
other => {
return Err(MalformedLpPacketError::DeserialisationFailure(format!(
"invalid stream msg_type: {other}"
)));
}
};
let sequence_num = u32::from_be_bytes(attrs[9..13].try_into().unwrap());
Ok(Self {
stream_id,
msg_type,
sequence_num,
})
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
+30
View File
@@ -0,0 +1,30 @@
[package]
name = "mixtcp"
version = "0.0.1"
edition = "2021"
license.workspace = true
[dependencies]
smoltcp = { workspace = true, default-features = false, features = [
"std",
"medium-ip",
"proto-ipv4",
"proto-ipv6",
"socket-tcp",
"socket-icmp",
] }
tokio = { workspace = true }
bytes = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
nym-bin-common = { path = "../common/bin-common", features = ["basic_tracing"] }
nym-sdk = { path = "../sdk/rust/nym-sdk" }
nym-ip-packet-requests = { path = "../common/ip-packet-requests" }
thiserror.workspace = true
rustls = { workspace = true }
[dev-dependencies]
reqwest.workspace = true
dirs.workspace = true
webpki-roots.workspace = true
serde_json.workspace = true
+30
View File
@@ -0,0 +1,30 @@
# MixTCP
This is an initial proof of concept of a SmolTCP `device` that uses the Mixnet for transport. It relies on the `IpMixStream` module from the Rust SDK to set up a connection with an Exit Gateway's Ip-Packet-Router, meaning that this is the IP that is seen by the receiver of the request.
This can be used as the basis for building HTTP(S) crates on top of the Mixnet whilst abstracting away the complexities of using the Mixnet for transport.
More to come in the future.
`examples/` contains examples for:
- a TLS ping with Cloudflare
- creating a `reqwest`-like HTTPS `GET` request and receiving a response
## Component Interaction
```sh
create_device()
|
+--------------+---------------+
| | |
v v v
NymIprDevice NymIprBridge IpPair
| | (10.0.x.x)
| |
+-- channels --+
|
v
IpMixStream
|
v
Mixnet
```
+286
View File
@@ -0,0 +1,286 @@
#![allow(clippy::result_large_err)]
use mixtcp::{create_device, MixtcpError};
use rustls::{pki_types::ServerName, ClientConfig, ClientConnection};
use std::{
io::{self, Read, Write},
sync::Arc,
};
use tracing::info;
use nym_sdk::stream_wrapper::{IpMixStream, NetworkEnvironment};
use smoltcp::{
iface::{Config, Interface, SocketSet},
socket::tcp,
time::Instant,
wire::{HardwareAddress, IpAddress, IpCidr, Ipv4Address},
};
use std::sync::Once;
use std::time::Duration;
static INIT: Once = Once::new();
pub struct TlsOverTcp {
pub conn: ClientConnection,
}
impl TlsOverTcp {
pub fn new(domain: &str) -> Result<Self, MixtcpError> {
let mut root_store = rustls::RootCertStore::empty();
root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
let config = ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();
let server_name = ServerName::try_from(domain)
.map_err(|_| MixtcpError::InvalidDnsName)?
.to_owned();
let conn = ClientConnection::new(Arc::new(config), server_name)
.map_err(|_| MixtcpError::TlsHandshakeFailed)?;
Ok(Self { conn })
}
/// Move data from TLS connection to TCP socket
pub fn write_tls(&mut self, socket: &mut tcp::Socket) -> Result<(), MixtcpError> {
let mut buf = [0u8; 4096];
while self.conn.wants_write() {
match self.conn.write_tls(&mut buf.as_mut_slice()) {
Ok(n) if n > 0 => {
socket
.send_slice(&buf[..n])
.map_err(|_| MixtcpError::TlsHandshakeFailed)?;
}
_ => break,
}
}
Ok(())
}
/// Move data from TCP socket to TLS connection
pub fn read_tls(&mut self, socket: &mut tcp::Socket) -> Result<(), MixtcpError> {
if socket.can_recv() {
let _ = socket.recv(|chunk| {
if !chunk.is_empty() {
inspect_tls_packet(chunk);
let _ = self.conn.read_tls(&mut io::Cursor::new(&mut *chunk));
let _ = self.conn.process_new_packets();
}
(chunk.len(), ())
});
}
Ok(())
}
pub fn send(&mut self, data: &[u8], socket: &mut tcp::Socket) -> Result<(), MixtcpError> {
self.conn
.writer()
.write_all(data)
.map_err(|_| MixtcpError::TlsHandshakeFailed)?;
self.write_tls(socket)
}
pub fn recv(&mut self, socket: &mut tcp::Socket) -> Result<Vec<u8>, MixtcpError> {
self.read_tls(socket)?;
let mut result = Vec::new();
let mut buf = vec![0u8; 4096];
match self.conn.reader().read(&mut buf) {
Ok(n) if n > 0 => result.extend_from_slice(&buf[..n]),
_ => {}
}
Ok(result)
}
}
fn inspect_tls_packet(data: &[u8]) {
if data.len() < 5 {
return;
}
let content_type = data[0];
if !(0x14..=0x17).contains(&content_type) {
return;
}
let version = u16::from_be_bytes([data[1], data[2]]);
let length = u16::from_be_bytes([data[3], data[4]]);
info!(
"TLS packet: ContentType={:#04x}, Version={:#06x}, Length={}",
content_type, version, length
);
if content_type == 0x16 && data.len() > 5 {
let handshake_type = data[5];
let handshake_types = match handshake_type {
0x01 => "ClientHello",
0x02 => "ServerHello",
0x0b => "Certificate",
0x0c => "ServerKeyExchange",
0x0d => "CertificateRequest",
0x0e => "ServerHelloDone",
0x0f => "CertificateVerify",
0x10 => "ClientKeyExchange",
0x14 => "Finished",
_ => "Unknown",
};
info!(
"Handshake type: {:#04x} ({}), Length: {}",
handshake_type, handshake_types, length
);
}
}
fn init_logging() {
INIT.call_once_force(|state| {
if state.is_poisoned() {
eprintln!("Logger initialization was poisoned, retrying");
}
if !tracing::dispatcher::has_been_set() {
nym_bin_common::logging::setup_tracing_logger();
}
});
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
init_logging();
let ipr_stream = IpMixStream::new(NetworkEnvironment::Mainnet).await?;
let (mut device, bridge, allocated_ips) = create_device(ipr_stream).await?;
info!("Allocated IP: {}", allocated_ips.ipv4);
tokio::spawn(async move {
bridge.run().await.unwrap();
});
let config = Config::new(HardwareAddress::Ip);
let mut iface = Interface::new(config, &mut device, Instant::now());
iface.update_ip_addrs(|ip_addrs| {
ip_addrs
.push(IpCidr::new(IpAddress::from(allocated_ips.ipv4), 32))
.unwrap();
});
iface
.routes_mut()
.add_default_ipv4_route(Ipv4Address::UNSPECIFIED)
.unwrap();
let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 16384]);
let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; 4096]);
let tcp_socket = tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer);
let mut sockets = SocketSet::new(vec![]);
let tcp_handle = sockets.add(tcp_socket);
let target_ip = Ipv4Address::new(1, 1, 1, 1);
let target_port = 443;
let mut timestamp = Instant::from_millis(0);
let start = tokio::time::Instant::now();
let mut connected = false;
let mut tls = None;
let mut handshake_completed = false;
let mut request_sent = false;
loop {
if start.elapsed() > Duration::from_secs(60) {
info!("Test timeout after 60 seconds");
break;
}
iface.poll(timestamp, &mut device, &mut sockets);
timestamp += smoltcp::time::Duration::from_millis(1);
let socket = sockets.get_mut::<tcp::Socket>(tcp_handle);
// TCP connection setup
if !connected && !socket.is_open() {
match socket.connect(iface.context(), (target_ip, target_port), 49152) {
Ok(_) => {
info!("TCP connect started");
connected = true;
}
Err(e) => {
info!("TCP connect failed: {}", e);
break;
}
}
}
// TLS setup after TCP established
if socket.state() == tcp::State::Established && tls.is_none() {
info!("TCP established - creating TLS connection");
match TlsOverTcp::new("cloudflare.com") {
Ok(t) => tls = Some(t),
Err(e) => {
info!("TLS create failed: {}", e);
break;
}
}
}
// TLS handshake and request
if let Some(ref mut tls_conn) = tls {
let _ = tls_conn.read_tls(socket);
let _ = tls_conn.write_tls(socket);
// Complete handshake
if !tls_conn.conn.is_handshaking() && !handshake_completed {
handshake_completed = true;
info!("TLS handshake completed - ready for HTTPS");
// Send simple HTTP request
let request = b"GET /cdn-cgi/trace HTTP/1.1\r\nHost: cloudflare.com\r\nUser-Agent: mixtcp-test/1.0\r\nAccept: */*\r\nConnection: close\r\n\r\n";
match tls_conn.send(request, socket) {
Ok(_) => {
info!("HTTPS request sent");
request_sent = true;
}
Err(e) => {
info!("HTTPS send failed: {}", e);
break;
}
}
}
// Read response after request sent
if request_sent {
let mut response_data = Vec::new();
let mut buf = vec![0u8; 4096];
match tls_conn.conn.reader().read(&mut buf) {
Ok(0) => {
info!("Response complete - connection closed");
break;
}
Ok(n) if n > 0 => {
response_data.extend_from_slice(&buf[..n]);
info!("Received {} bytes", n);
if let Ok(response_str) = std::str::from_utf8(&response_data) {
if response_str.contains("\r\n\r\n") {
info!("HTTPS response received!");
if let Some(status_end) = response_str.find("\r\n") {
info!("HTTP Status: {}", &response_str[..status_end]);
}
info!("Full response: {}", response_str);
return Ok(());
}
}
}
Ok(1_usize..) => {
todo!()
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// Keep polling
}
Err(e) => {
info!("Read error: {}", e);
break;
}
}
}
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
Err("No HTTP response received".into())
}
+341
View File
@@ -0,0 +1,341 @@
#![allow(clippy::result_large_err)]
use mixtcp::{create_device, MixtcpError, NymIprDevice};
use nym_sdk::stream_wrapper::{IpMixStream, NetworkEnvironment};
use reqwest::StatusCode;
use rustls::{pki_types::ServerName, ClientConfig, ClientConnection};
use smoltcp::{
iface::{Config, Interface, SocketSet},
socket::tcp,
time::Instant,
wire::{HardwareAddress, IpAddress, IpCidr, Ipv4Address},
};
use std::sync::Once;
use std::{
io::{self, Read, Write},
sync::Arc,
time::Duration,
};
use tracing::info;
static INIT: Once = Once::new();
pub struct TlsOverTcp {
pub conn: ClientConnection,
}
impl TlsOverTcp {
pub fn new(domain: &str) -> Result<Self, MixtcpError> {
let mut root_store = rustls::RootCertStore::empty();
root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
let config = ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();
let server_name = ServerName::try_from(domain)
.map_err(|_| MixtcpError::InvalidDnsName)?
.to_owned();
let conn = ClientConnection::new(Arc::new(config), server_name)
.map_err(|_| MixtcpError::TlsHandshakeFailed)?;
Ok(Self { conn })
}
pub fn write_tls(&mut self, socket: &mut tcp::Socket) -> Result<(), MixtcpError> {
let mut buf = [0u8; 4096];
while self.conn.wants_write() {
match self.conn.write_tls(&mut buf.as_mut_slice()) {
Ok(n) if n > 0 => {
socket
.send_slice(&buf[..n])
.map_err(|_| MixtcpError::TlsHandshakeFailed)?;
}
_ => break,
}
}
Ok(())
}
pub fn read_tls(&mut self, socket: &mut tcp::Socket) -> Result<(), MixtcpError> {
if socket.can_recv() {
let _ = socket.recv(|chunk| {
if !chunk.is_empty() {
let _ = self.conn.read_tls(&mut io::Cursor::new(&mut *chunk));
let _ = self.conn.process_new_packets();
}
(chunk.len(), ())
});
}
Ok(())
}
pub fn send(&mut self, data: &[u8], socket: &mut tcp::Socket) -> Result<(), MixtcpError> {
self.conn
.writer()
.write_all(data)
.map_err(|_| MixtcpError::TlsHandshakeFailed)?;
self.write_tls(socket)
}
}
/// Reqwest-ish client right now, just a handrolled GET request for the example
pub struct MixtcpReqwestClient {
device: Arc<tokio::sync::Mutex<(smoltcp::iface::Interface, NymIprDevice)>>,
_bridge: tokio::task::JoinHandle<()>,
_allocated_ip: Ipv4Address,
}
impl MixtcpReqwestClient {
pub async fn new() -> Result<Self, MixtcpError> {
let ipr_stream = IpMixStream::new(NetworkEnvironment::Mainnet)
.await
.map_err(|_| MixtcpError::MixnetConnectionFailed)?;
let (mut device, bridge, allocated_ips) = create_device(ipr_stream).await?;
info!("Allocated IP: {}", allocated_ips.ipv4);
let bridge_handle = tokio::spawn(async move {
if let Err(e) = bridge.run().await {
tracing::error!("Bridge error: {}", e);
}
});
let config = Config::new(HardwareAddress::Ip);
let mut iface = Interface::new(config, &mut device, Instant::now());
iface.update_ip_addrs(|ip_addrs| {
ip_addrs
.push(IpCidr::new(IpAddress::from(allocated_ips.ipv4), 32))
.unwrap();
});
iface
.routes_mut()
.add_default_ipv4_route(Ipv4Address::UNSPECIFIED)
.unwrap();
let device = Arc::new(tokio::sync::Mutex::new((iface, device)));
Ok(Self {
device,
_bridge: bridge_handle,
_allocated_ip: allocated_ips.ipv4,
})
}
pub async fn get(&self, url: &str) -> Result<MixtcpResponse, MixtcpError> {
let parsed_url = reqwest::Url::parse(url).map_err(|_| MixtcpError::InvalidUrl)?;
let host = parsed_url.host_str().ok_or(MixtcpError::InvalidUrl)?;
let path = parsed_url.path();
let response_bytes = self.simple_get_request(host, path).await?;
let (status, body) = self.parse_simple_response(&response_bytes)?;
Ok(MixtcpResponse { status, body })
}
async fn simple_get_request(&self, domain: &str, path: &str) -> Result<Vec<u8>, MixtcpError> {
let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 16384]);
let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; 4096]);
let tcp_socket = tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer);
let mut sockets = SocketSet::new(vec![]);
let tcp_handle = sockets.add(tcp_socket);
let target_ip = Ipv4Address::new(1, 1, 1, 1);
let target_port = 443;
let mut timestamp = Instant::from_millis(0);
let start = tokio::time::Instant::now();
let mut connected = false;
let mut tls = None;
let mut handshake_completed = false;
let mut request_sent = false;
let mut response_data = Vec::new();
let mut device_guard = self.device.lock().await;
let (ref mut iface, ref mut device) = &mut *device_guard;
loop {
if start.elapsed() > Duration::from_secs(60) {
return Err(MixtcpError::Timeout);
}
iface.poll(timestamp, device, &mut sockets);
timestamp += smoltcp::time::Duration::from_millis(1);
let socket = sockets.get_mut::<tcp::Socket>(tcp_handle);
if !connected && !socket.is_open() {
match socket.connect(iface.context(), (target_ip, target_port), 49152) {
Ok(_) => {
info!("TCP connect started");
connected = true;
}
Err(e) => {
info!("TCP connect failed: {}", e);
return Err(MixtcpError::TcpConnectionFailed);
}
}
}
if socket.state() == tcp::State::Established && tls.is_none() {
info!("TCP established - creating TLS connection");
match TlsOverTcp::new(domain) {
Ok(t) => tls = Some(t),
Err(e) => {
info!("TLS create failed: {}", e);
return Err(MixtcpError::TlsHandshakeFailed);
}
}
}
if let Some(ref mut tls_conn) = tls {
let _ = tls_conn.read_tls(socket);
let _ = tls_conn.write_tls(socket);
if !tls_conn.conn.is_handshaking() && !handshake_completed {
handshake_completed = true;
info!("TLS handshake completed - ready for HTTPS");
let request = format!(
"GET {} HTTP/1.1\r\nHost: {}\r\nUser-Agent: mixtcp/1.0\r\nAccept: */*\r\nConnection: close\r\n\r\n",
path, domain
);
tls_conn.send(request.as_bytes(), socket)?;
info!("HTTPS request sent");
request_sent = true;
}
if request_sent {
let mut buf = vec![0u8; 4096];
match tls_conn.conn.reader().read(&mut buf) {
Ok(0) => {
info!("Response complete");
break;
}
Ok(n) if n > 0 => {
response_data.extend_from_slice(&buf[..n]);
if let Ok(response_str) = std::str::from_utf8(&response_data) {
if response_str.contains("\r\n\r\n") {
return Ok(response_data);
}
}
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
Err(e) => {
info!("Read error: {}", e);
return Err(MixtcpError::ResponseReadFailed);
}
Ok(_) => continue,
}
}
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
Err(MixtcpError::NoResponseReceived)
}
/// Simple response - just extract status and body
fn parse_simple_response(&self, response_bytes: &[u8]) -> Result<(u16, String), MixtcpError> {
let response_str = String::from_utf8_lossy(response_bytes);
let status_line = response_str
.lines()
.next()
.ok_or(MixtcpError::InvalidHttpResponse)?;
let status: u16 = status_line
.split_whitespace()
.nth(1)
.and_then(|s| s.parse().ok())
.unwrap_or(200);
if let Some(body_start) = response_str.find("\r\n\r\n") {
let body = response_str[body_start + 4..].to_string();
Ok((status, body))
} else {
Err(MixtcpError::InvalidHttpResponse)
}
}
}
pub struct MixtcpResponse {
status: u16,
body: String,
}
impl MixtcpResponse {
pub fn status(&self) -> StatusCode {
StatusCode::from_u16(self.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
}
pub async fn text(self) -> Result<String, std::convert::Infallible> {
Ok(self.body)
}
}
fn init_logging() {
INIT.call_once_force(|state| {
if state.is_poisoned() {
eprintln!("Logger initialization was poisoned, retrying");
}
if !tracing::dispatcher::has_been_set() {
nym_bin_common::logging::setup_tracing_logger();
}
});
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
init_logging();
let test_url = "https://cloudflare.com/cdn-cgi/trace";
info!("Fetching with plain reqwest...");
let start = tokio::time::Instant::now();
let plain_response = reqwest::get(test_url).await?;
let plain_status = plain_response.status();
let plain_text = plain_response.text().await?;
let plain_duration = start.elapsed();
info!(
"Plain reqwest - Status: {}, Time: {:?}",
plain_status, plain_duration
);
info!("Setting up mixnet client...");
let client = MixtcpReqwestClient::new().await?;
let start = tokio::time::Instant::now();
let mixnet_response = client.get(test_url).await?;
let mixnet_status = mixnet_response.status();
let mixnet_text = mixnet_response.text().await?;
let mixnet_duration = start.elapsed();
info!(
"Mixnet reqwest - Status: {}, Time: {:?}",
mixnet_status, mixnet_duration
);
info!("Status codes match: {}", plain_status == mixnet_status);
info!(
"Response lengths match: {}",
plain_text.len() == mixnet_text.len()
);
let key_fields = ["fl=", "ip=", "ts=", "visit_scheme="];
for field in key_fields {
let plain_has = plain_text.contains(field);
let mixnet_has = mixnet_text.contains(field);
info!(
"Field '{}' - Plain: {}, Mixnet: {}",
field, plain_has, mixnet_has
);
assert_eq!(plain_has, mixnet_has, "Field '{}' mismatch", field);
}
info!("Plain reqwest time: {:?}", plain_duration);
info!("Mixnet reqwest time: {:?}", mixnet_duration);
let slowdown = mixnet_duration.as_millis() as f64 / plain_duration.as_millis() as f64;
info!("Mixnet slowdown: {:.1}x", slowdown);
info!("Both responses match");
Ok(())
}
+270
View File
@@ -0,0 +1,270 @@
#![allow(clippy::result_large_err)]
use mixtcp::{create_device, MixtcpError};
use rustls::{pki_types::ServerName, ClientConfig, ClientConnection};
use std::{
io::{self, Read, Write},
sync::Arc,
};
use tracing::info;
use nym_sdk::stream_wrapper::{IpMixStream, NetworkEnvironment};
use smoltcp::{
iface::{Config, Interface, SocketSet},
socket::tcp,
time::Instant,
wire::{HardwareAddress, IpAddress, IpCidr, Ipv4Address},
};
use std::sync::Once;
use std::time::Duration;
static INIT: Once = Once::new();
pub struct TlsOverTcp {
pub conn: ClientConnection,
}
impl TlsOverTcp {
pub fn new(domain: &str) -> Result<Self, MixtcpError> {
let mut root_store = rustls::RootCertStore::empty();
root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
let config = ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();
let server_name = ServerName::try_from(domain)
.map_err(|_| MixtcpError::InvalidDnsName)?
.to_owned();
let conn = ClientConnection::new(Arc::new(config), server_name)
.map_err(|_| MixtcpError::TlsHandshakeFailed)?;
Ok(Self { conn })
}
/// Move data from TLS connection to TCP socket
pub fn write_tls(&mut self, socket: &mut tcp::Socket) -> Result<(), MixtcpError> {
let mut buf = [0u8; 4096];
while self.conn.wants_write() {
match self.conn.write_tls(&mut buf.as_mut_slice()) {
Ok(n) if n > 0 => {
socket
.send_slice(&buf[..n])
.map_err(|_| MixtcpError::TlsHandshakeFailed)?;
}
_ => break,
}
}
Ok(())
}
/// Move data from TCP socket to TLS connection
pub fn read_tls(&mut self, socket: &mut tcp::Socket) -> Result<(), MixtcpError> {
if socket.can_recv() {
let _ = socket.recv(|chunk| {
if !chunk.is_empty() {
inspect_tls_packet(chunk);
let _ = self.conn.read_tls(&mut io::Cursor::new(&mut *chunk));
let _ = self.conn.process_new_packets();
}
(chunk.len(), ())
});
}
Ok(())
}
pub fn send(&mut self, data: &[u8], socket: &mut tcp::Socket) -> Result<(), MixtcpError> {
self.conn
.writer()
.write_all(data)
.map_err(|_| MixtcpError::TlsHandshakeFailed)?;
self.write_tls(socket)
}
pub fn recv(&mut self, socket: &mut tcp::Socket) -> Result<Vec<u8>, MixtcpError> {
self.read_tls(socket)?;
let mut result = Vec::new();
let mut buf = vec![0u8; 4096];
match self.conn.reader().read(&mut buf) {
Ok(n) if n > 0 => result.extend_from_slice(&buf[..n]),
_ => {}
}
Ok(result)
}
}
fn inspect_tls_packet(data: &[u8]) {
if data.len() < 5 {
return;
}
let content_type = data[0];
if !(0x14..=0x17).contains(&content_type) {
return;
}
let version = u16::from_be_bytes([data[1], data[2]]);
let length = u16::from_be_bytes([data[3], data[4]]);
info!(
"TLS packet: ContentType={:#04x}, Version={:#06x}, Length={}",
content_type, version, length
);
if content_type == 0x16 && data.len() > 5 {
let handshake_type = data[5];
let handshake_types = match handshake_type {
0x01 => "ClientHello",
0x02 => "ServerHello",
0x0b => "Certificate",
0x0c => "ServerKeyExchange",
0x0d => "CertificateRequest",
0x0e => "ServerHelloDone",
0x0f => "CertificateVerify",
0x10 => "ClientKeyExchange",
0x14 => "Finished",
_ => "Unknown",
};
info!(
"Handshake type: {:#04x} ({}), Length: {}",
handshake_type, handshake_types, length
);
}
}
fn init_logging() {
INIT.call_once_force(|state| {
if state.is_poisoned() {
eprintln!("Logger initialization was poisoned, retrying");
}
if !tracing::dispatcher::has_been_set() {
nym_bin_common::logging::setup_tracing_logger();
}
});
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
init_logging();
let ipr_stream = IpMixStream::new(NetworkEnvironment::Mainnet).await?;
let (mut device, bridge, allocated_ips) = create_device(ipr_stream).await?;
info!("Allocated IP: {}", allocated_ips.ipv4);
tokio::spawn(async move {
bridge.run().await.unwrap();
});
let config = Config::new(HardwareAddress::Ip);
let mut iface = Interface::new(config, &mut device, Instant::now());
iface.update_ip_addrs(|ip_addrs| {
ip_addrs
.push(IpCidr::new(IpAddress::from(allocated_ips.ipv4), 32))
.unwrap();
});
iface
.routes_mut()
.add_default_ipv4_route(Ipv4Address::UNSPECIFIED)
.unwrap();
let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 16384]);
let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; 4096]);
let tcp_socket = tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer);
let mut sockets = SocketSet::new(vec![]);
let tcp_handle = sockets.add(tcp_socket);
let target_ip = Ipv4Address::new(1, 1, 1, 1); // Pinging Cloudflare
let target_port = 443;
info!("Connecting to {}:{} through mixnet", target_ip, target_port);
let mut timestamp = Instant::from_millis(0);
let start = tokio::time::Instant::now();
let mut connected = false;
let mut tls = None;
let handshake_completed = false;
loop {
if start.elapsed() > Duration::from_secs(120) {
info!("Test timeout after 120 seconds");
break;
}
iface.poll(timestamp, &mut device, &mut sockets);
timestamp += smoltcp::time::Duration::from_millis(1);
let socket = sockets.get_mut::<tcp::Socket>(tcp_handle);
if !connected && !socket.is_open() {
match socket.connect(iface.context(), (target_ip, target_port), 49152) {
Ok(_) => {
info!("TCP connect started");
connected = true;
}
Err(e) => {
info!("TCP connect failed: {}", e);
break;
}
}
}
if start.elapsed().as_secs().is_multiple_of(5) && start.elapsed().as_millis() % 1000 < 100 {
info!(
"State: TCP={:?}, established={}, can_send={}, can_recv={}",
socket.state(),
socket.state() == tcp::State::Established,
socket.may_send(),
socket.can_recv()
);
}
if socket.state() == tcp::State::Established && tls.is_none() {
info!("TCP established - creating TLS connection");
match TlsOverTcp::new("cloudflare.com") {
Ok(t) => tls = Some(t),
Err(e) => {
info!("TLS create failed: {}", e);
break;
}
}
}
if let Some(ref mut tls_conn) = tls {
let _ = tls_conn.read_tls(socket);
let _ = tls_conn.write_tls(socket);
if start.elapsed().as_secs().is_multiple_of(10)
&& start.elapsed().as_millis() % 1000 < 100
{
info!(
"TLS state: handshaking={}, wants_read={}, wants_write={}",
tls_conn.conn.is_handshaking(),
tls_conn.conn.wants_read(),
tls_conn.conn.wants_write()
);
}
if !tls_conn.conn.is_handshaking() && !handshake_completed {
info!("TLS handshake complete");
info!(
"TLS verification: handshake_complete=true, wants_read={}, wants_write={}",
tls_conn.conn.wants_read(),
tls_conn.conn.wants_write()
);
match tls_conn.recv(socket) {
Ok(data) if data.is_empty() => {
info!("No unexpected application data waiting to be read");
}
Ok(data) => {
info!("Unexpected application data received: {} bytes", data.len());
}
Err(e) => {
info!("TLS recv check failed: {}", e);
}
}
info!("TLS handshake successful with cloudflare");
break;
}
}
tokio::time::sleep(Duration::from_millis(1)).await;
}
info!("Test completed");
Ok(())
}
+120
View File
@@ -0,0 +1,120 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-2.0-only
use crate::error::MixtcpError;
use nym_ip_packet_requests::codec::MultiIpPacketCodec;
use nym_sdk::stream_wrapper::IpMixStream;
use tokio::sync::mpsc;
use tracing::{error, info};
/// Asynchronous bridge between smoltcp device and Mixnet.
///
/// This component runs in a separate task and handles all asynchronous
/// operations required for outbound communication. It receives packets
/// from the device via channels, bundles them according to IPR protocol
/// (MultiIpPacketCodec) and transmits them through the Mixnet.
///
/// # Packet Processing Flow
///
/// Outgoing packets:
/// - Receive from device via channel
/// - Bundle using MultiIpPacketCodec
/// - Send through mixnet via send_ip_packet()
///
/// Incoming packets:
/// - Poll mixnet with handle_incoming()
/// - Forward to device via channel
/// - Device queues for smoltcp consumption
pub struct NymIprBridge {
/// Connected IPR stream for mixnet communication
stream: IpMixStream,
/// Channel for receiving outgoing packets from device
tx_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
/// Channel for sending incoming packets to device
rx_sender: mpsc::UnboundedSender<Vec<u8>>,
}
impl NymIprBridge {
pub fn new(
stream: IpMixStream,
tx_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
rx_sender: mpsc::UnboundedSender<Vec<u8>>,
) -> Self {
Self {
stream,
tx_receiver,
rx_sender,
}
}
/// Runs the bridge event loop.
///
/// This method should be spawned in a separate task. It continuously:
/// - Processes outgoing packets from the device
/// - Polls for incoming packets from the mixnet
/// - Maintains packet statistics
///
/// The loop exits when channels are closed or an error occurs.
pub async fn run(mut self) -> Result<(), MixtcpError> {
info!("Starting Nym IPR bridge");
let mut packets_sent = 0;
let mut packets_received = 0;
loop {
tokio::select! {
// Outgoing packets from smoltcp layer above.
Some(packet) = self.tx_receiver.recv() => {
info!("Bridge sending {} byte packet to mixnet", packet.len());
// Log packet details for debugging
if packet.len() >= 20 {
let version = (packet[0] >> 4) & 0xF;
let proto = packet[9];
let src_ip = &packet[12..16];
let dst_ip = &packet[16..20];
info!(
"Outgoing IPv{} packet: proto={}, src={}.{}.{}.{}, dst={}.{}.{}.{}",
version, proto,
src_ip[0], src_ip[1], src_ip[2], src_ip[3],
dst_ip[0], dst_ip[1], dst_ip[2], dst_ip[3]
);
}
// Necessary to bundle for IPR! See stream_wrapper_ipr.rs tests.
let bundled = MultiIpPacketCodec::bundle_one_packet(packet.into());
if let Err(e) = self.stream.send_ip_packet(&bundled).await {
error!("Failed to send packet through mixnet: {}", e);
} else {
packets_sent += 1;
info!("Total packets sent: {}", packets_sent);
}
}
// Poll for incoming packets from mixnet
Ok(packets) = self.stream.handle_incoming() => {
if !packets.is_empty() {
info!("Bridge received {} packets from mixnet", packets.len());
for packet in packets {
info!("Incoming packet: {} bytes", packet.len());
// Forward to device via channel
if self.rx_sender.send(packet.to_vec()).is_err() {
error!("Failed to send packet to device - receiver dropped");
return Err(MixtcpError::ChannelClosed);
}
packets_received += 1;
info!("Total packets received: {}", packets_received);
}
}
}
else => {
info!("Bridge shutting down. Sent: {}, Received: {}", packets_sent, packets_received);
break;
}
}
}
Ok(())
}
}
+165
View File
@@ -0,0 +1,165 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-2.0-only
use smoltcp::{
phy::{Device, DeviceCapabilities, Medium, RxToken, TxToken},
time::Instant,
};
use std::collections::VecDeque;
use tokio::sync::mpsc;
use tracing::{info, warn};
/// # Overview
/// We need something to bridge the async / sync weirdness (Device trait fns are sync, IpMixStream fns are
/// async) in a way that allows for the `NymIprDevice` to look and act like any other device.
///
/// We need to be polling the queue to/from the NymIprBridge, hence the addition of the
/// mpsc channels in the Device struct and the extra fns.
///
/// # Architecture
/// smoltcp (sync) <-> NymIprDevice <-> channels <-> NymIprBridge <-> Mixnet (async)
///
/// The device maintains a receive queue for packets coming from the mixnet and
/// uses unbounded channels to communicate with the bridge task that handles the
/// actual mixnet I/O. We poll the channel in receive() to move packets via mpsc
/// from async to sync world.
///
/// This way no blocking from smoltcp + allows for concurrency.
///
/// Adapter pattern between sync polling-based I/O and async event-based I/O.
pub struct NymIprDevice {
// Receive queue for packets coming from the mixnet
rx_queue: VecDeque<Vec<u8>>,
// Channel to send packets to the bridge task
tx_sender: mpsc::UnboundedSender<Vec<u8>>,
// Device capabilities
capabilities: DeviceCapabilities,
// Channel to receive packets from the bridge task
rx_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
}
impl NymIprDevice {
pub fn new(
tx_sender: mpsc::UnboundedSender<Vec<u8>>,
rx_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
) -> Self {
let mut capabilities = DeviceCapabilities::default();
capabilities.medium = Medium::Ip;
// Standard MTU for IP packets - TODO make configurable
capabilities.max_transmission_unit = 1500;
// Process one packet at a time. TODO experiment with this
capabilities.max_burst_size = Some(1);
Self {
rx_queue: VecDeque::new(),
tx_sender,
capabilities,
rx_receiver,
}
}
/// Poll for new packets from the bridge
fn poll_rx_queue(&mut self) {
// Try to receive all available packets without blocking, queue them for smoltcp consumption.
while let Ok(packet) = self.rx_receiver.try_recv() {
info!("Received packet of {} bytes from bridge", packet.len());
self.rx_queue.push_back(packet);
}
}
pub fn tx_sender(&self) -> mpsc::UnboundedSender<Vec<u8>> {
self.tx_sender.clone()
}
/// Get the receiver for external use
pub fn rx_receiver(&self) -> mpsc::UnboundedReceiver<Vec<u8>> {
// Create a new channel and return the receiver
// This is a bit of a hack but necessary for the current architecture
let (_tx, rx) = mpsc::unbounded_channel();
// We just need the receiver for testing
rx
}
}
impl Device for NymIprDevice {
type RxToken<'a>
= NymRxToken
where
Self: 'a;
type TxToken<'a>
= NymTxToken
where
Self: 'a;
fn receive(&mut self, _timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
// Poll for new packets from the async bridge
self.poll_rx_queue();
// Check if we have a packet to deliver
let packet = self.rx_queue.pop_front()?;
// Create tokens - RxToken owns the packet data
let rx_token = NymRxToken { buffer: packet };
let tx_token = NymTxToken {
tx_sender: self.tx_sender.clone(),
};
Some((rx_token, tx_token))
}
fn transmit(&mut self, _timestamp: Instant) -> Option<Self::TxToken<'_>> {
// We can always transmit (channel will buffer)
Some(NymTxToken {
tx_sender: self.tx_sender.clone(),
})
}
fn capabilities(&self) -> DeviceCapabilities {
self.capabilities.clone()
}
}
/// Receive token - owns the packet buffer
pub struct NymRxToken {
buffer: Vec<u8>,
}
impl RxToken for NymRxToken {
fn consume<R, F>(self, f: F) -> R
where
F: FnOnce(&[u8]) -> R,
{
info!("Consuming RX packet of {} bytes", self.buffer.len());
f(&self.buffer)
}
}
/// Transmit token - holds channel sender
pub struct NymTxToken {
tx_sender: mpsc::UnboundedSender<Vec<u8>>,
}
impl TxToken for NymTxToken {
fn consume<R, F>(self, len: usize, f: F) -> R
where
F: FnOnce(&mut [u8]) -> R,
{
// Create buffer for the packet
let mut buffer = vec![0u8; len];
// Let smoltcp fill the packet
let result = f(&mut buffer);
// Send raw packet to the bridge task for transmission
if let Err(e) = self.tx_sender.send(buffer) {
warn!("Failed to send packet to bridge: {}", e);
} else {
info!("Sent {} byte packet to bridge", len);
}
result
}
}
+52
View File
@@ -0,0 +1,52 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-2.0-only
use thiserror::Error;
#[derive(Error, Debug)]
pub enum MixtcpError {
#[error("Channel closed")]
ChannelClosed,
#[error("Not connected to IPR")]
NotConnected,
#[error("Nym SDK error: {0}")]
NymSdk(#[from] nym_sdk::Error),
#[error("TLS handshake failed")]
TlsHandshakeFailed,
#[error("TLS encrypt/decrypt error")]
TlsCrypto,
#[error("DNS err placeholder")]
InvalidDnsName,
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("HTTP parse failed")]
HttpParseFailed,
#[error("Invalid URL")]
InvalidUrl,
#[error("Mixnet connection failed")]
MixnetConnectionFailed,
#[error("Request timeout")]
Timeout,
#[error("TCP connection failed")]
TcpConnectionFailed,
#[error("Response read failed")]
ResponseReadFailed,
#[error("No response received")]
NoResponseReceived,
#[error("Invalid HTTP response")]
InvalidHttpResponse,
}
+49
View File
@@ -0,0 +1,49 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-2.0-only
mod bridge;
mod device;
mod error;
pub use bridge::NymIprBridge;
pub use device::NymIprDevice;
pub use error::MixtcpError;
use nym_ip_packet_requests::IpPair;
use nym_sdk::stream_wrapper::IpMixStream;
use tokio::sync::mpsc;
/// Create a connected smoltcp device and async bridge for the tunneling packets through the
/// Mixnet to remote hosts via an IPR.
///
/// This function handles the complete setup process:
/// - Ensures the IPR stream is connected
/// - Retrieves allocated IP addresses
/// - Creates communication channels
/// - Constructs the device and bridge components
pub async fn create_device(
mut ipr_stream: IpMixStream,
) -> Result<(NymIprDevice, NymIprBridge, IpPair), MixtcpError> {
// Ensure the stream is connected
if !ipr_stream.is_connected() {
ipr_stream.connect_tunnel().await?;
}
// Get the allocated IPs before moving the stream - need these for proper packet creation
// further 'up' the flow in the code calling this fn (see examples/tcp_connect.rs).
let allocated_ips = *ipr_stream
.allocated_ips()
.ok_or(MixtcpError::NotConnected)?;
// Create channels for device <-> bridge communication
let (tx_to_bridge, tx_from_device) = mpsc::unbounded_channel();
let (rx_to_device, rx_from_bridge) = mpsc::unbounded_channel();
// Create device
let device = NymIprDevice::new(tx_to_bridge, rx_from_bridge);
// Create bridge (moves ipr_stream)
let bridge = NymIprBridge::new(ipr_stream, tx_from_device, rx_to_device);
Ok((device, bridge, allocated_ips))
}
+2 -3
View File
@@ -16,10 +16,9 @@ workspace = true
bincode.workspace = true
bytes.workspace = true
futures.workspace = true
nym-ip-packet-requests = { workspace = true }
nym-sdk = { workspace = true }
thiserror.workspace = true
tokio-util.workspace = true
tokio.workspace = true
tracing.workspace = true
nym-sdk = { workspace = true }
nym-ip-packet-requests = { workspace = true }
+1 -3
View File
@@ -30,8 +30,6 @@ enum ConnectionState {
Disconnected,
Connecting,
Connected,
#[allow(unused)]
Disconnecting,
}
pub struct IprClientConnect {
@@ -83,7 +81,7 @@ impl IprClientConnect {
self.listen_for_connect_response(request_id).await
}
async fn send_connect_request(&self, ip_packet_router_address: Recipient) -> Result<u64> {
async fn send_connect_request(&mut self, ip_packet_router_address: Recipient) -> Result<u64> {
let (request, request_id) = IpPacketRequest::new_connect_request(None);
// We use 20 surbs for the connect request because typically the IPR is configured to have
+7
View File
@@ -18,6 +18,9 @@ pub enum Error {
)]
ReceivedResponseWithNewVersion { expected: u8, received: u8 },
#[error("got reply for connect request, but it appears intended for the wrong address?")]
GotReplyIntendedForWrongAddress,
#[error("unexpected connect response")]
UnexpectedConnectResponse,
@@ -41,6 +44,10 @@ pub enum Error {
#[error(transparent)]
Bincode(#[from] bincode::Error),
#[error("failed to create connect request")]
FailedToCreateConnectRequest {
source: nym_ip_packet_requests::sign::SignatureError,
},
}
// Result type based on our error type
@@ -261,7 +261,7 @@ where
self.handle_forwarding_request(receiver_idx, forward_data)
.await
}
typ @ LpFrameKind::Opaque => {
typ @ (LpFrameKind::Opaque | LpFrameKind::Stream) => {
// Neither registration nor forwarding - unknown payload type
warn!(
"Unknown transport payload type from {remote} (receiver_idx={receiver_idx}). dropping {} bytes",
+5611
View File
File diff suppressed because it is too large Load Diff
+5
View File
@@ -33,6 +33,7 @@ nym-credentials-interface = { workspace = true }
nym-credential-storage = { workspace = true }
nym-credential-utils = { workspace = true }
nym-network-defaults = { workspace = true }
nym-lp = { workspace = true }
nym-sphinx = { workspace = true }
nym-statistics-common = { workspace = true }
nym-task = { workspace = true }
@@ -64,6 +65,10 @@ url = { workspace = true }
toml = { workspace = true }
tempfile = { workspace = true }
nym-ip-packet-requests = { path = "../../../common/ip-packet-requests" }
pnet_packet = { workspace = true }
nym-config = { path = "../../../common/config" }
# tcpproxy dependencies
clap = { workspace = true, features = ["derive"] }
anyhow.workspace = true
@@ -47,7 +47,7 @@ async fn main() {
// split_sender shares the stream_mode flag
println!("\nTesting split_sender (shares stream_mode)");
let sender = client.split_sender();
let mut sender = client.split_sender();
let result = sender
.send_plain_message(our_address, "this should also fail")
.await;
+61
View File
@@ -1,6 +1,9 @@
use nym_validator_client::nyxd::error::NyxdError;
use std::path::PathBuf;
use nym_ip_packet_requests::v8::response::{ConnectFailureReason, IpPacketResponseData};
use nym_validator_client::nym_api::error::NymAPIError;
/// Top-level Error enum for the mixnet client and its relevant types.
#[derive(Debug, thiserror::Error)]
pub enum Error {
@@ -108,6 +111,64 @@ pub enum Error {
#[error("Stream subsystem failed to initialise: reconstructed_receiver unavailable")]
StreamInitFailure,
#[error("nymsphinx receiver error: {0}")]
MessageRecovery(#[from] nym_sphinx::receiver::MessageRecoveryError),
#[error("client not connected")]
IprStreamClientNotConnected,
#[error("client already connected or connecting")]
IprStreamClientAlreadyConnectedOrConnecting,
#[error("trying to send an anonymous reply but peer surb tag is not set")]
MixStreamSurbTagNotSet,
#[error("trying to send an outgoing message but receipient address is not set")]
MixStreamRecipientNotSet,
#[error("listening for connection response timed out")]
IPRConnectResponseTimeout,
#[error("no next frame: assuming stream is closed")]
IPRClientStreamClosed,
#[error("expected control response, got {0:?}")]
UnexpectedResponseType(IpPacketResponseData),
#[error("connect denied: {0:?}")]
ConnectDenied(ConnectFailureReason),
#[allow(clippy::result_large_err)]
#[error("api directory error: {0}")]
GatewayDirectoryError(#[from] NymAPIError),
#[error("did not receive Validator endpoint details")]
NoValidatorDetailsAvailable,
#[error("did not receive URL")]
NoValidatorAPIUrl,
#[error("did not receive NymVPN API URL")]
NoNymAPIUrl,
#[error("no available gateway")]
NoGatewayAvailable,
#[error("no IPR address on selected gateway")]
NoIPRAvailable,
#[error("message version check failed: {0}")]
IPRMessageVersionCheckFailed(String),
#[error("no response id found in connect response")]
IPRNoId,
#[error("Could not find peer address or surb tag")]
MixStreamNoPeerOrSurb,
#[error("No network env specified on new MixStream")]
MissingStreamConfig,
}
impl Error {
+14
View File
@@ -0,0 +1,14 @@
// Copyright 2023-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
mod connect;
mod error;
pub mod helpers;
pub mod listener;
pub use connect::IprClientConnect;
pub use error::Error;
pub use listener::{IprListener, MixnetMessageOutcome};
// Re-export the currently used version
pub use nym_ip_packet_requests::v8 as current;
@@ -0,0 +1,7 @@
# Modified `nym-ip-packet-client`
This set of code is made up of functions from several crates from the `nym-vpn-client` monorepo which had to be imported and modified to avoid a circular dependency on the `nym-sdk` package for use in the `mixnet_stream_wrapper_ipr` module, and is made up of:
- a modified version of (basically) the entire `nym-ip-packet-client`
- a set of IP Packet helper functions from the `nym-gateway-probe`
- a set of helpers & types from the `nym-connection-monitor`
All of these can be found in [`nym-vpn-client/nym-vpn-core/crates/`](https://github.com/nymtech/nym-vpn-client/tree/develop/nym-vpn-core/crates).
@@ -0,0 +1,196 @@
// Copyright 2023-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use std::{sync::Arc, time::Duration};
pub use crate::mixnet::{
InputMessage, MixnetClient, MixnetClientSender, MixnetMessageSender, Recipient,
TransmissionLane,
};
use nym_ip_packet_requests::IpPair;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
use super::error::{Error, Result};
use crate::{
ip_packet_client::current::{
request::IpPacketRequest,
response::{
ConnectResponse, ConnectResponseReply, ControlResponse, IpPacketResponse,
IpPacketResponseData,
},
},
ip_packet_client::helpers::check_ipr_message_version,
};
pub type SharedMixnetClient = Arc<tokio::sync::Mutex<Option<MixnetClient>>>;
const IPR_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Clone, Debug, PartialEq, Eq)]
enum ConnectionState {
Disconnected,
Connecting,
Connected,
#[allow(unused)]
Disconnecting,
}
pub struct IprClientConnect {
// During connection we need the mixnet client, but once connected we expect to setup a channel
// from the main mixnet listener at the top-level.
// As such, we drop the shared mixnet client once we're connected.
mixnet_client: SharedMixnetClient,
mixnet_sender: MixnetClientSender,
connected: ConnectionState,
cancel_token: CancellationToken,
}
impl IprClientConnect {
pub async fn new(mixnet_client: SharedMixnetClient, cancel_token: CancellationToken) -> Self {
let mixnet_sender = mixnet_client.lock().await.as_ref().unwrap().split_sender();
Self {
mixnet_client,
mixnet_sender,
connected: ConnectionState::Disconnected,
cancel_token,
}
}
pub async fn connect(&mut self, ip_packet_router_address: Recipient) -> Result<IpPair> {
if self.connected != ConnectionState::Disconnected {
return Err(Error::AlreadyConnected);
}
tracing::info!("Connecting to exit gateway");
self.connected = ConnectionState::Connecting;
match self.connect_inner(ip_packet_router_address).await {
Ok(ips) => {
debug!("Successfully connected to the ip-packet-router");
self.connected = ConnectionState::Connected;
Ok(ips)
}
Err(err) => {
error!("Failed to connect to the ip-packet-router: {:?}", err);
self.connected = ConnectionState::Disconnected;
Err(err)
}
}
}
async fn connect_inner(&mut self, ip_packet_router_address: Recipient) -> Result<IpPair> {
let request_id = self.send_connect_request(ip_packet_router_address).await?;
debug!("Waiting for reply...");
self.listen_for_connect_response(request_id).await
}
async fn send_connect_request(&mut self, ip_packet_router_address: Recipient) -> Result<u64> {
let (request, request_id) = IpPacketRequest::new_connect_request(None);
// We use 20 surbs for the connect request because typically the IPR is configured to have
// a min threshold of 10 surbs that it reserves for itself to request additional surbs.
let surbs = 20;
self.mixnet_sender
.send(create_input_message(
ip_packet_router_address,
request,
surbs,
))
.await
.map_err(|err| Error::SdkError(Box::new(err)))?;
Ok(request_id)
}
async fn handle_connect_response(&self, response: ConnectResponse) -> Result<IpPair> {
debug!("Handling dynamic connect response");
match response.reply {
ConnectResponseReply::Success(r) => Ok(r.ips),
ConnectResponseReply::Failure(reason) => Err(Error::ConnectRequestDenied { reason }),
}
}
async fn handle_ip_packet_router_response(&self, response: IpPacketResponse) -> Result<IpPair> {
let control_response = match response.data {
IpPacketResponseData::Control(control_response) => control_response,
_ => {
error!("Received non-control response while waiting for connect response");
return Err(Error::UnexpectedConnectResponse);
}
};
match *control_response {
ControlResponse::Connect(resp) => self.handle_connect_response(resp).await,
response => {
error!("Unexpected response: {response:?}");
Err(Error::UnexpectedConnectResponse)
}
}
}
async fn listen_for_connect_response(&self, request_id: u64) -> Result<IpPair> {
// Connecting is basically synchronous from the perspective of the mixnet client, so it's safe
// to just grab ahold of the mutex and keep it until we get the response.
let mut mixnet_client_handle = self.mixnet_client.lock().await;
let mixnet_client = mixnet_client_handle.as_mut().unwrap();
let timeout = sleep(IPR_CONNECT_TIMEOUT);
tokio::pin!(timeout);
loop {
tokio::select! {
_ = self.cancel_token.cancelled() => {
error!("Cancelled while waiting for reply to connect request");
return Err(Error::Cancelled);
},
_ = &mut timeout => {
error!("Timed out waiting for reply to connect request");
return Err(Error::TimeoutWaitingForConnectResponse);
},
msgs = mixnet_client.wait_for_messages() => match msgs {
None => {
return Err(Error::NoMixnetMessagesReceived);
}
Some(msgs) => {
for msg in msgs {
// Confirm that the version is correct
if let Err(err) = check_ipr_message_version(&msg) {
tracing::error!("Mixnet message version mismatch: {err}");
break;
}
// Then we deserialize the message
tracing::debug!("IprClient: got message while waiting for connect response");
let Ok(response) = IpPacketResponse::from_reconstructed_message(&msg) else {
// This is ok, it's likely just one of our self-pings
tracing::debug!("Failed to deserialize mixnet message");
continue;
};
if response.id() == Some(request_id) {
tracing::debug!("Got response with matching id");
return self.handle_ip_packet_router_response(response).await;
}
}
}
}
}
}
}
}
fn create_input_message(
recipient: Recipient,
request: IpPacketRequest,
surbs: u32,
) -> InputMessage {
InputMessage::new_anonymous(
recipient,
request.to_bytes().unwrap(),
surbs,
TransmissionLane::General,
None,
)
}
@@ -0,0 +1,79 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::ip_packet_client::current::response::ConnectFailureReason;
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("nym sdk")]
// SdkError(#[source] Box<nym_sdk::Error>),
SdkError(#[source] Box<crate::error::Error>),
#[error(
"received response with version v{received}, the client is too new and can only understand v{expected}"
)]
ReceivedResponseWithOldVersion { expected: u8, received: u8 },
#[error(
"received response with version v{received}, the client is too old and can only understand v{expected}"
)]
ReceivedResponseWithNewVersion { expected: u8, received: u8 },
#[error("got reply for connect request, but it appears intended for the wrong address?")]
GotReplyIntendedForWrongAddress,
#[error("unexpected connect response")]
UnexpectedConnectResponse,
#[error("mixnet client stopped returning responses")]
NoMixnetMessagesReceived,
#[error("timeout waiting for connect response from exit gateway (ipr)")]
TimeoutWaitingForConnectResponse,
#[error("connection cancelled")]
Cancelled,
#[error("connect request denied: {reason}")]
ConnectRequestDenied { reason: ConnectFailureReason },
#[error("failed to get version from message")]
NoVersionInMessage,
#[error("already connected to the mixnet")]
AlreadyConnected,
#[error("failed to create connect request")]
FailedToCreateConnectRequest {
source: nym_ip_packet_requests::sign::SignatureError,
},
/// Below error types are from the nym-connection-monitor crate
#[error(
"timeout waiting for mixnet self ping, the entry gateway is not routing our mixnet traffic"
)]
TimeoutWaitingForMixnetSelfPing,
#[error("failed to serialize message")]
FailedToSerializeMessage {
#[from]
source: bincode::Error,
},
#[error("failed to create icmp echo request packet")]
IcmpEchoRequestPacketCreationFailure,
#[error("failed to create icmp packet")]
IcmpPacketCreationFailure,
#[error("failed to create ipv4 packet")]
Ipv4PacketCreationFailure,
}
impl From<crate::error::Error> for Error {
fn from(err: crate::error::Error) -> Self {
Error::SdkError(Box::new(err))
}
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -0,0 +1,326 @@
// Copyright 2023-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use super::error::{Error, Result};
use crate::ip_packet_client::current::VERSION as CURRENT_VERSION;
pub use crate::mixnet::ReconstructedMessage;
use nym_config::defaults::mixnet_vpn::{NYM_TUN_DEVICE_ADDRESS_V4, NYM_TUN_DEVICE_ADDRESS_V6};
use nym_ip_packet_requests::IpPair;
use bytes::Bytes;
use pnet_packet::{
icmp::{
echo_reply::EchoReplyPacket,
echo_request::{EchoRequestPacket, MutableEchoRequestPacket},
IcmpPacket,
},
icmpv6,
ipv4::{Ipv4Packet, MutableIpv4Packet},
ipv6::{Ipv6Packet, MutableIpv6Packet},
Packet,
};
use std::cmp::Ordering;
use std::net::{Ipv4Addr, Ipv6Addr};
/**
* This function is from the original nym-ip-packet-client crate.
*/
pub(crate) fn check_ipr_message_version(message: &ReconstructedMessage) -> Result<()> {
// Assuming it's a IPR message, it will have a version as its first byte
if let Some(version) = message.message.first() {
match version.cmp(&CURRENT_VERSION) {
Ordering::Greater => Err(Error::ReceivedResponseWithNewVersion {
expected: CURRENT_VERSION,
received: *version,
}),
Ordering::Less => Err(Error::ReceivedResponseWithOldVersion {
expected: CURRENT_VERSION,
received: *version,
}),
Ordering::Equal => {
// We're good
Ok(())
}
}
} else {
Err(Error::NoVersionInMessage)
}
}
/**
* Functions below are from the nym-connection-monitor crate.
*/
pub fn create_icmpv4_echo_request(
sequence_number: u16,
identifier: u16,
) -> Result<EchoRequestPacket<'static>> {
let buffer = vec![0; 64];
let mut icmp_echo_request = MutableEchoRequestPacket::owned(buffer)
.ok_or(Error::IcmpEchoRequestPacketCreationFailure)?;
// Configure the ICMP echo request packet
icmp_echo_request.set_identifier(identifier);
icmp_echo_request.set_sequence_number(sequence_number);
icmp_echo_request.set_icmp_type(pnet_packet::icmp::IcmpTypes::EchoRequest);
icmp_echo_request.set_icmp_code(pnet_packet::icmp::IcmpCode::new(0));
// Calculate checksum once we've set all the fields
let icmp_packet =
IcmpPacket::new(icmp_echo_request.packet()).ok_or(Error::IcmpPacketCreationFailure)?;
let checksum = pnet_packet::icmp::checksum(&icmp_packet);
icmp_echo_request.set_checksum(checksum);
Ok(icmp_echo_request.consume_to_immutable())
}
pub fn create_icmpv6_echo_request(
sequence_number: u16,
identifier: u16,
source: &Ipv6Addr,
destination: &Ipv6Addr,
) -> Result<icmpv6::echo_request::EchoRequestPacket<'static>> {
let buffer = vec![0; 64];
// let mut icmp_echo_request = MutableEchoRequestPacket::owned(buffer)
let mut icmp_echo_request = icmpv6::echo_request::MutableEchoRequestPacket::owned(buffer)
.ok_or(Error::IcmpEchoRequestPacketCreationFailure)?;
// Configure the ICMP echo request packet
icmp_echo_request.set_identifier(identifier);
icmp_echo_request.set_sequence_number(sequence_number);
icmp_echo_request.set_icmpv6_type(pnet_packet::icmpv6::Icmpv6Types::EchoRequest);
icmp_echo_request.set_icmpv6_code(pnet_packet::icmpv6::Icmpv6Code::new(0));
// Calculate checksum once we've set all the fields
let icmp_packet = icmpv6::Icmpv6Packet::new(icmp_echo_request.packet())
.ok_or(Error::IcmpPacketCreationFailure)?;
let checksum = pnet_packet::icmpv6::checksum(&icmp_packet, source, destination);
icmp_echo_request.set_checksum(checksum);
Ok(icmp_echo_request.consume_to_immutable())
}
pub fn wrap_icmp_in_ipv4(
icmp_echo_request: EchoRequestPacket,
source: Ipv4Addr,
destination: Ipv4Addr,
) -> Result<Ipv4Packet> {
// 20 bytes for IPv4 header + ICMP payload
let total_length = 20 + icmp_echo_request.packet().len();
// IPv4 header + ICMP payload
let ipv4_buffer = vec![0u8; 20 + icmp_echo_request.packet().len()];
let mut ipv4_packet =
MutableIpv4Packet::owned(ipv4_buffer).ok_or(Error::Ipv4PacketCreationFailure)?;
ipv4_packet.set_version(4);
ipv4_packet.set_header_length(5);
ipv4_packet.set_total_length(total_length as u16);
ipv4_packet.set_ttl(64);
ipv4_packet.set_next_level_protocol(pnet_packet::ip::IpNextHeaderProtocols::Icmp);
ipv4_packet.set_source(source);
ipv4_packet.set_destination(destination);
ipv4_packet.set_flags(pnet_packet::ipv4::Ipv4Flags::DontFragment);
ipv4_packet.set_checksum(0);
ipv4_packet.set_payload(icmp_echo_request.packet());
let ipv4_checksum = compute_ipv4_checksum(&ipv4_packet.to_immutable());
ipv4_packet.set_checksum(ipv4_checksum);
Ok(ipv4_packet.consume_to_immutable())
}
pub fn wrap_icmp_in_ipv6(
icmp_echo_request: icmpv6::echo_request::EchoRequestPacket,
source: Ipv6Addr,
destination: Ipv6Addr,
) -> Result<Ipv6Packet> {
let ipv6_buffer = vec![0u8; 40 + icmp_echo_request.packet().len()];
let mut ipv6_packet =
MutableIpv6Packet::owned(ipv6_buffer).ok_or(Error::Ipv4PacketCreationFailure)?;
ipv6_packet.set_version(6);
ipv6_packet.set_payload_length(icmp_echo_request.packet().len() as u16);
ipv6_packet.set_next_header(pnet_packet::ip::IpNextHeaderProtocols::Icmpv6);
ipv6_packet.set_hop_limit(64);
ipv6_packet.set_source(source);
ipv6_packet.set_destination(destination);
ipv6_packet.set_payload(icmp_echo_request.packet());
Ok(ipv6_packet.consume_to_immutable())
}
// Compute IPv4 checksum: sum all 16-bit words, add carry, take one's complement
pub(crate) fn compute_ipv4_checksum(header: &Ipv4Packet) -> u16 {
// Header length in 16-bit words
let len = header.get_header_length() as usize * 2;
let mut sum = 0u32;
for i in 0..len {
let word = ((header.packet()[2 * i] as u32) << 8) | header.packet()[2 * i + 1] as u32;
sum += word;
}
// Add the carry
while (sum >> 16) > 0 {
sum = (sum & 0xFFFF) + (sum >> 16);
}
// One's complement
!sum as u16
}
pub(crate) fn is_icmp_echo_reply(packet: &Bytes) -> Option<(u16, Ipv4Addr, Ipv4Addr)> {
if let Some(ipv4_packet) = Ipv4Packet::new(packet) {
if let Some(icmp_packet) = IcmpPacket::new(ipv4_packet.payload()) {
if let Some(echo_reply) = EchoReplyPacket::new(icmp_packet.packet()) {
return Some((
echo_reply.get_identifier(),
ipv4_packet.get_source(),
ipv4_packet.get_destination(),
));
}
}
}
None
}
pub(crate) fn is_icmp_v6_echo_reply(packet: &Bytes) -> Option<(u16, Ipv6Addr, Ipv6Addr)> {
if let Some(ipv6_packet) = Ipv6Packet::new(packet) {
if let Some(icmp_packet) = IcmpPacket::new(ipv6_packet.payload()) {
if let Some(echo_reply) =
pnet_packet::icmpv6::echo_reply::EchoReplyPacket::new(icmp_packet.packet())
{
return Some((
echo_reply.get_identifier(),
ipv6_packet.get_source(),
ipv6_packet.get_destination(),
));
}
}
}
None
}
/**
* Types and functions below are from the nym-connection-monitor crate.
* The `send_ping_v4` + `_v6` functions have been modified to work with the IPMixStream wrapper instead of relying on a shared MixnetClient.
*/
#[derive(Debug)]
pub enum ConnectionStatusEvent {
MixnetSelfPing,
Icmpv4IprTunDevicePingReply,
Icmpv6IprTunDevicePingReply,
Icmpv4IprExternalPingReply,
Icmpv6IprExternalPingReply,
}
#[derive(Debug, Clone, Default)]
pub struct IpPingReplies {
pub ipr_tun_ip_v4: bool,
pub ipr_tun_ip_v6: bool,
pub external_ip_v4: bool,
pub external_ip_v6: bool,
}
impl IpPingReplies {
pub fn new() -> Self {
Self::default()
}
pub fn register_event(&mut self, event: &ConnectionStatusEvent) {
match event {
ConnectionStatusEvent::MixnetSelfPing => {}
ConnectionStatusEvent::Icmpv4IprTunDevicePingReply => self.ipr_tun_ip_v4 = true,
ConnectionStatusEvent::Icmpv6IprTunDevicePingReply => self.ipr_tun_ip_v6 = true,
ConnectionStatusEvent::Icmpv4IprExternalPingReply => self.external_ip_v4 = true,
ConnectionStatusEvent::Icmpv6IprExternalPingReply => self.external_ip_v6 = true,
}
}
}
pub enum IcmpBeaconReply {
TunDeviceReply,
ExternalPingReply(Ipv4Addr),
}
pub enum Icmpv6BeaconReply {
TunDeviceReply,
ExternalPingReply(Ipv6Addr),
}
pub fn icmp_identifier() -> u16 {
8475
}
// TODO: send_ping_v4 and send_ping_v6 removed temporarily — will be re-added
// when IpMixStream is rebuilt on top of MixnetStream + LP frame envelope
pub fn check_for_icmp_beacon_reply(
packet: &Bytes,
icmp_beacon_identifier: u16,
our_ips: IpPair,
) -> Option<ConnectionStatusEvent> {
match is_icmp_beacon_reply(packet, icmp_beacon_identifier, our_ips.ipv4) {
Some(IcmpBeaconReply::TunDeviceReply) => {
tracing::debug!("Received ping response from ipr tun device");
return Some(ConnectionStatusEvent::Icmpv4IprTunDevicePingReply);
}
Some(IcmpBeaconReply::ExternalPingReply(_source)) => {
tracing::debug!("Received ping response from an external ip through the ipr");
return Some(ConnectionStatusEvent::Icmpv4IprExternalPingReply);
}
None => {}
}
match is_icmp_v6_beacon_reply(packet, icmp_beacon_identifier, our_ips.ipv6) {
Some(Icmpv6BeaconReply::TunDeviceReply) => {
tracing::debug!("Received ping v6 response from ipr tun device");
return Some(ConnectionStatusEvent::Icmpv6IprTunDevicePingReply);
}
Some(Icmpv6BeaconReply::ExternalPingReply(_source)) => {
tracing::debug!("Received ping v6 response from an external ip through the ipr");
return Some(ConnectionStatusEvent::Icmpv6IprExternalPingReply);
}
None => {}
}
None
}
pub fn is_icmp_beacon_reply(
packet: &Bytes,
identifier: u16,
destination: Ipv4Addr,
) -> Option<IcmpBeaconReply> {
if let Some((reply_identifier, reply_source, reply_destination)) = is_icmp_echo_reply(packet) {
if reply_identifier == identifier && reply_destination == destination {
if reply_source == NYM_TUN_DEVICE_ADDRESS_V4 {
return Some(IcmpBeaconReply::TunDeviceReply);
} else {
// For external replies, we check if the source is NOT the TUN device
// and NOT our own IP (since external hosts reply from their own IPs)
return Some(IcmpBeaconReply::ExternalPingReply(reply_source));
}
}
}
None
}
pub fn is_icmp_v6_beacon_reply(
packet: &Bytes,
identifier: u16,
destination: Ipv6Addr,
) -> Option<Icmpv6BeaconReply> {
if let Some((reply_identifier, reply_source, reply_destination)) = is_icmp_v6_echo_reply(packet)
{
if reply_identifier == identifier && reply_destination == destination {
if reply_source == NYM_TUN_DEVICE_ADDRESS_V6 {
return Some(Icmpv6BeaconReply::TunDeviceReply);
} else {
// For external replies, check if source is NOT the TUN device
return Some(Icmpv6BeaconReply::ExternalPingReply(reply_source));
}
}
}
None
}
@@ -0,0 +1,133 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use bytes::Bytes;
use futures::StreamExt;
use nym_ip_packet_requests::{codec::MultiIpPacketCodec, v8::response::ControlResponse};
use nym_sphinx::receiver::ReconstructedMessage;
use tokio_util::codec::FramedRead;
use tracing::{debug, error, info, warn};
use crate::{
ip_packet_client::current::{
request::{ControlRequest, IpPacketRequest, IpPacketRequestData},
response::{InfoLevel, IpPacketResponse, IpPacketResponseData},
},
ip_packet_client::helpers::check_ipr_message_version,
};
pub enum MixnetMessageOutcome {
IpPackets(Vec<Bytes>),
MixnetSelfPing,
Disconnect,
}
pub struct IprListener {}
#[derive(Debug, thiserror::Error)]
pub enum IprListenerError {
#[error(transparent)]
IprClientError(#[from] crate::Error),
}
impl From<super::error::Error> for IprListenerError {
fn from(err: super::error::Error) -> Self {
match err {
super::error::Error::SdkError(sdk_err) => IprListenerError::IprClientError(*sdk_err),
other => IprListenerError::IprClientError(crate::Error::new_unsupported(format!(
"IP packet error: {}",
other
))),
}
}
}
impl IprListener {
pub fn new() -> Self {
Self {}
}
fn is_mix_ping(&self, request: &IpPacketRequest) -> bool {
match request.data {
IpPacketRequestData::Control(ref control) => {
matches!(**control, ControlRequest::Ping(_))
}
_ => {
debug!("Received unexpected request: {request:?}");
false
}
}
}
pub async fn handle_reconstructed_message(
&mut self,
message: ReconstructedMessage,
) -> Result<Option<MixnetMessageOutcome>, IprListenerError> {
check_ipr_message_version(&message)?;
match IpPacketResponse::from_reconstructed_message(&message) {
Ok(response) => {
match response.data {
IpPacketResponseData::Data(data_response) => {
// Un-bundle the mixnet message and send the individual IP packets
// to the tun device
let framed_reader = FramedRead::new(
data_response.ip_packet.as_ref(),
MultiIpPacketCodec::new(),
);
let responses: Vec<Bytes> = framed_reader
.filter_map(|res| async { res.ok().map(|packet| packet.into_bytes()) })
.collect()
.await;
return Ok(Some(MixnetMessageOutcome::IpPackets(responses)));
}
IpPacketResponseData::Control(control_response) => match *control_response {
ControlResponse::Connect(_) => {
info!("Received connect response when already connected - ignoring");
}
ControlResponse::Disconnect(_) => {
info!("Received disconnect response");
return Ok(Some(MixnetMessageOutcome::Disconnect));
}
ControlResponse::UnrequestedDisconnect(_) => {
info!("Received unrequested disconnect response, ignoring for now");
}
ControlResponse::Pong(_) => {
info!("Received pong response, ignoring for now");
}
ControlResponse::Health(_) => {
info!("Received health response, ignoring for now");
}
ControlResponse::Info(info) => {
let msg =
format!("Received info response from the mixnet: {}", info.reply);
match info.level {
InfoLevel::Info => info!("{msg}"),
InfoLevel::Warn => warn!("{msg}"),
InfoLevel::Error => error!("{msg}"),
}
}
},
}
}
Err(err) => {
// The exception to when we are not expecting a response, is when we
// are sending a ping to ourselves.
if let Ok(request) = IpPacketRequest::from_reconstructed_message(&message) {
if self.is_mix_ping(&request) {
return Ok(Some(MixnetMessageOutcome::MixnetSelfPing));
}
} else {
warn!("Failed to deserialize reconstructed message: {err}");
}
}
}
Ok(None)
}
}
impl Default for IprListener {
fn default() -> Self {
Self::new()
}
}
+15
View File
@@ -0,0 +1,15 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
//! High-level IPR (IP Packet Router) stream wrapper.
//!
//! [`IpMixStream`] tunnels IP packets through the Nym mixnet to an exit
//! gateway running an IP Packet Router. Both requests and responses are
//! wrapped in LP Stream frames for type-safe detection at the IPR and
//! dispatch by the client's stream router.
mod ip_mix_stream;
pub mod network_env;
pub use ip_mix_stream::{ConnectionState, IpMixStream, IprWithPerformance};
pub use network_env::NetworkEnvironment;
@@ -0,0 +1,79 @@
# IpMixStream Architecture
## Overview
`IpMixStream` tunnels IP packets through the Nym mixnet to an IP Packet Router
(IPR) exit gateway. It provides a high-level API over a single `MixnetStream`,
which handles LP Stream framing and Sphinx packet transport automatically.
## Data Flow
```text
Client IPR (exit gateway)
------ ------------------
IpMixStream.send_ip_packet(bytes)
IpPacketRequest.to_bytes()
MixnetStream.write()
LP Stream frame
Sphinx packets
mixnet ──────────────────> on_reconstructed_message()
detect LpFrameKind::Stream
strip LP header
parse IpPacketRequest
write IP packet to TUN
──> internet
internet response arrives on TUN
ConnectedClientHandler
wrap in IpPacketResponse
wrap in LP Stream frame
mixnet <────────────────── send via Sphinx/SURBs
stream router dispatches
by stream_id
MixnetStream.read()
IprListener parses response
IpMixStream.handle_incoming()
returns Vec<ip_packet_bytes>
```
## Layer Stack
```text
IpMixStream IPR protocol (connect, data, disconnect)
MixnetStream AsyncRead + AsyncWrite, LP Stream framing, seq numbers
Stream Router Dispatches inbound messages by stream_id
MixnetClient Sphinx packet encryption, SURB management
Mixnet Entry GW -> Mix1 -> Mix2 -> Mix3 -> Exit GW
```
## LP Stream Framing
All messages between client and IPR are wrapped in LP Stream frames:
- **Client -> IPR**: `MixnetStream.write()` wraps each write in an LP Stream
Data frame (stream_id, sequence number, payload). The IPR detects
`LpFrameKind::Stream` and strips the header before processing.
- **IPR -> Client**: Both inline responses (connect handshake, pong) and async
TUN responses are wrapped in LP Stream frames with the same stream_id. The
client's stream router dispatches by stream_id to the correct `MixnetStream`.
## Connection Lifecycle
1. `IpMixStream::new(env)` -- discover IPR, connect MixnetClient, open MixnetStream
2. `connect_tunnel()` -- send connect request, receive allocated IPs
3. `send_ip_packet()` / `handle_incoming()` -- steady-state data transfer
4. `disconnect_stream()` -- tear down MixnetClient
## Key Design Decisions
- **MixnetStream over MixnetClient**: One stream per IPR tunnel. LP framing is
handled by MixnetStream internally, no manual frame construction needed.
- **Multiplexing at IP layer**: Different remote hosts are addressed by IP
packet destination headers, not by opening multiple streams.
- **stream_id threading**: The IPR stores stream_id in each client's
`ConnectedClientHandler` so async TUN responses are wrapped in matching LP
Stream frames for correct dispatch at the client.
@@ -0,0 +1,507 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use super::network_env::NetworkEnvironment;
use crate::ip_packet_client::{
helpers::check_ipr_message_version, IprListener, MixnetMessageOutcome,
};
use crate::mixnet::{MixnetClient, MixnetStream, Recipient};
use crate::Error;
use bytes::Bytes;
use nym_crypto::asymmetric::ed25519;
use nym_ip_packet_requests::{
v8::{
request::IpPacketRequest,
response::{ConnectResponseReply, ControlResponse, IpPacketResponse, IpPacketResponseData},
},
IpPair,
};
use nym_network_defaults::ApiUrl;
use nym_sphinx::receiver::ReconstructedMessage;
use nym_validator_client::nym_api::NymApiClientExt;
use std::collections::HashMap;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{debug, error, info};
const IPR_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
/// Maximum size for a single IPR response read from the stream.
/// IPR responses fit within one Sphinx packet payload (~1.8 KB) so 64 KB
/// provides ample headroom.
const READ_BUF_SIZE: usize = 64 * 1024;
#[derive(Clone)]
pub struct IprWithPerformance {
pub(crate) address: Recipient,
pub(crate) identity: ed25519::PublicKey,
pub(crate) performance: u8,
}
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionState {
Disconnected,
Connecting,
Connected,
}
#[allow(clippy::result_large_err)]
fn create_nym_api_client(nym_api_urls: Vec<ApiUrl>) -> Result<nym_http_api_client::Client, Error> {
let user_agent = format!("nym-sdk/{}", env!("CARGO_PKG_VERSION"));
let urls = nym_api_urls
.into_iter()
.map(|url| url.url.parse())
.collect::<Result<Vec<nym_http_api_client::Url>, _>>()
.map_err(|err| {
error!("malformed nym-api url: {err}");
Error::NoNymAPIUrl
})?;
if urls.is_empty() {
return Err(Error::NoNymAPIUrl);
}
let client = nym_http_api_client::ClientBuilder::new_with_urls(urls)?
.with_user_agent(user_agent)
.build()?;
Ok(client)
}
async fn retrieve_exit_nodes_with_performance(
client: nym_http_api_client::Client,
) -> Result<Vec<IprWithPerformance>, Error> {
let all_nodes = client
.get_all_described_nodes()
.await?
.into_iter()
.map(|described| (described.ed25519_identity_key(), described))
.collect::<HashMap<_, _>>();
let exit_gateways = client.get_all_basic_nodes_with_metadata().await?.nodes;
let mut described = Vec::new();
for exit in exit_gateways {
if let Some(ipr_info) = all_nodes
.get(&exit.ed25519_identity_pubkey)
.and_then(|n| n.description.ip_packet_router.clone())
{
if let Ok(parsed_address) = ipr_info.address.parse() {
described.push(IprWithPerformance {
address: parsed_address,
identity: exit.ed25519_identity_pubkey,
performance: exit.performance.round_to_integer(),
})
}
}
}
Ok(described)
}
async fn get_random_ipr(client: nym_http_api_client::Client) -> Result<Recipient, Error> {
let nodes = retrieve_exit_nodes_with_performance(client).await?;
info!("Found {} Exit Gateways", nodes.len());
let selected_gateway = nodes
.into_iter()
.max_by_key(|gw| gw.performance)
.ok_or_else(|| Error::NoGatewayAvailable)?;
let ipr_address = selected_gateway.address;
info!(
"Using IPR: {} (Gateway: {}, Performance: {:?})",
ipr_address, selected_gateway.identity, selected_gateway.performance
);
Ok(ipr_address)
}
/// A bidirectional tunnel for sending and receiving IP packets through the mixnet.
///
/// Wraps a [`MixnetStream`] (opened to an IPR exit gateway) and provides a
/// high-level API for the IPR protocol. The underlying `MixnetStream` handles
/// LP Stream framing and stream multiplexing automatically.
///
/// # Data flow
///
/// ```text
/// IpMixStream.send_ip_packet(bytes)
/// → IpPacketRequest::to_bytes() → MixnetStream.write()
/// → LP Stream frame (stream_id, seq, Data)
/// → Sphinx packets → mixnet → IPR
///
/// IPR processes request → TUN → internet → response
/// → IPR wraps in LP Stream frame → Sphinx → mixnet → client
/// → stream router dispatches by stream_id
/// → MixnetStream.read() → IpPacketResponse bytes
/// → IprListener → extract IP packets
/// ```
pub struct IpMixStream {
/// The underlying multiplexed stream to the IPR gateway.
stream: MixnetStream,
/// Kept for `nym_address()` and `disconnect()`.
client: MixnetClient,
/// Parses incoming IPR protocol responses.
listener: IprListener,
read_buf: Vec<u8>,
allocated_ips: Option<IpPair>,
connection_state: ConnectionState,
}
impl IpMixStream {
/// Create a new IP packet router stream connected to the mixnet.
///
/// Discovers an IPR gateway, connects a MixnetClient, and opens a
/// `MixnetStream` to the IPR. Call [`connect_tunnel`](Self::connect_tunnel)
/// to establish the IP tunnel.
pub async fn new(env: NetworkEnvironment) -> Result<Self, Error> {
let network_defaults = env.network_defaults();
let api_client = create_nym_api_client(network_defaults.nym_api_urls.unwrap_or_default())?;
let ipr_address = get_random_ipr(api_client).await?;
nym_network_defaults::setup_env(Some(env.env_file_path()));
let mut client = MixnetClient::connect_new().await?;
// Open a stream to the IPR — this sends the LP Stream Open handshake
// and starts the background stream router.
let stream = client.open_stream(ipr_address, Some(10)).await?;
Ok(Self {
stream,
client,
listener: IprListener::new(),
read_buf: vec![0u8; READ_BUF_SIZE],
allocated_ips: None,
connection_state: ConnectionState::Disconnected,
})
}
/// Get the Nym network address of this stream.
pub fn nym_address(&self) -> &Recipient {
self.client.nym_address()
}
/// Establish tunnel connection with the IPR and allocate IP addresses.
pub async fn connect_tunnel(&mut self) -> Result<IpPair, Error> {
if self.connection_state != ConnectionState::Disconnected {
return Err(Error::IprStreamClientAlreadyConnectedOrConnecting);
}
self.connection_state = ConnectionState::Connecting;
info!("Connecting to IP packet router");
match self.connect_inner().await {
Ok(ip_pair) => {
self.allocated_ips = Some(ip_pair);
self.connection_state = ConnectionState::Connected;
info!(
"Connected to IPv4: {}, IPv6: {}",
ip_pair.ipv4, ip_pair.ipv6
);
Ok(ip_pair)
}
Err(e) => {
self.connection_state = ConnectionState::Disconnected;
error!("Failed to connect: {:?}", e);
Err(e)
}
}
}
async fn connect_inner(&mut self) -> Result<IpPair, Error> {
let (request, request_id) = IpPacketRequest::new_connect_request(None);
debug!("Sending connect request with ID: {}", request_id);
let request_bytes = request.to_bytes()?;
self.stream
.write_all(&request_bytes)
.await
.map_err(|_| Error::MessageSendingFailure)?;
self.listen_for_connect_response(request_id).await
}
async fn listen_for_connect_response(&mut self, request_id: u64) -> Result<IpPair, Error> {
let timeout = tokio::time::sleep(IPR_CONNECT_TIMEOUT);
tokio::pin!(timeout);
loop {
tokio::select! {
_ = &mut timeout => {
return Err(Error::IPRConnectResponseTimeout);
}
result = self.stream.read(&mut self.read_buf) => {
match result {
Ok(0) => return Err(Error::IPRClientStreamClosed),
Ok(n) => {
let msg = ReconstructedMessage {
message: self.read_buf[..n].to_vec(),
sender_tag: None,
};
if let Err(e) = check_ipr_message_version(&msg) {
return Err(Error::IPRMessageVersionCheckFailed(e.to_string()));
}
if let Ok(response) = IpPacketResponse::from_reconstructed_message(&msg) {
if response.id() == Some(request_id) {
return self.handle_connect_response(response);
}
}
}
Err(_) => return Err(Error::IPRClientStreamClosed),
}
}
}
}
}
fn handle_connect_response(&self, response: IpPacketResponse) -> Result<IpPair, Error> {
let control_response = match response.data {
IpPacketResponseData::Control(c) => c,
other => return Err(Error::UnexpectedResponseType(other)),
};
match *control_response {
ControlResponse::Connect(connect_resp) => match connect_resp.reply {
ConnectResponseReply::Success(success) => Ok(success.ips),
ConnectResponseReply::Failure(reason) => Err(Error::ConnectDenied(reason)),
},
_ => Err(Error::UnexpectedResponseType(
IpPacketResponseData::Control(control_response.clone()),
)),
}
}
/// Send an IP packet through the tunnel.
pub async fn send_ip_packet(&mut self, packet: &[u8]) -> Result<(), Error> {
if self.connection_state != ConnectionState::Connected {
return Err(Error::IprStreamClientNotConnected);
}
let request = IpPacketRequest::new_data_request(packet.to_vec().into());
let request_bytes = request.to_bytes()?;
self.stream
.write_all(&request_bytes)
.await
.map_err(|_| Error::MessageSendingFailure)
}
/// Handle incoming messages from the mixnet.
///
/// Reads from the underlying `MixnetStream`, parses IPR responses, and
/// extracts IP packets. Returns an empty vec on timeout (10 s).
pub async fn handle_incoming(&mut self) -> Result<Vec<Bytes>, Error> {
match tokio::time::timeout(
Duration::from_secs(10),
self.stream.read(&mut self.read_buf),
)
.await
{
// Timeout — no data yet, not an error
Err(_) => Ok(Vec::new()),
// EOF — stream router shut down, channel dead
Ok(Ok(0)) => {
self.connection_state = ConnectionState::Disconnected;
Err(Error::IPRClientStreamClosed)
}
// IO error
Ok(Err(_)) => {
self.connection_state = ConnectionState::Disconnected;
Err(Error::IPRClientStreamClosed)
}
Ok(Ok(n)) => {
let msg = ReconstructedMessage {
message: self.read_buf[..n].to_vec(),
sender_tag: None,
};
match self.listener.handle_reconstructed_message(msg).await {
Ok(Some(MixnetMessageOutcome::IpPackets(packets))) => {
debug!("Extracted {} IP packets", packets.len());
Ok(packets)
}
Ok(Some(MixnetMessageOutcome::Disconnect)) => {
info!("Received disconnect");
self.connection_state = ConnectionState::Disconnected;
self.allocated_ips = None;
Ok(Vec::new())
}
Ok(Some(MixnetMessageOutcome::MixnetSelfPing)) => {
debug!("Received mixnet self ping");
Ok(Vec::new())
}
Ok(None) => Ok(Vec::new()),
Err(e) => {
error!("Failed to handle message: {}", e);
Ok(Vec::new())
}
}
}
}
}
pub fn allocated_ips(&self) -> Option<&IpPair> {
self.allocated_ips.as_ref()
}
pub fn is_connected(&self) -> bool {
self.connection_state == ConnectionState::Connected
}
/// Disconnect from the Mixnet. Disconnected clients cannot be reconnected.
pub async fn disconnect_stream(self) {
debug!("Disconnecting");
self.client.disconnect().await;
debug!("Disconnected");
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ip_packet_client::helpers::{
icmp_identifier, is_icmp_echo_reply, is_icmp_v6_echo_reply,
};
use std::net::{Ipv4Addr, Ipv6Addr};
#[tokio::test]
#[ignore]
async fn connect_to_ipr() -> Result<(), Box<dyn std::error::Error>> {
let mut stream = IpMixStream::new(NetworkEnvironment::Mainnet).await?;
let ip_pair = stream.connect_tunnel().await?;
let ipv4: Ipv4Addr = ip_pair.ipv4;
assert!(!ipv4.is_unspecified(), "IPv4 address should not be 0.0.0.0");
let ipv6: Ipv6Addr = ip_pair.ipv6;
assert!(!ipv6.is_unspecified(), "IPv6 address should not be ::");
assert!(stream.is_connected(), "Stream should be connected");
assert!(
stream.allocated_ips().is_some(),
"Should have allocated IPs"
);
stream.disconnect_stream().await;
Ok(())
}
#[tokio::test]
#[ignore]
async fn dns_ping_checks() -> Result<(), Box<dyn std::error::Error>> {
use crate::ip_packet_client::helpers::{
create_icmpv4_echo_request, create_icmpv6_echo_request, wrap_icmp_in_ipv4,
wrap_icmp_in_ipv6,
};
use nym_ip_packet_requests::codec::MultiIpPacketCodec;
use pnet_packet::Packet;
let mut stream = IpMixStream::new(NetworkEnvironment::Mainnet).await?;
let ip_pair = stream.connect_tunnel().await?;
info!(
"Connected with IPs - IPv4: {}, IPv6: {}",
ip_pair.ipv4, ip_pair.ipv6
);
let external_v4_targets = vec![
("Google DNS", Ipv4Addr::new(8, 8, 8, 8)),
("Cloudflare DNS", Ipv4Addr::new(1, 1, 1, 1)),
("Quad9 DNS", Ipv4Addr::new(9, 9, 9, 9)),
];
let external_v6_targets = vec![
("Google DNS", "2001:4860:4860::8888".parse::<Ipv6Addr>()?),
(
"Cloudflare DNS",
"2606:4700:4700::1111".parse::<Ipv6Addr>()?,
),
("Quad9 DNS", "2620:fe::fe".parse::<Ipv6Addr>()?),
];
let identifier = icmp_identifier();
let mut successful_v4_pings = 0;
let mut total_v4_pings = 0;
let mut successful_v6_pings = 0;
let mut total_v6_pings = 0;
for (name, target) in &external_v4_targets {
info!("Testing IPv4 connectivity to {} ({})", name, target);
for seq in 0..3 {
let icmp = create_icmpv4_echo_request(seq, identifier)?;
let ipv4_packet = wrap_icmp_in_ipv4(icmp, ip_pair.ipv4, *target)?;
let bundled =
MultiIpPacketCodec::bundle_one_packet(ipv4_packet.packet().to_vec().into());
stream.send_ip_packet(&bundled).await?;
total_v4_pings += 1;
}
}
for (name, target) in &external_v6_targets {
info!("Testing IPv6 connectivity to {} ({})", name, target);
for seq in 0..3 {
let icmp = create_icmpv6_echo_request(seq, identifier, &ip_pair.ipv6, target)?;
let ipv6_packet = wrap_icmp_in_ipv6(icmp, ip_pair.ipv6, *target)?;
let bundled =
MultiIpPacketCodec::bundle_one_packet(ipv6_packet.packet().to_vec().into());
stream.send_ip_packet(&bundled).await?;
total_v6_pings += 1;
}
}
let collect_timeout = tokio::time::sleep(Duration::from_secs(10));
tokio::pin!(collect_timeout);
loop {
tokio::select! {
_ = &mut collect_timeout => {
info!("Finished collecting replies");
break;
}
result = stream.handle_incoming() => {
if let Ok(packets) = result {
for packet in packets {
if let Some((reply_id, _source, dest)) = is_icmp_echo_reply(&packet) {
if reply_id == identifier && dest == ip_pair.ipv4 {
successful_v4_pings += 1;
}
}
if let Some((reply_id, _source, dest)) = is_icmp_v6_echo_reply(&packet) {
if reply_id == identifier && dest == ip_pair.ipv6 {
successful_v6_pings += 1;
}
}
}
}
}
}
}
let v4_success_rate = (successful_v4_pings as f64 / total_v4_pings as f64) * 100.0;
let v6_success_rate = (successful_v6_pings as f64 / total_v6_pings as f64) * 100.0;
info!(
"IPv4: {}/{} ({:.1}%), IPv6: {}/{} ({:.1}%)",
successful_v4_pings,
total_v4_pings,
v4_success_rate,
successful_v6_pings,
total_v6_pings,
v6_success_rate
);
assert!(successful_v4_pings > 0, "No IPv4 pings successful");
assert!(v4_success_rate >= 75.0, "IPv4 success rate < 75%");
assert!(successful_v6_pings > 0, "No IPv6 pings successful");
assert!(v6_success_rate >= 75.0, "IPv6 success rate < 75%");
stream.disconnect_stream().await;
Ok(())
}
}
@@ -0,0 +1,55 @@
use std::fs;
use std::path::PathBuf;
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkEnvironment {
#[default]
Mainnet,
// Sandbox,
}
fn find_workspace_root() -> PathBuf {
let mut current = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
loop {
let cargo_toml = current.join("Cargo.toml");
if cargo_toml.exists() {
if let Ok(contents) = fs::read_to_string(&cargo_toml) {
// Check if this Cargo.toml defines a workspace
if contents.contains("[workspace]") {
return current;
}
}
}
if !current.pop() {
panic!("Could not find workspace root");
}
}
}
impl NetworkEnvironment {
pub fn env_file_path(&self) -> PathBuf {
let root = find_workspace_root();
match self {
Self::Mainnet => root.join("envs/mainnet.env"),
// Self::Sandbox => root.join("envs/sandbox.env"),
}
}
pub fn network_defaults(&self) -> crate::NymNetworkDetails {
match self {
Self::Mainnet => crate::NymNetworkDetails::new_mainnet(),
// Self::Sandbox => crate::NymNetworkDetails::new_sandbox(), // TODO
}
}
pub fn parse_network(s: &str) -> Result<Self, String> {
match s.to_lowercase().as_str() {
"mainnet" | "main" => Ok(Self::Mainnet),
// "sandbox" | "sand" => Ok(Self::Sandbox),
_ => Err(format!("Unknown env: {}", s)),
}
}
}
+4 -1
View File
@@ -1,13 +1,16 @@
//! Rust SDK for the Nym platform
//!
//! The main component currently is [`mixnet`].
//! [`tcp_proxy`] is probably a good place to start for anyone wanting to integrate with existing app code and read/write from a socket.
//! [`client_pool`] is a configurable client pool.
//! [`tcp_proxy`] is a soon to be deprecated wrapper around the mixnet client which exposes a localhost port.
//! [`ipr_wrapper`] tunnels IP packets through the mixnet to an IPR exit gateway.
mod error;
pub mod bandwidth;
pub mod client_pool;
pub mod ip_packet_client;
pub mod ipr_wrapper;
pub mod mixnet;
pub mod tcp_proxy;
@@ -16,7 +16,9 @@ use nym_sphinx::params::PacketType;
use nym_task::connections::TransmissionLane;
use tokio_util::sync::PollSender;
use super::protocol::{encode_stream_message, StreamId, StreamMessageType};
use nym_lp::packet::frame::StreamMsgType;
use super::protocol::{encode_stream_message, StreamId};
use super::StreamMap;
/// How to address outbound messages on this stream.
@@ -45,6 +47,7 @@ pub struct MixnetStream {
inbound_rx: mpsc::UnboundedReceiver<Vec<u8>>,
read_buf: BytesMut,
deregistered: bool,
next_seq: u32,
}
impl MixnetStream {
@@ -71,6 +74,7 @@ impl MixnetStream {
inbound_rx,
read_buf: BytesMut::new(),
deregistered: false,
next_seq: 0,
}
}
@@ -98,6 +102,7 @@ impl MixnetStream {
inbound_rx,
read_buf,
deregistered: false,
next_seq: 0,
}
}
@@ -177,7 +182,9 @@ impl AsyncWrite for MixnetStream {
ready!(self.sender.poll_ready_unpin(cx))
.map_err(|_| std::io::Error::other("mixnet input channel closed"))?;
let wire = encode_stream_message(&self.id, StreamMessageType::Data, buf);
let seq = self.next_seq;
self.next_seq = self.next_seq.wrapping_add(1);
let wire = encode_stream_message(&self.id, StreamMsgType::Data, seq, buf);
let msg = self.make_input_message(wire);
self.sender
+9 -8
View File
@@ -9,7 +9,7 @@
//! stream's channel (or to the listener for `Open` messages).
mod mixnet_stream;
mod protocol;
pub(crate) mod protocol;
pub use mixnet_stream::MixnetStream;
pub use protocol::StreamId;
@@ -32,7 +32,8 @@ use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_task::connections::TransmissionLane;
use protocol::{decode_stream_message, encode_stream_message, StreamMessageType};
use nym_lp::packet::frame::StreamMsgType;
use protocol::{decode_stream_message, encode_stream_message};
use crate::mixnet::native_client::MixnetClient;
use crate::{Error, Result};
@@ -230,20 +231,20 @@ async fn run_router(
continue;
};
let stream_id = frame.header.stream_id;
match frame.header.message_type {
StreamMessageType::Open => {
let stream_id = frame.stream_id;
match frame.msg_type {
StreamMsgType::Open => {
let _ = listener_tx.send(InboundOpen {
stream_id,
sender_tag: msg.sender_tag,
initial_data: frame.data.to_vec(),
});
}
StreamMessageType::Data => {
StreamMsgType::Data => {
streams
.send_to_stream(&stream_id, frame.data.to_vec())
.await;
} // TODO: if we decide we need close logic add another enum member
}
}
}
}
@@ -295,7 +296,7 @@ pub(crate) async fn open_stream(
let rx = streams.register_stream(stream_id).await;
// Send Open to the peer
let wire = encode_stream_message(&stream_id, StreamMessageType::Open, &[]);
let wire = encode_stream_message(&stream_id, StreamMsgType::Open, 0, &[]);
let msg = InputMessage::new_anonymous(
recipient,
wire,
+95 -102
View File
@@ -1,24 +1,22 @@
//! Wire protocol for stream multiplexing.
//!
//! Every message between streams carries a fixed header prepended to
//! Every message between streams carries an LP frame header prepended to
//! the payload inside the mixnet message body:
//!
//! ```text
//! [Version: 1 byte][StreamId: 8 bytes][MessageType: 1 byte][payload: N bytes]
//! [LpFrameKind: 2 bytes LE][StreamFrameAttributes: 14 bytes][payload: N bytes]
//! ```
//!
//! This header sits inside the sphinx packet payload.
//! The `StreamFrameAttributes` encode stream_id, message type, and sequence
//! number inside the LP header's `frame_attributes` field. This is the same
//! LP frame format used across the system (IPR detection, gateway dispatch).
use std::fmt;
/// Current stream protocol version.
pub const STREAM_PROTOCOL_VERSION: u8 = 1;
/// Length of a StreamId in bytes (u64, big-endian).
pub const STREAM_ID_LEN: usize = 8;
/// Total header length: Version (1) + StreamId (8) + MessageType (1).
pub const STREAM_HEADER_LEN: usize = 1 + STREAM_ID_LEN + 1;
use bytes::BytesMut;
use nym_lp::packet::frame::{
LpFrame, LpFrameHeader, LpFrameKind, StreamFrameAttributes, StreamMsgType,
};
/// Identifies a stream within a MixnetClient.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
@@ -29,12 +27,12 @@ impl StreamId {
Self(rand::random::<u64>())
}
pub fn to_bytes(self) -> [u8; STREAM_ID_LEN] {
self.0.to_be_bytes()
pub fn as_u64(self) -> u64 {
self.0
}
pub fn from_bytes(bytes: [u8; STREAM_ID_LEN]) -> Self {
Self(u64::from_be_bytes(bytes))
pub fn from_u64(v: u64) -> Self {
Self(v)
}
}
@@ -50,88 +48,54 @@ impl fmt::Display for StreamId {
}
}
/// Message types within the stream protocol.
///
/// Note: there is no Close variant. Without message sequencing, a close
/// message races ahead of in-flight data and arrives before the data is
/// reconstructed. Streams clean up locally via Drop. If ordered close/EOF
/// is needed in future, add sequencing + reorder buffering (see the
/// tcp_proxy's `MessageBuffer` for a working example).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum StreamMessageType {
/// Open a new stream. Payload is optional initial data.
Open = 0,
/// Data on an existing stream.
Data = 1,
}
impl StreamMessageType {
pub fn from_byte(b: u8) -> Option<Self> {
match b {
0 => Some(Self::Open),
1 => Some(Self::Data),
_ => None,
}
}
}
/// The fixed-size header prepended to every stream message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MixStreamHeader {
pub version: u8,
pub stream_id: StreamId,
pub message_type: StreamMessageType,
}
/// A decoded stream frame: header + payload reference.
/// A decoded stream frame: LP header fields + payload reference.
#[derive(Debug)]
pub struct MixStreamFrame<'a> {
pub header: MixStreamHeader,
pub struct StreamFrame<'a> {
pub stream_id: StreamId,
pub msg_type: StreamMsgType,
pub sequence_num: u32,
pub data: &'a [u8],
}
/// Encode a stream message: `[version][stream_id][msg_type][payload]`.
/// Encode a stream message as an LP frame: `[LpFrameHeader][payload]`.
pub fn encode_stream_message(
id: &StreamId,
msg_type: StreamMessageType,
msg_type: StreamMsgType,
sequence_num: u32,
payload: &[u8],
) -> Vec<u8> {
let mut buf = Vec::with_capacity(STREAM_HEADER_LEN + payload.len());
buf.push(STREAM_PROTOCOL_VERSION);
buf.extend_from_slice(&id.to_bytes());
buf.push(msg_type as u8);
buf.extend_from_slice(payload);
buf
let attrs = StreamFrameAttributes {
stream_id: id.as_u64(),
msg_type,
sequence_num,
};
let frame = LpFrame::new_stream(attrs, payload.to_vec());
let mut buf = BytesMut::with_capacity(LpFrameHeader::SIZE + payload.len());
frame.encode(&mut buf);
buf.to_vec()
}
/// Decode a stream message into a [`MixStreamFrame`].
/// Decode a stream message from LP frame bytes.
///
/// Returns `None` if the buffer is too short, the version is unknown,
/// or the message type byte is invalid.
pub fn decode_stream_message(bytes: &[u8]) -> Option<MixStreamFrame<'_>> {
if bytes.len() < STREAM_HEADER_LEN {
/// Returns `None` if the buffer is too short, the frame kind is not `Stream`,
/// or the stream attributes are invalid.
pub fn decode_stream_message(bytes: &[u8]) -> Option<StreamFrame<'_>> {
if bytes.len() < LpFrameHeader::SIZE {
return None;
}
let version = bytes[0];
if version != STREAM_PROTOCOL_VERSION {
let header = LpFrameHeader::parse(bytes).ok()?;
if header.kind != LpFrameKind::Stream {
return None;
}
let mut id_bytes = [0u8; STREAM_ID_LEN];
id_bytes.copy_from_slice(&bytes[1..1 + STREAM_ID_LEN]);
let stream_id = StreamId::from_bytes(id_bytes);
let attrs = StreamFrameAttributes::parse(&header.frame_attributes).ok()?;
let data = &bytes[LpFrameHeader::SIZE..];
let message_type = StreamMessageType::from_byte(bytes[1 + STREAM_ID_LEN])?;
let data = &bytes[STREAM_HEADER_LEN..];
Some(MixStreamFrame {
header: MixStreamHeader {
version,
stream_id,
message_type,
},
Some(StreamFrame {
stream_id: StreamId::from_u64(attrs.stream_id),
msg_type: attrs.msg_type,
sequence_num: attrs.sequence_num,
data,
})
}
@@ -144,11 +108,11 @@ mod tests {
fn roundtrip() {
let id = StreamId::random();
let payload = b"hello world";
let encoded = encode_stream_message(&id, StreamMessageType::Data, payload);
let encoded = encode_stream_message(&id, StreamMsgType::Data, 42, payload);
let frame = decode_stream_message(&encoded).unwrap();
assert_eq!(frame.header.version, STREAM_PROTOCOL_VERSION);
assert_eq!(frame.header.stream_id, id);
assert_eq!(frame.header.message_type, StreamMessageType::Data);
assert_eq!(frame.stream_id, id);
assert_eq!(frame.msg_type, StreamMsgType::Data);
assert_eq!(frame.sequence_num, 42);
assert_eq!(frame.data, payload);
}
@@ -158,41 +122,70 @@ mod tests {
}
#[test]
fn bad_version() {
let id = StreamId::random();
let mut encoded = encode_stream_message(&id, StreamMessageType::Data, b"x");
encoded[0] = 0xFF;
assert!(decode_stream_message(&encoded).is_none());
fn wrong_frame_kind() {
// Opaque frame kind (0x00, 0x00) should not parse as stream
let mut buf = vec![0u8; LpFrameHeader::SIZE + 1];
buf[LpFrameHeader::SIZE] = 0xAA;
assert!(decode_stream_message(&buf).is_none());
}
#[test]
fn bad_message_type() {
let mut buf = [0u8; STREAM_HEADER_LEN];
buf[0] = STREAM_PROTOCOL_VERSION;
buf[1 + STREAM_ID_LEN] = 0xFF;
assert!(decode_stream_message(&buf).is_none());
fn bad_msg_type() {
let id = StreamId::random();
let mut encoded = encode_stream_message(&id, StreamMsgType::Data, 0, b"x");
// msg_type is at byte offset 2 + 8 = 10 (inside frame_attributes)
encoded[10] = 0xFF;
assert!(decode_stream_message(&encoded).is_none());
}
#[test]
fn empty_payload() {
let id = StreamId::random();
let encoded = encode_stream_message(&id, StreamMessageType::Open, &[]);
let encoded = encode_stream_message(&id, StreamMsgType::Open, 0, &[]);
let frame = decode_stream_message(&encoded).unwrap();
assert_eq!(frame.header.message_type, StreamMessageType::Open);
assert_eq!(frame.msg_type, StreamMsgType::Open);
assert_eq!(frame.sequence_num, 0);
assert!(frame.data.is_empty());
}
#[test]
fn header_wire_format() {
let id = StreamId::from_bytes([0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77]);
let encoded = encode_stream_message(&id, StreamMessageType::Open, &[0xAA]);
assert_eq!(encoded.len(), STREAM_HEADER_LEN + 1);
assert_eq!(encoded[0], STREAM_PROTOCOL_VERSION);
let id = StreamId::from_u64(0x0011223344556677);
let encoded = encode_stream_message(&id, StreamMsgType::Open, 1, &[0xAA]);
// LpFrameHeader::SIZE (16) + 1 byte payload
assert_eq!(encoded.len(), LpFrameHeader::SIZE + 1);
// First 2 bytes: LpFrameKind::Stream = 3, LE
assert_eq!(encoded[0], 0x03);
assert_eq!(encoded[1], 0x00);
// Bytes 2..10: stream_id BE
assert_eq!(
&encoded[1..9],
&encoded[2..10],
&[0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77]
);
assert_eq!(encoded[9], StreamMessageType::Open as u8);
assert_eq!(encoded[10], 0xAA);
// Byte 10: msg_type = Open = 0
assert_eq!(encoded[10], StreamMsgType::Open as u8);
// Bytes 11..15: sequence_num = 1, BE
assert_eq!(&encoded[11..15], &[0x00, 0x00, 0x00, 0x01]);
// Byte 15: reserved = 0
assert_eq!(encoded[15], 0x00);
// Byte 16: payload
assert_eq!(encoded[16], 0xAA);
}
#[test]
fn sequence_num_roundtrip() {
let id = StreamId::random();
for seq in [0, 1, 255, 65535, u32::MAX] {
let encoded = encode_stream_message(&id, StreamMsgType::Data, seq, b"test");
let frame = decode_stream_message(&encoded).unwrap();
assert_eq!(frame.sequence_num, seq);
}
}
}
@@ -26,6 +26,7 @@ nym-crypto = { workspace = true }
nym-exit-policy = { workspace = true }
nym-id = { workspace = true }
nym-ip-packet-requests = { workspace = true }
nym-lp = { workspace = true }
nym-kcp = { path = "../../common/nym-kcp" } # TODO MAX add to workspace dependencies
nym-network-defaults = { workspace = true }
nym-network-requester = { path = "../network-requester" }
@@ -1,6 +1,7 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use bytes::{Bytes, BytesMut};
@@ -10,6 +11,7 @@ use nym_ip_packet_requests::{
v7::response::IpPacketResponse as IpPacketResponseV7,
v8::response::IpPacketResponse as IpPacketResponseV8,
};
use nym_lp::packet::frame::{LpFrame, LpFrameHeader, StreamFrameAttributes, StreamMsgType};
use nym_sdk::mixnet::{
InputMessage, MixnetClientSender, MixnetMessageSender, MixnetMessageSinkTranslator,
};
@@ -64,6 +66,7 @@ impl ConnectedClientHandler {
buffer_timeout: Duration,
client_version: ClientVersion,
mixnet_client_sender: MixnetClientSender,
stream_id: Option<u64>,
) -> (
mpsc::UnboundedSender<Vec<u8>>,
oneshot::Sender<()>,
@@ -71,6 +74,9 @@ impl ConnectedClientHandler {
) {
log::debug!("Starting connected client handler for: {client_id}");
log::debug!("client version: {client_version:?}");
if let Some(sid) = stream_id {
log::debug!("LP Stream mode: stream_id={sid:#018x}");
}
let (close_tx, close_rx) = oneshot::channel();
let (forward_from_tun_tx, forward_from_tun_rx) = mpsc::unbounded_channel();
@@ -86,6 +92,8 @@ impl ConnectedClientHandler {
let input_message_creator = ToIprDataResponse {
send_to: client_id.clone(),
client_version,
stream_id,
next_response_seq: AtomicU32::new(0),
};
let connected_client_handler = ConnectedClientHandler {
@@ -192,12 +200,29 @@ fn create_ip_packet_response(
}
}
// This struct is used by the sink to translate the the bundled IP packets into a IPR packet
// responses that can be sent to the mixnet.
#[derive(Clone, Debug)]
// This struct is used by the sink to translate the bundled IP packets into IPR packet
// responses that can be sent to the mixnet. When `stream_id` is set, responses are
// wrapped in LP Stream frames so the client's stream router can dispatch them.
#[derive(Debug)]
struct ToIprDataResponse {
send_to: ConnectedClientId,
client_version: ClientVersion,
/// When Some, wrap responses in LP Stream frames with this stream_id.
stream_id: Option<u64>,
/// Sequence number for LP Stream response frames.
next_response_seq: AtomicU32,
}
// Manual impl because AtomicU32 is not Clone.
impl Clone for ToIprDataResponse {
fn clone(&self) -> Self {
Self {
send_to: self.send_to.clone(),
client_version: self.client_version,
stream_id: self.stream_id,
next_response_seq: AtomicU32::new(self.next_response_seq.load(Ordering::Relaxed)),
}
}
}
impl MixnetMessageSinkTranslator for ToIprDataResponse {
@@ -205,12 +230,26 @@ impl MixnetMessageSinkTranslator for ToIprDataResponse {
&self,
bundled_ip_packets: &[u8],
) -> std::result::Result<InputMessage, nym_sdk::Error> {
// Create a IPR packet response that the recipient can understand
let response_packet = create_ip_packet_response(bundled_ip_packets, self.client_version)?;
// Wrap the response packet in a mixnet input message
// Optionally wrap in LP Stream frame for stream-mode clients
let final_packet = if let Some(stream_id) = self.stream_id {
let seq = self.next_response_seq.fetch_add(1, Ordering::Relaxed);
let attrs = StreamFrameAttributes {
stream_id,
msg_type: StreamMsgType::Data,
sequence_num: seq,
};
let frame = LpFrame::new_stream(attrs, response_packet);
let mut buf = BytesMut::with_capacity(LpFrameHeader::SIZE + frame.content.len());
frame.encode(&mut buf);
buf.to_vec()
} else {
response_packet
};
let input_message =
crate::util::create_message::create_input_message(&self.send_to, response_packet)
crate::util::create_message::create_input_message(&self.send_to, final_packet)
.with_max_retransmissions(0);
Ok(input_message)
@@ -282,6 +321,8 @@ mod tests {
let bytes_to_input_message = ToIprDataResponse {
send_to: client_id.clone(),
client_version,
stream_id: None,
next_response_seq: AtomicU32::new(0),
};
let mixnet_ip_packet_sender = MixnetMessageSink::new_with_custom_translator(
@@ -119,6 +119,9 @@ pub enum IpPacketRouterError {
#[error("KCP protocol error: {0}")]
KcpError(String),
#[error("{0}")]
Other(String),
}
pub type Result<T> = std::result::Result<T, IpPacketRouterError>;
@@ -181,6 +181,7 @@ impl IpPacketRouter {
shutdown_token: self.shutdown.clone_shutdown_token(),
connected_clients,
kcp_session_manager: crate::kcp_session_manager::KcpSessionManager::new(),
current_stream_id: None,
};
log::info!("The address of this client is: {self_address}");
@@ -22,8 +22,12 @@ use crate::{
request_filter::RequestFilter,
util::parse_ip::ParsedPacket,
};
use bytes::BytesMut;
use futures::StreamExt;
use nym_ip_packet_requests::codec::MultiIpPacketCodec;
use nym_lp::packet::frame::{
LpFrame, LpFrameHeader, LpFrameKind, StreamFrameAttributes, StreamMsgType,
};
use nym_sdk::mixnet::MixnetMessageSender;
use nym_sphinx::receiver::ReconstructedMessage;
use nym_task::ShutdownToken;
@@ -63,6 +67,10 @@ pub(crate) struct MixnetListener {
// KCP session manager for LP clients sending KCP-wrapped messages
pub(crate) kcp_session_manager: KcpSessionManager,
// When processing an LP Stream frame, this holds the stream_id so connect
// handlers can pass it to ConnectedClientHandler for LP-wrapping TUN responses.
pub(crate) current_stream_id: Option<u64>,
}
/// Check if a message payload appears to be KCP-wrapped.
@@ -233,6 +241,7 @@ impl MixnetListener {
buffer_timeout,
version,
self.mixnet_client.split_sender(),
self.current_stream_id,
);
// Register the new client in the set of connected clients
@@ -318,6 +327,7 @@ impl MixnetListener {
buffer_timeout,
version,
self.mixnet_client.split_sender(),
self.current_stream_id,
);
// Register the new client in the set of connected clients
@@ -436,6 +446,15 @@ impl MixnetListener {
.unwrap_or("missing".to_owned())
);
// Check if this is an LP Stream frame
if reconstructed.message.len() >= LpFrameHeader::SIZE {
if let Ok(header) = LpFrameHeader::parse(&reconstructed.message) {
if header.kind == LpFrameKind::Stream {
return self.on_stream_frame(reconstructed).await;
}
}
}
// Check if this is a KCP-wrapped message from an LP client
if is_kcp_message(&reconstructed.message) {
return self.on_kcp_message(reconstructed).await;
@@ -445,6 +464,104 @@ impl MixnetListener {
self.on_ipr_message(reconstructed).await
}
/// Handle LP Stream-framed messages.
///
/// Parses stream attributes, processes the inner IPR payload, and handles
/// responses inline (wrapped in LP Stream frames) — same pattern as KCP.
async fn on_stream_frame(
&mut self,
reconstructed: ReconstructedMessage,
) -> Result<Vec<PacketHandleResult>> {
log::debug!(
"Received LP Stream frame ({} bytes)",
reconstructed.message.len()
);
let header = LpFrameHeader::parse(&reconstructed.message)
.map_err(|e| IpPacketRouterError::Other(format!("Invalid LP frame header: {e}")))?;
let attrs = StreamFrameAttributes::parse(&header.frame_attributes).map_err(|e| {
IpPacketRouterError::Other(format!("Invalid stream frame attributes: {e}"))
})?;
let stream_id = attrs.stream_id;
log::debug!(
"LP Stream: stream_id={stream_id:#018x}, msg_type={:?}, seq={}",
attrs.msg_type,
attrs.sequence_num
);
// Set context so connect handlers thread stream_id to ConnectedClientHandler
self.current_stream_id = Some(stream_id);
let payload = &reconstructed.message[LpFrameHeader::SIZE..];
// Open frames may carry an empty payload (stream handshake only)
if payload.is_empty() {
log::debug!("LP Stream: empty payload (Open handshake), skipping");
self.current_stream_id = None;
return Ok(vec![]);
}
let inner_reconstructed = ReconstructedMessage {
message: payload.to_vec(),
sender_tag: reconstructed.sender_tag,
};
match self.on_ipr_message(inner_reconstructed).await {
Ok(results) => {
for result in results {
#[allow(clippy::collapsible_if)]
if let Ok(Some(response)) = result {
if let Err(e) = self.handle_stream_response(stream_id, response).await {
log::warn!(
"Error sending LP Stream response for stream_id={stream_id:#018x}: {e}"
);
}
}
}
}
Err(e) => {
log::warn!("Error processing LP Stream inner message: {e}");
}
}
self.current_stream_id = None;
// Return empty — we handled responses directly above
Ok(vec![])
}
/// Wrap a response in an LP Stream frame and send it via the mixnet.
///
/// Used for inline responses to LP Stream clients (connect handshake, pong, etc.).
async fn handle_stream_response(
&mut self,
stream_id: u64,
response: VersionedResponse,
) -> Result<()> {
let reply_to = response.reply_to.clone();
let response_bytes = response.try_into_bytes()?;
// Wrap in LP Stream frame (seq=0 for inline responses)
let attrs = StreamFrameAttributes {
stream_id,
msg_type: StreamMsgType::Data,
sequence_num: 0,
};
let frame = LpFrame::new_stream(attrs, response_bytes);
let mut buf = BytesMut::with_capacity(LpFrameHeader::SIZE + frame.content.len());
frame.encode(&mut buf);
let input_message =
crate::util::create_message::create_input_message(&reply_to, buf.to_vec());
self.mixnet_client.send(input_message).await.map_err(|err| {
IpPacketRouterError::FailedToSendPacketToMixnet {
source: Box::new(err),
}
})
}
/// Handle KCP-wrapped messages from LP clients.
///
/// LP clients send: KCP(IpPacketRequest)
@@ -813,4 +930,32 @@ mod tests {
"Invalid KCP command 85 should be rejected"
);
}
#[test]
fn test_lp_stream_frame_detected() {
use bytes::BytesMut;
use nym_lp::packet::frame::{
LpFrameHeader, LpFrameKind, StreamFrameAttributes, StreamMsgType,
};
let attrs = StreamFrameAttributes {
stream_id: 0x1234,
msg_type: StreamMsgType::Data,
sequence_num: 42,
};
let frame = nym_lp::packet::frame::LpFrame::new_stream(attrs, vec![8, 1, 0]); // fake IPR payload
let mut buf = BytesMut::new();
frame.encode(&mut buf);
let header = LpFrameHeader::parse(&buf).unwrap();
assert_eq!(header.kind, LpFrameKind::Stream);
let parsed_attrs = StreamFrameAttributes::parse(&header.frame_attributes).unwrap();
assert_eq!(parsed_attrs.stream_id, 0x1234);
assert_eq!(parsed_attrs.msg_type, StreamMsgType::Data);
assert_eq!(parsed_attrs.sequence_num, 42);
// Content is everything after the header
assert_eq!(&buf[LpFrameHeader::SIZE..], &[8, 1, 0]);
}
}