Max/sdk docrs (#6566)

* Improve SDK rustdoc and add ARCHITECTURE.md files

- Rewrite lib.rs module docs with quick-start example and module overview
- Add stream example and include_str! ARCHITECTURE.md to mixnet module
- Add ARCHITECTURE.md for mixnet, client_pool, and stream modules
- Add rustdoc to MixnetClientBuilder, MixnetClientSender, MixnetMessageSender
- Add cancel safety and drop behavior annotations to async methods
- Add TcpProxy deprecation notice pointing to stream module

* Fix rustdoc errors and add stepwise comments to remaining examples

Rustdoc fixes:
- Add missing .unwrap() on connect_new example
- Replace broken turbofish intra-doc link in MixnetClientBuilder
- Fix NymProxyServer::new args in tcp_proxy example
- Wrap BandwidthImporter example in scoped block to fix borrow-then-move
- Change misleading "5-hop routing" to "multi-hop routing"
- Fix copy-paste "forget me" in send_remember_me error message
- Fix wrong cargo run command in stream_simple_read_write
- Fix DecayWrapper description

* Cut down doc comment length

* Trimmed down SDK ARCHITECTURE files

* Slim Rust SDK docs and rename opener to dialer

- Merge tour page into SDK landing page, delete tour.mdx
- Trim all three tutorials: cut boilerplate, duplicated code, and misplaced content
- Make FFI page evergreen with Go and C++ snippets, link to repo examples
- Rename "opener" to "dialer" in stream docs, source ARCHITECTURE.md, and rustdoc
- Add reply-to-open arrow in stream mermaid diagram
- Replace remaining Unicode dashes in mermaid flowchart

* - elevate streams in rustdoc: examples on lib.rs, MixnetClient, open_stream, listener
- add stream quick reference to mixnet ARCHITECTURE.md
- add stream types to key types list in ARCHITECTURE.md
- add docs.rs links for AsyncRead/AsyncWrite and stream submodule
- tcp_proxy: replace bold deprecation with warning box

* - replace individual example doc pages with GitHub-linked tables
- add step-by-step inline comments to all SDK example source files
- add doc comments to examples missing them (simple, surb_reply, builder, etc.)
- expand mixnet tutorial with persistent identity and split_sender sections
- add tcpproxy tutorial
- rename "API Reference" to "TypeDoc Reference" in TS SDK sidebar
- rename "Misc" to "Extras" in developer sidebar, move VPN CLI up
- remove echo server from tools
- update message-queue callout to reference actual modules
- fix mixnet/examples redirect collision

* Add missing mut to example code

* Update ARCHITECTURE.md with LP Framing + stream examples with sequencing

* Update doc comment in utils.rs

* Standardise commenting style across Rust SDK examples

* Fix inline doc examples and trim re-export boilerplate

* Update sdk/rust/nym-sdk/examples/bandwidth.rs

Co-authored-by: Simon Wicky <simon@nymtech.net>

* Fix review comments

---------

Co-authored-by: Simon Wicky <simon@nymtech.net>
This commit is contained in:
mfahampshire
2026-04-10 10:51:38 +00:00
committed by GitHub
parent 82ed88e26e
commit 9db748e8dd
39 changed files with 1770 additions and 676 deletions
+15 -7
View File
@@ -1,3 +1,10 @@
//! Paid bandwidth credentials using the Ecash scheme.
//!
//! Acquires a bandwidth credential from the sandbox network, connects
//! with it, and sends a message to self. Requires the sandbox `.env`.
//!
//! Run with: cargo run --example bandwidth
use futures::StreamExt;
use nym_credentials_interface::TicketType;
use nym_network_defaults::setup_env;
@@ -7,10 +14,12 @@ use nym_sdk::mixnet::MixnetMessageSender;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
nym_bin_common::logging::setup_tracing_logger();
// right now, only sandbox has coconut setup
// this should be run from the `sdk/rust/nym-sdk` directory
// Load the sandbox environment.
// Run from the `sdk/rust/nym-sdk` directory so the relative path resolves.
setup_env(Some("../../../envs/sandbox.env"));
// Build a credentials-enabled client (not yet connected).
let sandbox_network = mixnet::NymNetworkDetails::new_from_env();
let mnemonic = String::from("my super secret mnemonic");
@@ -19,19 +28,17 @@ async fn main() -> anyhow::Result<()> {
.enable_credentials_mode()
.build()?;
// Acquire a bandwidth credential (ticketbook) before connecting.
let bandwidth_client = mixnet_client
.create_bandwidth_client(mnemonic, TicketType::V1MixnetEntry)
.await?;
// Get a bandwidth credential for the mixnet_client
bandwidth_client.acquire().await?;
// Connect using paid bandwidth credential
// Connect to the mixnet using the acquired credential.
let mut client = mixnet_client.connect_to_mixnet().await?;
let our_address = client.nym_address();
// Send a message throughout the mixnet to ourselves
// Send a message to ourselves and wait for it.
client
.send_plain_message(*our_address, "hello there")
.await?;
@@ -40,6 +47,7 @@ async fn main() -> anyhow::Result<()> {
let received = client.next().await.unwrap();
println!("Received: {}", String::from_utf8_lossy(&received.message));
// Disconnect for clean shutdown.
client.disconnect().await;
Ok(())
}
+12 -6
View File
@@ -1,3 +1,10 @@
//! Using `MixnetClientBuilder` with ephemeral (in-memory) keys.
//!
//! The builder lets you configure the client before connecting.
//! Ephemeral keys are generated in memory and discarded on disconnect.
//!
//! Run with: cargo run --example builder
use nym_sdk::mixnet;
use nym_sdk::mixnet::MixnetMessageSender;
@@ -5,20 +12,18 @@ use nym_sdk::mixnet::MixnetMessageSender;
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
// Create client builder, including ephemeral keys. The builder can be usable in the context
// where you don't want to connect just yet.
// Create a builder with ephemeral keys.
// The builder lets you configure the client before connecting.
let client = mixnet::MixnetClientBuilder::new_ephemeral()
.build()
.unwrap();
// Now we connect to the mixnet, using ephemeral keys already created
// Connect to the mixnet.
let mut client = client.connect_to_mixnet().await.unwrap();
// Be able to get our client address
let our_address = client.nym_address();
println!("Our client nym address is: {our_address}");
// Send a message through the mixnet to ourselves
// Send a message and wait for it.
client
.send_plain_message(*our_address, "hello there")
.await
@@ -31,5 +36,6 @@ async fn main() {
}
}
// Always disconnect for clean shutdown.
client.disconnect().await;
}
@@ -1,3 +1,10 @@
//! Using `MixnetClientBuilder` with persistent on-disk key storage.
//!
//! Keys are generated on the first run, then loaded from disk on
//! subsequent runs so the client keeps the same Nym address.
//!
//! Run with: cargo run --example builder_with_storage
use nym_sdk::mixnet;
use nym_sdk::mixnet::MixnetMessageSender;
use std::path::PathBuf;
@@ -6,26 +13,24 @@ use std::path::PathBuf;
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
// Specify some config options
// Point storage at a directory.
// If keys exist there they are loaded; otherwise new ones are generated.
let config_dir = PathBuf::from("/tmp/mixnet-client");
let storage_paths = mixnet::StoragePaths::new_from_dir(&config_dir).unwrap();
// Create the client with a storage backend, and enable it by giving it some paths. If keys
// exists at these paths, they will be loaded, otherwise they will be generated.
// Build the client with on-disk persistent storage.
let client = mixnet::MixnetClientBuilder::new_with_default_storage(storage_paths)
.await
.unwrap()
.build()
.unwrap();
// Now we connect to the mixnet, using keys now stored in the paths provided.
// Connect to the mixnet.
let mut client = client.connect_to_mixnet().await.unwrap();
// Be able to get our client address
let our_address = client.nym_address();
println!("Our client nym address is: {our_address}");
// Send a message throught the mixnet to ourselves
// Send a message to ourselves and wait for it.
client
.send_plain_message(*our_address, "hello there")
.await
@@ -38,5 +43,6 @@ async fn main() {
}
}
// Always disconnect for clean shutdown.
client.disconnect().await;
}
+19 -6
View File
@@ -1,30 +1,42 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! Pre-warmed pool of ephemeral Mixnet clients.
//!
//! The pool keeps a reserve of connected clients so that new connections
//! can be served instantly. When the pool is empty, a fallback ephemeral
//! client is created on-demand (with higher latency).
//!
//! Run with: cargo run --example client_pool -- ../../../envs/<NETWORK>.env
use anyhow::Result;
use nym_network_defaults::setup_env;
use nym_sdk::client_pool::ClientPool;
use nym_sdk::mixnet::{MixnetClientBuilder, NymNetworkDetails};
use tokio::signal::ctrl_c;
// This client pool is used internally by the TcpProxyClient but can also be used by the Mixnet module, in case you're quickly swapping clients in and out but won't want to use the TcpProxy module for whatever reason.
//
// Run with: cargo run --example client_pool -- ../../../envs/<NETWORK>.env
#[tokio::main]
async fn main() -> Result<()> {
nym_bin_common::logging::setup_tracing_logger();
setup_env(std::env::args().nth(1));
let conn_pool = ClientPool::new(2); // Start the Client Pool with 2 Clients always being kept in reserve
// Create a pool that maintains 2 clients in reserve.
let conn_pool = ClientPool::new(2);
// Start the pool's background loop in a spawned task.
// It will continuously create clients until the reserve is full.
let client_maker = conn_pool.clone();
tokio::spawn(async move {
client_maker.start().await?;
Ok::<(), anyhow::Error>(())
});
// Wait for the pool to fill up.
println!("\n\nWaiting a few seconds to fill pool\n\n");
tokio::time::sleep(tokio::time::Duration::from_secs(15)).await;
// Grab clients from the pool in two concurrent tasks.
// If the pool is empty, fall back to creating an ephemeral client.
let pool_clone_one = conn_pool.clone();
let pool_clone_two = conn_pool.clone();
@@ -54,7 +66,7 @@ async fn main() -> Result<()> {
let our_address = client_one.nym_address();
println!("\n\nClient 1: {our_address}\n\n");
client_one.disconnect().await;
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; // Emulate doing something
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
Ok::<(), anyhow::Error>(())
});
@@ -84,10 +96,11 @@ async fn main() -> Result<()> {
let our_address = *client_two.nym_address();
println!("\n\nClient 2: {our_address}\n\n");
client_two.disconnect().await;
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; // Emulate doing something
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
Ok::<(), anyhow::Error>(())
});
// Wait for ctrl-c, then shut down the pool.
wait_for_ctrl_c(conn_pool).await?;
Ok(())
}
+14 -7
View File
@@ -1,8 +1,14 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
// use nym_client::client::config::{BaseClientConfig, Config, GatewayEndpointConfig};
// use nym_client::client::{DirectClient, KeyManager, Recipient, ReconstructedMessage, SocketClient};
//! Sending control requests to a service provider via the mixnet.
//!
//! Demonstrates `send_message` with explicit SURB counts and the
//! `nym-service-providers-common` request/response protocol. Sends
//! Health, BinaryInfo, and SupportedRequestVersions queries.
//!
//! Run with: cargo run --example control_requests
use nym_sdk::mixnet::{
IncludedSurbs, MixnetClient, MixnetMessageSender, Recipient, ReconstructedMessage,
};
@@ -32,13 +38,11 @@ async fn wait_for_control_response(client: &mut MixnetClient) -> ControlResponse
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// technically we don't need to start the entire client with all the subroutines,
// but I needed an easy way of sending to and receiving from the mixnet
// and that was the most straightforward way of achieving it
// Connect an ephemeral client.
let mut client = MixnetClient::connect_new().await.unwrap();
let provider: Recipient = "8YF6f8x17j3fviBdU87EGD9g9MAgn9DARxunwLEVM7Bm.4ydfpjbTjCmzj58hWdQjxU2gT6CRVnTbnKajr2hAGBBM@2xU4CBE6QiiYt6EyBXSALwxkNvM7gqJfjHXaMkjiFmYW".parse().unwrap();
// generic service provider request, so we don't even need to care it's to the socks5 provider
// Build control requests using the service-provider interface.
let request_health = ControlRequest::Health;
let request_binary_info = ControlRequest::BinaryInfo;
let request_versions = ControlRequest::SupportedRequestVersions;
@@ -50,7 +54,7 @@ async fn main() -> anyhow::Result<()> {
let full_request_versions: Request =
Request::new_control(ProviderInterfaceVersion::new_current(), request_versions);
// TODO: currently we HAVE TO use surbs unfortunately
// Send a Health request with 10 reply SURBs and wait for the response.
println!("Sending 'Health' request...");
client
.send_message(
@@ -62,6 +66,7 @@ async fn main() -> anyhow::Result<()> {
let response = wait_for_control_response(&mut client).await;
println!("response to 'Health' request: {response:#?}");
// Send a BinaryInfo request (no SURBs — the SP reuses SURBs from previous messages).
println!("Sending 'BinaryInfo' request...");
client
.send_message(
@@ -73,6 +78,7 @@ async fn main() -> anyhow::Result<()> {
let response = wait_for_control_response(&mut client).await;
println!("response to 'BinaryInfo' request: {response:#?}");
// Send a SupportedRequestVersions request.
println!("Sending 'SupportedRequestVersions' request...");
client
.send_message(
@@ -84,6 +90,7 @@ async fn main() -> anyhow::Result<()> {
let response = wait_for_control_response(&mut client).await;
println!("response to 'SupportedRequestVersions' request: {response:#?}");
// Disconnect for clean shutdown.
client.disconnect().await;
Ok(())
}
@@ -1,6 +1,15 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! Custom topology provider that filters mix nodes from the Nym API.
//!
//! Implements `TopologyProvider` to fetch the network topology and apply
//! custom filtering (here: only nodes with `node_id % 3 == 0` and a
//! perfect performance score). Shows how to plug alternative topology
//! sources into the client builder.
//!
//! Run with: cargo run --example custom_topology_provider
use nym_sdk::mixnet;
use nym_sdk::mixnet::MixnetMessageSender;
use nym_topology::provider_trait::{async_trait, ToTopologyMetadata, TopologyProvider};
@@ -8,6 +17,8 @@ use nym_topology::{EpochRewardedSet, NymTopology};
use nym_validator_client::nym_api::NymApiClientExt;
use url::Url;
// Define a custom topology provider.
// It fetches topology from the Nym API and applies arbitrary filtering.
struct MyTopologyProvider {
validator_client: nym_http_api_client::Client,
}
@@ -40,9 +51,8 @@ impl MyTopologyProvider {
let epoch_rewarded_set: EpochRewardedSet = rewarded_set.into();
let mut base_topology = NymTopology::new(metadata, epoch_rewarded_set, Vec::new());
// in our topology provider only use mixnodes that have node_id divisible by 3
// and has exactly 100 performance score
// why? because this is just an example to showcase arbitrary uses and capabilities of this trait
// Custom filter: only mix nodes with node_id divisible by 3
// and a perfect performance score.
let filtered_mixnodes = mixnodes_response
.nodes
.into_iter()
@@ -62,9 +72,10 @@ impl MyTopologyProvider {
}
}
// Implement the TopologyProvider trait.
// The client refreshes topology on a timer using this method.
#[async_trait]
impl TopologyProvider for MyTopologyProvider {
// this will be manually refreshed on a timer specified inside mixnet client config
async fn get_new_topology(&mut self) -> Option<NymTopology> {
Some(self.get_topology().await)
}
@@ -74,10 +85,10 @@ impl TopologyProvider for MyTopologyProvider {
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
// Create the provider and pass it to the client builder.
let nym_api = "https://validator.nymtech.net/api/".parse().unwrap();
let my_topology_provider = MyTopologyProvider::new(nym_api);
// Passing no config makes the client fire up an ephemeral session and figure things out on its own
let mut client = mixnet::MixnetClientBuilder::new_ephemeral()
.custom_topology_provider(Box::new(my_topology_provider))
.build()
@@ -89,7 +100,7 @@ async fn main() {
let our_address = client.nym_address();
println!("Our client nym address is: {our_address}");
// Send a message through the mixnet to ourselves
// Send a message to ourselves using the custom topology.
client
.send_plain_message(*our_address, "hello there")
.await
+5 -8
View File
@@ -1,14 +1,11 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
// Smoke test for IpMixStream: connect to an IPR, send a ping, check we get a reply.
// Tests both IPv4 and IPv6 paths.
//
// Usage:
// cargo run --example ipr_tunnel
// cargo run --example ipr_tunnel -- --ipr <IPR_ADDRESS>
//
// e.g. cargo run --example ipr_tunnel -- --ipr 6B6iuWX4bQP4GVA4Yq7XmZencaaGw6BaPY6xJWYSwsbF.6g6LRx1fgU2Q2A4ZPKonYHtfBARh1GPMe1LtXk6vpRR8@q2A2cbooyC16YJzvdYaSMH9X3cSiieZNtfBr8cE8Fi1
//! Smoke test for `IpMixStream`: connect to an IPR, send a ping, check we get a reply.
//!
//! Tests both IPv4 and IPv6 paths.
//!
//! Run with: `cargo run --example ipr_tunnel -- --ipr <IPR_ADDRESS>`
use std::net::{Ipv4Addr, Ipv6Addr};
use std::time::Duration;
@@ -1,3 +1,14 @@
//! Custom storage backend via the `MixnetClientStorage` trait.
//!
//! Shows how to implement the storage traits (`KeyStore`,
//! `GatewaysDetailsStore`, etc.) so the SDK can be backed by any
//! persistence layer (database, cloud KMS, HSM, …).
//!
//! This example uses mock implementations that always generate fresh
//! keys — in practice you would persist to your own store.
//!
//! Run with: cargo run --example manually_handle_storage
use nym_client_core::client::base_client::storage::gateways_storage::GatewayPublishedData;
use nym_sdk::mixnet::{
self, ed25519, ActiveGateway, BadGateway, ClientKeys, EmptyReplyStorage,
@@ -10,9 +21,10 @@ use nym_topology::provider_trait::async_trait;
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
// Just some plain data to pretend we have some external storage that the application
// implementer is using.
// Create an instance of your custom storage backend.
let mock_storage = MockClientStorage::empty();
// Pass it to the builder via `new_with_storage`.
let mut client = mixnet::MixnetClientBuilder::new_with_storage(mock_storage)
.build()
.unwrap()
@@ -20,11 +32,10 @@ async fn main() {
.await
.unwrap();
// Be able to get our client address
let our_address = client.nym_address();
println!("Our client nym address is: {our_address}");
// Send important info up the pipe to a buddy
// Use the client normally — storage is transparent.
client
.send_plain_message(*our_address, "hello there")
.await
@@ -37,6 +48,7 @@ async fn main() {
}
}
// Disconnect for clean shutdown.
client.disconnect().await;
}
@@ -1,6 +1,14 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! Manually overwriting the network topology at runtime.
//!
//! Connects an ephemeral client, then replaces its topology with a
//! hand-picked set of mix nodes while keeping the original gateways.
//! All subsequent traffic is routed through these specific nodes.
//!
//! Run with: cargo run --example manually_overwrite_topology
use nym_sdk::mixnet;
use nym_sdk::mixnet::MixnetMessageSender;
use nym_topology::{NymTopology, NymTopologyMetadata, RoutingNode, SupportedRoles};
@@ -9,11 +17,11 @@ use nym_topology::{NymTopology, NymTopologyMetadata, RoutingNode, SupportedRoles
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
// Passing no config makes the client fire up an ephemeral session and figure shit out on its own
// Connect an ephemeral client and grab the current topology.
let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
let starting_topology = client.read_current_route_provider().await.unwrap().clone();
// but we don't like our default topology, we want to use only those very specific, hardcoded, nodes:
// Define a custom set of hardcoded mix nodes.
let nodes = vec![
RoutingNode {
node_id: 63,
@@ -65,32 +73,31 @@ async fn main() {
},
];
// make sure our custom nodes are in the fake rewarded set (so they'd be used by default by the client)
// Build a custom topology using these nodes plus the original gateways.
// Inject our custom nodes into the rewarded set so the client will use them.
let mut rewarded_set = starting_topology.topology.rewarded_set().clone();
rewarded_set.layer1.insert(nodes[0].node_id);
rewarded_set.layer2.insert(nodes[1].node_id);
rewarded_set.layer3.insert(nodes[2].node_id);
// but we like the available gateways, so keep using them!
// (we like them because the author of this example is too lazy to use the same hardcoded gateway
// during client initialisation to make sure we are able to send to ourselves : ) )
// Keep the original gateways so we can still send to ourselves.
let gateways = starting_topology.topology.entry_capable_nodes();
// you should have obtained valid metadata information, in particular the key rotation ID!
// In production, obtain valid metadata (especially the key rotation ID).
let metadata = NymTopologyMetadata::new(u32::MAX, 123, time::OffsetDateTime::now_utc());
let mut custom_topology = NymTopology::new(metadata, rewarded_set, Vec::new());
custom_topology.add_routing_nodes(nodes);
custom_topology.add_routing_nodes(gateways);
// Apply the custom topology. All subsequent traffic goes
// through these specific nodes.
client.manually_overwrite_topology(custom_topology).await;
// and everything we send now should only ever go via those nodes
let our_address = client.nym_address();
println!("Our client nym address is: {our_address}");
// Send a message through the mixnet to ourselves
// Send a message to ourselves through the custom topology.
client
.send_plain_message(*our_address, "hello there")
.await
@@ -1,6 +1,14 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! Sending and receiving from separate tasks using `split_sender()`.
//!
//! `split_sender()` returns a clone-able `MixnetClientSender` that can
//! send messages from any task, while the original client handles
//! receiving via `futures::Stream`.
//!
//! Run with: cargo run --example parallel_sending_and_receiving
use futures::StreamExt;
use nym_sdk::mixnet;
use nym_sdk::mixnet::MixnetMessageSender;
@@ -9,25 +17,25 @@ use nym_sdk::mixnet::MixnetMessageSender;
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
// Passing no config makes the client fire up an ephemeral session and figure stuff out on its own
// Connect an ephemeral client.
let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
// Be able to get our client address
let our_address = *client.nym_address();
println!("Our client nym address is: {our_address}");
// Split the client into a sender handle.
// The sender is clone-able and can be moved into any task.
let sender = client.split_sender();
// receiving task
// Spawn a receiving task.
// The original client implements futures::Stream, so you can use .next().
let receiving_task_handle = tokio::spawn(async move {
if let Some(received) = client.next().await {
println!("Received: {}", String::from_utf8_lossy(&received.message));
}
client.disconnect().await;
});
// sending task
// Spawn a sending task using the split sender.
let sending_task_handle = tokio::spawn(async move {
sender
.send_plain_message(our_address, "hello from a different task!")
@@ -35,7 +43,7 @@ async fn main() {
.unwrap();
});
// wait for both tasks to be done
// Wait for both tasks to complete.
println!("waiting for shutdown");
sending_task_handle.await.unwrap();
receiving_task_handle.await.unwrap();
+15 -6
View File
@@ -1,31 +1,39 @@
//! Connecting to the Sandbox testnet instead of mainnet.
//!
//! Loads a sandbox `.env` file to override the default (mainnet)
//! network details, then sends a message to self.
//!
//! Run with: cargo run --example sandbox
use futures::StreamExt;
use nym_network_defaults::setup_env;
use nym_sdk::mixnet;
use nym_sdk::mixnet::MixnetMessageSender;
// An example of creating a client relying on a testnet, in this case Sandbox.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
nym_bin_common::logging::setup_tracing_logger();
// relative root is `sdk/rust/nym-sdk/` for fallback file path
// Load the sandbox environment.
// Set NYM_ENV_PATH or fall back to the in-repo env file.
let env_path =
std::env::var("NYM_ENV_PATH").unwrap_or_else(|_| "../../../envs/sandbox.env".to_string());
setup_env(Some(&env_path));
let sandbox_network = mixnet::NymNetworkDetails::new_from_env();
// Build and connect using the sandbox network details.
let sandbox_network = mixnet::NymNetworkDetails::new_from_env();
let mixnet_client = mixnet::MixnetClientBuilder::new_ephemeral()
.network_details(sandbox_network)
.build()?;
let mut client = mixnet_client.connect_to_mixnet().await?;
let our_address = client.nym_address();
// Send a message throughout the mixnet to ourselves
// Send a message to ourselves through the sandbox mixnet.
client
.send_plain_message(*our_address, "hello there")
.await?;
// Wait for the message via the futures::Stream impl.
println!("Waiting for message");
if let Some(received) = client.next().await {
println!("Received: {}", String::from_utf8_lossy(&received.message));
@@ -33,6 +41,7 @@ async fn main() -> anyhow::Result<()> {
eprintln!("Failed to receive message.");
}
// Disconnect for clean shutdown.
client.disconnect().await;
Ok(())
}
+12 -5
View File
@@ -1,3 +1,9 @@
//! Minimal message example: send a message to yourself and print it.
//!
//! Uses an ephemeral client — no keys are stored to disk.
//!
//! Run with: cargo run --example simple
use nym_sdk::mixnet;
use nym_sdk::mixnet::MixnetMessageSender;
@@ -5,20 +11,21 @@ use nym_sdk::mixnet::MixnetMessageSender;
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
// Passing no config makes the client fire up an ephemeral session and figure shit out on its own
// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
// Connect an ephemeral client (keys generated in memory).
let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
// Be able to get our client address
let our_address = client.nym_address();
println!("Our client nym address is: {our_address}");
// Send a message through the mixnet to ourselves
// Send a message to ourselves through the mixnet.
// The message is Sphinx-encrypted and routed through 5 nodes
// (gateway → 3 mix nodes → gateway).
client
.send_plain_message(*our_address, "hello there")
.await
.unwrap();
// Wait for incoming messages and print them.
// on_messages blocks forever — press ctrl-c to exit.
println!("Waiting for message (ctrl-c to exit)");
client
.on_messages(|msg| println!("Received: {}", String::from_utf8_lossy(&msg.message)))
+16 -2
View File
@@ -1,12 +1,21 @@
//! SOCKS5 proxy client that routes HTTP requests through the mixnet.
//!
//! Connects a `Socks5MixnetClient` to a receiving `MixnetClient` acting
//! as the service provider, then sends an HTTP GET via the SOCKS5 proxy.
//!
//! Run with: cargo run --example socks5
use nym_sdk::mixnet;
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
// Connect a receiving client (acts as the "network requester").
println!("Connecting receiver");
let mut receiving_client = mixnet::MixnetClient::connect_new().await.unwrap();
// Build and connect a SOCKS5 sending client pointed at the receiver.
let socks5_config = mixnet::Socks5::new(receiving_client.nym_address().to_string());
let sending_client = mixnet::MixnetClientBuilder::new_ephemeral()
.socks5_config(socks5_config)
@@ -16,15 +25,19 @@ async fn main() {
println!("Connecting sender");
let sending_client = sending_client.connect_to_mixnet_via_socks5().await.unwrap();
// Configure an HTTP client to use the SOCKS5 proxy.
let proxy = reqwest::Proxy::all(sending_client.socks5_url()).unwrap();
let reqwest_client = reqwest::Client::builder().proxy(proxy).build().unwrap();
// Send an HTTP request through the mixnet via SOCKS5.
// No network requester is running on the other end, so we won't
// get a real HTTP response — but the receiver sees the raw bytes.
tokio::spawn(async move {
println!("Sending socks5-wrapped http request");
// Message should be sent through the mixnet, via socks5
// We don't expect to get anything, as there is no network requester on the other end
reqwest_client.get("https://nymtech.net").send().await.ok()
});
// The receiver sees the raw SOCKS5/HTTP bytes arrive.
println!("Waiting for message");
if let Some(received) = receiving_client.wait_for_messages().await {
for r in received {
@@ -35,6 +48,7 @@ async fn main() {
}
}
// Disconnect both clients.
receiving_client.disconnect().await;
sending_client.disconnect().await;
}
@@ -20,7 +20,7 @@ const WAIT_TIMEOUT: Duration = Duration::from_secs(60);
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
// Build a client with a short stream idle timeout.
// Build a client with a short stream idle timeout (default is 30 min).
let mut client = mixnet::MixnetClientBuilder::new_ephemeral()
.with_stream_idle_timeout(IDLE_TIMEOUT)
.build()
@@ -32,7 +32,7 @@ async fn main() {
let our_address = *client.nym_address();
println!("Client address: {our_address}");
// Set up a listener and open a stream to ourselves.
// Open a stream to ourselves (useful for testing).
let mut listener = client.listener().unwrap();
let mut outbound = client.open_stream(our_address, None).await.unwrap();
println!("Opened outbound stream: {}", outbound.id());
@@ -43,7 +43,7 @@ async fn main() {
.expect("listener shut down");
println!("Accepted inbound stream: {}", inbound.id());
// Use the stream.
// Use the stream — send and receive.
let msg = b"hello from idle timeout example";
outbound.write_all(msg).await.unwrap();
outbound.flush().await.unwrap();
@@ -56,15 +56,15 @@ async fn main() {
println!("Received: {:?}", String::from_utf8_lossy(&buf[..n]));
assert_eq!(&buf[..n], msg);
// Now stop using the stream and wait for the idle timeout.
// Stop using the stream. The router's periodic cleanup task
// will remove it after the idle timeout elapses.
println!(
"\nStream is idle. Waiting {}s for cleanup...",
IDLE_TIMEOUT.as_secs()
);
tokio::time::sleep(IDLE_TIMEOUT + Duration::from_secs(2)).await;
// The router should have cleaned up the stream. The inbound receiver
// is closed, so a read returns 0 bytes (EOF).
// Read returns 0 bytes (EOF) — the router cleaned up the stream.
let n = inbound.read(&mut buf).await.expect("read failed");
if n == 0 {
println!("Inbound stream returned EOF — cleaned up by idle timeout.");
+25 -7
View File
@@ -1,6 +1,23 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! Demonstrates the stream/message mode mutual exclusion guard.
//!
//! A `MixnetClient` operates in one of two modes: message mode (default)
//! or stream mode (activated by `open_stream()` or `listener()`). Once
//! stream mode is active, message-based methods like `send_plain_message`
//! return `Error::StreamModeActive`. This is a one-way transition — the
//! two modes share a single inbound channel from the gateway, so they
//! cannot coexist.
//!
//! This example shows:
//! - Using the message API before stream mode
//! - Activating stream mode via `listener()`
//! - Observing `StreamModeActive` errors on message sends
//! - `split_sender()` shares the mode flag (via `Arc<AtomicBool>`)
//!
//! Run with: cargo run --example stream_mode_guard
use nym_sdk::mixnet;
use nym_sdk::mixnet::MixnetMessageSender;
use nym_sdk::Error;
@@ -13,7 +30,7 @@ async fn main() {
let our_address = *client.nym_address();
println!("Our client nym address is: {our_address}");
// Message-based API works before stream mode is activated
// Message-based API works before stream mode is activated.
println!("\nTesting message-based API (should work)");
client
.send_plain_message(our_address, "hello via message API")
@@ -21,12 +38,14 @@ async fn main() {
.unwrap();
println!("Message sent successfully via message-based API");
// Now activate stream mode by creating a listener
// Activate stream mode by creating a listener.
// This is a one-way transition — the two modes share a single inbound
// channel from the gateway, so they cannot coexist.
println!("\nActivating stream mode via listener()");
let _listener = client.listener().unwrap();
println!("Stream mode is now active");
// Message-based API should now fail
// Message-based API now returns Error::StreamModeActive.
println!("\nTesting message-based API again (should fail)");
let result = client
.send_plain_message(our_address, "this should fail")
@@ -35,7 +54,6 @@ async fn main() {
match result {
Err(Error::StreamModeActive) => {
println!("Got expected error: StreamModeActive");
println!("Message-based API correctly blocked after stream mode activation");
}
Err(e) => {
println!("Got unexpected error: {e:?}");
@@ -45,8 +63,9 @@ async fn main() {
}
}
// split_sender shares the stream_mode flag
println!("\nTesting split_sender (shares stream_mode)");
// split_sender() shares the mode flag (Arc<AtomicBool>),
// so it also returns StreamModeActive.
println!("\nTesting split_sender (shares stream_mode flag)");
let sender = client.split_sender();
let result = sender
.send_plain_message(our_address, "this should also fail")
@@ -55,7 +74,6 @@ async fn main() {
match result {
Err(Error::StreamModeActive) => {
println!("Got expected error: StreamModeActive on split sender");
println!("Split sender correctly shares stream_mode with parent client");
}
Err(e) => {
println!("Got unexpected error: {e:?}");
@@ -18,6 +18,7 @@ const TIMEOUT: Duration = Duration::from_secs(60);
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
// Connect three ephemeral clients — one sender, two receivers.
let mut sender = mixnet::MixnetClient::connect_new().await.unwrap();
println!("Sender address: {}", sender.nym_address());
@@ -29,9 +30,13 @@ async fn main() {
let addr_b = *receiver_b.nym_address();
println!("Receiver B address: {addr_b}");
// Each receiver creates a listener (activates stream mode).
// listener() can only be called once per client.
let mut listener_a = receiver_a.listener().unwrap();
let mut listener_b = receiver_b.listener().unwrap();
// The sender opens a stream to each receiver.
// Each stream gets a random StreamId for multiplexing.
println!("\nOpening streams to both receivers...");
let mut stream_to_a = sender.open_stream(addr_a, None).await.unwrap();
println!("Stream to A opened: {}", stream_to_a.id());
@@ -39,6 +44,7 @@ async fn main() {
let mut stream_to_b = sender.open_stream(addr_b, None).await.unwrap();
println!("Stream to B opened: {}", stream_to_b.id());
// Both receivers accept the incoming streams concurrently.
println!("\nWaiting for both receivers to accept...");
let (inbound_a, inbound_b) = tokio::try_join!(
async {
@@ -58,6 +64,7 @@ async fn main() {
println!("A accepted stream: {}", inbound_a.id());
println!("B accepted stream: {}", inbound_b.id());
// Sender writes to both streams using AsyncWrite.
let msg_a = b"hello receiver A";
let msg_b = b"hello receiver B";
@@ -68,6 +75,8 @@ async fn main() {
stream_to_b.write_all(msg_b).await.unwrap();
stream_to_b.flush().await.unwrap();
// Both receivers read and reply concurrently.
// Replies travel via SURBs — receivers never learn the sender's address.
println!("\nBoth receivers reading and replying concurrently...");
let reply_a = b"reply from A";
let reply_b = b"reply from B";
@@ -109,6 +118,7 @@ async fn main() {
let inbound_a = res_a;
let inbound_b = res_b;
// Sender reads the replies back.
println!("\nSender reading replies...");
tokio::join!(
async {
@@ -139,8 +149,8 @@ async fn main() {
println!("\nConcurrent round-trips successful!");
// Streams clean up on drop (unregister from router).
// No close message is sent over the wire — see stream.rs.
// Clean up — streams deregister from the router on drop.
// No close message is sent; the remote side sees EOF after idle timeout.
drop(stream_to_a);
drop(stream_to_b);
drop(inbound_a);
+19 -9
View File
@@ -1,8 +1,16 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! Sends a 1 MB random file over a MixnetStream and verifies the
//! receiver got an identical copy. Cancel with Ctrl+C.
//! Sends 1 MiB of random data over a MixnetStream and verifies integrity.
//!
//! Uses `write_all` on the sender and `read_exact` on the receiver.
//! `read_exact` is needed because there is no close/EOF signal — streams
//! clean up via `Drop` and idle timeout, so the receiver must know the
//! expected size up front.
//!
//! Messages are reordered by sequence number in the stream layer, so
//! large payloads spanning multiple Sphinx packets are reassembled
//! correctly.
//!
//! Run with: cargo run --example stream_throughput
@@ -11,19 +19,19 @@ use rand::RngCore;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
const SIZE: usize = 1024 * 1024; // 1 MB
const SIZE: usize = 1024 * 1024; // 1 MiB
const TIMEOUT: Duration = Duration::from_secs(300);
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
// Generate random payload
// Generate 1 MiB of random data to send.
let mut payload = vec![0u8; SIZE];
rand::rngs::OsRng.fill_bytes(&mut payload);
println!("Generated {} bytes of random data", payload.len());
// Connect two clients
// Connect two clients and establish a stream.
println!("Connecting sender...");
let mut sender = mixnet::MixnetClient::connect_new().await.unwrap();
println!("{}", sender.nym_address());
@@ -33,7 +41,6 @@ async fn main() {
let recv_addr = *receiver.nym_address();
println!("{recv_addr}");
// Open stream
let mut listener = receiver.listener().unwrap();
let mut tx = sender.open_stream(recv_addr, None).await.unwrap();
let mut rx = tokio::time::timeout(TIMEOUT, listener.accept())
@@ -42,7 +49,8 @@ async fn main() {
.expect("listener closed");
println!("Stream established\n");
// Send
// Send the payload. write_all splits it across multiple
// Sphinx packets automatically.
let data = payload.clone();
let send_task = tokio::spawn(async move {
tx.write_all(&data).await.unwrap();
@@ -50,8 +58,9 @@ async fn main() {
println!("Sent {} bytes", data.len());
});
// Receive — read exactly SIZE bytes (don't rely on close/EOF - if we need this in future
// iterations we can introduce something like what the TcpProxy module has)
// Receive exactly SIZE bytes using read_exact.
// We use read_exact (not read-until-EOF) because there is no
// close/EOF signal — streams clean up via Drop and idle timeout.
let recv_task = tokio::spawn(async move {
let mut buf = vec![0u8; SIZE];
tokio::time::timeout(TIMEOUT, rx.read_exact(&mut buf))
@@ -62,6 +71,7 @@ async fn main() {
buf
});
// Verify integrity — the received bytes must match exactly.
let (_, received) = tokio::join!(send_task, recv_task);
let received = received.unwrap();
+26 -26
View File
@@ -1,3 +1,12 @@
//! Anonymous replies using SURBs (Single Use Reply Blocks).
//!
//! Sends a message to self, extracts the `AnonymousSenderTag` from the
//! incoming message, and replies using `send_reply()` — without knowing
//! the sender's Nym address. The SDK includes SURBs with every message
//! by default, so the recipient can always reply anonymously.
//!
//! Run with: cargo run --example surb_reply
use nym_sdk::mixnet::{
AnonymousSenderTag, MixnetClientBuilder, MixnetMessageSender, ReconstructedMessage,
StoragePaths,
@@ -9,37 +18,29 @@ use tempfile::TempDir;
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
// Specify some config options
// Build a client with persistent key storage.
// Keys are generated on first run, then loaded from disk on subsequent runs.
let config_dir: PathBuf = TempDir::new().unwrap().path().to_path_buf();
let storage_paths = StoragePaths::new_from_dir(&config_dir).unwrap();
// Create the client with a storage backend, and enable it by giving it some paths. If keys
// exists at these paths, they will be loaded, otherwise they will be generated.
let client = MixnetClientBuilder::new_with_default_storage(storage_paths)
.await
.unwrap()
.build()
.unwrap();
// Now we connect to the mixnet, using keys now stored in the paths provided.
let mut client = client.connect_to_mixnet().await.unwrap();
// Be able to get our client address
let our_address = client.nym_address();
println!("\nOur client nym address is: {our_address}");
// Send a message through the mixnet to ourselves using our nym address
// Send a message to ourselves.
client
.send_plain_message(*our_address, "hello there")
.await
.unwrap();
// we're going to parse the sender_tag (AnonymousSenderTag) from the incoming message and use it to 'reply' to ourselves instead of our Nym address.
// we know there will be a sender_tag since the sdk sends SURBs along with messages by default.
// Receive the message.
println!("Waiting for message\n");
// get the actual message - discard the empty vec sent along with a potential SURB topup request
let mut message: Vec<ReconstructedMessage> = Vec::new();
// Filter empty messages — these are SURB replenishment requests.
while let Some(new_message) = client.wait_for_messages().await {
if new_message.is_empty() {
continue;
@@ -48,25 +49,24 @@ async fn main() {
break;
}
let mut parsed = String::new();
if let Some(r) = message.first() {
parsed = String::from_utf8(r.message.clone()).unwrap();
}
// parse sender_tag: we will use this to reply to sender without needing their Nym address
let return_recipient: AnonymousSenderTag = message[0].sender_tag.unwrap();
println!(
"\nReceived the following message: {parsed} \nfrom sender with surb bucket {return_recipient}"
);
let parsed = String::from_utf8(message[0].message.clone()).unwrap();
// reply to self with it: note we use `send_str_reply` instead of `send_str`
println!("Replying with using SURBs");
// Extract the AnonymousSenderTag from the incoming message.
// This opaque token lets you reply without knowing the sender's address.
// The SDK includes SURBs with every message by default.
let return_recipient: AnonymousSenderTag = message[0].sender_tag.unwrap();
println!("Received: {parsed}\nSender tag: {return_recipient}");
// Reply anonymously using send_reply() instead of send_plain_message().
println!("Replying using SURBs...");
client
.send_reply(return_recipient, "hi an0n!")
.await
.unwrap();
println!("Waiting for message (once you see it, ctrl-c to exit)\n");
// Receive the reply.
println!("Waiting for reply (ctrl-c to exit)\n");
client
.on_messages(|msg| println!("\nReceived: {}", String::from_utf8_lossy(&msg.message)))
.on_messages(|msg| println!("Received: {}", String::from_utf8_lossy(&msg.message)))
.await;
}
@@ -1,3 +1,14 @@
//! Multiple concurrent TCP connections proxied through the mixnet.
//!
//! Starts several `TcpStream` connections on a loop to a remote
//! `NymProxyServer` (e.g. the echo server in `nym/tools/echo-server/`),
//! sends messages with variable delays, and logs round-trip replies.
//!
//! Run with:
//! ```text
//! cargo run --example tcp_proxy_multistream -- <ECHO_SERVER_NYM_ADDRESS> <ENV_FILE> <PORT>
//! ```
use nym_sdk::mixnet::Recipient;
use nym_sdk::tcp_proxy;
use rand::rngs::SmallRng;
@@ -20,12 +31,6 @@ struct ExampleMessage {
tcp_conn: i8,
}
// This example just starts off a bunch of Tcp connections on a loop to a remote endpoint: in this case the TcpListener behind the NymProxyServer instance on the echo server found in `nym/tools/echo-server/`. It pipes a few messages to it, logs the replies, and keeps track of the number of replies received per connection.
//
// To run:
// - run the echo server with `cargo run`
// - run this example with `cargo run --example tcp_proxy_multistream -- <ECHO_SERVER_NYM_ADDRESS> <ENV_FILE_PATH> <CLIENT_PORT>` e.g.
// cargo run --example tcp_proxy_multistream -- DMHyxo8n6sKWHHTVvjRVDxDSMX8gYXRU1AQ6UpwsrWiB.6STYCWGWyRxqn2juWdgjMkAMsT9EaAzPpLWq5zkS68MB@CJG5zTcmoLijmDrtAiLV9PZHxNz8LQu6hmgA89V2RxxL ../../../envs/canary.env 8080
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let server_address = env::args().nth(1).expect("Server address not provided");
@@ -1,3 +1,17 @@
//! Single TCP connection proxied through the mixnet via an echo server.
//!
//! Opens one `TcpStream` and sends 10 messages with variable delays.
//! The proxy handles Sphinx packet chunking and reassembly; this example
//! shows that individual messages may arrive out of order at the
//! application level even though per-message byte ordering is preserved.
//!
//! For concurrent connections see `tcp_proxy_multistream`.
//!
//! Run with:
//! ```text
//! cargo run --example tcp_proxy_single_connection <SERVER_PORT> <ENV_FILE> <CLIENT_PORT>
//! ```
use nym_sdk::tcp_proxy;
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
@@ -19,17 +33,6 @@ struct ExampleMessage {
message_bytes: Vec<u8>,
}
// This is a basic example which opens a single TCP connection and writes a bunch of messages between a client and an echo
// server, so only uses a single session under the hood and doesn't really show off the message ordering capabilities; this is mainly
// just a quick introductory illustration on how:
// - the mixnet does message ordering
// - the NymProxyClient and NymProxyServer can be hooked into and used to communicate between two otherwise pretty vanilla TcpStreams
//
// For a more irl example checkout tcp_proxy_multistream.rs
//
// Run this with:
// `cargo run --example tcp_proxy_single_connection <SERVER_LISTEN_PORT> <ENV_FILE_PATH> <CLIENT_LISTEN_PATH>` e.g.
// `cargo run --example tcp_proxy_single_connection 8081 ../../../envs/canary.env 8080 `
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Keep track of sent/received messages
+66 -5
View File
@@ -7,11 +7,51 @@ use nym_credentials::{
IssuedTicketBook,
};
/// Represents a helper that can be used for importing ticketbooks / bandwidth
/// into the client before commencing mixnet connection
/// The way to create it is to call
/// [`crate::mixnet::DisconnectedMixnetClient::begin_bandwidth_import`] on the associated mixnet
/// client.
/// Helper for importing bandwidth credentials (ticketbooks) into a mixnet client.
///
/// `BandwidthImporter` provides methods for importing the various cryptographic
/// components needed for paid network access before connecting to the mixnet.
///
/// ## Overview
///
/// To use the Nym mixnet with paid bandwidth, clients need:
/// 1. **Ticketbooks**: Pre-purchased bandwidth tokens that are spent during network use
/// 2. **Verification keys**: Cryptographic keys to verify credential signatures
/// 3. **Signatures**: Aggregated signatures for coin indices and expiration dates
///
/// ## Usage
///
/// Obtain a `BandwidthImporter` by calling
/// [`DisconnectedMixnetClient::begin_bandwidth_import`](crate::mixnet::DisconnectedMixnetClient::begin_bandwidth_import)
/// on a disconnected client:
///
/// ```rust,no_run
/// use nym_sdk::mixnet::MixnetClientBuilder;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let client = MixnetClientBuilder::new_ephemeral()
/// .build()?;
///
/// // Import credentials before connecting
/// {
/// let importer = client.begin_bandwidth_import();
/// // importer.import_ticketbook(&ticketbook).await?;
/// // importer.import_master_verification_key(&key).await?;
/// // ...
/// } // importer dropped here, releasing the borrow on client
///
/// // Now connect with credentials available
/// let connected = client.connect_to_mixnet().await?;
/// # Ok(())
/// # }
/// ```
///
/// ## Credential Components
///
/// - **Ticketbook**: Contains bandwidth tokens that are spent during network use
/// - **Master verification key**: Used to verify credential signatures for an epoch
/// - **Coin index signatures**: Signatures over the coin indices in the ticketbook
/// - **Expiration date signatures**: Signatures over credential expiration dates
pub struct BandwidthImporter<'a, St> {
storage: &'a St,
}
@@ -25,6 +65,10 @@ where
BandwidthImporter { storage }
}
/// Imports a complete ticketbook into credential storage.
///
/// A ticketbook contains pre-purchased bandwidth tokens that are spent
/// during network use. Each token represents a certain amount of bandwidth.
pub async fn import_ticketbook(
&self,
ticketbook: &IssuedTicketBook,
@@ -38,6 +82,10 @@ where
Ok(())
}
/// Imports a partial range of tickets from a ticketbook.
///
/// Useful when sharing a ticketbook across multiple clients or when only
/// a portion should be available to this client.
pub async fn import_partial_ticketbook(
&self,
ticketbook: &IssuedTicketBook,
@@ -57,6 +105,10 @@ where
Ok(())
}
/// Imports the master verification key for credential validation.
///
/// Used to verify that credentials were properly issued by the credential
/// signers. Each epoch has its own verification key.
pub async fn import_master_verification_key(
&self,
key: &EpochVerificationKey,
@@ -70,6 +122,10 @@ where
Ok(())
}
/// Imports aggregated coin index signatures.
///
/// These signatures are needed to prove ownership of specific
/// coins/tokens in the ticketbook.
pub async fn import_coin_index_signatures(
&self,
signatures: &AggregatedCoinIndicesSignatures,
@@ -83,6 +139,11 @@ where
Ok(())
}
/// Imports aggregated expiration date signatures.
///
/// These signatures verify the validity period of credentials. Credentials
/// are only valid for a certain time period, and these signatures prove
/// when they expire.
pub async fn import_expiration_date_signatures(
&self,
signatures: &AggregatedExpirationDateSignatures,
+36 -140
View File
@@ -1,146 +1,42 @@
//! A variable-size pool of ephemeral Nym Clients to quickly grab and use.
//! A variable-sized pool of ephemeral Mixnet clients for higher-throughput applications.
//!
//! This module provides [`ClientPool`], which maintains a configurable number of
//! connected ephemeral [`MixnetClient`](crate::mixnet::MixnetClient) instances. This is
//! useful for applications that need to handle many concurrent connections without
//! the latency of creating new clients on-demand.
//!
//! use crate::mixnet::{MixnetClient, MixnetClientBuilder, NymNetworkDetails};
//! use anyhow::Result;
//! use std::fmt;
//! use std::sync::Arc;
//! use tokio::sync::RwLock;
//! use tokio_util::sync::CancellationToken;
//! use tracing::{debug, info, warn};
//! pub struct ClientPool {
//! clients: Arc<RwLock<Vec<Arc<MixnetClient>>>>, // Collection of clients waiting to be used which are popped off in get_mixnet_client()
//! client_pool_reserve_number: usize, // Default # of clients to have available in pool in reserve waiting for incoming connections
//! cancel_token: CancellationToken,
//! }
//! // This is only necessary for when you're wanting to check the addresses of the clients that are currently in the //! pool.
//! impl fmt::Debug for ClientPool {
//! fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
//! let clients_debug = match self.clients.try_read() {
//! Ok(clients) => {
//! if f.alternate() {
//! // pretty
//! clients
//! .iter()
//! .enumerate()
//! .map(|(i, client)| format!("\n {}: {}", i, client.nym_address()))
//! .collect::<Vec<_>>()
//! .join(",")
//! } else {
//! // compact
//! clients
//! .iter()
//! .map(|client| client.nym_address().to_string())
//! .collect::<Vec<_>>()
//! .join(", ")
//! }
//! }
//! Err(_) => "<locked>".to_string(),
//! };
//! let mut debug_struct = f.debug_struct("Pool");
//! debug_struct
//! .field(
//! "client_pool_reserve_number",
//! &self.client_pool_reserve_number,
//! )
//! .field("clients", &format_args!("[{}]", clients_debug));
//! debug_struct.finish()
//! }
//! }
//! impl ClientPool {
//! pub fn new(client_pool_reserve_number: usize) -> Self {
//! ClientPool {
//! clients: Arc::new(RwLock::new(Vec::new())),
//! client_pool_reserve_number,
//! cancel_token: CancellationToken::new(),
//! }
//! }
//! // The loop here is simple: if there aren't enough clients, create more. If you set clients to 0, repeatedly //! just sleep.
//! // disconnect_pool() will kill this loop via the cancellation token.
//! pub async fn start(&self) -> Result<()> {
//! loop {
//! let spawned_clients = self.clients.read().await.len();
//! let addresses = self;
//! debug!(
//! "Currently spawned clients: {}: {:?}",
//! spawned_clients, addresses
//! );
//! if self.cancel_token.is_cancelled() {
//! break Ok(());
//! }
//! if spawned_clients >= self.client_pool_reserve_number {
//! debug!("Got enough clients already: sleeping");
//! } else {
//! info!(
//! "Clients in reserve = {}, reserve amount = {}, spawning new client",
//! spawned_clients, self.client_pool_reserve_number
//! );
//! let client = loop {
//! let net = NymNetworkDetails::new_from_env();
//! match MixnetClientBuilder::new_ephemeral()
//! .network_details(net)
//! .build()?
//! .connect_to_mixnet()
//! .await
//! {
//! Ok(client) => break client,
//! Err(err) => {
//! warn!("Error creating client: {:?}, will retry in 100ms", err);
//! tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
//! }
//! }
//! };
//! self.clients.write().await.push(Arc::new(client));
//! }
//! tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
//! }
//! }
//! pub async fn disconnect_pool(&self) {
//! info!("Triggering Client Pool disconnect");
//! self.cancel_token.cancel();
//! info!(
//! "Client pool cancellation token cancelled: {}",
//! self.cancel_token.is_cancelled()
//! );
//! let mut clients = self.clients.write().await;
//! while let Some(arc_client) = clients.pop() {
//! if let Ok(client) = Arc::try_unwrap(arc_client) {
//! info!("Killing reserve client {}", client.nym_address());
//! client.disconnect().await;
//! }
//! }
//! }
//! pub async fn get_mixnet_client(&self) -> Option<MixnetClient> {
//! debug!("Grabbing client from pool");
//! let mut clients = self.clients.write().await;
//! clients
//! .pop()
//! .and_then(|arc_client| Arc::try_unwrap(arc_client).ok())
//! }
//! pub async fn get_client_count(&self) -> usize {
//! self.clients.read().await.len()
//! }
//! pub async fn get_pool_reserve(&self) -> usize {
//! self.client_pool_reserve_number
//! }
//! pub fn clone(&self) -> Self {
//! Self {
//! clients: Arc::clone(&self.clients),
//! client_pool_reserve_number: self.client_pool_reserve_number,
//! cancel_token: self.cancel_token.clone(),
//! }
//! }
//! See the [tutorial](https://nymtech.net/docs/developers/rust/client-pool/tutorial)
//! for a step-by-step guide.
//!
//! # Example
//!
//! ```rust,no_run
//! use nym_sdk::client_pool::ClientPool;
//!
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//! // Create a pool that maintains 5 clients in reserve
//! let pool = ClientPool::new(5);
//!
//! // Start the pool in a background task
//! let pool_clone = pool.clone();
//! tokio::spawn(async move {
//! pool_clone.start().await
//! });
//!
//! // Get a client from the pool when needed
//! if let Some(client) = pool.get_mixnet_client().await {
//! println!("Got client: {}", client.nym_address());
//! client.disconnect().await;
//! }
//!
//! // Shutdown the pool
//! pool.disconnect_pool().await;
//! Ok(())
//! }
//! ```
//!
#![doc = include_str!("client_pool/ARCHITECTURE.md")]
mod mixnet_client_pool;
@@ -0,0 +1,44 @@
# Client Pool — Architecture
## Overview
`ClientPool` maintains a configurable number of pre-connected ephemeral
`MixnetClient` instances, eliminating per-request connection latency
(gateway handshake, key generation, topology fetch).
```text
┌──────────────────────────────────────────────────────┐
│ ClientPool │
│ │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ Client 1 │ │ Client 2 │ │ Client 3 │ ... │
│ │ (connected)│ │ (connected)│ │ (connected)│ │
│ └────────────┘ └────────────┘ └────────────┘ │
│ │
│ start() loop: │
│ if len < reserve → create new client │
│ if len >= reserve → sleep │
│ if cancel_token → break │
└──────────────────────────────────────────────────────┘
│ get_mixnet_client()
Arc::try_unwrap(client) → MixnetClient
```
## Lifecycle
1. **Create**`ClientPool::new(reserve)` creates an empty pool.
2. **Start**`pool.start().await` runs a background loop that keeps
the pool topped up to `reserve` connected clients.
3. **Get**`pool.get_mixnet_client().await` pops a client (returns
`None` if empty). Clients are consumed, not returned — the background
loop creates replacements.
4. **Shutdown**`pool.disconnect_pool().await` cancels the loop and
disconnects all remaining clients.
## Integration with TcpProxy
`NymProxyClient` uses a `ClientPool` internally. Each TCP connection
pops a client; if the pool is empty, an ephemeral client is created
on the fly. Set reserve to 0 to always create on-demand.
@@ -10,9 +10,53 @@ use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
/// A pool of connected ephemeral [`MixnetClient`] instances for higher-throughput applications.
///
/// `ClientPool` maintains a configurable number of ready-to-use Mixnet clients in reserve,
/// automatically creating new clients when the pool is depleted. This is useful for
/// applications that need to handle many concurrent connections without the latency
/// of creating new clients on-demand.
///
/// ## Usage
///
/// The pool operates as a background task that continuously maintains the configured
/// number of clients. Clients are obtained via [`get_mixnet_client`](Self::get_mixnet_client)
/// and are removed from the pool (then disconnected).
///
/// ## Example
///
/// ```rust,no_run
/// use nym_sdk::client_pool::ClientPool;
///
/// #[tokio::main]
/// async fn main() -> anyhow::Result<()> {
/// // Create a pool that maintains 5 clients in reserve
/// let pool = ClientPool::new(5);
///
/// // Start the pool in a background task
/// let pool_clone = pool.clone();
/// tokio::spawn(async move {
/// pool_clone.start().await
/// });
///
/// // Get a client from the pool when needed
/// if let Some(client) = pool.get_mixnet_client().await {
/// println!("Got client: {}", client.nym_address());
/// // Use the client...
/// client.disconnect().await;
/// }
///
/// // Shutdown the pool
/// pool.disconnect_pool().await;
/// Ok(())
/// }
/// ```
pub struct ClientPool {
clients: Arc<RwLock<Vec<Arc<MixnetClient>>>>, // Collection of clients waiting to be used which are popped off in get_mixnet_client()
client_pool_reserve_number: usize, // Default # of clients to have available in pool in reserve waiting for incoming connections
/// Collection of clients waiting to be used which are popped off in get_mixnet_client()
clients: Arc<RwLock<Vec<Arc<MixnetClient>>>>,
/// Default # of clients to have available in pool in reserve waiting for incoming connections
client_pool_reserve_number: usize,
/// CancellationToken used to signal shutdown
cancel_token: CancellationToken,
}
@@ -63,6 +107,16 @@ impl Clone for ClientPool {
}
impl ClientPool {
/// Creates a new client pool with the specified reserve size.
///
/// The pool will attempt to maintain `client_pool_reserve_number` clients
/// ready for immediate use. The pool starts empty and must be activated
/// by calling [`start`](Self::start).
///
/// # Arguments
///
/// * `client_pool_reserve_number` - The target number of clients to keep in reserve.
/// Set to 0 to create a pool that doesn't automatically spawn clients.
pub fn new(client_pool_reserve_number: usize) -> Self {
ClientPool {
clients: Arc::new(RwLock::new(Vec::new())),
@@ -71,8 +125,28 @@ impl ClientPool {
}
}
// The loop here is simple: if there aren't enough clients, create more. If you set clients to 0, repeatedly just sleep.
// disconnect_pool() will kill this loop via the cancellation token.
/// Starts the pool's background task that maintains the client reserve.
///
/// This method runs a loop that continuously checks if more clients are needed
/// and creates them as necessary. The loop continues until [`disconnect_pool`](Self::disconnect_pool)
/// is called.
///
/// This should typically be spawned as a background task:
///
/// ```rust,no_run
/// # use nym_sdk::client_pool::ClientPool;
/// # async fn example() {
/// let pool = ClientPool::new(3);
/// let pool_clone = pool.clone();
/// tokio::spawn(async move {
/// let _ = pool_clone.start().await;
/// });
/// # }
/// ```
///
/// # Returns
///
/// Returns `Ok(())` when the pool is shut down via cancellation token.
pub async fn start(&self) -> Result<()> {
loop {
let spawned_clients = self.clients.read().await.len();
@@ -112,9 +186,15 @@ impl ClientPool {
}
}
// Even though this is basically start() with an extra param since I think this
// will only be used for testing scenarios, and I didn't want to unnecessarily add
// another param to the function that will be used elsewhere, hence this is its own fn
/// Starts the pool with all clients connecting to a specific gateway.
///
/// This variant of [`start`](Self::start) forces all created clients to use the
/// specified gateway. Primarily useful for testing scenarios where gateway
/// consistency is required.
///
/// # Arguments
///
/// * `gateway` - The Ed25519 public key of the gateway all clients should connect to.
pub async fn start_with_specified_gateway(&self, gateway: ed25519::PublicKey) -> Result<()> {
loop {
let spawned_clients = self.clients.read().await.len();
@@ -155,6 +235,14 @@ impl ClientPool {
}
}
/// Shuts down the pool and disconnects all clients.
///
/// This method:
/// 1. Cancels the background task that creates new clients
/// 2. Disconnects all clients currently in the pool
///
/// After calling this method, the pool cannot be restarted. Create a new
/// `ClientPool` instance if you need to resume pooling.
pub async fn disconnect_pool(&self) {
info!("Triggering Client Pool disconnect");
self.cancel_token.cancel();
@@ -171,6 +259,18 @@ impl ClientPool {
}
}
/// Retrieves a client from the pool, if one is available.
///
/// The client is removed from the pool and ownership is transferred to the caller.
/// After use, the client should be disconnected; it is not returned to the pool.
///
/// If the pool is empty, this returns `None`. The background task started by
/// [`start`](Self::start) will create a replacement client automatically.
///
/// # Returns
///
/// - `Some(MixnetClient)` if a client was available in the pool
/// - `None` if the pool is currently empty
pub async fn get_mixnet_client(&self) -> Option<MixnetClient> {
debug!("Grabbing client from pool");
let mut clients = self.clients.write().await;
@@ -179,10 +279,15 @@ impl ClientPool {
.and_then(|arc_client| Arc::try_unwrap(arc_client).ok())
}
/// Returns the current number of clients available in the pool.
pub async fn get_client_count(&self) -> usize {
self.clients.read().await.len()
}
/// Returns the configured reserve size for this pool.
///
/// This is the target number of clients the pool attempts to maintain,
/// as set during construction with [`new`](Self::new).
pub async fn get_pool_reserve(&self) -> usize {
self.client_pool_reserve_number
}
+1
View File
@@ -154,4 +154,5 @@ impl Error {
}
}
/// A [`Result`](std::result::Result) type alias with [`Error`] as the default error type.
pub type Result<T, E = Error> = std::result::Result<T, E>;
+137 -18
View File
@@ -1,12 +1,93 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! Rust SDK for the Nym platform
//! # Nym SDK
//!
//! The main component currently is [`mixnet`].
//! [`tcp_proxy`] is probably a good place to start for anyone wanting to integrate with existing app code and read/write from a socket.
//! [`client_pool`] is a configurable client pool.
//! [`ipr_wrapper`] tunnels IP packets through the mixnet to an IPR exit gateway.
//! Rust SDK for building privacy-preserving applications on the [Nym mixnet](https://nymtech.net),
//! a decentralized network that provides network-level privacy through packet mixing,
//! timing obfuscation, and Sphinx packet encryption.
//!
//! For tutorials and conceptual guides, see the
//! [Nym developer portal](https://nymtech.net/docs/developers/rust).
//!
//! # Getting started
//!
//! **Start with [`mixnet::MixnetClient::connect_new`]** for a quick ephemeral client, or
//! [`mixnet::MixnetClientBuilder`] when you need to configure storage, gateway selection,
//! or network settings.
//!
//! ```no_run
//! use nym_sdk::mixnet::{self, MixnetMessageSender};
//!
//! # #[tokio::main]
//! # async fn main() {
//! let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
//! let addr = *client.nym_address();
//!
//! client.send_plain_message(addr, "hello mixnet!").await.unwrap();
//!
//! // Always disconnect for clean shutdown
//! client.disconnect().await;
//! # }
//! ```
//!
//! ## Stream I/O
//!
//! For persistent bidirectional byte channels (like a TCP socket), use
//! [`MixnetClient::open_stream`](mixnet::MixnetClient::open_stream) and
//! [`MixnetClient::listener`](mixnet::MixnetClient::listener).
//! Streams implement [`AsyncRead`](tokio::io::AsyncRead) +
//! [`AsyncWrite`](tokio::io::AsyncWrite) — see [`mixnet::stream`] for
//! the full API:
//!
//! ```no_run
//! use nym_sdk::mixnet;
//! use tokio::io::{AsyncReadExt, AsyncWriteExt};
//!
//! # #[tokio::main]
//! # async fn main() {
//! let mut sender = mixnet::MixnetClient::connect_new().await.unwrap();
//! let mut receiver = mixnet::MixnetClient::connect_new().await.unwrap();
//! let recv_addr = *receiver.nym_address();
//!
//! let mut listener = receiver.listener().unwrap();
//! let mut tx = sender.open_stream(recv_addr, None).await.unwrap();
//! let mut rx = listener.accept().await.unwrap();
//!
//! tx.write_all(b"hello via stream").await.unwrap();
//! tx.flush().await.unwrap();
//!
//! let mut buf = vec![0u8; 1024];
//! let n = rx.read(&mut buf).await.unwrap();
//!
//! sender.disconnect().await;
//! receiver.disconnect().await;
//! # }
//! ```
//!
//! See [`mixnet::stream`] for the full stream API, and the
//! [stream tutorial](https://nymtech.net/docs/developers/rust/stream/tutorial)
//! for a step-by-step walkthrough.
//!
//! # Modules
//!
//! | Module | Purpose |
//! |--------|---------|
//! | [`mixnet`] | Core client — messages, streams, builder, storage |
//! | [`client_pool`] | Pre-warmed pool of ephemeral clients |
//! | [`tcp_proxy`] | TCP tunnelling over the mixnet (deprecated — prefer streams) |
//! | [`bandwidth`] | Bandwidth credential management |
//!
//! # Feature flags
//!
//! **Feature gates are not yet implemented.** Importing `nym-sdk` currently pulls in all
//! modules and their full dependency trees. Work is planned to gate modules behind Cargo
//! features so you can import only what you need.
//!
//! # Network configuration
//!
//! By default, the SDK connects to the Nym mainnet. Customize with
//! [`NymNetworkDetails`] or environment variables.
mod error;
@@ -18,19 +99,57 @@ pub mod mixnet;
pub mod tcp_proxy;
pub use error::{Error, Result};
// Re-exports: gateway transceiver types (deprecated internals)
#[allow(deprecated)]
pub use nym_client_core::{
client::{
mix_traffic::transceiver::*,
topology_control::{
NymApiTopologyProvider, NymApiTopologyProviderConfig, TopologyProvider,
},
},
config::{DebugConfig, RememberMe},
pub use nym_client_core::client::mix_traffic::transceiver::{
ErasedGatewayError, GatewayPacketRouter, GatewayReceiver, GatewaySender, GatewayTransceiver,
LocalGateway, LocalGatewayError, MockGateway, MockGatewayError, PacketRouter, RemoteGateway,
};
pub use nym_network_defaults::{
ChainDetails, DenomDetails, DenomDetailsOwned, NymContracts, NymNetworkDetails,
ValidatorDetails,
};
pub use nym_task::{ShutdownToken, ShutdownTracker};
// Re-exports: topology
/// Fetches network topology from the Nym API.
pub use nym_client_core::client::topology_control::NymApiTopologyProvider;
/// Configuration for [`NymApiTopologyProvider`].
pub use nym_client_core::client::topology_control::NymApiTopologyProviderConfig;
/// Trait for custom topology providers. Implement this to fetch topology
/// from alternative sources (see `custom_topology_provider` example).
pub use nym_client_core::client::topology_control::TopologyProvider;
// Re-exports: config
/// Debug/development configuration for mixnet clients.
pub use nym_client_core::config::DebugConfig;
/// Controls whether client identity persists across restarts.
pub use nym_client_core::config::RememberMe;
// Re-exports: network defaults
/// Cosmos chain configuration (chain ID, RPC, gas).
pub use nym_network_defaults::ChainDetails;
/// Token denomination details (borrowed).
pub use nym_network_defaults::DenomDetails;
/// Token denomination details (owned).
pub use nym_network_defaults::DenomDetailsOwned;
/// Nym smart contract addresses.
pub use nym_network_defaults::NymContracts;
/// Complete network configuration (endpoints, contracts, chain details).
///
/// ```rust,no_run
/// use nym_sdk::NymNetworkDetails;
///
/// // Load from environment (defaults to mainnet)
/// let network = NymNetworkDetails::new_from_env();
/// println!("API: {:?}", network.endpoints);
/// ```
pub use nym_network_defaults::NymNetworkDetails;
/// Validator/API endpoint configuration.
pub use nym_network_defaults::ValidatorDetails;
// Re-exports: task management
/// Cancellation token for graceful shutdown.
pub use nym_task::ShutdownToken;
/// Tracks spawned tasks for coordinated shutdown.
pub use nym_task::ShutdownTracker;
// Re-exports: API client
/// Client identification sent with Nym API requests.
pub use nym_validator_client::UserAgent;
+185 -49
View File
@@ -1,22 +1,24 @@
//! The mixnet component of the Rust SDK for the Nym platform
//! The mixnet component of the Rust SDK for the Nym platform.
//!
//! **Start here:** [`MixnetClient::connect_new`] for an ephemeral client, or
//! [`MixnetClientBuilder`] for full configuration. See the
//! [tutorial](https://nymtech.net/docs/developers/rust/mixnet/tutorial) for a
//! step-by-step walkthrough.
//!
//! # Basic example
//! # Message example
//!
//! Send and receive raw message payloads through the Mixnet:
//!
//! ```no_run
//! use nym_sdk::mixnet::{self, MixnetMessageSender};
//!
//! #[tokio::main]
//! async fn main() {
//! // Passing no config makes the client fire up an ephemeral session and figure stuff out on
//! // its own
//! let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
//! let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
//!
//! // Be able to get our client address
//! let our_address = client.nym_address();
//! println!("Our client nym address is: {our_address}");
//!
//! // Send a message throught the mixnet to ourselves
//! client.send_plain_message(*our_address, "hello there").await.unwrap();
//!
//! println!("Waiting for message");
@@ -29,6 +31,49 @@
//! client.disconnect().await;
//! }
//! ```
//!
//! # Stream example
//!
//! Persistent bidirectional byte channels using
//! [`AsyncRead`](tokio::io::AsyncRead) + [`AsyncWrite`](tokio::io::AsyncWrite)
//! — see the [`stream`] submodule for the full API:
//!
//! ```no_run
//! use nym_sdk::mixnet;
//! use tokio::io::{AsyncReadExt, AsyncWriteExt};
//!
//! #[tokio::main]
//! async fn main() {
//! let mut sender = mixnet::MixnetClient::connect_new().await.unwrap();
//! let mut receiver = mixnet::MixnetClient::connect_new().await.unwrap();
//! let receiver_addr = *receiver.nym_address();
//!
//! // Receiver creates a listener (activates stream mode)
//! let mut listener = receiver.listener().unwrap();
//!
//! // Sender opens a stream to the receiver
//! let mut outbound = sender.open_stream(receiver_addr, None).await.unwrap();
//!
//! // Receiver accepts the incoming stream
//! let mut inbound = listener.accept().await.unwrap();
//!
//! // Write and read — just like a TCP socket
//! outbound.write_all(b"hello").await.unwrap();
//! outbound.flush().await.unwrap();
//!
//! let mut buf = vec![0u8; 1024];
//! let n = inbound.read(&mut buf).await.unwrap();
//! println!("Got: {}", String::from_utf8_lossy(&buf[..n]));
//!
//! // Streams deregister on drop, then disconnect clients
//! drop(outbound);
//! drop(inbound);
//! sender.disconnect().await;
//! receiver.disconnect().await;
//! }
//! ```
//!
#![doc = include_str!("mixnet/ARCHITECTURE.md")]
mod client;
mod config;
@@ -40,54 +85,145 @@ mod socks5_client;
pub mod stream;
mod traits;
// Local module exports
pub use client::{DisconnectedMixnetClient, IncludedSurbs, MixnetClientBuilder};
pub use config::Config;
pub use native_client::MixnetClient;
pub use native_client::MixnetClientSender;
#[allow(deprecated)]
pub use nym_client_core::client::{
base_client::{
storage::{
gateways_storage::{
ActiveGateway, BadGateway, GatewayRegistration, GatewaysDetailsStore,
},
Ephemeral, MixnetClientStorage, OnDiskPersistent,
},
EventReceiver, EventSender, MixnetClientEvent,
},
inbound_messages::InputMessage,
key_manager::{
persistence::{InMemEphemeralKeys, KeyStore, OnDiskKeys},
ClientKeys,
},
mix_traffic::MixTrafficEvent,
replies::reply_storage::{
fs_backend::Backend as ReplyStorage, CombinedReplyStorage, Empty as EmptyReplyStorage,
ReplyStorageBackend,
},
};
pub use nym_credential_storage::{
ephemeral_storage::EphemeralStorage as EphemeralCredentialStorage,
models::StoredIssuedTicketbook, storage::Storage as CredentialStorage,
};
pub use nym_crypto::asymmetric::{ed25519, x25519};
pub use nym_network_defaults::NymNetworkDetails;
pub use nym_socks5_client_core::config::Socks5;
pub use nym_sphinx::{
addressing::{
clients::{ClientIdentity, Recipient, RecipientFormattingError},
nodes::NodeIdentity,
},
anonymous_replies::requests::AnonymousSenderTag,
receiver::ReconstructedMessage,
};
pub use nym_statistics_common::clients::{
connection::ConnectionStatsEvent, ClientStatsEvents, ClientStatsSender,
};
pub use nym_task::connections::{LaneQueueLengths, TransmissionLane};
pub use nym_topology::{provider_trait::TopologyProvider, NymTopology};
pub use paths::StoragePaths;
pub use sink::{MixnetMessageSink, MixnetMessageSinkTranslator};
pub use socks5_client::Socks5MixnetClient;
pub use stream::{MixnetListener, MixnetStream, StreamId};
pub use traits::MixnetMessageSender;
// Re-exports from nym-client-core with documentation
#[allow(deprecated)]
pub use nym_client_core::client::base_client::storage::gateways_storage::GatewaysDetailsStore;
/// Information about a currently active gateway connection.
#[doc(alias = "Gateway")]
pub use nym_client_core::client::base_client::storage::gateways_storage::ActiveGateway;
/// Information about a gateway that failed to connect or is invalid.
pub use nym_client_core::client::base_client::storage::gateways_storage::BadGateway;
/// Registration details for a gateway including keys and connection info.
pub use nym_client_core::client::base_client::storage::gateways_storage::GatewayRegistration;
/// Ephemeral (in-memory) storage backend. Data is lost when the client disconnects.
pub use nym_client_core::client::base_client::storage::Ephemeral;
/// Trait for mixnet client storage implementations.
pub use nym_client_core::client::base_client::storage::MixnetClientStorage;
/// On-disk persistent storage backend. Data survives client restarts.
pub use nym_client_core::client::base_client::storage::OnDiskPersistent;
/// Receiver for client lifecycle events.
pub use nym_client_core::client::base_client::EventReceiver;
/// Sender for client lifecycle events.
pub use nym_client_core::client::base_client::EventSender;
/// Events emitted by the mixnet client during its lifecycle.
pub use nym_client_core::client::base_client::MixnetClientEvent;
/// A message to be sent through the mixnet.
pub use nym_client_core::client::inbound_messages::InputMessage;
/// In-memory ephemeral key storage. Keys are lost when the client disconnects.
pub use nym_client_core::client::key_manager::persistence::InMemEphemeralKeys;
/// Trait for key storage implementations.
pub use nym_client_core::client::key_manager::persistence::KeyStore;
/// On-disk key storage. Keys persist across client restarts.
pub use nym_client_core::client::key_manager::persistence::OnDiskKeys;
/// The client's cryptographic keys (identity, encryption, gateway shared key).
pub use nym_client_core::client::key_manager::ClientKeys;
/// Events related to mix traffic (packet sending/receiving).
pub use nym_client_core::client::mix_traffic::MixTrafficEvent;
/// File-system backed reply SURB storage.
pub use nym_client_core::client::replies::reply_storage::fs_backend::Backend as ReplyStorage;
/// Combined reply storage supporting multiple backends.
pub use nym_client_core::client::replies::reply_storage::CombinedReplyStorage;
/// Empty reply storage that discards all SURBs. Replies will not work.
pub use nym_client_core::client::replies::reply_storage::Empty as EmptyReplyStorage;
/// Trait for reply SURB storage implementations.
pub use nym_client_core::client::replies::reply_storage::ReplyStorageBackend;
// Re-exports from nym-credential-storage
/// Ephemeral (in-memory) credential storage. Credentials are lost on disconnect.
pub use nym_credential_storage::ephemeral_storage::EphemeralStorage as EphemeralCredentialStorage;
/// A ticketbook stored in the credential storage.
pub use nym_credential_storage::models::StoredIssuedTicketbook;
/// Trait for credential storage implementations.
pub use nym_credential_storage::storage::Storage as CredentialStorage;
// Re-exports from nym-crypto
/// Ed25519 digital signature cryptography (signing and verification).
pub use nym_crypto::asymmetric::ed25519;
/// X25519 elliptic curve Diffie-Hellman key exchange.
pub use nym_crypto::asymmetric::x25519;
// Re-exports from nym-network-defaults
/// Network configuration details (API endpoints, contract addresses, etc.).
pub use nym_network_defaults::NymNetworkDetails;
// Re-exports from nym-socks5-client-core
/// SOCKS5 proxy configuration.
pub use nym_socks5_client_core::config::Socks5;
// Re-exports from nym-sphinx
/// The Ed25519 public key identifying a client.
pub use nym_sphinx::addressing::clients::ClientIdentity;
/// A Nym network address for sending messages. Format: `identity.encryption@gateway`.
#[doc(alias = "Address")]
#[doc(alias = "NymAddress")]
pub use nym_sphinx::addressing::clients::Recipient;
/// Error when parsing a [`Recipient`] from a string.
pub use nym_sphinx::addressing::clients::RecipientFormattingError;
/// The Ed25519 public key identifying a mix node or gateway.
pub use nym_sphinx::addressing::nodes::NodeIdentity;
/// A tag identifying an anonymous sender, used for sending replies via SURBs.
pub use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
/// A message reconstructed from Sphinx packets after traversing the mixnet.
pub use nym_sphinx::receiver::ReconstructedMessage;
// Re-exports from nym-statistics-common
/// Events related to connection statistics.
pub use nym_statistics_common::clients::connection::ConnectionStatsEvent;
/// Statistics events that can be reported by clients.
pub use nym_statistics_common::clients::ClientStatsEvents;
/// Channel for sending statistics events to be reported.
pub use nym_statistics_common::clients::ClientStatsSender;
// Re-exports from nym-task
/// Queue lengths for different transmission lanes, useful for backpressure.
pub use nym_task::connections::LaneQueueLengths;
/// Transmission lane for prioritizing different types of traffic.
pub use nym_task::connections::TransmissionLane;
// Re-exports from nym-topology
/// Trait for providing network topology information.
pub use nym_topology::provider_trait::TopologyProvider;
/// The network topology containing mix nodes, gateways, and their routing info.
pub use nym_topology::NymTopology;
@@ -0,0 +1,74 @@
# Mixnet Module — Architecture
## Overview
The `mixnet` module provides `MixnetClient` — the main handle for
connecting to the Nym mixnet, sending messages through Sphinx-encrypted
multi-hop routing, and receiving reconstructed messages.
```text
User code
┌─────────────┴─────────────┐
│ MixnetClient │
│ │
send─────┤ client_input ──────────► │──► Sphinx packets ──► mixnet
│ │
recv◄────┤ reconstructed_receiver ◄──│◄── reconstructed ◄── mixnet
│ │
│ client_state (topology, │
│ queue lengths, etc.) │
└────────────────────────────┘
```
## Client Lifecycle
1. **Build**`MixnetClientBuilder` configures endpoints, storage,
gateway preference, and optional stream settings.
Shorthand: `MixnetClient::connect_new().await`.
2. **Connect**`.build()?.connect_to_mixnet().await?` yields a
connected `MixnetClient`.
3. **Use** — send/receive in one of two modes (see below).
4. **Disconnect**`client.disconnect().await` shuts down all
background tasks.
## Two Operating Modes
**Message mode** (default): raw payload send/receive.
- `send_plain_message`, `send_message`, `send_reply`
- `wait_for_messages` / `Stream<Item = ReconstructedMessage>`
**Stream mode**: persistent `AsyncRead + AsyncWrite` channels.
- `open_stream(recipient, reply_surbs)``MixnetStream`
- `client.listener()``MixnetListener``.accept()`
- One-way transition — message-mode methods return
`Error::StreamModeActive` once activated.
- See the [`stream`] submodule for details.
## Key Types
| Type | Role |
|---|---|
| `MixnetClient` | Connected client handle |
| `MixnetClientSender` | Clone-able send-only handle (`split_sender()`) |
| `MixnetClientBuilder` | Configures and connects a client |
| `DisconnectedMixnetClient` | After `build()`, before `connect_to_mixnet()` |
| `MixnetMessageSender` | Trait shared by `MixnetClient` and `MixnetClientSender` |
| `MixnetStream` | Single `AsyncRead + AsyncWrite` byte channel |
| `MixnetListener` | Accepts inbound streams |
| `Recipient` | Nym address (`identity.encryption@gateway`) |
## Storage
- **`Ephemeral`** — in-memory, keys discarded on disconnect
- **`OnDiskPersistent`** — keys and gateway registration persisted to disk
## Sub-modules
| Module | Purpose |
|---|---|
| `client` | `MixnetClientBuilder`, `DisconnectedMixnetClient` |
| `native_client` | `MixnetClient`, `MixnetClientSender` |
| `stream` | Stream multiplexing (`MixnetStream`, `MixnetListener`) |
| `traits` | `MixnetMessageSender` trait |
| `socks5_client` | SOCKS5 proxy client variant |
+22 -2
View File
@@ -45,6 +45,26 @@ use zeroize::Zeroizing;
/// The number of reply SURBs to include in a message by default.
pub(crate) const DEFAULT_NUMBER_OF_SURBS: u32 = 10;
/// Configures and builds a [`MixnetClient`].
///
/// Use [`new_ephemeral`](Self::new_ephemeral) for in-memory keys (discarded on disconnect),
/// or `new_with_default_storage` for persistent identity that survives restarts.
///
/// # Example
///
/// ```no_run
/// use nym_sdk::mixnet::MixnetClientBuilder;
///
/// # #[tokio::main]
/// # async fn main() {
/// let client = MixnetClientBuilder::new_ephemeral()
/// .build()
/// .unwrap()
/// .connect_to_mixnet()
/// .await
/// .unwrap();
/// # }
/// ```
#[derive(Default)]
pub struct MixnetClientBuilder<S: MixnetClientStorage = Ephemeral> {
config: Config,
@@ -398,8 +418,8 @@ where
///
/// Represents a client that is not yet connected to the mixnet. You typically create one when you
/// want to have a separate configuration and connection phase. Once the mixnet client builder is
/// configured, call [`MixnetClientBuilder::connect_to_mixnet()`] or
/// [`MixnetClientBuilder::connect_to_mixnet_via_socks5()`] to transition to a connected
/// configured, call [`connect_to_mixnet()`](Self::connect_to_mixnet) or
/// [`connect_to_mixnet_via_socks5()`](Self::connect_to_mixnet_via_socks5) to transition to a connected
/// client.
pub struct DisconnectedMixnetClient<S>
where
+136 -4
View File
@@ -31,6 +31,74 @@ use tokio::sync::RwLockReadGuard;
use tokio_util::sync::CancellationToken;
/// Client connected to the Nym mixnet.
///
/// `MixnetClient` operates in one of two mutually exclusive modes:
///
/// - **Message mode** (default) — send/receive discrete payloads via
/// [`send_plain_message`](MixnetMessageSender::send_plain_message) and
/// [`wait_for_messages`](Self::wait_for_messages).
/// - **[Stream mode](super::stream)** — persistent
/// [`AsyncRead`](tokio::io::AsyncRead) + [`AsyncWrite`](tokio::io::AsyncWrite)
/// byte channels via [`open_stream`](Self::open_stream) and
/// [`listener`](Self::listener). Activated on first stream call;
/// message-mode methods return
/// [`Error::StreamModeActive`](crate::Error::StreamModeActive) thereafter.
///
/// # Quick start — messages
///
/// ```no_run
/// use nym_sdk::mixnet::{self, MixnetMessageSender};
///
/// # #[tokio::main]
/// # async fn main() {
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
/// let addr = *client.nym_address();
///
/// client.send_plain_message(addr, "hello").await.unwrap();
///
/// if let Some(msgs) = client.wait_for_messages().await {
/// for m in msgs {
/// println!("{}", String::from_utf8_lossy(&m.message));
/// }
/// }
/// client.disconnect().await;
/// # }
/// ```
///
/// # Quick start — streams
///
/// ```no_run
/// use nym_sdk::mixnet;
/// use tokio::io::{AsyncReadExt, AsyncWriteExt};
///
/// # #[tokio::main]
/// # async fn main() {
/// let mut sender = mixnet::MixnetClient::connect_new().await.unwrap();
/// let mut receiver = mixnet::MixnetClient::connect_new().await.unwrap();
/// let recv_addr = *receiver.nym_address();
///
/// let mut listener = receiver.listener().unwrap();
/// let mut tx = sender.open_stream(recv_addr, None).await.unwrap();
/// let mut rx = listener.accept().await.unwrap();
///
/// tx.write_all(b"hello stream").await.unwrap();
/// tx.flush().await.unwrap();
///
/// let mut buf = vec![0u8; 1024];
/// let n = rx.read(&mut buf).await.unwrap();
/// println!("read {} bytes", n);
///
/// sender.disconnect().await;
/// receiver.disconnect().await;
/// # }
/// ```
///
/// # Shutdown
///
/// **Always call [`disconnect`](Self::disconnect) before dropping.** The client
/// runs background tasks (gateway connection, topology refresh, SURB management)
/// that need a coordinated shutdown. Dropping without disconnecting will leak
/// these tasks and may leave state files in an inconsistent state.
pub struct MixnetClient {
/// The nym address of this connected client.
pub(crate) nym_address: Recipient,
@@ -122,7 +190,7 @@ impl MixnetClient {
///
/// #[tokio::main]
/// async fn main() {
/// let mut client = mixnet::MixnetClient::connect_new().await;
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
/// }
///
/// ```
@@ -216,7 +284,12 @@ impl MixnetClient {
self.client_state.topology_accessor.release_manual_control()
}
/// Wait for messages from the mixnet
/// Wait for messages from the mixnet.
///
/// # Cancel safety
///
/// This method is cancel safe. If cancelled before a batch is available,
/// no messages are lost — they remain in the channel for the next call.
pub async fn wait_for_messages(&mut self) -> Option<Vec<ReconstructedMessage>> {
if self.stream_mode.load(Ordering::SeqCst) {
tracing::warn!("wait_for_messages() called after stream mode activated");
@@ -248,6 +321,12 @@ impl MixnetClient {
/// Disconnect from the mixnet. Currently, it is not supported to reconnect a disconnected
/// client.
///
/// # Cancel safety
///
/// This method is **not** cancel safe. If cancelled mid-shutdown,
/// background tasks may be left running and state files may not be
/// flushed. Always let this future run to completion.
pub async fn disconnect(self) {
if self.forget_me.any() {
log::debug!("Sending forget me request: {:?}", self.forget_me);
@@ -299,7 +378,7 @@ impl MixnetClient {
{
Ok(_) => Ok(()),
Err(e) => {
error!("Failed to send forget me request: {e}");
error!("Failed to send remember me request: {e}");
Err(Error::MessageSendingFailure)
}
}
@@ -310,6 +389,34 @@ impl MixnetClient {
/// Returns a [`MixnetStream`] implementing `AsyncRead + AsyncWrite`.
/// `reply_surbs` controls how many reply SURBs are included with each
/// outbound message so the peer can reply. Defaults to 10 if `None`.
///
/// This is a one-way transition: once stream mode is active,
/// message-mode methods like [`send_plain_message`](MixnetMessageSender::send_plain_message)
/// return [`Error::StreamModeActive`](crate::Error::StreamModeActive).
///
/// # Examples
///
/// ```no_run
/// use nym_sdk::mixnet;
/// use tokio::io::AsyncWriteExt;
///
/// # #[tokio::main]
/// # async fn main() {
/// let mut sender = mixnet::MixnetClient::connect_new().await.unwrap();
/// let mut receiver = mixnet::MixnetClient::connect_new().await.unwrap();
/// let recv_addr = *receiver.nym_address();
///
/// let mut stream = sender.open_stream(recv_addr, None).await.unwrap();
/// stream.write_all(b"hello").await.unwrap();
/// stream.flush().await.unwrap();
/// # }
/// ```
///
/// # Cancel safety
///
/// This method is **not** cancel safe. Cancelling after the `Open`
/// message is sent but before the `MixnetStream` is returned will
/// leave the stream registered in the routing table with no owner.
pub async fn open_stream(
&mut self,
recipient: Recipient,
@@ -325,12 +432,37 @@ impl MixnetClient {
/// Create a listener that accepts inbound streams from remote peers.
///
/// Can only be called once.
/// Can only be called once per client. Returns
/// [`Error::ListenerAlreadyTaken`](crate::Error::ListenerAlreadyTaken) on subsequent calls.
///
/// # Examples
///
/// ```no_run
/// use nym_sdk::mixnet;
/// use tokio::io::AsyncReadExt;
///
/// # #[tokio::main]
/// # async fn main() {
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
/// let mut listener = client.listener().unwrap();
///
/// // Blocks until a remote peer opens a stream
/// if let Some(mut stream) = listener.accept().await {
/// let mut buf = vec![0u8; 1024];
/// let n = stream.read(&mut buf).await.unwrap();
/// println!("received: {}", String::from_utf8_lossy(&buf[..n]));
/// }
/// # }
/// ```
pub fn listener(&mut self) -> Result<MixnetListener> {
super::stream::listener(self)
}
}
/// A clonable handle for sending messages through a connected [`MixnetClient`].
///
/// Obtained via [`MixnetClient::split_sender`]. Implements [`MixnetMessageSender`]
/// so it can send messages independently while another task handles receiving.
pub struct MixnetClientSender {
client_input: ClientInput,
packet_type: Option<PacketType>,
+41 -1
View File
@@ -12,7 +12,47 @@ use nym_topology::{NymRouteProvider, NymTopology, NymTopologyError};
use crate::mixnet::client::MixnetClientBuilder;
use crate::Result;
/// Client connected to the Nym mixnet.
/// A SOCKS5 proxy client connected to the Nym mixnet.
///
/// `Socks5MixnetClient` provides a SOCKS5 proxy interface to the Nym mixnet,
/// allowing HTTP(S) clients and other SOCKS5-compatible applications to route
/// their traffic through the mixnet for enhanced privacy.
///
/// ## Usage
///
/// 1. Connect to a service provider via [`connect_new`](Self::connect_new)
/// 2. Get the SOCKS5 URL via [`socks5_url`](Self::socks5_url)
/// 3. Configure your HTTP client to use this SOCKS5 proxy
///
/// ## Example
///
/// ```rust,no_run
/// use nym_sdk::mixnet::Socks5MixnetClient;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // Connect to a network requester service provider
/// let client = Socks5MixnetClient::connect_new("provider_nym_address...").await?;
///
/// // Get the SOCKS5 proxy URL
/// let socks5_url = client.socks5_url();
/// println!("Configure your HTTP client to use: {}", socks5_url);
///
/// // Your HTTP client can now use the SOCKS5 proxy
/// // let http_client = reqwest::Client::builder()
/// // .proxy(reqwest::Proxy::all(&socks5_url)?)
/// // .build()?;
///
/// client.disconnect().await;
/// Ok(())
/// }
/// ```
///
/// ## Service Providers
///
/// The SOCKS5 client connects to a "network requester" service provider that
/// makes HTTP requests on behalf of the client. The service provider's Nym
/// address must be provided when creating the client.
pub struct Socks5MixnetClient {
/// The nym address of this connected client.
pub(crate) nym_address: Recipient,
@@ -0,0 +1,118 @@
# Stream Multiplexing — Architecture
## Overview
The stream subsystem multiplexes concurrent `AsyncRead + AsyncWrite` byte
channels over a single `MixnetClient`. Each channel is a `MixnetStream`
identified by a random `StreamId`.
```text
┌─────────────────────────────────────────────────────────┐
│ MixnetClient │
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ MixnetStream │ │ MixnetStream │ ... │
│ │ (peer A) │ │ (peer B) │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │writes │writes │
│ ▼ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ ClientInput.input_sender │ │
│ └──────────────┬──────────────────┘ │
│ │ │
│ ▼ │
│ ── mixnet ── │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ reconstructed_receiver │ │
│ └──────────────┬──────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ Router task │ │
│ │ decode header → dispatch by ID │ │
│ └──┬──────────────────────────┬───┘ │
│ │ Open messages │ Data messages │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────────┐ │
│ │MixnetListener│ │ StreamMap lookup │ │
│ │ .accept() │ │ → per-stream tx │ │
│ └──────────────┘ └──────────────────┘ │
└─────────────────────────────────────────────────────────┘
```
## Wire Protocol (`protocol.rs`)
Every stream message uses the system-wide LP frame format (`nym-lp`).
Each message is an `LpFrame` with a 16-byte header:
```text
[LpFrameKind: 2 LE][frame_attributes: 14][payload: N bytes]
```
For streams, `LpFrameKind` is `SphinxStream` (0x0003) and the 14-byte
`frame_attributes` are parsed as `SphinxStreamFrameAttributes`:
```text
[StreamId: 8 BE][MsgType: 1][SequenceNum: 4 BE][reserved: 1]
```
- `Open` (0) — initiates a new stream
- `Data` (1) — carries payload for an existing stream
There is no `Close` type — streams clean up via `Drop` and idle timeout.
Sequence numbers enable reorder buffering (up to `MAX_REORDER_BUFFER`
out-of-order messages per stream).
## Initialization
Stream mode activates lazily on the first `open_stream()` or `listener()`
call. This is a one-way transition — message-mode APIs
(`send_plain_message`, `wait_for_messages`, etc.) return
`Error::StreamModeActive` afterwards.
On activation, `reconstructed_receiver` is handed to the router task
exclusively.
## Router Task (`run_router`)
A background task that reads inbound messages and dispatches them:
- **Open** → forwarded to `MixnetListener`'s accept channel
- **Data** → looked up in `StreamMap` by `StreamId`, forwarded to the
stream's channel
- Unrecognised messages are silently dropped
Shuts down via `CancellationToken` or when the receiver closes.
## Stream Lifecycle
**Outbound** (`open_stream`): generates a random `StreamId`, registers in
`StreamMap`, sends an `Open` message, returns a `MixnetStream`.
**Inbound** (`MixnetListener::accept`): receives an `InboundOpen` from the
router, registers in `StreamMap`, returns a `MixnetStream` using the
sender's reply SURBs.
## Cleanup
- **`Drop` on `MixnetStream`** — deregisters from `StreamMap`
- **`poll_shutdown`** — same, with a `deregistered` flag to avoid double-remove
- **Idle timeout** — streams inactive longer than `stream_idle_timeout`
(default 30 min) are reaped every 10s
## `StreamMap`
`Arc<Mutex<HashMap<StreamId, StreamEntry>>>` — shared between router,
streams, and listener. Methods: `register_stream`, `remove`,
`send_to_stream`, `cleanup_stale`.
## Known Limitations
- **No `Close` message** — there is no explicit stream-close signal.
Streams clean up locally via `Drop` and idle timeout. A proper
close/EOF mechanism requires further protocol work.
- **Reorder buffer cap** — out-of-order messages are buffered up to
`MAX_REORDER_BUFFER` (256) per stream. If a sequence number is
permanently lost, the buffer skips ahead once full.
@@ -31,15 +31,15 @@ enum Destination {
recipient: Box<Recipient>,
reply_surbs: u32,
},
/// We reply via the opener's anonymous sender tag.
/// We reply via the dialer's anonymous sender tag.
Anonymous { sender_tag: AnonymousSenderTag },
}
/// A byte stream to a single remote Nym client.
///
/// Provides `AsyncRead + AsyncWrite`. Created via
/// [`MixnetClient::open_stream`] (outbound) or
/// [`MixnetListener::accept`] (inbound).
/// [`MixnetClient::open_stream`](crate::mixnet::MixnetClient::open_stream) (outbound) or
/// [`MixnetListener::accept`](super::MixnetListener::accept) (inbound).
pub struct MixnetStream {
id: StreamId,
destination: Destination,
+13
View File
@@ -10,6 +10,11 @@
//! A background router task reads the client's `reconstructed_receiver`,
//! parses the stream header, and dispatches each payload to the right
//! stream's channel (or to the listener for `Open` messages).
//!
//! See the [tutorial](https://nymtech.net/docs/developers/rust/stream/tutorial)
//! for a step-by-step walkthrough.
//!
#![doc = include_str!("ARCHITECTURE.md")]
mod mixnet_stream;
pub(crate) mod protocol;
@@ -207,6 +212,9 @@ impl Drop for StreamState {
///
/// Created via [`MixnetClient::listener`]. Each `accept()` returns a
/// `MixnetStream` ready for reading and writing.
///
/// Only one `MixnetListener` can exist per client — a second call to
/// `listener()` returns [`Error::ListenerAlreadyTaken`].
pub struct MixnetListener {
inbound_rx: mpsc::UnboundedReceiver<InboundOpen>,
client_input: nym_client_core::client::base_client::ClientInput,
@@ -218,6 +226,11 @@ impl MixnetListener {
/// Wait for a remote peer to open a stream.
///
/// Returns `None` if the router has shut down.
///
/// # Cancel safety
///
/// This method is cancel safe. If cancelled before a stream arrives,
/// the pending `Open` message remains in the channel for the next call.
pub async fn accept(&mut self) -> Option<MixnetStream> {
loop {
let req = self.inbound_rx.recv().await?;
+39 -23
View File
@@ -8,8 +8,11 @@ use nym_client_core::client::inbound_messages::InputMessage;
use nym_sphinx::params::PacketType;
use nym_task::connections::TransmissionLane;
// defined to guarantee common interface regardless of whether you're using the full client
// or just the sending handler
/// Trait for sending messages through the Nym mixnet.
///
/// Implemented by both [`MixnetClient`](crate::mixnet::MixnetClient) and
/// [`MixnetClientSender`](crate::mixnet::MixnetClientSender), allowing code
/// to be generic over the sender type.
#[async_trait]
pub trait MixnetMessageSender {
fn packet_type(&self) -> Option<PacketType> {
@@ -18,6 +21,11 @@ pub trait MixnetMessageSender {
/// Sends a [`InputMessage`] to the mixnet. This is the most low-level sending function, for
/// full customization.
///
/// # Cancel safety
///
/// This method is cancel safe. The message is either fully queued or not
/// sent at all.
async fn send(&self, message: InputMessage) -> Result<()>;
/// Sends data to the supplied Nym address with the default surb behaviour.
@@ -27,13 +35,13 @@ pub trait MixnetMessageSender {
/// ```no_run
/// use nym_sdk::mixnet::{self, MixnetMessageSender};
///
/// #[tokio::main]
/// async fn main() {
/// let address = "foobar";
/// let recipient = mixnet::Recipient::try_from_base58_string(address).unwrap();
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
/// client.send_plain_message(recipient, "hi").await.unwrap();
/// }
/// # #[tokio::main]
/// # async fn main() {
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
/// let addr = *client.nym_address();
///
/// client.send_plain_message(addr, "hello").await.unwrap();
/// # }
/// ```
async fn send_plain_message<M>(&self, address: Recipient, message: M) -> Result<()>
where
@@ -51,14 +59,14 @@ pub trait MixnetMessageSender {
/// ```no_run
/// use nym_sdk::mixnet::{self, MixnetMessageSender};
///
/// #[tokio::main]
/// async fn main() {
/// let address = "foobar";
/// let recipient = mixnet::Recipient::try_from_base58_string(address).unwrap();
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
/// let surbs = mixnet::IncludedSurbs::default();
/// client.send_message(recipient, "hi".to_owned().into_bytes(), surbs).await.unwrap();
/// }
/// # #[tokio::main]
/// # async fn main() {
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
/// let addr = *client.nym_address();
/// let surbs = mixnet::IncludedSurbs::new(5);
///
/// client.send_message(addr, b"hello", surbs).await.unwrap();
/// # }
/// ```
async fn send_message<M>(
&self,
@@ -90,18 +98,26 @@ pub trait MixnetMessageSender {
/// Sends reply data to the supplied anonymous recipient.
///
/// The [`AnonymousSenderTag`] comes from a received message's
/// [`sender_tag`](nym_sphinx::receiver::ReconstructedMessage::sender_tag) field.
///
/// # Example
///
/// ```no_run
/// use nym_sdk::mixnet::{self, MixnetMessageSender};
///
/// #[tokio::main]
/// async fn main() {
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
/// // note: the tag is something you would have received from a remote client sending you surbs!
/// let tag = mixnet::AnonymousSenderTag::try_from_base58_string("foobar").unwrap();
/// client.send_reply(tag, b"hi").await.unwrap();
/// # #[tokio::main]
/// # async fn main() {
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
///
/// if let Some(msgs) = client.wait_for_messages().await {
/// for msg in msgs {
/// if let Some(tag) = msg.sender_tag {
/// client.send_reply(tag, b"got it!").await.unwrap();
/// }
/// }
/// }
/// # }
/// ```
async fn send_reply<M>(&self, recipient_tag: AnonymousSenderTag, message: M) -> Result<()>
where
+85 -265
View File
@@ -1,276 +1,96 @@
//! The TcpProxy Module of the Nym SDK which exposes a socket interface for the Mixnet
//! TCP proxy functionality for routing socket connections through the Nym mixnet.
//!
//! # Basic Example
//! # Deprecated
//!
//! ```no_run
//! use nym_sdk::client_pool::ClientPool;
//! use nym_sdk::mixnet::{IncludedSurbs, MixnetClientBuilder, MixnetMessageSender, NymNetworkDetails};
//! use nym_sdk::tcp_proxy::utils::{MessageBuffer, Payload, ProxiedMessage};
//! use anyhow::Result;
//! use dashmap::DashSet;
//! use nym_network_defaults::setup_env;
//! <div class="warning">
//!
//! This module is deprecated. For new projects, use the
//! [`stream`](crate::mixnet::stream) module instead, which provides
//! `AsyncRead + AsyncWrite` streams directly over the mixnet without the
//! TCP socket overhead.
//!
//! </div>
//!
//! This module provides [`NymProxyClient`] and [`NymProxyServer`] for creating
//! TCP proxy tunnels that route traffic through the Nym mixnet for enhanced privacy.
//!
//! # Architecture
//!
//! The TCP proxy system consists of two components:
//!
//! - **[`NymProxyClient`]** - Listens for local TCP connections and forwards them
//! through the mixnet to a remote `NymProxyServer`
//! - **[`NymProxyServer`]** - Receives connections from the mixnet and forwards
//! them to a local upstream service
//!
//! ```text
//! ┌─────────────┐ ┌─────────────────┐ ┌─────────────┐ ┌─────────────────┐ ┌──────────────┐
//! │ Application │────▶│ NymProxyClient │────▶│ Mixnet │────▶│ NymProxyServer │────▶│ Upstream │
//! │ (Client) │◀────│ (localhost) │◀────│ (anonymity)│◀────│ (remote) │◀────│ Service │
//! └─────────────┘ └─────────────────┘ └─────────────┘ └─────────────────┘ └──────────────┘
//! ```
//!
//! # Message Ordering
//!
//! Since the mixnet does not guarantee message ordering, the proxy implements
//! a session-based ordering system using [`MessageBuffer`] and [`ProxiedMessage`].
//! Each message includes a session ID and sequence number for proper reassembly.
//!
//! # Example
//!
//! ## Client Side
//!
//! ```rust,no_run
//! use nym_sdk::tcp_proxy::NymProxyClient;
//! use nym_sphinx::addressing::Recipient;
//! use std::sync::Arc;
//! use tokio::{
//! net::{TcpListener, TcpStream},
//! sync::oneshot,
//! };
//! use tokio_stream::StreamExt;
//! use tokio_util::codec::{BytesCodec, FramedRead};
//! use tokio_util::sync::CancellationToken;
//! use tracing::{debug, info, instrument};
//!
//! const DEFAULT_CLOSE_TIMEOUT: u64 = 60; // seconds
//! const DEFAULT_LISTEN_HOST: &str = "127.0.0.1";
//! const DEFAULT_LISTEN_PORT: &str = "8080";
//! const DEFAULT_CLIENT_POOL_SIZE: usize = 2;
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//! // Parse the server's Nym address
//! let server_address: Recipient = "server_nym_address...".parse()?;
//!
//! #[derive(Clone)]
//! pub struct NymProxyClient {
//! server_address: Recipient,
//! listen_address: String,
//! listen_port: String,
//! close_timeout: u64,
//! conn_pool: ClientPool,
//! cancel_token: CancellationToken,
//! }
//! // Create a proxy client listening on localhost:8080
//! let client = NymProxyClient::new(
//! server_address,
//! "127.0.0.1",
//! "8080",
//! 60, // close timeout in seconds
//! None, // use default network
//! 2, // client pool size
//! ).await?;
//!
//! impl NymProxyClient {
//! pub async fn new(
//! server_address: Recipient,
//! listen_address: &str,
//! listen_port: &str,
//! close_timeout: u64,
//! env: Option<String>,
//! default_client_amount: usize,
//! ) -> Result<Self> {
//! debug!("Loading env file: {:?}", env);
//! setup_env(env); // Defaults to mainnet if empty
//! Ok(NymProxyClient {
//! server_address,
//! listen_address: listen_address.to_string(),
//! listen_port: listen_port.to_string(),
//! close_timeout,
//! conn_pool: ClientPool::new(default_client_amount),
//! cancel_token: CancellationToken::new(),
//! })
//! }
//!
//! // server_address is the Nym address of the NymProxyServer to communicate with.
//! pub async fn new_with_defaults(server_address: Recipient, env: Option<String>) -> Result<Self> {
//! NymProxyClient::new(
//! server_address,
//! DEFAULT_LISTEN_HOST,
//! DEFAULT_LISTEN_PORT,
//! DEFAULT_CLOSE_TIMEOUT,
//! env,
//! DEFAULT_CLIENT_POOL_SIZE,
//! )
//! .await
//! }
//!
//! pub async fn run(&self) -> Result<()> {
//! info!("Connecting to mixnet server at {}", self.server_address);
//!
//! let listener =
//! TcpListener::bind(format!("{}:{}", self.listen_address, self.listen_port)).await?;
//!
//! let client_maker = self.conn_pool.clone();
//! tokio::spawn(async move {
//! client_maker.start().await?;
//! Ok::<(), anyhow::Error>(())
//! });
//!
//! loop {
//! tokio::select! {
//! stream = listener.accept() => {
//! let (stream, _) = stream?;
//! tokio::spawn(NymProxyClient::handle_incoming(
//! stream,
//! self.server_address,
//! self.close_timeout,
//! self.conn_pool.clone(),
//! self.cancel_token.clone(),
//! ));
//! }
//! _ = self.cancel_token.cancelled() => {
//! break Ok(());
//! }
//! }
//! }
//! }
//!
//! pub async fn disconnect(&self) {
//! self.cancel_token.cancel();
//! self.conn_pool.disconnect_pool().await;
//! }
//!
//! // The main body of our logic, triggered on each accepted incoming tcp connection. To deal with assumptions about
//! // streaming we have to implement an abstract session for each set of outgoing messages atop each connection, with message
//! // IDs to deal with the fact that the mixnet does not enforce message ordering.
//! //
//! // There is an initial thread which does a bunch of setup logic
//! // - Create a random session ID
//! // - Create a Nym Client (and split into read/write clients for concurrent read/write)
//! // - Split incoming TcpStream into OwnedReadHalf and OwnedWriteHalf for concurrent read/write
//! //
//! // Then we spawn 2 tasks:
//! // - 'Outgoing' thread => frames incoming bytes from OwnedReadHalf and pipe through the mixnet & trigger session close.
//! // - 'Incoming' thread => orders incoming messages from the Mixnet via placing them in a MessageBuffer and using tick(), as well as manage session closing.
//! #[instrument(skip(stream, server_address, close_timeout, conn_pool, cancel_token))]
//! async fn handle_incoming(
//! stream: TcpStream,
//! server_address: Recipient,
//! close_timeout: u64,
//! conn_pool: ClientPool,
//! cancel_token: CancellationToken,
//! ) -> Result<()> {
//! // ID for creation of session abstraction; new session ID per new connection accepted by our tcp listener above.
//! let session_id = uuid::Uuid::new_v4();
//!
//! // Used to communicate end of session between 'Outgoing' and 'Incoming' tasks
//! let (tx, mut rx) = oneshot::channel();
//!
//! info!("Starting session: {}", session_id);
//!
//! let mut client = match conn_pool.get_mixnet_client().await {
//! Some(client) => {
//! info!("Grabbed client {} from pool", client.nym_address());
//! client
//! }
//! None => {
//! info!("Not enough clients in pool, creating ephemeral client");
//! let net = NymNetworkDetails::new_from_env();
//! let client = MixnetClientBuilder::new_ephemeral()
//! .network_details(net)
//! .build()?
//! .connect_to_mixnet()
//! .await?;
//! info!(
//! "Using {} for the moment, created outside of the connection pool",
//! client.nym_address()
//! );
//! client
//! }
//! };
//!
//! // Split our tcpstream into OwnedRead and OwnedWrite halves for concurrent read/writing
//! let (read, mut write) = stream.into_split();
//! // Since we're just trying to pipe whatever bytes our client/server are normally sending to each other,
//! // the bytescodec is fine to use here; we're trying to avoid modifying this stream e.g. in the process of Sphinx packet
//! // creation and adding padding to the payload whilst also sidestepping the need to manually manage an intermediate buffer of the
//! // incoming bytes from the tcp stream and writing them to our server with our Nym client.
//! let codec = BytesCodec::new();
//! let mut framed_read = FramedRead::new(read, codec);
//! // Much like the tcpstream, split our Nym client into a sender and receiver for concurrent read/write
//! let sender = client.split_sender();
//! // The server / service provider address our client is sending messages to will remain static
//! let server_addr = server_address;
//! // Store outgoing messages in instance of Dashset abstraction
//! let messages_account = Arc::new(DashSet::new());
//! // Wrap in an Arc for memsafe concurrent access
//! let sent_messages_account = Arc::clone(&messages_account);
//!
//! // 'Outgoing' thread
//! tokio::spawn(async move {
//! let mut message_id = 0;
//! // While able to read from OwnedReadHalf of TcpStream:
//! // - increment our messageID - we need to ensure message ordering on both client and server.
//! // - create instance of ProxiedMessage abstraction with framed bytes: this is really just the message data payload in the form of those bytes
//! // & session and messageIDs.
//! // - Serialise + send message through the mixnet to the Service Provider.
//! // - Repeat these steps, but sending a message with a payload containing a Close signal for this session; since we have message ordering implemented
//! // we can fire off the session close signal without having to wait on making sure the server has received the rest of the messages.
//! // - Trigger our session timeout alert in the 'Incoming' thread select! loop via tx end of our oneshot channel.
//! while let Some(Ok(bytes)) = framed_read.next().await {
//! message_id += 1;
//! sent_messages_account.insert(message_id);
//! let message =
//! ProxiedMessage::new(Payload::Data(bytes.to_vec()), session_id, message_id);
//! let coded_message = bincode::serialize(&message)?;
//! sender
//! .send_message(server_addr, &coded_message, IncludedSurbs::Amount(100))
//! .await?;
//! info!(
//! "Sent message with id {} for session {} of {} bytes",
//! message_id,
//! session_id,
//! bytes.len()
//! );
//! }
//! message_id += 1;
//! let message = ProxiedMessage::new(Payload::Close, session_id, message_id);
//!
//! let coded_message = bincode::serialize(&message)?;
//! sender
//! .send_message(server_addr, &coded_message, IncludedSurbs::Amount(100))
//! .await?;
//!
//! info!("Closing read end of session: {}", session_id);
//! tx.send(true)
//! .map_err(|_| anyhow::anyhow!("Could not send close signal"))?;
//! Ok::<(), anyhow::Error>(())
//! });
//!
//! // 'Incoming' thread
//! tokio::spawn(async move {
//! // Abstraction containing logic ordering: all our incoming messages need to be parsed based on their messageIDs per session.
//! // All the message-ordering and time-tracking methods are defined in utils.rs, mostly used in .tick().
//! let mut msg_buffer = MessageBuffer::new();
//! // Select!-ing one of following options:
//! // - rx is triggered by tx to log the session will end in ARGS.close_timeout time, break from this loop to pass to loop below
//! // - Deserialise incoming mixnet message, push to msg buffer and tick() to order and write to OwnedWriteHalf.
//! // - If the cancel_token is in cancelled state, break and kick down to the loop below.
//! // - Call tick() once per 100ms if neither of the above have occurred.
//! loop {
//! tokio::select! {
//! _ = &mut rx => {
//! info!("Closing write end of session: {} in {} seconds", session_id, close_timeout);
//! break
//! }
//! Some(message) = client.next() => {
//! let message = bincode::deserialize::<ProxiedMessage>(&message.message)?;
//! msg_buffer.push(message);
//! msg_buffer.tick(&mut write).await?;
//! },
//! _ = cancel_token.cancelled() => {
//! info!("CTRL_C triggered in thread, triggering loop shutdown");
//! break
//! },
//! _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
//! msg_buffer.tick(&mut write).await?;
//! }
//! }
//! }
//! // Select!-ing one of following options:
//! // - Deserialise incoming mixnet message, push to msg buffer and tick() to order and write next messageID in line to OwnedWriteHalf.
//! // - If the cancel_token is in cancelled state, shutdown client for this thread.
//! // - Sleep for session timeout and return, kills thread with Ok(()).
//! loop {
//! tokio::select! {
//! Some(message) = client.next() => {
//! let message = bincode::deserialize::<ProxiedMessage>(&message.message)?;
//! msg_buffer.push(message);
//! msg_buffer.tick(&mut write).await?;
//! },
//! _ = cancel_token.cancelled() => {
//! info!("CTRL_C triggered in thread, triggering client shutdown");
//! client.disconnect().await;
//! return Ok::<(), anyhow::Error>(())
//! },
//! _ = tokio::time::sleep(tokio::time::Duration::from_secs(close_timeout)) => {
//! info!("Closing write end of session: {}", session_id);
//! info!("Triggering client shutdown");
//! client.disconnect().await;
//! return Ok::<(), anyhow::Error>(())
//! },
//! }
//! }
//! });
//! Ok(())
//! }
//! // Run the proxy (blocks until disconnected)
//! client.run().await?;
//! Ok(())
//! }
//! ```
//!
//! ## Server Side
//!
//! ```rust,no_run
//! use nym_sdk::tcp_proxy::NymProxyServer;
//!
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//! // Create a proxy server that forwards to localhost:3000
//! let mut server = NymProxyServer::new("127.0.0.1:3000", "./nym-proxy-data", None, None).await?;
//!
//! println!("Server listening at: {}", server.nym_address());
//!
//! // Run the server (blocks until disconnected)
//! server.run_with_shutdown().await?;
//! Ok(())
//! }
//! ```
//!
//! # Utilities
//!
//! The [`utils`] submodule provides the message ordering infrastructure:
//!
//! - [`ProxiedMessage`] - A message with session ID and sequence number
//! - [`MessageBuffer`] - Orders out-of-order messages before delivery
//! - [`Payload`] - Message payload (data or close signal)
//! - [`DecayWrapper`] - Tracks message age for time-based delivery
mod tcp_proxy_client;
mod tcp_proxy_server;
@@ -18,22 +18,93 @@ use tokio_util::codec::{BytesCodec, FramedRead};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument};
const DEFAULT_CLOSE_TIMEOUT: u64 = 60; // seconds
/// Default timeout in seconds before closing an idle session.
const DEFAULT_CLOSE_TIMEOUT: u64 = 60;
/// Default address to listen on for incoming TCP connections.
const DEFAULT_LISTEN_HOST: &str = "127.0.0.1";
/// Default port to listen on for incoming TCP connections.
const DEFAULT_LISTEN_PORT: &str = "8080";
/// Default number of mixnet clients to keep in the connection pool.
const DEFAULT_CLIENT_POOL_SIZE: usize = 2;
/// A TCP proxy client that tunnels local TCP connections through the Nym mixnet.
///
/// `NymProxyClient` acts as a local TCP server that accepts connections and forwards
/// them through the Nym mixnet to a remote [`NymProxyServer`](super::NymProxyServer).
/// This allows existing TCP-based applications to gain mixnet privacy without
/// modification.
///
/// ## Architecture
///
/// ```text
/// [Your App] --> [NymProxyClient] --> [Nym Mixnet] --> [NymProxyServer] --> [Target Server]
/// ```
///
/// The client:
/// 1. Listens on a local TCP address (default `127.0.0.1:8080`)
/// 2. Accepts incoming connections from your application
/// 3. Creates a unique session for each connection
/// 4. Sends data through the mixnet using Sphinx packets
/// 5. Receives responses via anonymous reply SURBs
/// 6. Handles message ordering (mixnet doesn't guarantee order)
///
/// ## Example
///
/// ```rust,no_run
/// use nym_sdk::tcp_proxy::NymProxyClient;
/// use nym_sphinx::addressing::Recipient;
///
/// #[tokio::main]
/// async fn main() -> anyhow::Result<()> {
/// // The Nym address of the NymProxyServer to connect to
/// let server_address: Recipient = "server_nym_address...".parse()?;
///
/// // Create client with default settings (listens on 127.0.0.1:8080)
/// let client = NymProxyClient::new_with_defaults(server_address, None).await?;
///
/// // Run the proxy (blocks until disconnected)
/// client.run().await?;
///
/// Ok(())
/// }
/// ```
///
/// ## Message Ordering
///
/// The Nym mixnet does not guarantee message ordering. This proxy implements
/// session-based message ordering using unique session IDs and message sequence
/// numbers, buffering out-of-order messages until they can be delivered in sequence.
#[derive(Clone)]
pub struct NymProxyClient {
/// The Nym address of the remote NymProxyServer.
server_address: Recipient,
/// Local address to listen on.
listen_address: String,
/// Local port to listen on.
listen_port: String,
/// Timeout in seconds before closing idle sessions.
close_timeout: u64,
/// Pool of pre-initialized mixnet clients for handling connections.
conn_pool: ClientPool,
/// Token for graceful shutdown.
cancel_token: CancellationToken,
}
impl NymProxyClient {
/// Creates a new `NymProxyClient` with custom configuration.
///
/// # Arguments
///
/// * `server_address` - The Nym address of the [`NymProxyServer`](super::NymProxyServer) to forward traffic to.
/// * `listen_address` - Local address to listen on (e.g., `"127.0.0.1"`).
/// * `listen_port` - Local port to listen on (e.g., `"8080"`).
/// * `close_timeout` - Seconds to wait before closing an idle session.
/// * `env` - Optional path to a `.env` file for network configuration. If `None`, uses mainnet defaults.
/// * `default_client_amount` - Number of mixnet clients to keep in the connection pool.
///
/// # Returns
///
/// A configured `NymProxyClient` ready to be started with [`run`](Self::run).
pub async fn new(
server_address: Recipient,
listen_address: &str,
@@ -54,7 +125,17 @@ impl NymProxyClient {
})
}
// server_address is the Nym address of the NymProxyServer to communicate with.
/// Creates a new `NymProxyClient` with default settings.
///
/// Uses the following defaults:
/// - Listen address: `127.0.0.1:8080`
/// - Close timeout: 60 seconds
/// - Client pool size: 2
///
/// # Arguments
///
/// * `server_address` - The Nym address of the [`NymProxyServer`](super::NymProxyServer) to forward traffic to.
/// * `env` - Optional path to a `.env` file for network configuration. If `None`, uses mainnet defaults.
pub async fn new_with_defaults(server_address: Recipient, env: Option<String>) -> Result<Self> {
NymProxyClient::new(
server_address,
@@ -67,6 +148,20 @@ impl NymProxyClient {
.await
}
/// Starts the proxy and begins accepting TCP connections.
///
/// This method:
/// 1. Binds to the configured local address and port
/// 2. Starts the client pool to maintain ready mixnet clients
/// 3. Accepts incoming TCP connections and spawns handlers for each
///
/// This method blocks until [`disconnect`](Self::disconnect) is called or
/// an error occurs.
///
/// # Returns
///
/// Returns `Ok(())` when shutdown is triggered, or an error if binding fails
/// or a connection handler encounters an unrecoverable error.
pub async fn run(&self) -> Result<()> {
info!("Connecting to mixnet server at {}", self.server_address);
@@ -98,6 +193,10 @@ impl NymProxyClient {
}
}
/// Disconnects the proxy and shuts down all active sessions.
///
/// This method cancels the accept loop, stops the client pool, and
/// disconnects all pooled clients. Active sessions will be terminated.
pub async fn disconnect(&self) {
self.cancel_token.cancel();
self.conn_pool.disconnect_pool().await;
@@ -24,19 +24,102 @@ mod utils;
use utils::{MessageBuffer, Payload, ProxiedMessage};
use uuid::Uuid;
/// A TCP proxy server that receives traffic from the Nym mixnet and forwards it to an upstream service.
///
/// `NymProxyServer` is the server-side counterpart to [`NymProxyClient`](super::NymProxyClient).
/// It listens for incoming mixnet messages and forwards them to a local TCP service,
/// then sends responses back through the mixnet using anonymous reply SURBs.
///
/// ## Architecture
///
/// ```text
/// [NymProxyClient] --> [Nym Mixnet] --> [NymProxyServer] --> [Upstream Service]
/// <--
/// ```
///
/// The server:
/// 1. Maintains a persistent Nym address (stored in `config_dir`)
/// 2. Receives messages from the mixnet
/// 3. Creates TCP connections to the upstream service for each session
/// 4. Forwards data bidirectionally, handling message ordering
/// 5. Uses anonymous reply SURBs to send responses back to clients
///
/// ## Example
///
/// ```rust,no_run
/// use nym_sdk::tcp_proxy::NymProxyServer;
///
/// #[tokio::main]
/// async fn main() -> anyhow::Result<()> {
/// // Forward traffic to a local HTTP server
/// let mut server = NymProxyServer::new(
/// "127.0.0.1:8000", // Upstream service address
/// "./nym-proxy-data", // Config directory for persistent keys
/// None, // Use mainnet (or path to .env)
/// None, // Use random gateway
/// ).await?;
///
/// println!("Server Nym address: {}", server.nym_address());
///
/// // Run the server (blocks until shutdown signal)
/// server.run_with_shutdown().await?;
///
/// Ok(())
/// }
/// ```
///
/// ## Persistence
///
/// Unlike `NymProxyClient`, the server maintains a persistent Nym address stored in
/// `config_dir`. This allows clients to connect to a known address across server restarts.
///
/// ## Shutdown
///
/// To gracefully shut down the server, use the shutdown signal channel:
///
/// ```rust,no_run
/// # use nym_sdk::tcp_proxy::NymProxyServer;
/// # async fn example(mut server: NymProxyServer) {
/// let shutdown_tx = server.disconnect_signal();
/// // Later, trigger shutdown:
/// shutdown_tx.send(()).await.unwrap();
/// # }
/// ```
pub struct NymProxyServer {
/// Address of the upstream TCP service to forward traffic to.
upstream_address: String,
/// Tracks active session IDs.
session_map: DashSet<Uuid>,
/// The underlying mixnet client for receiving messages.
mixnet_client: MixnetClient,
/// Sender half of the mixnet client for replying to clients.
mixnet_client_sender: Arc<RwLock<MixnetClientSender>>,
/// Channel for broadcasting incoming messages to session handlers.
tx: tokio::sync::watch::Sender<Option<(ProxiedMessage, AnonymousSenderTag)>>,
/// Receiver for incoming message broadcasts.
rx: tokio::sync::watch::Receiver<Option<(ProxiedMessage, AnonymousSenderTag)>>,
/// Token for graceful shutdown of session handlers.
cancel_token: CancellationToken,
/// Channel for receiving shutdown signals.
shutdown_tx: tokio::sync::mpsc::Sender<()>,
/// Receiver for shutdown signals.
shutdown_rx: tokio::sync::mpsc::Receiver<()>,
}
impl NymProxyServer {
/// Creates a new `NymProxyServer` that forwards traffic to an upstream service.
///
/// # Arguments
///
/// * `upstream_address` - The address of the upstream TCP service (e.g., `"127.0.0.1:8000"`).
/// * `config_dir` - Directory to store persistent client keys and configuration.
/// The server will maintain the same Nym address across restarts if this directory persists.
/// * `env` - Optional path to a `.env` file for network configuration. If `None`, uses mainnet defaults.
/// * `gateway` - Optional specific gateway to connect to. If `None`, a gateway is selected automatically.
///
/// # Returns
///
/// A configured `NymProxyServer` ready to be started with [`run_with_shutdown`](Self::run_with_shutdown).
pub async fn new(
upstream_address: &str,
config_dir: &str,
@@ -94,6 +177,20 @@ impl NymProxyServer {
})
}
/// Runs the server until a shutdown signal is received.
///
/// This method:
/// 1. Listens for incoming mixnet messages
/// 2. Creates session handlers for new sessions
/// 3. Routes messages to appropriate session handlers
/// 4. Handles shutdown gracefully when signaled
///
/// Use [`disconnect_signal`](Self::disconnect_signal) to get a sender for triggering shutdown.
///
/// # Returns
///
/// Returns `Ok(())` when shutdown is triggered, or an error if an unrecoverable
/// error occurs.
pub async fn run_with_shutdown(&mut self) -> Result<()> {
let handle_token = self.cancel_token.child_token();
let upstream_address = self.upstream_address.clone();
@@ -268,30 +365,66 @@ impl NymProxyServer {
Ok(())
}
/// Returns a sender that can be used to trigger server shutdown.
///
/// Send `()` on this channel to initiate graceful shutdown of the server.
///
/// # Example
///
/// ```rust,no_run
/// # use nym_sdk::tcp_proxy::NymProxyServer;
/// # async fn example(server: &NymProxyServer) {
/// let shutdown_tx = server.disconnect_signal();
///
/// // Trigger shutdown from another task
/// tokio::spawn(async move {
/// tokio::time::sleep(std::time::Duration::from_secs(60)).await;
/// shutdown_tx.send(()).await.unwrap();
/// });
/// # }
/// ```
pub fn disconnect_signal(&self) -> tokio::sync::mpsc::Sender<()> {
self.shutdown_tx.clone()
}
/// Returns the Nym address of this server.
///
/// Clients need this address to connect to the server through the mixnet.
/// This address is persistent across server restarts if the same `config_dir`
/// is used.
pub fn nym_address(&self) -> &Recipient {
self.mixnet_client.nym_address()
}
/// Returns a mutable reference to the underlying mixnet client.
///
/// This is primarily for internal use and advanced scenarios.
pub fn mixnet_client_mut(&mut self) -> &mut MixnetClient {
&mut self.mixnet_client
}
/// Returns the set of currently active session IDs.
pub fn session_map(&self) -> &DashSet<Uuid> {
&self.session_map
}
/// Returns a clone of the mixnet client sender wrapped in an `Arc<RwLock>`.
///
/// This is primarily for internal use by session handlers.
pub fn mixnet_client_sender(&self) -> Arc<RwLock<MixnetClientSender>> {
Arc::clone(&self.mixnet_client_sender)
}
/// Returns a clone of the message broadcast sender.
///
/// This is primarily for internal use.
pub fn tx(&self) -> tokio::sync::watch::Sender<Option<(ProxiedMessage, AnonymousSenderTag)>> {
self.tx.clone()
}
/// Returns a clone of the message broadcast receiver.
///
/// This is primarily for internal use by session handlers.
pub fn rx(&self) -> tokio::sync::watch::Receiver<Option<(ProxiedMessage, AnonymousSenderTag)>> {
self.rx.clone()
}
+125 -9
View File
@@ -1,6 +1,11 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
//! Utility types for TCP proxy message handling and ordering.
//!
//! The Nym mixnet does not guarantee message ordering, so these utilities implement
//! session-based message reordering using sequence numbers and time-based decay.
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::{collections::HashSet, fmt, ops::Deref, time::Instant};
@@ -8,18 +13,49 @@ use tokio::{io::AsyncWriteExt as _, net::tcp::OwnedWriteHalf};
use tracing::{debug, info};
use uuid::Uuid;
const DEFAULT_DECAY: u64 = 6; // decay time in seconds
/// Default decay time in seconds before a message is considered "old" and processed regardless of order.
const DEFAULT_DECAY: u64 = 6;
// Keeps track of
// - incoming and unsorted messages wrapped in DecayWrapper for keeping track of when they were received
// - the expected next message ID (reset on .tick())
/// A buffer for reordering out-of-order messages from the mixnet.
///
/// Messages arriving through the Nym mixnet may be received out of order due to
/// the probabilistic nature of mix node routing. `MessageBuffer` collects incoming
/// messages and reorders them based on their sequence numbers before writing to
/// the output stream.
///
/// ## Ordering Strategy
///
/// The buffer uses two strategies to determine when to deliver messages:
///
/// 1. **Sequence ordering**: Messages are delivered in sequence number order when
/// the expected next message arrives.
///
/// 2. **Decay-based delivery**: If a message has been waiting longer than
/// 6 seconds (the default decay timeout), it is delivered even if
/// earlier messages haven't arrived. This prevents indefinite blocking when
/// messages are lost.
///
/// ## Usage
///
/// ```text
/// let mut buffer = MessageBuffer::new();
///
/// // Push incoming messages (may be out of order)
/// buffer.push(message);
///
/// // Periodically tick to process and write ready messages
/// let should_close = buffer.tick(&mut write_half).await?;
/// ```
#[derive(Debug, Default)]
pub struct MessageBuffer {
/// Buffered messages wrapped with timing information.
buffer: Vec<DecayWrapper<ProxiedMessage>>,
/// The next expected message ID in sequence.
next_msg_id: u16,
}
impl MessageBuffer {
/// Creates a new empty message buffer.
pub fn new() -> Self {
MessageBuffer {
buffer: Vec::new(),
@@ -27,10 +63,14 @@ impl MessageBuffer {
}
}
/// Adds a message to the buffer for reordering.
///
/// The message is wrapped in a [`DecayWrapper`] to track when it was received.
pub fn push(&mut self, msg: ProxiedMessage) {
self.buffer.push(DecayWrapper::new(msg));
}
/// Retains only messages that satisfy the predicate.
pub fn retain<F>(&mut self, f: F)
where
F: FnMut(&DecayWrapper<ProxiedMessage>) -> bool,
@@ -38,22 +78,39 @@ impl MessageBuffer {
self.buffer.retain(f);
}
/// Returns the number of messages currently buffered.
#[allow(dead_code)]
pub fn len(&self) -> usize {
self.buffer.len()
}
/// Returns `true` if the buffer contains no messages.
pub fn is_empty(&self) -> bool {
self.buffer.is_empty()
}
/// Returns an iterator over buffered messages.
pub fn iter(&self) -> std::slice::Iter<'_, DecayWrapper<ProxiedMessage>> {
self.buffer.iter()
}
// Used by the client to create and manipulate a buffer of messages to write => OwnedWriteHalf.
// Used by the server for this + to conditionally decide whether to kill a session on returning true.
// #[instrument]
/// Processes buffered messages and writes ready ones to the output stream.
///
/// This method should be called periodically (e.g., every 100ms) to process
/// buffered messages. It:
///
/// 1. Identifies messages ready for delivery (in sequence or decayed)
/// 2. Sorts them by message ID
/// 3. Writes data payloads to the output stream
/// 4. Updates the expected next message ID
/// 5. Removes delivered messages from the buffer
///
/// # Returns
///
/// - `Ok(true)` if a [`Payload::Close`] message was encountered, indicating
/// the session should be closed.
/// - `Ok(false)` if processing completed normally.
/// - `Err` if writing to the stream failed.
pub async fn tick(&mut self, write: &mut OwnedWriteHalf) -> Result<bool> {
if self.is_empty() {
return Ok(false);
@@ -107,21 +164,41 @@ impl MessageBuffer {
}
}
// Wrapper used for tracking the 'age' of a message from when it was received.
// Used in the ordering logic in MessageBuffer.tick().
/// A wrapper that tracks the age of a value since it was created.
///
/// `DecayWrapper` is used by [`MessageBuffer`] to implement time-based message
/// delivery. Messages that have been waiting longer than the decay threshold
/// are delivered even if they're out of sequence, preventing indefinite blocking
/// when earlier messages are lost in the network.
///
/// ## Decay Behavior
///
/// A wrapped value is considered "decayed" when it has existed for longer than
/// the decay duration (default: 6 seconds). The [`decayed`](Self::decayed) method
/// checks this condition.
#[derive(Debug)]
pub struct DecayWrapper<T> {
/// The wrapped value.
value: T,
/// When this wrapper was created.
start: Instant,
/// Decay threshold in seconds.
decay: u64,
}
impl<T> DecayWrapper<T> {
/// Returns `true` if this wrapper has existed longer than the decay threshold.
///
/// Used by [`MessageBuffer::tick`] to determine if a message should be
/// delivered regardless of its sequence position.
pub fn decayed(&self) -> bool {
debug!("Decayed: {:?}", self.start.elapsed().as_secs() > self.decay);
self.start.elapsed().as_secs() > self.decay
}
/// Creates a new decay wrapper around the given value.
///
/// The decay timer starts immediately upon creation.
pub fn new(value: T) -> Self {
DecayWrapper {
value,
@@ -130,11 +207,13 @@ impl<T> DecayWrapper<T> {
}
}
/// Consumes the wrapper and returns the inner value.
#[allow(dead_code)]
pub fn into_inner(self) -> T {
self.value
}
/// Returns a reference to the inner value.
pub fn inner(&self) -> &T {
&self.value
}
@@ -148,14 +227,39 @@ impl<T> Deref for DecayWrapper<T> {
}
}
/// A message sent through the TCP proxy over the Nym mixnet.
///
/// `ProxiedMessage` encapsulates the data being proxied along with metadata
/// needed for session management and message ordering.
///
/// ## Fields
///
/// - `message`: The actual payload (data bytes or close signal)
/// - `session_id`: Unique identifier for this TCP session
/// - `message_id`: Sequence number for ordering messages within a session
///
/// ## Serialization
///
/// Messages are serialized using `bincode` for efficient transmission through
/// the mixnet.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ProxiedMessage {
/// The message payload.
pub message: Payload,
/// Unique session identifier (one per TCP connection).
pub session_id: Uuid,
/// Sequence number for message ordering within the session.
pub message_id: u16,
}
impl ProxiedMessage {
/// Creates a new proxied message.
///
/// # Arguments
///
/// * `message` - The payload to send.
/// * `session_id` - The session this message belongs to.
/// * `message_id` - The sequence number of this message within the session.
pub fn new(message: Payload, session_id: Uuid, message_id: u16) -> Self {
ProxiedMessage {
message,
@@ -164,22 +268,34 @@ impl ProxiedMessage {
}
}
/// Returns a reference to the message payload.
pub fn message(&self) -> &Payload {
&self.message
}
/// Returns the session ID this message belongs to.
pub fn session_id(&self) -> Uuid {
self.session_id
}
/// Returns the sequence number of this message.
pub fn message_id(&self) -> u16 {
self.message_id
}
}
/// The payload of a proxied message.
///
/// Each message through the TCP proxy contains either actual data bytes
/// or a control signal to close the session.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum Payload {
/// Raw data bytes to be forwarded.
Data(Vec<u8>),
/// Signal to close the session.
///
/// When received, the session handler should finish processing any
/// remaining buffered messages and then close the connection.
Close,
}