New Network Monitor (#4610)

* Initial commit

* Cherry pick from develop

* Keep track of fragments

* A bunch of data formats, graphs

* Use mix_id for display

* Proper API routes

* Add openapi + swagger ui

* Update locustfile

* Add node stats endpoint

* Add Swagger and locust to readme

* All node stats endpoint

* Update dependencies to use workspace

* Bunch of pedantic fixes

* More version updates, fmt

* More lints

* Add new_from_env for NymTopology

* Nym API endpoint to submit monitoring results (#4616)

* Nym API endpoint to submit monitoring results

* Add gateway monitoring results

* Cleanup, ergonomics

* Weaponize

* Finalize results submissions

* Monitor message signing and verification

* Update README

* Axum graceful shutdown

* More gracefulness

* Restructure result submission

* Less fragile routes

* Remove gateway unique index on node_id
This commit is contained in:
Drazen Urch
2024-08-22 10:29:36 +01:00
committed by GitHub
parent 7c1fca8ce4
commit 1ac262ec90
50 changed files with 1913 additions and 92 deletions
+39
View File
@@ -0,0 +1,39 @@
[package]
name = "nym-network-monitor"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = { workspace = true }
axum = { workspace = true, features = ["json"] }
clap = { workspace = true, features = ["derive"] }
dashmap = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
petgraph = "0.6.5"
rand = { workspace = true }
rand_chacha = { workspace = true }
reqwest = { workspace = true, features = ["json"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["macros", "time"] }
tokio-util = { workspace = true }
utoipa = { workspace = true, features = ["axum_extras"] }
utoipa-swagger-ui = { workspace = true, features = ["axum"] }
# internal
nym-bin-common = { path = "../common/bin-common" }
nym-crypto = { path = "../common/crypto" }
nym-network-defaults = { path = "../common/network-defaults" }
nym-sdk = { path = "../sdk/rust/nym-sdk" }
nym-sphinx = { path = "../common/nymsphinx" }
nym-topology = { path = "../common/topology" }
nym-types = { path = "../common/types" }
nym-validator-client = { path = "../common/client-libs/validator-client" }
+47
View File
@@ -0,0 +1,47 @@
# Nym Network Monitor
Monitors the Nym network by sending itself packets across the mixnet.
Network monitor is running two tokio tasks, one manages mixnet clients and another manages monitoring itself. Monitor is designed to be driven externally, via an HTTP api. This means that it does not do any monitoring unless driven by something like [`locust`](https://locust.io/). This allows us to tailor the load externally, potentially distributing it across multiple monitors.
### Client manager
On start the network monitor will spawn `C` clients, with 10 being the default. Once `C` clients are running, the oldest client is dropped every `T` seconds (60 by default) and a new one is created. Clients choose a random gateway to connect to the mixnet, meaning that on average all gateways will be tested in `NUMBER_OF_GATEWAYS/C*T`, assuming at least one request per client per `T`.
### Network monitor API
Swagger UI is available at `/v1/ui/`, ie `http://localhost:8080/v1/ui/`
### Driving the monitor with Locust
+ Head over to https://locust.io/ and get `locust`
+ Start everything
```bash
# Start the network monitor
cargo run --release
# Start locust in a separate terminal
python -m locust -H http://127.0.0.1:8080 --processes 4
```
+ Head over to http://127.0.0.1:8089/ and start a testing run
## Usage
```bash
Usage: nym-network-monitor [OPTIONS]
Options:
-C, --clients <N_CLIENTS> Number of clients to spawn [default: 10]
-T, --client-lifetime <CLIENT_LIFETIME> Lifetime of each client in seconds [default: 60]
--port <PORT> Port to listen on [default: 8080]
--host <HOST> Host to listen on [default: 127.0.0.1]
-t, --topology <TOPOLOGY> Path to the topology file
-e, --env <ENV> Path to the environment file
-m, --mixnet-timeout <MIXNET_TIMEOUT> [default: 10]
--generate-key-pair
--private-key <PRIVATE_KEY>
-h, --help Print help
-V, --version Print version
```
+21
View File
@@ -0,0 +1,21 @@
#!/bin/bash
# Runs the network monitor in the background and drives it with locust.
# Takes timeout in seconds as the first argument, defaults to 600
# Takes number of users as the second argument, defaults to 10
set -ex
users=${2:-10}
timeout=${1:-600}
RUST_LOG=info nym-network-monitor --env mainnet.env --host 127.0.0.1 --port 8080 &
nnm_pid=$!
# Give the monitor time to start before load is applied
sleep 10
# Record locust's status instead of letting `set -e` abort the script here,
# which would leave the monitor running in the background.
locust_status=0
python -m locust -H http://127.0.0.1:8080 --processes 4 --autostart --autoquit 60 -u "$users" -t "$timeout"s || locust_status=$?
# SIGINT asks the monitor to submit its final metrics and shut down; wait for
# that to finish so the last batch is not cut short.
kill -2 "$nnm_pid"
wait "$nnm_pid" || true
# Report the outcome of the load test rather than that of `kill`.
exit "$locust_status"
+7
View File
@@ -0,0 +1,7 @@
from locust import HttpUser, task
class SendMsg(HttpUser):
    """Locust user that repeatedly asks the network monitor to send a message."""

    @task
    def hello_world(self):
        # One POST per task run; the monitor sends a message to itself
        # through the mixnet for every request it receives.
        self.client.post("/v1/send")
+382
View File
@@ -0,0 +1,382 @@
use std::collections::{HashMap, HashSet};
use anyhow::Result;
use futures::{stream::FuturesUnordered, StreamExt};
use log::{debug, info};
use nym_sphinx::chunking::{SentFragment, FRAGMENTS_RECEIVED, FRAGMENTS_SENT};
use nym_topology::{gateway, mix, NymTopology};
use nym_types::monitoring::{MonitorMessage, NodeResult};
use nym_validator_client::nym_api::routes::{API_VERSION, STATUS, SUBMIT_GATEWAY, SUBMIT_NODE};
use rand::SeedableRng;
use rand_chacha::ChaCha8Rng;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use crate::{NYM_API_URL, PRIVATE_KEY, TOPOLOGY};
/// Route re-derived for a single sent fragment, as returned by
/// `NymTopology::random_path_to_gateway`.
struct HydratedRoute {
/// Mix nodes of the path, in the order the topology returned them.
mix_nodes: Vec<mix::Node>,
/// Gateway returned together with the mix path.
gateway_node: gateway::Node,
}
// Per-gateway route counters: `.0` is the number of successful routes, `.1` the
// number of failed routes and `.2` the gateway owner, when the topology has one.
#[derive(Serialize, Deserialize, Debug, Default, ToSchema)]
struct GatewayStats(u32, u32, Option<String>);
impl GatewayStats {
    /// Creates counters from explicit success/failure counts and an optional owner.
    fn new(success: u32, failed: u32, owner: Option<String>) -> Self {
        GatewayStats(success, failed, owner)
    }

    /// Number of routes through this gateway that belonged to complete fragment sets.
    fn success(&self) -> u32 {
        self.0
    }

    /// Number of routes through this gateway that belonged to incomplete fragment sets.
    fn failed(&self) -> u32 {
        self.1
    }

    /// Fraction of successful routes, in `[0.0, 1.0]`.
    ///
    /// Returns `0.0` when nothing has been recorded yet instead of the `NaN`
    /// that `0 / 0` would produce; callers casting the percentage to `u8`
    /// still end up with `0`. The sum is computed in `f64` so that large
    /// counters cannot overflow `u32`.
    fn reliability(&self) -> f64 {
        let success = f64::from(self.success());
        let total = success + f64::from(self.failed());
        if total == 0.0 {
            0.0
        } else {
            success / total
        }
    }

    /// Records one more successful route.
    fn incr_success(&mut self) {
        self.0 += 1;
    }

    /// Records one more failed route.
    fn incr_failure(&mut self) {
        self.1 += 1;
    }
}
// Aggregated view over the global sent/received fragment buffers, rebuilt from
// scratch by `NetworkAccount::finalize`.
#[derive(Serialize, Deserialize, Debug, Default, ToSchema)]
pub struct NetworkAccount {
// Ids of sent fragment sets for which at least as many fragments were received as sent.
complete_fragment_sets: HashSet<i32>,
// Ids of sent fragment sets for which fewer fragments were received than sent.
incomplete_fragment_sets: HashSet<i32>,
// For incomplete sets with at least one received fragment: the fragment indices not seen.
missing_fragments: HashMap<i32, Vec<u8>>,
// Mix-id paths of every fragment that belongs to a complete set.
complete_routes: Vec<Vec<u32>>,
// Success/failure counters per gateway, keyed by base58 identity key.
gateway_stats: HashMap<String, GatewayStats>,
// Mix-id paths of every fragment that belongs to an incomplete set.
incomplete_routes: Vec<Vec<u32>>,
// Copy of the global topology, used to re-derive the route of each fragment.
#[serde(skip)]
topology: NymTopology,
// Mix ids that appeared on at least one re-derived route.
tested_nodes: HashSet<u32>,
// Node details for every mix id in `tested_nodes`.
#[serde(skip)]
mix_details: HashMap<u32, mix::Node>,
// Gateway details keyed by base58 identity key.
#[serde(skip)]
gateway_details: HashMap<String, gateway::Node>,
}
impl NetworkAccount {
/// Mix ids that appeared on at least one re-derived route.
pub fn tested_nodes(&self) -> &HashSet<u32> {
&self.tested_nodes
}
/// Builds the per-node statistics for the mix node with the given id.
///
/// # Panics
///
/// Panics when `id` is not one of the nodes recorded while hydrating routes,
/// i.e. when it is not contained in [`Self::tested_nodes`].
pub fn node_stats(&self, id: u32) -> NodeStats {
    let complete = self.complete_for_id(id);
    let incomplete = self.incomplete_for_id(id);
    let details = self
        .mix_details
        .get(&id)
        .expect("Has to be in here, since we've put it in!");
    let identity = details.identity_key.to_base58_string();
    let owner = details.owner.clone();
    NodeStats::new(id, complete, incomplete, identity, owner)
}
/// Counts the complete routes that pass through the mix node `id`.
fn complete_for_id(&self, id: u32) -> usize {
    let mut hits = 0;
    for route in self.complete_routes() {
        if route.contains(&id) {
            hits += 1;
        }
    }
    hits
}
/// Counts the incomplete routes that pass through the mix node `id`.
fn incomplete_for_id(&self, id: u32) -> usize {
    let mut hits = 0;
    for route in self.incomplete_routes() {
        if route.contains(&id) {
            hits += 1;
        }
    }
    hits
}
/// Mix-id paths of all fragments that belong to complete fragment sets.
pub fn complete_routes(&self) -> &Vec<Vec<u32>> {
&self.complete_routes
}
/// Mix-id paths of all fragments that belong to incomplete fragment sets.
pub fn incomplete_routes(&self) -> &Vec<Vec<u32>> {
&self.incomplete_routes
}
/// Builds a complete account from the current contents of the global
/// fragment buffers: classifies fragment sets, records missing fragments and
/// re-derives the route of every sent fragment.
///
/// # Errors
///
/// Returns an error when a route cannot be re-derived from the topology.
///
/// # Panics
///
/// Panics if the global topology has not been set.
pub fn finalize() -> Result<Self> {
let mut account = NetworkAccount::new();
account.find_missing_fragments();
account.hydrate_all_fragments()?;
Ok(account)
}
/// Clears the global sent and received fragment buffers, discarding every
/// measurement collected so far.
pub fn empty_buffers() {
FRAGMENTS_SENT.clear();
FRAGMENTS_RECEIVED.clear();
}
/// Creates an account holding a copy of the global topology and classifies
/// every sent fragment set as complete or incomplete.
///
/// A set is complete when the number of *distinct* fragments received for it
/// reaches the total announced in the header of its first sent fragment.
///
/// # Panics
///
/// Panics if the global topology has not been set.
fn new() -> Self {
    let topology = TOPOLOGY.get().expect("Topology not set yet!").clone();
    let mut account = NetworkAccount {
        topology,
        ..Default::default()
    };
    for fragment_set in FRAGMENTS_SENT.iter() {
        let set_id = *fragment_set.key();
        let sent_fragments = fragment_set
            .value()
            .first()
            .map(|f| f.header().total_fragments())
            .unwrap_or(0);
        debug!(
            "SENT Fragment set {} has {} fragments",
            set_id, sent_fragments
        );
        // Due to retransmission we can receive a fragment multiple times, so
        // count distinct fragment indices; counting raw arrivals would let
        // duplicates of one fragment hide the loss of another.
        let recv_fragments = FRAGMENTS_RECEIVED
            .get(&set_id)
            .map(|received| {
                received
                    .value()
                    .iter()
                    .map(|f| f.header().current_fragment())
                    .collect::<HashSet<u8>>()
                    .len()
            })
            .unwrap_or(0);
        debug!(
            "RECV Fragment set {} has {} fragments",
            set_id, recv_fragments
        );
        if sent_fragments as usize <= recv_fragments {
            account.push_complete(set_id);
        } else {
            account.push_incomplete(set_id);
        }
    }
    account
}
/// Re-derives the route of a sent fragment by seeding a `ChaCha8Rng` with the
/// fragment's seed and asking the stored topology for a random path to the
/// fragment's destination.
///
/// # Errors
///
/// Propagates any error returned by `random_path_to_gateway`.
fn hydrate_route(&self, fragment: SentFragment) -> anyhow::Result<HydratedRoute> {
    let params = fragment.mixnet_params();
    let mut seeded_rng = ChaCha8Rng::seed_from_u64(fragment.seed() as u64);
    let (mix_nodes, gateway_node) = self.topology.random_path_to_gateway(
        &mut seeded_rng,
        params.hops(),
        params.destination(),
    )?;
    Ok(HydratedRoute {
        mix_nodes,
        gateway_node,
    })
}
fn hydrate_all_fragments(&mut self) -> Result<()> {
for fragment_set in FRAGMENTS_SENT.iter() {
let fragment_set_id = fragment_set.key();
for fragment in fragment_set.value() {
let route = self.hydrate_route(fragment.clone())?;
let mix_ids = route
.mix_nodes
.iter()
.map(|n| n.mix_id)
.collect::<Vec<u32>>();
self.tested_nodes.extend(&mix_ids);
self.mix_details
.extend(route.mix_nodes.iter().map(|n| (n.mix_id, n.clone())));
let gateway_stats_entry = self
.gateway_stats
.entry(route.gateway_node.identity_key.to_base58_string())
.or_insert(GatewayStats::new(0, 0, route.gateway_node.owner.clone()));
self.gateway_details.insert(
route.gateway_node.identity_key.to_base58_string(),
route.gateway_node,
);
if self.complete_fragment_sets.contains(fragment_set_id) {
self.complete_routes.push(mix_ids);
gateway_stats_entry.incr_success();
} else {
self.incomplete_routes.push(mix_ids);
gateway_stats_entry.incr_failure();
}
}
}
Ok(())
}
/// Recomputes `missing_fragments` for every incomplete fragment set that has
/// at least one received fragment.
///
/// The expected indices are `0..total_fragments`, taken from the header of the
/// first received fragment; everything not reported by a received fragment's
/// `current_fragment()` is listed as missing. Sets with nothing received get
/// no entry.
fn find_missing_fragments(&mut self) {
    // NOTE(review): this assumes `current_fragment()` is zero-based; confirm
    // against the fragment header definition.
    let missing_by_set = self
        .incomplete_fragment_sets
        .iter()
        .filter_map(|set_id| {
            let received = FRAGMENTS_RECEIVED.get(set_id)?;
            let total = received.value().first()?.header().total_fragments();
            let expected = (0..total).collect::<HashSet<u8>>();
            let seen = received
                .value()
                .iter()
                .map(|f| f.header().current_fragment())
                .collect::<HashSet<u8>>();
            let missing = expected.difference(&seen).cloned().collect::<Vec<u8>>();
            Some((*set_id, missing))
        })
        .collect::<HashMap<_, _>>();
    self.missing_fragments = missing_by_set;
}
/// Marks the fragment set `id` as complete.
fn push_complete(&mut self, id: i32) {
self.complete_fragment_sets.insert(id);
}
/// Marks the fragment set `id` as incomplete.
fn push_incomplete(&mut self, id: i32) {
self.incomplete_fragment_sets.insert(id);
}
}
// Size-only summary of a `NetworkAccount`: every field is the element count of
// the account field with the same name (`missing_fragments` is the total number
// of missing fragments across all sets).
#[derive(Serialize, Debug, Default, ToSchema)]
pub struct NetworkAccountStats {
complete_fragment_sets: usize,
incomplete_fragment_sets: usize,
missing_fragments: usize,
complete_routes: usize,
incomplete_routes: usize,
tested_nodes: usize,
}
impl From<NetworkAccount> for NetworkAccountStats {
fn from(account: NetworkAccount) -> Self {
NetworkAccountStats {
complete_fragment_sets: account.complete_fragment_sets.len(),
incomplete_fragment_sets: account.incomplete_fragment_sets.len(),
missing_fragments: account.missing_fragments.values().map(|v| v.len()).sum(),
complete_routes: account.complete_routes.len(),
incomplete_routes: account.incomplete_routes.len(),
tested_nodes: account.tested_nodes.len(),
}
}
}
// Per-mix-node statistics: how many complete and incomplete routes passed
// through the node, the resulting reliability ratio, and the node's base58
// identity key and owner.
#[derive(Serialize, Debug, ToSchema)]
pub struct NodeStats {
mix_id: u32,
complete_routes: usize,
incomplete_routes: usize,
reliability: f64,
identity: String,
owner: Option<String>,
}
impl NodeStats {
    /// Creates the statistics for one mix node.
    ///
    /// `reliability` is `complete_routes / (complete_routes + incomplete_routes)`.
    /// When no route has been recorded at all it is `0.0` rather than the `NaN`
    /// a `0 / 0` division would yield, so the value is always comparable and
    /// serializable.
    pub fn new(
        mix_id: u32,
        complete_routes: usize,
        incomplete_routes: usize,
        identity: String,
        owner: Option<String>,
    ) -> Self {
        let total_routes = complete_routes + incomplete_routes;
        let reliability = if total_routes == 0 {
            0.0
        } else {
            complete_routes as f64 / total_routes as f64
        };
        NodeStats {
            mix_id,
            complete_routes,
            incomplete_routes,
            reliability,
            identity,
            owner,
        }
    }

    /// Fraction of routes through this node that were complete, in `[0.0, 1.0]`.
    pub fn reliability(&self) -> f64 {
        self.reliability
    }

    /// Converts the statistics into the submission format, expressing
    /// reliability as a whole percentage (fractional part truncated).
    pub fn into_node_results(self) -> NodeResult {
        NodeResult {
            node_id: self.mix_id,
            identity: self.identity,
            reliability: (self.reliability * 100.) as u8,
        }
    }
}
/// Builds a fresh account and returns the statistics of every tested mix node.
///
/// # Errors
///
/// Returns an error when the account cannot be finalized.
pub async fn all_node_stats() -> anyhow::Result<Vec<NodeStats>> {
    let account = NetworkAccount::finalize()?;
    let tested = account.tested_nodes();
    let mut stats = Vec::with_capacity(tested.len());
    for mix_id in tested {
        stats.push(account.node_stats(*mix_id));
    }
    Ok(stats)
}
pub async fn monitor_gateway_results() -> anyhow::Result<Vec<NodeResult>> {
let account = NetworkAccount::finalize()?;
Ok(account
.gateway_stats
.iter()
.map(into_gateway_result)
.collect())
}
/// Collects the statistics of every tested mix node and converts them into
/// submission results.
///
/// # Errors
///
/// Returns an error when the node statistics cannot be computed.
pub async fn monitor_mixnode_results() -> anyhow::Result<Vec<NodeResult>> {
    let stats = all_node_stats().await?;
    let mut results = Vec::with_capacity(stats.len());
    for node in stats {
        results.push(node.into_node_results());
    }
    Ok(results)
}
pub async fn submit_metrics() -> anyhow::Result<()> {
let node_stats = monitor_mixnode_results().await?;
let gateway_stats = monitor_gateway_results().await?;
info!("Submitting metrics to {}", *NYM_API_URL);
let client = reqwest::Client::new();
let node_submit_url = format!("{}/{API_VERSION}/{STATUS}/{SUBMIT_NODE}", &*NYM_API_URL);
let gateway_submit_url = format!("{}/{API_VERSION}/{STATUS}/{SUBMIT_GATEWAY}", &*NYM_API_URL);
info!("Submitting {} mixnode measurements", node_stats.len());
node_stats
.chunks(10)
.map(|chunk| {
let monitor_message =
MonitorMessage::new(chunk.to_vec(), PRIVATE_KEY.get().expect("We've set this!"));
client.post(&node_submit_url).json(&monitor_message).send()
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<_, _>>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
info!("Submitting {} gateway measurements", gateway_stats.len());
gateway_stats
.chunks(10)
.map(|chunk| {
let monitor_message =
MonitorMessage::new(chunk.to_vec(), PRIVATE_KEY.get().expect("We've set this!"));
client
.post(&gateway_submit_url)
.json(&monitor_message)
.send()
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<_, _>>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
NetworkAccount::empty_buffers();
Ok(())
}
/// Converts one `gateway_stats` entry into a submission result.
///
/// Reliability is expressed as a whole percentage (fractional part truncated);
/// `node_id` is always `0` because gateways are identified by `identity` here.
fn into_gateway_result((key, stats): (&String, &GatewayStats)) -> NodeResult {
    let percentage = (stats.reliability() * 100.) as u8;
    NodeResult {
        node_id: 0,
        identity: key.clone(),
        reliability: percentage,
    }
}
+295
View File
@@ -0,0 +1,295 @@
use axum::{
extract::{Path, State},
http::StatusCode,
Json,
};
use futures::StreamExt;
use log::{debug, error, warn};
use nym_sdk::mixnet::MixnetMessageSender;
use nym_sphinx::chunking::{ReceivedFragment, SentFragment, FRAGMENTS_RECEIVED, FRAGMENTS_SENT};
use petgraph::{dot::Dot, Graph};
use rand::{distributions::Alphanumeric, seq::SliceRandom, Rng};
use serde::Serialize;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use tokio::time::timeout;
use utoipa::ToSchema;
use crate::{
accounting::{all_node_stats, NetworkAccount, NetworkAccountStats, NodeStats},
http::AppState,
MIXNET_TIMEOUT,
};
// Serializable snapshot of all sent fragments, keyed by fragment-set id.
#[derive(ToSchema, Serialize)]
pub struct FragmentsSent(HashMap<i32, Vec<SentFragment>>);
// Serializable snapshot of all received fragments, keyed by fragment-set id.
#[derive(ToSchema, Serialize)]
pub struct FragmentsReceived(HashMap<i32, Vec<ReceivedFragment>>);
#[utoipa::path(
get,
path = "/v1/stats",
responses(
(status = 200, description = "Returns statistics collected since startup", body = NetworkAccountStats),
)
)]
pub async fn stats_handler() -> Result<Json<NetworkAccountStats>, StatusCode> {
let account = NetworkAccount::finalize().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(account.into()))
}
#[utoipa::path(
    get,
    path = "/v1/node_stats/{mix_id}",
    responses(
        (status = 200, description = "Returns statistics for a given mix_id, collected since startup", body = NodeStats),
        (status = 404, description = "No measurements have been recorded for the given mix_id"),
    )
)]
// Returns the statistics of a single mix node.
//
// `NetworkAccount::node_stats` panics for ids that never appeared on a route,
// and `mix_id` comes straight from the request path, so unknown ids are
// answered with 404 before it is called.
pub async fn node_stats_handler(Path(mix_id): Path<u32>) -> Result<Json<NodeStats>, StatusCode> {
    let account = NetworkAccount::finalize().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    if !account.tested_nodes().contains(&mix_id) {
        return Err(StatusCode::NOT_FOUND);
    }
    Ok(Json(account.node_stats(mix_id)))
}
#[utoipa::path(
    get,
    path = "/v1/node_stats",
    responses(
        (status = 200, description = "Returns statistics for all nodes, collected since startup, sorted by reliability", body = Vec<NodeStats>),
    )
)]
// Returns the statistics of every tested node in ascending order of reliability.
pub async fn all_nodes_stats_handler() -> Result<Json<Vec<NodeStats>>, StatusCode> {
    let mut stats = all_node_stats()
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    // `total_cmp` defines an order for every f64, including NaN, so the sort
    // can never panic the way `partial_cmp(..).unwrap()` does.
    stats.sort_by(|a, b| a.reliability().total_cmp(&b.reliability()));
    Ok(Json(stats))
}
#[utoipa::path(
get,
path = "/v1/accounting",
responses(
(status = 200, description = "Returns raw aggregated data collected since startup", body = NetworkAccount),
)
)]
pub async fn accounting_handler() -> Result<Json<NetworkAccount>, StatusCode> {
NetworkAccount::finalize()
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
.map(Json)
}
#[utoipa::path(
get,
path = "/v1/dot/{mix_id}",
responses(
(status = 200, description = "Returns Subgraph for a given *mix_id* in `dot` format", body = String),
)
)]
// Renders the routes that pass through `mix_id` as a `dot` graph.
pub async fn mix_dot_handler(Path(mix_id): Path<u32>) -> Result<String, StatusCode> {
generate_dot(Some(mix_id))
}
#[utoipa::path(
get,
path = "/v1/dot",
responses(
(status = 200, description = "Returns entire tested network graph in `dot` format", body = String),
)
)]
// Renders every recorded route as a `dot` graph.
pub async fn graph_handler() -> Result<String, StatusCode> {
generate_dot(None)
}
#[utoipa::path(
    get,
    path = "/v1/sent",
    responses(
        (status = 200, description = "Returns a map of all fragments sent by the network monitor", body = FragmentsSent),
    )
)]
// Copies the global sent-fragment buffer into a plain map and returns it.
pub async fn sent_handler() -> Json<FragmentsSent> {
    let snapshot: HashMap<_, _> = FRAGMENTS_SENT
        .iter()
        .map(|entry| (*entry.key(), entry.value().clone()))
        .collect();
    Json(FragmentsSent(snapshot))
}
#[utoipa::path(
    get,
    path = "/v1/received",
    responses(
        (status = 200, description = "Returns a map of all fragments received by the network monitor", body = FragmentsReceived),
    )
)]
// Copies the global received-fragment buffer into a plain map and returns it.
pub async fn recv_handler() -> Json<FragmentsReceived> {
    let snapshot: HashMap<_, _> = FRAGMENTS_RECEIVED
        .iter()
        .map(|entry| (*entry.key(), entry.value().clone()))
        .collect();
    Json(FragmentsReceived(snapshot))
}
#[utoipa::path(
post,
path = "/v1/send",
responses(
(status = 200, description = "Sends a message to itself through the mixnet", body = String),
)
)]
// Sends one random message through the mixnet using a client from the shared
// pool and returns the text that was sent.
pub async fn send_handler(State(state): State<AppState>) -> Result<String, StatusCode> {
send_receive_mixnet(state).await
}
#[utoipa::path(
    get,
    path = "/v1/mermaid",
    responses(
        (status = 200, description = "Returns entire tested network graph in `mermaid` format", body = String),
    )
)]
// Renders every recorded route as one line of a mermaid flowchart: complete
// routes are joined with plain arrows, incomplete routes with marked arrows.
pub async fn mermaid_handler() -> Result<String, StatusCode> {
    let account = NetworkAccount::finalize().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    // One flowchart line for a route, hops joined by `arrow`.
    let as_line = |route: &Vec<u32>, arrow: &str| -> String {
        let hops: Vec<String> = route.iter().map(|n| n.to_string()).collect();
        let mut line = hops.join(arrow);
        line.push('\n');
        line
    };

    let mut mermaid = String::from("flowchart LR;\n");
    for route in account.complete_routes() {
        mermaid.push_str(&as_line(route, "-->"));
    }
    for route in account.incomplete_routes() {
        mermaid.push_str(&as_line(route, "-- ❌ -->"));
    }
    Ok(mermaid)
}
/// Sends a random 32-character message from a randomly chosen pooled client to
/// that client's own address and waits for a message to arrive on it.
///
/// Send and receive outcomes (including timeouts) are only logged; the
/// function returns the text that was sent as long as both tasks ran to
/// completion.
///
/// # Errors
///
/// Returns `500` when no client is available or when one of the spawned tasks
/// fails to join.
async fn send_receive_mixnet(state: AppState) -> Result<String, StatusCode> {
    let msg: String = rand::thread_rng()
        .sample_iter(&Alphanumeric)
        .take(32)
        .map(char::from)
        .collect();
    let sent_msg = msg.clone();

    let client = {
        let mut clients = state.clients().write().await;
        if let Some(client) = clients.make_contiguous().choose(&mut rand::thread_rng()) {
            Arc::clone(client)
        } else {
            error!("No clients currently available");
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };

    // Obtain the sending half and our address *before* the receiver task takes
    // the client's write lock. The receiver keeps that lock for the whole
    // receive timeout, so a sender that still needed a read lock could be
    // blocked until the receiver had already given up.
    let (mixnet_sender, our_address) = {
        let guard = client.read().await;
        (guard.split_sender(), *guard.nym_address())
    };

    let recv = Arc::clone(&client);
    let recv_handle = tokio::spawn(async move {
        let mut receiver = recv.write().await;
        match timeout(
            Duration::from_secs(*MIXNET_TIMEOUT.get().expect("Set at the begining")),
            receiver.next(),
        )
        .await
        {
            Ok(Some(received)) => {
                debug!("Received: {}", String::from_utf8_lossy(&received.message));
            }
            Ok(None) => debug!("No message received"),
            Err(e) => warn!("Failed to receive message: {e}"),
        }
    });

    let send_handle = tokio::spawn(async move {
        match timeout(
            Duration::from_secs(5),
            mixnet_sender.send_plain_message(our_address, &msg),
        )
        .await
        {
            Ok(Ok(_)) => debug!("Sent message: {msg}"),
            // The send itself failed; previously this was logged as a success.
            Ok(Err(e)) => warn!("Failed to send message: {e}"),
            // The send did not finish within the timeout.
            Err(e) => warn!("Failed to send message: {e}"),
        };
    });

    let results = futures::future::join_all(vec![send_handle, recv_handle]).await;
    for result in results {
        if let Err(e) = result {
            error!("Failed to send/receive message: {e}");
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    }
    Ok(sent_msg)
}
/// Renders the recorded routes as a graph in `dot` format.
///
/// With `Some(id)` only routes that contain `id` are included; with `None`
/// every route is. Consecutive hops of a route become directed edges with an
/// empty label. Edges from incomplete routes are currently rendered exactly
/// like those from complete routes.
///
/// # Errors
///
/// Returns `500` when the account cannot be built.
fn generate_dot(mix_id: Option<u32>) -> Result<String, StatusCode> {
    let account = NetworkAccount::finalize().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    let mut nodes = HashSet::new();
    let mut edges: Vec<(u32, u32)> = vec![];

    // Complete routes first, then incomplete ones, matching the order in which
    // edges are added to the graph.
    let routes = account
        .complete_routes()
        .iter()
        .chain(account.incomplete_routes().iter());
    for route in routes {
        // Filter on the `Option` itself: using `0` as a stand-in for "no
        // filter" made a request for mix id 0 return the entire graph.
        let selected = mix_id.map_or(true, |id| route.contains(&id));
        if !selected {
            continue;
        }
        for window in route.windows(2) {
            nodes.insert(window[0]);
            nodes.insert(window[1]);
            edges.push((window[0], window[1]));
        }
    }

    let mut graph = Graph::new();
    let node_indices: HashMap<u32, _> = nodes
        .iter()
        .map(|node| (*node, graph.add_node(*node)))
        .collect();
    for (from, to) in edges {
        // Both endpoints were inserted into `nodes` above, so indexing cannot fail.
        graph.add_edge(node_indices[&from], node_indices[&to], "");
    }

    let dot = Dot::new(&graph);
    Ok(dot.to_string())
}
+94
View File
@@ -0,0 +1,94 @@
use crate::accounting::{NetworkAccount, NetworkAccountStats, NodeStats};
use crate::handlers::{
accounting_handler, all_nodes_stats_handler, graph_handler, mermaid_handler, mix_dot_handler,
node_stats_handler, recv_handler, send_handler, sent_handler, stats_handler, FragmentsReceived,
FragmentsSent,
};
use axum::routing::{get, post};
use axum::Router;
use log::info;
use nym_sphinx::chunking::fragment::FragmentHeader;
use nym_sphinx::chunking::{ReceivedFragment, SentFragment};
use std::net::SocketAddr;
use tokio_util::sync::CancellationToken;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
use crate::ClientsWrapper;
/// HTTP front-end of the network monitor.
pub struct HttpServer {
/// Address the server binds to.
listener: SocketAddr,
/// Token whose cancellation triggers a graceful shutdown of the server.
cancel: CancellationToken,
}
// OpenAPI description served through the Swagger UI. Every routed handler is
// listed here, including `stats_handler`, which is routed at `/v1/stats` but
// was previously missing from the generated document.
#[derive(OpenApi)]
#[openapi(
    paths(
        crate::handlers::accounting_handler,
        crate::handlers::graph_handler,
        crate::handlers::mermaid_handler,
        crate::handlers::mix_dot_handler,
        crate::handlers::node_stats_handler,
        crate::handlers::recv_handler,
        crate::handlers::send_handler,
        crate::handlers::sent_handler,
        crate::handlers::stats_handler,
        crate::handlers::all_nodes_stats_handler,
    ),
    components(schemas(
        FragmentHeader,
        FragmentsReceived,
        FragmentsSent,
        NetworkAccount,
        NetworkAccountStats,
        NodeStats,
        ReceivedFragment,
        SentFragment,
    ))
)]
struct ApiDoc;
/// State shared with request handlers: the pool of mixnet clients.
#[derive(Clone)]
pub struct AppState {
clients: ClientsWrapper,
}
impl AppState {
/// The shared pool of mixnet clients.
pub fn clients(&self) -> &ClientsWrapper {
&self.clients
}
}
impl HttpServer {
/// Creates a server that will bind to `listener` and shut down gracefully
/// once `cancel` is cancelled.
pub fn new(listener: SocketAddr, cancel: CancellationToken) -> Self {
HttpServer { listener, cancel }
}
/// Binds the listener, registers all `/v1` routes plus the Swagger UI, and
/// serves requests until the cancellation token fires.
///
/// Only the `/v1/send` route receives the client pool as state.
///
/// # Errors
///
/// Returns an error when the address cannot be bound or the server fails
/// while running.
pub async fn run(self, clients: ClientsWrapper) -> anyhow::Result<()> {
// Read the pool size up-front; it is only used for the start-up banner.
let n_clients = clients.read().await.len();
let state = AppState { clients };
let app = Router::new()
.route("/v1/send", post(send_handler).with_state(state))
.merge(SwaggerUi::new("/v1/ui").url("/v1/docs/openapi.json", ApiDoc::openapi()))
.route("/v1/accounting", get(accounting_handler))
.route("/v1/sent", get(sent_handler))
.route("/v1/dot/:mix_id", get(mix_dot_handler))
.route("/v1/dot", get(graph_handler))
.route("/v1/mermaid", get(mermaid_handler))
.route("/v1/stats", get(stats_handler))
.route("/v1/node_stats/:mix_id", get(node_stats_handler))
.route("/v1/node_stats", get(all_nodes_stats_handler))
.route("/v1/received", get(recv_handler));
let listener = tokio::net::TcpListener::bind(self.listener).await?;
let server_future =
axum::serve(listener, app).with_graceful_shutdown(self.cancel.cancelled_owned());
info!("##########################################################################################");
info!("######################### HTTP server running, with {} clients ############################################", n_clients);
info!("##########################################################################################");
server_future.await?;
Ok(())
}
}
+214
View File
@@ -0,0 +1,214 @@
use crate::http::HttpServer;
use accounting::submit_metrics;
use anyhow::Result;
use clap::Parser;
use log::{info, warn};
use nym_crypto::asymmetric::ed25519::PrivateKey;
use nym_network_defaults::setup_env;
use nym_network_defaults::var_names::NYM_API;
use nym_sdk::mixnet::{self, MixnetClient};
use nym_topology::{HardcodedTopologyProvider, NymTopology};
use std::fs::File;
use std::io::Write;
use std::sync::LazyLock;
use std::time::Duration;
use std::{
collections::VecDeque,
net::{IpAddr, Ipv4Addr, SocketAddr},
str::FromStr,
sync::Arc,
};
use tokio::sync::OnceCell;
use tokio::{signal::ctrl_c, sync::RwLock};
use tokio_util::sync::CancellationToken;
/// Base URL of the Nym API, read on first use from the environment variable
/// named by `NYM_API`. Panics on first access if the variable is not set.
static NYM_API_URL: LazyLock<String> = LazyLock::new(|| {
std::env::var(NYM_API).unwrap_or_else(|_| panic!("{} env var not set", NYM_API))
});
/// Seconds to wait for a message to arrive on a client; set once in `main`.
static MIXNET_TIMEOUT: OnceCell<u64> = OnceCell::const_new();
/// Network topology used by clients and for accounting; set once in `main`.
static TOPOLOGY: OnceCell<NymTopology> = OnceCell::const_new();
/// Key used to sign submitted monitoring results; set once in `main`.
static PRIVATE_KEY: OnceCell<PrivateKey> = OnceCell::const_new();
mod accounting;
mod handlers;
mod http;
/// Shared, lock-protected queue of mixnet clients. Each client sits behind its
/// own lock and reference count so it can be used by handlers and retired by
/// the client manager independently of the queue.
pub type ClientsWrapper = Arc<RwLock<VecDeque<Arc<RwLock<MixnetClient>>>>>;
/// Keeps the client pool populated and rotates it, forever.
///
/// While fewer than `n_clients` clients exist, new ones are spawned back to
/// back. Once the pool is full, the task sleeps for `lifetime` seconds,
/// removes the oldest client, waits until no request is using it any more,
/// disconnects it and spawns a replacement.
async fn make_clients(
    clients: ClientsWrapper,
    n_clients: usize,
    lifetime: u64,
    topology: NymTopology,
) {
    loop {
        let spawned_clients = clients.read().await.len();
        info!("Currently spawned clients: {}", spawned_clients);
        // If we have enough clients, sleep for `lifetime` seconds and remove the oldest one
        if spawned_clients >= n_clients {
            info!("New client will be spawned in {} seconds", lifetime);
            tokio::time::sleep(tokio::time::Duration::from_secs(lifetime)).await;
            info!("Removing oldest client");
            // Take the client out in its own statement so the pool's write
            // lock is released immediately. Holding it while waiting for the
            // client to become idle would block every request that needs to
            // pick a client.
            let oldest = clients.write().await.pop_front();
            if let Some(mut dropped_client) = oldest {
                loop {
                    // `try_unwrap` succeeds exactly when we hold the last
                    // reference, i.e. when no request is using the client.
                    match Arc::try_unwrap(dropped_client) {
                        Ok(client) => {
                            client.into_inner().disconnect().await;
                            break;
                        }
                        Err(still_shared) => {
                            dropped_client = still_shared;
                            info!("Client still in use, waiting 2 seconds");
                            tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
                        }
                    }
                }
            }
        }
        info!("Spawning new client");
        let client = match make_client(topology.clone()).await {
            Ok(client) => client,
            Err(err) => {
                warn!("{}, moving on", err);
                // Back off briefly so a persistent failure does not turn this
                // loop into a tight retry loop.
                tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
                continue;
            }
        };
        clients
            .write()
            .await
            .push_back(Arc::new(RwLock::new(client)));
    }
}
/// Builds an ephemeral mixnet client that uses the given fixed topology and
/// the network details from the environment, then connects it to the mixnet.
///
/// Credentials mode is not enabled.
///
/// # Errors
///
/// Returns an error when the client cannot be built or cannot connect.
async fn make_client(topology: NymTopology) -> Result<MixnetClient> {
    let network = mixnet::NymNetworkDetails::new_from_env();
    let provider = Box::new(HardcodedTopologyProvider::new(topology));
    let builder = mixnet::MixnetClientBuilder::new_ephemeral()
        .network_details(network)
        .custom_topology_provider(provider);
    let connected = builder.build()?.connect_to_mixnet().await?;
    Ok(connected)
}
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
/// Number of clients to spawn
#[arg(short = 'C', long = "clients", default_value_t = 10)]
n_clients: usize,
/// Lifetime of each client in seconds
#[arg(short = 'T', long, default_value_t = 60)]
client_lifetime: u64,
/// Port to listen on
#[arg(long, default_value_t = 8080)]
port: u16,
/// Host to listen on
#[arg(long, default_value = "127.0.0.1")]
host: String,
/// Path to the topology file
#[arg(short, long, default_value = None)]
topology: Option<String>,
/// Path to the environment file
#[arg(short, long, default_value = None)]
env: Option<String>,
#[arg(short, long, default_value_t = 10)]
mixnet_timeout: u64,
#[arg(long, default_value_t = false)]
generate_key_pair: bool,
#[arg(long)]
private_key: String,
}
fn generate_key_pair() -> Result<()> {
let mut rng = rand::thread_rng();
let keypair = nym_crypto::asymmetric::identity::KeyPair::new(&mut rng);
let mut public_key_file = File::create("network-monitor-public")?;
public_key_file.write_all(keypair.public_key().to_base58_string().as_bytes())?;
let mut private_key_file = File::create("network-monitor-private")?;
private_key_file.write_all(keypair.private_key().to_base58_string().as_bytes())?;
info!("Generated keypair, public key to 'network-monitor-public', and private key to 'network-monitor-private', public key should be whitelisted with the nym-api");
Ok(())
}
#[tokio::main]
async fn main() -> Result<()> {
nym_bin_common::logging::setup_logging();
let args = Args::parse();
setup_env(args.env); // Defaults to mainnet if empty
let cancel_token = CancellationToken::new();
let server_cancel_token = cancel_token.clone();
let clients = Arc::new(RwLock::new(VecDeque::with_capacity(args.n_clients)));
if args.generate_key_pair {
generate_key_pair()?;
std::process::exit(0);
}
let pk = PrivateKey::from_base58_string(&args.private_key)?;
PRIVATE_KEY.set(pk).ok();
TOPOLOGY
.set(if let Some(topology_file) = args.topology {
NymTopology::new_from_file(topology_file)?
} else {
NymTopology::new_from_env().await?
})
.ok();
MIXNET_TIMEOUT.set(args.mixnet_timeout).ok();
let spawn_clients = Arc::clone(&clients);
tokio::spawn(make_clients(
spawn_clients,
args.n_clients,
args.client_lifetime,
TOPOLOGY.get().expect("Topology not set yet!").clone(),
));
let server_handle = tokio::spawn(async move {
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_str(&args.host)?), args.port);
let server = HttpServer::new(socket, server_cancel_token);
server.run(clients).await
});
info!("Waiting for message (ctrl-c to exit)");
loop {
match tokio::time::timeout(Duration::from_secs(600), ctrl_c()).await {
Ok(_) => {
info!("Received kill signal, shutting down, submitting final batch of metrics");
submit_metrics().await?;
break;
}
Err(_) => {
info!("Submitting metrics, cleaning metric buffers");
submit_metrics().await?;
}
};
}
cancel_token.cancel();
server_handle.await??;
Ok(())
}