Compare commits
1 Commits
develop
...
rtt-analyzer
| Author | SHA1 | Date | |
|---|---|---|---|
| 2c717c0ebd |
Generated
+17
@@ -5204,6 +5204,7 @@ dependencies = [
|
||||
"nym-task",
|
||||
"nym-topology",
|
||||
"nym-validator-client",
|
||||
"once_cell",
|
||||
"rand 0.8.5",
|
||||
"rand_chacha 0.3.1",
|
||||
"serde",
|
||||
@@ -5279,6 +5280,22 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nym-client-rtt-tester"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"nym-client-core",
|
||||
"nym-config",
|
||||
"nym-crypto",
|
||||
"nym-network-defaults",
|
||||
"nym-sdk",
|
||||
"nym-sphinx",
|
||||
"nym-task",
|
||||
"nym-topology",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nym-client-wasm"
|
||||
version = "1.4.1"
|
||||
|
||||
@@ -18,6 +18,7 @@ resolver = "2"
|
||||
members = [
|
||||
"clients/native",
|
||||
"clients/native/websocket-requests",
|
||||
"clients/rtt-tester",
|
||||
"clients/socks5",
|
||||
"common/async-file-watcher",
|
||||
"common/authenticator-requests",
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
[package]
|
||||
name = "nym-client-rtt-tester"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
description = "RTT testing client built using nym-client-core"
|
||||
license.workspace = true
|
||||
|
||||
[[bin]]
|
||||
name = "nym-client-rtt-tester"
|
||||
path = "src/main.rs"
|
||||
|
||||
[dependencies]
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
tracing = { workspace = true }
|
||||
nym-client-core = { path = "../../common/client-core" }
|
||||
nym-network-defaults = { path = "../../common/network-defaults" }
|
||||
nym-sphinx = { path = "../../common/nymsphinx" }
|
||||
nym-topology = { path = "../../common/topology" }
|
||||
nym-config = { path = "../../common/config" }
|
||||
nym-task = { path = "../../common/task" }
|
||||
nym-crypto = { path = "../../common/crypto" }
|
||||
nym-sdk = { path = "../../sdk/rust/nym-sdk" }
|
||||
@@ -0,0 +1,466 @@
|
||||
use nym_sdk::mixnet;
|
||||
use nym_sdk::mixnet::MixnetMessageSender;
|
||||
|
||||
use nym_client_core::client::rtt_analyzer::{RttAnalyzer, RttConfig, RttEvent, RttPattern};
|
||||
use nym_sdk::DebugConfig;
|
||||
use tokio::io::{self, AsyncBufReadExt};
|
||||
use tokio::time::{sleep, Duration};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
// ============================================================
|
||||
// 1. Start RTT Analyzer + background worker
|
||||
// ============================================================
|
||||
let _analyzer = RttAnalyzer::new();
|
||||
|
||||
let tx = RttAnalyzer::producer().expect("Analyzer was not initialized!");
|
||||
|
||||
// ============================================================
|
||||
// 2. Build mixnet client
|
||||
// ============================================================
|
||||
let mut debug = DebugConfig::default();
|
||||
|
||||
// Disable ALL Poisson & cover streams
|
||||
debug.traffic.disable_main_poisson_packet_distribution = true;
|
||||
debug.cover_traffic.disable_loop_cover_traffic_stream = false;
|
||||
|
||||
let client = mixnet::MixnetClientBuilder::new_ephemeral()
|
||||
.debug_config(debug)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let client = client.connect_to_mixnet().await.unwrap();
|
||||
|
||||
let our_address = client.nym_address();
|
||||
println!("Our client nym address is: {our_address}");
|
||||
|
||||
// ============================================================
|
||||
// 3. Ask the user for RTT TEST configuration
|
||||
// ============================================================
|
||||
let config = ask_user_for_rtt_config().await;
|
||||
|
||||
println!("\nStarting RTT test with:");
|
||||
println!(" packets_per_route = {}", config.packets_per_route);
|
||||
println!(" pattern = {:?}", config.pattern);
|
||||
println!(" delay (ms) = {}", config.inter_route_delay_ms);
|
||||
|
||||
// START THE TEST
|
||||
let _ = client
|
||||
.send_rtt_test(*our_address, None, tx.clone(), config)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// ============================================================
|
||||
// 4. Background listener for incoming messages
|
||||
// ============================================================
|
||||
tokio::spawn({
|
||||
let mut client = client;
|
||||
async move {
|
||||
loop {
|
||||
if client.wait_for_messages().await.is_some() {
|
||||
//I should do something here to shutdown
|
||||
}
|
||||
sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// ============================================================
|
||||
// 5. Main input loop
|
||||
// ============================================================
|
||||
let stdin = io::BufReader::new(io::stdin());
|
||||
let mut lines = stdin.lines();
|
||||
|
||||
println!("Type 'menu' to show RTT commands.");
|
||||
|
||||
loop {
|
||||
if let Ok(Some(input)) = lines.next_line().await {
|
||||
let input = input.trim().to_lowercase();
|
||||
|
||||
if input == "menu" {
|
||||
show_menu_and_handle_choice(&tx).await;
|
||||
}
|
||||
}
|
||||
|
||||
sleep(Duration::from_millis(50)).await;
|
||||
}
|
||||
}
|
||||
|
||||
// =====================================================================
|
||||
// ASK USER FOR RTT TEST SETTINGS AT PROGRAM START
|
||||
// =====================================================================
|
||||
async fn ask_user_for_rtt_config() -> RttConfig {
|
||||
let stdin = io::BufReader::new(io::stdin());
|
||||
let mut lines = stdin.lines();
|
||||
|
||||
println!("\n========== RTT TEST CONFIGURATION ==========");
|
||||
|
||||
// -----------------------------
|
||||
// Ask for packets per route
|
||||
// -----------------------------
|
||||
println!("Enter number of packets per route: ");
|
||||
let packets = read_u32_from_stdin(&mut lines).await;
|
||||
|
||||
// -----------------------------
|
||||
// Ask for pattern: Burst / RR
|
||||
// -----------------------------
|
||||
println!("Choose pattern:");
|
||||
println!(" 1) Burst");
|
||||
println!(" 2) Round Robin");
|
||||
let pattern = loop {
|
||||
let input = read_string(&mut lines).await;
|
||||
|
||||
match input.as_str() {
|
||||
"1" => break RttPattern::Burst,
|
||||
"2" => break RttPattern::RoundRobin,
|
||||
_ => println!("Invalid choice! Please type 1 or 2:"),
|
||||
}
|
||||
};
|
||||
|
||||
// -----------------------------
|
||||
// Ask for delay between packets
|
||||
// -----------------------------
|
||||
println!("Enter delay between packets (ms): ");
|
||||
let delay = read_u64_from_stdin(&mut lines).await;
|
||||
|
||||
// Build Config
|
||||
RttConfig {
|
||||
packets_per_route: packets,
|
||||
pattern,
|
||||
inter_route_delay_ms: delay,
|
||||
}
|
||||
}
|
||||
|
||||
// =====================================================================
|
||||
// Util functions for reading typed input
|
||||
// =====================================================================
|
||||
async fn read_string(lines: &mut tokio::io::Lines<io::BufReader<io::Stdin>>) -> String {
|
||||
loop {
|
||||
if let Ok(Some(line)) = lines.next_line().await {
|
||||
let trimmed = line.trim().to_string();
|
||||
if !trimmed.is_empty() {
|
||||
return trimmed;
|
||||
}
|
||||
}
|
||||
println!("Please type a value:");
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_u32_from_stdin(lines: &mut tokio::io::Lines<io::BufReader<io::Stdin>>) -> u32 {
|
||||
loop {
|
||||
if let Ok(Some(line)) = lines.next_line().await {
|
||||
if let Ok(num) = line.trim().parse::<u32>() {
|
||||
return num;
|
||||
}
|
||||
}
|
||||
println!("Invalid number, try again:");
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_u64_from_stdin(lines: &mut tokio::io::Lines<io::BufReader<io::Stdin>>) -> u64 {
|
||||
loop {
|
||||
if let Ok(Some(line)) = lines.next_line().await {
|
||||
if let Ok(num) = line.trim().parse::<u64>() {
|
||||
return num;
|
||||
}
|
||||
}
|
||||
println!("Invalid number, try again:");
|
||||
}
|
||||
}
|
||||
// =====================================================================
|
||||
// MENU HANDLER (FULL VERSION WITH HELP / DOCS)
|
||||
// =====================================================================
|
||||
async fn show_menu_and_handle_choice(tx: &tokio::sync::mpsc::Sender<RttEvent>) {
|
||||
println!("\n======================== RTT MENU ========================");
|
||||
println!("1) Print global RTT statistics");
|
||||
println!("2) Write statistics to CSV file");
|
||||
println!("3) Print route details by ROUTE INDEX");
|
||||
println!("4) Print route details by ROUTE NODES STRING");
|
||||
println!("5) Print routes with AVG RTT above threshold");
|
||||
println!("6) Print routes with ANY RTT above threshold");
|
||||
println!("7) Help (Show all commands & how to use them)");
|
||||
println!("8) Write CSV and generate RTT histogram(s) with Python");
|
||||
println!("9) Show overall experiment completion percentage");
|
||||
println!("===========================================================");
|
||||
print!("Select option: ");
|
||||
|
||||
use std::io::Write;
|
||||
std::io::stdout().flush().unwrap();
|
||||
|
||||
let mut input = String::new();
|
||||
let _ = std::io::stdin().read_line(&mut input);
|
||||
let choice = input.trim();
|
||||
|
||||
match choice {
|
||||
// -------------------- 1. PRINT GLOBAL STATS --------------------
|
||||
"1" => {
|
||||
let _ = tx.send(RttEvent::PrintStats).await;
|
||||
}
|
||||
|
||||
// -------------------- 2. WRITE STATS ---------------------------
|
||||
"2" => {
|
||||
print!("Enter file path: ");
|
||||
std::io::stdout().flush().unwrap();
|
||||
|
||||
let mut path = String::new();
|
||||
let _ = std::io::stdin().read_line(&mut path);
|
||||
|
||||
let path = path.trim().to_string();
|
||||
let _ = tx.send(RttEvent::WriteStats { path }).await;
|
||||
}
|
||||
|
||||
// -------------------- 3. PRINT ROUTE DETAILS -------------------
|
||||
"3" => {
|
||||
print!("Enter route index (0-based): ");
|
||||
std::io::stdout().flush().unwrap();
|
||||
|
||||
let mut s = String::new();
|
||||
let _ = std::io::stdin().read_line(&mut s);
|
||||
|
||||
if let Ok(index) = s.trim().parse::<usize>() {
|
||||
let _ = tx
|
||||
.send(RttEvent::PrintRouteDetail { route_index: index })
|
||||
.await;
|
||||
} else {
|
||||
println!("Invalid index.");
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------- 4. PRINT STATS BY NODE STRING -----------
|
||||
"4" => {
|
||||
println!("Enter Node String EXACTLY as stored.");
|
||||
println!("Example format:");
|
||||
println!(" <base58_node1> > <base58_node2> > <base58_node3>");
|
||||
print!("Nodes: ");
|
||||
std::io::stdout().flush().unwrap();
|
||||
|
||||
let mut nodes = String::new();
|
||||
let _ = std::io::stdin().read_line(&mut nodes);
|
||||
|
||||
let nodes = nodes.trim().to_string();
|
||||
let _ = tx.send(RttEvent::PrintRouteStatsByNodes { nodes }).await;
|
||||
}
|
||||
|
||||
// -------------------- 5. AVG ABOVE THRESHOLD ------------------
|
||||
"5" => {
|
||||
print!("Enter threshold in ms: ");
|
||||
std::io::stdout().flush().unwrap();
|
||||
|
||||
let mut s = String::new();
|
||||
let _ = std::io::stdin().read_line(&mut s);
|
||||
|
||||
if let Ok(th) = s.trim().parse::<u128>() {
|
||||
let _ = tx
|
||||
.send(RttEvent::PrintRoutesWithAvgAbove { threshold_ms: th })
|
||||
.await;
|
||||
} else {
|
||||
println!("Invalid number.");
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------- 6. ANY ABOVE THRESHOLD ------------------
|
||||
"6" => {
|
||||
print!("Enter threshold in ms: ");
|
||||
std::io::stdout().flush().unwrap();
|
||||
|
||||
let mut s = String::new();
|
||||
let _ = std::io::stdin().read_line(&mut s);
|
||||
|
||||
if let Ok(th) = s.trim().parse::<u128>() {
|
||||
let _ = tx
|
||||
.send(RttEvent::PrintRoutesWithAnyAbove { threshold_ms: th })
|
||||
.await;
|
||||
} else {
|
||||
println!("Invalid number.");
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------- 7. HELP --------------------------------
|
||||
"7" => {
|
||||
print_help();
|
||||
}
|
||||
"8" => {
|
||||
// Ask for CSV path
|
||||
print!("Enter CSV output path (e.g. rtt_stats.csv): ");
|
||||
std::io::stdout().flush().unwrap();
|
||||
|
||||
let mut path = String::new();
|
||||
let _ = std::io::stdin().read_line(&mut path);
|
||||
let path = path.trim().to_string();
|
||||
|
||||
// Sub-menu for histogram mode
|
||||
println!("\nHistogram mode:");
|
||||
println!(" 1) One plot with ALL RTT samples (including outliers)");
|
||||
println!(" 2) One plot with INLIERS only (RTT <= cutoff)");
|
||||
println!(" 3) TWO plots: one for INLIERS and one for OUTLIERS");
|
||||
print!("Select mode: ");
|
||||
std::io::stdout().flush().unwrap();
|
||||
|
||||
let mut mode_input = String::new();
|
||||
let _ = std::io::stdin().read_line(&mut mode_input);
|
||||
let mode_choice = mode_input.trim();
|
||||
|
||||
let outlier_mode = match mode_choice {
|
||||
// 1) All RTTs
|
||||
"1" => "all".to_string(),
|
||||
|
||||
// 2) Only inliers, ask for cutoff in seconds
|
||||
"2" => {
|
||||
print!("Enter cutoff in seconds (e.g. 1.0 for 1 second): ");
|
||||
std::io::stdout().flush().unwrap();
|
||||
|
||||
let mut c = String::new();
|
||||
let _ = std::io::stdin().read_line(&mut c);
|
||||
let cutoff = c.trim();
|
||||
cutoff.to_string() // e.g. "1.0"
|
||||
}
|
||||
|
||||
// 3) Two plots: inliers + outliers (both)
|
||||
"3" => {
|
||||
print!("Enter cutoff in seconds (e.g. 1.0 for 1 second): ");
|
||||
std::io::stdout().flush().unwrap();
|
||||
|
||||
let mut c = String::new();
|
||||
let _ = std::io::stdin().read_line(&mut c);
|
||||
let cutoff = c.trim();
|
||||
// Encode as 'both:<cutoff>' so Python can understand it
|
||||
format!("both:{cutoff}")
|
||||
}
|
||||
|
||||
_ => {
|
||||
println!("Invalid mode, aborting histogram generation.");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let _ = tx
|
||||
.send(RttEvent::WriteStatsAndPlot { path, outlier_mode })
|
||||
.await;
|
||||
}
|
||||
|
||||
"9" => {
|
||||
// Send an event to the RTT analyzer to compute and print progress
|
||||
let _ = tx.send(RttEvent::PrintExperimentProgress).await;
|
||||
}
|
||||
|
||||
_ => println!("Invalid selection."),
|
||||
}
|
||||
}
|
||||
|
||||
fn print_help() {
|
||||
println!("\n======================== RTT HELP ========================\n");
|
||||
|
||||
println!("This tool allows you to perform detailed RTT analysis over all mixnet routes.");
|
||||
println!("The client sends RTT probe traffic through every candidate route,");
|
||||
println!("and the RTT analyzer collects per-route statistics in the background.\n");
|
||||
|
||||
println!("Main commands (from the RTT menu):\n");
|
||||
|
||||
println!(" 1) Print global RTT statistics");
|
||||
println!(" Prints one summary line per route:");
|
||||
println!(" - route index");
|
||||
println!(" - packets sent (including retransmissions)");
|
||||
println!(" - number of ACKs");
|
||||
println!(" - number of timeouts");
|
||||
println!(" - average RTT (computed over all stored RTT samples, in ms)");
|
||||
println!();
|
||||
|
||||
println!(" 2) Write stats to CSV file");
|
||||
println!(" Writes one line per route to a CSV file on disk.");
|
||||
println!(" Current CSV columns:");
|
||||
println!(" route,sent,acks,timeouts,avg_rtt");
|
||||
println!(" route : numeric route index");
|
||||
println!(" sent : how many FragmentSent events were recorded");
|
||||
println!(" acks : how many FragmentAckReceived events were recorded");
|
||||
println!(" timeouts : how many FragmentAckExpired events were recorded");
|
||||
println!(" avg_rtt : average RTT (in milliseconds) from all RTT samples");
|
||||
println!();
|
||||
|
||||
println!(" 3) Print route details BY ROUTE INDEX");
|
||||
println!(" Input: a 0-based route index.");
|
||||
println!(" Output for that route:");
|
||||
println!(" - node list (base58 identities) in order: Node1 > Node2 > Node3");
|
||||
println!(" - ALL RTT samples recorded for that route (each sample shown in ms)");
|
||||
println!(" This is useful when you already know the route index and");
|
||||
println!(" want to inspect exactly how it behaves packet by packet.");
|
||||
println!();
|
||||
|
||||
println!(" 4) Print route details BY NODE STRING");
|
||||
println!(" Input format must match exactly what the analyzer stored, for example:");
|
||||
println!(" <node1_base58> > <node2_base58> > <node3_base58>");
|
||||
println!(" If a route with that node sequence exists, the tool will:");
|
||||
println!(" - print the matching route index");
|
||||
println!(" - print the full per-route detail (same as option 3).");
|
||||
println!(" This is useful when you have a specific mixnode combination");
|
||||
println!(" (e.g. a slow or suspicious path) and want its statistics.");
|
||||
println!();
|
||||
|
||||
println!(" 5) Print routes with AVERAGE RTT ABOVE a threshold");
|
||||
println!(" You provide a threshold in milliseconds (e.g. 150).");
|
||||
println!(" The tool will:");
|
||||
println!(" - compute avg RTT for each route");
|
||||
println!(" - select only routes where avg RTT > threshold");
|
||||
println!(" - print detailed info for each matching route (nodes + RTT samples).");
|
||||
println!(" Use this to quickly find generally slow routes.");
|
||||
println!();
|
||||
|
||||
println!(" 6) Print routes with ANY RTT ABOVE a threshold");
|
||||
println!(" You provide a threshold in milliseconds (e.g. 500).");
|
||||
println!(" For each route, if at least one RTT sample exceeds the threshold,");
|
||||
println!(" that route is printed with full details.");
|
||||
println!(" Use this to find routes that occasionally spike very high,");
|
||||
println!(" even if their average RTT is still acceptable.");
|
||||
println!();
|
||||
|
||||
println!(" 7) Show experiment progress (percentage completed)");
|
||||
println!(" Uses the stored experiment configuration (total_routes, packets_per_route)");
|
||||
println!(" plus the number of RTT samples recorded so far to estimate:");
|
||||
println!(" completion = received_samples / (total_routes * packets_per_route)");
|
||||
println!(" The result is printed as a percentage (0%–100%).");
|
||||
println!(" This tells you roughly how far the RTT experiment has progressed.");
|
||||
println!();
|
||||
|
||||
println!(" 8) Write stats AND generate histogram(s) via Python");
|
||||
println!(" This command will:");
|
||||
println!(" 1) Write the current route statistics to a CSV file (same as option 2).");
|
||||
println!(" 2) Call the Python script 'rtt_histogram.py' to visualize RTTs.");
|
||||
println!();
|
||||
println!(" When prompted, you will provide two things:");
|
||||
println!(" - CSV file path (where to save the stats)");
|
||||
println!(" - outlier_mode string, which controls which histograms are generated:");
|
||||
println!();
|
||||
println!(" • \"all\"");
|
||||
println!(" Use ALL avg_rtt values from the CSV.");
|
||||
println!(
|
||||
" Result: a single histogram containing every route's avg RTT (in seconds)."
|
||||
);
|
||||
println!();
|
||||
println!(" • \"<cutoff>\" (numeric, in seconds, e.g. \"1.0\")");
|
||||
println!(" Only keep avg_rtt <= cutoff.");
|
||||
println!(" Result: a single histogram with INLIERS only (values <= cutoff).");
|
||||
println!(
|
||||
" Example: \"1.0\" keeps everything at or below 1.0s and drops slower routes."
|
||||
);
|
||||
println!();
|
||||
println!(" • \"both:<cutoff>\" (e.g. \"both:1.0\")");
|
||||
println!(" Split the data into two sets:");
|
||||
println!(" - inliers : avg_rtt <= cutoff");
|
||||
println!(" - outliers : avg_rtt > cutoff");
|
||||
println!(" Result: TWO histograms are generated:");
|
||||
println!(" 1) Distribution of inliers");
|
||||
println!(" 2) Distribution of outliers");
|
||||
println!(
|
||||
" This helps visually compare the \"normal\" routes and the very slow ones."
|
||||
);
|
||||
println!();
|
||||
|
||||
println!("Helpful notes:");
|
||||
println!(" • RTT samples are computed when a FragmentReceived event arrives.");
|
||||
println!(" For each fragment that may be retransmitted, the analyzer stores");
|
||||
println!(" multiple send times and receive times, and pairs them in order");
|
||||
println!(" to compute multiple RTT values for that fragment if needed.");
|
||||
println!(" • \"sent\" in the stats includes retransmissions as well, so it may be");
|
||||
println!(" higher than packets_per_route for unstable routes.");
|
||||
|
||||
println!("===========================================================\n");
|
||||
}
|
||||
@@ -29,6 +29,8 @@ time = { workspace = true }
|
||||
tokio = { workspace = true, features = ["sync", "macros"] }
|
||||
tracing = { workspace = true }
|
||||
zeroize = { workspace = true }
|
||||
once_cell = "1.19"
|
||||
|
||||
|
||||
# internal
|
||||
nym-id = { path = "../nym-id" }
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
// Copyright 2020-2023 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::client::rtt_analyzer::{RttConfig, RttEvent};
|
||||
use nym_sphinx::addressing::clients::Recipient;
|
||||
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use nym_sphinx::params::PacketType;
|
||||
use nym_task::connections::TransmissionLane;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
|
||||
pub type InputMessageSender = tokio::sync::mpsc::Sender<InputMessage>;
|
||||
pub type InputMessageReceiver = tokio::sync::mpsc::Receiver<InputMessage>;
|
||||
@@ -46,6 +48,13 @@ pub enum InputMessage {
|
||||
lane: TransmissionLane,
|
||||
max_retransmissions: Option<u32>,
|
||||
},
|
||||
RunRTTTest {
|
||||
recipient: Recipient,
|
||||
lane: TransmissionLane,
|
||||
max_retransmissions: Option<u32>,
|
||||
sender: Sender<RttEvent>,
|
||||
config: RttConfig,
|
||||
},
|
||||
|
||||
/// Attempt to use our internally received and stored `ReplySurb` to send the message back
|
||||
/// to specified recipient whilst not knowing its full identity (or even gateway).
|
||||
@@ -150,6 +159,7 @@ impl InputMessage {
|
||||
match self {
|
||||
InputMessage::Regular { lane, .. }
|
||||
| InputMessage::Anonymous { lane, .. }
|
||||
| InputMessage::RunRTTTest { lane, .. }
|
||||
| InputMessage::Reply { lane, .. }
|
||||
| InputMessage::Premade { lane, .. } => lane,
|
||||
InputMessage::MessageWrapper { message, .. } => message.lane(),
|
||||
@@ -166,6 +176,10 @@ impl InputMessage {
|
||||
max_retransmissions: m,
|
||||
..
|
||||
}
|
||||
| InputMessage::RunRTTTest {
|
||||
max_retransmissions: m,
|
||||
..
|
||||
}
|
||||
| InputMessage::Reply {
|
||||
max_retransmissions: m,
|
||||
..
|
||||
|
||||
@@ -11,6 +11,7 @@ pub mod mix_traffic;
|
||||
pub mod real_messages_control;
|
||||
pub mod received_buffer;
|
||||
pub mod replies;
|
||||
pub mod rtt_analyzer;
|
||||
pub mod statistics_control;
|
||||
pub mod topology_control;
|
||||
pub(crate) mod transmission_buffer;
|
||||
|
||||
+25
-4
@@ -4,6 +4,7 @@
|
||||
use super::action_controller::{AckActionSender, Action};
|
||||
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
|
||||
|
||||
use crate::client::rtt_analyzer::{RttAnalyzer, RttEvent};
|
||||
use futures::StreamExt;
|
||||
use nym_gateway_client::AcknowledgementReceiver;
|
||||
use nym_sphinx::{
|
||||
@@ -12,6 +13,7 @@ use nym_sphinx::{
|
||||
};
|
||||
use nym_task::ShutdownToken;
|
||||
use std::sync::Arc;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use tracing::*;
|
||||
|
||||
/// Module responsible for listening for any data resembling acknowledgements from the network
|
||||
@@ -38,7 +40,11 @@ impl AcknowledgementListener {
|
||||
}
|
||||
}
|
||||
|
||||
async fn on_ack(&mut self, ack_content: Vec<u8>) {
|
||||
async fn on_ack(
|
||||
&mut self,
|
||||
ack_content: Vec<u8>,
|
||||
rtt_producer: Option<tokio::sync::mpsc::Sender<RttEvent>>,
|
||||
) {
|
||||
trace!("Received an ack");
|
||||
self.stats_tx
|
||||
.report(PacketStatisticsEvent::AckReceived(ack_content.len()).into());
|
||||
@@ -62,6 +68,16 @@ impl AcknowledgementListener {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(ref producer) = rtt_producer {
|
||||
if let Ok(duration) = SystemTime::now().duration_since(UNIX_EPOCH) {
|
||||
let now = duration.as_millis();
|
||||
let _ = producer.try_send(RttEvent::FragmentAckReceived {
|
||||
fragment_id: frag_id.set_id().to_string(),
|
||||
timestamp: now,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
trace!("Received {frag_id} from the mix network");
|
||||
self.stats_tx
|
||||
.report(PacketStatisticsEvent::RealAckReceived(ack_content.len()).into());
|
||||
@@ -70,15 +86,20 @@ impl AcknowledgementListener {
|
||||
.unbounded_send(Action::new_remove(frag_id));
|
||||
}
|
||||
|
||||
async fn handle_ack_receiver_item(&mut self, item: Vec<Vec<u8>>) {
|
||||
async fn handle_ack_receiver_item(
|
||||
&mut self,
|
||||
item: Vec<Vec<u8>>,
|
||||
rtt_producer: Option<tokio::sync::mpsc::Sender<RttEvent>>,
|
||||
) {
|
||||
// realistically we would only be getting one ack at the time
|
||||
for ack in item {
|
||||
self.on_ack(ack).await;
|
||||
self.on_ack(ack, rtt_producer.clone()).await;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
|
||||
debug!("Started AcknowledgementListener with graceful shutdown support");
|
||||
let rtt_producer = RttAnalyzer::producer();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
@@ -88,7 +109,7 @@ impl AcknowledgementListener {
|
||||
break;
|
||||
}
|
||||
acks = self.ack_receiver.next() => match acks {
|
||||
Some(acks) => self.handle_ack_receiver_item(acks).await,
|
||||
Some(acks) => self.handle_ack_receiver_item(acks,rtt_producer.clone()).await,
|
||||
None => {
|
||||
tracing::trace!("AcknowledgementListener: Stopping since channel closed");
|
||||
break;
|
||||
|
||||
+19
-3
@@ -3,6 +3,7 @@
|
||||
|
||||
use super::PendingAcknowledgement;
|
||||
use crate::client::real_messages_control::acknowledgement_control::RetransmissionRequestSender;
|
||||
use crate::client::rtt_analyzer::{RttAnalyzer, RttEvent};
|
||||
use futures::channel::mpsc;
|
||||
use futures::StreamExt;
|
||||
use nym_nonexhaustive_delayqueue::{Expired, NonExhaustiveDelayQueue, QueueKey};
|
||||
@@ -16,6 +17,7 @@ use tracing::*;
|
||||
|
||||
pub(crate) type AckActionSender = mpsc::UnboundedSender<Action>;
|
||||
pub(crate) type AckActionReceiver = mpsc::UnboundedReceiver<Action>;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
// The actual data being sent off as well as potential key to the delay queue
|
||||
type PendingAckEntry = (Arc<PendingAcknowledgement>, Option<QueueKey>);
|
||||
@@ -207,8 +209,22 @@ impl ActionController {
|
||||
|
||||
// note: when the entry expires it's automatically removed from pending_acks_timers
|
||||
#[allow(clippy::panic)]
|
||||
fn handle_expired_ack_timer(&mut self, expired_ack: Expired<FragmentIdentifier>) {
|
||||
fn handle_expired_ack_timer(
|
||||
&mut self,
|
||||
expired_ack: Expired<FragmentIdentifier>,
|
||||
rtt_producer: Option<tokio::sync::mpsc::Sender<RttEvent>>,
|
||||
) {
|
||||
let frag_id = expired_ack.into_inner();
|
||||
if let Some(ref producer) = rtt_producer {
|
||||
if let Ok(duration) = SystemTime::now().duration_since(UNIX_EPOCH) {
|
||||
let now: u128 = duration.as_millis();
|
||||
|
||||
let _ = producer.try_send(RttEvent::FragmentAckExpired {
|
||||
fragment_id: frag_id.set_id().to_string(),
|
||||
timestamp: now,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
trace!("{frag_id} has expired");
|
||||
|
||||
@@ -244,7 +260,7 @@ impl ActionController {
|
||||
|
||||
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
|
||||
debug!("Started ActionController with graceful shutdown support");
|
||||
|
||||
let rtt_producer = RttAnalyzer::producer();
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
@@ -262,7 +278,7 @@ impl ActionController {
|
||||
}
|
||||
},
|
||||
expired_ack = self.pending_acks_timers.next() => match expired_ack {
|
||||
Some(expired_ack) => self.handle_expired_ack_timer(expired_ack),
|
||||
Some(expired_ack) => self.handle_expired_ack_timer(expired_ack,rtt_producer.clone()),
|
||||
None => {
|
||||
tracing::trace!("ActionController: Stopping since ack channel closed");
|
||||
break;
|
||||
|
||||
+60
-1
@@ -5,6 +5,7 @@ use crate::client::inbound_messages::{InputMessage, InputMessageReceiver};
|
||||
use crate::client::real_messages_control::message_handler::MessageHandler;
|
||||
use crate::client::real_messages_control::real_traffic_stream::RealMessage;
|
||||
use crate::client::replies::reply_controller::ReplyControllerSender;
|
||||
use crate::client::rtt_analyzer::{RttConfig, RttEvent};
|
||||
use nym_sphinx::addressing::clients::Recipient;
|
||||
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
@@ -12,6 +13,7 @@ use nym_sphinx::params::PacketType;
|
||||
use nym_task::connections::TransmissionLane;
|
||||
use nym_task::ShutdownToken;
|
||||
use rand::{CryptoRng, Rng};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tracing::*;
|
||||
|
||||
/// Module responsible for dealing with the received messages: splitting them, creating acknowledgements,
|
||||
@@ -111,7 +113,30 @@ where
|
||||
warn!("failed to send a repliable message - {err}")
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_rtt_test(
|
||||
&mut self,
|
||||
recipient: Recipient,
|
||||
lane: TransmissionLane,
|
||||
packet_type: PacketType,
|
||||
max_retransmissions: Option<u32>,
|
||||
sender: Sender<RttEvent>,
|
||||
config: RttConfig,
|
||||
) {
|
||||
if let Err(err) = self
|
||||
.message_handler
|
||||
.try_run_rtt_test(
|
||||
recipient,
|
||||
lane,
|
||||
packet_type,
|
||||
max_retransmissions,
|
||||
sender,
|
||||
config,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("failed to send a repliable message - {err}")
|
||||
}
|
||||
}
|
||||
#[allow(clippy::panic)]
|
||||
async fn on_input_message(&mut self, msg: InputMessage) {
|
||||
match msg {
|
||||
@@ -147,6 +172,23 @@ where
|
||||
)
|
||||
.await
|
||||
}
|
||||
InputMessage::RunRTTTest {
|
||||
recipient,
|
||||
lane,
|
||||
max_retransmissions,
|
||||
sender,
|
||||
config,
|
||||
} => {
|
||||
self.run_rtt_test(
|
||||
recipient,
|
||||
lane,
|
||||
PacketType::Mix,
|
||||
max_retransmissions,
|
||||
sender,
|
||||
config,
|
||||
)
|
||||
.await
|
||||
}
|
||||
InputMessage::Reply {
|
||||
recipient_tag,
|
||||
data,
|
||||
@@ -176,6 +218,23 @@ where
|
||||
)
|
||||
.await
|
||||
}
|
||||
InputMessage::RunRTTTest {
|
||||
recipient,
|
||||
lane,
|
||||
max_retransmissions,
|
||||
sender,
|
||||
config,
|
||||
} => {
|
||||
self.run_rtt_test(
|
||||
recipient,
|
||||
lane,
|
||||
PacketType::Mix,
|
||||
max_retransmissions,
|
||||
sender,
|
||||
config,
|
||||
)
|
||||
.await
|
||||
}
|
||||
InputMessage::Anonymous {
|
||||
recipient,
|
||||
data,
|
||||
|
||||
@@ -8,6 +8,7 @@ use crate::client::real_messages_control::real_traffic_stream::{
|
||||
use crate::client::real_messages_control::{AckActionSender, Action};
|
||||
use crate::client::replies::reply_controller::MaxRetransmissions;
|
||||
use crate::client::replies::reply_storage::{ReceivedReplySurbsMap, SentReplyKeys, UsedSenderTags};
|
||||
use crate::client::rtt_analyzer::{RttConfig, RttEvent, RttPattern};
|
||||
use crate::client::topology_control::{TopologyAccessor, TopologyReadPermit};
|
||||
use nym_client_core_surb_storage::RetrievedReplySurb;
|
||||
use nym_sphinx::acknowledgements::AckKey;
|
||||
@@ -27,6 +28,8 @@ use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::time::sleep;
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
|
||||
// TODO: move that error elsewhere since it seems to be contaminating different files
|
||||
@@ -555,6 +558,97 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn try_split_and_send_non_reply_rtt_message(
|
||||
&mut self,
|
||||
sender_tag: AnonymousSenderTag,
|
||||
recipient: Recipient,
|
||||
lane: TransmissionLane,
|
||||
packet_type: PacketType,
|
||||
topology: &NymRouteProvider,
|
||||
max_retransmissions: Option<u32>,
|
||||
route_index: usize,
|
||||
sender: Sender<RttEvent>,
|
||||
) -> Result<(), PreparationError> {
|
||||
debug!("Sending RTT test message on route index {route_index} with packet type {packet_type:?}");
|
||||
|
||||
// Construct the base message
|
||||
let message = NymMessage::new_repliable(RepliableMessage::new_data(
|
||||
self.config.use_legacy_sphinx_format,
|
||||
Vec::new(),
|
||||
sender_tag,
|
||||
Vec::new(),
|
||||
));
|
||||
|
||||
debug_assert!(!matches!(message, NymMessage::Reply(_)));
|
||||
|
||||
let packet_size = if packet_type == PacketType::Outfox {
|
||||
PacketSize::OutfoxRegularPacket
|
||||
} else {
|
||||
self.optimal_packet_size(&message)
|
||||
};
|
||||
|
||||
trace!("Using packet size {packet_size:?}");
|
||||
|
||||
// ✅ Drop the read lock before mutably borrowing self
|
||||
|
||||
// Prepare fragments from message
|
||||
let fragments = self
|
||||
.message_preparer
|
||||
.pad_and_split_message(message, packet_size);
|
||||
|
||||
if fragments.len() > 1 {
|
||||
println!(
|
||||
"[RTT TEST] Warning: message was split into {} fragments",
|
||||
fragments.len()
|
||||
);
|
||||
}
|
||||
|
||||
let mut pending_acks = Vec::with_capacity(fragments.len());
|
||||
let mut real_messages = Vec::with_capacity(fragments.len());
|
||||
|
||||
for fragment in &fragments {
|
||||
let prepared_fragment = self
|
||||
.message_preparer
|
||||
.prepare_chunk_for_sending_with_deterministic_route(
|
||||
fragment.clone(),
|
||||
&topology,
|
||||
&self.config.ack_key,
|
||||
&recipient,
|
||||
packet_type,
|
||||
route_index,
|
||||
)?;
|
||||
|
||||
let _ = sender.try_send(RttEvent::RouteUsed {
|
||||
route_index,
|
||||
fragment_id: (fragment.fragment_identifier().set_id().to_string()),
|
||||
});
|
||||
|
||||
let real_message = RealMessage::new(
|
||||
prepared_fragment.mix_packet,
|
||||
Some(fragment.fragment_identifier().clone()),
|
||||
);
|
||||
|
||||
let pending_ack = PendingAcknowledgement::new_known(
|
||||
fragment.clone(),
|
||||
prepared_fragment.total_delay,
|
||||
recipient,
|
||||
max_retransmissions,
|
||||
);
|
||||
|
||||
real_messages.push(real_message);
|
||||
pending_acks.push(pending_ack);
|
||||
}
|
||||
|
||||
// Record ACKs and forward messages for *this route only*
|
||||
self.insert_pending_acks(pending_acks);
|
||||
self.forward_messages(real_messages, lane).await;
|
||||
|
||||
// // Optional: small delay to avoid flooding
|
||||
// sleep(Duration::from_millis(200)).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn try_send_additional_reply_surbs(
|
||||
&mut self,
|
||||
recipient: Recipient,
|
||||
@@ -633,6 +727,142 @@ where
|
||||
|
||||
Ok(())
|
||||
}
|
||||
// Helper: sends ONE RTT packet on ONE specific route.
|
||||
// Rust requires this to be a standalone async function (not an async closure),
|
||||
// because async closures cannot borrow local variables safely.
|
||||
async fn send_packet_on_route(
|
||||
&mut self,
|
||||
recipient: &Recipient,
|
||||
num_reply_surbs: u32,
|
||||
lane: TransmissionLane,
|
||||
packet_type: PacketType,
|
||||
topology: &NymRouteProvider,
|
||||
max_retransmissions: Option<u32>,
|
||||
route_index: usize,
|
||||
sender: &Sender<RttEvent>,
|
||||
) -> Result<(), SurbWrappedPreparationError> {
|
||||
let sender_tag = self.get_or_create_sender_tag(recipient);
|
||||
|
||||
// Prepare reply SURBs
|
||||
let reply_surbs = self.generate_reply_surbs(num_reply_surbs as usize).await?;
|
||||
let reply_keys = reply_surbs
|
||||
.iter()
|
||||
.map(|s| *s.encryption_key())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Send message on the given route
|
||||
self.try_split_and_send_non_reply_rtt_message(
|
||||
sender_tag,
|
||||
recipient.clone(),
|
||||
lane,
|
||||
packet_type,
|
||||
topology,
|
||||
max_retransmissions,
|
||||
route_index,
|
||||
sender.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Store reply keys after sending
|
||||
self.reply_key_storage.insert_multiple(reply_keys);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
pub(crate) async fn try_run_rtt_test(
|
||||
&mut self,
|
||||
recipient: Recipient,
|
||||
lane: TransmissionLane,
|
||||
packet_type: PacketType,
|
||||
max_retransmissions: Option<u32>,
|
||||
sender: Sender<RttEvent>,
|
||||
config: RttConfig,
|
||||
) -> Result<(), SurbWrappedPreparationError> {
|
||||
debug!("Starting RTT test using pattern {:?}", config.pattern);
|
||||
|
||||
// Load topology
|
||||
let topology_permit = self.topology_access.get_read_permit().await;
|
||||
let mut topology = self.get_topology(&topology_permit)?.clone();
|
||||
let route_strings = topology
|
||||
.topology
|
||||
.initialize_static_mixnodes_for_rtt_testing()?;
|
||||
let total_routes = topology.topology.all_mix_routes.len();
|
||||
drop(topology_permit);
|
||||
// =====================================================
|
||||
// SEND ROUTE STRINGS TO RTT ANALYZER USING try_send()
|
||||
// =====================================================
|
||||
for (route_index, nodes_string) in route_strings {
|
||||
let _ = sender.try_send(RttEvent::RouteNodes {
|
||||
route_index,
|
||||
nodes: nodes_string,
|
||||
});
|
||||
}
|
||||
sender
|
||||
.send(RttEvent::ExperimentConfiguration {
|
||||
total_routes: total_routes as usize,
|
||||
per_route_sent: config.packets_per_route as usize,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// ==============================================================
|
||||
// PATTERN: BURST
|
||||
// Send packets_per_route packets on each route sequentially
|
||||
// ==============================================================
|
||||
if let RttPattern::Burst = config.pattern {
|
||||
for route in 0..total_routes {
|
||||
for _ in 0..config.packets_per_route {
|
||||
self.send_packet_on_route(
|
||||
&recipient,
|
||||
0,
|
||||
lane,
|
||||
packet_type,
|
||||
&topology,
|
||||
max_retransmissions,
|
||||
route,
|
||||
&sender,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Optional delay between packets on the same route
|
||||
if config.inter_route_delay_ms > 0 {
|
||||
tokio::time::sleep(Duration::from_millis(config.inter_route_delay_ms))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// ==============================================================
|
||||
// PATTERN: ROUND ROBIN
|
||||
// Send packets in cycles: 1 on route 0, 1 on route 1, ..., repeat
|
||||
// ==============================================================
|
||||
if let RttPattern::RoundRobin = config.pattern {
|
||||
for _cycle in 0..config.packets_per_route {
|
||||
for route in 0..total_routes {
|
||||
self.send_packet_on_route(
|
||||
&recipient,
|
||||
0,
|
||||
lane,
|
||||
packet_type,
|
||||
&topology,
|
||||
max_retransmissions,
|
||||
route,
|
||||
&sender,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if config.inter_route_delay_ms > 0 {
|
||||
tokio::time::sleep(Duration::from_millis(config.inter_route_delay_ms))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn try_prepare_single_chunk_for_sending(
|
||||
&mut self,
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
use self::sending_delay_controller::SendingDelayController;
|
||||
use crate::client::mix_traffic::BatchMixMessageSender;
|
||||
use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender;
|
||||
use crate::client::rtt_analyzer::{RttAnalyzer, RttEvent};
|
||||
use crate::client::topology_control::TopologyAccessor;
|
||||
use crate::client::transmission_buffer::TransmissionBuffer;
|
||||
use crate::config;
|
||||
@@ -21,6 +22,7 @@ use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, C
|
||||
use nym_task::connections::{
|
||||
ConnectionCommand, ConnectionCommandReceiver, ConnectionId, LaneQueueLengths, TransmissionLane,
|
||||
};
|
||||
|
||||
use nym_task::ShutdownToken;
|
||||
use rand::{CryptoRng, Rng};
|
||||
use std::pin::Pin;
|
||||
@@ -224,7 +226,11 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
async fn on_message(&mut self, next_message: StreamMessage) {
|
||||
async fn on_message(
|
||||
&mut self,
|
||||
next_message: StreamMessage,
|
||||
rtt_producer: Option<tokio::sync::mpsc::Sender<RttEvent>>,
|
||||
) {
|
||||
trace!("created new message");
|
||||
|
||||
let (next_message, fragment_id, packet_size) = match next_message {
|
||||
@@ -271,6 +277,21 @@ where
|
||||
)
|
||||
}
|
||||
StreamMessage::Real(real_message) => {
|
||||
if let Some(ref producer) = rtt_producer {
|
||||
if let Some(fragment_id_local) = real_message.fragment_id {
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis();
|
||||
let _ = producer.try_send(RttEvent::FragmentSent {
|
||||
fragment_id: (fragment_id_local.set_id().to_string()),
|
||||
timestamp: (now),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let packet_size = real_message.packet_size();
|
||||
(
|
||||
real_message.mix_packet,
|
||||
@@ -584,7 +605,7 @@ where
|
||||
|
||||
pub(crate) async fn run(&mut self) {
|
||||
debug!("Started OutQueueControl with graceful shutdown support");
|
||||
|
||||
let rtt_producer = RttAnalyzer::producer();
|
||||
// avoid borrow on self
|
||||
let shutdown_token = self.shutdown_token.clone();
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
@@ -602,7 +623,7 @@ where
|
||||
self.log_status();
|
||||
}
|
||||
next_message = self.next() => if let Some(next_message) = next_message {
|
||||
self.on_message(next_message).await;
|
||||
self.on_message(next_message,rtt_producer.clone()).await;
|
||||
} else {
|
||||
tracing::trace!("OutQueueControl: Stopping since channel closed");
|
||||
break;
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::client::helpers::get_time_now;
|
||||
use crate::client::replies::{
|
||||
reply_controller::ReplyControllerSender, reply_storage::SentReplyKeys,
|
||||
};
|
||||
use crate::client::rtt_analyzer::{RttAnalyzer, RttEvent};
|
||||
use futures::channel::mpsc;
|
||||
use futures::lock::Mutex;
|
||||
use futures::StreamExt;
|
||||
@@ -55,6 +56,7 @@ struct ReceivedMessagesBufferInner<R: MessageReceiver> {
|
||||
|
||||
// Periodically check for stale buffers to clean up
|
||||
last_stale_check: crate::client::helpers::Instant,
|
||||
rtt_producer: Option<tokio::sync::mpsc::Sender<RttEvent>>,
|
||||
}
|
||||
|
||||
impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
|
||||
@@ -81,7 +83,20 @@ impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
|
||||
warn!("failed to recover fragment from raw data: {err}. The whole underlying message might be corrupted and unrecoverable!");
|
||||
return None;
|
||||
}
|
||||
Ok(frag) => frag,
|
||||
Ok(frag) => {
|
||||
if let Some(ref producer) = self.rtt_producer {
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis();
|
||||
let _ = producer.try_send(RttEvent::FragmentReceived {
|
||||
fragment_id: frag.id().to_string(),
|
||||
timestamp: now,
|
||||
});
|
||||
}
|
||||
frag
|
||||
}
|
||||
};
|
||||
|
||||
if self.recently_reconstructed.contains(&fragment.id()) {
|
||||
@@ -181,6 +196,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
stats_tx: ClientStatsSender,
|
||||
shutdown_token: ShutdownToken,
|
||||
rtt_producer: Option<tokio::sync::mpsc::Sender<RttEvent>>,
|
||||
) -> Self {
|
||||
ReceivedMessagesBuffer {
|
||||
inner: Arc::new(Mutex::new(ReceivedMessagesBufferInner {
|
||||
@@ -191,6 +207,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
|
||||
recently_reconstructed: HashSet::new(),
|
||||
stats_tx,
|
||||
last_stale_check: get_time_now(),
|
||||
rtt_producer: rtt_producer.clone(),
|
||||
})),
|
||||
reply_key_storage,
|
||||
reply_controller_sender,
|
||||
@@ -585,6 +602,7 @@ impl<R: MessageReceiver + Clone + Send + 'static> ReceivedMessagesBufferControll
|
||||
reply_controller_sender,
|
||||
metrics_reporter,
|
||||
shutdown_token.clone(),
|
||||
RttAnalyzer::producer(),
|
||||
);
|
||||
|
||||
ReceivedMessagesBufferController {
|
||||
|
||||
@@ -0,0 +1,514 @@
|
||||
use std::collections::HashMap;
|
||||
use std::fs::File;
|
||||
use std::io::{BufWriter, Write};
|
||||
|
||||
use once_cell::sync::OnceCell;
|
||||
use std::process::Command;
|
||||
use std::sync::Mutex;
|
||||
use tokio::sync::mpsc::{self, Sender};
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum RttPattern {
|
||||
Burst,
|
||||
RoundRobin,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RttConfig {
|
||||
pub packets_per_route: u32,
|
||||
pub pattern: RttPattern,
|
||||
pub inter_route_delay_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum RttEvent {
|
||||
RouteUsed {
|
||||
route_index: usize,
|
||||
fragment_id: String,
|
||||
},
|
||||
FragmentSent {
|
||||
fragment_id: String,
|
||||
timestamp: u128,
|
||||
},
|
||||
FragmentAckReceived {
|
||||
fragment_id: String,
|
||||
timestamp: u128,
|
||||
},
|
||||
FragmentAckExpired {
|
||||
fragment_id: String,
|
||||
timestamp: u128,
|
||||
},
|
||||
FragmentReceived {
|
||||
fragment_id: String,
|
||||
timestamp: u128,
|
||||
},
|
||||
RouteNodes {
|
||||
route_index: usize,
|
||||
nodes: String,
|
||||
},
|
||||
ExperimentConfiguration {
|
||||
total_routes: usize,
|
||||
per_route_sent: usize,
|
||||
},
|
||||
PrintRouteDetail {
|
||||
route_index: usize,
|
||||
},
|
||||
PrintRouteStatsByNodes {
|
||||
nodes: String,
|
||||
},
|
||||
PrintRoutesWithAvgAbove {
|
||||
threshold_ms: u128,
|
||||
},
|
||||
PrintRoutesWithAnyAbove {
|
||||
threshold_ms: u128,
|
||||
},
|
||||
PrintStats,
|
||||
WriteStats {
|
||||
path: String,
|
||||
},
|
||||
WriteStatsAndPlot {
|
||||
path: String,
|
||||
outlier_mode: String, // "all" or cutoff() seconds (e.g. "1.0"))
|
||||
},
|
||||
PrintExperimentProgress,
|
||||
}
|
||||
|
||||
pub struct StoredRouteSummary {
|
||||
pub total_routes: usize,
|
||||
pub per_route_sent: usize,
|
||||
}
|
||||
|
||||
static PRODUCER: OnceCell<Mutex<Option<Sender<RttEvent>>>> = OnceCell::new();
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct RouteStats {
|
||||
pub sent: u32,
|
||||
pub acks: u32,
|
||||
pub timeouts: u32,
|
||||
pub rtts: Vec<u128>,
|
||||
}
|
||||
|
||||
pub struct RttAnalyzer {
|
||||
/// fragment_id → (route, Vec<sent_times>)
|
||||
fragments: HashMap<String, (usize, Vec<u128>)>,
|
||||
|
||||
/// fragment_id → Vec<recv_times>
|
||||
receive_times: HashMap<String, Vec<u128>>,
|
||||
|
||||
/// fragment_id → last ack
|
||||
ack_times: HashMap<String, u128>,
|
||||
|
||||
route_stats: HashMap<usize, RouteStats>,
|
||||
route_summary: Option<StoredRouteSummary>,
|
||||
route_nodes: HashMap<usize, String>,
|
||||
consumer_handle: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl RttAnalyzer {
|
||||
pub fn consumer_handle(&self) -> &JoinHandle<()> {
|
||||
&self.consumer_handle
|
||||
}
|
||||
|
||||
pub fn new() -> Self {
|
||||
let (tx, mut rx) = mpsc::channel(80000);
|
||||
|
||||
PRODUCER
|
||||
.set(Mutex::new(Some(tx.clone())))
|
||||
.expect("PRODUCER already initialized");
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
let mut analyzer = RttAnalyzer {
|
||||
fragments: HashMap::new(),
|
||||
receive_times: HashMap::new(),
|
||||
ack_times: HashMap::new(),
|
||||
route_stats: HashMap::new(),
|
||||
route_summary: None,
|
||||
route_nodes: HashMap::new(),
|
||||
consumer_handle: tokio::spawn(async {}),
|
||||
};
|
||||
|
||||
while let Some(event) = rx.recv().await {
|
||||
analyzer.process(event);
|
||||
}
|
||||
|
||||
println!("RTT Analyzer consumer exited");
|
||||
});
|
||||
|
||||
Self {
|
||||
fragments: HashMap::new(),
|
||||
receive_times: HashMap::new(),
|
||||
ack_times: HashMap::new(),
|
||||
route_stats: HashMap::new(),
|
||||
route_summary: None,
|
||||
route_nodes: HashMap::new(),
|
||||
consumer_handle: handle,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn producer() -> Option<Sender<RttEvent>> {
|
||||
let lock = PRODUCER.get()?.lock().unwrap();
|
||||
lock.clone()
|
||||
}
|
||||
|
||||
pub fn process(&mut self, event: RttEvent) {
|
||||
match event {
|
||||
// -------------------------
|
||||
// FIRST USE OF A FRAGMENT
|
||||
// -------------------------
|
||||
RttEvent::RouteUsed {
|
||||
route_index,
|
||||
fragment_id,
|
||||
} => {
|
||||
self.fragments
|
||||
.insert(fragment_id.clone(), (route_index, Vec::new()));
|
||||
}
|
||||
|
||||
// -------------------------
|
||||
// RETRANSMISSION → append new sent time
|
||||
// -------------------------
|
||||
RttEvent::FragmentSent {
|
||||
fragment_id,
|
||||
timestamp,
|
||||
} => {
|
||||
if let Some((_route, sent_list)) = self.fragments.get_mut(&fragment_id) {
|
||||
sent_list.push(timestamp);
|
||||
self.route_stats.entry(*_route).or_default().sent += 1;
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------------
|
||||
// ACK RECEIVED
|
||||
// -------------------------
|
||||
RttEvent::FragmentAckReceived {
|
||||
fragment_id,
|
||||
timestamp,
|
||||
} => {
|
||||
if let Some((route, _)) = self.fragments.get(&fragment_id) {
|
||||
let stats = self.route_stats.entry(*route).or_default();
|
||||
stats.acks += 1;
|
||||
}
|
||||
self.ack_times.insert(fragment_id, timestamp);
|
||||
}
|
||||
|
||||
// -------------------------
|
||||
// ACK TIMEOUT
|
||||
// -------------------------
|
||||
RttEvent::FragmentAckExpired { fragment_id, .. } => {
|
||||
if let Some((route, _)) = self.fragments.get(&fragment_id) {
|
||||
self.route_stats.entry(*route).or_default().timeouts += 1;
|
||||
}
|
||||
}
|
||||
RttEvent::WriteStatsAndPlot { path, outlier_mode } => {
|
||||
// 1) write the csv
|
||||
if let Err(e) = self.write_csv(&path) {
|
||||
eprintln!("Failed to write CSV: {}", e);
|
||||
return;
|
||||
}
|
||||
|
||||
// 2) Call the Python script
|
||||
if let Err(e) = Self::run_histogram_script(&path, &outlier_mode) {
|
||||
eprintln!("Failed to run histogram script: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------------
|
||||
// PACKET RECEIVED → compute RTTs
|
||||
// -------------------------
|
||||
RttEvent::FragmentReceived {
|
||||
fragment_id,
|
||||
timestamp,
|
||||
} => {
|
||||
// Append receive time
|
||||
let recv_list = self.receive_times.entry(fragment_id.clone()).or_default();
|
||||
recv_list.push(timestamp);
|
||||
|
||||
// Lookup route + sent times
|
||||
if let Some((route, sent_list)) = self.fragments.get(&fragment_id) {
|
||||
let recv_list = self.receive_times.get(&fragment_id).unwrap();
|
||||
|
||||
// Index of the *newly added* receive time
|
||||
let idx = recv_list.len() - 1;
|
||||
/*
|
||||
Maybe we can put a retransmission flag and not counting the new RTTs and only the basic N?
|
||||
*/
|
||||
//println!("Fragment id: {} Sent list length: {} Recv list length:{}",fragment_id,sent_list.len(),recv_list.len());
|
||||
|
||||
// Check if we have a matching sent timestamp
|
||||
if idx < sent_list.len() {
|
||||
let sent_ts = sent_list[idx];
|
||||
let rtt = recv_list[idx] - sent_ts;
|
||||
|
||||
self.route_stats.entry(*route).or_default().rtts.push(rtt);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
RttEvent::RouteNodes { route_index, nodes } => {
|
||||
self.route_nodes.insert(route_index, nodes);
|
||||
}
|
||||
|
||||
RttEvent::PrintStats => self.print_stats(),
|
||||
|
||||
RttEvent::WriteStats { path } => {
|
||||
if let Err(e) = self.write_csv(&path) {
|
||||
eprintln!("Failed to write CSV: {}", e)
|
||||
}
|
||||
}
|
||||
|
||||
RttEvent::ExperimentConfiguration {
|
||||
total_routes,
|
||||
per_route_sent,
|
||||
} => {
|
||||
self.route_summary = Some(StoredRouteSummary {
|
||||
total_routes,
|
||||
per_route_sent,
|
||||
});
|
||||
}
|
||||
RttEvent::PrintExperimentProgress => {
|
||||
self.print_experiment_progress();
|
||||
}
|
||||
|
||||
RttEvent::PrintRouteDetail { route_index } => {
|
||||
self.print_route_detail(route_index);
|
||||
}
|
||||
|
||||
RttEvent::PrintRoutesWithAvgAbove { threshold_ms } => {
|
||||
self.print_routes_with_avg_above(threshold_ms);
|
||||
}
|
||||
|
||||
RttEvent::PrintRoutesWithAnyAbove { threshold_ms } => {
|
||||
self.print_routes_with_any_above(threshold_ms);
|
||||
}
|
||||
|
||||
RttEvent::PrintRouteStatsByNodes { nodes } => {
|
||||
self.print_route_by_nodes(nodes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------- PRINT FUNCTIONS (unchanged) ----------------------
|
||||
pub fn print_stats(&self) {
|
||||
println!("\n================ Route RTT Statistics ================");
|
||||
for (route, stats) in self.route_stats.iter() {
|
||||
let avg_rtt = if !stats.rtts.is_empty() {
|
||||
stats.rtts.iter().sum::<u128>() as f64 / stats.rtts.len() as f64
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
|
||||
println!(
|
||||
"Route {:5} | Sent {:4} | ACKs {:4} | Timeouts {:4} | Avg RTT {:8.2}",
|
||||
route, stats.sent, stats.acks, stats.timeouts, avg_rtt
|
||||
);
|
||||
}
|
||||
println!("======================================================\n");
|
||||
}
|
||||
|
||||
pub fn write_csv(&self, path: &str) -> std::io::Result<()> {
|
||||
let mut writer = BufWriter::new(File::create(path)?);
|
||||
|
||||
writer.write_all(b"route,sent,acks,timeouts,avg_rtt\n")?;
|
||||
|
||||
for (route, stats) in &self.route_stats {
|
||||
let avg_rtt = if !stats.rtts.is_empty() {
|
||||
stats.rtts.iter().sum::<u128>() as f64 / stats.rtts.len() as f64
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
|
||||
writer.write_all(
|
||||
format!(
|
||||
"{},{},{},{},{:.2}\n",
|
||||
route, stats.sent, stats.acks, stats.timeouts, avg_rtt
|
||||
)
|
||||
.as_bytes(),
|
||||
)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn print_route_detail(&self, route_index: usize) {
|
||||
println!(
|
||||
"\n================ Route #{} Details ================\n",
|
||||
route_index
|
||||
);
|
||||
|
||||
if let Some(nodes) = self.route_nodes.get(&route_index) {
|
||||
println!(" Route Nodes:");
|
||||
for (i, node) in nodes.split(" > ").enumerate() {
|
||||
println!(" • Node {}: {}", i + 1, node);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(stats) = self.route_stats.get(&route_index) {
|
||||
println!("\n RTT Values:");
|
||||
for (i, rtt) in stats.rtts.iter().enumerate() {
|
||||
println!(" [{:3}] {} ms", i, rtt);
|
||||
}
|
||||
}
|
||||
|
||||
println!("======================================================\n");
|
||||
}
|
||||
fn run_histogram_script(csv_path: &str, outlier_mode: &str) -> std::io::Result<()> {
|
||||
let status = Command::new("python")
|
||||
.arg("rtt_histogram.py") // path του script
|
||||
.arg(csv_path)
|
||||
.arg(outlier_mode)
|
||||
.status()?;
|
||||
|
||||
if !status.success() {
|
||||
eprintln!("Python histogram script exited with status: {}", status);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn print_routes_with_avg_above(&self, threshold_ms: u128) {
|
||||
println!(
|
||||
"\n======= Routes with AVG RTT > {} ms =======\n",
|
||||
threshold_ms
|
||||
);
|
||||
|
||||
let mut matches: Vec<usize> = self
|
||||
.route_stats
|
||||
.iter()
|
||||
.filter_map(|(route, stats)| {
|
||||
if stats.rtts.is_empty() {
|
||||
return None;
|
||||
}
|
||||
let avg = stats.rtts.iter().sum::<u128>() as f64 / stats.rtts.len() as f64;
|
||||
if (avg as u128) > threshold_ms {
|
||||
Some(*route)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
matches.sort();
|
||||
|
||||
for route in matches {
|
||||
self.print_route_detail(route);
|
||||
}
|
||||
|
||||
println!("====================================================\n");
|
||||
}
|
||||
/// Prints overall experiment completion percentage.
|
||||
///
|
||||
/// It uses:
|
||||
/// - self.route_summary.total_routes
|
||||
/// - self.route_summary.per_route_sent
|
||||
/// to compute how many packets were planned in total.
|
||||
///
|
||||
/// Then it sums, over all routes:
|
||||
/// - how many packets were actually sent (RouteStats.sent)
|
||||
/// - how many packets have a completed RTT sample (RouteStats.rtts.len())
|
||||
///
|
||||
/// Finally it prints:
|
||||
/// - total expected packets
|
||||
/// - total sent packets and percentage
|
||||
/// - total completed RTT packets and percentage
|
||||
pub fn print_experiment_progress(&self) {
|
||||
println!("\n=========== RTT Experiment Progress ===========");
|
||||
|
||||
// Check if experiment configuration is available
|
||||
let summary = match &self.route_summary {
|
||||
Some(s) => s,
|
||||
None => {
|
||||
println!("No experiment configuration stored (route_summary is None).");
|
||||
println!("You must send an ExperimentConfiguration event first.");
|
||||
println!("==============================================\n");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let total_routes = summary.total_routes;
|
||||
let per_route_sent = summary.per_route_sent;
|
||||
|
||||
// Total number of packets that were planned for the whole experiment
|
||||
let expected_total: usize = total_routes.saturating_mul(per_route_sent);
|
||||
|
||||
if expected_total == 0 {
|
||||
println!("Experiment configuration has zero expected packets.");
|
||||
println!("==============================================\n");
|
||||
return;
|
||||
}
|
||||
|
||||
// Sum how many packets were actually sent and how many have a measured RTT
|
||||
let mut sent_total: usize = 0;
|
||||
let mut received_total: usize = 0;
|
||||
|
||||
for (_route_idx, stats) in &self.route_stats {
|
||||
// 'sent' counts how many times we called FragmentSent for this route
|
||||
sent_total += std::cmp::min(stats.sent, per_route_sent as u32) as usize;
|
||||
|
||||
// Each RTT entry corresponds to one packet for which we have both send and receive time
|
||||
let route_recv = std::cmp::min(stats.rtts.len(), per_route_sent);
|
||||
received_total += route_recv;
|
||||
}
|
||||
|
||||
let sent_pct = (sent_total as f64 / expected_total as f64) * 100.0;
|
||||
let recv_pct = (received_total as f64 / expected_total as f64) * 100.0;
|
||||
|
||||
println!("Total routes configured : {}", total_routes);
|
||||
println!("Packets per route (planned) : {}", per_route_sent);
|
||||
println!("Total expected packets : {}", expected_total);
|
||||
println!("---------------------------------------------");
|
||||
println!(
|
||||
"Total sent packets : {} ({:.2}%)",
|
||||
sent_total, sent_pct
|
||||
);
|
||||
println!(
|
||||
"Total completed RTT packets : {} ({:.2}%)",
|
||||
received_total, recv_pct
|
||||
);
|
||||
println!("==============================================\n");
|
||||
}
|
||||
|
||||
pub fn print_routes_with_any_above(&self, threshold_ms: u128) {
|
||||
println!(
|
||||
"\n======= Routes with ANY RTT > {} ms =======\n",
|
||||
threshold_ms
|
||||
);
|
||||
|
||||
let mut matches: Vec<usize> = self
|
||||
.route_stats
|
||||
.iter()
|
||||
.filter_map(|(route, stats)| {
|
||||
if stats.rtts.iter().any(|&x| x > threshold_ms) {
|
||||
Some(*route)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
matches.sort();
|
||||
|
||||
for route in matches {
|
||||
self.print_route_detail(route);
|
||||
}
|
||||
|
||||
println!("====================================================\n");
|
||||
}
|
||||
|
||||
pub fn print_route_by_nodes(&self, nodes: String) {
|
||||
println!("\n========== Searching route by nodes ==========\n");
|
||||
|
||||
let mut routes: Vec<(usize, &String)> =
|
||||
self.route_nodes.iter().map(|(k, v)| (*k, v)).collect();
|
||||
routes.sort_by_key(|(idx, _)| *idx);
|
||||
|
||||
for (route, stored) in routes {
|
||||
if *stored == nodes {
|
||||
println!("Found route {}!", route);
|
||||
self.print_route_detail(route);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
println!("No route found with nodes: {}", nodes);
|
||||
println!("=============================================\n");
|
||||
}
|
||||
}
|
||||
@@ -89,6 +89,29 @@ pub trait FragmentPreparer {
|
||||
)
|
||||
}
|
||||
|
||||
fn generate_surb_ack_with_0_delays(
|
||||
&mut self,
|
||||
recipient: &Recipient,
|
||||
fragment_id: FragmentIdentifier,
|
||||
topology: &NymRouteProvider,
|
||||
ack_key: &AckKey,
|
||||
packet_type: PacketType,
|
||||
) -> Result<SurbAck, NymTopologyError> {
|
||||
let use_legacy_sphinx_format = self.use_legacy_sphinx_format();
|
||||
let disable_mix_hops = self.mix_hops_disabled();
|
||||
|
||||
SurbAck::construct(
|
||||
self.rng(),
|
||||
use_legacy_sphinx_format,
|
||||
recipient,
|
||||
ack_key,
|
||||
fragment_id.to_bytes(),
|
||||
Duration::ZERO,
|
||||
topology,
|
||||
packet_type,
|
||||
disable_mix_hops,
|
||||
)
|
||||
}
|
||||
/// The procedure is as follows:
|
||||
/// For each fragment:
|
||||
/// - compute SURB_ACK
|
||||
@@ -288,6 +311,114 @@ pub trait FragmentPreparer {
|
||||
})
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn prepare_chunk_with_deterministic_route_for_sending_and_rtt_test(
|
||||
&mut self,
|
||||
|
||||
fragment: Fragment,
|
||||
|
||||
topology: &NymRouteProvider, // needs to be mutable because it may auto-generate routes
|
||||
|
||||
ack_key: &AckKey,
|
||||
|
||||
packet_sender: &Recipient,
|
||||
|
||||
packet_recipient: &Recipient,
|
||||
|
||||
packet_type: PacketType,
|
||||
|
||||
route_index: usize, // NEW ARGUMENT: select which route to use
|
||||
) -> Result<PreparedFragment, NymTopologyError> {
|
||||
debug!(
|
||||
"Preparing chunk for sending (deterministic route index = {})",
|
||||
route_index
|
||||
);
|
||||
|
||||
let destination = packet_recipient.gateway();
|
||||
|
||||
monitoring::fragment_sent(&fragment, self.nonce(), destination);
|
||||
|
||||
let non_reply_overhead = x25519::PUBLIC_KEY_SIZE;
|
||||
|
||||
let expected_plaintext = match packet_type {
|
||||
PacketType::Outfox => {
|
||||
fragment.serialized_size() + OUTFOX_ACK_OVERHEAD + non_reply_overhead
|
||||
}
|
||||
|
||||
_ => fragment.serialized_size() + ACK_OVERHEAD + non_reply_overhead,
|
||||
};
|
||||
|
||||
let packet_size = PacketSize::get_type_from_plaintext(expected_plaintext, packet_type)
|
||||
.expect("the message has been incorrectly fragmented");
|
||||
|
||||
let rotation_id = topology.current_key_rotation();
|
||||
|
||||
let sphinx_key_rotation = SphinxKeyRotation::from(rotation_id);
|
||||
|
||||
let fragment_identifier = fragment.fragment_identifier();
|
||||
|
||||
// create an ack
|
||||
|
||||
let surb_ack = self.generate_surb_ack_with_0_delays(
|
||||
packet_sender,
|
||||
fragment_identifier,
|
||||
topology,
|
||||
ack_key,
|
||||
packet_type,
|
||||
)?;
|
||||
|
||||
let ack_delay = surb_ack.expected_total_delay();
|
||||
|
||||
// build the payload
|
||||
|
||||
let packet_payload = NymPayloadBuilder::new(fragment, surb_ack)
|
||||
.build_regular(self.rng(), packet_recipient.encryption_key())
|
||||
.map_err(|_| NymTopologyError::PayloadBuilder)?;
|
||||
|
||||
// Get the deterministic route by index
|
||||
|
||||
trace!("Selecting deterministic route index {}", route_index);
|
||||
|
||||
let route = topology.deterministic_route_to_egress(route_index, destination)?;
|
||||
|
||||
let destination = packet_recipient.as_sphinx_destination();
|
||||
|
||||
// No artificial delay for RTT test
|
||||
|
||||
let delays = nym_sphinx_routing::generate_hop_delays(Duration::ZERO, route.len());
|
||||
|
||||
// build the actual Sphinx packet
|
||||
|
||||
let packet = match packet_type {
|
||||
PacketType::Outfox => NymPacket::outfox_build(
|
||||
packet_payload,
|
||||
route.as_slice(),
|
||||
&destination,
|
||||
Some(packet_size.plaintext_size()),
|
||||
)?,
|
||||
|
||||
PacketType::Mix => NymPacket::sphinx_build(
|
||||
self.use_legacy_sphinx_format(),
|
||||
packet_size.payload_size(),
|
||||
packet_payload,
|
||||
&route,
|
||||
&destination,
|
||||
&delays,
|
||||
)?,
|
||||
};
|
||||
|
||||
let first_hop_address =
|
||||
NymNodeRoutingAddress::try_from(route.first().unwrap().address).unwrap();
|
||||
|
||||
Ok(PreparedFragment {
|
||||
total_delay: delays.iter().take(delays.len() - 1).sum::<Delay>() + ack_delay,
|
||||
|
||||
mix_packet: MixPacket::new(first_hop_address, packet, packet_type, sphinx_key_rotation),
|
||||
|
||||
fragment_identifier,
|
||||
})
|
||||
}
|
||||
|
||||
fn pad_and_split_message(
|
||||
&mut self,
|
||||
message: NymMessage,
|
||||
@@ -442,6 +573,28 @@ where
|
||||
)
|
||||
}
|
||||
|
||||
pub fn prepare_chunk_for_sending_with_deterministic_route(
|
||||
&mut self,
|
||||
fragment: Fragment,
|
||||
topology: &NymRouteProvider,
|
||||
ack_key: &AckKey,
|
||||
packet_recipient: &Recipient,
|
||||
packet_type: PacketType,
|
||||
route_index: usize,
|
||||
) -> Result<PreparedFragment, NymTopologyError> {
|
||||
let sender = self.sender_address;
|
||||
|
||||
<Self as FragmentPreparer>::prepare_chunk_with_deterministic_route_for_sending_and_rtt_test(
|
||||
self,
|
||||
fragment,
|
||||
topology,
|
||||
ack_key,
|
||||
&sender,
|
||||
packet_recipient,
|
||||
packet_type,
|
||||
route_index,
|
||||
)
|
||||
}
|
||||
/// Construct an acknowledgement SURB for the given [`FragmentIdentifier`]
|
||||
pub fn generate_surb_ack(
|
||||
&mut self,
|
||||
|
||||
@@ -21,7 +21,6 @@ pub use error::NymTopologyError;
|
||||
pub use nym_mixnet_contract_common::nym_node::Role;
|
||||
pub use nym_mixnet_contract_common::{EpochRewardedSet, NodeId};
|
||||
pub use rewarded_set::CachedEpochRewardedSet;
|
||||
|
||||
pub mod error;
|
||||
pub mod node;
|
||||
pub mod rewarded_set;
|
||||
@@ -135,7 +134,8 @@ pub struct NymTopology {
|
||||
// while this is not ideal, use empty values as default to not break backwards compatibility
|
||||
#[serde(default)]
|
||||
metadata: NymTopologyMetadata,
|
||||
|
||||
#[serde(default)]
|
||||
pub all_mix_routes: Vec<Vec<RoutingNode>>,
|
||||
// for the purposes of future VRF, everyone will need the same view of the network, regardless of performance filtering
|
||||
// so we use the same 'master' rewarded set information for that
|
||||
//
|
||||
@@ -231,6 +231,30 @@ impl NymRouteProvider {
|
||||
.random_route_to_egress(rng, egress_identity, self.ignore_egress_epoch_roles)
|
||||
}
|
||||
|
||||
/// Selects a deterministic route to the egress, using the i-th precomputed mixnode route.
|
||||
/// This requires that [`generate_all_mix_routes()`] has already been called.
|
||||
pub fn deterministic_route_to_egress(
|
||||
&self,
|
||||
route_index: usize,
|
||||
egress_identity: NodeIdentity,
|
||||
) -> Result<Vec<SphinxNode>, NymTopologyError> {
|
||||
if self.topology.all_mix_routes.is_empty() {
|
||||
return Err(NymTopologyError::EmptyNetworkTopology);
|
||||
}
|
||||
|
||||
let Some(existing_route) = self.topology.all_mix_routes.get(route_index) else {
|
||||
return Err(NymTopologyError::NoMixnodesAvailable);
|
||||
};
|
||||
let mut route: Vec<SphinxNode> = existing_route.iter().map(Into::into).collect();
|
||||
|
||||
// add egress node
|
||||
let egress = self
|
||||
.topology
|
||||
.egress_node_by_identity(egress_identity, self.ignore_egress_epoch_roles)?;
|
||||
route.push(egress);
|
||||
|
||||
Ok(route)
|
||||
}
|
||||
/// Returns a route directly to the egress point, which can be any known node
|
||||
pub fn empty_route_to_egress(
|
||||
&self,
|
||||
@@ -262,6 +286,7 @@ impl NymTopology {
|
||||
metadata: NymTopologyMetadata::default(),
|
||||
rewarded_set: rewarded_set.into(),
|
||||
node_details: Default::default(),
|
||||
all_mix_routes: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -272,6 +297,7 @@ impl NymTopology {
|
||||
) -> Self {
|
||||
NymTopology {
|
||||
metadata,
|
||||
all_mix_routes: Vec::new(),
|
||||
rewarded_set: rewarded_set.into(),
|
||||
node_details: node_details.into_iter().map(|n| (n.node_id, n)).collect(),
|
||||
}
|
||||
@@ -531,6 +557,65 @@ impl NymTopology {
|
||||
Ok(mix_route)
|
||||
}
|
||||
|
||||
pub fn initialize_static_mixnodes_for_rtt_testing(
|
||||
&mut self,
|
||||
) -> Result<Vec<(usize, String)>, NymTopologyError> {
|
||||
if self.rewarded_set.is_empty() || self.node_details.is_empty() {
|
||||
return Err(NymTopologyError::EmptyNetworkTopology);
|
||||
}
|
||||
|
||||
// Collect nodes for each layer
|
||||
let layer1_nodes: Vec<&RoutingNode> = self
|
||||
.rewarded_set
|
||||
.layer1
|
||||
.iter()
|
||||
.filter_map(|id| self.node_details.get(id))
|
||||
.collect();
|
||||
|
||||
let layer2_nodes: Vec<&RoutingNode> = self
|
||||
.rewarded_set
|
||||
.layer2
|
||||
.iter()
|
||||
.filter_map(|id| self.node_details.get(id))
|
||||
.collect();
|
||||
|
||||
let layer3_nodes: Vec<&RoutingNode> = self
|
||||
.rewarded_set
|
||||
.layer3
|
||||
.iter()
|
||||
.filter_map(|id| self.node_details.get(id))
|
||||
.collect();
|
||||
|
||||
// Reset routes
|
||||
self.all_mix_routes.clear();
|
||||
|
||||
// Build mix routes
|
||||
for n1 in layer1_nodes.clone() {
|
||||
for n2 in layer2_nodes.clone() {
|
||||
for n3 in layer3_nodes.clone() {
|
||||
self.all_mix_routes
|
||||
.push(vec![n1.clone(), n2.clone(), n3.clone()]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// RETURN route_index + string
|
||||
// ============================================================
|
||||
let mut results: Vec<(usize, String)> = Vec::new();
|
||||
|
||||
for (i, route) in self.all_mix_routes.iter().enumerate() {
|
||||
let node_strings: Vec<String> = route
|
||||
.iter()
|
||||
.map(|node| node.identity_key.to_base58_string())
|
||||
.collect();
|
||||
|
||||
results.push((i, node_strings.join(" > ")));
|
||||
}
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
pub fn random_mix_route<R>(&self, rng: &mut R) -> Result<Vec<SphinxNode>, NymTopologyError>
|
||||
where
|
||||
R: Rng + CryptoRng + ?Sized,
|
||||
|
||||
@@ -0,0 +1,202 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
rtt_histogram.py
|
||||
|
||||
Usage:
|
||||
python rtt_histogram.py <csv_path> <outlier_mode>
|
||||
|
||||
Examples:
|
||||
python rtt_histogram.py rtt_stats.csv all
|
||||
-> one histogram with ALL avg_rtt values
|
||||
|
||||
python rtt_histogram.py rtt_stats.csv 1.0
|
||||
-> one histogram only for avg_rtt <= 1.0s (filters out values above 1 second)
|
||||
|
||||
python rtt_histogram.py rtt_stats.csv both:1.0
|
||||
-> TWO histograms:
|
||||
1) inliers: avg_rtt <= 1.0s
|
||||
2) outliers: avg_rtt > 1.0s
|
||||
"""
|
||||
|
||||
import csv
|
||||
import sys
|
||||
import statistics
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
|
||||
def load_rtt_values(csv_path: str) -> list[float]:
|
||||
"""
|
||||
Read the CSV file and return a list of RTT values in milliseconds (float).
|
||||
|
||||
It expects a column named 'avg_rtt' (as written by write_csv).
|
||||
If 'avg_rtt' is not found, it falls back to 'rtt_ms'.
|
||||
"""
|
||||
values_ms: list[float] = []
|
||||
|
||||
with open(csv_path, newline="", encoding="utf-8") as f:
|
||||
reader = csv.DictReader(f)
|
||||
fieldnames = [name.strip() for name in (reader.fieldnames or [])]
|
||||
|
||||
if "avg_rtt" in fieldnames:
|
||||
col = "avg_rtt"
|
||||
elif "rtt_ms" in fieldnames:
|
||||
col = "rtt_ms"
|
||||
else:
|
||||
raise RuntimeError(
|
||||
f"CSV '{csv_path}' does not contain 'avg_rtt' or 'rtt_ms' column. "
|
||||
f"Found columns: {fieldnames}"
|
||||
)
|
||||
|
||||
for row in reader:
|
||||
raw = row.get(col, "").strip()
|
||||
if not raw:
|
||||
continue
|
||||
try:
|
||||
values_ms.append(float(raw))
|
||||
except ValueError:
|
||||
# Ignore rows where the RTT column is not a valid number
|
||||
continue
|
||||
|
||||
return values_ms
|
||||
|
||||
|
||||
def plot_hist(values_sec, title_suffix: str, output_suffix: str | None = None):
|
||||
"""
|
||||
Plot a single histogram for the given RTT values (in seconds).
|
||||
|
||||
If output_suffix is provided, it can be used to build a filename
|
||||
with plt.savefig, if you want. Right now we only show the figure.
|
||||
"""
|
||||
if not values_sec:
|
||||
print(f"No RTT values for plot '{title_suffix}', skipping.")
|
||||
return
|
||||
|
||||
median_val = statistics.median(values_sec)
|
||||
|
||||
plt.figure(figsize=(8, 6))
|
||||
plt.hist(values_sec, bins=30, edgecolor="black")
|
||||
plt.axvline(
|
||||
median_val,
|
||||
color="red",
|
||||
linestyle="--",
|
||||
linewidth=2,
|
||||
label=f"Median: {median_val:.2f}s",
|
||||
)
|
||||
|
||||
plt.title(f"Distribution of RTTs {title_suffix}")
|
||||
plt.xlabel("RTT (seconds)")
|
||||
plt.ylabel("Frequency")
|
||||
plt.legend()
|
||||
plt.tight_layout()
|
||||
# If you want to save instead of show, uncomment the following
|
||||
# if output_suffix is not None:
|
||||
# filename = f"rtt_hist_{output_suffix}.png"
|
||||
# plt.savefig(filename)
|
||||
# print(f"Saved plot to {filename}")
|
||||
# else:
|
||||
# plt.show()
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) != 3:
|
||||
print("Usage: python rtt_histogram.py <csv_path> <outlier_mode>", file=sys.stderr)
|
||||
print(" outlier_mode:", file=sys.stderr)
|
||||
print(" 'all' -> one plot with all RTTs", file=sys.stderr)
|
||||
print(" '<cutoff>' -> one plot with RTT <= cutoff seconds", file=sys.stderr)
|
||||
print(" 'both:<cutoff>' -> two plots: inliers & outliers around cutoff", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
csv_path = sys.argv[1]
|
||||
outlier_mode = sys.argv[2].strip().lower()
|
||||
|
||||
# 1) Load avg_rtt values (ms)
|
||||
try:
|
||||
rtt_ms = load_rtt_values(csv_path)
|
||||
except Exception as e:
|
||||
print(f"Error reading CSV: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
if not rtt_ms:
|
||||
print("No RTT values found in CSV – nothing to plot.")
|
||||
sys.exit(0)
|
||||
|
||||
# 2) Convert to seconds
|
||||
rtt_sec = [v / 1000.0 for v in rtt_ms]
|
||||
|
||||
# --------------------------------------------------------------------
|
||||
# MODE 1: both:<cutoff> → generate TWO plots (inliers & outliers)
|
||||
# --------------------------------------------------------------------
|
||||
if outlier_mode.startswith("both:"):
|
||||
cutoff_str = outlier_mode.split(":", 1)[1]
|
||||
try:
|
||||
cutoff = float(cutoff_str)
|
||||
except ValueError:
|
||||
print(
|
||||
f"Invalid outlier_mode '{outlier_mode}'. "
|
||||
f"Expected format 'both:<numeric_cutoff>', e.g. 'both:1.0'.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
inliers = [v for v in rtt_sec if v <= cutoff]
|
||||
outliers = [v for v in rtt_sec if v > cutoff]
|
||||
|
||||
if not inliers and not outliers:
|
||||
print("No RTT values found for inliers or outliers – nothing to plot.")
|
||||
sys.exit(0)
|
||||
|
||||
print(f"Total RTT samples: {len(rtt_sec)}")
|
||||
print(f"Inliers (<= {cutoff:.3f}s): {len(inliers)}")
|
||||
print(f"Outliers (> {cutoff:.3f}s): {len(outliers)}")
|
||||
|
||||
# Plot inliers
|
||||
plot_hist(inliers, title_suffix=f"(inliers ≤ {cutoff:.2f}s)", output_suffix="inliers")
|
||||
# Plot outliers
|
||||
plot_hist(outliers, title_suffix=f"(outliers > {cutoff:.2f}s)", output_suffix="outliers")
|
||||
|
||||
# Show all open figures
|
||||
plt.show()
|
||||
sys.exit(0)
|
||||
|
||||
# --------------------------------------------------------------------
|
||||
# MODE 2: all → single plot with all RTT values
|
||||
# --------------------------------------------------------------------
|
||||
if outlier_mode == "all":
|
||||
if not rtt_sec:
|
||||
print("No RTT values to plot.")
|
||||
sys.exit(0)
|
||||
|
||||
plot_hist(rtt_sec, title_suffix="(all samples)", output_suffix=None)
|
||||
plt.show()
|
||||
sys.exit(0)
|
||||
|
||||
# --------------------------------------------------------------------
|
||||
# MODE 3: numeric cutoff → single plot with inliers only
|
||||
# --------------------------------------------------------------------
|
||||
try:
|
||||
cutoff = float(outlier_mode)
|
||||
except ValueError:
|
||||
print(
|
||||
f"Invalid outlier_mode '{outlier_mode}'. "
|
||||
f"Use 'all', a numeric cutoff (e.g. '1.0'), or 'both:<cutoff>'.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
filtered = [v for v in rtt_sec if v <= cutoff]
|
||||
|
||||
if not filtered:
|
||||
print(
|
||||
f"After filtering with cutoff={cutoff:.2f}s, no RTT values remain – nothing to plot."
|
||||
)
|
||||
sys.exit(0)
|
||||
|
||||
print(f"Total RTT samples: {len(rtt_sec)}")
|
||||
print(f"Filtered inliers (<= {cutoff:.3f}s): {len(filtered)}")
|
||||
|
||||
plot_hist(filtered, title_suffix=f"(≤ {cutoff:.2f}s)", output_suffix=None)
|
||||
plt.show()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -5,8 +5,10 @@ use crate::mixnet::{AnonymousSenderTag, IncludedSurbs, Recipient};
|
||||
use crate::Result;
|
||||
use async_trait::async_trait;
|
||||
use nym_client_core::client::inbound_messages::InputMessage;
|
||||
use nym_client_core::client::rtt_analyzer::{RttConfig, RttEvent};
|
||||
use nym_sphinx::params::PacketType;
|
||||
use nym_task::connections::TransmissionLane;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
|
||||
// defined to guarantee common interface regardless of whether you're using the full client
|
||||
// or just the sending handler
|
||||
@@ -87,6 +89,34 @@ pub trait MixnetMessageSender {
|
||||
};
|
||||
self.send(input_msg).await
|
||||
}
|
||||
/// Sends a RunRTTTest message to the supplied Nym address.
|
||||
///
|
||||
/// This is a special message used for measuring per-route RTT.
|
||||
/// It will instruct the client to run a test that sends one message
|
||||
/// per available route and logs the time of each send.
|
||||
async fn send_rtt_test(
|
||||
&self,
|
||||
address: Recipient,
|
||||
max_retransmissions: Option<u32>,
|
||||
sender: Sender<RttEvent>,
|
||||
config: RttConfig,
|
||||
) -> Result<()>
|
||||
where {
|
||||
let lane = TransmissionLane::General;
|
||||
//Is there a way to find my address from here?
|
||||
// Construct a RunRTTTest message
|
||||
let input_msg = InputMessage::RunRTTTest {
|
||||
recipient: address,
|
||||
lane,
|
||||
max_retransmissions,
|
||||
sender,
|
||||
config,
|
||||
};
|
||||
println!("[RTT TEST DEBUG] Sending RTT test message to {})", address,);
|
||||
|
||||
// Send it for processing
|
||||
self.send(input_msg).await
|
||||
}
|
||||
|
||||
/// Sends reply data to the supplied anonymous recipient.
|
||||
///
|
||||
|
||||
Reference in New Issue
Block a user