Compare commits

...

1 Commits

Author SHA1 Message Date
nikss31 2c717c0ebd Created an RTT Tester Client 2026-01-07 22:49:06 +00:00
18 changed files with 1886 additions and 14 deletions
Generated
+17
View File
@@ -5204,6 +5204,7 @@ dependencies = [
"nym-task",
"nym-topology",
"nym-validator-client",
"once_cell",
"rand 0.8.5",
"rand_chacha 0.3.1",
"serde",
@@ -5279,6 +5280,22 @@ dependencies = [
"tracing",
]
[[package]]
name = "nym-client-rtt-tester"
version = "0.1.0"
dependencies = [
"nym-client-core",
"nym-config",
"nym-crypto",
"nym-network-defaults",
"nym-sdk",
"nym-sphinx",
"nym-task",
"nym-topology",
"tokio",
"tracing",
]
[[package]]
name = "nym-client-wasm"
version = "1.4.1"
+1
View File
@@ -18,6 +18,7 @@ resolver = "2"
members = [
"clients/native",
"clients/native/websocket-requests",
"clients/rtt-tester",
"clients/socks5",
"common/async-file-watcher",
"common/authenticator-requests",
+22
View File
@@ -0,0 +1,22 @@
[package]
name = "nym-client-rtt-tester"
version = "0.1.0"
edition = "2021"
description = "RTT testing client built using nym-client-core"
license.workspace = true
[[bin]]
name = "nym-client-rtt-tester"
path = "src/main.rs"
[dependencies]
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
nym-client-core = { path = "../../common/client-core" }
nym-network-defaults = { path = "../../common/network-defaults" }
nym-sphinx = { path = "../../common/nymsphinx" }
nym-topology = { path = "../../common/topology" }
nym-config = { path = "../../common/config" }
nym-task = { path = "../../common/task" }
nym-crypto = { path = "../../common/crypto" }
nym-sdk = { path = "../../sdk/rust/nym-sdk" }
+466
View File
@@ -0,0 +1,466 @@
use nym_sdk::mixnet;
use nym_sdk::mixnet::MixnetMessageSender;
use nym_client_core::client::rtt_analyzer::{RttAnalyzer, RttConfig, RttEvent, RttPattern};
use nym_sdk::DebugConfig;
use tokio::io::{self, AsyncBufReadExt};
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// ============================================================
// 1. Start RTT Analyzer + background worker
// ============================================================
let _analyzer = RttAnalyzer::new();
let tx = RttAnalyzer::producer().expect("Analyzer was not initialized!");
// ============================================================
// 2. Build mixnet client
// ============================================================
let mut debug = DebugConfig::default();
// Disable ALL Poisson & cover streams
debug.traffic.disable_main_poisson_packet_distribution = true;
debug.cover_traffic.disable_loop_cover_traffic_stream = false;
let client = mixnet::MixnetClientBuilder::new_ephemeral()
.debug_config(debug)
.build()
.unwrap();
let client = client.connect_to_mixnet().await.unwrap();
let our_address = client.nym_address();
println!("Our client nym address is: {our_address}");
// ============================================================
// 3. Ask the user for RTT TEST configuration
// ============================================================
let config = ask_user_for_rtt_config().await;
println!("\nStarting RTT test with:");
println!(" packets_per_route = {}", config.packets_per_route);
println!(" pattern = {:?}", config.pattern);
println!(" delay (ms) = {}", config.inter_route_delay_ms);
// START THE TEST
let _ = client
.send_rtt_test(*our_address, None, tx.clone(), config)
.await
.unwrap();
// ============================================================
// 4. Background listener for incoming messages
// ============================================================
tokio::spawn({
let mut client = client;
async move {
loop {
if client.wait_for_messages().await.is_some() {
//I should do something here to shutdown
}
sleep(Duration::from_millis(10)).await;
}
}
});
// ============================================================
// 5. Main input loop
// ============================================================
let stdin = io::BufReader::new(io::stdin());
let mut lines = stdin.lines();
println!("Type 'menu' to show RTT commands.");
loop {
if let Ok(Some(input)) = lines.next_line().await {
let input = input.trim().to_lowercase();
if input == "menu" {
show_menu_and_handle_choice(&tx).await;
}
}
sleep(Duration::from_millis(50)).await;
}
}
// =====================================================================
// ASK USER FOR RTT TEST SETTINGS AT PROGRAM START
// =====================================================================
async fn ask_user_for_rtt_config() -> RttConfig {
let stdin = io::BufReader::new(io::stdin());
let mut lines = stdin.lines();
println!("\n========== RTT TEST CONFIGURATION ==========");
// -----------------------------
// Ask for packets per route
// -----------------------------
println!("Enter number of packets per route: ");
let packets = read_u32_from_stdin(&mut lines).await;
// -----------------------------
// Ask for pattern: Burst / RR
// -----------------------------
println!("Choose pattern:");
println!(" 1) Burst");
println!(" 2) Round Robin");
let pattern = loop {
let input = read_string(&mut lines).await;
match input.as_str() {
"1" => break RttPattern::Burst,
"2" => break RttPattern::RoundRobin,
_ => println!("Invalid choice! Please type 1 or 2:"),
}
};
// -----------------------------
// Ask for delay between packets
// -----------------------------
println!("Enter delay between packets (ms): ");
let delay = read_u64_from_stdin(&mut lines).await;
// Build Config
RttConfig {
packets_per_route: packets,
pattern,
inter_route_delay_ms: delay,
}
}
// =====================================================================
// Util functions for reading typed input
// =====================================================================
async fn read_string(lines: &mut tokio::io::Lines<io::BufReader<io::Stdin>>) -> String {
loop {
if let Ok(Some(line)) = lines.next_line().await {
let trimmed = line.trim().to_string();
if !trimmed.is_empty() {
return trimmed;
}
}
println!("Please type a value:");
}
}
async fn read_u32_from_stdin(lines: &mut tokio::io::Lines<io::BufReader<io::Stdin>>) -> u32 {
loop {
if let Ok(Some(line)) = lines.next_line().await {
if let Ok(num) = line.trim().parse::<u32>() {
return num;
}
}
println!("Invalid number, try again:");
}
}
async fn read_u64_from_stdin(lines: &mut tokio::io::Lines<io::BufReader<io::Stdin>>) -> u64 {
loop {
if let Ok(Some(line)) = lines.next_line().await {
if let Ok(num) = line.trim().parse::<u64>() {
return num;
}
}
println!("Invalid number, try again:");
}
}
// =====================================================================
// MENU HANDLER (FULL VERSION WITH HELP / DOCS)
// =====================================================================
async fn show_menu_and_handle_choice(tx: &tokio::sync::mpsc::Sender<RttEvent>) {
println!("\n======================== RTT MENU ========================");
println!("1) Print global RTT statistics");
println!("2) Write statistics to CSV file");
println!("3) Print route details by ROUTE INDEX");
println!("4) Print route details by ROUTE NODES STRING");
println!("5) Print routes with AVG RTT above threshold");
println!("6) Print routes with ANY RTT above threshold");
println!("7) Help (Show all commands & how to use them)");
println!("8) Write CSV and generate RTT histogram(s) with Python");
println!("9) Show overall experiment completion percentage");
println!("===========================================================");
print!("Select option: ");
use std::io::Write;
std::io::stdout().flush().unwrap();
let mut input = String::new();
let _ = std::io::stdin().read_line(&mut input);
let choice = input.trim();
match choice {
// -------------------- 1. PRINT GLOBAL STATS --------------------
"1" => {
let _ = tx.send(RttEvent::PrintStats).await;
}
// -------------------- 2. WRITE STATS ---------------------------
"2" => {
print!("Enter file path: ");
std::io::stdout().flush().unwrap();
let mut path = String::new();
let _ = std::io::stdin().read_line(&mut path);
let path = path.trim().to_string();
let _ = tx.send(RttEvent::WriteStats { path }).await;
}
// -------------------- 3. PRINT ROUTE DETAILS -------------------
"3" => {
print!("Enter route index (0-based): ");
std::io::stdout().flush().unwrap();
let mut s = String::new();
let _ = std::io::stdin().read_line(&mut s);
if let Ok(index) = s.trim().parse::<usize>() {
let _ = tx
.send(RttEvent::PrintRouteDetail { route_index: index })
.await;
} else {
println!("Invalid index.");
}
}
// -------------------- 4. PRINT STATS BY NODE STRING -----------
"4" => {
println!("Enter Node String EXACTLY as stored.");
println!("Example format:");
println!(" <base58_node1> > <base58_node2> > <base58_node3>");
print!("Nodes: ");
std::io::stdout().flush().unwrap();
let mut nodes = String::new();
let _ = std::io::stdin().read_line(&mut nodes);
let nodes = nodes.trim().to_string();
let _ = tx.send(RttEvent::PrintRouteStatsByNodes { nodes }).await;
}
// -------------------- 5. AVG ABOVE THRESHOLD ------------------
"5" => {
print!("Enter threshold in ms: ");
std::io::stdout().flush().unwrap();
let mut s = String::new();
let _ = std::io::stdin().read_line(&mut s);
if let Ok(th) = s.trim().parse::<u128>() {
let _ = tx
.send(RttEvent::PrintRoutesWithAvgAbove { threshold_ms: th })
.await;
} else {
println!("Invalid number.");
}
}
// -------------------- 6. ANY ABOVE THRESHOLD ------------------
"6" => {
print!("Enter threshold in ms: ");
std::io::stdout().flush().unwrap();
let mut s = String::new();
let _ = std::io::stdin().read_line(&mut s);
if let Ok(th) = s.trim().parse::<u128>() {
let _ = tx
.send(RttEvent::PrintRoutesWithAnyAbove { threshold_ms: th })
.await;
} else {
println!("Invalid number.");
}
}
// -------------------- 7. HELP --------------------------------
"7" => {
print_help();
}
"8" => {
// Ask for CSV path
print!("Enter CSV output path (e.g. rtt_stats.csv): ");
std::io::stdout().flush().unwrap();
let mut path = String::new();
let _ = std::io::stdin().read_line(&mut path);
let path = path.trim().to_string();
// Sub-menu for histogram mode
println!("\nHistogram mode:");
println!(" 1) One plot with ALL RTT samples (including outliers)");
println!(" 2) One plot with INLIERS only (RTT <= cutoff)");
println!(" 3) TWO plots: one for INLIERS and one for OUTLIERS");
print!("Select mode: ");
std::io::stdout().flush().unwrap();
let mut mode_input = String::new();
let _ = std::io::stdin().read_line(&mut mode_input);
let mode_choice = mode_input.trim();
let outlier_mode = match mode_choice {
// 1) All RTTs
"1" => "all".to_string(),
// 2) Only inliers, ask for cutoff in seconds
"2" => {
print!("Enter cutoff in seconds (e.g. 1.0 for 1 second): ");
std::io::stdout().flush().unwrap();
let mut c = String::new();
let _ = std::io::stdin().read_line(&mut c);
let cutoff = c.trim();
cutoff.to_string() // e.g. "1.0"
}
// 3) Two plots: inliers + outliers (both)
"3" => {
print!("Enter cutoff in seconds (e.g. 1.0 for 1 second): ");
std::io::stdout().flush().unwrap();
let mut c = String::new();
let _ = std::io::stdin().read_line(&mut c);
let cutoff = c.trim();
// Encode as 'both:<cutoff>' so Python can understand it
format!("both:{cutoff}")
}
_ => {
println!("Invalid mode, aborting histogram generation.");
return;
}
};
let _ = tx
.send(RttEvent::WriteStatsAndPlot { path, outlier_mode })
.await;
}
"9" => {
// Send an event to the RTT analyzer to compute and print progress
let _ = tx.send(RttEvent::PrintExperimentProgress).await;
}
_ => println!("Invalid selection."),
}
}
fn print_help() {
println!("\n======================== RTT HELP ========================\n");
println!("This tool allows you to perform detailed RTT analysis over all mixnet routes.");
println!("The client sends RTT probe traffic through every candidate route,");
println!("and the RTT analyzer collects per-route statistics in the background.\n");
println!("Main commands (from the RTT menu):\n");
println!(" 1) Print global RTT statistics");
println!(" Prints one summary line per route:");
println!(" - route index");
println!(" - packets sent (including retransmissions)");
println!(" - number of ACKs");
println!(" - number of timeouts");
println!(" - average RTT (computed over all stored RTT samples, in ms)");
println!();
println!(" 2) Write stats to CSV file");
println!(" Writes one line per route to a CSV file on disk.");
println!(" Current CSV columns:");
println!(" route,sent,acks,timeouts,avg_rtt");
println!(" route : numeric route index");
println!(" sent : how many FragmentSent events were recorded");
println!(" acks : how many FragmentAckReceived events were recorded");
println!(" timeouts : how many FragmentAckExpired events were recorded");
println!(" avg_rtt : average RTT (in milliseconds) from all RTT samples");
println!();
println!(" 3) Print route details BY ROUTE INDEX");
println!(" Input: a 0-based route index.");
println!(" Output for that route:");
println!(" - node list (base58 identities) in order: Node1 > Node2 > Node3");
println!(" - ALL RTT samples recorded for that route (each sample shown in ms)");
println!(" This is useful when you already know the route index and");
println!(" want to inspect exactly how it behaves packet by packet.");
println!();
println!(" 4) Print route details BY NODE STRING");
println!(" Input format must match exactly what the analyzer stored, for example:");
println!(" <node1_base58> > <node2_base58> > <node3_base58>");
println!(" If a route with that node sequence exists, the tool will:");
println!(" - print the matching route index");
println!(" - print the full per-route detail (same as option 3).");
println!(" This is useful when you have a specific mixnode combination");
println!(" (e.g. a slow or suspicious path) and want its statistics.");
println!();
println!(" 5) Print routes with AVERAGE RTT ABOVE a threshold");
println!(" You provide a threshold in milliseconds (e.g. 150).");
println!(" The tool will:");
println!(" - compute avg RTT for each route");
println!(" - select only routes where avg RTT > threshold");
println!(" - print detailed info for each matching route (nodes + RTT samples).");
println!(" Use this to quickly find generally slow routes.");
println!();
println!(" 6) Print routes with ANY RTT ABOVE a threshold");
println!(" You provide a threshold in milliseconds (e.g. 500).");
println!(" For each route, if at least one RTT sample exceeds the threshold,");
println!(" that route is printed with full details.");
println!(" Use this to find routes that occasionally spike very high,");
println!(" even if their average RTT is still acceptable.");
println!();
println!(" 7) Show experiment progress (percentage completed)");
println!(" Uses the stored experiment configuration (total_routes, packets_per_route)");
println!(" plus the number of RTT samples recorded so far to estimate:");
println!(" completion = received_samples / (total_routes * packets_per_route)");
println!(" The result is printed as a percentage (0%100%).");
println!(" This tells you roughly how far the RTT experiment has progressed.");
println!();
println!(" 8) Write stats AND generate histogram(s) via Python");
println!(" This command will:");
println!(" 1) Write the current route statistics to a CSV file (same as option 2).");
println!(" 2) Call the Python script 'rtt_histogram.py' to visualize RTTs.");
println!();
println!(" When prompted, you will provide two things:");
println!(" - CSV file path (where to save the stats)");
println!(" - outlier_mode string, which controls which histograms are generated:");
println!();
println!("\"all\"");
println!(" Use ALL avg_rtt values from the CSV.");
println!(
" Result: a single histogram containing every route's avg RTT (in seconds)."
);
println!();
println!("\"<cutoff>\" (numeric, in seconds, e.g. \"1.0\")");
println!(" Only keep avg_rtt <= cutoff.");
println!(" Result: a single histogram with INLIERS only (values <= cutoff).");
println!(
" Example: \"1.0\" keeps everything at or below 1.0s and drops slower routes."
);
println!();
println!("\"both:<cutoff>\" (e.g. \"both:1.0\")");
println!(" Split the data into two sets:");
println!(" - inliers : avg_rtt <= cutoff");
println!(" - outliers : avg_rtt > cutoff");
println!(" Result: TWO histograms are generated:");
println!(" 1) Distribution of inliers");
println!(" 2) Distribution of outliers");
println!(
" This helps visually compare the \"normal\" routes and the very slow ones."
);
println!();
println!("Helpful notes:");
println!(" • RTT samples are computed when a FragmentReceived event arrives.");
println!(" For each fragment that may be retransmitted, the analyzer stores");
println!(" multiple send times and receive times, and pairs them in order");
println!(" to compute multiple RTT values for that fragment if needed.");
println!("\"sent\" in the stats includes retransmissions as well, so it may be");
println!(" higher than packets_per_route for unstable routes.");
println!("===========================================================\n");
}
+2
View File
@@ -29,6 +29,8 @@ time = { workspace = true }
tokio = { workspace = true, features = ["sync", "macros"] }
tracing = { workspace = true }
zeroize = { workspace = true }
once_cell = "1.19"
# internal
nym-id = { path = "../nym-id" }
@@ -1,11 +1,13 @@
// Copyright 2020-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::rtt_analyzer::{RttConfig, RttEvent};
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::params::PacketType;
use nym_task::connections::TransmissionLane;
use tokio::sync::mpsc::Sender;
pub type InputMessageSender = tokio::sync::mpsc::Sender<InputMessage>;
pub type InputMessageReceiver = tokio::sync::mpsc::Receiver<InputMessage>;
@@ -46,6 +48,13 @@ pub enum InputMessage {
lane: TransmissionLane,
max_retransmissions: Option<u32>,
},
RunRTTTest {
recipient: Recipient,
lane: TransmissionLane,
max_retransmissions: Option<u32>,
sender: Sender<RttEvent>,
config: RttConfig,
},
/// Attempt to use our internally received and stored `ReplySurb` to send the message back
/// to specified recipient whilst not knowing its full identity (or even gateway).
@@ -150,6 +159,7 @@ impl InputMessage {
match self {
InputMessage::Regular { lane, .. }
| InputMessage::Anonymous { lane, .. }
| InputMessage::RunRTTTest { lane, .. }
| InputMessage::Reply { lane, .. }
| InputMessage::Premade { lane, .. } => lane,
InputMessage::MessageWrapper { message, .. } => message.lane(),
@@ -166,6 +176,10 @@ impl InputMessage {
max_retransmissions: m,
..
}
| InputMessage::RunRTTTest {
max_retransmissions: m,
..
}
| InputMessage::Reply {
max_retransmissions: m,
..
+1
View File
@@ -11,6 +11,7 @@ pub mod mix_traffic;
pub mod real_messages_control;
pub mod received_buffer;
pub mod replies;
pub mod rtt_analyzer;
pub mod statistics_control;
pub mod topology_control;
pub(crate) mod transmission_buffer;
@@ -4,6 +4,7 @@
use super::action_controller::{AckActionSender, Action};
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use crate::client::rtt_analyzer::{RttAnalyzer, RttEvent};
use futures::StreamExt;
use nym_gateway_client::AcknowledgementReceiver;
use nym_sphinx::{
@@ -12,6 +13,7 @@ use nym_sphinx::{
};
use nym_task::ShutdownToken;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::*;
/// Module responsible for listening for any data resembling acknowledgements from the network
@@ -38,7 +40,11 @@ impl AcknowledgementListener {
}
}
async fn on_ack(&mut self, ack_content: Vec<u8>) {
async fn on_ack(
&mut self,
ack_content: Vec<u8>,
rtt_producer: Option<tokio::sync::mpsc::Sender<RttEvent>>,
) {
trace!("Received an ack");
self.stats_tx
.report(PacketStatisticsEvent::AckReceived(ack_content.len()).into());
@@ -62,6 +68,16 @@ impl AcknowledgementListener {
return;
}
if let Some(ref producer) = rtt_producer {
if let Ok(duration) = SystemTime::now().duration_since(UNIX_EPOCH) {
let now = duration.as_millis();
let _ = producer.try_send(RttEvent::FragmentAckReceived {
fragment_id: frag_id.set_id().to_string(),
timestamp: now,
});
}
}
trace!("Received {frag_id} from the mix network");
self.stats_tx
.report(PacketStatisticsEvent::RealAckReceived(ack_content.len()).into());
@@ -70,15 +86,20 @@ impl AcknowledgementListener {
.unbounded_send(Action::new_remove(frag_id));
}
async fn handle_ack_receiver_item(&mut self, item: Vec<Vec<u8>>) {
async fn handle_ack_receiver_item(
&mut self,
item: Vec<Vec<u8>>,
rtt_producer: Option<tokio::sync::mpsc::Sender<RttEvent>>,
) {
// realistically we would only be getting one ack at the time
for ack in item {
self.on_ack(ack).await;
self.on_ack(ack, rtt_producer.clone()).await;
}
}
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started AcknowledgementListener with graceful shutdown support");
let rtt_producer = RttAnalyzer::producer();
loop {
tokio::select! {
@@ -88,7 +109,7 @@ impl AcknowledgementListener {
break;
}
acks = self.ack_receiver.next() => match acks {
Some(acks) => self.handle_ack_receiver_item(acks).await,
Some(acks) => self.handle_ack_receiver_item(acks,rtt_producer.clone()).await,
None => {
tracing::trace!("AcknowledgementListener: Stopping since channel closed");
break;
@@ -3,6 +3,7 @@
use super::PendingAcknowledgement;
use crate::client::real_messages_control::acknowledgement_control::RetransmissionRequestSender;
use crate::client::rtt_analyzer::{RttAnalyzer, RttEvent};
use futures::channel::mpsc;
use futures::StreamExt;
use nym_nonexhaustive_delayqueue::{Expired, NonExhaustiveDelayQueue, QueueKey};
@@ -16,6 +17,7 @@ use tracing::*;
pub(crate) type AckActionSender = mpsc::UnboundedSender<Action>;
pub(crate) type AckActionReceiver = mpsc::UnboundedReceiver<Action>;
use std::time::{SystemTime, UNIX_EPOCH};
// The actual data being sent off as well as potential key to the delay queue
type PendingAckEntry = (Arc<PendingAcknowledgement>, Option<QueueKey>);
@@ -207,8 +209,22 @@ impl ActionController {
// note: when the entry expires it's automatically removed from pending_acks_timers
#[allow(clippy::panic)]
fn handle_expired_ack_timer(&mut self, expired_ack: Expired<FragmentIdentifier>) {
fn handle_expired_ack_timer(
&mut self,
expired_ack: Expired<FragmentIdentifier>,
rtt_producer: Option<tokio::sync::mpsc::Sender<RttEvent>>,
) {
let frag_id = expired_ack.into_inner();
if let Some(ref producer) = rtt_producer {
if let Ok(duration) = SystemTime::now().duration_since(UNIX_EPOCH) {
let now: u128 = duration.as_millis();
let _ = producer.try_send(RttEvent::FragmentAckExpired {
fragment_id: frag_id.set_id().to_string(),
timestamp: now,
});
}
}
trace!("{frag_id} has expired");
@@ -244,7 +260,7 @@ impl ActionController {
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started ActionController with graceful shutdown support");
let rtt_producer = RttAnalyzer::producer();
loop {
tokio::select! {
biased;
@@ -262,7 +278,7 @@ impl ActionController {
}
},
expired_ack = self.pending_acks_timers.next() => match expired_ack {
Some(expired_ack) => self.handle_expired_ack_timer(expired_ack),
Some(expired_ack) => self.handle_expired_ack_timer(expired_ack,rtt_producer.clone()),
None => {
tracing::trace!("ActionController: Stopping since ack channel closed");
break;
@@ -5,6 +5,7 @@ use crate::client::inbound_messages::{InputMessage, InputMessageReceiver};
use crate::client::real_messages_control::message_handler::MessageHandler;
use crate::client::real_messages_control::real_traffic_stream::RealMessage;
use crate::client::replies::reply_controller::ReplyControllerSender;
use crate::client::rtt_analyzer::{RttConfig, RttEvent};
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::forwarding::packet::MixPacket;
@@ -12,6 +13,7 @@ use nym_sphinx::params::PacketType;
use nym_task::connections::TransmissionLane;
use nym_task::ShutdownToken;
use rand::{CryptoRng, Rng};
use tokio::sync::mpsc::Sender;
use tracing::*;
/// Module responsible for dealing with the received messages: splitting them, creating acknowledgements,
@@ -111,7 +113,30 @@ where
warn!("failed to send a repliable message - {err}")
}
}
async fn run_rtt_test(
&mut self,
recipient: Recipient,
lane: TransmissionLane,
packet_type: PacketType,
max_retransmissions: Option<u32>,
sender: Sender<RttEvent>,
config: RttConfig,
) {
if let Err(err) = self
.message_handler
.try_run_rtt_test(
recipient,
lane,
packet_type,
max_retransmissions,
sender,
config,
)
.await
{
warn!("failed to send a repliable message - {err}")
}
}
#[allow(clippy::panic)]
async fn on_input_message(&mut self, msg: InputMessage) {
match msg {
@@ -147,6 +172,23 @@ where
)
.await
}
InputMessage::RunRTTTest {
recipient,
lane,
max_retransmissions,
sender,
config,
} => {
self.run_rtt_test(
recipient,
lane,
PacketType::Mix,
max_retransmissions,
sender,
config,
)
.await
}
InputMessage::Reply {
recipient_tag,
data,
@@ -176,6 +218,23 @@ where
)
.await
}
InputMessage::RunRTTTest {
recipient,
lane,
max_retransmissions,
sender,
config,
} => {
self.run_rtt_test(
recipient,
lane,
PacketType::Mix,
max_retransmissions,
sender,
config,
)
.await
}
InputMessage::Anonymous {
recipient,
data,
@@ -8,6 +8,7 @@ use crate::client::real_messages_control::real_traffic_stream::{
use crate::client::real_messages_control::{AckActionSender, Action};
use crate::client::replies::reply_controller::MaxRetransmissions;
use crate::client::replies::reply_storage::{ReceivedReplySurbsMap, SentReplyKeys, UsedSenderTags};
use crate::client::rtt_analyzer::{RttConfig, RttEvent, RttPattern};
use crate::client::topology_control::{TopologyAccessor, TopologyReadPermit};
use nym_client_core_surb_storage::RetrievedReplySurb;
use nym_sphinx::acknowledgements::AckKey;
@@ -27,6 +28,8 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::mpsc::Sender;
use tokio::time::sleep;
use tracing::{debug, error, info, trace, warn};
// TODO: move that error elsewhere since it seems to be contaminating different files
@@ -555,6 +558,97 @@ where
Ok(())
}
pub(crate) async fn try_split_and_send_non_reply_rtt_message(
&mut self,
sender_tag: AnonymousSenderTag,
recipient: Recipient,
lane: TransmissionLane,
packet_type: PacketType,
topology: &NymRouteProvider,
max_retransmissions: Option<u32>,
route_index: usize,
sender: Sender<RttEvent>,
) -> Result<(), PreparationError> {
debug!("Sending RTT test message on route index {route_index} with packet type {packet_type:?}");
// Construct the base message
let message = NymMessage::new_repliable(RepliableMessage::new_data(
self.config.use_legacy_sphinx_format,
Vec::new(),
sender_tag,
Vec::new(),
));
debug_assert!(!matches!(message, NymMessage::Reply(_)));
let packet_size = if packet_type == PacketType::Outfox {
PacketSize::OutfoxRegularPacket
} else {
self.optimal_packet_size(&message)
};
trace!("Using packet size {packet_size:?}");
// ✅ Drop the read lock before mutably borrowing self
// Prepare fragments from message
let fragments = self
.message_preparer
.pad_and_split_message(message, packet_size);
if fragments.len() > 1 {
println!(
"[RTT TEST] Warning: message was split into {} fragments",
fragments.len()
);
}
let mut pending_acks = Vec::with_capacity(fragments.len());
let mut real_messages = Vec::with_capacity(fragments.len());
for fragment in &fragments {
let prepared_fragment = self
.message_preparer
.prepare_chunk_for_sending_with_deterministic_route(
fragment.clone(),
&topology,
&self.config.ack_key,
&recipient,
packet_type,
route_index,
)?;
let _ = sender.try_send(RttEvent::RouteUsed {
route_index,
fragment_id: (fragment.fragment_identifier().set_id().to_string()),
});
let real_message = RealMessage::new(
prepared_fragment.mix_packet,
Some(fragment.fragment_identifier().clone()),
);
let pending_ack = PendingAcknowledgement::new_known(
fragment.clone(),
prepared_fragment.total_delay,
recipient,
max_retransmissions,
);
real_messages.push(real_message);
pending_acks.push(pending_ack);
}
// Record ACKs and forward messages for *this route only*
self.insert_pending_acks(pending_acks);
self.forward_messages(real_messages, lane).await;
// // Optional: small delay to avoid flooding
// sleep(Duration::from_millis(200)).await;
Ok(())
}
pub(crate) async fn try_send_additional_reply_surbs(
&mut self,
recipient: Recipient,
@@ -633,6 +727,142 @@ where
Ok(())
}
// Helper: sends ONE RTT packet on ONE specific route.
// Rust requires this to be a standalone async function (not an async closure),
// because async closures cannot borrow local variables safely.
async fn send_packet_on_route(
&mut self,
recipient: &Recipient,
num_reply_surbs: u32,
lane: TransmissionLane,
packet_type: PacketType,
topology: &NymRouteProvider,
max_retransmissions: Option<u32>,
route_index: usize,
sender: &Sender<RttEvent>,
) -> Result<(), SurbWrappedPreparationError> {
let sender_tag = self.get_or_create_sender_tag(recipient);
// Prepare reply SURBs
let reply_surbs = self.generate_reply_surbs(num_reply_surbs as usize).await?;
let reply_keys = reply_surbs
.iter()
.map(|s| *s.encryption_key())
.collect::<Vec<_>>();
// Send message on the given route
self.try_split_and_send_non_reply_rtt_message(
sender_tag,
recipient.clone(),
lane,
packet_type,
topology,
max_retransmissions,
route_index,
sender.clone(),
)
.await?;
// Store reply keys after sending
self.reply_key_storage.insert_multiple(reply_keys);
Ok(())
}
pub(crate) async fn try_run_rtt_test(
&mut self,
recipient: Recipient,
lane: TransmissionLane,
packet_type: PacketType,
max_retransmissions: Option<u32>,
sender: Sender<RttEvent>,
config: RttConfig,
) -> Result<(), SurbWrappedPreparationError> {
debug!("Starting RTT test using pattern {:?}", config.pattern);
// Load topology
let topology_permit = self.topology_access.get_read_permit().await;
let mut topology = self.get_topology(&topology_permit)?.clone();
let route_strings = topology
.topology
.initialize_static_mixnodes_for_rtt_testing()?;
let total_routes = topology.topology.all_mix_routes.len();
drop(topology_permit);
// =====================================================
// SEND ROUTE STRINGS TO RTT ANALYZER USING try_send()
// =====================================================
for (route_index, nodes_string) in route_strings {
let _ = sender.try_send(RttEvent::RouteNodes {
route_index,
nodes: nodes_string,
});
}
sender
.send(RttEvent::ExperimentConfiguration {
total_routes: total_routes as usize,
per_route_sent: config.packets_per_route as usize,
})
.await
.unwrap();
// ==============================================================
// PATTERN: BURST
// Send packets_per_route packets on each route sequentially
// ==============================================================
if let RttPattern::Burst = config.pattern {
for route in 0..total_routes {
for _ in 0..config.packets_per_route {
self.send_packet_on_route(
&recipient,
0,
lane,
packet_type,
&topology,
max_retransmissions,
route,
&sender,
)
.await?;
// Optional delay between packets on the same route
if config.inter_route_delay_ms > 0 {
tokio::time::sleep(Duration::from_millis(config.inter_route_delay_ms))
.await;
}
}
}
return Ok(());
}
// ==============================================================
// PATTERN: ROUND ROBIN
// Send packets in cycles: 1 on route 0, 1 on route 1, ..., repeat
// ==============================================================
if let RttPattern::RoundRobin = config.pattern {
for _cycle in 0..config.packets_per_route {
for route in 0..total_routes {
self.send_packet_on_route(
&recipient,
0,
lane,
packet_type,
&topology,
max_retransmissions,
route,
&sender,
)
.await?;
if config.inter_route_delay_ms > 0 {
tokio::time::sleep(Duration::from_millis(config.inter_route_delay_ms))
.await;
}
}
}
return Ok(());
}
Ok(())
}
pub(crate) async fn try_prepare_single_chunk_for_sending(
&mut self,
@@ -4,6 +4,7 @@
use self::sending_delay_controller::SendingDelayController;
use crate::client::mix_traffic::BatchMixMessageSender;
use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender;
use crate::client::rtt_analyzer::{RttAnalyzer, RttEvent};
use crate::client::topology_control::TopologyAccessor;
use crate::client::transmission_buffer::TransmissionBuffer;
use crate::config;
@@ -21,6 +22,7 @@ use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, C
use nym_task::connections::{
ConnectionCommand, ConnectionCommandReceiver, ConnectionId, LaneQueueLengths, TransmissionLane,
};
use nym_task::ShutdownToken;
use rand::{CryptoRng, Rng};
use std::pin::Pin;
@@ -224,7 +226,11 @@ where
}
}
async fn on_message(&mut self, next_message: StreamMessage) {
async fn on_message(
&mut self,
next_message: StreamMessage,
rtt_producer: Option<tokio::sync::mpsc::Sender<RttEvent>>,
) {
trace!("created new message");
let (next_message, fragment_id, packet_size) = match next_message {
@@ -271,6 +277,21 @@ where
)
}
StreamMessage::Real(real_message) => {
if let Some(ref producer) = rtt_producer {
if let Some(fragment_id_local) = real_message.fragment_id {
use std::time::{SystemTime, UNIX_EPOCH};
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
let _ = producer.try_send(RttEvent::FragmentSent {
fragment_id: (fragment_id_local.set_id().to_string()),
timestamp: (now),
});
}
}
let packet_size = real_message.packet_size();
(
real_message.mix_packet,
@@ -584,7 +605,7 @@ where
pub(crate) async fn run(&mut self) {
debug!("Started OutQueueControl with graceful shutdown support");
let rtt_producer = RttAnalyzer::producer();
// avoid borrow on self
let shutdown_token = self.shutdown_token.clone();
#[cfg(not(target_arch = "wasm32"))]
@@ -602,7 +623,7 @@ where
self.log_status();
}
next_message = self.next() => if let Some(next_message) = next_message {
self.on_message(next_message).await;
self.on_message(next_message,rtt_producer.clone()).await;
} else {
tracing::trace!("OutQueueControl: Stopping since channel closed");
break;
@@ -5,6 +5,7 @@ use crate::client::helpers::get_time_now;
use crate::client::replies::{
reply_controller::ReplyControllerSender, reply_storage::SentReplyKeys,
};
use crate::client::rtt_analyzer::{RttAnalyzer, RttEvent};
use futures::channel::mpsc;
use futures::lock::Mutex;
use futures::StreamExt;
@@ -55,6 +56,7 @@ struct ReceivedMessagesBufferInner<R: MessageReceiver> {
// Periodically check for stale buffers to clean up
last_stale_check: crate::client::helpers::Instant,
rtt_producer: Option<tokio::sync::mpsc::Sender<RttEvent>>,
}
impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
@@ -81,7 +83,20 @@ impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
warn!("failed to recover fragment from raw data: {err}. The whole underlying message might be corrupted and unrecoverable!");
return None;
}
Ok(frag) => frag,
Ok(frag) => {
if let Some(ref producer) = self.rtt_producer {
use std::time::{SystemTime, UNIX_EPOCH};
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
let _ = producer.try_send(RttEvent::FragmentReceived {
fragment_id: frag.id().to_string(),
timestamp: now,
});
}
frag
}
};
if self.recently_reconstructed.contains(&fragment.id()) {
@@ -181,6 +196,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
reply_controller_sender: ReplyControllerSender,
stats_tx: ClientStatsSender,
shutdown_token: ShutdownToken,
rtt_producer: Option<tokio::sync::mpsc::Sender<RttEvent>>,
) -> Self {
ReceivedMessagesBuffer {
inner: Arc::new(Mutex::new(ReceivedMessagesBufferInner {
@@ -191,6 +207,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
recently_reconstructed: HashSet::new(),
stats_tx,
last_stale_check: get_time_now(),
rtt_producer: rtt_producer.clone(),
})),
reply_key_storage,
reply_controller_sender,
@@ -585,6 +602,7 @@ impl<R: MessageReceiver + Clone + Send + 'static> ReceivedMessagesBufferControll
reply_controller_sender,
metrics_reporter,
shutdown_token.clone(),
RttAnalyzer::producer(),
);
ReceivedMessagesBufferController {
@@ -0,0 +1,514 @@
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufWriter, Write};
use once_cell::sync::OnceCell;
use std::process::Command;
use std::sync::Mutex;
use tokio::sync::mpsc::{self, Sender};
use tokio::task::JoinHandle;
#[derive(Debug, Clone)]
pub enum RttPattern {
Burst,
RoundRobin,
}
#[derive(Debug, Clone)]
pub struct RttConfig {
pub packets_per_route: u32,
pub pattern: RttPattern,
pub inter_route_delay_ms: u64,
}
#[derive(Debug, Clone)]
pub enum RttEvent {
RouteUsed {
route_index: usize,
fragment_id: String,
},
FragmentSent {
fragment_id: String,
timestamp: u128,
},
FragmentAckReceived {
fragment_id: String,
timestamp: u128,
},
FragmentAckExpired {
fragment_id: String,
timestamp: u128,
},
FragmentReceived {
fragment_id: String,
timestamp: u128,
},
RouteNodes {
route_index: usize,
nodes: String,
},
ExperimentConfiguration {
total_routes: usize,
per_route_sent: usize,
},
PrintRouteDetail {
route_index: usize,
},
PrintRouteStatsByNodes {
nodes: String,
},
PrintRoutesWithAvgAbove {
threshold_ms: u128,
},
PrintRoutesWithAnyAbove {
threshold_ms: u128,
},
PrintStats,
WriteStats {
path: String,
},
WriteStatsAndPlot {
path: String,
outlier_mode: String, // "all" or cutoff() seconds (e.g. "1.0"))
},
PrintExperimentProgress,
}
pub struct StoredRouteSummary {
pub total_routes: usize,
pub per_route_sent: usize,
}
static PRODUCER: OnceCell<Mutex<Option<Sender<RttEvent>>>> = OnceCell::new();
#[derive(Default, Debug)]
pub struct RouteStats {
pub sent: u32,
pub acks: u32,
pub timeouts: u32,
pub rtts: Vec<u128>,
}
pub struct RttAnalyzer {
/// fragment_id → (route, Vec<sent_times>)
fragments: HashMap<String, (usize, Vec<u128>)>,
/// fragment_id → Vec<recv_times>
receive_times: HashMap<String, Vec<u128>>,
/// fragment_id → last ack
ack_times: HashMap<String, u128>,
route_stats: HashMap<usize, RouteStats>,
route_summary: Option<StoredRouteSummary>,
route_nodes: HashMap<usize, String>,
consumer_handle: JoinHandle<()>,
}
impl RttAnalyzer {
pub fn consumer_handle(&self) -> &JoinHandle<()> {
&self.consumer_handle
}
pub fn new() -> Self {
let (tx, mut rx) = mpsc::channel(80000);
PRODUCER
.set(Mutex::new(Some(tx.clone())))
.expect("PRODUCER already initialized");
let handle = tokio::spawn(async move {
let mut analyzer = RttAnalyzer {
fragments: HashMap::new(),
receive_times: HashMap::new(),
ack_times: HashMap::new(),
route_stats: HashMap::new(),
route_summary: None,
route_nodes: HashMap::new(),
consumer_handle: tokio::spawn(async {}),
};
while let Some(event) = rx.recv().await {
analyzer.process(event);
}
println!("RTT Analyzer consumer exited");
});
Self {
fragments: HashMap::new(),
receive_times: HashMap::new(),
ack_times: HashMap::new(),
route_stats: HashMap::new(),
route_summary: None,
route_nodes: HashMap::new(),
consumer_handle: handle,
}
}
pub fn producer() -> Option<Sender<RttEvent>> {
let lock = PRODUCER.get()?.lock().unwrap();
lock.clone()
}
pub fn process(&mut self, event: RttEvent) {
match event {
// -------------------------
// FIRST USE OF A FRAGMENT
// -------------------------
RttEvent::RouteUsed {
route_index,
fragment_id,
} => {
self.fragments
.insert(fragment_id.clone(), (route_index, Vec::new()));
}
// -------------------------
// RETRANSMISSION → append new sent time
// -------------------------
RttEvent::FragmentSent {
fragment_id,
timestamp,
} => {
if let Some((_route, sent_list)) = self.fragments.get_mut(&fragment_id) {
sent_list.push(timestamp);
self.route_stats.entry(*_route).or_default().sent += 1;
}
}
// -------------------------
// ACK RECEIVED
// -------------------------
RttEvent::FragmentAckReceived {
fragment_id,
timestamp,
} => {
if let Some((route, _)) = self.fragments.get(&fragment_id) {
let stats = self.route_stats.entry(*route).or_default();
stats.acks += 1;
}
self.ack_times.insert(fragment_id, timestamp);
}
// -------------------------
// ACK TIMEOUT
// -------------------------
RttEvent::FragmentAckExpired { fragment_id, .. } => {
if let Some((route, _)) = self.fragments.get(&fragment_id) {
self.route_stats.entry(*route).or_default().timeouts += 1;
}
}
RttEvent::WriteStatsAndPlot { path, outlier_mode } => {
// 1) write the csv
if let Err(e) = self.write_csv(&path) {
eprintln!("Failed to write CSV: {}", e);
return;
}
// 2) Call the Python script
if let Err(e) = Self::run_histogram_script(&path, &outlier_mode) {
eprintln!("Failed to run histogram script: {}", e);
}
}
// -------------------------
// PACKET RECEIVED → compute RTTs
// -------------------------
RttEvent::FragmentReceived {
fragment_id,
timestamp,
} => {
// Append receive time
let recv_list = self.receive_times.entry(fragment_id.clone()).or_default();
recv_list.push(timestamp);
// Lookup route + sent times
if let Some((route, sent_list)) = self.fragments.get(&fragment_id) {
let recv_list = self.receive_times.get(&fragment_id).unwrap();
// Index of the *newly added* receive time
let idx = recv_list.len() - 1;
/*
Maybe we can put a retransmission flag and not counting the new RTTs and only the basic N?
*/
//println!("Fragment id: {} Sent list length: {} Recv list length:{}",fragment_id,sent_list.len(),recv_list.len());
// Check if we have a matching sent timestamp
if idx < sent_list.len() {
let sent_ts = sent_list[idx];
let rtt = recv_list[idx] - sent_ts;
self.route_stats.entry(*route).or_default().rtts.push(rtt);
}
}
}
RttEvent::RouteNodes { route_index, nodes } => {
self.route_nodes.insert(route_index, nodes);
}
RttEvent::PrintStats => self.print_stats(),
RttEvent::WriteStats { path } => {
if let Err(e) = self.write_csv(&path) {
eprintln!("Failed to write CSV: {}", e)
}
}
RttEvent::ExperimentConfiguration {
total_routes,
per_route_sent,
} => {
self.route_summary = Some(StoredRouteSummary {
total_routes,
per_route_sent,
});
}
RttEvent::PrintExperimentProgress => {
self.print_experiment_progress();
}
RttEvent::PrintRouteDetail { route_index } => {
self.print_route_detail(route_index);
}
RttEvent::PrintRoutesWithAvgAbove { threshold_ms } => {
self.print_routes_with_avg_above(threshold_ms);
}
RttEvent::PrintRoutesWithAnyAbove { threshold_ms } => {
self.print_routes_with_any_above(threshold_ms);
}
RttEvent::PrintRouteStatsByNodes { nodes } => {
self.print_route_by_nodes(nodes);
}
}
}
// ---------------------- PRINT FUNCTIONS (unchanged) ----------------------
pub fn print_stats(&self) {
println!("\n================ Route RTT Statistics ================");
for (route, stats) in self.route_stats.iter() {
let avg_rtt = if !stats.rtts.is_empty() {
stats.rtts.iter().sum::<u128>() as f64 / stats.rtts.len() as f64
} else {
0.0
};
println!(
"Route {:5} | Sent {:4} | ACKs {:4} | Timeouts {:4} | Avg RTT {:8.2}",
route, stats.sent, stats.acks, stats.timeouts, avg_rtt
);
}
println!("======================================================\n");
}
pub fn write_csv(&self, path: &str) -> std::io::Result<()> {
let mut writer = BufWriter::new(File::create(path)?);
writer.write_all(b"route,sent,acks,timeouts,avg_rtt\n")?;
for (route, stats) in &self.route_stats {
let avg_rtt = if !stats.rtts.is_empty() {
stats.rtts.iter().sum::<u128>() as f64 / stats.rtts.len() as f64
} else {
0.0
};
writer.write_all(
format!(
"{},{},{},{},{:.2}\n",
route, stats.sent, stats.acks, stats.timeouts, avg_rtt
)
.as_bytes(),
)?;
}
Ok(())
}
pub fn print_route_detail(&self, route_index: usize) {
println!(
"\n================ Route #{} Details ================\n",
route_index
);
if let Some(nodes) = self.route_nodes.get(&route_index) {
println!(" Route Nodes:");
for (i, node) in nodes.split(" > ").enumerate() {
println!(" • Node {}: {}", i + 1, node);
}
}
if let Some(stats) = self.route_stats.get(&route_index) {
println!("\n RTT Values:");
for (i, rtt) in stats.rtts.iter().enumerate() {
println!(" [{:3}] {} ms", i, rtt);
}
}
println!("======================================================\n");
}
fn run_histogram_script(csv_path: &str, outlier_mode: &str) -> std::io::Result<()> {
let status = Command::new("python")
.arg("rtt_histogram.py") // path του script
.arg(csv_path)
.arg(outlier_mode)
.status()?;
if !status.success() {
eprintln!("Python histogram script exited with status: {}", status);
}
Ok(())
}
pub fn print_routes_with_avg_above(&self, threshold_ms: u128) {
println!(
"\n======= Routes with AVG RTT > {} ms =======\n",
threshold_ms
);
let mut matches: Vec<usize> = self
.route_stats
.iter()
.filter_map(|(route, stats)| {
if stats.rtts.is_empty() {
return None;
}
let avg = stats.rtts.iter().sum::<u128>() as f64 / stats.rtts.len() as f64;
if (avg as u128) > threshold_ms {
Some(*route)
} else {
None
}
})
.collect();
matches.sort();
for route in matches {
self.print_route_detail(route);
}
println!("====================================================\n");
}
/// Prints overall experiment completion percentage.
///
/// It uses:
/// - self.route_summary.total_routes
/// - self.route_summary.per_route_sent
/// to compute how many packets were planned in total.
///
/// Then it sums, over all routes:
/// - how many packets were actually sent (RouteStats.sent)
/// - how many packets have a completed RTT sample (RouteStats.rtts.len())
///
/// Finally it prints:
/// - total expected packets
/// - total sent packets and percentage
/// - total completed RTT packets and percentage
pub fn print_experiment_progress(&self) {
println!("\n=========== RTT Experiment Progress ===========");
// Check if experiment configuration is available
let summary = match &self.route_summary {
Some(s) => s,
None => {
println!("No experiment configuration stored (route_summary is None).");
println!("You must send an ExperimentConfiguration event first.");
println!("==============================================\n");
return;
}
};
let total_routes = summary.total_routes;
let per_route_sent = summary.per_route_sent;
// Total number of packets that were planned for the whole experiment
let expected_total: usize = total_routes.saturating_mul(per_route_sent);
if expected_total == 0 {
println!("Experiment configuration has zero expected packets.");
println!("==============================================\n");
return;
}
// Sum how many packets were actually sent and how many have a measured RTT
let mut sent_total: usize = 0;
let mut received_total: usize = 0;
for (_route_idx, stats) in &self.route_stats {
// 'sent' counts how many times we called FragmentSent for this route
sent_total += std::cmp::min(stats.sent, per_route_sent as u32) as usize;
// Each RTT entry corresponds to one packet for which we have both send and receive time
let route_recv = std::cmp::min(stats.rtts.len(), per_route_sent);
received_total += route_recv;
}
let sent_pct = (sent_total as f64 / expected_total as f64) * 100.0;
let recv_pct = (received_total as f64 / expected_total as f64) * 100.0;
println!("Total routes configured : {}", total_routes);
println!("Packets per route (planned) : {}", per_route_sent);
println!("Total expected packets : {}", expected_total);
println!("---------------------------------------------");
println!(
"Total sent packets : {} ({:.2}%)",
sent_total, sent_pct
);
println!(
"Total completed RTT packets : {} ({:.2}%)",
received_total, recv_pct
);
println!("==============================================\n");
}
pub fn print_routes_with_any_above(&self, threshold_ms: u128) {
println!(
"\n======= Routes with ANY RTT > {} ms =======\n",
threshold_ms
);
let mut matches: Vec<usize> = self
.route_stats
.iter()
.filter_map(|(route, stats)| {
if stats.rtts.iter().any(|&x| x > threshold_ms) {
Some(*route)
} else {
None
}
})
.collect();
matches.sort();
for route in matches {
self.print_route_detail(route);
}
println!("====================================================\n");
}
pub fn print_route_by_nodes(&self, nodes: String) {
println!("\n========== Searching route by nodes ==========\n");
let mut routes: Vec<(usize, &String)> =
self.route_nodes.iter().map(|(k, v)| (*k, v)).collect();
routes.sort_by_key(|(idx, _)| *idx);
for (route, stored) in routes {
if *stored == nodes {
println!("Found route {}!", route);
self.print_route_detail(route);
return;
}
}
println!("No route found with nodes: {}", nodes);
println!("=============================================\n");
}
}
+153
View File
@@ -89,6 +89,29 @@ pub trait FragmentPreparer {
)
}
fn generate_surb_ack_with_0_delays(
&mut self,
recipient: &Recipient,
fragment_id: FragmentIdentifier,
topology: &NymRouteProvider,
ack_key: &AckKey,
packet_type: PacketType,
) -> Result<SurbAck, NymTopologyError> {
let use_legacy_sphinx_format = self.use_legacy_sphinx_format();
let disable_mix_hops = self.mix_hops_disabled();
SurbAck::construct(
self.rng(),
use_legacy_sphinx_format,
recipient,
ack_key,
fragment_id.to_bytes(),
Duration::ZERO,
topology,
packet_type,
disable_mix_hops,
)
}
/// The procedure is as follows:
/// For each fragment:
/// - compute SURB_ACK
@@ -288,6 +311,114 @@ pub trait FragmentPreparer {
})
}
#[allow(clippy::too_many_arguments)]
fn prepare_chunk_with_deterministic_route_for_sending_and_rtt_test(
&mut self,
fragment: Fragment,
topology: &NymRouteProvider, // needs to be mutable because it may auto-generate routes
ack_key: &AckKey,
packet_sender: &Recipient,
packet_recipient: &Recipient,
packet_type: PacketType,
route_index: usize, // NEW ARGUMENT: select which route to use
) -> Result<PreparedFragment, NymTopologyError> {
debug!(
"Preparing chunk for sending (deterministic route index = {})",
route_index
);
let destination = packet_recipient.gateway();
monitoring::fragment_sent(&fragment, self.nonce(), destination);
let non_reply_overhead = x25519::PUBLIC_KEY_SIZE;
let expected_plaintext = match packet_type {
PacketType::Outfox => {
fragment.serialized_size() + OUTFOX_ACK_OVERHEAD + non_reply_overhead
}
_ => fragment.serialized_size() + ACK_OVERHEAD + non_reply_overhead,
};
let packet_size = PacketSize::get_type_from_plaintext(expected_plaintext, packet_type)
.expect("the message has been incorrectly fragmented");
let rotation_id = topology.current_key_rotation();
let sphinx_key_rotation = SphinxKeyRotation::from(rotation_id);
let fragment_identifier = fragment.fragment_identifier();
// create an ack
let surb_ack = self.generate_surb_ack_with_0_delays(
packet_sender,
fragment_identifier,
topology,
ack_key,
packet_type,
)?;
let ack_delay = surb_ack.expected_total_delay();
// build the payload
let packet_payload = NymPayloadBuilder::new(fragment, surb_ack)
.build_regular(self.rng(), packet_recipient.encryption_key())
.map_err(|_| NymTopologyError::PayloadBuilder)?;
// Get the deterministic route by index
trace!("Selecting deterministic route index {}", route_index);
let route = topology.deterministic_route_to_egress(route_index, destination)?;
let destination = packet_recipient.as_sphinx_destination();
// No artificial delay for RTT test
let delays = nym_sphinx_routing::generate_hop_delays(Duration::ZERO, route.len());
// build the actual Sphinx packet
let packet = match packet_type {
PacketType::Outfox => NymPacket::outfox_build(
packet_payload,
route.as_slice(),
&destination,
Some(packet_size.plaintext_size()),
)?,
PacketType::Mix => NymPacket::sphinx_build(
self.use_legacy_sphinx_format(),
packet_size.payload_size(),
packet_payload,
&route,
&destination,
&delays,
)?,
};
let first_hop_address =
NymNodeRoutingAddress::try_from(route.first().unwrap().address).unwrap();
Ok(PreparedFragment {
total_delay: delays.iter().take(delays.len() - 1).sum::<Delay>() + ack_delay,
mix_packet: MixPacket::new(first_hop_address, packet, packet_type, sphinx_key_rotation),
fragment_identifier,
})
}
fn pad_and_split_message(
&mut self,
message: NymMessage,
@@ -442,6 +573,28 @@ where
)
}
pub fn prepare_chunk_for_sending_with_deterministic_route(
&mut self,
fragment: Fragment,
topology: &NymRouteProvider,
ack_key: &AckKey,
packet_recipient: &Recipient,
packet_type: PacketType,
route_index: usize,
) -> Result<PreparedFragment, NymTopologyError> {
let sender = self.sender_address;
<Self as FragmentPreparer>::prepare_chunk_with_deterministic_route_for_sending_and_rtt_test(
self,
fragment,
topology,
ack_key,
&sender,
packet_recipient,
packet_type,
route_index,
)
}
/// Construct an acknowledgement SURB for the given [`FragmentIdentifier`]
pub fn generate_surb_ack(
&mut self,
+87 -2
View File
@@ -21,7 +21,6 @@ pub use error::NymTopologyError;
pub use nym_mixnet_contract_common::nym_node::Role;
pub use nym_mixnet_contract_common::{EpochRewardedSet, NodeId};
pub use rewarded_set::CachedEpochRewardedSet;
pub mod error;
pub mod node;
pub mod rewarded_set;
@@ -135,7 +134,8 @@ pub struct NymTopology {
// while this is not ideal, use empty values as default to not break backwards compatibility
#[serde(default)]
metadata: NymTopologyMetadata,
#[serde(default)]
pub all_mix_routes: Vec<Vec<RoutingNode>>,
// for the purposes of future VRF, everyone will need the same view of the network, regardless of performance filtering
// so we use the same 'master' rewarded set information for that
//
@@ -231,6 +231,30 @@ impl NymRouteProvider {
.random_route_to_egress(rng, egress_identity, self.ignore_egress_epoch_roles)
}
/// Selects a deterministic route to the egress, using the i-th precomputed mixnode route.
/// This requires that [`generate_all_mix_routes()`] has already been called.
pub fn deterministic_route_to_egress(
&self,
route_index: usize,
egress_identity: NodeIdentity,
) -> Result<Vec<SphinxNode>, NymTopologyError> {
if self.topology.all_mix_routes.is_empty() {
return Err(NymTopologyError::EmptyNetworkTopology);
}
let Some(existing_route) = self.topology.all_mix_routes.get(route_index) else {
return Err(NymTopologyError::NoMixnodesAvailable);
};
let mut route: Vec<SphinxNode> = existing_route.iter().map(Into::into).collect();
// add egress node
let egress = self
.topology
.egress_node_by_identity(egress_identity, self.ignore_egress_epoch_roles)?;
route.push(egress);
Ok(route)
}
/// Returns a route directly to the egress point, which can be any known node
pub fn empty_route_to_egress(
&self,
@@ -262,6 +286,7 @@ impl NymTopology {
metadata: NymTopologyMetadata::default(),
rewarded_set: rewarded_set.into(),
node_details: Default::default(),
all_mix_routes: Vec::new(),
}
}
@@ -272,6 +297,7 @@ impl NymTopology {
) -> Self {
NymTopology {
metadata,
all_mix_routes: Vec::new(),
rewarded_set: rewarded_set.into(),
node_details: node_details.into_iter().map(|n| (n.node_id, n)).collect(),
}
@@ -531,6 +557,65 @@ impl NymTopology {
Ok(mix_route)
}
pub fn initialize_static_mixnodes_for_rtt_testing(
&mut self,
) -> Result<Vec<(usize, String)>, NymTopologyError> {
if self.rewarded_set.is_empty() || self.node_details.is_empty() {
return Err(NymTopologyError::EmptyNetworkTopology);
}
// Collect nodes for each layer
let layer1_nodes: Vec<&RoutingNode> = self
.rewarded_set
.layer1
.iter()
.filter_map(|id| self.node_details.get(id))
.collect();
let layer2_nodes: Vec<&RoutingNode> = self
.rewarded_set
.layer2
.iter()
.filter_map(|id| self.node_details.get(id))
.collect();
let layer3_nodes: Vec<&RoutingNode> = self
.rewarded_set
.layer3
.iter()
.filter_map(|id| self.node_details.get(id))
.collect();
// Reset routes
self.all_mix_routes.clear();
// Build mix routes
for n1 in layer1_nodes.clone() {
for n2 in layer2_nodes.clone() {
for n3 in layer3_nodes.clone() {
self.all_mix_routes
.push(vec![n1.clone(), n2.clone(), n3.clone()]);
}
}
}
// ============================================================
// RETURN route_index + string
// ============================================================
let mut results: Vec<(usize, String)> = Vec::new();
for (i, route) in self.all_mix_routes.iter().enumerate() {
let node_strings: Vec<String> = route
.iter()
.map(|node| node.identity_key.to_base58_string())
.collect();
results.push((i, node_strings.join(" > ")));
}
Ok(results)
}
pub fn random_mix_route<R>(&self, rng: &mut R) -> Result<Vec<SphinxNode>, NymTopologyError>
where
R: Rng + CryptoRng + ?Sized,
+202
View File
@@ -0,0 +1,202 @@
#!/usr/bin/env python3
"""
rtt_histogram.py
Usage:
python rtt_histogram.py <csv_path> <outlier_mode>
Examples:
python rtt_histogram.py rtt_stats.csv all
-> one histogram with ALL avg_rtt values
python rtt_histogram.py rtt_stats.csv 1.0
-> one histogram only for avg_rtt <= 1.0s (filters out values above 1 second)
python rtt_histogram.py rtt_stats.csv both:1.0
-> TWO histograms:
1) inliers: avg_rtt <= 1.0s
2) outliers: avg_rtt > 1.0s
"""
import csv
import sys
import statistics
import matplotlib.pyplot as plt
def load_rtt_values(csv_path: str) -> list[float]:
"""
Read the CSV file and return a list of RTT values in milliseconds (float).
It expects a column named 'avg_rtt' (as written by write_csv).
If 'avg_rtt' is not found, it falls back to 'rtt_ms'.
"""
values_ms: list[float] = []
with open(csv_path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
fieldnames = [name.strip() for name in (reader.fieldnames or [])]
if "avg_rtt" in fieldnames:
col = "avg_rtt"
elif "rtt_ms" in fieldnames:
col = "rtt_ms"
else:
raise RuntimeError(
f"CSV '{csv_path}' does not contain 'avg_rtt' or 'rtt_ms' column. "
f"Found columns: {fieldnames}"
)
for row in reader:
raw = row.get(col, "").strip()
if not raw:
continue
try:
values_ms.append(float(raw))
except ValueError:
# Ignore rows where the RTT column is not a valid number
continue
return values_ms
def plot_hist(values_sec, title_suffix: str, output_suffix: str | None = None):
"""
Plot a single histogram for the given RTT values (in seconds).
If output_suffix is provided, it can be used to build a filename
with plt.savefig, if you want. Right now we only show the figure.
"""
if not values_sec:
print(f"No RTT values for plot '{title_suffix}', skipping.")
return
median_val = statistics.median(values_sec)
plt.figure(figsize=(8, 6))
plt.hist(values_sec, bins=30, edgecolor="black")
plt.axvline(
median_val,
color="red",
linestyle="--",
linewidth=2,
label=f"Median: {median_val:.2f}s",
)
plt.title(f"Distribution of RTTs {title_suffix}")
plt.xlabel("RTT (seconds)")
plt.ylabel("Frequency")
plt.legend()
plt.tight_layout()
# If you want to save instead of show, uncomment the following
# if output_suffix is not None:
# filename = f"rtt_hist_{output_suffix}.png"
# plt.savefig(filename)
# print(f"Saved plot to {filename}")
# else:
# plt.show()
def main():
if len(sys.argv) != 3:
print("Usage: python rtt_histogram.py <csv_path> <outlier_mode>", file=sys.stderr)
print(" outlier_mode:", file=sys.stderr)
print(" 'all' -> one plot with all RTTs", file=sys.stderr)
print(" '<cutoff>' -> one plot with RTT <= cutoff seconds", file=sys.stderr)
print(" 'both:<cutoff>' -> two plots: inliers & outliers around cutoff", file=sys.stderr)
sys.exit(1)
csv_path = sys.argv[1]
outlier_mode = sys.argv[2].strip().lower()
# 1) Load avg_rtt values (ms)
try:
rtt_ms = load_rtt_values(csv_path)
except Exception as e:
print(f"Error reading CSV: {e}", file=sys.stderr)
sys.exit(1)
if not rtt_ms:
print("No RTT values found in CSV nothing to plot.")
sys.exit(0)
# 2) Convert to seconds
rtt_sec = [v / 1000.0 for v in rtt_ms]
# --------------------------------------------------------------------
# MODE 1: both:<cutoff> → generate TWO plots (inliers & outliers)
# --------------------------------------------------------------------
if outlier_mode.startswith("both:"):
cutoff_str = outlier_mode.split(":", 1)[1]
try:
cutoff = float(cutoff_str)
except ValueError:
print(
f"Invalid outlier_mode '{outlier_mode}'. "
f"Expected format 'both:<numeric_cutoff>', e.g. 'both:1.0'.",
file=sys.stderr,
)
sys.exit(1)
inliers = [v for v in rtt_sec if v <= cutoff]
outliers = [v for v in rtt_sec if v > cutoff]
if not inliers and not outliers:
print("No RTT values found for inliers or outliers nothing to plot.")
sys.exit(0)
print(f"Total RTT samples: {len(rtt_sec)}")
print(f"Inliers (<= {cutoff:.3f}s): {len(inliers)}")
print(f"Outliers (> {cutoff:.3f}s): {len(outliers)}")
# Plot inliers
plot_hist(inliers, title_suffix=f"(inliers ≤ {cutoff:.2f}s)", output_suffix="inliers")
# Plot outliers
plot_hist(outliers, title_suffix=f"(outliers > {cutoff:.2f}s)", output_suffix="outliers")
# Show all open figures
plt.show()
sys.exit(0)
# --------------------------------------------------------------------
# MODE 2: all → single plot with all RTT values
# --------------------------------------------------------------------
if outlier_mode == "all":
if not rtt_sec:
print("No RTT values to plot.")
sys.exit(0)
plot_hist(rtt_sec, title_suffix="(all samples)", output_suffix=None)
plt.show()
sys.exit(0)
# --------------------------------------------------------------------
# MODE 3: numeric cutoff → single plot with inliers only
# --------------------------------------------------------------------
try:
cutoff = float(outlier_mode)
except ValueError:
print(
f"Invalid outlier_mode '{outlier_mode}'. "
f"Use 'all', a numeric cutoff (e.g. '1.0'), or 'both:<cutoff>'.",
file=sys.stderr,
)
sys.exit(1)
filtered = [v for v in rtt_sec if v <= cutoff]
if not filtered:
print(
f"After filtering with cutoff={cutoff:.2f}s, no RTT values remain nothing to plot."
)
sys.exit(0)
print(f"Total RTT samples: {len(rtt_sec)}")
print(f"Filtered inliers (<= {cutoff:.3f}s): {len(filtered)}")
plot_hist(filtered, title_suffix=f"(≤ {cutoff:.2f}s)", output_suffix=None)
plt.show()
if __name__ == "__main__":
main()
+30
View File
@@ -5,8 +5,10 @@ use crate::mixnet::{AnonymousSenderTag, IncludedSurbs, Recipient};
use crate::Result;
use async_trait::async_trait;
use nym_client_core::client::inbound_messages::InputMessage;
use nym_client_core::client::rtt_analyzer::{RttConfig, RttEvent};
use nym_sphinx::params::PacketType;
use nym_task::connections::TransmissionLane;
use tokio::sync::mpsc::Sender;
// defined to guarantee common interface regardless of whether you're using the full client
// or just the sending handler
@@ -87,6 +89,34 @@ pub trait MixnetMessageSender {
};
self.send(input_msg).await
}
/// Sends a RunRTTTest message to the supplied Nym address.
///
/// This is a special message used for measuring per-route RTT.
/// It will instruct the client to run a test that sends one message
/// per available route and logs the time of each send.
async fn send_rtt_test(
&self,
address: Recipient,
max_retransmissions: Option<u32>,
sender: Sender<RttEvent>,
config: RttConfig,
) -> Result<()>
where {
let lane = TransmissionLane::General;
//Is there a way to find my address from here?
// Construct a RunRTTTest message
let input_msg = InputMessage::RunRTTTest {
recipient: address,
lane,
max_retransmissions,
sender,
config,
};
println!("[RTT TEST DEBUG] Sending RTT test message to {})", address,);
// Send it for processing
self.send(input_msg).await
}
/// Sends reply data to the supplied anonymous recipient.
///