Compare commits
145 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0f8a8ddf7e | |||
| 3c92ce60ca | |||
| 846dbba363 | |||
| 94ab9d5466 | |||
| c78d942383 | |||
| 0b6166d20e | |||
| 6384467526 | |||
| fdd3823585 | |||
| 892a3bd826 | |||
| 59ff7d6588 | |||
| 20c4553bca | |||
| 4c38481c36 | |||
| 07680db2c7 | |||
| 59cbce50f7 | |||
| ac13ddbda8 | |||
| 67803930b6 | |||
| 7052e2e902 | |||
| cccfa76336 | |||
| a946336e67 | |||
| e5836bc1cb | |||
| f12108a7db | |||
| 70bdbce23f | |||
| e6f9b551ed | |||
| fcfa0b604e | |||
| 8b086e0239 | |||
| 6c76834b6c | |||
| 071589237b | |||
| 771ee10ba2 | |||
| 33ce05a3df | |||
| 73016ed687 | |||
| 8a5205ac4c | |||
| aaa7e317bf | |||
| f28c49e9d6 | |||
| e2ceaf48ed | |||
| 3e2137a33e | |||
| 984fa065e3 | |||
| da46ea7485 | |||
| b1bc359806 | |||
| b338644620 | |||
| 1ec0bf868b | |||
| 07842661b9 | |||
| 0cd4dd5747 | |||
| abdd960b20 | |||
| db2f3bff05 | |||
| be56c79106 | |||
| 3ccfbee834 | |||
| 942ab3c8e8 | |||
| 9ec937dd30 | |||
| 6ccc4a988a | |||
| 27890eb1a3 | |||
| fa327a1b2a | |||
| cea66c1237 | |||
| 757a89c5d7 | |||
| 1e3f531e15 | |||
| 7cc33d8df7 | |||
| 1bd0bfeee1 | |||
| f297af2a8c | |||
| d9190e5899 | |||
| a562812ad9 | |||
| 7368692629 | |||
| c185f485a7 | |||
| 6930968e88 | |||
| 8294191913 | |||
| 9b2fb45270 | |||
| cb8747abb8 | |||
| 47d37d8aed | |||
| d452932b18 | |||
| 702dfdc927 | |||
| 18e8dfe394 | |||
| 0208a84b77 | |||
| 7105bbf4b4 | |||
| 39692502df | |||
| fcefa079b0 | |||
| 371422f27b | |||
| 5541f242ff | |||
| 348e93dd70 | |||
| 7f8b7eea8c | |||
| 8760c40d46 | |||
| 8ae4b8fee2 | |||
| 4f4885fe50 | |||
| bc52db53b7 | |||
| 08d49a6f2e | |||
| 6f53192dbf | |||
| b5afb77f19 | |||
| 29714dea76 | |||
| 8fd9cee189 | |||
| 2b4a11e273 | |||
| a58b32703c | |||
| de80b4ce48 | |||
| 85a3b25be9 | |||
| 708bd71a56 | |||
| 40b886e0bd | |||
| 23c1c4bdac | |||
| 2dd8707725 | |||
| 0bb3c4b2bf | |||
| 72e8180abe | |||
| 2d5b1d577c | |||
| b5e45040ca | |||
| e420081512 | |||
| 0da4ee985b | |||
| 6d8cacc900 | |||
| 49543fcd98 | |||
| 7b80716c9a | |||
| a4a48c60ae | |||
| e027b5a1fe | |||
| 723df5584e | |||
| 2ca5155748 | |||
| 4f0cc58a11 | |||
| 2ccdfedd65 | |||
| d7ddb7592c | |||
| 7371ce3e36 | |||
| cd7bb9931e | |||
| b77dbdd87e | |||
| 83dcf3fd13 | |||
| a5c6e9d0e2 | |||
| a417411184 | |||
| 24d5e4aba9 | |||
| 6cb2fc8445 | |||
| 4ea2c3beb3 | |||
| be8c1191f3 | |||
| d969979c8c | |||
| c6fd3c8527 | |||
| 6ac4d93909 | |||
| 197a7eaec8 | |||
| f598ee2916 | |||
| b2fa6cdf8f | |||
| 97dbef155d | |||
| 9dbd91d93e | |||
| 7914cbdbb7 | |||
| 99febfb3aa | |||
| 2b00188983 | |||
| 82f270329f | |||
| 3cb17e76bd | |||
| 7b2f8a4ed1 | |||
| 438e745cb3 | |||
| 674fd511f4 | |||
| 66d85a7c0d | |||
| d12a5d754a | |||
| 3a78d62240 | |||
| 5e651b55fc | |||
| 8a6bf4a03d | |||
| 6a2f1a67ed | |||
| d56ab91a2e | |||
| 8f670f467b | |||
| d013168823 |
@@ -46,6 +46,7 @@ storybook-static
|
||||
**/.DS_Store
|
||||
cpu-cycles/libcpucycles/build
|
||||
foxyfox.env
|
||||
scratch.txt
|
||||
|
||||
.next
|
||||
ppa-private-key.b64
|
||||
|
||||
Generated
+1408
-1247
File diff suppressed because it is too large
Load Diff
@@ -103,6 +103,13 @@ impl LpFrame {
|
||||
Self::new(LpFrameKind::Forward, data)
|
||||
}
|
||||
|
||||
pub fn new_stream(attrs: StreamFrameAttributes, content: impl Into<Bytes>) -> Self {
|
||||
Self {
|
||||
header: LpFrameHeader::new(LpFrameKind::Stream, attrs.encode()),
|
||||
content: content.into(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn len(&self) -> usize {
|
||||
LpFrameHeader::SIZE + self.content.len()
|
||||
}
|
||||
@@ -115,6 +122,62 @@ pub enum LpFrameKind {
|
||||
Opaque = 0,
|
||||
Registration = 1,
|
||||
Forward = 2,
|
||||
Stream = 3,
|
||||
}
|
||||
|
||||
/// Message type within a `LpFrameKind::Stream` frame.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
#[repr(u8)]
|
||||
pub enum StreamMsgType {
|
||||
/// Open a new stream. Content is optional initial data.
|
||||
Open = 0,
|
||||
/// Data on an existing stream.
|
||||
Data = 1,
|
||||
}
|
||||
|
||||
/// Parsed form of the 14-byte `frame_attributes` for `LpFrameKind::Stream`.
|
||||
///
|
||||
/// Wire layout (big-endian):
|
||||
/// ```text
|
||||
/// [0..8 ) stream_id : u64
|
||||
/// [8 ) msg_type : u8 (0 = Open, 1 = Data)
|
||||
/// [9..13) sequence_num : u32
|
||||
/// [13 ) reserved : u8
|
||||
/// ```
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct StreamFrameAttributes {
|
||||
pub stream_id: u64,
|
||||
pub msg_type: StreamMsgType,
|
||||
pub sequence_num: u32,
|
||||
}
|
||||
|
||||
impl StreamFrameAttributes {
|
||||
pub fn encode(&self) -> [u8; 14] {
|
||||
let mut buf = [0u8; 14];
|
||||
buf[0..8].copy_from_slice(&self.stream_id.to_be_bytes());
|
||||
buf[8] = self.msg_type as u8;
|
||||
buf[9..13].copy_from_slice(&self.sequence_num.to_be_bytes());
|
||||
buf
|
||||
}
|
||||
|
||||
pub fn parse(attrs: &[u8; 14]) -> Result<Self, MalformedLpPacketError> {
|
||||
let stream_id = u64::from_be_bytes(attrs[0..8].try_into().unwrap());
|
||||
let msg_type = match attrs[8] {
|
||||
0 => StreamMsgType::Open,
|
||||
1 => StreamMsgType::Data,
|
||||
other => {
|
||||
return Err(MalformedLpPacketError::DeserialisationFailure(format!(
|
||||
"invalid stream msg_type: {other}"
|
||||
)));
|
||||
}
|
||||
};
|
||||
let sequence_num = u32::from_be_bytes(attrs[9..13].try_into().unwrap());
|
||||
Ok(Self {
|
||||
stream_id,
|
||||
msg_type,
|
||||
sequence_num,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
|
||||
@@ -0,0 +1,30 @@
|
||||
[package]
|
||||
name = "mixtcp"
|
||||
version = "0.0.1"
|
||||
edition = "2021"
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
smoltcp = { workspace = true, default-features = false, features = [
|
||||
"std",
|
||||
"medium-ip",
|
||||
"proto-ipv4",
|
||||
"proto-ipv6",
|
||||
"socket-tcp",
|
||||
"socket-icmp",
|
||||
] }
|
||||
tokio = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
nym-bin-common = { path = "../common/bin-common", features = ["basic_tracing"] }
|
||||
nym-sdk = { path = "../sdk/rust/nym-sdk" }
|
||||
nym-ip-packet-requests = { path = "../common/ip-packet-requests" }
|
||||
thiserror.workspace = true
|
||||
rustls = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
reqwest.workspace = true
|
||||
dirs.workspace = true
|
||||
webpki-roots.workspace = true
|
||||
serde_json.workspace = true
|
||||
@@ -0,0 +1,30 @@
|
||||
# MixTCP
|
||||
|
||||
This is an initial proof of concept of a SmolTCP `device` that uses the Mixnet for transport. It relies on the `IpMixStream` module from the Rust SDK to set up a connection with an Exit Gateway's Ip-Packet-Router, meaning that this is the IP that is seen by the receiver of the request.
|
||||
|
||||
This can be used as the basis for building HTTP(S) crates on top of the Mixnet whilst abstracting away the complexities of using the Mixnet for transport.
|
||||
|
||||
More to come in the future.
|
||||
|
||||
`examples/` contains examples for:
|
||||
- a TLS ping with Cloudflare
|
||||
- creating a `reqwest`-like HTTPS `GET` request and receiving a response
|
||||
|
||||
## Component Interaction
|
||||
```sh
|
||||
create_device()
|
||||
|
|
||||
+--------------+---------------+
|
||||
| | |
|
||||
v v v
|
||||
NymIprDevice NymIprBridge IpPair
|
||||
| | (10.0.x.x)
|
||||
| |
|
||||
+-- channels --+
|
||||
|
|
||||
v
|
||||
IpMixStream
|
||||
|
|
||||
v
|
||||
Mixnet
|
||||
```
|
||||
@@ -0,0 +1,286 @@
|
||||
#![allow(clippy::result_large_err)]
|
||||
use mixtcp::{create_device, MixtcpError};
|
||||
use rustls::{pki_types::ServerName, ClientConfig, ClientConnection};
|
||||
use std::{
|
||||
io::{self, Read, Write},
|
||||
sync::Arc,
|
||||
};
|
||||
use tracing::info;
|
||||
|
||||
use nym_sdk::stream_wrapper::{IpMixStream, NetworkEnvironment};
|
||||
use smoltcp::{
|
||||
iface::{Config, Interface, SocketSet},
|
||||
socket::tcp,
|
||||
time::Instant,
|
||||
wire::{HardwareAddress, IpAddress, IpCidr, Ipv4Address},
|
||||
};
|
||||
use std::sync::Once;
|
||||
use std::time::Duration;
|
||||
|
||||
static INIT: Once = Once::new();
|
||||
|
||||
pub struct TlsOverTcp {
|
||||
pub conn: ClientConnection,
|
||||
}
|
||||
|
||||
impl TlsOverTcp {
|
||||
pub fn new(domain: &str) -> Result<Self, MixtcpError> {
|
||||
let mut root_store = rustls::RootCertStore::empty();
|
||||
root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
|
||||
|
||||
let config = ClientConfig::builder()
|
||||
.with_root_certificates(root_store)
|
||||
.with_no_client_auth();
|
||||
|
||||
let server_name = ServerName::try_from(domain)
|
||||
.map_err(|_| MixtcpError::InvalidDnsName)?
|
||||
.to_owned();
|
||||
|
||||
let conn = ClientConnection::new(Arc::new(config), server_name)
|
||||
.map_err(|_| MixtcpError::TlsHandshakeFailed)?;
|
||||
|
||||
Ok(Self { conn })
|
||||
}
|
||||
|
||||
/// Move data from TLS connection to TCP socket
|
||||
pub fn write_tls(&mut self, socket: &mut tcp::Socket) -> Result<(), MixtcpError> {
|
||||
let mut buf = [0u8; 4096];
|
||||
while self.conn.wants_write() {
|
||||
match self.conn.write_tls(&mut buf.as_mut_slice()) {
|
||||
Ok(n) if n > 0 => {
|
||||
socket
|
||||
.send_slice(&buf[..n])
|
||||
.map_err(|_| MixtcpError::TlsHandshakeFailed)?;
|
||||
}
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Move data from TCP socket to TLS connection
|
||||
pub fn read_tls(&mut self, socket: &mut tcp::Socket) -> Result<(), MixtcpError> {
|
||||
if socket.can_recv() {
|
||||
let _ = socket.recv(|chunk| {
|
||||
if !chunk.is_empty() {
|
||||
inspect_tls_packet(chunk);
|
||||
let _ = self.conn.read_tls(&mut io::Cursor::new(&mut *chunk));
|
||||
let _ = self.conn.process_new_packets();
|
||||
}
|
||||
(chunk.len(), ())
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn send(&mut self, data: &[u8], socket: &mut tcp::Socket) -> Result<(), MixtcpError> {
|
||||
self.conn
|
||||
.writer()
|
||||
.write_all(data)
|
||||
.map_err(|_| MixtcpError::TlsHandshakeFailed)?;
|
||||
self.write_tls(socket)
|
||||
}
|
||||
|
||||
pub fn recv(&mut self, socket: &mut tcp::Socket) -> Result<Vec<u8>, MixtcpError> {
|
||||
self.read_tls(socket)?;
|
||||
let mut result = Vec::new();
|
||||
let mut buf = vec![0u8; 4096];
|
||||
match self.conn.reader().read(&mut buf) {
|
||||
Ok(n) if n > 0 => result.extend_from_slice(&buf[..n]),
|
||||
_ => {}
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
fn inspect_tls_packet(data: &[u8]) {
|
||||
if data.len() < 5 {
|
||||
return;
|
||||
}
|
||||
let content_type = data[0];
|
||||
if !(0x14..=0x17).contains(&content_type) {
|
||||
return;
|
||||
}
|
||||
let version = u16::from_be_bytes([data[1], data[2]]);
|
||||
let length = u16::from_be_bytes([data[3], data[4]]);
|
||||
info!(
|
||||
"TLS packet: ContentType={:#04x}, Version={:#06x}, Length={}",
|
||||
content_type, version, length
|
||||
);
|
||||
if content_type == 0x16 && data.len() > 5 {
|
||||
let handshake_type = data[5];
|
||||
let handshake_types = match handshake_type {
|
||||
0x01 => "ClientHello",
|
||||
0x02 => "ServerHello",
|
||||
0x0b => "Certificate",
|
||||
0x0c => "ServerKeyExchange",
|
||||
0x0d => "CertificateRequest",
|
||||
0x0e => "ServerHelloDone",
|
||||
0x0f => "CertificateVerify",
|
||||
0x10 => "ClientKeyExchange",
|
||||
0x14 => "Finished",
|
||||
_ => "Unknown",
|
||||
};
|
||||
info!(
|
||||
"Handshake type: {:#04x} ({}), Length: {}",
|
||||
handshake_type, handshake_types, length
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn init_logging() {
|
||||
INIT.call_once_force(|state| {
|
||||
if state.is_poisoned() {
|
||||
eprintln!("Logger initialization was poisoned, retrying");
|
||||
}
|
||||
if !tracing::dispatcher::has_been_set() {
|
||||
nym_bin_common::logging::setup_tracing_logger();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
init_logging();
|
||||
|
||||
let ipr_stream = IpMixStream::new(NetworkEnvironment::Mainnet).await?;
|
||||
let (mut device, bridge, allocated_ips) = create_device(ipr_stream).await?;
|
||||
info!("Allocated IP: {}", allocated_ips.ipv4);
|
||||
|
||||
tokio::spawn(async move {
|
||||
bridge.run().await.unwrap();
|
||||
});
|
||||
|
||||
let config = Config::new(HardwareAddress::Ip);
|
||||
let mut iface = Interface::new(config, &mut device, Instant::now());
|
||||
iface.update_ip_addrs(|ip_addrs| {
|
||||
ip_addrs
|
||||
.push(IpCidr::new(IpAddress::from(allocated_ips.ipv4), 32))
|
||||
.unwrap();
|
||||
});
|
||||
iface
|
||||
.routes_mut()
|
||||
.add_default_ipv4_route(Ipv4Address::UNSPECIFIED)
|
||||
.unwrap();
|
||||
|
||||
let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 16384]);
|
||||
let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; 4096]);
|
||||
let tcp_socket = tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer);
|
||||
let mut sockets = SocketSet::new(vec![]);
|
||||
let tcp_handle = sockets.add(tcp_socket);
|
||||
|
||||
let target_ip = Ipv4Address::new(1, 1, 1, 1);
|
||||
let target_port = 443;
|
||||
|
||||
let mut timestamp = Instant::from_millis(0);
|
||||
let start = tokio::time::Instant::now();
|
||||
let mut connected = false;
|
||||
let mut tls = None;
|
||||
let mut handshake_completed = false;
|
||||
let mut request_sent = false;
|
||||
|
||||
loop {
|
||||
if start.elapsed() > Duration::from_secs(60) {
|
||||
info!("Test timeout after 60 seconds");
|
||||
break;
|
||||
}
|
||||
|
||||
iface.poll(timestamp, &mut device, &mut sockets);
|
||||
timestamp += smoltcp::time::Duration::from_millis(1);
|
||||
let socket = sockets.get_mut::<tcp::Socket>(tcp_handle);
|
||||
|
||||
// TCP connection setup
|
||||
if !connected && !socket.is_open() {
|
||||
match socket.connect(iface.context(), (target_ip, target_port), 49152) {
|
||||
Ok(_) => {
|
||||
info!("TCP connect started");
|
||||
connected = true;
|
||||
}
|
||||
Err(e) => {
|
||||
info!("TCP connect failed: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TLS setup after TCP established
|
||||
if socket.state() == tcp::State::Established && tls.is_none() {
|
||||
info!("TCP established - creating TLS connection");
|
||||
match TlsOverTcp::new("cloudflare.com") {
|
||||
Ok(t) => tls = Some(t),
|
||||
Err(e) => {
|
||||
info!("TLS create failed: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TLS handshake and request
|
||||
if let Some(ref mut tls_conn) = tls {
|
||||
let _ = tls_conn.read_tls(socket);
|
||||
let _ = tls_conn.write_tls(socket);
|
||||
|
||||
// Complete handshake
|
||||
if !tls_conn.conn.is_handshaking() && !handshake_completed {
|
||||
handshake_completed = true;
|
||||
info!("TLS handshake completed - ready for HTTPS");
|
||||
|
||||
// Send simple HTTP request
|
||||
let request = b"GET /cdn-cgi/trace HTTP/1.1\r\nHost: cloudflare.com\r\nUser-Agent: mixtcp-test/1.0\r\nAccept: */*\r\nConnection: close\r\n\r\n";
|
||||
match tls_conn.send(request, socket) {
|
||||
Ok(_) => {
|
||||
info!("HTTPS request sent");
|
||||
request_sent = true;
|
||||
}
|
||||
Err(e) => {
|
||||
info!("HTTPS send failed: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Read response after request sent
|
||||
if request_sent {
|
||||
let mut response_data = Vec::new();
|
||||
let mut buf = vec![0u8; 4096];
|
||||
|
||||
match tls_conn.conn.reader().read(&mut buf) {
|
||||
Ok(0) => {
|
||||
info!("Response complete - connection closed");
|
||||
break;
|
||||
}
|
||||
Ok(n) if n > 0 => {
|
||||
response_data.extend_from_slice(&buf[..n]);
|
||||
info!("Received {} bytes", n);
|
||||
|
||||
if let Ok(response_str) = std::str::from_utf8(&response_data) {
|
||||
if response_str.contains("\r\n\r\n") {
|
||||
info!("HTTPS response received!");
|
||||
|
||||
if let Some(status_end) = response_str.find("\r\n") {
|
||||
info!("HTTP Status: {}", &response_str[..status_end]);
|
||||
}
|
||||
|
||||
info!("Full response: {}", response_str);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(1_usize..) => {
|
||||
todo!()
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
|
||||
// Keep polling
|
||||
}
|
||||
Err(e) => {
|
||||
info!("Read error: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
|
||||
Err("No HTTP response received".into())
|
||||
}
|
||||
@@ -0,0 +1,341 @@
|
||||
#![allow(clippy::result_large_err)]
|
||||
use mixtcp::{create_device, MixtcpError, NymIprDevice};
|
||||
use nym_sdk::stream_wrapper::{IpMixStream, NetworkEnvironment};
|
||||
use reqwest::StatusCode;
|
||||
use rustls::{pki_types::ServerName, ClientConfig, ClientConnection};
|
||||
use smoltcp::{
|
||||
iface::{Config, Interface, SocketSet},
|
||||
socket::tcp,
|
||||
time::Instant,
|
||||
wire::{HardwareAddress, IpAddress, IpCidr, Ipv4Address},
|
||||
};
|
||||
use std::sync::Once;
|
||||
use std::{
|
||||
io::{self, Read, Write},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
use tracing::info;
|
||||
|
||||
static INIT: Once = Once::new();
|
||||
|
||||
pub struct TlsOverTcp {
|
||||
pub conn: ClientConnection,
|
||||
}
|
||||
|
||||
impl TlsOverTcp {
|
||||
pub fn new(domain: &str) -> Result<Self, MixtcpError> {
|
||||
let mut root_store = rustls::RootCertStore::empty();
|
||||
root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
|
||||
|
||||
let config = ClientConfig::builder()
|
||||
.with_root_certificates(root_store)
|
||||
.with_no_client_auth();
|
||||
|
||||
let server_name = ServerName::try_from(domain)
|
||||
.map_err(|_| MixtcpError::InvalidDnsName)?
|
||||
.to_owned();
|
||||
|
||||
let conn = ClientConnection::new(Arc::new(config), server_name)
|
||||
.map_err(|_| MixtcpError::TlsHandshakeFailed)?;
|
||||
|
||||
Ok(Self { conn })
|
||||
}
|
||||
|
||||
pub fn write_tls(&mut self, socket: &mut tcp::Socket) -> Result<(), MixtcpError> {
|
||||
let mut buf = [0u8; 4096];
|
||||
while self.conn.wants_write() {
|
||||
match self.conn.write_tls(&mut buf.as_mut_slice()) {
|
||||
Ok(n) if n > 0 => {
|
||||
socket
|
||||
.send_slice(&buf[..n])
|
||||
.map_err(|_| MixtcpError::TlsHandshakeFailed)?;
|
||||
}
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn read_tls(&mut self, socket: &mut tcp::Socket) -> Result<(), MixtcpError> {
|
||||
if socket.can_recv() {
|
||||
let _ = socket.recv(|chunk| {
|
||||
if !chunk.is_empty() {
|
||||
let _ = self.conn.read_tls(&mut io::Cursor::new(&mut *chunk));
|
||||
let _ = self.conn.process_new_packets();
|
||||
}
|
||||
(chunk.len(), ())
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn send(&mut self, data: &[u8], socket: &mut tcp::Socket) -> Result<(), MixtcpError> {
|
||||
self.conn
|
||||
.writer()
|
||||
.write_all(data)
|
||||
.map_err(|_| MixtcpError::TlsHandshakeFailed)?;
|
||||
self.write_tls(socket)
|
||||
}
|
||||
}
|
||||
|
||||
/// Reqwest-ish client right now, just a handrolled GET request for the example
|
||||
pub struct MixtcpReqwestClient {
|
||||
device: Arc<tokio::sync::Mutex<(smoltcp::iface::Interface, NymIprDevice)>>,
|
||||
_bridge: tokio::task::JoinHandle<()>,
|
||||
_allocated_ip: Ipv4Address,
|
||||
}
|
||||
|
||||
impl MixtcpReqwestClient {
|
||||
pub async fn new() -> Result<Self, MixtcpError> {
|
||||
let ipr_stream = IpMixStream::new(NetworkEnvironment::Mainnet)
|
||||
.await
|
||||
.map_err(|_| MixtcpError::MixnetConnectionFailed)?;
|
||||
|
||||
let (mut device, bridge, allocated_ips) = create_device(ipr_stream).await?;
|
||||
info!("Allocated IP: {}", allocated_ips.ipv4);
|
||||
|
||||
let bridge_handle = tokio::spawn(async move {
|
||||
if let Err(e) = bridge.run().await {
|
||||
tracing::error!("Bridge error: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
let config = Config::new(HardwareAddress::Ip);
|
||||
let mut iface = Interface::new(config, &mut device, Instant::now());
|
||||
iface.update_ip_addrs(|ip_addrs| {
|
||||
ip_addrs
|
||||
.push(IpCidr::new(IpAddress::from(allocated_ips.ipv4), 32))
|
||||
.unwrap();
|
||||
});
|
||||
iface
|
||||
.routes_mut()
|
||||
.add_default_ipv4_route(Ipv4Address::UNSPECIFIED)
|
||||
.unwrap();
|
||||
|
||||
let device = Arc::new(tokio::sync::Mutex::new((iface, device)));
|
||||
|
||||
Ok(Self {
|
||||
device,
|
||||
_bridge: bridge_handle,
|
||||
_allocated_ip: allocated_ips.ipv4,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn get(&self, url: &str) -> Result<MixtcpResponse, MixtcpError> {
|
||||
let parsed_url = reqwest::Url::parse(url).map_err(|_| MixtcpError::InvalidUrl)?;
|
||||
let host = parsed_url.host_str().ok_or(MixtcpError::InvalidUrl)?;
|
||||
let path = parsed_url.path();
|
||||
|
||||
let response_bytes = self.simple_get_request(host, path).await?;
|
||||
let (status, body) = self.parse_simple_response(&response_bytes)?;
|
||||
|
||||
Ok(MixtcpResponse { status, body })
|
||||
}
|
||||
|
||||
async fn simple_get_request(&self, domain: &str, path: &str) -> Result<Vec<u8>, MixtcpError> {
|
||||
let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 16384]);
|
||||
let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; 4096]);
|
||||
let tcp_socket = tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer);
|
||||
let mut sockets = SocketSet::new(vec![]);
|
||||
let tcp_handle = sockets.add(tcp_socket);
|
||||
|
||||
let target_ip = Ipv4Address::new(1, 1, 1, 1);
|
||||
let target_port = 443;
|
||||
|
||||
let mut timestamp = Instant::from_millis(0);
|
||||
let start = tokio::time::Instant::now();
|
||||
let mut connected = false;
|
||||
let mut tls = None;
|
||||
let mut handshake_completed = false;
|
||||
let mut request_sent = false;
|
||||
let mut response_data = Vec::new();
|
||||
|
||||
let mut device_guard = self.device.lock().await;
|
||||
let (ref mut iface, ref mut device) = &mut *device_guard;
|
||||
|
||||
loop {
|
||||
if start.elapsed() > Duration::from_secs(60) {
|
||||
return Err(MixtcpError::Timeout);
|
||||
}
|
||||
|
||||
iface.poll(timestamp, device, &mut sockets);
|
||||
timestamp += smoltcp::time::Duration::from_millis(1);
|
||||
let socket = sockets.get_mut::<tcp::Socket>(tcp_handle);
|
||||
|
||||
if !connected && !socket.is_open() {
|
||||
match socket.connect(iface.context(), (target_ip, target_port), 49152) {
|
||||
Ok(_) => {
|
||||
info!("TCP connect started");
|
||||
connected = true;
|
||||
}
|
||||
Err(e) => {
|
||||
info!("TCP connect failed: {}", e);
|
||||
return Err(MixtcpError::TcpConnectionFailed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if socket.state() == tcp::State::Established && tls.is_none() {
|
||||
info!("TCP established - creating TLS connection");
|
||||
match TlsOverTcp::new(domain) {
|
||||
Ok(t) => tls = Some(t),
|
||||
Err(e) => {
|
||||
info!("TLS create failed: {}", e);
|
||||
return Err(MixtcpError::TlsHandshakeFailed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(ref mut tls_conn) = tls {
|
||||
let _ = tls_conn.read_tls(socket);
|
||||
let _ = tls_conn.write_tls(socket);
|
||||
|
||||
if !tls_conn.conn.is_handshaking() && !handshake_completed {
|
||||
handshake_completed = true;
|
||||
info!("TLS handshake completed - ready for HTTPS");
|
||||
|
||||
let request = format!(
|
||||
"GET {} HTTP/1.1\r\nHost: {}\r\nUser-Agent: mixtcp/1.0\r\nAccept: */*\r\nConnection: close\r\n\r\n",
|
||||
path, domain
|
||||
);
|
||||
tls_conn.send(request.as_bytes(), socket)?;
|
||||
info!("HTTPS request sent");
|
||||
request_sent = true;
|
||||
}
|
||||
|
||||
if request_sent {
|
||||
let mut buf = vec![0u8; 4096];
|
||||
match tls_conn.conn.reader().read(&mut buf) {
|
||||
Ok(0) => {
|
||||
info!("Response complete");
|
||||
break;
|
||||
}
|
||||
Ok(n) if n > 0 => {
|
||||
response_data.extend_from_slice(&buf[..n]);
|
||||
if let Ok(response_str) = std::str::from_utf8(&response_data) {
|
||||
if response_str.contains("\r\n\r\n") {
|
||||
return Ok(response_data);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
|
||||
Err(e) => {
|
||||
info!("Read error: {}", e);
|
||||
return Err(MixtcpError::ResponseReadFailed);
|
||||
}
|
||||
Ok(_) => continue,
|
||||
}
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
|
||||
Err(MixtcpError::NoResponseReceived)
|
||||
}
|
||||
|
||||
/// Simple response - just extract status and body
|
||||
fn parse_simple_response(&self, response_bytes: &[u8]) -> Result<(u16, String), MixtcpError> {
|
||||
let response_str = String::from_utf8_lossy(response_bytes);
|
||||
|
||||
let status_line = response_str
|
||||
.lines()
|
||||
.next()
|
||||
.ok_or(MixtcpError::InvalidHttpResponse)?;
|
||||
|
||||
let status: u16 = status_line
|
||||
.split_whitespace()
|
||||
.nth(1)
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(200);
|
||||
|
||||
if let Some(body_start) = response_str.find("\r\n\r\n") {
|
||||
let body = response_str[body_start + 4..].to_string();
|
||||
Ok((status, body))
|
||||
} else {
|
||||
Err(MixtcpError::InvalidHttpResponse)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MixtcpResponse {
|
||||
status: u16,
|
||||
body: String,
|
||||
}
|
||||
|
||||
impl MixtcpResponse {
|
||||
pub fn status(&self) -> StatusCode {
|
||||
StatusCode::from_u16(self.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
|
||||
pub async fn text(self) -> Result<String, std::convert::Infallible> {
|
||||
Ok(self.body)
|
||||
}
|
||||
}
|
||||
|
||||
fn init_logging() {
|
||||
INIT.call_once_force(|state| {
|
||||
if state.is_poisoned() {
|
||||
eprintln!("Logger initialization was poisoned, retrying");
|
||||
}
|
||||
if !tracing::dispatcher::has_been_set() {
|
||||
nym_bin_common::logging::setup_tracing_logger();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
init_logging();
|
||||
|
||||
let test_url = "https://cloudflare.com/cdn-cgi/trace";
|
||||
|
||||
info!("Fetching with plain reqwest...");
|
||||
let start = tokio::time::Instant::now();
|
||||
let plain_response = reqwest::get(test_url).await?;
|
||||
let plain_status = plain_response.status();
|
||||
let plain_text = plain_response.text().await?;
|
||||
let plain_duration = start.elapsed();
|
||||
|
||||
info!(
|
||||
"Plain reqwest - Status: {}, Time: {:?}",
|
||||
plain_status, plain_duration
|
||||
);
|
||||
|
||||
info!("Setting up mixnet client...");
|
||||
let client = MixtcpReqwestClient::new().await?;
|
||||
let start = tokio::time::Instant::now();
|
||||
let mixnet_response = client.get(test_url).await?;
|
||||
let mixnet_status = mixnet_response.status();
|
||||
let mixnet_text = mixnet_response.text().await?;
|
||||
let mixnet_duration = start.elapsed();
|
||||
|
||||
info!(
|
||||
"Mixnet reqwest - Status: {}, Time: {:?}",
|
||||
mixnet_status, mixnet_duration
|
||||
);
|
||||
|
||||
info!("Status codes match: {}", plain_status == mixnet_status);
|
||||
info!(
|
||||
"Response lengths match: {}",
|
||||
plain_text.len() == mixnet_text.len()
|
||||
);
|
||||
|
||||
let key_fields = ["fl=", "ip=", "ts=", "visit_scheme="];
|
||||
for field in key_fields {
|
||||
let plain_has = plain_text.contains(field);
|
||||
let mixnet_has = mixnet_text.contains(field);
|
||||
info!(
|
||||
"Field '{}' - Plain: {}, Mixnet: {}",
|
||||
field, plain_has, mixnet_has
|
||||
);
|
||||
assert_eq!(plain_has, mixnet_has, "Field '{}' mismatch", field);
|
||||
}
|
||||
|
||||
info!("Plain reqwest time: {:?}", plain_duration);
|
||||
info!("Mixnet reqwest time: {:?}", mixnet_duration);
|
||||
let slowdown = mixnet_duration.as_millis() as f64 / plain_duration.as_millis() as f64;
|
||||
info!("Mixnet slowdown: {:.1}x", slowdown);
|
||||
info!("Both responses match");
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,270 @@
|
||||
#![allow(clippy::result_large_err)]
|
||||
use mixtcp::{create_device, MixtcpError};
|
||||
use rustls::{pki_types::ServerName, ClientConfig, ClientConnection};
|
||||
use std::{
|
||||
io::{self, Read, Write},
|
||||
sync::Arc,
|
||||
};
|
||||
use tracing::info;
|
||||
|
||||
use nym_sdk::stream_wrapper::{IpMixStream, NetworkEnvironment};
|
||||
|
||||
use smoltcp::{
|
||||
iface::{Config, Interface, SocketSet},
|
||||
socket::tcp,
|
||||
time::Instant,
|
||||
wire::{HardwareAddress, IpAddress, IpCidr, Ipv4Address},
|
||||
};
|
||||
use std::sync::Once;
|
||||
use std::time::Duration;
|
||||
|
||||
static INIT: Once = Once::new();
|
||||
|
||||
pub struct TlsOverTcp {
|
||||
pub conn: ClientConnection,
|
||||
}
|
||||
|
||||
impl TlsOverTcp {
|
||||
pub fn new(domain: &str) -> Result<Self, MixtcpError> {
|
||||
let mut root_store = rustls::RootCertStore::empty();
|
||||
root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
|
||||
|
||||
let config = ClientConfig::builder()
|
||||
.with_root_certificates(root_store)
|
||||
.with_no_client_auth();
|
||||
|
||||
let server_name = ServerName::try_from(domain)
|
||||
.map_err(|_| MixtcpError::InvalidDnsName)?
|
||||
.to_owned();
|
||||
|
||||
let conn = ClientConnection::new(Arc::new(config), server_name)
|
||||
.map_err(|_| MixtcpError::TlsHandshakeFailed)?;
|
||||
|
||||
Ok(Self { conn })
|
||||
}
|
||||
|
||||
/// Move data from TLS connection to TCP socket
|
||||
pub fn write_tls(&mut self, socket: &mut tcp::Socket) -> Result<(), MixtcpError> {
|
||||
let mut buf = [0u8; 4096];
|
||||
while self.conn.wants_write() {
|
||||
match self.conn.write_tls(&mut buf.as_mut_slice()) {
|
||||
Ok(n) if n > 0 => {
|
||||
socket
|
||||
.send_slice(&buf[..n])
|
||||
.map_err(|_| MixtcpError::TlsHandshakeFailed)?;
|
||||
}
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Move data from TCP socket to TLS connection
|
||||
pub fn read_tls(&mut self, socket: &mut tcp::Socket) -> Result<(), MixtcpError> {
|
||||
if socket.can_recv() {
|
||||
let _ = socket.recv(|chunk| {
|
||||
if !chunk.is_empty() {
|
||||
inspect_tls_packet(chunk);
|
||||
let _ = self.conn.read_tls(&mut io::Cursor::new(&mut *chunk));
|
||||
let _ = self.conn.process_new_packets();
|
||||
}
|
||||
(chunk.len(), ())
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn send(&mut self, data: &[u8], socket: &mut tcp::Socket) -> Result<(), MixtcpError> {
|
||||
self.conn
|
||||
.writer()
|
||||
.write_all(data)
|
||||
.map_err(|_| MixtcpError::TlsHandshakeFailed)?;
|
||||
self.write_tls(socket)
|
||||
}
|
||||
|
||||
pub fn recv(&mut self, socket: &mut tcp::Socket) -> Result<Vec<u8>, MixtcpError> {
|
||||
self.read_tls(socket)?;
|
||||
let mut result = Vec::new();
|
||||
let mut buf = vec![0u8; 4096];
|
||||
match self.conn.reader().read(&mut buf) {
|
||||
Ok(n) if n > 0 => result.extend_from_slice(&buf[..n]),
|
||||
_ => {}
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
fn inspect_tls_packet(data: &[u8]) {
|
||||
if data.len() < 5 {
|
||||
return;
|
||||
}
|
||||
let content_type = data[0];
|
||||
if !(0x14..=0x17).contains(&content_type) {
|
||||
return;
|
||||
}
|
||||
let version = u16::from_be_bytes([data[1], data[2]]);
|
||||
let length = u16::from_be_bytes([data[3], data[4]]);
|
||||
info!(
|
||||
"TLS packet: ContentType={:#04x}, Version={:#06x}, Length={}",
|
||||
content_type, version, length
|
||||
);
|
||||
if content_type == 0x16 && data.len() > 5 {
|
||||
let handshake_type = data[5];
|
||||
let handshake_types = match handshake_type {
|
||||
0x01 => "ClientHello",
|
||||
0x02 => "ServerHello",
|
||||
0x0b => "Certificate",
|
||||
0x0c => "ServerKeyExchange",
|
||||
0x0d => "CertificateRequest",
|
||||
0x0e => "ServerHelloDone",
|
||||
0x0f => "CertificateVerify",
|
||||
0x10 => "ClientKeyExchange",
|
||||
0x14 => "Finished",
|
||||
_ => "Unknown",
|
||||
};
|
||||
info!(
|
||||
"Handshake type: {:#04x} ({}), Length: {}",
|
||||
handshake_type, handshake_types, length
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn init_logging() {
|
||||
INIT.call_once_force(|state| {
|
||||
if state.is_poisoned() {
|
||||
eprintln!("Logger initialization was poisoned, retrying");
|
||||
}
|
||||
if !tracing::dispatcher::has_been_set() {
|
||||
nym_bin_common::logging::setup_tracing_logger();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
init_logging();
|
||||
|
||||
let ipr_stream = IpMixStream::new(NetworkEnvironment::Mainnet).await?;
|
||||
let (mut device, bridge, allocated_ips) = create_device(ipr_stream).await?;
|
||||
info!("Allocated IP: {}", allocated_ips.ipv4);
|
||||
|
||||
tokio::spawn(async move {
|
||||
bridge.run().await.unwrap();
|
||||
});
|
||||
|
||||
let config = Config::new(HardwareAddress::Ip);
|
||||
let mut iface = Interface::new(config, &mut device, Instant::now());
|
||||
iface.update_ip_addrs(|ip_addrs| {
|
||||
ip_addrs
|
||||
.push(IpCidr::new(IpAddress::from(allocated_ips.ipv4), 32))
|
||||
.unwrap();
|
||||
});
|
||||
iface
|
||||
.routes_mut()
|
||||
.add_default_ipv4_route(Ipv4Address::UNSPECIFIED)
|
||||
.unwrap();
|
||||
|
||||
let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 16384]);
|
||||
let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; 4096]);
|
||||
let tcp_socket = tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer);
|
||||
let mut sockets = SocketSet::new(vec![]);
|
||||
let tcp_handle = sockets.add(tcp_socket);
|
||||
|
||||
let target_ip = Ipv4Address::new(1, 1, 1, 1); // Pinging Cloudflare
|
||||
let target_port = 443;
|
||||
info!("Connecting to {}:{} through mixnet", target_ip, target_port);
|
||||
|
||||
let mut timestamp = Instant::from_millis(0);
|
||||
let start = tokio::time::Instant::now();
|
||||
let mut connected = false;
|
||||
let mut tls = None;
|
||||
let handshake_completed = false;
|
||||
|
||||
loop {
|
||||
if start.elapsed() > Duration::from_secs(120) {
|
||||
info!("Test timeout after 120 seconds");
|
||||
break;
|
||||
}
|
||||
|
||||
iface.poll(timestamp, &mut device, &mut sockets);
|
||||
timestamp += smoltcp::time::Duration::from_millis(1);
|
||||
let socket = sockets.get_mut::<tcp::Socket>(tcp_handle);
|
||||
|
||||
if !connected && !socket.is_open() {
|
||||
match socket.connect(iface.context(), (target_ip, target_port), 49152) {
|
||||
Ok(_) => {
|
||||
info!("TCP connect started");
|
||||
connected = true;
|
||||
}
|
||||
Err(e) => {
|
||||
info!("TCP connect failed: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if start.elapsed().as_secs().is_multiple_of(5) && start.elapsed().as_millis() % 1000 < 100 {
|
||||
info!(
|
||||
"State: TCP={:?}, established={}, can_send={}, can_recv={}",
|
||||
socket.state(),
|
||||
socket.state() == tcp::State::Established,
|
||||
socket.may_send(),
|
||||
socket.can_recv()
|
||||
);
|
||||
}
|
||||
|
||||
if socket.state() == tcp::State::Established && tls.is_none() {
|
||||
info!("TCP established - creating TLS connection");
|
||||
match TlsOverTcp::new("cloudflare.com") {
|
||||
Ok(t) => tls = Some(t),
|
||||
Err(e) => {
|
||||
info!("TLS create failed: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(ref mut tls_conn) = tls {
|
||||
let _ = tls_conn.read_tls(socket);
|
||||
let _ = tls_conn.write_tls(socket);
|
||||
|
||||
if start.elapsed().as_secs().is_multiple_of(10)
|
||||
&& start.elapsed().as_millis() % 1000 < 100
|
||||
{
|
||||
info!(
|
||||
"TLS state: handshaking={}, wants_read={}, wants_write={}",
|
||||
tls_conn.conn.is_handshaking(),
|
||||
tls_conn.conn.wants_read(),
|
||||
tls_conn.conn.wants_write()
|
||||
);
|
||||
}
|
||||
|
||||
if !tls_conn.conn.is_handshaking() && !handshake_completed {
|
||||
info!("TLS handshake complete");
|
||||
info!(
|
||||
"TLS verification: handshake_complete=true, wants_read={}, wants_write={}",
|
||||
tls_conn.conn.wants_read(),
|
||||
tls_conn.conn.wants_write()
|
||||
);
|
||||
|
||||
match tls_conn.recv(socket) {
|
||||
Ok(data) if data.is_empty() => {
|
||||
info!("No unexpected application data waiting to be read");
|
||||
}
|
||||
Ok(data) => {
|
||||
info!("Unexpected application data received: {} bytes", data.len());
|
||||
}
|
||||
Err(e) => {
|
||||
info!("TLS recv check failed: {}", e);
|
||||
}
|
||||
}
|
||||
info!("TLS handshake successful with cloudflare");
|
||||
break;
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(1)).await;
|
||||
}
|
||||
|
||||
info!("Test completed");
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,120 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-2.0-only
|
||||
|
||||
use crate::error::MixtcpError;
|
||||
use nym_ip_packet_requests::codec::MultiIpPacketCodec;
|
||||
use nym_sdk::stream_wrapper::IpMixStream;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::{error, info};
|
||||
|
||||
/// Asynchronous bridge between smoltcp device and Mixnet.
|
||||
///
|
||||
/// This component runs in a separate task and handles all asynchronous
|
||||
/// operations required for outbound communication. It receives packets
|
||||
/// from the device via channels, bundles them according to IPR protocol
|
||||
/// (MultiIpPacketCodec) and transmits them through the Mixnet.
|
||||
///
|
||||
/// # Packet Processing Flow
|
||||
///
|
||||
/// Outgoing packets:
|
||||
/// - Receive from device via channel
|
||||
/// - Bundle using MultiIpPacketCodec
|
||||
/// - Send through mixnet via send_ip_packet()
|
||||
///
|
||||
/// Incoming packets:
|
||||
/// - Poll mixnet with handle_incoming()
|
||||
/// - Forward to device via channel
|
||||
/// - Device queues for smoltcp consumption
|
||||
pub struct NymIprBridge {
|
||||
/// Connected IPR stream for mixnet communication
|
||||
stream: IpMixStream,
|
||||
/// Channel for receiving outgoing packets from device
|
||||
tx_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
|
||||
/// Channel for sending incoming packets to device
|
||||
rx_sender: mpsc::UnboundedSender<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl NymIprBridge {
|
||||
pub fn new(
|
||||
stream: IpMixStream,
|
||||
tx_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
|
||||
rx_sender: mpsc::UnboundedSender<Vec<u8>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
stream,
|
||||
tx_receiver,
|
||||
rx_sender,
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs the bridge event loop.
|
||||
///
|
||||
/// This method should be spawned in a separate task. It continuously:
|
||||
/// - Processes outgoing packets from the device
|
||||
/// - Polls for incoming packets from the mixnet
|
||||
/// - Maintains packet statistics
|
||||
///
|
||||
/// The loop exits when channels are closed or an error occurs.
|
||||
pub async fn run(mut self) -> Result<(), MixtcpError> {
|
||||
info!("Starting Nym IPR bridge");
|
||||
let mut packets_sent = 0;
|
||||
let mut packets_received = 0;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Outgoing packets from smoltcp layer above.
|
||||
Some(packet) = self.tx_receiver.recv() => {
|
||||
info!("Bridge sending {} byte packet to mixnet", packet.len());
|
||||
|
||||
// Log packet details for debugging
|
||||
if packet.len() >= 20 {
|
||||
let version = (packet[0] >> 4) & 0xF;
|
||||
let proto = packet[9];
|
||||
let src_ip = &packet[12..16];
|
||||
let dst_ip = &packet[16..20];
|
||||
info!(
|
||||
"Outgoing IPv{} packet: proto={}, src={}.{}.{}.{}, dst={}.{}.{}.{}",
|
||||
version, proto,
|
||||
src_ip[0], src_ip[1], src_ip[2], src_ip[3],
|
||||
dst_ip[0], dst_ip[1], dst_ip[2], dst_ip[3]
|
||||
);
|
||||
}
|
||||
|
||||
// Necessary to bundle for IPR! See stream_wrapper_ipr.rs tests.
|
||||
let bundled = MultiIpPacketCodec::bundle_one_packet(packet.into());
|
||||
if let Err(e) = self.stream.send_ip_packet(&bundled).await {
|
||||
error!("Failed to send packet through mixnet: {}", e);
|
||||
} else {
|
||||
packets_sent += 1;
|
||||
info!("Total packets sent: {}", packets_sent);
|
||||
}
|
||||
}
|
||||
|
||||
// Poll for incoming packets from mixnet
|
||||
Ok(packets) = self.stream.handle_incoming() => {
|
||||
if !packets.is_empty() {
|
||||
info!("Bridge received {} packets from mixnet", packets.len());
|
||||
for packet in packets {
|
||||
info!("Incoming packet: {} bytes", packet.len());
|
||||
|
||||
// Forward to device via channel
|
||||
if self.rx_sender.send(packet.to_vec()).is_err() {
|
||||
error!("Failed to send packet to device - receiver dropped");
|
||||
return Err(MixtcpError::ChannelClosed);
|
||||
}
|
||||
packets_received += 1;
|
||||
info!("Total packets received: {}", packets_received);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
else => {
|
||||
info!("Bridge shutting down. Sent: {}, Received: {}", packets_sent, packets_received);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,165 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-2.0-only
|
||||
|
||||
use smoltcp::{
|
||||
phy::{Device, DeviceCapabilities, Medium, RxToken, TxToken},
|
||||
time::Instant,
|
||||
};
|
||||
use std::collections::VecDeque;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::{info, warn};
|
||||
|
||||
/// # Overview
|
||||
/// We need something to bridge the async / sync weirdness (Device trait fns are sync, IpMixStream fns are
|
||||
/// async) in a way that allows for the `NymIprDevice` to look and act like any other device.
|
||||
///
|
||||
/// We need to be polling the queue to/from the NymIprBridge, hence the addition of the
|
||||
/// mpsc channels in the Device struct and the extra fns.
|
||||
///
|
||||
/// # Architecture
|
||||
/// smoltcp (sync) <-> NymIprDevice <-> channels <-> NymIprBridge <-> Mixnet (async)
|
||||
///
|
||||
/// The device maintains a receive queue for packets coming from the mixnet and
|
||||
/// uses unbounded channels to communicate with the bridge task that handles the
|
||||
/// actual mixnet I/O. We poll the channel in receive() to move packets via mpsc
|
||||
/// from async to sync world.
|
||||
///
|
||||
/// This way no blocking from smoltcp + allows for concurrency.
|
||||
///
|
||||
/// Adapter pattern between sync polling-based I/O and async event-based I/O.
|
||||
pub struct NymIprDevice {
|
||||
// Receive queue for packets coming from the mixnet
|
||||
rx_queue: VecDeque<Vec<u8>>,
|
||||
|
||||
// Channel to send packets to the bridge task
|
||||
tx_sender: mpsc::UnboundedSender<Vec<u8>>,
|
||||
|
||||
// Device capabilities
|
||||
capabilities: DeviceCapabilities,
|
||||
|
||||
// Channel to receive packets from the bridge task
|
||||
rx_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl NymIprDevice {
|
||||
pub fn new(
|
||||
tx_sender: mpsc::UnboundedSender<Vec<u8>>,
|
||||
rx_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
|
||||
) -> Self {
|
||||
let mut capabilities = DeviceCapabilities::default();
|
||||
capabilities.medium = Medium::Ip;
|
||||
// Standard MTU for IP packets - TODO make configurable
|
||||
capabilities.max_transmission_unit = 1500;
|
||||
// Process one packet at a time. TODO experiment with this
|
||||
capabilities.max_burst_size = Some(1);
|
||||
|
||||
Self {
|
||||
rx_queue: VecDeque::new(),
|
||||
tx_sender,
|
||||
capabilities,
|
||||
rx_receiver,
|
||||
}
|
||||
}
|
||||
|
||||
/// Poll for new packets from the bridge
|
||||
fn poll_rx_queue(&mut self) {
|
||||
// Try to receive all available packets without blocking, queue them for smoltcp consumption.
|
||||
while let Ok(packet) = self.rx_receiver.try_recv() {
|
||||
info!("Received packet of {} bytes from bridge", packet.len());
|
||||
self.rx_queue.push_back(packet);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn tx_sender(&self) -> mpsc::UnboundedSender<Vec<u8>> {
|
||||
self.tx_sender.clone()
|
||||
}
|
||||
|
||||
/// Get the receiver for external use
|
||||
pub fn rx_receiver(&self) -> mpsc::UnboundedReceiver<Vec<u8>> {
|
||||
// Create a new channel and return the receiver
|
||||
// This is a bit of a hack but necessary for the current architecture
|
||||
let (_tx, rx) = mpsc::unbounded_channel();
|
||||
// We just need the receiver for testing
|
||||
rx
|
||||
}
|
||||
}
|
||||
|
||||
impl Device for NymIprDevice {
|
||||
type RxToken<'a>
|
||||
= NymRxToken
|
||||
where
|
||||
Self: 'a;
|
||||
type TxToken<'a>
|
||||
= NymTxToken
|
||||
where
|
||||
Self: 'a;
|
||||
|
||||
fn receive(&mut self, _timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
|
||||
// Poll for new packets from the async bridge
|
||||
self.poll_rx_queue();
|
||||
|
||||
// Check if we have a packet to deliver
|
||||
let packet = self.rx_queue.pop_front()?;
|
||||
|
||||
// Create tokens - RxToken owns the packet data
|
||||
let rx_token = NymRxToken { buffer: packet };
|
||||
let tx_token = NymTxToken {
|
||||
tx_sender: self.tx_sender.clone(),
|
||||
};
|
||||
|
||||
Some((rx_token, tx_token))
|
||||
}
|
||||
|
||||
fn transmit(&mut self, _timestamp: Instant) -> Option<Self::TxToken<'_>> {
|
||||
// We can always transmit (channel will buffer)
|
||||
Some(NymTxToken {
|
||||
tx_sender: self.tx_sender.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
fn capabilities(&self) -> DeviceCapabilities {
|
||||
self.capabilities.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Receive token - owns the packet buffer
|
||||
pub struct NymRxToken {
|
||||
buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
impl RxToken for NymRxToken {
|
||||
fn consume<R, F>(self, f: F) -> R
|
||||
where
|
||||
F: FnOnce(&[u8]) -> R,
|
||||
{
|
||||
info!("Consuming RX packet of {} bytes", self.buffer.len());
|
||||
f(&self.buffer)
|
||||
}
|
||||
}
|
||||
|
||||
/// Transmit token - holds channel sender
|
||||
pub struct NymTxToken {
|
||||
tx_sender: mpsc::UnboundedSender<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl TxToken for NymTxToken {
|
||||
fn consume<R, F>(self, len: usize, f: F) -> R
|
||||
where
|
||||
F: FnOnce(&mut [u8]) -> R,
|
||||
{
|
||||
// Create buffer for the packet
|
||||
let mut buffer = vec![0u8; len];
|
||||
|
||||
// Let smoltcp fill the packet
|
||||
let result = f(&mut buffer);
|
||||
|
||||
// Send raw packet to the bridge task for transmission
|
||||
if let Err(e) = self.tx_sender.send(buffer) {
|
||||
warn!("Failed to send packet to bridge: {}", e);
|
||||
} else {
|
||||
info!("Sent {} byte packet to bridge", len);
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,52 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-2.0-only
|
||||
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum MixtcpError {
|
||||
#[error("Channel closed")]
|
||||
ChannelClosed,
|
||||
|
||||
#[error("Not connected to IPR")]
|
||||
NotConnected,
|
||||
|
||||
#[error("Nym SDK error: {0}")]
|
||||
NymSdk(#[from] nym_sdk::Error),
|
||||
|
||||
#[error("TLS handshake failed")]
|
||||
TlsHandshakeFailed,
|
||||
|
||||
#[error("TLS encrypt/decrypt error")]
|
||||
TlsCrypto,
|
||||
|
||||
#[error("DNS err placeholder")]
|
||||
InvalidDnsName,
|
||||
|
||||
#[error("IO error: {0}")]
|
||||
Io(#[from] std::io::Error),
|
||||
|
||||
#[error("HTTP parse failed")]
|
||||
HttpParseFailed,
|
||||
|
||||
#[error("Invalid URL")]
|
||||
InvalidUrl,
|
||||
|
||||
#[error("Mixnet connection failed")]
|
||||
MixnetConnectionFailed,
|
||||
|
||||
#[error("Request timeout")]
|
||||
Timeout,
|
||||
|
||||
#[error("TCP connection failed")]
|
||||
TcpConnectionFailed,
|
||||
|
||||
#[error("Response read failed")]
|
||||
ResponseReadFailed,
|
||||
|
||||
#[error("No response received")]
|
||||
NoResponseReceived,
|
||||
|
||||
#[error("Invalid HTTP response")]
|
||||
InvalidHttpResponse,
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-2.0-only
|
||||
|
||||
mod bridge;
|
||||
mod device;
|
||||
mod error;
|
||||
|
||||
pub use bridge::NymIprBridge;
|
||||
pub use device::NymIprDevice;
|
||||
pub use error::MixtcpError;
|
||||
|
||||
use nym_ip_packet_requests::IpPair;
|
||||
use nym_sdk::stream_wrapper::IpMixStream;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
/// Create a connected smoltcp device and async bridge for the tunneling packets through the
|
||||
/// Mixnet to remote hosts via an IPR.
|
||||
///
|
||||
/// This function handles the complete setup process:
|
||||
/// - Ensures the IPR stream is connected
|
||||
/// - Retrieves allocated IP addresses
|
||||
/// - Creates communication channels
|
||||
/// - Constructs the device and bridge components
|
||||
pub async fn create_device(
|
||||
mut ipr_stream: IpMixStream,
|
||||
) -> Result<(NymIprDevice, NymIprBridge, IpPair), MixtcpError> {
|
||||
// Ensure the stream is connected
|
||||
if !ipr_stream.is_connected() {
|
||||
ipr_stream.connect_tunnel().await?;
|
||||
}
|
||||
|
||||
// Get the allocated IPs before moving the stream - need these for proper packet creation
|
||||
// further 'up' the flow in the code calling this fn (see examples/tcp_connect.rs).
|
||||
let allocated_ips = *ipr_stream
|
||||
.allocated_ips()
|
||||
.ok_or(MixtcpError::NotConnected)?;
|
||||
|
||||
// Create channels for device <-> bridge communication
|
||||
let (tx_to_bridge, tx_from_device) = mpsc::unbounded_channel();
|
||||
let (rx_to_device, rx_from_bridge) = mpsc::unbounded_channel();
|
||||
|
||||
// Create device
|
||||
let device = NymIprDevice::new(tx_to_bridge, rx_from_bridge);
|
||||
|
||||
// Create bridge (moves ipr_stream)
|
||||
let bridge = NymIprBridge::new(ipr_stream, tx_from_device, rx_to_device);
|
||||
|
||||
Ok((device, bridge, allocated_ips))
|
||||
}
|
||||
@@ -16,10 +16,9 @@ workspace = true
|
||||
bincode.workspace = true
|
||||
bytes.workspace = true
|
||||
futures.workspace = true
|
||||
nym-ip-packet-requests = { workspace = true }
|
||||
nym-sdk = { workspace = true }
|
||||
thiserror.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tokio.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
nym-sdk = { workspace = true }
|
||||
nym-ip-packet-requests = { workspace = true }
|
||||
|
||||
@@ -30,8 +30,6 @@ enum ConnectionState {
|
||||
Disconnected,
|
||||
Connecting,
|
||||
Connected,
|
||||
#[allow(unused)]
|
||||
Disconnecting,
|
||||
}
|
||||
|
||||
pub struct IprClientConnect {
|
||||
@@ -83,7 +81,7 @@ impl IprClientConnect {
|
||||
self.listen_for_connect_response(request_id).await
|
||||
}
|
||||
|
||||
async fn send_connect_request(&self, ip_packet_router_address: Recipient) -> Result<u64> {
|
||||
async fn send_connect_request(&mut self, ip_packet_router_address: Recipient) -> Result<u64> {
|
||||
let (request, request_id) = IpPacketRequest::new_connect_request(None);
|
||||
|
||||
// We use 20 surbs for the connect request because typically the IPR is configured to have
|
||||
|
||||
@@ -18,6 +18,9 @@ pub enum Error {
|
||||
)]
|
||||
ReceivedResponseWithNewVersion { expected: u8, received: u8 },
|
||||
|
||||
#[error("got reply for connect request, but it appears intended for the wrong address?")]
|
||||
GotReplyIntendedForWrongAddress,
|
||||
|
||||
#[error("unexpected connect response")]
|
||||
UnexpectedConnectResponse,
|
||||
|
||||
@@ -41,6 +44,10 @@ pub enum Error {
|
||||
|
||||
#[error(transparent)]
|
||||
Bincode(#[from] bincode::Error),
|
||||
#[error("failed to create connect request")]
|
||||
FailedToCreateConnectRequest {
|
||||
source: nym_ip_packet_requests::sign::SignatureError,
|
||||
},
|
||||
}
|
||||
|
||||
// Result type based on our error type
|
||||
|
||||
@@ -261,7 +261,7 @@ where
|
||||
self.handle_forwarding_request(receiver_idx, forward_data)
|
||||
.await
|
||||
}
|
||||
typ @ LpFrameKind::Opaque => {
|
||||
typ @ (LpFrameKind::Opaque | LpFrameKind::Stream) => {
|
||||
// Neither registration nor forwarding - unknown payload type
|
||||
warn!(
|
||||
"Unknown transport payload type from {remote} (receiver_idx={receiver_idx}). dropping {} bytes",
|
||||
|
||||
Generated
+5611
File diff suppressed because it is too large
Load Diff
@@ -33,6 +33,7 @@ nym-credentials-interface = { workspace = true }
|
||||
nym-credential-storage = { workspace = true }
|
||||
nym-credential-utils = { workspace = true }
|
||||
nym-network-defaults = { workspace = true }
|
||||
nym-lp = { workspace = true }
|
||||
nym-sphinx = { workspace = true }
|
||||
nym-statistics-common = { workspace = true }
|
||||
nym-task = { workspace = true }
|
||||
@@ -64,6 +65,10 @@ url = { workspace = true }
|
||||
toml = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
|
||||
nym-ip-packet-requests = { path = "../../../common/ip-packet-requests" }
|
||||
pnet_packet = { workspace = true }
|
||||
nym-config = { path = "../../../common/config" }
|
||||
|
||||
# tcpproxy dependencies
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
anyhow.workspace = true
|
||||
|
||||
@@ -47,7 +47,7 @@ async fn main() {
|
||||
|
||||
// split_sender shares the stream_mode flag
|
||||
println!("\nTesting split_sender (shares stream_mode)");
|
||||
let sender = client.split_sender();
|
||||
let mut sender = client.split_sender();
|
||||
let result = sender
|
||||
.send_plain_message(our_address, "this should also fail")
|
||||
.await;
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
use nym_validator_client::nyxd::error::NyxdError;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use nym_ip_packet_requests::v8::response::{ConnectFailureReason, IpPacketResponseData};
|
||||
use nym_validator_client::nym_api::error::NymAPIError;
|
||||
|
||||
/// Top-level Error enum for the mixnet client and its relevant types.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum Error {
|
||||
@@ -108,6 +111,64 @@ pub enum Error {
|
||||
|
||||
#[error("Stream subsystem failed to initialise: reconstructed_receiver unavailable")]
|
||||
StreamInitFailure,
|
||||
|
||||
#[error("nymsphinx receiver error: {0}")]
|
||||
MessageRecovery(#[from] nym_sphinx::receiver::MessageRecoveryError),
|
||||
|
||||
#[error("client not connected")]
|
||||
IprStreamClientNotConnected,
|
||||
|
||||
#[error("client already connected or connecting")]
|
||||
IprStreamClientAlreadyConnectedOrConnecting,
|
||||
|
||||
#[error("trying to send an anonymous reply but peer surb tag is not set")]
|
||||
MixStreamSurbTagNotSet,
|
||||
|
||||
#[error("trying to send an outgoing message but receipient address is not set")]
|
||||
MixStreamRecipientNotSet,
|
||||
|
||||
#[error("listening for connection response timed out")]
|
||||
IPRConnectResponseTimeout,
|
||||
|
||||
#[error("no next frame: assuming stream is closed")]
|
||||
IPRClientStreamClosed,
|
||||
|
||||
#[error("expected control response, got {0:?}")]
|
||||
UnexpectedResponseType(IpPacketResponseData),
|
||||
|
||||
#[error("connect denied: {0:?}")]
|
||||
ConnectDenied(ConnectFailureReason),
|
||||
|
||||
#[allow(clippy::result_large_err)]
|
||||
#[error("api directory error: {0}")]
|
||||
GatewayDirectoryError(#[from] NymAPIError),
|
||||
|
||||
#[error("did not receive Validator endpoint details")]
|
||||
NoValidatorDetailsAvailable,
|
||||
|
||||
#[error("did not receive URL")]
|
||||
NoValidatorAPIUrl,
|
||||
|
||||
#[error("did not receive NymVPN API URL")]
|
||||
NoNymAPIUrl,
|
||||
|
||||
#[error("no available gateway")]
|
||||
NoGatewayAvailable,
|
||||
|
||||
#[error("no IPR address on selected gateway")]
|
||||
NoIPRAvailable,
|
||||
|
||||
#[error("message version check failed: {0}")]
|
||||
IPRMessageVersionCheckFailed(String),
|
||||
|
||||
#[error("no response id found in connect response")]
|
||||
IPRNoId,
|
||||
|
||||
#[error("Could not find peer address or surb tag")]
|
||||
MixStreamNoPeerOrSurb,
|
||||
|
||||
#[error("No network env specified on new MixStream")]
|
||||
MissingStreamConfig,
|
||||
}
|
||||
|
||||
impl Error {
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
// Copyright 2023-2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
mod connect;
|
||||
mod error;
|
||||
pub mod helpers;
|
||||
pub mod listener;
|
||||
|
||||
pub use connect::IprClientConnect;
|
||||
pub use error::Error;
|
||||
pub use listener::{IprListener, MixnetMessageOutcome};
|
||||
|
||||
// Re-export the currently used version
|
||||
pub use nym_ip_packet_requests::v8 as current;
|
||||
@@ -0,0 +1,7 @@
|
||||
# Modified `nym-ip-packet-client`
|
||||
This set of code is made up of functions from several crates from the `nym-vpn-client` monorepo which had to be imported and modified to avoid a circular dependency on the `nym-sdk` package for use in the `mixnet_stream_wrapper_ipr` module, and is made up of:
|
||||
- a modified version of (basically) the entire `nym-ip-packet-client`
|
||||
- a set of IP Packet helper functions from the `nym-gateway-probe`
|
||||
- a set of helpers & types from the `nym-connection-monitor`
|
||||
|
||||
All of these can be found in [`nym-vpn-client/nym-vpn-core/crates/`](https://github.com/nymtech/nym-vpn-client/tree/develop/nym-vpn-core/crates).
|
||||
@@ -0,0 +1,196 @@
|
||||
// Copyright 2023-2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
pub use crate::mixnet::{
|
||||
InputMessage, MixnetClient, MixnetClientSender, MixnetMessageSender, Recipient,
|
||||
TransmissionLane,
|
||||
};
|
||||
use nym_ip_packet_requests::IpPair;
|
||||
use tokio::time::sleep;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error};
|
||||
|
||||
use super::error::{Error, Result};
|
||||
use crate::{
|
||||
ip_packet_client::current::{
|
||||
request::IpPacketRequest,
|
||||
response::{
|
||||
ConnectResponse, ConnectResponseReply, ControlResponse, IpPacketResponse,
|
||||
IpPacketResponseData,
|
||||
},
|
||||
},
|
||||
ip_packet_client::helpers::check_ipr_message_version,
|
||||
};
|
||||
|
||||
pub type SharedMixnetClient = Arc<tokio::sync::Mutex<Option<MixnetClient>>>;
|
||||
|
||||
const IPR_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
enum ConnectionState {
|
||||
Disconnected,
|
||||
Connecting,
|
||||
Connected,
|
||||
#[allow(unused)]
|
||||
Disconnecting,
|
||||
}
|
||||
|
||||
pub struct IprClientConnect {
|
||||
// During connection we need the mixnet client, but once connected we expect to setup a channel
|
||||
// from the main mixnet listener at the top-level.
|
||||
// As such, we drop the shared mixnet client once we're connected.
|
||||
mixnet_client: SharedMixnetClient,
|
||||
mixnet_sender: MixnetClientSender,
|
||||
connected: ConnectionState,
|
||||
cancel_token: CancellationToken,
|
||||
}
|
||||
|
||||
impl IprClientConnect {
|
||||
pub async fn new(mixnet_client: SharedMixnetClient, cancel_token: CancellationToken) -> Self {
|
||||
let mixnet_sender = mixnet_client.lock().await.as_ref().unwrap().split_sender();
|
||||
Self {
|
||||
mixnet_client,
|
||||
mixnet_sender,
|
||||
connected: ConnectionState::Disconnected,
|
||||
cancel_token,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn connect(&mut self, ip_packet_router_address: Recipient) -> Result<IpPair> {
|
||||
if self.connected != ConnectionState::Disconnected {
|
||||
return Err(Error::AlreadyConnected);
|
||||
}
|
||||
|
||||
tracing::info!("Connecting to exit gateway");
|
||||
self.connected = ConnectionState::Connecting;
|
||||
match self.connect_inner(ip_packet_router_address).await {
|
||||
Ok(ips) => {
|
||||
debug!("Successfully connected to the ip-packet-router");
|
||||
self.connected = ConnectionState::Connected;
|
||||
Ok(ips)
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Failed to connect to the ip-packet-router: {:?}", err);
|
||||
self.connected = ConnectionState::Disconnected;
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn connect_inner(&mut self, ip_packet_router_address: Recipient) -> Result<IpPair> {
|
||||
let request_id = self.send_connect_request(ip_packet_router_address).await?;
|
||||
|
||||
debug!("Waiting for reply...");
|
||||
self.listen_for_connect_response(request_id).await
|
||||
}
|
||||
|
||||
async fn send_connect_request(&mut self, ip_packet_router_address: Recipient) -> Result<u64> {
|
||||
let (request, request_id) = IpPacketRequest::new_connect_request(None);
|
||||
|
||||
// We use 20 surbs for the connect request because typically the IPR is configured to have
|
||||
// a min threshold of 10 surbs that it reserves for itself to request additional surbs.
|
||||
let surbs = 20;
|
||||
self.mixnet_sender
|
||||
.send(create_input_message(
|
||||
ip_packet_router_address,
|
||||
request,
|
||||
surbs,
|
||||
))
|
||||
.await
|
||||
.map_err(|err| Error::SdkError(Box::new(err)))?;
|
||||
|
||||
Ok(request_id)
|
||||
}
|
||||
|
||||
async fn handle_connect_response(&self, response: ConnectResponse) -> Result<IpPair> {
|
||||
debug!("Handling dynamic connect response");
|
||||
match response.reply {
|
||||
ConnectResponseReply::Success(r) => Ok(r.ips),
|
||||
ConnectResponseReply::Failure(reason) => Err(Error::ConnectRequestDenied { reason }),
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_ip_packet_router_response(&self, response: IpPacketResponse) -> Result<IpPair> {
|
||||
let control_response = match response.data {
|
||||
IpPacketResponseData::Control(control_response) => control_response,
|
||||
_ => {
|
||||
error!("Received non-control response while waiting for connect response");
|
||||
return Err(Error::UnexpectedConnectResponse);
|
||||
}
|
||||
};
|
||||
|
||||
match *control_response {
|
||||
ControlResponse::Connect(resp) => self.handle_connect_response(resp).await,
|
||||
response => {
|
||||
error!("Unexpected response: {response:?}");
|
||||
Err(Error::UnexpectedConnectResponse)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn listen_for_connect_response(&self, request_id: u64) -> Result<IpPair> {
|
||||
// Connecting is basically synchronous from the perspective of the mixnet client, so it's safe
|
||||
// to just grab ahold of the mutex and keep it until we get the response.
|
||||
let mut mixnet_client_handle = self.mixnet_client.lock().await;
|
||||
let mixnet_client = mixnet_client_handle.as_mut().unwrap();
|
||||
|
||||
let timeout = sleep(IPR_CONNECT_TIMEOUT);
|
||||
tokio::pin!(timeout);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = self.cancel_token.cancelled() => {
|
||||
error!("Cancelled while waiting for reply to connect request");
|
||||
return Err(Error::Cancelled);
|
||||
},
|
||||
_ = &mut timeout => {
|
||||
error!("Timed out waiting for reply to connect request");
|
||||
return Err(Error::TimeoutWaitingForConnectResponse);
|
||||
},
|
||||
msgs = mixnet_client.wait_for_messages() => match msgs {
|
||||
None => {
|
||||
return Err(Error::NoMixnetMessagesReceived);
|
||||
}
|
||||
Some(msgs) => {
|
||||
for msg in msgs {
|
||||
// Confirm that the version is correct
|
||||
if let Err(err) = check_ipr_message_version(&msg) {
|
||||
tracing::error!("Mixnet message version mismatch: {err}");
|
||||
break;
|
||||
}
|
||||
|
||||
// Then we deserialize the message
|
||||
tracing::debug!("IprClient: got message while waiting for connect response");
|
||||
let Ok(response) = IpPacketResponse::from_reconstructed_message(&msg) else {
|
||||
// This is ok, it's likely just one of our self-pings
|
||||
tracing::debug!("Failed to deserialize mixnet message");
|
||||
continue;
|
||||
};
|
||||
|
||||
if response.id() == Some(request_id) {
|
||||
tracing::debug!("Got response with matching id");
|
||||
return self.handle_ip_packet_router_response(response).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn create_input_message(
|
||||
recipient: Recipient,
|
||||
request: IpPacketRequest,
|
||||
surbs: u32,
|
||||
) -> InputMessage {
|
||||
InputMessage::new_anonymous(
|
||||
recipient,
|
||||
request.to_bytes().unwrap(),
|
||||
surbs,
|
||||
TransmissionLane::General,
|
||||
None,
|
||||
)
|
||||
}
|
||||
@@ -0,0 +1,79 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::ip_packet_client::current::response::ConnectFailureReason;
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("nym sdk")]
|
||||
// SdkError(#[source] Box<nym_sdk::Error>),
|
||||
SdkError(#[source] Box<crate::error::Error>),
|
||||
|
||||
#[error(
|
||||
"received response with version v{received}, the client is too new and can only understand v{expected}"
|
||||
)]
|
||||
ReceivedResponseWithOldVersion { expected: u8, received: u8 },
|
||||
|
||||
#[error(
|
||||
"received response with version v{received}, the client is too old and can only understand v{expected}"
|
||||
)]
|
||||
ReceivedResponseWithNewVersion { expected: u8, received: u8 },
|
||||
|
||||
#[error("got reply for connect request, but it appears intended for the wrong address?")]
|
||||
GotReplyIntendedForWrongAddress,
|
||||
|
||||
#[error("unexpected connect response")]
|
||||
UnexpectedConnectResponse,
|
||||
|
||||
#[error("mixnet client stopped returning responses")]
|
||||
NoMixnetMessagesReceived,
|
||||
|
||||
#[error("timeout waiting for connect response from exit gateway (ipr)")]
|
||||
TimeoutWaitingForConnectResponse,
|
||||
|
||||
#[error("connection cancelled")]
|
||||
Cancelled,
|
||||
|
||||
#[error("connect request denied: {reason}")]
|
||||
ConnectRequestDenied { reason: ConnectFailureReason },
|
||||
|
||||
#[error("failed to get version from message")]
|
||||
NoVersionInMessage,
|
||||
|
||||
#[error("already connected to the mixnet")]
|
||||
AlreadyConnected,
|
||||
|
||||
#[error("failed to create connect request")]
|
||||
FailedToCreateConnectRequest {
|
||||
source: nym_ip_packet_requests::sign::SignatureError,
|
||||
},
|
||||
|
||||
/// Below error types are from the nym-connection-monitor crate
|
||||
#[error(
|
||||
"timeout waiting for mixnet self ping, the entry gateway is not routing our mixnet traffic"
|
||||
)]
|
||||
TimeoutWaitingForMixnetSelfPing,
|
||||
|
||||
#[error("failed to serialize message")]
|
||||
FailedToSerializeMessage {
|
||||
#[from]
|
||||
source: bincode::Error,
|
||||
},
|
||||
|
||||
#[error("failed to create icmp echo request packet")]
|
||||
IcmpEchoRequestPacketCreationFailure,
|
||||
|
||||
#[error("failed to create icmp packet")]
|
||||
IcmpPacketCreationFailure,
|
||||
|
||||
#[error("failed to create ipv4 packet")]
|
||||
Ipv4PacketCreationFailure,
|
||||
}
|
||||
|
||||
impl From<crate::error::Error> for Error {
|
||||
fn from(err: crate::error::Error) -> Self {
|
||||
Error::SdkError(Box::new(err))
|
||||
}
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -0,0 +1,326 @@
|
||||
// Copyright 2023-2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use super::error::{Error, Result};
|
||||
use crate::ip_packet_client::current::VERSION as CURRENT_VERSION;
|
||||
pub use crate::mixnet::ReconstructedMessage;
|
||||
use nym_config::defaults::mixnet_vpn::{NYM_TUN_DEVICE_ADDRESS_V4, NYM_TUN_DEVICE_ADDRESS_V6};
|
||||
|
||||
use nym_ip_packet_requests::IpPair;
|
||||
|
||||
use bytes::Bytes;
|
||||
use pnet_packet::{
|
||||
icmp::{
|
||||
echo_reply::EchoReplyPacket,
|
||||
echo_request::{EchoRequestPacket, MutableEchoRequestPacket},
|
||||
IcmpPacket,
|
||||
},
|
||||
icmpv6,
|
||||
ipv4::{Ipv4Packet, MutableIpv4Packet},
|
||||
ipv6::{Ipv6Packet, MutableIpv6Packet},
|
||||
Packet,
|
||||
};
|
||||
use std::cmp::Ordering;
|
||||
use std::net::{Ipv4Addr, Ipv6Addr};
|
||||
|
||||
/**
|
||||
* This function is from the original nym-ip-packet-client crate.
|
||||
*/
|
||||
pub(crate) fn check_ipr_message_version(message: &ReconstructedMessage) -> Result<()> {
|
||||
// Assuming it's a IPR message, it will have a version as its first byte
|
||||
if let Some(version) = message.message.first() {
|
||||
match version.cmp(&CURRENT_VERSION) {
|
||||
Ordering::Greater => Err(Error::ReceivedResponseWithNewVersion {
|
||||
expected: CURRENT_VERSION,
|
||||
received: *version,
|
||||
}),
|
||||
Ordering::Less => Err(Error::ReceivedResponseWithOldVersion {
|
||||
expected: CURRENT_VERSION,
|
||||
received: *version,
|
||||
}),
|
||||
Ordering::Equal => {
|
||||
// We're good
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Err(Error::NoVersionInMessage)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Functions below are from the nym-connection-monitor crate.
|
||||
*/
|
||||
pub fn create_icmpv4_echo_request(
|
||||
sequence_number: u16,
|
||||
identifier: u16,
|
||||
) -> Result<EchoRequestPacket<'static>> {
|
||||
let buffer = vec![0; 64];
|
||||
let mut icmp_echo_request = MutableEchoRequestPacket::owned(buffer)
|
||||
.ok_or(Error::IcmpEchoRequestPacketCreationFailure)?;
|
||||
|
||||
// Configure the ICMP echo request packet
|
||||
icmp_echo_request.set_identifier(identifier);
|
||||
icmp_echo_request.set_sequence_number(sequence_number);
|
||||
icmp_echo_request.set_icmp_type(pnet_packet::icmp::IcmpTypes::EchoRequest);
|
||||
icmp_echo_request.set_icmp_code(pnet_packet::icmp::IcmpCode::new(0));
|
||||
|
||||
// Calculate checksum once we've set all the fields
|
||||
let icmp_packet =
|
||||
IcmpPacket::new(icmp_echo_request.packet()).ok_or(Error::IcmpPacketCreationFailure)?;
|
||||
let checksum = pnet_packet::icmp::checksum(&icmp_packet);
|
||||
icmp_echo_request.set_checksum(checksum);
|
||||
|
||||
Ok(icmp_echo_request.consume_to_immutable())
|
||||
}
|
||||
|
||||
pub fn create_icmpv6_echo_request(
|
||||
sequence_number: u16,
|
||||
identifier: u16,
|
||||
source: &Ipv6Addr,
|
||||
destination: &Ipv6Addr,
|
||||
) -> Result<icmpv6::echo_request::EchoRequestPacket<'static>> {
|
||||
let buffer = vec![0; 64];
|
||||
// let mut icmp_echo_request = MutableEchoRequestPacket::owned(buffer)
|
||||
let mut icmp_echo_request = icmpv6::echo_request::MutableEchoRequestPacket::owned(buffer)
|
||||
.ok_or(Error::IcmpEchoRequestPacketCreationFailure)?;
|
||||
|
||||
// Configure the ICMP echo request packet
|
||||
icmp_echo_request.set_identifier(identifier);
|
||||
icmp_echo_request.set_sequence_number(sequence_number);
|
||||
icmp_echo_request.set_icmpv6_type(pnet_packet::icmpv6::Icmpv6Types::EchoRequest);
|
||||
icmp_echo_request.set_icmpv6_code(pnet_packet::icmpv6::Icmpv6Code::new(0));
|
||||
|
||||
// Calculate checksum once we've set all the fields
|
||||
let icmp_packet = icmpv6::Icmpv6Packet::new(icmp_echo_request.packet())
|
||||
.ok_or(Error::IcmpPacketCreationFailure)?;
|
||||
let checksum = pnet_packet::icmpv6::checksum(&icmp_packet, source, destination);
|
||||
icmp_echo_request.set_checksum(checksum);
|
||||
|
||||
Ok(icmp_echo_request.consume_to_immutable())
|
||||
}
|
||||
|
||||
pub fn wrap_icmp_in_ipv4(
|
||||
icmp_echo_request: EchoRequestPacket,
|
||||
source: Ipv4Addr,
|
||||
destination: Ipv4Addr,
|
||||
) -> Result<Ipv4Packet> {
|
||||
// 20 bytes for IPv4 header + ICMP payload
|
||||
let total_length = 20 + icmp_echo_request.packet().len();
|
||||
// IPv4 header + ICMP payload
|
||||
let ipv4_buffer = vec![0u8; 20 + icmp_echo_request.packet().len()];
|
||||
let mut ipv4_packet =
|
||||
MutableIpv4Packet::owned(ipv4_buffer).ok_or(Error::Ipv4PacketCreationFailure)?;
|
||||
|
||||
ipv4_packet.set_version(4);
|
||||
ipv4_packet.set_header_length(5);
|
||||
ipv4_packet.set_total_length(total_length as u16);
|
||||
ipv4_packet.set_ttl(64);
|
||||
ipv4_packet.set_next_level_protocol(pnet_packet::ip::IpNextHeaderProtocols::Icmp);
|
||||
ipv4_packet.set_source(source);
|
||||
ipv4_packet.set_destination(destination);
|
||||
ipv4_packet.set_flags(pnet_packet::ipv4::Ipv4Flags::DontFragment);
|
||||
ipv4_packet.set_checksum(0);
|
||||
ipv4_packet.set_payload(icmp_echo_request.packet());
|
||||
|
||||
let ipv4_checksum = compute_ipv4_checksum(&ipv4_packet.to_immutable());
|
||||
ipv4_packet.set_checksum(ipv4_checksum);
|
||||
|
||||
Ok(ipv4_packet.consume_to_immutable())
|
||||
}
|
||||
|
||||
pub fn wrap_icmp_in_ipv6(
|
||||
icmp_echo_request: icmpv6::echo_request::EchoRequestPacket,
|
||||
source: Ipv6Addr,
|
||||
destination: Ipv6Addr,
|
||||
) -> Result<Ipv6Packet> {
|
||||
let ipv6_buffer = vec![0u8; 40 + icmp_echo_request.packet().len()];
|
||||
let mut ipv6_packet =
|
||||
MutableIpv6Packet::owned(ipv6_buffer).ok_or(Error::Ipv4PacketCreationFailure)?;
|
||||
|
||||
ipv6_packet.set_version(6);
|
||||
ipv6_packet.set_payload_length(icmp_echo_request.packet().len() as u16);
|
||||
ipv6_packet.set_next_header(pnet_packet::ip::IpNextHeaderProtocols::Icmpv6);
|
||||
ipv6_packet.set_hop_limit(64);
|
||||
ipv6_packet.set_source(source);
|
||||
ipv6_packet.set_destination(destination);
|
||||
ipv6_packet.set_payload(icmp_echo_request.packet());
|
||||
|
||||
Ok(ipv6_packet.consume_to_immutable())
|
||||
}
|
||||
|
||||
// Compute IPv4 checksum: sum all 16-bit words, add carry, take one's complement
|
||||
pub(crate) fn compute_ipv4_checksum(header: &Ipv4Packet) -> u16 {
|
||||
// Header length in 16-bit words
|
||||
let len = header.get_header_length() as usize * 2;
|
||||
let mut sum = 0u32;
|
||||
|
||||
for i in 0..len {
|
||||
let word = ((header.packet()[2 * i] as u32) << 8) | header.packet()[2 * i + 1] as u32;
|
||||
sum += word;
|
||||
}
|
||||
|
||||
// Add the carry
|
||||
while (sum >> 16) > 0 {
|
||||
sum = (sum & 0xFFFF) + (sum >> 16);
|
||||
}
|
||||
|
||||
// One's complement
|
||||
!sum as u16
|
||||
}
|
||||
|
||||
pub(crate) fn is_icmp_echo_reply(packet: &Bytes) -> Option<(u16, Ipv4Addr, Ipv4Addr)> {
|
||||
if let Some(ipv4_packet) = Ipv4Packet::new(packet) {
|
||||
if let Some(icmp_packet) = IcmpPacket::new(ipv4_packet.payload()) {
|
||||
if let Some(echo_reply) = EchoReplyPacket::new(icmp_packet.packet()) {
|
||||
return Some((
|
||||
echo_reply.get_identifier(),
|
||||
ipv4_packet.get_source(),
|
||||
ipv4_packet.get_destination(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub(crate) fn is_icmp_v6_echo_reply(packet: &Bytes) -> Option<(u16, Ipv6Addr, Ipv6Addr)> {
|
||||
if let Some(ipv6_packet) = Ipv6Packet::new(packet) {
|
||||
if let Some(icmp_packet) = IcmpPacket::new(ipv6_packet.payload()) {
|
||||
if let Some(echo_reply) =
|
||||
pnet_packet::icmpv6::echo_reply::EchoReplyPacket::new(icmp_packet.packet())
|
||||
{
|
||||
return Some((
|
||||
echo_reply.get_identifier(),
|
||||
ipv6_packet.get_source(),
|
||||
ipv6_packet.get_destination(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/**
|
||||
* Types and functions below are from the nym-connection-monitor crate.
|
||||
* The `send_ping_v4` + `_v6` functions have been modified to work with the IPMixStream wrapper instead of relying on a shared MixnetClient.
|
||||
*/
|
||||
#[derive(Debug)]
|
||||
pub enum ConnectionStatusEvent {
|
||||
MixnetSelfPing,
|
||||
Icmpv4IprTunDevicePingReply,
|
||||
Icmpv6IprTunDevicePingReply,
|
||||
Icmpv4IprExternalPingReply,
|
||||
Icmpv6IprExternalPingReply,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct IpPingReplies {
|
||||
pub ipr_tun_ip_v4: bool,
|
||||
pub ipr_tun_ip_v6: bool,
|
||||
pub external_ip_v4: bool,
|
||||
pub external_ip_v6: bool,
|
||||
}
|
||||
|
||||
impl IpPingReplies {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub fn register_event(&mut self, event: &ConnectionStatusEvent) {
|
||||
match event {
|
||||
ConnectionStatusEvent::MixnetSelfPing => {}
|
||||
ConnectionStatusEvent::Icmpv4IprTunDevicePingReply => self.ipr_tun_ip_v4 = true,
|
||||
ConnectionStatusEvent::Icmpv6IprTunDevicePingReply => self.ipr_tun_ip_v6 = true,
|
||||
ConnectionStatusEvent::Icmpv4IprExternalPingReply => self.external_ip_v4 = true,
|
||||
ConnectionStatusEvent::Icmpv6IprExternalPingReply => self.external_ip_v6 = true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum IcmpBeaconReply {
|
||||
TunDeviceReply,
|
||||
ExternalPingReply(Ipv4Addr),
|
||||
}
|
||||
|
||||
pub enum Icmpv6BeaconReply {
|
||||
TunDeviceReply,
|
||||
ExternalPingReply(Ipv6Addr),
|
||||
}
|
||||
|
||||
pub fn icmp_identifier() -> u16 {
|
||||
8475
|
||||
}
|
||||
|
||||
// TODO: send_ping_v4 and send_ping_v6 removed temporarily — will be re-added
|
||||
// when IpMixStream is rebuilt on top of MixnetStream + LP frame envelope
|
||||
|
||||
pub fn check_for_icmp_beacon_reply(
|
||||
packet: &Bytes,
|
||||
icmp_beacon_identifier: u16,
|
||||
our_ips: IpPair,
|
||||
) -> Option<ConnectionStatusEvent> {
|
||||
match is_icmp_beacon_reply(packet, icmp_beacon_identifier, our_ips.ipv4) {
|
||||
Some(IcmpBeaconReply::TunDeviceReply) => {
|
||||
tracing::debug!("Received ping response from ipr tun device");
|
||||
return Some(ConnectionStatusEvent::Icmpv4IprTunDevicePingReply);
|
||||
}
|
||||
Some(IcmpBeaconReply::ExternalPingReply(_source)) => {
|
||||
tracing::debug!("Received ping response from an external ip through the ipr");
|
||||
return Some(ConnectionStatusEvent::Icmpv4IprExternalPingReply);
|
||||
}
|
||||
None => {}
|
||||
}
|
||||
|
||||
match is_icmp_v6_beacon_reply(packet, icmp_beacon_identifier, our_ips.ipv6) {
|
||||
Some(Icmpv6BeaconReply::TunDeviceReply) => {
|
||||
tracing::debug!("Received ping v6 response from ipr tun device");
|
||||
return Some(ConnectionStatusEvent::Icmpv6IprTunDevicePingReply);
|
||||
}
|
||||
Some(Icmpv6BeaconReply::ExternalPingReply(_source)) => {
|
||||
tracing::debug!("Received ping v6 response from an external ip through the ipr");
|
||||
return Some(ConnectionStatusEvent::Icmpv6IprExternalPingReply);
|
||||
}
|
||||
None => {}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
pub fn is_icmp_beacon_reply(
|
||||
packet: &Bytes,
|
||||
identifier: u16,
|
||||
destination: Ipv4Addr,
|
||||
) -> Option<IcmpBeaconReply> {
|
||||
if let Some((reply_identifier, reply_source, reply_destination)) = is_icmp_echo_reply(packet) {
|
||||
if reply_identifier == identifier && reply_destination == destination {
|
||||
if reply_source == NYM_TUN_DEVICE_ADDRESS_V4 {
|
||||
return Some(IcmpBeaconReply::TunDeviceReply);
|
||||
} else {
|
||||
// For external replies, we check if the source is NOT the TUN device
|
||||
// and NOT our own IP (since external hosts reply from their own IPs)
|
||||
return Some(IcmpBeaconReply::ExternalPingReply(reply_source));
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub fn is_icmp_v6_beacon_reply(
|
||||
packet: &Bytes,
|
||||
identifier: u16,
|
||||
destination: Ipv6Addr,
|
||||
) -> Option<Icmpv6BeaconReply> {
|
||||
if let Some((reply_identifier, reply_source, reply_destination)) = is_icmp_v6_echo_reply(packet)
|
||||
{
|
||||
if reply_identifier == identifier && reply_destination == destination {
|
||||
if reply_source == NYM_TUN_DEVICE_ADDRESS_V6 {
|
||||
return Some(Icmpv6BeaconReply::TunDeviceReply);
|
||||
} else {
|
||||
// For external replies, check if source is NOT the TUN device
|
||||
return Some(Icmpv6BeaconReply::ExternalPingReply(reply_source));
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
@@ -0,0 +1,133 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::StreamExt;
|
||||
use nym_ip_packet_requests::{codec::MultiIpPacketCodec, v8::response::ControlResponse};
|
||||
use nym_sphinx::receiver::ReconstructedMessage;
|
||||
use tokio_util::codec::FramedRead;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::{
|
||||
ip_packet_client::current::{
|
||||
request::{ControlRequest, IpPacketRequest, IpPacketRequestData},
|
||||
response::{InfoLevel, IpPacketResponse, IpPacketResponseData},
|
||||
},
|
||||
ip_packet_client::helpers::check_ipr_message_version,
|
||||
};
|
||||
|
||||
pub enum MixnetMessageOutcome {
|
||||
IpPackets(Vec<Bytes>),
|
||||
MixnetSelfPing,
|
||||
Disconnect,
|
||||
}
|
||||
|
||||
pub struct IprListener {}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum IprListenerError {
|
||||
#[error(transparent)]
|
||||
IprClientError(#[from] crate::Error),
|
||||
}
|
||||
|
||||
impl From<super::error::Error> for IprListenerError {
|
||||
fn from(err: super::error::Error) -> Self {
|
||||
match err {
|
||||
super::error::Error::SdkError(sdk_err) => IprListenerError::IprClientError(*sdk_err),
|
||||
other => IprListenerError::IprClientError(crate::Error::new_unsupported(format!(
|
||||
"IP packet error: {}",
|
||||
other
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl IprListener {
|
||||
pub fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
|
||||
fn is_mix_ping(&self, request: &IpPacketRequest) -> bool {
|
||||
match request.data {
|
||||
IpPacketRequestData::Control(ref control) => {
|
||||
matches!(**control, ControlRequest::Ping(_))
|
||||
}
|
||||
_ => {
|
||||
debug!("Received unexpected request: {request:?}");
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handle_reconstructed_message(
|
||||
&mut self,
|
||||
message: ReconstructedMessage,
|
||||
) -> Result<Option<MixnetMessageOutcome>, IprListenerError> {
|
||||
check_ipr_message_version(&message)?;
|
||||
|
||||
match IpPacketResponse::from_reconstructed_message(&message) {
|
||||
Ok(response) => {
|
||||
match response.data {
|
||||
IpPacketResponseData::Data(data_response) => {
|
||||
// Un-bundle the mixnet message and send the individual IP packets
|
||||
// to the tun device
|
||||
let framed_reader = FramedRead::new(
|
||||
data_response.ip_packet.as_ref(),
|
||||
MultiIpPacketCodec::new(),
|
||||
);
|
||||
let responses: Vec<Bytes> = framed_reader
|
||||
.filter_map(|res| async { res.ok().map(|packet| packet.into_bytes()) })
|
||||
.collect()
|
||||
.await;
|
||||
return Ok(Some(MixnetMessageOutcome::IpPackets(responses)));
|
||||
}
|
||||
IpPacketResponseData::Control(control_response) => match *control_response {
|
||||
ControlResponse::Connect(_) => {
|
||||
info!("Received connect response when already connected - ignoring");
|
||||
}
|
||||
ControlResponse::Disconnect(_) => {
|
||||
info!("Received disconnect response");
|
||||
return Ok(Some(MixnetMessageOutcome::Disconnect));
|
||||
}
|
||||
ControlResponse::UnrequestedDisconnect(_) => {
|
||||
info!("Received unrequested disconnect response, ignoring for now");
|
||||
}
|
||||
ControlResponse::Pong(_) => {
|
||||
info!("Received pong response, ignoring for now");
|
||||
}
|
||||
ControlResponse::Health(_) => {
|
||||
info!("Received health response, ignoring for now");
|
||||
}
|
||||
ControlResponse::Info(info) => {
|
||||
let msg =
|
||||
format!("Received info response from the mixnet: {}", info.reply);
|
||||
match info.level {
|
||||
InfoLevel::Info => info!("{msg}"),
|
||||
InfoLevel::Warn => warn!("{msg}"),
|
||||
InfoLevel::Error => error!("{msg}"),
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
// The exception to when we are not expecting a response, is when we
|
||||
// are sending a ping to ourselves.
|
||||
if let Ok(request) = IpPacketRequest::from_reconstructed_message(&message) {
|
||||
if self.is_mix_ping(&request) {
|
||||
return Ok(Some(MixnetMessageOutcome::MixnetSelfPing));
|
||||
}
|
||||
} else {
|
||||
warn!("Failed to deserialize reconstructed message: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for IprListener {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
//! High-level IPR (IP Packet Router) stream wrapper.
|
||||
//!
|
||||
//! [`IpMixStream`] tunnels IP packets through the Nym mixnet to an exit
|
||||
//! gateway running an IP Packet Router. Both requests and responses are
|
||||
//! wrapped in LP Stream frames for type-safe detection at the IPR and
|
||||
//! dispatch by the client's stream router.
|
||||
|
||||
mod ip_mix_stream;
|
||||
pub mod network_env;
|
||||
|
||||
pub use ip_mix_stream::{ConnectionState, IpMixStream, IprWithPerformance};
|
||||
pub use network_env::NetworkEnvironment;
|
||||
@@ -0,0 +1,79 @@
|
||||
# IpMixStream Architecture
|
||||
|
||||
## Overview
|
||||
|
||||
`IpMixStream` tunnels IP packets through the Nym mixnet to an IP Packet Router
|
||||
(IPR) exit gateway. It provides a high-level API over a single `MixnetStream`,
|
||||
which handles LP Stream framing and Sphinx packet transport automatically.
|
||||
|
||||
## Data Flow
|
||||
|
||||
```text
|
||||
Client IPR (exit gateway)
|
||||
------ ------------------
|
||||
IpMixStream.send_ip_packet(bytes)
|
||||
IpPacketRequest.to_bytes()
|
||||
MixnetStream.write()
|
||||
LP Stream frame
|
||||
Sphinx packets
|
||||
mixnet ──────────────────> on_reconstructed_message()
|
||||
detect LpFrameKind::Stream
|
||||
strip LP header
|
||||
parse IpPacketRequest
|
||||
write IP packet to TUN
|
||||
──> internet
|
||||
|
||||
internet response arrives on TUN
|
||||
ConnectedClientHandler
|
||||
wrap in IpPacketResponse
|
||||
wrap in LP Stream frame
|
||||
mixnet <────────────────── send via Sphinx/SURBs
|
||||
|
||||
stream router dispatches
|
||||
by stream_id
|
||||
MixnetStream.read()
|
||||
IprListener parses response
|
||||
IpMixStream.handle_incoming()
|
||||
returns Vec<ip_packet_bytes>
|
||||
```
|
||||
|
||||
## Layer Stack
|
||||
|
||||
```text
|
||||
IpMixStream IPR protocol (connect, data, disconnect)
|
||||
MixnetStream AsyncRead + AsyncWrite, LP Stream framing, seq numbers
|
||||
Stream Router Dispatches inbound messages by stream_id
|
||||
MixnetClient Sphinx packet encryption, SURB management
|
||||
Mixnet Entry GW -> Mix1 -> Mix2 -> Mix3 -> Exit GW
|
||||
```
|
||||
|
||||
## LP Stream Framing
|
||||
|
||||
All messages between client and IPR are wrapped in LP Stream frames:
|
||||
|
||||
- **Client -> IPR**: `MixnetStream.write()` wraps each write in an LP Stream
|
||||
Data frame (stream_id, sequence number, payload). The IPR detects
|
||||
`LpFrameKind::Stream` and strips the header before processing.
|
||||
|
||||
- **IPR -> Client**: Both inline responses (connect handshake, pong) and async
|
||||
TUN responses are wrapped in LP Stream frames with the same stream_id. The
|
||||
client's stream router dispatches by stream_id to the correct `MixnetStream`.
|
||||
|
||||
## Connection Lifecycle
|
||||
|
||||
1. `IpMixStream::new(env)` -- discover IPR, connect MixnetClient, open MixnetStream
|
||||
2. `connect_tunnel()` -- send connect request, receive allocated IPs
|
||||
3. `send_ip_packet()` / `handle_incoming()` -- steady-state data transfer
|
||||
4. `disconnect_stream()` -- tear down MixnetClient
|
||||
|
||||
## Key Design Decisions
|
||||
|
||||
- **MixnetStream over MixnetClient**: One stream per IPR tunnel. LP framing is
|
||||
handled by MixnetStream internally, no manual frame construction needed.
|
||||
|
||||
- **Multiplexing at IP layer**: Different remote hosts are addressed by IP
|
||||
packet destination headers, not by opening multiple streams.
|
||||
|
||||
- **stream_id threading**: The IPR stores stream_id in each client's
|
||||
`ConnectedClientHandler` so async TUN responses are wrapped in matching LP
|
||||
Stream frames for correct dispatch at the client.
|
||||
@@ -0,0 +1,507 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use super::network_env::NetworkEnvironment;
|
||||
use crate::ip_packet_client::{
|
||||
helpers::check_ipr_message_version, IprListener, MixnetMessageOutcome,
|
||||
};
|
||||
use crate::mixnet::{MixnetClient, MixnetStream, Recipient};
|
||||
use crate::Error;
|
||||
|
||||
use bytes::Bytes;
|
||||
use nym_crypto::asymmetric::ed25519;
|
||||
use nym_ip_packet_requests::{
|
||||
v8::{
|
||||
request::IpPacketRequest,
|
||||
response::{ConnectResponseReply, ControlResponse, IpPacketResponse, IpPacketResponseData},
|
||||
},
|
||||
IpPair,
|
||||
};
|
||||
use nym_network_defaults::ApiUrl;
|
||||
use nym_sphinx::receiver::ReconstructedMessage;
|
||||
use nym_validator_client::nym_api::NymApiClientExt;
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
const IPR_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
/// Maximum size for a single IPR response read from the stream.
|
||||
/// IPR responses fit within one Sphinx packet payload (~1.8 KB) so 64 KB
|
||||
/// provides ample headroom.
|
||||
const READ_BUF_SIZE: usize = 64 * 1024;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct IprWithPerformance {
|
||||
pub(crate) address: Recipient,
|
||||
pub(crate) identity: ed25519::PublicKey,
|
||||
pub(crate) performance: u8,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub enum ConnectionState {
|
||||
Disconnected,
|
||||
Connecting,
|
||||
Connected,
|
||||
}
|
||||
|
||||
#[allow(clippy::result_large_err)]
|
||||
fn create_nym_api_client(nym_api_urls: Vec<ApiUrl>) -> Result<nym_http_api_client::Client, Error> {
|
||||
let user_agent = format!("nym-sdk/{}", env!("CARGO_PKG_VERSION"));
|
||||
|
||||
let urls = nym_api_urls
|
||||
.into_iter()
|
||||
.map(|url| url.url.parse())
|
||||
.collect::<Result<Vec<nym_http_api_client::Url>, _>>()
|
||||
.map_err(|err| {
|
||||
error!("malformed nym-api url: {err}");
|
||||
Error::NoNymAPIUrl
|
||||
})?;
|
||||
|
||||
if urls.is_empty() {
|
||||
return Err(Error::NoNymAPIUrl);
|
||||
}
|
||||
|
||||
let client = nym_http_api_client::ClientBuilder::new_with_urls(urls)?
|
||||
.with_user_agent(user_agent)
|
||||
.build()?;
|
||||
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
async fn retrieve_exit_nodes_with_performance(
|
||||
client: nym_http_api_client::Client,
|
||||
) -> Result<Vec<IprWithPerformance>, Error> {
|
||||
let all_nodes = client
|
||||
.get_all_described_nodes()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|described| (described.ed25519_identity_key(), described))
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
let exit_gateways = client.get_all_basic_nodes_with_metadata().await?.nodes;
|
||||
|
||||
let mut described = Vec::new();
|
||||
|
||||
for exit in exit_gateways {
|
||||
if let Some(ipr_info) = all_nodes
|
||||
.get(&exit.ed25519_identity_pubkey)
|
||||
.and_then(|n| n.description.ip_packet_router.clone())
|
||||
{
|
||||
if let Ok(parsed_address) = ipr_info.address.parse() {
|
||||
described.push(IprWithPerformance {
|
||||
address: parsed_address,
|
||||
identity: exit.ed25519_identity_pubkey,
|
||||
performance: exit.performance.round_to_integer(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(described)
|
||||
}
|
||||
|
||||
async fn get_random_ipr(client: nym_http_api_client::Client) -> Result<Recipient, Error> {
|
||||
let nodes = retrieve_exit_nodes_with_performance(client).await?;
|
||||
info!("Found {} Exit Gateways", nodes.len());
|
||||
|
||||
let selected_gateway = nodes
|
||||
.into_iter()
|
||||
.max_by_key(|gw| gw.performance)
|
||||
.ok_or_else(|| Error::NoGatewayAvailable)?;
|
||||
|
||||
let ipr_address = selected_gateway.address;
|
||||
|
||||
info!(
|
||||
"Using IPR: {} (Gateway: {}, Performance: {:?})",
|
||||
ipr_address, selected_gateway.identity, selected_gateway.performance
|
||||
);
|
||||
|
||||
Ok(ipr_address)
|
||||
}
|
||||
|
||||
/// A bidirectional tunnel for sending and receiving IP packets through the mixnet.
|
||||
///
|
||||
/// Wraps a [`MixnetStream`] (opened to an IPR exit gateway) and provides a
|
||||
/// high-level API for the IPR protocol. The underlying `MixnetStream` handles
|
||||
/// LP Stream framing and stream multiplexing automatically.
|
||||
///
|
||||
/// # Data flow
|
||||
///
|
||||
/// ```text
|
||||
/// IpMixStream.send_ip_packet(bytes)
|
||||
/// → IpPacketRequest::to_bytes() → MixnetStream.write()
|
||||
/// → LP Stream frame (stream_id, seq, Data)
|
||||
/// → Sphinx packets → mixnet → IPR
|
||||
///
|
||||
/// IPR processes request → TUN → internet → response
|
||||
/// → IPR wraps in LP Stream frame → Sphinx → mixnet → client
|
||||
/// → stream router dispatches by stream_id
|
||||
/// → MixnetStream.read() → IpPacketResponse bytes
|
||||
/// → IprListener → extract IP packets
|
||||
/// ```
|
||||
pub struct IpMixStream {
|
||||
/// The underlying multiplexed stream to the IPR gateway.
|
||||
stream: MixnetStream,
|
||||
/// Kept for `nym_address()` and `disconnect()`.
|
||||
client: MixnetClient,
|
||||
/// Parses incoming IPR protocol responses.
|
||||
listener: IprListener,
|
||||
read_buf: Vec<u8>,
|
||||
allocated_ips: Option<IpPair>,
|
||||
connection_state: ConnectionState,
|
||||
}
|
||||
|
||||
impl IpMixStream {
|
||||
/// Create a new IP packet router stream connected to the mixnet.
|
||||
///
|
||||
/// Discovers an IPR gateway, connects a MixnetClient, and opens a
|
||||
/// `MixnetStream` to the IPR. Call [`connect_tunnel`](Self::connect_tunnel)
|
||||
/// to establish the IP tunnel.
|
||||
pub async fn new(env: NetworkEnvironment) -> Result<Self, Error> {
|
||||
let network_defaults = env.network_defaults();
|
||||
let api_client = create_nym_api_client(network_defaults.nym_api_urls.unwrap_or_default())?;
|
||||
let ipr_address = get_random_ipr(api_client).await?;
|
||||
|
||||
nym_network_defaults::setup_env(Some(env.env_file_path()));
|
||||
let mut client = MixnetClient::connect_new().await?;
|
||||
|
||||
// Open a stream to the IPR — this sends the LP Stream Open handshake
|
||||
// and starts the background stream router.
|
||||
let stream = client.open_stream(ipr_address, Some(10)).await?;
|
||||
|
||||
Ok(Self {
|
||||
stream,
|
||||
client,
|
||||
listener: IprListener::new(),
|
||||
read_buf: vec![0u8; READ_BUF_SIZE],
|
||||
allocated_ips: None,
|
||||
connection_state: ConnectionState::Disconnected,
|
||||
})
|
||||
}
|
||||
|
||||
/// Get the Nym network address of this stream.
|
||||
pub fn nym_address(&self) -> &Recipient {
|
||||
self.client.nym_address()
|
||||
}
|
||||
|
||||
/// Establish tunnel connection with the IPR and allocate IP addresses.
|
||||
pub async fn connect_tunnel(&mut self) -> Result<IpPair, Error> {
|
||||
if self.connection_state != ConnectionState::Disconnected {
|
||||
return Err(Error::IprStreamClientAlreadyConnectedOrConnecting);
|
||||
}
|
||||
|
||||
self.connection_state = ConnectionState::Connecting;
|
||||
info!("Connecting to IP packet router");
|
||||
|
||||
match self.connect_inner().await {
|
||||
Ok(ip_pair) => {
|
||||
self.allocated_ips = Some(ip_pair);
|
||||
self.connection_state = ConnectionState::Connected;
|
||||
info!(
|
||||
"Connected to IPv4: {}, IPv6: {}",
|
||||
ip_pair.ipv4, ip_pair.ipv6
|
||||
);
|
||||
Ok(ip_pair)
|
||||
}
|
||||
Err(e) => {
|
||||
self.connection_state = ConnectionState::Disconnected;
|
||||
error!("Failed to connect: {:?}", e);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn connect_inner(&mut self) -> Result<IpPair, Error> {
|
||||
let (request, request_id) = IpPacketRequest::new_connect_request(None);
|
||||
debug!("Sending connect request with ID: {}", request_id);
|
||||
|
||||
let request_bytes = request.to_bytes()?;
|
||||
self.stream
|
||||
.write_all(&request_bytes)
|
||||
.await
|
||||
.map_err(|_| Error::MessageSendingFailure)?;
|
||||
|
||||
self.listen_for_connect_response(request_id).await
|
||||
}
|
||||
|
||||
async fn listen_for_connect_response(&mut self, request_id: u64) -> Result<IpPair, Error> {
|
||||
let timeout = tokio::time::sleep(IPR_CONNECT_TIMEOUT);
|
||||
tokio::pin!(timeout);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = &mut timeout => {
|
||||
return Err(Error::IPRConnectResponseTimeout);
|
||||
}
|
||||
result = self.stream.read(&mut self.read_buf) => {
|
||||
match result {
|
||||
Ok(0) => return Err(Error::IPRClientStreamClosed),
|
||||
Ok(n) => {
|
||||
let msg = ReconstructedMessage {
|
||||
message: self.read_buf[..n].to_vec(),
|
||||
sender_tag: None,
|
||||
};
|
||||
if let Err(e) = check_ipr_message_version(&msg) {
|
||||
return Err(Error::IPRMessageVersionCheckFailed(e.to_string()));
|
||||
}
|
||||
if let Ok(response) = IpPacketResponse::from_reconstructed_message(&msg) {
|
||||
if response.id() == Some(request_id) {
|
||||
return self.handle_connect_response(response);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => return Err(Error::IPRClientStreamClosed),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_connect_response(&self, response: IpPacketResponse) -> Result<IpPair, Error> {
|
||||
let control_response = match response.data {
|
||||
IpPacketResponseData::Control(c) => c,
|
||||
other => return Err(Error::UnexpectedResponseType(other)),
|
||||
};
|
||||
|
||||
match *control_response {
|
||||
ControlResponse::Connect(connect_resp) => match connect_resp.reply {
|
||||
ConnectResponseReply::Success(success) => Ok(success.ips),
|
||||
ConnectResponseReply::Failure(reason) => Err(Error::ConnectDenied(reason)),
|
||||
},
|
||||
_ => Err(Error::UnexpectedResponseType(
|
||||
IpPacketResponseData::Control(control_response.clone()),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Send an IP packet through the tunnel.
|
||||
pub async fn send_ip_packet(&mut self, packet: &[u8]) -> Result<(), Error> {
|
||||
if self.connection_state != ConnectionState::Connected {
|
||||
return Err(Error::IprStreamClientNotConnected);
|
||||
}
|
||||
let request = IpPacketRequest::new_data_request(packet.to_vec().into());
|
||||
let request_bytes = request.to_bytes()?;
|
||||
self.stream
|
||||
.write_all(&request_bytes)
|
||||
.await
|
||||
.map_err(|_| Error::MessageSendingFailure)
|
||||
}
|
||||
|
||||
/// Handle incoming messages from the mixnet.
|
||||
///
|
||||
/// Reads from the underlying `MixnetStream`, parses IPR responses, and
|
||||
/// extracts IP packets. Returns an empty vec on timeout (10 s).
|
||||
pub async fn handle_incoming(&mut self) -> Result<Vec<Bytes>, Error> {
|
||||
match tokio::time::timeout(
|
||||
Duration::from_secs(10),
|
||||
self.stream.read(&mut self.read_buf),
|
||||
)
|
||||
.await
|
||||
{
|
||||
// Timeout — no data yet, not an error
|
||||
Err(_) => Ok(Vec::new()),
|
||||
// EOF — stream router shut down, channel dead
|
||||
Ok(Ok(0)) => {
|
||||
self.connection_state = ConnectionState::Disconnected;
|
||||
Err(Error::IPRClientStreamClosed)
|
||||
}
|
||||
// IO error
|
||||
Ok(Err(_)) => {
|
||||
self.connection_state = ConnectionState::Disconnected;
|
||||
Err(Error::IPRClientStreamClosed)
|
||||
}
|
||||
Ok(Ok(n)) => {
|
||||
let msg = ReconstructedMessage {
|
||||
message: self.read_buf[..n].to_vec(),
|
||||
sender_tag: None,
|
||||
};
|
||||
match self.listener.handle_reconstructed_message(msg).await {
|
||||
Ok(Some(MixnetMessageOutcome::IpPackets(packets))) => {
|
||||
debug!("Extracted {} IP packets", packets.len());
|
||||
Ok(packets)
|
||||
}
|
||||
Ok(Some(MixnetMessageOutcome::Disconnect)) => {
|
||||
info!("Received disconnect");
|
||||
self.connection_state = ConnectionState::Disconnected;
|
||||
self.allocated_ips = None;
|
||||
Ok(Vec::new())
|
||||
}
|
||||
Ok(Some(MixnetMessageOutcome::MixnetSelfPing)) => {
|
||||
debug!("Received mixnet self ping");
|
||||
Ok(Vec::new())
|
||||
}
|
||||
Ok(None) => Ok(Vec::new()),
|
||||
Err(e) => {
|
||||
error!("Failed to handle message: {}", e);
|
||||
Ok(Vec::new())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn allocated_ips(&self) -> Option<&IpPair> {
|
||||
self.allocated_ips.as_ref()
|
||||
}
|
||||
|
||||
pub fn is_connected(&self) -> bool {
|
||||
self.connection_state == ConnectionState::Connected
|
||||
}
|
||||
|
||||
/// Disconnect from the Mixnet. Disconnected clients cannot be reconnected.
|
||||
pub async fn disconnect_stream(self) {
|
||||
debug!("Disconnecting");
|
||||
self.client.disconnect().await;
|
||||
debug!("Disconnected");
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::ip_packet_client::helpers::{
|
||||
icmp_identifier, is_icmp_echo_reply, is_icmp_v6_echo_reply,
|
||||
};
|
||||
use std::net::{Ipv4Addr, Ipv6Addr};
|
||||
|
||||
#[tokio::test]
|
||||
#[ignore]
|
||||
async fn connect_to_ipr() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let mut stream = IpMixStream::new(NetworkEnvironment::Mainnet).await?;
|
||||
let ip_pair = stream.connect_tunnel().await?;
|
||||
|
||||
let ipv4: Ipv4Addr = ip_pair.ipv4;
|
||||
assert!(!ipv4.is_unspecified(), "IPv4 address should not be 0.0.0.0");
|
||||
|
||||
let ipv6: Ipv6Addr = ip_pair.ipv6;
|
||||
assert!(!ipv6.is_unspecified(), "IPv6 address should not be ::");
|
||||
|
||||
assert!(stream.is_connected(), "Stream should be connected");
|
||||
assert!(
|
||||
stream.allocated_ips().is_some(),
|
||||
"Should have allocated IPs"
|
||||
);
|
||||
|
||||
stream.disconnect_stream().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[ignore]
|
||||
async fn dns_ping_checks() -> Result<(), Box<dyn std::error::Error>> {
|
||||
use crate::ip_packet_client::helpers::{
|
||||
create_icmpv4_echo_request, create_icmpv6_echo_request, wrap_icmp_in_ipv4,
|
||||
wrap_icmp_in_ipv6,
|
||||
};
|
||||
use nym_ip_packet_requests::codec::MultiIpPacketCodec;
|
||||
use pnet_packet::Packet;
|
||||
|
||||
let mut stream = IpMixStream::new(NetworkEnvironment::Mainnet).await?;
|
||||
let ip_pair = stream.connect_tunnel().await?;
|
||||
|
||||
info!(
|
||||
"Connected with IPs - IPv4: {}, IPv6: {}",
|
||||
ip_pair.ipv4, ip_pair.ipv6
|
||||
);
|
||||
|
||||
let external_v4_targets = vec![
|
||||
("Google DNS", Ipv4Addr::new(8, 8, 8, 8)),
|
||||
("Cloudflare DNS", Ipv4Addr::new(1, 1, 1, 1)),
|
||||
("Quad9 DNS", Ipv4Addr::new(9, 9, 9, 9)),
|
||||
];
|
||||
|
||||
let external_v6_targets = vec![
|
||||
("Google DNS", "2001:4860:4860::8888".parse::<Ipv6Addr>()?),
|
||||
(
|
||||
"Cloudflare DNS",
|
||||
"2606:4700:4700::1111".parse::<Ipv6Addr>()?,
|
||||
),
|
||||
("Quad9 DNS", "2620:fe::fe".parse::<Ipv6Addr>()?),
|
||||
];
|
||||
|
||||
let identifier = icmp_identifier();
|
||||
let mut successful_v4_pings = 0;
|
||||
let mut total_v4_pings = 0;
|
||||
let mut successful_v6_pings = 0;
|
||||
let mut total_v6_pings = 0;
|
||||
|
||||
for (name, target) in &external_v4_targets {
|
||||
info!("Testing IPv4 connectivity to {} ({})", name, target);
|
||||
|
||||
for seq in 0..3 {
|
||||
let icmp = create_icmpv4_echo_request(seq, identifier)?;
|
||||
let ipv4_packet = wrap_icmp_in_ipv4(icmp, ip_pair.ipv4, *target)?;
|
||||
let bundled =
|
||||
MultiIpPacketCodec::bundle_one_packet(ipv4_packet.packet().to_vec().into());
|
||||
stream.send_ip_packet(&bundled).await?;
|
||||
total_v4_pings += 1;
|
||||
}
|
||||
}
|
||||
|
||||
for (name, target) in &external_v6_targets {
|
||||
info!("Testing IPv6 connectivity to {} ({})", name, target);
|
||||
|
||||
for seq in 0..3 {
|
||||
let icmp = create_icmpv6_echo_request(seq, identifier, &ip_pair.ipv6, target)?;
|
||||
let ipv6_packet = wrap_icmp_in_ipv6(icmp, ip_pair.ipv6, *target)?;
|
||||
let bundled =
|
||||
MultiIpPacketCodec::bundle_one_packet(ipv6_packet.packet().to_vec().into());
|
||||
stream.send_ip_packet(&bundled).await?;
|
||||
total_v6_pings += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let collect_timeout = tokio::time::sleep(Duration::from_secs(10));
|
||||
tokio::pin!(collect_timeout);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = &mut collect_timeout => {
|
||||
info!("Finished collecting replies");
|
||||
break;
|
||||
}
|
||||
result = stream.handle_incoming() => {
|
||||
if let Ok(packets) = result {
|
||||
for packet in packets {
|
||||
if let Some((reply_id, _source, dest)) = is_icmp_echo_reply(&packet) {
|
||||
if reply_id == identifier && dest == ip_pair.ipv4 {
|
||||
successful_v4_pings += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some((reply_id, _source, dest)) = is_icmp_v6_echo_reply(&packet) {
|
||||
if reply_id == identifier && dest == ip_pair.ipv6 {
|
||||
successful_v6_pings += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let v4_success_rate = (successful_v4_pings as f64 / total_v4_pings as f64) * 100.0;
|
||||
let v6_success_rate = (successful_v6_pings as f64 / total_v6_pings as f64) * 100.0;
|
||||
|
||||
info!(
|
||||
"IPv4: {}/{} ({:.1}%), IPv6: {}/{} ({:.1}%)",
|
||||
successful_v4_pings,
|
||||
total_v4_pings,
|
||||
v4_success_rate,
|
||||
successful_v6_pings,
|
||||
total_v6_pings,
|
||||
v6_success_rate
|
||||
);
|
||||
|
||||
assert!(successful_v4_pings > 0, "No IPv4 pings successful");
|
||||
assert!(v4_success_rate >= 75.0, "IPv4 success rate < 75%");
|
||||
assert!(successful_v6_pings > 0, "No IPv6 pings successful");
|
||||
assert!(v6_success_rate >= 75.0, "IPv6 success rate < 75%");
|
||||
|
||||
stream.disconnect_stream().await;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,55 @@
|
||||
use std::fs;
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum NetworkEnvironment {
|
||||
#[default]
|
||||
Mainnet,
|
||||
// Sandbox,
|
||||
}
|
||||
|
||||
fn find_workspace_root() -> PathBuf {
|
||||
let mut current = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
|
||||
|
||||
loop {
|
||||
let cargo_toml = current.join("Cargo.toml");
|
||||
|
||||
if cargo_toml.exists() {
|
||||
if let Ok(contents) = fs::read_to_string(&cargo_toml) {
|
||||
// Check if this Cargo.toml defines a workspace
|
||||
if contents.contains("[workspace]") {
|
||||
return current;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !current.pop() {
|
||||
panic!("Could not find workspace root");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl NetworkEnvironment {
|
||||
pub fn env_file_path(&self) -> PathBuf {
|
||||
let root = find_workspace_root();
|
||||
match self {
|
||||
Self::Mainnet => root.join("envs/mainnet.env"),
|
||||
// Self::Sandbox => root.join("envs/sandbox.env"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn network_defaults(&self) -> crate::NymNetworkDetails {
|
||||
match self {
|
||||
Self::Mainnet => crate::NymNetworkDetails::new_mainnet(),
|
||||
// Self::Sandbox => crate::NymNetworkDetails::new_sandbox(), // TODO
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parse_network(s: &str) -> Result<Self, String> {
|
||||
match s.to_lowercase().as_str() {
|
||||
"mainnet" | "main" => Ok(Self::Mainnet),
|
||||
// "sandbox" | "sand" => Ok(Self::Sandbox),
|
||||
_ => Err(format!("Unknown env: {}", s)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,13 +1,16 @@
|
||||
//! Rust SDK for the Nym platform
|
||||
//!
|
||||
//! The main component currently is [`mixnet`].
|
||||
//! [`tcp_proxy`] is probably a good place to start for anyone wanting to integrate with existing app code and read/write from a socket.
|
||||
//! [`client_pool`] is a configurable client pool.
|
||||
//! [`tcp_proxy`] is a soon to be deprecated wrapper around the mixnet client which exposes a localhost port.
|
||||
//! [`ipr_wrapper`] tunnels IP packets through the mixnet to an IPR exit gateway.
|
||||
|
||||
mod error;
|
||||
|
||||
pub mod bandwidth;
|
||||
pub mod client_pool;
|
||||
pub mod ip_packet_client;
|
||||
pub mod ipr_wrapper;
|
||||
pub mod mixnet;
|
||||
pub mod tcp_proxy;
|
||||
|
||||
|
||||
@@ -16,7 +16,9 @@ use nym_sphinx::params::PacketType;
|
||||
use nym_task::connections::TransmissionLane;
|
||||
use tokio_util::sync::PollSender;
|
||||
|
||||
use super::protocol::{encode_stream_message, StreamId, StreamMessageType};
|
||||
use nym_lp::packet::frame::StreamMsgType;
|
||||
|
||||
use super::protocol::{encode_stream_message, StreamId};
|
||||
use super::StreamMap;
|
||||
|
||||
/// How to address outbound messages on this stream.
|
||||
@@ -45,6 +47,7 @@ pub struct MixnetStream {
|
||||
inbound_rx: mpsc::UnboundedReceiver<Vec<u8>>,
|
||||
read_buf: BytesMut,
|
||||
deregistered: bool,
|
||||
next_seq: u32,
|
||||
}
|
||||
|
||||
impl MixnetStream {
|
||||
@@ -71,6 +74,7 @@ impl MixnetStream {
|
||||
inbound_rx,
|
||||
read_buf: BytesMut::new(),
|
||||
deregistered: false,
|
||||
next_seq: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,6 +102,7 @@ impl MixnetStream {
|
||||
inbound_rx,
|
||||
read_buf,
|
||||
deregistered: false,
|
||||
next_seq: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -177,7 +182,9 @@ impl AsyncWrite for MixnetStream {
|
||||
ready!(self.sender.poll_ready_unpin(cx))
|
||||
.map_err(|_| std::io::Error::other("mixnet input channel closed"))?;
|
||||
|
||||
let wire = encode_stream_message(&self.id, StreamMessageType::Data, buf);
|
||||
let seq = self.next_seq;
|
||||
self.next_seq = self.next_seq.wrapping_add(1);
|
||||
let wire = encode_stream_message(&self.id, StreamMsgType::Data, seq, buf);
|
||||
let msg = self.make_input_message(wire);
|
||||
|
||||
self.sender
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
//! stream's channel (or to the listener for `Open` messages).
|
||||
|
||||
mod mixnet_stream;
|
||||
mod protocol;
|
||||
pub(crate) mod protocol;
|
||||
|
||||
pub use mixnet_stream::MixnetStream;
|
||||
pub use protocol::StreamId;
|
||||
@@ -32,7 +32,8 @@ use nym_sphinx::addressing::clients::Recipient;
|
||||
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
use nym_task::connections::TransmissionLane;
|
||||
|
||||
use protocol::{decode_stream_message, encode_stream_message, StreamMessageType};
|
||||
use nym_lp::packet::frame::StreamMsgType;
|
||||
use protocol::{decode_stream_message, encode_stream_message};
|
||||
|
||||
use crate::mixnet::native_client::MixnetClient;
|
||||
use crate::{Error, Result};
|
||||
@@ -230,20 +231,20 @@ async fn run_router(
|
||||
continue;
|
||||
};
|
||||
|
||||
let stream_id = frame.header.stream_id;
|
||||
match frame.header.message_type {
|
||||
StreamMessageType::Open => {
|
||||
let stream_id = frame.stream_id;
|
||||
match frame.msg_type {
|
||||
StreamMsgType::Open => {
|
||||
let _ = listener_tx.send(InboundOpen {
|
||||
stream_id,
|
||||
sender_tag: msg.sender_tag,
|
||||
initial_data: frame.data.to_vec(),
|
||||
});
|
||||
}
|
||||
StreamMessageType::Data => {
|
||||
StreamMsgType::Data => {
|
||||
streams
|
||||
.send_to_stream(&stream_id, frame.data.to_vec())
|
||||
.await;
|
||||
} // TODO: if we decide we need close logic add another enum member
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -295,7 +296,7 @@ pub(crate) async fn open_stream(
|
||||
let rx = streams.register_stream(stream_id).await;
|
||||
|
||||
// Send Open to the peer
|
||||
let wire = encode_stream_message(&stream_id, StreamMessageType::Open, &[]);
|
||||
let wire = encode_stream_message(&stream_id, StreamMsgType::Open, 0, &[]);
|
||||
let msg = InputMessage::new_anonymous(
|
||||
recipient,
|
||||
wire,
|
||||
|
||||
@@ -1,24 +1,22 @@
|
||||
//! Wire protocol for stream multiplexing.
|
||||
//!
|
||||
//! Every message between streams carries a fixed header prepended to
|
||||
//! Every message between streams carries an LP frame header prepended to
|
||||
//! the payload inside the mixnet message body:
|
||||
//!
|
||||
//! ```text
|
||||
//! [Version: 1 byte][StreamId: 8 bytes][MessageType: 1 byte][payload: N bytes]
|
||||
//! [LpFrameKind: 2 bytes LE][StreamFrameAttributes: 14 bytes][payload: N bytes]
|
||||
//! ```
|
||||
//!
|
||||
//! This header sits inside the sphinx packet payload.
|
||||
//! The `StreamFrameAttributes` encode stream_id, message type, and sequence
|
||||
//! number inside the LP header's `frame_attributes` field. This is the same
|
||||
//! LP frame format used across the system (IPR detection, gateway dispatch).
|
||||
|
||||
use std::fmt;
|
||||
|
||||
/// Current stream protocol version.
|
||||
pub const STREAM_PROTOCOL_VERSION: u8 = 1;
|
||||
|
||||
/// Length of a StreamId in bytes (u64, big-endian).
|
||||
pub const STREAM_ID_LEN: usize = 8;
|
||||
|
||||
/// Total header length: Version (1) + StreamId (8) + MessageType (1).
|
||||
pub const STREAM_HEADER_LEN: usize = 1 + STREAM_ID_LEN + 1;
|
||||
use bytes::BytesMut;
|
||||
use nym_lp::packet::frame::{
|
||||
LpFrame, LpFrameHeader, LpFrameKind, StreamFrameAttributes, StreamMsgType,
|
||||
};
|
||||
|
||||
/// Identifies a stream within a MixnetClient.
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
|
||||
@@ -29,12 +27,12 @@ impl StreamId {
|
||||
Self(rand::random::<u64>())
|
||||
}
|
||||
|
||||
pub fn to_bytes(self) -> [u8; STREAM_ID_LEN] {
|
||||
self.0.to_be_bytes()
|
||||
pub fn as_u64(self) -> u64 {
|
||||
self.0
|
||||
}
|
||||
|
||||
pub fn from_bytes(bytes: [u8; STREAM_ID_LEN]) -> Self {
|
||||
Self(u64::from_be_bytes(bytes))
|
||||
pub fn from_u64(v: u64) -> Self {
|
||||
Self(v)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,88 +48,54 @@ impl fmt::Display for StreamId {
|
||||
}
|
||||
}
|
||||
|
||||
/// Message types within the stream protocol.
|
||||
///
|
||||
/// Note: there is no Close variant. Without message sequencing, a close
|
||||
/// message races ahead of in-flight data and arrives before the data is
|
||||
/// reconstructed. Streams clean up locally via Drop. If ordered close/EOF
|
||||
/// is needed in future, add sequencing + reorder buffering (see the
|
||||
/// tcp_proxy's `MessageBuffer` for a working example).
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
#[repr(u8)]
|
||||
pub enum StreamMessageType {
|
||||
/// Open a new stream. Payload is optional initial data.
|
||||
Open = 0,
|
||||
/// Data on an existing stream.
|
||||
Data = 1,
|
||||
}
|
||||
|
||||
impl StreamMessageType {
|
||||
pub fn from_byte(b: u8) -> Option<Self> {
|
||||
match b {
|
||||
0 => Some(Self::Open),
|
||||
1 => Some(Self::Data),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The fixed-size header prepended to every stream message.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct MixStreamHeader {
|
||||
pub version: u8,
|
||||
pub stream_id: StreamId,
|
||||
pub message_type: StreamMessageType,
|
||||
}
|
||||
|
||||
/// A decoded stream frame: header + payload reference.
|
||||
/// A decoded stream frame: LP header fields + payload reference.
|
||||
#[derive(Debug)]
|
||||
pub struct MixStreamFrame<'a> {
|
||||
pub header: MixStreamHeader,
|
||||
pub struct StreamFrame<'a> {
|
||||
pub stream_id: StreamId,
|
||||
pub msg_type: StreamMsgType,
|
||||
pub sequence_num: u32,
|
||||
pub data: &'a [u8],
|
||||
}
|
||||
|
||||
/// Encode a stream message: `[version][stream_id][msg_type][payload]`.
|
||||
/// Encode a stream message as an LP frame: `[LpFrameHeader][payload]`.
|
||||
pub fn encode_stream_message(
|
||||
id: &StreamId,
|
||||
msg_type: StreamMessageType,
|
||||
msg_type: StreamMsgType,
|
||||
sequence_num: u32,
|
||||
payload: &[u8],
|
||||
) -> Vec<u8> {
|
||||
let mut buf = Vec::with_capacity(STREAM_HEADER_LEN + payload.len());
|
||||
buf.push(STREAM_PROTOCOL_VERSION);
|
||||
buf.extend_from_slice(&id.to_bytes());
|
||||
buf.push(msg_type as u8);
|
||||
buf.extend_from_slice(payload);
|
||||
buf
|
||||
let attrs = StreamFrameAttributes {
|
||||
stream_id: id.as_u64(),
|
||||
msg_type,
|
||||
sequence_num,
|
||||
};
|
||||
let frame = LpFrame::new_stream(attrs, payload.to_vec());
|
||||
let mut buf = BytesMut::with_capacity(LpFrameHeader::SIZE + payload.len());
|
||||
frame.encode(&mut buf);
|
||||
buf.to_vec()
|
||||
}
|
||||
|
||||
/// Decode a stream message into a [`MixStreamFrame`].
|
||||
/// Decode a stream message from LP frame bytes.
|
||||
///
|
||||
/// Returns `None` if the buffer is too short, the version is unknown,
|
||||
/// or the message type byte is invalid.
|
||||
pub fn decode_stream_message(bytes: &[u8]) -> Option<MixStreamFrame<'_>> {
|
||||
if bytes.len() < STREAM_HEADER_LEN {
|
||||
/// Returns `None` if the buffer is too short, the frame kind is not `Stream`,
|
||||
/// or the stream attributes are invalid.
|
||||
pub fn decode_stream_message(bytes: &[u8]) -> Option<StreamFrame<'_>> {
|
||||
if bytes.len() < LpFrameHeader::SIZE {
|
||||
return None;
|
||||
}
|
||||
|
||||
let version = bytes[0];
|
||||
if version != STREAM_PROTOCOL_VERSION {
|
||||
let header = LpFrameHeader::parse(bytes).ok()?;
|
||||
if header.kind != LpFrameKind::Stream {
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut id_bytes = [0u8; STREAM_ID_LEN];
|
||||
id_bytes.copy_from_slice(&bytes[1..1 + STREAM_ID_LEN]);
|
||||
let stream_id = StreamId::from_bytes(id_bytes);
|
||||
let attrs = StreamFrameAttributes::parse(&header.frame_attributes).ok()?;
|
||||
let data = &bytes[LpFrameHeader::SIZE..];
|
||||
|
||||
let message_type = StreamMessageType::from_byte(bytes[1 + STREAM_ID_LEN])?;
|
||||
let data = &bytes[STREAM_HEADER_LEN..];
|
||||
|
||||
Some(MixStreamFrame {
|
||||
header: MixStreamHeader {
|
||||
version,
|
||||
stream_id,
|
||||
message_type,
|
||||
},
|
||||
Some(StreamFrame {
|
||||
stream_id: StreamId::from_u64(attrs.stream_id),
|
||||
msg_type: attrs.msg_type,
|
||||
sequence_num: attrs.sequence_num,
|
||||
data,
|
||||
})
|
||||
}
|
||||
@@ -144,11 +108,11 @@ mod tests {
|
||||
fn roundtrip() {
|
||||
let id = StreamId::random();
|
||||
let payload = b"hello world";
|
||||
let encoded = encode_stream_message(&id, StreamMessageType::Data, payload);
|
||||
let encoded = encode_stream_message(&id, StreamMsgType::Data, 42, payload);
|
||||
let frame = decode_stream_message(&encoded).unwrap();
|
||||
assert_eq!(frame.header.version, STREAM_PROTOCOL_VERSION);
|
||||
assert_eq!(frame.header.stream_id, id);
|
||||
assert_eq!(frame.header.message_type, StreamMessageType::Data);
|
||||
assert_eq!(frame.stream_id, id);
|
||||
assert_eq!(frame.msg_type, StreamMsgType::Data);
|
||||
assert_eq!(frame.sequence_num, 42);
|
||||
assert_eq!(frame.data, payload);
|
||||
}
|
||||
|
||||
@@ -158,41 +122,70 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bad_version() {
|
||||
let id = StreamId::random();
|
||||
let mut encoded = encode_stream_message(&id, StreamMessageType::Data, b"x");
|
||||
encoded[0] = 0xFF;
|
||||
assert!(decode_stream_message(&encoded).is_none());
|
||||
fn wrong_frame_kind() {
|
||||
// Opaque frame kind (0x00, 0x00) should not parse as stream
|
||||
let mut buf = vec![0u8; LpFrameHeader::SIZE + 1];
|
||||
buf[LpFrameHeader::SIZE] = 0xAA;
|
||||
assert!(decode_stream_message(&buf).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bad_message_type() {
|
||||
let mut buf = [0u8; STREAM_HEADER_LEN];
|
||||
buf[0] = STREAM_PROTOCOL_VERSION;
|
||||
buf[1 + STREAM_ID_LEN] = 0xFF;
|
||||
assert!(decode_stream_message(&buf).is_none());
|
||||
fn bad_msg_type() {
|
||||
let id = StreamId::random();
|
||||
let mut encoded = encode_stream_message(&id, StreamMsgType::Data, 0, b"x");
|
||||
// msg_type is at byte offset 2 + 8 = 10 (inside frame_attributes)
|
||||
encoded[10] = 0xFF;
|
||||
assert!(decode_stream_message(&encoded).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn empty_payload() {
|
||||
let id = StreamId::random();
|
||||
let encoded = encode_stream_message(&id, StreamMessageType::Open, &[]);
|
||||
let encoded = encode_stream_message(&id, StreamMsgType::Open, 0, &[]);
|
||||
let frame = decode_stream_message(&encoded).unwrap();
|
||||
assert_eq!(frame.header.message_type, StreamMessageType::Open);
|
||||
assert_eq!(frame.msg_type, StreamMsgType::Open);
|
||||
assert_eq!(frame.sequence_num, 0);
|
||||
assert!(frame.data.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn header_wire_format() {
|
||||
let id = StreamId::from_bytes([0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77]);
|
||||
let encoded = encode_stream_message(&id, StreamMessageType::Open, &[0xAA]);
|
||||
assert_eq!(encoded.len(), STREAM_HEADER_LEN + 1);
|
||||
assert_eq!(encoded[0], STREAM_PROTOCOL_VERSION);
|
||||
let id = StreamId::from_u64(0x0011223344556677);
|
||||
let encoded = encode_stream_message(&id, StreamMsgType::Open, 1, &[0xAA]);
|
||||
|
||||
// LpFrameHeader::SIZE (16) + 1 byte payload
|
||||
assert_eq!(encoded.len(), LpFrameHeader::SIZE + 1);
|
||||
|
||||
// First 2 bytes: LpFrameKind::Stream = 3, LE
|
||||
assert_eq!(encoded[0], 0x03);
|
||||
assert_eq!(encoded[1], 0x00);
|
||||
|
||||
// Bytes 2..10: stream_id BE
|
||||
assert_eq!(
|
||||
&encoded[1..9],
|
||||
&encoded[2..10],
|
||||
&[0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77]
|
||||
);
|
||||
assert_eq!(encoded[9], StreamMessageType::Open as u8);
|
||||
assert_eq!(encoded[10], 0xAA);
|
||||
|
||||
// Byte 10: msg_type = Open = 0
|
||||
assert_eq!(encoded[10], StreamMsgType::Open as u8);
|
||||
|
||||
// Bytes 11..15: sequence_num = 1, BE
|
||||
assert_eq!(&encoded[11..15], &[0x00, 0x00, 0x00, 0x01]);
|
||||
|
||||
// Byte 15: reserved = 0
|
||||
assert_eq!(encoded[15], 0x00);
|
||||
|
||||
// Byte 16: payload
|
||||
assert_eq!(encoded[16], 0xAA);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sequence_num_roundtrip() {
|
||||
let id = StreamId::random();
|
||||
for seq in [0, 1, 255, 65535, u32::MAX] {
|
||||
let encoded = encode_stream_message(&id, StreamMsgType::Data, seq, b"test");
|
||||
let frame = decode_stream_message(&encoded).unwrap();
|
||||
assert_eq!(frame.sequence_num, seq);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ nym-crypto = { workspace = true }
|
||||
nym-exit-policy = { workspace = true }
|
||||
nym-id = { workspace = true }
|
||||
nym-ip-packet-requests = { workspace = true }
|
||||
nym-lp = { workspace = true }
|
||||
nym-kcp = { path = "../../common/nym-kcp" } # TODO MAX add to workspace dependencies
|
||||
nym-network-defaults = { workspace = true }
|
||||
nym-network-requester = { path = "../network-requester" }
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::time::Duration;
|
||||
|
||||
use bytes::{Bytes, BytesMut};
|
||||
@@ -10,6 +11,7 @@ use nym_ip_packet_requests::{
|
||||
v7::response::IpPacketResponse as IpPacketResponseV7,
|
||||
v8::response::IpPacketResponse as IpPacketResponseV8,
|
||||
};
|
||||
use nym_lp::packet::frame::{LpFrame, LpFrameHeader, StreamFrameAttributes, StreamMsgType};
|
||||
use nym_sdk::mixnet::{
|
||||
InputMessage, MixnetClientSender, MixnetMessageSender, MixnetMessageSinkTranslator,
|
||||
};
|
||||
@@ -64,6 +66,7 @@ impl ConnectedClientHandler {
|
||||
buffer_timeout: Duration,
|
||||
client_version: ClientVersion,
|
||||
mixnet_client_sender: MixnetClientSender,
|
||||
stream_id: Option<u64>,
|
||||
) -> (
|
||||
mpsc::UnboundedSender<Vec<u8>>,
|
||||
oneshot::Sender<()>,
|
||||
@@ -71,6 +74,9 @@ impl ConnectedClientHandler {
|
||||
) {
|
||||
log::debug!("Starting connected client handler for: {client_id}");
|
||||
log::debug!("client version: {client_version:?}");
|
||||
if let Some(sid) = stream_id {
|
||||
log::debug!("LP Stream mode: stream_id={sid:#018x}");
|
||||
}
|
||||
let (close_tx, close_rx) = oneshot::channel();
|
||||
let (forward_from_tun_tx, forward_from_tun_rx) = mpsc::unbounded_channel();
|
||||
|
||||
@@ -86,6 +92,8 @@ impl ConnectedClientHandler {
|
||||
let input_message_creator = ToIprDataResponse {
|
||||
send_to: client_id.clone(),
|
||||
client_version,
|
||||
stream_id,
|
||||
next_response_seq: AtomicU32::new(0),
|
||||
};
|
||||
|
||||
let connected_client_handler = ConnectedClientHandler {
|
||||
@@ -192,12 +200,29 @@ fn create_ip_packet_response(
|
||||
}
|
||||
}
|
||||
|
||||
// This struct is used by the sink to translate the the bundled IP packets into a IPR packet
|
||||
// responses that can be sent to the mixnet.
|
||||
#[derive(Clone, Debug)]
|
||||
// This struct is used by the sink to translate the bundled IP packets into IPR packet
|
||||
// responses that can be sent to the mixnet. When `stream_id` is set, responses are
|
||||
// wrapped in LP Stream frames so the client's stream router can dispatch them.
|
||||
#[derive(Debug)]
|
||||
struct ToIprDataResponse {
|
||||
send_to: ConnectedClientId,
|
||||
client_version: ClientVersion,
|
||||
/// When Some, wrap responses in LP Stream frames with this stream_id.
|
||||
stream_id: Option<u64>,
|
||||
/// Sequence number for LP Stream response frames.
|
||||
next_response_seq: AtomicU32,
|
||||
}
|
||||
|
||||
// Manual impl because AtomicU32 is not Clone.
|
||||
impl Clone for ToIprDataResponse {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
send_to: self.send_to.clone(),
|
||||
client_version: self.client_version,
|
||||
stream_id: self.stream_id,
|
||||
next_response_seq: AtomicU32::new(self.next_response_seq.load(Ordering::Relaxed)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MixnetMessageSinkTranslator for ToIprDataResponse {
|
||||
@@ -205,12 +230,26 @@ impl MixnetMessageSinkTranslator for ToIprDataResponse {
|
||||
&self,
|
||||
bundled_ip_packets: &[u8],
|
||||
) -> std::result::Result<InputMessage, nym_sdk::Error> {
|
||||
// Create a IPR packet response that the recipient can understand
|
||||
let response_packet = create_ip_packet_response(bundled_ip_packets, self.client_version)?;
|
||||
|
||||
// Wrap the response packet in a mixnet input message
|
||||
// Optionally wrap in LP Stream frame for stream-mode clients
|
||||
let final_packet = if let Some(stream_id) = self.stream_id {
|
||||
let seq = self.next_response_seq.fetch_add(1, Ordering::Relaxed);
|
||||
let attrs = StreamFrameAttributes {
|
||||
stream_id,
|
||||
msg_type: StreamMsgType::Data,
|
||||
sequence_num: seq,
|
||||
};
|
||||
let frame = LpFrame::new_stream(attrs, response_packet);
|
||||
let mut buf = BytesMut::with_capacity(LpFrameHeader::SIZE + frame.content.len());
|
||||
frame.encode(&mut buf);
|
||||
buf.to_vec()
|
||||
} else {
|
||||
response_packet
|
||||
};
|
||||
|
||||
let input_message =
|
||||
crate::util::create_message::create_input_message(&self.send_to, response_packet)
|
||||
crate::util::create_message::create_input_message(&self.send_to, final_packet)
|
||||
.with_max_retransmissions(0);
|
||||
|
||||
Ok(input_message)
|
||||
@@ -282,6 +321,8 @@ mod tests {
|
||||
let bytes_to_input_message = ToIprDataResponse {
|
||||
send_to: client_id.clone(),
|
||||
client_version,
|
||||
stream_id: None,
|
||||
next_response_seq: AtomicU32::new(0),
|
||||
};
|
||||
|
||||
let mixnet_ip_packet_sender = MixnetMessageSink::new_with_custom_translator(
|
||||
|
||||
@@ -119,6 +119,9 @@ pub enum IpPacketRouterError {
|
||||
|
||||
#[error("KCP protocol error: {0}")]
|
||||
KcpError(String),
|
||||
|
||||
#[error("{0}")]
|
||||
Other(String),
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, IpPacketRouterError>;
|
||||
|
||||
@@ -181,6 +181,7 @@ impl IpPacketRouter {
|
||||
shutdown_token: self.shutdown.clone_shutdown_token(),
|
||||
connected_clients,
|
||||
kcp_session_manager: crate::kcp_session_manager::KcpSessionManager::new(),
|
||||
current_stream_id: None,
|
||||
};
|
||||
|
||||
log::info!("The address of this client is: {self_address}");
|
||||
|
||||
@@ -22,8 +22,12 @@ use crate::{
|
||||
request_filter::RequestFilter,
|
||||
util::parse_ip::ParsedPacket,
|
||||
};
|
||||
use bytes::BytesMut;
|
||||
use futures::StreamExt;
|
||||
use nym_ip_packet_requests::codec::MultiIpPacketCodec;
|
||||
use nym_lp::packet::frame::{
|
||||
LpFrame, LpFrameHeader, LpFrameKind, StreamFrameAttributes, StreamMsgType,
|
||||
};
|
||||
use nym_sdk::mixnet::MixnetMessageSender;
|
||||
use nym_sphinx::receiver::ReconstructedMessage;
|
||||
use nym_task::ShutdownToken;
|
||||
@@ -63,6 +67,10 @@ pub(crate) struct MixnetListener {
|
||||
|
||||
// KCP session manager for LP clients sending KCP-wrapped messages
|
||||
pub(crate) kcp_session_manager: KcpSessionManager,
|
||||
|
||||
// When processing an LP Stream frame, this holds the stream_id so connect
|
||||
// handlers can pass it to ConnectedClientHandler for LP-wrapping TUN responses.
|
||||
pub(crate) current_stream_id: Option<u64>,
|
||||
}
|
||||
|
||||
/// Check if a message payload appears to be KCP-wrapped.
|
||||
@@ -233,6 +241,7 @@ impl MixnetListener {
|
||||
buffer_timeout,
|
||||
version,
|
||||
self.mixnet_client.split_sender(),
|
||||
self.current_stream_id,
|
||||
);
|
||||
|
||||
// Register the new client in the set of connected clients
|
||||
@@ -318,6 +327,7 @@ impl MixnetListener {
|
||||
buffer_timeout,
|
||||
version,
|
||||
self.mixnet_client.split_sender(),
|
||||
self.current_stream_id,
|
||||
);
|
||||
|
||||
// Register the new client in the set of connected clients
|
||||
@@ -436,6 +446,15 @@ impl MixnetListener {
|
||||
.unwrap_or("missing".to_owned())
|
||||
);
|
||||
|
||||
// Check if this is an LP Stream frame
|
||||
if reconstructed.message.len() >= LpFrameHeader::SIZE {
|
||||
if let Ok(header) = LpFrameHeader::parse(&reconstructed.message) {
|
||||
if header.kind == LpFrameKind::Stream {
|
||||
return self.on_stream_frame(reconstructed).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check if this is a KCP-wrapped message from an LP client
|
||||
if is_kcp_message(&reconstructed.message) {
|
||||
return self.on_kcp_message(reconstructed).await;
|
||||
@@ -445,6 +464,104 @@ impl MixnetListener {
|
||||
self.on_ipr_message(reconstructed).await
|
||||
}
|
||||
|
||||
/// Handle LP Stream-framed messages.
|
||||
///
|
||||
/// Parses stream attributes, processes the inner IPR payload, and handles
|
||||
/// responses inline (wrapped in LP Stream frames) — same pattern as KCP.
|
||||
async fn on_stream_frame(
|
||||
&mut self,
|
||||
reconstructed: ReconstructedMessage,
|
||||
) -> Result<Vec<PacketHandleResult>> {
|
||||
log::debug!(
|
||||
"Received LP Stream frame ({} bytes)",
|
||||
reconstructed.message.len()
|
||||
);
|
||||
|
||||
let header = LpFrameHeader::parse(&reconstructed.message)
|
||||
.map_err(|e| IpPacketRouterError::Other(format!("Invalid LP frame header: {e}")))?;
|
||||
let attrs = StreamFrameAttributes::parse(&header.frame_attributes).map_err(|e| {
|
||||
IpPacketRouterError::Other(format!("Invalid stream frame attributes: {e}"))
|
||||
})?;
|
||||
|
||||
let stream_id = attrs.stream_id;
|
||||
log::debug!(
|
||||
"LP Stream: stream_id={stream_id:#018x}, msg_type={:?}, seq={}",
|
||||
attrs.msg_type,
|
||||
attrs.sequence_num
|
||||
);
|
||||
|
||||
// Set context so connect handlers thread stream_id to ConnectedClientHandler
|
||||
self.current_stream_id = Some(stream_id);
|
||||
|
||||
let payload = &reconstructed.message[LpFrameHeader::SIZE..];
|
||||
|
||||
// Open frames may carry an empty payload (stream handshake only)
|
||||
if payload.is_empty() {
|
||||
log::debug!("LP Stream: empty payload (Open handshake), skipping");
|
||||
self.current_stream_id = None;
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
let inner_reconstructed = ReconstructedMessage {
|
||||
message: payload.to_vec(),
|
||||
sender_tag: reconstructed.sender_tag,
|
||||
};
|
||||
|
||||
match self.on_ipr_message(inner_reconstructed).await {
|
||||
Ok(results) => {
|
||||
for result in results {
|
||||
#[allow(clippy::collapsible_if)]
|
||||
if let Ok(Some(response)) = result {
|
||||
if let Err(e) = self.handle_stream_response(stream_id, response).await {
|
||||
log::warn!(
|
||||
"Error sending LP Stream response for stream_id={stream_id:#018x}: {e}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("Error processing LP Stream inner message: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
self.current_stream_id = None;
|
||||
|
||||
// Return empty — we handled responses directly above
|
||||
Ok(vec![])
|
||||
}
|
||||
|
||||
/// Wrap a response in an LP Stream frame and send it via the mixnet.
|
||||
///
|
||||
/// Used for inline responses to LP Stream clients (connect handshake, pong, etc.).
|
||||
async fn handle_stream_response(
|
||||
&mut self,
|
||||
stream_id: u64,
|
||||
response: VersionedResponse,
|
||||
) -> Result<()> {
|
||||
let reply_to = response.reply_to.clone();
|
||||
let response_bytes = response.try_into_bytes()?;
|
||||
|
||||
// Wrap in LP Stream frame (seq=0 for inline responses)
|
||||
let attrs = StreamFrameAttributes {
|
||||
stream_id,
|
||||
msg_type: StreamMsgType::Data,
|
||||
sequence_num: 0,
|
||||
};
|
||||
let frame = LpFrame::new_stream(attrs, response_bytes);
|
||||
let mut buf = BytesMut::with_capacity(LpFrameHeader::SIZE + frame.content.len());
|
||||
frame.encode(&mut buf);
|
||||
|
||||
let input_message =
|
||||
crate::util::create_message::create_input_message(&reply_to, buf.to_vec());
|
||||
|
||||
self.mixnet_client.send(input_message).await.map_err(|err| {
|
||||
IpPacketRouterError::FailedToSendPacketToMixnet {
|
||||
source: Box::new(err),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Handle KCP-wrapped messages from LP clients.
|
||||
///
|
||||
/// LP clients send: KCP(IpPacketRequest)
|
||||
@@ -813,4 +930,32 @@ mod tests {
|
||||
"Invalid KCP command 85 should be rejected"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_lp_stream_frame_detected() {
|
||||
use bytes::BytesMut;
|
||||
use nym_lp::packet::frame::{
|
||||
LpFrameHeader, LpFrameKind, StreamFrameAttributes, StreamMsgType,
|
||||
};
|
||||
|
||||
let attrs = StreamFrameAttributes {
|
||||
stream_id: 0x1234,
|
||||
msg_type: StreamMsgType::Data,
|
||||
sequence_num: 42,
|
||||
};
|
||||
let frame = nym_lp::packet::frame::LpFrame::new_stream(attrs, vec![8, 1, 0]); // fake IPR payload
|
||||
let mut buf = BytesMut::new();
|
||||
frame.encode(&mut buf);
|
||||
|
||||
let header = LpFrameHeader::parse(&buf).unwrap();
|
||||
assert_eq!(header.kind, LpFrameKind::Stream);
|
||||
|
||||
let parsed_attrs = StreamFrameAttributes::parse(&header.frame_attributes).unwrap();
|
||||
assert_eq!(parsed_attrs.stream_id, 0x1234);
|
||||
assert_eq!(parsed_attrs.msg_type, StreamMsgType::Data);
|
||||
assert_eq!(parsed_attrs.sequence_num, 42);
|
||||
|
||||
// Content is everything after the header
|
||||
assert_eq!(&buf[LpFrameHeader::SIZE..], &[8, 1, 0]);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user