Max/asyncread asyncwrite nym client (#6318)

* Remove AsyncRead/Write traits from native client - moving them to
stream/

* Substream model first push

* Update / add examples

* Update lockfile

* Clippy

* clippy examples

* remove codecs

* Remove unused bincode setup

* Revert a lot of changes when SDK client itself implemented
AsyncRead/Write

* Remove unnecessary mut

* Use local PollSender in MixnetStream instead of client_input.input_sender

Now that client-core's input_sender is back to mpsc::Sender (reverted
PollSender migration), MixnetStream creates its own PollSender wrapper
for the AsyncWrite impl's poll_ready/start_send calls.

* Remove now-unnecessary parameter

* Clippy

* Cleanup more stragglers from previous setup (Async traits on
MixnetClient)

* Rename files (remove module inception)

* - Shrink StreamId from 16 bytes to u64, add version byte to wire format
  - Introduce MixStreamHeader/MixStreamFrame structs for decode
  - Replace StreamMap type alias with struct using tokio::sync::Mutex
  - Add StreamMap helper methods, eliminate lock().expect() panics
  - Rename stream.rs -> mixnet_stream.rs to avoid module inception
  - Document irrevocable stream mode, add LP integration TODO

* - Remove dummy channel
- Add err variant for reciever alredy taken
- Remove panics

* add timeout to stream

* clippy
This commit is contained in:
mfahampshire
2026-03-13 09:40:45 +00:00
committed by GitHub
parent fdd2c8fac2
commit b231eb4f04
27 changed files with 1327 additions and 11 deletions
+1
View File
@@ -1,3 +1,4 @@
#![allow(deprecated)] // silences clippy warning: use of deprecated associated function `nym_crypto::generic_array::GenericArray::<T, N>::clone_from_slice`: please upgrade to generic-array 1.x - TODO
use std::future::Future;
#[cfg(all(
@@ -1,6 +1,7 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)] // silences clippy warning: use of deprecated associated function `nym_crypto::generic_array::GenericArray::<T, N>::from_exact_iter`: please upgrade to generic-array 1.x - TODO
pub use backend::*;
pub use combined::CombinedReplyStorage;
pub use key_storage::SentReplyKeys;
@@ -1,6 +1,8 @@
// Copyright 2022-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(clippy::derivable_impls)]
// MAX: surpressing warning for the moment, will be dealt with in a different PR (TODO)
use cosmwasm_schema::cw_serde;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
+2
View File
@@ -1,6 +1,8 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)] // silences clippy warning: deprecated associated function `generic_array::GenericArray::<T, N>::from_exact_iter`: please upgrade to generic-array 1.x - TODO
#[cfg(feature = "asymmetric")]
pub mod asymmetric;
pub mod bech32_address_validation;
+1
View File
@@ -1,5 +1,6 @@
// Copyright 2020-2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)] // silences clippy warning: deprecated associated function `nym_crypto::generic_array::GenericArray::<T, N>::clone_from_slice`: please upgrade to generic-array 1.x - TODO
pub use nym_crypto::generic_array;
use nym_crypto::OutputSizeUser;
+3
View File
@@ -1,6 +1,9 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)]
// silences clippy warning: use of deprecated tuple variant `HttpClientError::GenericRequestFailure`: use another more strongly typed variant - this variant is only left for compatibility reasons - TODO
//! Nym HTTP API Client
//!
//! Centralizes and implements the core API client functionality. This crate provides custom,
@@ -1,5 +1,6 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)] // silences clippy warning: deprecated associated function `nym_crypto::generic_array::GenericArray::<T, N>::clone_from_slice`: please upgrade to generic-array 1.x - TODO
pub mod identifier;
pub mod key;
@@ -1,6 +1,7 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)] // silences clippy warning: deprecated struct `nym_crypto::generic_array::GenericArray`: please upgrade to generic-array 1.x - TODO
pub mod encryption_key;
pub mod reply_surb;
pub mod requests;
+1
View File
@@ -1,6 +1,7 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)]
use aes_gcm::aead::{Aead, Nonce};
use aes_gcm::{AeadCore, AeadInPlace, KeyInit};
use rand::{thread_rng, CryptoRng, Fill, RngCore};
+3
View File
@@ -104,6 +104,9 @@ importers:
next:
specifier: 15.5.10
version: 15.5.10(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)
next-sitemap:
specifier: 4.2.3
version: 4.2.3(next@15.5.10(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1))
nextra:
specifier: '2'
version: 2.13.4(next@15.5.10(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1))(react-dom@18.3.1(react@18.3.1))(react@18.3.1)
+3
View File
@@ -1,3 +1,6 @@
// MAX: temp ignore deprecated, can be dealt with in its own PR
#![allow(deprecated)] // silences clippy warning: deprecated associated function `chacha20::cipher::generic_array::GenericArray::<T, N>::from_slice`: please upgrade to generic-array 1.x - TODO
pub mod constants;
pub mod error;
pub mod format;
+1 -1
View File
@@ -83,7 +83,7 @@ anyhow = { workspace = true }
dotenvy = { workspace = true }
reqwest = { workspace = true, features = ["json", "socks"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio = { workspace = true, features = ["full", "test-util"] }
time = { workspace = true }
nym-bin-common = {workspace = true, features = ["basic_tracing"] }
+2
View File
@@ -1,5 +1,7 @@
# Nym Rust SDK
<!--TODO MAX stream abstraction + notice on tcpproxy -->
This repo contains several components:
- `mixnet`: exposes Nym Client builders and methods. This is useful if you want to interact directly with the Client, or build transport abstractions.
- `tcp_proxy`: exposes functionality to set up client/server instances that expose a localhost TcpSocket to read/write to like a 'normal' socket connection. `tcp_proxy/bin/` contains standalone `nym-proxy-client` and `nym-proxy-server` binaries.
@@ -0,0 +1,78 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! Demonstrates stream idle timeout cleanup.
//!
//! Opens a stream to self, uses it, then stops. After the idle timeout
//! elapses the router removes the stream and reads return EOF.
//!
//! Run with: cargo run --example stream_idle_timeout
use nym_sdk::mixnet;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
/// Short idle timeout so we don't wait the default
const IDLE_TIMEOUT: Duration = Duration::from_secs(2);
const WAIT_TIMEOUT: Duration = Duration::from_secs(60);
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
// Build a client with a short stream idle timeout.
let mut client = mixnet::MixnetClientBuilder::new_ephemeral()
.with_stream_idle_timeout(IDLE_TIMEOUT)
.build()
.unwrap()
.connect_to_mixnet()
.await
.unwrap();
let our_address = *client.nym_address();
println!("Client address: {our_address}");
// Set up a listener and open a stream to ourselves.
let mut listener = client.listener().unwrap();
let mut outbound = client.open_stream(our_address, None).await.unwrap();
println!("Opened outbound stream: {}", outbound.id());
let mut inbound = tokio::time::timeout(WAIT_TIMEOUT, listener.accept())
.await
.expect("timed out waiting for accept")
.expect("listener shut down");
println!("Accepted inbound stream: {}", inbound.id());
// Use the stream.
let msg = b"hello from idle timeout example";
outbound.write_all(msg).await.unwrap();
outbound.flush().await.unwrap();
let mut buf = vec![0u8; 1024];
let n = tokio::time::timeout(WAIT_TIMEOUT, inbound.read(&mut buf))
.await
.expect("timed out reading")
.expect("read failed");
println!("Received: {:?}", String::from_utf8_lossy(&buf[..n]));
assert_eq!(&buf[..n], msg);
// Now stop using the stream and wait for the idle timeout.
println!(
"\nStream is idle. Waiting {}s for cleanup...",
IDLE_TIMEOUT.as_secs()
);
tokio::time::sleep(IDLE_TIMEOUT + Duration::from_secs(2)).await;
// The router should have cleaned up the stream. The inbound receiver
// is closed, so a read returns 0 bytes (EOF).
let n = inbound.read(&mut buf).await.expect("read failed");
if n == 0 {
println!("Inbound stream returned EOF — cleaned up by idle timeout.");
} else {
println!("Unexpected: got {n} bytes after idle timeout");
}
drop(outbound);
drop(inbound);
client.disconnect().await;
}
@@ -0,0 +1,69 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_sdk::mixnet;
use nym_sdk::mixnet::MixnetMessageSender;
use nym_sdk::Error;
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
let our_address = *client.nym_address();
println!("Our client nym address is: {our_address}");
// Message-based API works before stream mode is activated
println!("\nTesting message-based API (should work)");
client
.send_plain_message(our_address, "hello via message API")
.await
.unwrap();
println!("Message sent successfully via message-based API");
// Now activate stream mode by creating a listener
println!("\nActivating stream mode via listener()");
let _listener = client.listener().unwrap();
println!("Stream mode is now active");
// Message-based API should now fail
println!("\nTesting message-based API again (should fail)");
let result = client
.send_plain_message(our_address, "this should fail")
.await;
match result {
Err(Error::StreamModeActive) => {
println!("Got expected error: StreamModeActive");
println!("Message-based API correctly blocked after stream mode activation");
}
Err(e) => {
println!("Got unexpected error: {e:?}");
}
Ok(()) => {
println!("ERROR: send() should have failed but succeeded!");
}
}
// split_sender shares the stream_mode flag
println!("\nTesting split_sender (shares stream_mode)");
let sender = client.split_sender();
let result = sender
.send_plain_message(our_address, "this should also fail")
.await;
match result {
Err(Error::StreamModeActive) => {
println!("Got expected error: StreamModeActive on split sender");
println!("Split sender correctly shares stream_mode with parent client");
}
Err(e) => {
println!("Got unexpected error: {e:?}");
}
Ok(()) => {
println!("ERROR: split_sender.send() should have failed but succeeded!");
}
}
client.disconnect().await;
}
@@ -0,0 +1,151 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! Demonstrates concurrent streams over the Mixnet.
//!
//! One sender opens streams to two receivers.
//! Both receivers accept, read, and reply concurrently.
//!
//! Run with: cargo run --example async_read_write
use nym_sdk::mixnet;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
const TIMEOUT: Duration = Duration::from_secs(60);
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
let mut sender = mixnet::MixnetClient::connect_new().await.unwrap();
println!("Sender address: {}", sender.nym_address());
let mut receiver_a = mixnet::MixnetClient::connect_new().await.unwrap();
let addr_a = *receiver_a.nym_address();
println!("Receiver A address: {addr_a}");
let mut receiver_b = mixnet::MixnetClient::connect_new().await.unwrap();
let addr_b = *receiver_b.nym_address();
println!("Receiver B address: {addr_b}");
let mut listener_a = receiver_a.listener().unwrap();
let mut listener_b = receiver_b.listener().unwrap();
println!("\nOpening streams to both receivers...");
let mut stream_to_a = sender.open_stream(addr_a, None).await.unwrap();
println!("Stream to A opened: {}", stream_to_a.id());
let mut stream_to_b = sender.open_stream(addr_b, None).await.unwrap();
println!("Stream to B opened: {}", stream_to_b.id());
println!("\nWaiting for both receivers to accept...");
let (inbound_a, inbound_b) = tokio::try_join!(
async {
tokio::time::timeout(TIMEOUT, listener_a.accept())
.await
.expect("timed out waiting for A to accept")
.ok_or("listener A shut down")
},
async {
tokio::time::timeout(TIMEOUT, listener_b.accept())
.await
.expect("timed out waiting for B to accept")
.ok_or("listener B shut down")
},
)
.unwrap();
println!("A accepted stream: {}", inbound_a.id());
println!("B accepted stream: {}", inbound_b.id());
let msg_a = b"hello receiver A";
let msg_b = b"hello receiver B";
println!("\nSender writing to both streams...");
stream_to_a.write_all(msg_a).await.unwrap();
stream_to_a.flush().await.unwrap();
stream_to_b.write_all(msg_b).await.unwrap();
stream_to_b.flush().await.unwrap();
println!("\nBoth receivers reading and replying concurrently...");
let reply_a = b"reply from A";
let reply_b = b"reply from B";
let (res_a, res_b) = tokio::join!(
// Receiver A: read then reply
async {
let mut inbound = inbound_a;
let mut buf = vec![0u8; 1024];
let n = tokio::time::timeout(TIMEOUT, inbound.read(&mut buf))
.await
.expect("A: timed out reading")
.expect("A: read failed");
println!("Receiver A got: {:?}", String::from_utf8_lossy(&buf[..n]));
assert_eq!(&buf[..n], msg_a);
inbound.write_all(reply_a).await.unwrap();
inbound.flush().await.unwrap();
println!("Receiver A replied");
inbound
},
// Receiver B: read then reply
async {
let mut inbound = inbound_b;
let mut buf = vec![0u8; 1024];
let n = tokio::time::timeout(TIMEOUT, inbound.read(&mut buf))
.await
.expect("B: timed out reading")
.expect("B: read failed");
println!("Receiver B got: {:?}", String::from_utf8_lossy(&buf[..n]));
assert_eq!(&buf[..n], msg_b);
inbound.write_all(reply_b).await.unwrap();
inbound.flush().await.unwrap();
println!("Receiver B replied");
inbound
},
);
let inbound_a = res_a;
let inbound_b = res_b;
println!("\nSender reading replies...");
tokio::join!(
async {
let mut buf = vec![0u8; 1024];
let n = tokio::time::timeout(TIMEOUT, stream_to_a.read(&mut buf))
.await
.expect("timed out reading reply from A")
.expect("read failed");
println!(
"Sender got from A: {:?}",
String::from_utf8_lossy(&buf[..n])
);
assert_eq!(&buf[..n], reply_a);
},
async {
let mut buf = vec![0u8; 1024];
let n = tokio::time::timeout(TIMEOUT, stream_to_b.read(&mut buf))
.await
.expect("timed out reading reply from B")
.expect("read failed");
println!(
"Sender got from B: {:?}",
String::from_utf8_lossy(&buf[..n])
);
assert_eq!(&buf[..n], reply_b);
},
);
println!("\nConcurrent round-trips successful!");
// Streams clean up on drop (unregister from router).
// No close message is sent over the wire — see stream.rs.
drop(stream_to_a);
drop(stream_to_b);
drop(inbound_a);
drop(inbound_b);
sender.disconnect().await;
receiver_a.disconnect().await;
receiver_b.disconnect().await;
}
@@ -0,0 +1,81 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! Sends a 1 MB random file over a MixnetStream and verifies the
//! receiver got an identical copy. Cancel with Ctrl+C.
//!
//! Run with: cargo run --example stream_throughput
use nym_sdk::mixnet;
use rand::RngCore;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
const SIZE: usize = 1024 * 1024; // 1 MB
const TIMEOUT: Duration = Duration::from_secs(300);
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
// Generate random payload
let mut payload = vec![0u8; SIZE];
rand::rngs::OsRng.fill_bytes(&mut payload);
println!("Generated {} bytes of random data", payload.len());
// Connect two clients
println!("Connecting sender...");
let mut sender = mixnet::MixnetClient::connect_new().await.unwrap();
println!("{}", sender.nym_address());
println!("Connecting receiver...");
let mut receiver = mixnet::MixnetClient::connect_new().await.unwrap();
let recv_addr = *receiver.nym_address();
println!("{recv_addr}");
// Open stream
let mut listener = receiver.listener().unwrap();
let mut tx = sender.open_stream(recv_addr, None).await.unwrap();
let mut rx = tokio::time::timeout(TIMEOUT, listener.accept())
.await
.expect("accept timed out")
.expect("listener closed");
println!("Stream established\n");
// Send
let data = payload.clone();
let send_task = tokio::spawn(async move {
tx.write_all(&data).await.unwrap();
tx.flush().await.unwrap();
println!("Sent {} bytes", data.len());
});
// Receive — read exactly SIZE bytes (don't rely on close/EOF - if we need this in future
// iterations we can introduce something like what the TcpProxy module has)
let recv_task = tokio::spawn(async move {
let mut buf = vec![0u8; SIZE];
tokio::time::timeout(TIMEOUT, rx.read_exact(&mut buf))
.await
.expect("receive timed out")
.unwrap();
println!("Received {} bytes", buf.len());
buf
});
let (_, received) = tokio::join!(send_task, recv_task);
let received = received.unwrap();
if received == payload {
println!("\nIntegrity OK");
} else {
eprintln!(
"\nMISMATCH — sent {} bytes, got {}",
payload.len(),
received.len()
);
std::process::exit(1);
}
sender.disconnect().await;
receiver.disconnect().await;
}
+1
View File
@@ -29,6 +29,7 @@ where
St: Storage + Clone,
<St as Storage>::StorageError: Send + Sync + 'static,
{
#[allow(clippy::result_large_err)]
pub(crate) fn new(
network_details: NymNetworkDetails,
mnemonic: String,
+9
View File
@@ -99,6 +99,15 @@ pub enum Error {
#[error("Failed to get shutdown tracker from the task runtime registry: {0}")]
RegistryAccess(#[from] nym_task::RegistryAccessError),
#[error("Cannot use message-based functions after stream mode is activated")]
StreamModeActive,
#[error("Stream listener has already been taken — listener() can only be called once")]
ListenerAlreadyTaken,
#[error("Stream subsystem failed to initialise: reconstructed_receiver unavailable")]
StreamInitFailure,
}
impl Error {
+2
View File
@@ -37,6 +37,7 @@ mod native_client;
mod paths;
mod sink;
mod socks5_client;
pub mod stream;
mod traits;
pub use client::{DisconnectedMixnetClient, IncludedSurbs, MixnetClientBuilder};
@@ -88,4 +89,5 @@ pub use nym_topology::{provider_trait::TopologyProvider, NymTopology};
pub use paths::StoragePaths;
pub use sink::{MixnetMessageSink, MixnetMessageSinkTranslator};
pub use socks5_client::Socks5MixnetClient;
pub use stream::{MixnetListener, MixnetStream, StreamId};
pub use traits::MixnetMessageSender;
+28 -4
View File
@@ -42,8 +42,8 @@ use std::sync::Arc;
use url::Url;
use zeroize::Zeroizing;
// The number of surbs to include in a message by default
const DEFAULT_NUMBER_OF_SURBS: u32 = 10;
/// The number of reply SURBs to include in a message by default.
pub(crate) const DEFAULT_NUMBER_OF_SURBS: u32 = 10;
#[derive(Default)]
pub struct MixnetClientBuilder<S: MixnetClientStorage = Ephemeral> {
@@ -70,6 +70,7 @@ pub struct MixnetClientBuilder<S: MixnetClientStorage = Ephemeral> {
forget_me: ForgetMe,
remember_me: RememberMe,
derivation_material: Option<DerivationMaterial>,
stream_idle_timeout: Option<std::time::Duration>,
}
impl MixnetClientBuilder<Ephemeral> {
@@ -112,6 +113,7 @@ impl MixnetClientBuilder<OnDiskPersistent> {
forget_me: Default::default(),
remember_me: Default::default(),
derivation_material: None,
stream_idle_timeout: None,
})
}
}
@@ -149,6 +151,7 @@ where
forget_me: Default::default(),
remember_me: Default::default(),
derivation_material: None,
stream_idle_timeout: None,
}
}
@@ -175,6 +178,7 @@ where
forget_me: self.forget_me,
remember_me: self.remember_me,
derivation_material: self.derivation_material,
stream_idle_timeout: self.stream_idle_timeout,
}
}
@@ -205,6 +209,15 @@ where
self
}
/// Set the idle timeout for streams. Streams with no activity for this
/// duration are automatically cleaned up by the router.
/// Defaults to 30 minutes if not set.
#[must_use]
pub fn with_stream_idle_timeout(mut self, timeout: std::time::Duration) -> Self {
self.stream_idle_timeout = Some(timeout);
self
}
/// Request a specific gateway instead of a random one.
#[must_use]
pub fn request_gateway(mut self, user_chosen_gateway: String) -> Self {
@@ -352,6 +365,7 @@ where
}
/// Construct a [`DisconnectedMixnetClient`] from the setup specified.
#[allow(clippy::result_large_err)]
pub fn build(self) -> Result<DisconnectedMixnetClient<S>> {
let mut client = DisconnectedMixnetClient::new(
self.config,
@@ -375,6 +389,7 @@ where
client.forget_me = self.forget_me;
client.remember_me = self.remember_me;
client.derivation_material = self.derivation_material;
client.stream_idle_timeout = self.stream_idle_timeout;
Ok(client)
}
}
@@ -443,6 +458,8 @@ where
remember_me: RememberMe,
/// The derivation material to use for the client keys, its up to the caller to save this for rederivation later
derivation_material: Option<DerivationMaterial>,
stream_idle_timeout: Option<std::time::Duration>,
}
impl<S> DisconnectedMixnetClient<S>
@@ -462,6 +479,7 @@ where
/// Callers have the option of supplying further parameters to:
/// - store persistent identities at a location on-disk, if desired;
/// - use SOCKS5 mode
#[allow(clippy::result_large_err)]
fn new(
config: Config,
socks5_config: Option<Socks5>,
@@ -504,6 +522,7 @@ where
forget_me,
remember_me,
derivation_material: None,
stream_idle_timeout: None,
})
}
@@ -906,6 +925,7 @@ where
if self.socks5_config.is_some() {
return Err(Error::Socks5Config { set: true });
}
let stream_idle_timeout = self.stream_idle_timeout;
let (mut started_client, nym_address) = self.connect_to_mixnet_common().await?;
let client_input = started_client.client_input.register_producer();
let mut client_output = started_client.client_output.register_consumer();
@@ -916,7 +936,7 @@ where
let identity_keys = started_client.identity_keys.clone();
let reconstructed_receiver = client_output.register_receiver()?;
Ok(MixnetClient::new(
let mut client = MixnetClient::new(
nym_address,
identity_keys,
client_input,
@@ -928,7 +948,11 @@ where
None,
started_client.forget_me,
started_client.remember_me,
))
);
if let Some(timeout) = stream_idle_timeout {
client.stream_idle_timeout = timeout;
}
Ok(client)
}
}
+81 -6
View File
@@ -1,4 +1,6 @@
use crate::mixnet::client::MixnetClientBuilder;
use crate::mixnet::client::DEFAULT_NUMBER_OF_SURBS;
use crate::mixnet::stream::{MixnetListener, MixnetStream};
use crate::mixnet::traits::MixnetMessageSender;
use crate::{Error, Result};
use async_trait::async_trait;
@@ -21,8 +23,10 @@ use nym_task::connections::{ConnectionCommandSender, LaneQueueLengths};
use nym_task::ShutdownTracker;
use nym_topology::{NymRouteProvider, NymTopology};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::sync::RwLockReadGuard;
use tokio_util::sync::CancellationToken;
@@ -47,7 +51,8 @@ pub struct MixnetClient {
pub(crate) client_state: ClientState,
/// A channel for messages arriving from the mixnet after they have been reconstructed.
pub(crate) reconstructed_receiver: ReconstructedMessagesReceiver,
/// Taken by the stream router on stream mode activation, `None` thereafter.
pub(crate) reconstructed_receiver: Option<ReconstructedMessagesReceiver>,
/// A channel for sending stats event to be reported.
pub(crate) stats_events_reporter: ClientStatsSender,
@@ -56,10 +61,21 @@ pub struct MixnetClient {
pub(crate) shutdown_handle: ShutdownTracker,
pub(crate) packet_type: Option<PacketType>,
// internal state used for the `Stream` implementation
/// Internal state used for the `Stream` implementation
_buffered: Vec<ReconstructedMessage>,
pub(crate) forget_me: ForgetMe,
pub(crate) remember_me: RememberMe,
/// Set to `true` when the stream router is active, preventing
/// message-based functions from being used concurrently.
pub(crate) stream_mode: Arc<AtomicBool>,
/// Opaque stream multiplexing state (lazily initialized by stream module).
pub(crate) streams: Option<super::stream::StreamState>,
/// How long a stream can be idle before the router cleans it up.
pub(crate) stream_idle_timeout: Duration,
}
impl MixnetClient {
@@ -83,13 +99,16 @@ impl MixnetClient {
client_input,
client_output,
client_state,
reconstructed_receiver,
reconstructed_receiver: Some(reconstructed_receiver),
stats_events_reporter,
shutdown_handle: task_handle,
packet_type,
_buffered: Vec::new(),
forget_me,
remember_me,
stream_mode: Arc::new(AtomicBool::new(false)),
streams: None,
stream_idle_timeout: super::stream::DEFAULT_STREAM_IDLE_TIMEOUT,
}
}
@@ -156,6 +175,7 @@ impl MixnetClient {
MixnetClientSender {
client_input: self.client_input.clone(),
packet_type: self.packet_type,
stream_mode: self.stream_mode.clone(),
}
}
@@ -198,7 +218,11 @@ impl MixnetClient {
/// Wait for messages from the mixnet
pub async fn wait_for_messages(&mut self) -> Option<Vec<ReconstructedMessage>> {
self.reconstructed_receiver.next().await
if self.stream_mode.load(Ordering::SeqCst) {
tracing::warn!("wait_for_messages() called after stream mode activated");
return None;
}
self.reconstructed_receiver.as_mut()?.next().await
}
/// Provide a callback to execute on incoming messages from the mixnet.
@@ -280,23 +304,66 @@ impl MixnetClient {
}
}
}
/// Open a stream to a remote peer.
///
/// Returns a [`MixnetStream`] implementing `AsyncRead + AsyncWrite`.
/// `reply_surbs` controls how many reply SURBs are included with each
/// outbound message so the peer can reply. Defaults to 10 if `None`.
pub async fn open_stream(
&mut self,
recipient: Recipient,
reply_surbs: Option<u32>,
) -> Result<MixnetStream> {
super::stream::open_stream(
self,
recipient,
reply_surbs.unwrap_or(DEFAULT_NUMBER_OF_SURBS),
)
.await
}
/// Create a listener that accepts inbound streams from remote peers.
///
/// Can only be called once.
pub fn listener(&mut self) -> Result<MixnetListener> {
super::stream::listener(self)
}
}
#[derive(Clone)]
pub struct MixnetClientSender {
client_input: ClientInput,
packet_type: Option<PacketType>,
stream_mode: Arc<AtomicBool>,
}
impl Clone for MixnetClientSender {
fn clone(&self) -> Self {
Self {
client_input: self.client_input.clone(),
packet_type: self.packet_type,
stream_mode: self.stream_mode.clone(),
}
}
}
impl Stream for MixnetClient {
type Item = ReconstructedMessage;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.stream_mode.load(Ordering::SeqCst) {
tracing::warn!("Stream::poll_next() called after stream mode activated");
return Poll::Ready(None);
}
if let Some(next) = self._buffered.pop() {
cx.waker().wake_by_ref();
return Poll::Ready(Some(next));
}
match ready!(Pin::new(&mut self.reconstructed_receiver).poll_next(cx)) {
let receiver = match self.reconstructed_receiver.as_mut() {
Some(rx) => rx,
None => return Poll::Ready(None),
};
match ready!(Pin::new(receiver).poll_next(cx)) {
None => Poll::Ready(None),
Some(mut msgs) => {
// the vector itself should never be empty
@@ -327,6 +394,10 @@ impl MixnetMessageSender for MixnetClient {
}
async fn send(&self, message: InputMessage) -> Result<()> {
if self.stream_mode.load(Ordering::SeqCst) {
tracing::warn!("send() called after stream mode activated");
return Err(Error::StreamModeActive);
}
self.client_input
.send(message)
.await
@@ -341,6 +412,10 @@ impl MixnetMessageSender for MixnetClientSender {
}
async fn send(&self, message: InputMessage) -> Result<()> {
if self.stream_mode.load(Ordering::SeqCst) {
tracing::warn!("send() called after stream mode activated");
return Err(Error::StreamModeActive);
}
self.client_input
.send(message)
.await
+1
View File
@@ -53,6 +53,7 @@ impl StoragePaths {
///
/// This function will return an error if it is passed a path to an existing file instead of a
/// directory.
#[allow(clippy::result_large_err)]
pub fn new_from_dir<P: AsRef<Path>>(dir: P) -> Result<Self> {
let dir = dir.as_ref();
if dir.is_file() {
+1
View File
@@ -26,6 +26,7 @@ const SINK_BUFFER_SIZE_IN_MESSAGES: usize = 8;
/// Traits that represents the ability to convert bytes into InputMessages that can be sent to the
/// mixnet. This is typically used to set the destination and other sending parameters.
pub trait MixnetMessageSinkTranslator: Unpin {
#[allow(clippy::result_large_err)]
fn to_input_message(&self, bytes: &[u8]) -> Result<InputMessage, Error>;
}
@@ -0,0 +1,201 @@
//! Per-stream handle implementing `AsyncRead + AsyncWrite`.
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::BytesMut;
use futures::{ready, SinkExt};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::sync::mpsc;
use nym_client_core::client::base_client::ClientInput;
use nym_client_core::client::inbound_messages::InputMessage;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::params::PacketType;
use nym_task::connections::TransmissionLane;
use tokio_util::sync::PollSender;
use super::protocol::{encode_stream_message, StreamId, StreamMessageType};
use super::StreamMap;
/// How to address outbound messages on this stream.
enum Destination {
/// We know the peer's Nym address.
Address {
recipient: Box<Recipient>,
reply_surbs: u32,
},
/// We reply via the opener's anonymous sender tag.
Anonymous { sender_tag: AnonymousSenderTag },
}
/// A byte stream to a single remote Nym client.
///
/// Provides `AsyncRead + AsyncWrite`. Created via
/// [`MixnetClient::open_stream`] (outbound) or
/// [`MixnetListener::accept`] (inbound).
pub struct MixnetStream {
id: StreamId,
destination: Destination,
sender: PollSender<InputMessage>,
packet_type: Option<PacketType>,
streams: StreamMap,
inbound_rx: mpsc::UnboundedReceiver<Vec<u8>>,
read_buf: BytesMut,
deregistered: bool,
}
impl MixnetStream {
/// Create a stream we initiated to a known recipient.
pub(crate) fn new_outbound(
id: StreamId,
recipient: Recipient,
reply_surbs: u32,
client_input: ClientInput,
packet_type: Option<PacketType>,
streams: StreamMap,
inbound_rx: mpsc::UnboundedReceiver<Vec<u8>>,
) -> Self {
let sender = PollSender::new(client_input.input_sender.clone());
Self {
id,
destination: Destination::Address {
recipient: Box::new(recipient),
reply_surbs,
},
sender,
packet_type,
streams,
inbound_rx,
read_buf: BytesMut::new(),
deregistered: false,
}
}
/// Create a stream accepted from a remote peer's Open message.
pub(crate) fn new_inbound(
id: StreamId,
sender_tag: AnonymousSenderTag,
client_input: ClientInput,
packet_type: Option<PacketType>,
streams: StreamMap,
inbound_rx: mpsc::UnboundedReceiver<Vec<u8>>,
initial_data: Vec<u8>,
) -> Self {
let mut read_buf = BytesMut::new();
if !initial_data.is_empty() {
read_buf.extend_from_slice(&initial_data);
}
let sender = PollSender::new(client_input.input_sender.clone());
Self {
id,
destination: Destination::Anonymous { sender_tag },
sender,
packet_type,
streams,
inbound_rx,
read_buf,
deregistered: false,
}
}
/// Return the unique identifier for this stream.
pub fn id(&self) -> StreamId {
self.id
}
/// Wrap `data` in the appropriate `InputMessage` for this stream's destination.
fn make_input_message(&self, data: Vec<u8>) -> InputMessage {
match &self.destination {
Destination::Address {
recipient,
reply_surbs,
} => InputMessage::new_anonymous(
**recipient,
data,
*reply_surbs,
TransmissionLane::General,
self.packet_type,
),
Destination::Anonymous { sender_tag } => InputMessage::new_reply(
*sender_tag,
data,
TransmissionLane::General,
self.packet_type,
),
}
}
}
impl Drop for MixnetStream {
fn drop(&mut self) {
if !self.deregistered {
self.streams.remove_background(self.id);
}
}
}
impl AsyncRead for MixnetStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf,
) -> Poll<std::io::Result<()>> {
// Drain spillover first
if !self.read_buf.is_empty() {
let n = std::cmp::min(buf.remaining(), self.read_buf.len());
buf.put_slice(&self.read_buf.split_to(n));
return Poll::Ready(Ok(()));
}
match ready!(self.inbound_rx.poll_recv(cx)) {
Some(data) => {
let n = std::cmp::min(buf.remaining(), data.len());
buf.put_slice(&data[..n]);
if n < data.len() {
self.read_buf.extend_from_slice(&data[n..]);
}
Poll::Ready(Ok(()))
}
None => Poll::Ready(Ok(())), // EOF
}
}
}
impl AsyncWrite for MixnetStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
if buf.is_empty() {
return Poll::Ready(Ok(0));
}
ready!(self.sender.poll_ready_unpin(cx))
.map_err(|_| std::io::Error::other("mixnet input channel closed"))?;
let wire = encode_stream_message(&self.id, StreamMessageType::Data, buf);
let msg = self.make_input_message(wire);
self.sender
.start_send_unpin(msg)
.map_err(|_| std::io::Error::other("failed to send stream message"))?;
Poll::Ready(Ok(buf.len()))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
if !self.deregistered {
self.streams.remove_background(self.id);
self.deregistered = true;
}
Poll::Ready(Ok(()))
}
}
+404
View File
@@ -0,0 +1,404 @@
//! Stream multiplexing for `MixnetClient`.
//!
//! A [`MixnetStream`] is a byte channel (`AsyncRead + AsyncWrite`) to a
//! remote peer, identified by a [`StreamId`]. A single `MixnetClient`
//! can hold many streams to different peers concurrently.
//!
//! A background router task reads the client's `reconstructed_receiver`,
//! parses the stream header, and dispatches each payload to the right
//! stream's channel (or to the listener for `Open` messages).
mod mixnet_stream;
mod protocol;
pub use mixnet_stream::MixnetStream;
pub use protocol::StreamId;
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{trace, warn};
use nym_client_core::client::inbound_messages::InputMessage;
use nym_client_core::client::received_buffer::ReconstructedMessagesReceiver;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_task::connections::TransmissionLane;
use protocol::{decode_stream_message, encode_stream_message, StreamMessageType};
use crate::mixnet::native_client::MixnetClient;
use crate::{Error, Result};
/// Default idle timeout before a stream is considered stale and cleaned up.
pub(crate) const DEFAULT_STREAM_IDLE_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// Maximum interval between stale-stream checks. The actual check interval
/// is `min(idle_timeout, MAX_CLEANUP_INTERVAL)` so that short idle timeouts
/// are respected promptly rather than waiting up to 60 s for the next sweep.
const MAX_CLEANUP_INTERVAL: Duration = Duration::from_secs(10);
/// Per-stream state stored in the routing table.
struct StreamEntry {
sender: mpsc::UnboundedSender<Vec<u8>>,
last_activity: Instant,
}
/// The shared stream routing table.
///
/// Wraps the map of active streams behind an async mutex with focused
/// methods so callers never touch the lock directly.
#[derive(Clone)]
pub(crate) struct StreamMap {
inner: Arc<tokio::sync::Mutex<HashMap<StreamId, StreamEntry>>>,
}
impl StreamMap {
fn new() -> Self {
Self {
inner: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
}
}
/// Register a new stream, returning the receiver end of its data channel.
async fn register_stream(&self, stream_id: StreamId) -> mpsc::UnboundedReceiver<Vec<u8>> {
let (tx, rx) = mpsc::unbounded_channel();
self.inner.lock().await.insert(
stream_id,
StreamEntry {
sender: tx,
last_activity: Instant::now(),
},
);
rx
}
/// Remove a stream from the map.
async fn remove(&self, stream_id: &StreamId) {
self.inner.lock().await.remove(stream_id);
}
/// Remove a stream without awaiting — for use in `Drop` and `poll_shutdown`
/// where we cannot `.await`. Spawns a lightweight background task.
fn remove_background(&self, stream_id: StreamId) {
let inner = self.inner.clone();
tokio::spawn(async move {
inner.lock().await.remove(&stream_id);
});
}
/// Dispatch data to a stream's channel. Updates `last_activity` on
/// success. Removes the entry if the receiver has been dropped.
async fn send_to_stream(&self, stream_id: &StreamId, data: Vec<u8>) {
let mut map = self.inner.lock().await;
let should_remove = if let Some(entry) = map.get_mut(stream_id) {
if entry.sender.send(data).is_err() {
true
} else {
entry.last_activity = Instant::now();
false
}
} else {
false
};
if should_remove {
map.remove(stream_id);
}
}
/// Remove streams that have been idle longer than `max_idle`.
async fn cleanup_stale(&self, max_idle: Duration) {
let now = Instant::now();
let mut map = self.inner.lock().await;
map.retain(|id, entry| {
let stale = now.duration_since(entry.last_activity) >= max_idle;
if stale {
trace!("Cleaning up stale stream {id} (idle > {max_idle:?})");
}
!stale
});
}
}
/// Delivered to the listener when a remote peer opens a new stream.
struct InboundOpen {
stream_id: StreamId,
sender_tag: Option<AnonymousSenderTag>,
initial_data: Vec<u8>,
}
/// Owns the router task and the shared state for all streams on a client.
/// The router is a background task that reads reconstructed messages from the
/// mixnet, decodes the stream header, and dispatches each payload to the
/// correct stream's channel (or to the listener for new `Open` messages).
pub(crate) struct StreamState {
streams: StreamMap,
listener_rx: Option<mpsc::UnboundedReceiver<InboundOpen>>,
shutdown: CancellationToken,
_router_handle: tokio::task::JoinHandle<()>,
}
impl Drop for StreamState {
fn drop(&mut self) {
self.shutdown.cancel();
}
}
/// Accepts inbound streams opened by remote peers.
///
/// Created via [`MixnetClient::listener`]. Each `accept()` returns a
/// `MixnetStream` ready for reading and writing.
pub struct MixnetListener {
inbound_rx: mpsc::UnboundedReceiver<InboundOpen>,
client_input: nym_client_core::client::base_client::ClientInput,
packet_type: Option<nym_sphinx::params::PacketType>,
streams: StreamMap,
}
impl MixnetListener {
/// Wait for a remote peer to open a stream.
///
/// Returns `None` if the router has shut down.
pub async fn accept(&mut self) -> Option<MixnetStream> {
loop {
let req = self.inbound_rx.recv().await?;
let sender_tag = match req.sender_tag {
Some(tag) => tag,
None => {
warn!(
"Listener: Open for {} has no sender_tag, skipping",
req.stream_id
);
continue;
}
};
let rx = self.streams.register_stream(req.stream_id).await;
return Some(MixnetStream::new_inbound(
req.stream_id,
sender_tag,
self.client_input.clone(),
self.packet_type,
self.streams.clone(),
rx,
req.initial_data,
));
}
}
}
/// Background loop that demuxes incoming mixnet messages into per-stream channels.
async fn run_router(
mut reconstructed_rx: ReconstructedMessagesReceiver,
streams: StreamMap,
listener_tx: mpsc::UnboundedSender<InboundOpen>,
shutdown: CancellationToken,
idle_timeout: Duration,
) {
let check_every = std::cmp::min(idle_timeout, MAX_CLEANUP_INTERVAL);
let mut cleanup_interval = tokio::time::interval(check_every);
cleanup_interval.tick().await; // consume the immediate first tick
loop {
let messages = tokio::select! {
_ = shutdown.cancelled() => break,
_ = cleanup_interval.tick() => {
streams.cleanup_stale(idle_timeout).await;
continue;
}
msg = reconstructed_rx.next() => match msg {
Some(messages) => messages,
None => break,
},
};
for msg in messages {
let Some(frame) = decode_stream_message(&msg.message) else {
trace!(
"Router: non-stream message ({} bytes), dropping",
msg.message.len()
);
continue;
};
let stream_id = frame.header.stream_id;
match frame.header.message_type {
StreamMessageType::Open => {
let _ = listener_tx.send(InboundOpen {
stream_id,
sender_tag: msg.sender_tag,
initial_data: frame.data.to_vec(),
});
}
StreamMessageType::Data => {
streams
.send_to_stream(&stream_id, frame.data.to_vec())
.await;
} // TODO: if we decide we need close logic add another enum member
}
}
}
}
/// Lazily initialise the stream subsystem and router on first use.
fn ensure_init(client: &mut MixnetClient) -> Result<&mut StreamState> {
if client.streams.is_none() {
let real_rx = client
.reconstructed_receiver
.take()
.ok_or(Error::StreamInitFailure)?;
// Set after take() succeeds so we don't leave the client in a
// broken state (stream_mode=true but no router) on failure.
client.stream_mode.store(true, Ordering::SeqCst);
let streams = StreamMap::new();
let (listener_tx, listener_rx) = mpsc::unbounded_channel();
let shutdown = CancellationToken::new();
let router_handle = tokio::spawn(run_router(
real_rx,
streams.clone(),
listener_tx,
shutdown.clone(),
client.stream_idle_timeout,
));
client.streams = Some(StreamState {
streams,
listener_rx: Some(listener_rx),
shutdown,
_router_handle: router_handle,
});
}
client.streams.as_mut().ok_or(Error::StreamInitFailure)
}
/// Open a stream to a remote peer.
pub(crate) async fn open_stream(
client: &mut MixnetClient,
recipient: Recipient,
reply_surbs: u32,
) -> Result<MixnetStream> {
let streams = ensure_init(client)?.streams.clone();
let stream_id = StreamId::random();
let rx = streams.register_stream(stream_id).await;
// Send Open to the peer
let wire = encode_stream_message(&stream_id, StreamMessageType::Open, &[]);
let msg = InputMessage::new_anonymous(
recipient,
wire,
reply_surbs,
TransmissionLane::General,
client.packet_type,
);
if (client.client_input.send(msg).await).is_err() {
streams.remove(&stream_id).await;
return Err(Error::MessageSendingFailure);
}
Ok(MixnetStream::new_outbound(
stream_id,
recipient,
reply_surbs,
client.client_input.clone(),
client.packet_type,
streams,
rx,
))
}
/// Create a listener that accepts inbound streams. Can only be called once.
pub(crate) fn listener(client: &mut MixnetClient) -> Result<MixnetListener> {
let state = ensure_init(client)?;
let listener_rx = state
.listener_rx
.take()
.ok_or(Error::ListenerAlreadyTaken)?;
let streams = state.streams.clone();
Ok(MixnetListener {
inbound_rx: listener_rx,
client_input: client.client_input.clone(),
packet_type: client.packet_type,
streams,
})
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test(start_paused = true)]
async fn cleanup_stale_removes_idle_streams() {
let map = StreamMap::new();
let timeout = Duration::from_secs(10);
// Register two streams
let _rx_a = map.register_stream(StreamId::random()).await;
let _rx_b = map.register_stream(StreamId::random()).await;
// Advance time past the timeout
tokio::time::advance(timeout + Duration::from_secs(1)).await;
// Register a fresh stream (should survive cleanup)
let id_c = StreamId::random();
let _rx_c = map.register_stream(id_c).await;
map.cleanup_stale(timeout).await;
let inner = map.inner.lock().await;
assert_eq!(inner.len(), 1);
assert!(inner.contains_key(&id_c));
}
#[tokio::test(start_paused = true)]
async fn send_to_stream_updates_last_activity() {
let map = StreamMap::new();
let timeout = Duration::from_secs(10);
let id = StreamId::random();
let _rx = map.register_stream(id).await;
// Advance most of the way through the timeout
tokio::time::advance(Duration::from_secs(8)).await;
// Activity on the stream resets its timer
map.send_to_stream(&id, vec![1, 2, 3]).await;
// Advance past the original timeout, but only 5s since last activity
tokio::time::advance(Duration::from_secs(5)).await;
map.cleanup_stale(timeout).await;
// Stream should survive — last activity was 5s ago, not 13s
assert_eq!(map.inner.lock().await.len(), 1);
}
#[tokio::test(start_paused = true)]
async fn cleanup_does_not_remove_active_streams() {
let map = StreamMap::new();
let timeout = Duration::from_secs(10);
let id = StreamId::random();
let _rx = map.register_stream(id).await;
// Advance less than the timeout
tokio::time::advance(Duration::from_secs(5)).await;
map.cleanup_stale(timeout).await;
assert_eq!(map.inner.lock().await.len(), 1);
}
}
@@ -0,0 +1,198 @@
//! Wire protocol for stream multiplexing.
//!
//! Every message between streams carries a fixed header prepended to
//! the payload inside the mixnet message body:
//!
//! ```text
//! [Version: 1 byte][StreamId: 8 bytes][MessageType: 1 byte][payload: N bytes]
//! ```
//!
//! This header sits inside the sphinx packet payload.
use std::fmt;
/// Current stream protocol version.
pub const STREAM_PROTOCOL_VERSION: u8 = 1;
/// Length of a StreamId in bytes (u64, big-endian).
pub const STREAM_ID_LEN: usize = 8;
/// Total header length: Version (1) + StreamId (8) + MessageType (1).
pub const STREAM_HEADER_LEN: usize = 1 + STREAM_ID_LEN + 1;
/// Identifies a stream within a MixnetClient.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct StreamId(u64);
impl StreamId {
pub fn random() -> Self {
Self(rand::random::<u64>())
}
pub fn to_bytes(self) -> [u8; STREAM_ID_LEN] {
self.0.to_be_bytes()
}
pub fn from_bytes(bytes: [u8; STREAM_ID_LEN]) -> Self {
Self(u64::from_be_bytes(bytes))
}
}
impl fmt::Debug for StreamId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "StreamId({:#018x})", self.0)
}
}
impl fmt::Display for StreamId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(self, f)
}
}
/// Message types within the stream protocol.
///
/// Note: there is no Close variant. Without message sequencing, a close
/// message races ahead of in-flight data and arrives before the data is
/// reconstructed. Streams clean up locally via Drop. If ordered close/EOF
/// is needed in future, add sequencing + reorder buffering (see the
/// tcp_proxy's `MessageBuffer` for a working example).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum StreamMessageType {
/// Open a new stream. Payload is optional initial data.
Open = 0,
/// Data on an existing stream.
Data = 1,
}
impl StreamMessageType {
pub fn from_byte(b: u8) -> Option<Self> {
match b {
0 => Some(Self::Open),
1 => Some(Self::Data),
_ => None,
}
}
}
/// The fixed-size header prepended to every stream message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MixStreamHeader {
pub version: u8,
pub stream_id: StreamId,
pub message_type: StreamMessageType,
}
/// A decoded stream frame: header + payload reference.
#[derive(Debug)]
pub struct MixStreamFrame<'a> {
pub header: MixStreamHeader,
pub data: &'a [u8],
}
/// Encode a stream message: `[version][stream_id][msg_type][payload]`.
pub fn encode_stream_message(
id: &StreamId,
msg_type: StreamMessageType,
payload: &[u8],
) -> Vec<u8> {
let mut buf = Vec::with_capacity(STREAM_HEADER_LEN + payload.len());
buf.push(STREAM_PROTOCOL_VERSION);
buf.extend_from_slice(&id.to_bytes());
buf.push(msg_type as u8);
buf.extend_from_slice(payload);
buf
}
/// Decode a stream message into a [`MixStreamFrame`].
///
/// Returns `None` if the buffer is too short, the version is unknown,
/// or the message type byte is invalid.
pub fn decode_stream_message(bytes: &[u8]) -> Option<MixStreamFrame<'_>> {
if bytes.len() < STREAM_HEADER_LEN {
return None;
}
let version = bytes[0];
if version != STREAM_PROTOCOL_VERSION {
return None;
}
let mut id_bytes = [0u8; STREAM_ID_LEN];
id_bytes.copy_from_slice(&bytes[1..1 + STREAM_ID_LEN]);
let stream_id = StreamId::from_bytes(id_bytes);
let message_type = StreamMessageType::from_byte(bytes[1 + STREAM_ID_LEN])?;
let data = &bytes[STREAM_HEADER_LEN..];
Some(MixStreamFrame {
header: MixStreamHeader {
version,
stream_id,
message_type,
},
data,
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn roundtrip() {
let id = StreamId::random();
let payload = b"hello world";
let encoded = encode_stream_message(&id, StreamMessageType::Data, payload);
let frame = decode_stream_message(&encoded).unwrap();
assert_eq!(frame.header.version, STREAM_PROTOCOL_VERSION);
assert_eq!(frame.header.stream_id, id);
assert_eq!(frame.header.message_type, StreamMessageType::Data);
assert_eq!(frame.data, payload);
}
#[test]
fn too_short() {
assert!(decode_stream_message(&[0u8; 5]).is_none());
}
#[test]
fn bad_version() {
let id = StreamId::random();
let mut encoded = encode_stream_message(&id, StreamMessageType::Data, b"x");
encoded[0] = 0xFF;
assert!(decode_stream_message(&encoded).is_none());
}
#[test]
fn bad_message_type() {
let mut buf = [0u8; STREAM_HEADER_LEN];
buf[0] = STREAM_PROTOCOL_VERSION;
buf[1 + STREAM_ID_LEN] = 0xFF;
assert!(decode_stream_message(&buf).is_none());
}
#[test]
fn empty_payload() {
let id = StreamId::random();
let encoded = encode_stream_message(&id, StreamMessageType::Open, &[]);
let frame = decode_stream_message(&encoded).unwrap();
assert_eq!(frame.header.message_type, StreamMessageType::Open);
assert!(frame.data.is_empty());
}
#[test]
fn header_wire_format() {
let id = StreamId::from_bytes([0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77]);
let encoded = encode_stream_message(&id, StreamMessageType::Open, &[0xAA]);
assert_eq!(encoded.len(), STREAM_HEADER_LEN + 1);
assert_eq!(encoded[0], STREAM_PROTOCOL_VERSION);
assert_eq!(
&encoded[1..9],
&[0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77]
);
assert_eq!(encoded[9], StreamMessageType::Open as u8);
assert_eq!(encoded[10], 0xAA);
}
}