Compare commits

...

25 Commits

Author SHA1 Message Date
durch e9dc848950 Bump nym-api 2025-06-02 12:49:13 +02:00
durch 81162fba7e Allow nym-api init fail 2025-06-02 12:45:21 +02:00
durch be36da68b1 Clap value_delimiter 2025-06-02 11:58:12 +02:00
durch 21a56e307f Bump NNM 2025-06-02 09:31:09 +02:00
durch bd966383be Towards untangling nym-api client 2025-06-02 09:30:36 +02:00
durch 7626785ce4 Bump NM version 2025-05-27 15:40:59 +02:00
durch 6f79d39d48 Filter out non mixnodes 2025-05-27 15:40:33 +02:00
durch 014b5f767a Log APIs used 2025-05-27 15:23:18 +02:00
durch e0966565e6 Mnemonic to run, bump 2025-05-27 13:38:13 +02:00
durch c6aec663b7 Bump nym-api version 2025-05-27 13:18:31 +02:00
durch 7d041ddd44 Explicit mnemonic to entrypoint 2025-05-27 13:02:38 +02:00
durch 5d8bdc6570 Bunch of new query files 2025-05-27 10:28:00 +02:00
durch 06c412b3ba Remove debug logging 2025-05-27 10:25:27 +02:00
durch 356cf00106 Put the monitoring back properly 2025-05-27 10:25:27 +02:00
durch 58493a69aa Fix submission URLs 2025-05-27 10:25:27 +02:00
durch e881da834b More NM logging 2025-05-27 10:25:27 +02:00
durch eee9d8ab0c DEBUG: disable epoch operations, less noisy logging 2025-05-27 10:25:27 +02:00
durch 09026307f4 Debug logging for nym-api 2025-05-27 10:25:27 +02:00
durch 507ddf246c Stagger out route sending 2025-05-27 10:25:26 +02:00
durch 8d8ce29113 Update NM readme, fmt 2025-05-27 10:25:26 +02:00
durch 3be9e06bef sqlx prepare, bunch of nits 2025-05-27 10:25:26 +02:00
durch 770078a9ed Delete test script 2025-05-27 10:25:26 +02:00
durch fcffebfe45 Raw route handling and reliability corrections 2025-05-27 10:25:19 +02:00
durch 9c7d79683b Force routing through all nodes 2025-05-27 10:21:02 +02:00
durch c7f34d04c0 Support submitting to multiple APIs 2025-05-27 10:21:02 +02:00
20 changed files with 1203 additions and 759 deletions
+1
View File
@@ -62,3 +62,4 @@ nym-api/redocly/formatted-openapi.json
**/settings.sql
**/enter_db.sh
CLAUDE.md
Generated
+545 -577
View File
File diff suppressed because it is too large Load Diff
@@ -73,6 +73,7 @@ pub const STAKE_SATURATION: &str = "stake-saturation";
pub const INCLUSION_CHANCE: &str = "inclusion-probability";
pub const SUBMIT_GATEWAY: &str = "submit-gateway-monitoring-results";
pub const SUBMIT_NODE: &str = "submit-node-monitoring-results";
pub const SUBMIT_ROUTE: &str = "submit-route-monitoring-results";
pub const SERVICE_PROVIDERS: &str = "services";
@@ -124,7 +124,7 @@ impl ReconstructionBuffer {
// TODO: what to do in that case? give up on the message? overwrite it? panic?
// it *might* be due to lock ack-packet, but let's keep the `warn` level in case
// it could be somehow exploited
warn!(
debug!(
"duplicate fragment received! - frag - {} (set id: {})",
fragment.current_fragment(),
fragment.id()
+4
View File
@@ -224,6 +224,10 @@ impl NymTopology {
serde_json::from_reader(file).map_err(Into::into)
}
/// Returns a shared reference to the topology's node map, keyed by `NodeId`,
/// with the `RoutingNode` stored for each id.
pub fn node_details(&self) -> &HashMap<NodeId, RoutingNode> {
&self.node_details
}
pub fn add_skimmed_nodes(&mut self, nodes: &[SkimmedNode]) {
self.add_additional_nodes(nodes.iter())
}
+28 -7
View File
@@ -11,6 +11,27 @@ static NETWORK_MONITORS: LazyLock<HashSet<String>> = LazyLock::new(|| {
nm
});
/// Outcome of a single test packet sent over one concrete route:
/// three mixnode hops followed by a gateway.
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone, ToSchema)]
pub struct RouteResult {
    /// Id of the node used as the layer 1 hop.
    pub layer1: u32,
    /// Id of the node used as the layer 2 hop.
    pub layer2: u32,
    /// Id of the node used as the layer 3 hop.
    pub layer3: u32,
    /// Id of the gateway on the route.
    pub gw: u32,
    /// Whether the measurement for this route was successful.
    pub success: bool,
}

impl RouteResult {
    /// Builds a `RouteResult` from the hop ids (in route order) and the outcome.
    pub fn new(layer1: u32, layer2: u32, layer3: u32, gw: u32, success: bool) -> Self {
        Self {
            layer1,
            layer2,
            layer3,
            gw,
            success,
        }
    }
}
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone, ToSchema)]
pub struct NodeResult {
#[schema(value_type = u32)]
@@ -29,23 +50,23 @@ impl NodeResult {
}
}
#[derive(Serialize, Deserialize, JsonSchema)]
#[derive(Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(untagged)]
pub enum MonitorResults {
Mixnode(Vec<NodeResult>),
Gateway(Vec<NodeResult>),
Node(Vec<NodeResult>),
Route(Vec<RouteResult>),
}
#[derive(Serialize, Deserialize, JsonSchema, ToSchema)]
pub struct MonitorMessage {
results: Vec<NodeResult>,
results: MonitorResults,
signature: String,
signer: String,
timestamp: i64,
}
impl MonitorMessage {
fn message_to_sign(results: &[NodeResult], timestamp: i64) -> Vec<u8> {
fn message_to_sign(results: &MonitorResults, timestamp: i64) -> Vec<u8> {
let mut msg = serde_json::to_vec(results).unwrap_or_default();
msg.extend_from_slice(&timestamp.to_le_bytes());
msg
@@ -60,7 +81,7 @@ impl MonitorMessage {
now - self.timestamp < 5
}
pub fn new(results: Vec<NodeResult>, private_key: &PrivateKey) -> Self {
pub fn new(results: MonitorResults, private_key: &PrivateKey) -> Self {
let timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Time went backwards")
@@ -82,7 +103,7 @@ impl MonitorMessage {
NETWORK_MONITORS.contains(&self.signer)
}
pub fn results(&self) -> &[NodeResult] {
pub fn results(&self) -> &MonitorResults {
&self.results
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n INSERT OR IGNORE INTO routes (layer1, layer2, layer3, gw, success) VALUES (?, ?, ?, ?, ?);\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 5
},
"nullable": []
},
"hash": "66109c1d856e1ca2b5126e4bf4c58c7a27b8c303bfa079cf74909354202dcc49"
}
@@ -1,26 +0,0 @@
{
"db_name": "SQLite",
"query": "\n SELECT\n d.node_id as \"node_id: NodeId\",\n CASE WHEN count(*) > 3 THEN AVG(reliability) ELSE 100 END as \"value: f32\"\n FROM\n gateway_details d\n JOIN\n gateway_status s on d.id = s.gateway_details_id\n WHERE\n timestamp >= ? AND\n timestamp <= ?\n GROUP BY 1\n ",
"describe": {
"columns": [
{
"name": "node_id: NodeId",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "value: f32",
"ordinal": 1,
"type_info": "Int"
}
],
"parameters": {
"Right": 2
},
"nullable": [
false,
true
]
},
"hash": "676299beb2004ab89f7b38cf21ffb84ab5e7d7435297573523e2532560c2e302"
}
@@ -0,0 +1,44 @@
{
"db_name": "SQLite",
"query": "\n SELECT\n layer1 as \"layer1\",\n layer2 as \"layer2\",\n layer3 as \"layer3\",\n gw as \"gw\",\n success\n FROM routes\n WHERE timestamp >= ? AND timestamp <= ?\n ORDER BY timestamp ASC\n ",
"describe": {
"columns": [
{
"name": "layer1",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "layer2",
"ordinal": 1,
"type_info": "Int64"
},
{
"name": "layer3",
"ordinal": 2,
"type_info": "Int64"
},
{
"name": "gw",
"ordinal": 3,
"type_info": "Int64"
},
{
"name": "success",
"ordinal": 4,
"type_info": "Bool"
}
],
"parameters": {
"Right": 2
},
"nullable": [
false,
false,
false,
false,
false
]
},
"hash": "6b2479c02cf1ef5ae674ce0ab4d027595b91739f3579e1f289b0c722ea91bbcc"
}
@@ -1,26 +0,0 @@
{
"db_name": "SQLite",
"query": "\n SELECT\n d.mix_id as \"mix_id: NodeId\",\n AVG(s.reliability) as \"value: f32\"\n FROM\n mixnode_details d\n JOIN\n mixnode_status s on d.id = s.mixnode_details_id\n WHERE\n timestamp >= ? AND\n timestamp <= ?\n GROUP BY 1\n ",
"describe": {
"columns": [
{
"name": "mix_id: NodeId",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "value: f32",
"ordinal": 1,
"type_info": "Int64"
}
],
"parameters": {
"Right": 2
},
"nullable": [
false,
true
]
},
"hash": "c19e1b3768bf2929407599e6e8783ead09f4d7319b7997fa2a9bb628f9404166"
}
+1 -1
View File
@@ -4,7 +4,7 @@
[package]
name = "nym-api"
license = "GPL-3.0"
version = "1.1.57"
version = "1.1.60"
authors.workspace = true
edition = "2021"
rust-version.workspace = true
+2 -1
View File
@@ -2,4 +2,5 @@
set -e
/usr/src/nym/target/release/nym-api init && /usr/src/nym/target/release/nym-api run
# Init can fail if the mounted volume already has a config
/usr/src/nym/target/release/nym-api init --mnemonic "$MNEMONIC" || true && /usr/src/nym/target/release/nym-api run --mnemonic "$MNEMONIC"
@@ -0,0 +1,17 @@
-- Add routes table for storing route metrics data
-- Each row is one route measurement: three mixnode hops, the gateway, and the outcome.
CREATE TABLE IF NOT EXISTS routes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
layer1 INTEGER NOT NULL, -- NodeId of layer 1 mixnode
layer2 INTEGER NOT NULL, -- NodeId of layer 2 mixnode
layer3 INTEGER NOT NULL, -- NodeId of layer 3 mixnode
gw INTEGER NOT NULL, -- NodeId of gateway
success BOOLEAN NOT NULL, -- Whether the packet was delivered successfully
timestamp INTEGER NOT NULL DEFAULT (unixepoch()) -- Unix time in seconds; defaults to the row's insertion time when the writer supplies no value
);
-- Add indexes for efficient querying
CREATE INDEX IF NOT EXISTS idx_routes_timestamp ON routes(timestamp);
CREATE INDEX IF NOT EXISTS idx_routes_layer1 ON routes(layer1);
CREATE INDEX IF NOT EXISTS idx_routes_layer2 ON routes(layer2);
CREATE INDEX IF NOT EXISTS idx_routes_layer3 ON routes(layer3);
CREATE INDEX IF NOT EXISTS idx_routes_gw ON routes(gw);
@@ -20,7 +20,7 @@ use nym_api_requests::models::{
};
use nym_http_api_common::{FormattedResponse, OutputParams};
use nym_mixnet_contract_common::NodeId;
use nym_types::monitoring::MonitorMessage;
use nym_types::monitoring::{MonitorMessage, MonitorResults};
use tracing::error;
pub(super) fn mandatory_routes() -> Router<AppState> {
@@ -33,6 +33,10 @@ pub(super) fn mandatory_routes() -> Router<AppState> {
"/submit-node-monitoring-results",
post(submit_node_monitoring_results),
)
.route(
"/submit-route-monitoring-results",
post(submit_route_monitoring_results),
)
.nest(
"/mixnode/:mix_id",
Router::new()
@@ -58,6 +62,53 @@ pub(super) fn mandatory_routes() -> Router<AppState> {
)
}
#[utoipa::path(
tag = "status",
post,
path = "/v1/status/submit-route-monitoring-results",
responses(
(status = 200),
(status = 400, body = String, description = "TBD"),
(status = 403, body = String, description = "TBD"),
(status = 500, body = String, description = "TBD"),
),
)]
pub(crate) async fn submit_route_monitoring_results(
State(state): State<AppState>,
Json(message): Json<MonitorMessage>,
) -> AxumResult<()> {
if !message.is_in_allowed() {
return Err(AxumErrorResponse::forbidden(
"Monitor not registered to submit results",
));
}
if !message.timely() {
return Err(AxumErrorResponse::bad_request("Message is too old"));
}
if !message.verify() {
return Err(AxumErrorResponse::bad_request("invalid signature"));
}
match message.results() {
MonitorResults::Route(results) => {
match state.storage.submit_route_monitoring_results(results).await {
Ok(_) => Ok(()),
Err(err) => {
error!("failed to submit node monitoring results: {err}");
Err(AxumErrorResponse::internal_msg(
"failed to submit node monitoring results",
))
}
}
}
MonitorResults::Node(_results) => Err(AxumErrorResponse::bad_request(
"Node monitoring results not supported for this endpoint",
)),
}
}
#[utoipa::path(
tag = "status",
post,
@@ -87,18 +138,21 @@ pub(crate) async fn submit_gateway_monitoring_results(
return Err(AxumErrorResponse::bad_request("invalid signature"));
}
match state
.storage
.submit_gateway_statuses_v2(message.results())
.await
{
Ok(_) => Ok(()),
Err(err) => {
error!("failed to submit gateway monitoring results: {err}");
Err(AxumErrorResponse::internal_msg(
"failed to submit gateway monitoring results",
))
match message.results() {
MonitorResults::Node(results) => {
match state.storage.submit_gateway_statuses_v2(results).await {
Ok(_) => Ok(()),
Err(err) => {
error!("failed to submit node monitoring results: {err}");
Err(AxumErrorResponse::internal_msg(
"failed to submit node monitoring results",
))
}
}
}
MonitorResults::Route(_results) => Err(AxumErrorResponse::bad_request(
"Gateway monitoring results not supported for this endpoint",
)),
}
}
@@ -131,18 +185,21 @@ pub(crate) async fn submit_node_monitoring_results(
return Err(AxumErrorResponse::bad_request("invalid signature"));
}
match state
.storage
.submit_mixnode_statuses_v2(message.results())
.await
{
Ok(_) => Ok(()),
Err(err) => {
error!("failed to submit node monitoring results: {err}");
Err(AxumErrorResponse::internal_msg(
"failed to submit node monitoring results",
))
match message.results() {
MonitorResults::Node(results) => {
match state.storage.submit_mixnode_statuses_v2(results).await {
Ok(_) => Ok(()),
Err(err) => {
error!("failed to submit node monitoring results: {err}");
Err(AxumErrorResponse::internal_msg(
"failed to submit node monitoring results",
))
}
}
}
MonitorResults::Route(_results) => Err(AxumErrorResponse::bad_request(
"Node monitoring results not supported for this endpoint",
)),
}
}
+240 -43
View File
@@ -10,8 +10,9 @@ use crate::support::storage::models::{
};
use crate::support::storage::DbIdCache;
use nym_mixnet_contract_common::{EpochId, IdentityKey, NodeId};
use nym_types::monitoring::NodeResult;
use nym_types::monitoring::{NodeResult, RouteResult};
use sqlx::FromRow;
use std::collections::HashMap;
use time::{Date, OffsetDateTime};
use tracing::info;
@@ -51,6 +52,25 @@ impl AvgGatewayReliability {
}
}
/// In-memory, per-node counters accumulated while the route results of one
/// interval are replayed during the corrected-reliability calculation.
#[derive(Debug, Default, Clone)]
struct NodeCorrectionIntervalState {
/// Positive samples: incremented once for every successful route containing the node.
pos_samples: u32,
/// Negative samples: failed routes for which the node was blamed.
neg_samples: u32,
/// Current run of consecutive failed routes containing the node;
/// reset to 0 by any successful route containing it.
fail_seq: u32,
}
/// Corrected reliability of a single node over one interval, together with the
/// counters it was derived from.
#[derive(Debug, serde::Serialize)] // Add utoipa::ToSchema if this struct is directly exposed via API
pub struct CorrectedNodeIntervalReliability {
/// Id of the node (`nym_mixnet_contract_common::NodeId`, typically u32).
pub node_id: NodeId,
/// Identity key of the node, if one could be looked up (presumably the Base58 public key).
pub identity: Option<String>,
/// `pos_samples / (pos_samples + neg_samples)`: a fraction in `[0.0, 1.0]`, not a
/// percentage. `0.0` when the node has no samples at all.
pub reliability: f64,
/// Positive samples the node collected in the interval.
pub pos_samples_in_interval: u32,
/// Negative samples the node collected in the interval.
pub neg_samples_in_interval: u32,
/// Length of the node's consecutive-failure run after the last route of the interval.
pub final_fail_seq_in_interval: u32,
}
// all SQL goes here
impl StorageManager {
pub(super) async fn get_all_avg_mix_reliability_in_last_24hr(
@@ -76,27 +96,29 @@ impl StorageManager {
start_ts_secs: i64,
end_ts_secs: i64,
) -> Result<Vec<AvgMixnodeReliability>, sqlx::Error> {
let result = sqlx::query_as!(
AvgMixnodeReliability,
r#"
SELECT
d.mix_id as "mix_id: NodeId",
AVG(s.reliability) as "value: f32"
FROM
mixnode_details d
JOIN
mixnode_status s on d.id = s.mixnode_details_id
WHERE
timestamp >= ? AND
timestamp <= ?
GROUP BY 1
"#,
start_ts_secs,
end_ts_secs
)
.fetch_all(&self.connection_pool)
.await?;
Ok(result)
let corrected_reliabilities = self
.calculate_corrected_node_reliabilities_for_interval(start_ts_secs, end_ts_secs)
.await
.map_err(|_e| sqlx::Error::PoolClosed)?; // Example: map anyhow::Error to sqlx::Error; adjust as needed
let mut avg_mix_reliabilities = Vec::new();
for corrected_node_info in corrected_reliabilities {
// Check if this node_id is a mixnode by attempting to fetch its identity key as a mixnode.
// This relies on get_mixnode_identity_key returning Some for mixnodes and None (or error) for non-mixnodes.
if self
.get_mixnode_identity_key(corrected_node_info.node_id)
.await?
.is_some()
{
avg_mix_reliabilities.push(AvgMixnodeReliability {
mix_id: corrected_node_info.node_id,
value: Some(corrected_node_info.reliability as f32),
});
}
}
Ok(avg_mix_reliabilities)
}
pub(super) async fn get_all_avg_gateway_reliability_in_interval(
@@ -104,27 +126,35 @@ impl StorageManager {
start_ts_secs: i64,
end_ts_secs: i64,
) -> Result<Vec<AvgGatewayReliability>, sqlx::Error> {
let result = sqlx::query_as!(
AvgGatewayReliability,
r#"
SELECT
d.node_id as "node_id: NodeId",
CASE WHEN count(*) > 3 THEN AVG(reliability) ELSE 100 END as "value: f32"
FROM
gateway_details d
JOIN
gateway_status s on d.id = s.gateway_details_id
WHERE
timestamp >= ? AND
timestamp <= ?
GROUP BY 1
"#,
start_ts_secs,
end_ts_secs
)
.fetch_all(&self.connection_pool)
.await?;
Ok(result)
let corrected_reliabilities = self
.calculate_corrected_node_reliabilities_for_interval(start_ts_secs, end_ts_secs)
.await
.map_err(|_e| sqlx::Error::PoolClosed)?; // Example: map anyhow::Error to sqlx::Error; adjust as needed
let mut avg_gateway_reliabilities = Vec::new();
for corrected_node_info in corrected_reliabilities {
// Check if this node_id is a gateway.
if self
.get_gateway_identity_key(corrected_node_info.node_id)
.await?
.is_some()
{
let total_samples = corrected_node_info.pos_samples_in_interval
+ corrected_node_info.neg_samples_in_interval;
let reliability_value = if total_samples <= 3 {
100.0 // Default to 100% if 3 or fewer samples
} else {
corrected_node_info.reliability as f32
};
avg_gateway_reliabilities.push(AvgGatewayReliability {
node_id: corrected_node_info.node_id, // AvgGatewayReliability uses node_id
value: Some(reliability_value),
});
}
}
Ok(avg_gateway_reliabilities)
}
/// Tries to obtain row id of given mixnode given its identity.
@@ -664,6 +694,33 @@ impl StorageManager {
tx.commit().await
}
/// Inserts a batch of route monitoring results into the `routes` table.
///
/// One row is written per `RouteResult` (layer1, layer2, layer3, gw, success).
/// The `timestamp` column is not set here, so it takes whatever default the
/// schema defines.
///
/// # Errors
/// Returns the first `sqlx::Error` hit while beginning the transaction, running
/// an insert, or committing.
pub(super) async fn submit_route_monitoring_results(
&self,
route_results: &[RouteResult],
) -> Result<(), sqlx::Error> {
info!("Inserting {} route monitoring results", route_results.len());
// Insert the whole batch inside one transaction so it is committed as a unit:
// any `?` below returns before `commit`, and sqlx rolls back a transaction
// that is dropped without being committed.
let mut tx = self.connection_pool.begin().await?;
for route_result in route_results {
// NOTE(review): `OR IGNORE` only skips rows that violate a constraint —
// confirm the `routes` schema actually has one that can trigger here.
sqlx::query!(
r#"
INSERT OR IGNORE INTO routes (layer1, layer2, layer3, gw, success) VALUES (?, ?, ?, ?, ?);
"#,
route_result.layer1,
route_result.layer2,
route_result.layer3,
route_result.gw,
route_result.success,
)
.execute(&mut *tx)
.await?;
}
tx.commit().await
}
pub(super) async fn submit_gateway_statuses_v2(
&self,
gateway_results: &[NodeResult],
@@ -1329,6 +1386,146 @@ impl StorageManager {
.fetch_all(&self.connection_pool)
.await
}
/// Fetches raw route results from the `routes` table for a given time interval.
///
/// Rows with `start_ts_secs <= timestamp <= end_ts_secs` (both bounds inclusive)
/// are returned in ascending `timestamp` order as
/// `(layer1, layer2, layer3, gw, success)` tuples.
///
/// # Errors
/// Returns the `sqlx::Error` produced by the query.
async fn get_raw_routes_in_interval(
&self,
start_ts_secs: i64,
end_ts_secs: i64,
) -> Result<Vec<(NodeId, NodeId, NodeId, NodeId, bool)>, sqlx::Error> {
// Row type for the query below: node ids are read as plain i64 and only
// converted to `NodeId` after the fetch.
struct RawRouteData {
layer1: i64,
layer2: i64,
layer3: i64,
gw: i64,
success: Option<bool>,
// timestamp: i64, // Not explicitly selected into struct, but used in WHERE and ORDER BY
}

// The query relies on the 'routes' table having the columns
// layer1, layer2, layer3, gw, success and timestamp.
// No sqlx type override is used in the column aliases; each alias simply
// repeats the column name.
let db_routes = sqlx::query_as!(
RawRouteData,
r#"
SELECT
layer1 as "layer1",
layer2 as "layer2",
layer3 as "layer3",
gw as "gw",
success
FROM routes
WHERE timestamp >= ? AND timestamp <= ?
ORDER BY timestamp ASC
"#,
start_ts_secs,
end_ts_secs
)
.fetch_all(&self.connection_pool)
.await?;

// NOTE(review): `as NodeId` silently truncates if a stored id does not fit in
// `NodeId` (presumably u32) — confirm stored ids are always in range.
// A missing `success` value is treated as `false`.
Ok(db_routes
.into_iter()
.map(|r| {
(
r.layer1 as NodeId,
r.layer2 as NodeId,
r.layer3 as NodeId,
r.gw as NodeId,
r.success.unwrap_or_default(),
)
})
.collect())
}
/// Derives a corrected per-node reliability from the raw route results stored
/// for `[start_ts_secs, end_ts_secs]`.
///
/// Routes are replayed in the order returned by `get_raw_routes_in_interval`:
/// * a successful route gives every node on it one positive sample and resets
///   its consecutive-failure counter;
/// * a failed route bumps the consecutive-failure counter of every node on it.
///   If at least one of them now has more than 2 consecutive failures, only
///   those nodes receive a negative sample; otherwise every node on the route
///   receives one.
///
/// The reported reliability is `pos / (pos + neg)`, a fraction in `[0.0, 1.0]`,
/// and `0.0` for a node that ended up with no samples.
/// NOTE(review): callers that expect a 0-100 percentage must scale this value.
///
/// Identity keys are looked up best-effort (mixnode first, then gateway); a
/// failed lookup is treated as "no identity" rather than as an error.
///
/// # Errors
/// Fails only if the raw routes cannot be fetched.
pub async fn calculate_corrected_node_reliabilities_for_interval(
    &self,
    start_ts_secs: i64,
    end_ts_secs: i64,
) -> Result<Vec<CorrectedNodeIntervalReliability>, anyhow::Error> {
    let routes = self
        .get_raw_routes_in_interval(start_ts_secs, end_ts_secs)
        .await?;

    let mut per_node: HashMap<NodeId, NodeCorrectionIntervalState> = HashMap::new();

    for (l1, l2, l3, gw, success) in routes {
        let hops = [l1, l2, l3, gw];

        if success {
            for hop in hops {
                let counters = per_node.entry(hop).or_default();
                counters.pos_samples += 1;
                counters.fail_seq = 0;
            }
            continue;
        }

        // Failed route: first extend every hop's failure streak...
        for hop in hops {
            per_node.entry(hop).or_default().fail_seq += 1;
        }

        // ...then decide who takes the blame. Streaks are not modified below,
        // so the "guilty" test gives the same answer in both passes.
        let any_guilty = hops
            .iter()
            .any(|hop| per_node.get(hop).is_some_and(|c| c.fail_seq > 2));

        for hop in hops {
            if let Some(counters) = per_node.get_mut(&hop) {
                // With a guilty hop present only guilty hops are penalised;
                // without one the blame is spread over the whole route.
                if !any_guilty || counters.fail_seq > 2 {
                    counters.neg_samples += 1;
                }
            }
        }
    }

    let mut reliabilities = Vec::new();
    for (node_id, counters) in per_node {
        let sample_count = counters.pos_samples + counters.neg_samples;
        let reliability = match sample_count {
            // No samples in this interval.
            0 => 0.0,
            n => counters.pos_samples as f64 / n as f64,
        };

        // Best-effort identity lookup: lookup errors are folded into `None`.
        let mixnode_key: Option<String> =
            self.get_mixnode_identity_key(node_id).await.unwrap_or(None);
        let identity = match mixnode_key {
            Some(key) => Some(key),
            None => self.get_gateway_identity_key(node_id).await.unwrap_or(None),
        };

        reliabilities.push(CorrectedNodeIntervalReliability {
            node_id,
            identity,
            reliability,
            pos_samples_in_interval: counters.pos_samples,
            neg_samples_in_interval: counters.neg_samples,
            final_fail_seq_in_interval: counters.fail_seq,
        });
    }

    Ok(reliabilities)
}
}
pub(crate) mod v3_migration {
+11 -1
View File
@@ -17,7 +17,7 @@ use crate::support::storage::models::{
};
use dashmap::DashMap;
use nym_mixnet_contract_common::NodeId;
use nym_types::monitoring::NodeResult;
use nym_types::monitoring::{NodeResult, RouteResult};
use sqlx::sqlite::{SqliteAutoVacuum, SqliteSynchronous};
use sqlx::ConnectOptions;
use std::path::Path;
@@ -835,6 +835,16 @@ impl NymApiStorage {
Ok(())
}
pub(crate) async fn submit_route_monitoring_results(
&self,
route_results: &[RouteResult],
) -> Result<(), NymApiStorageError> {
self.manager
.submit_route_monitoring_results(route_results)
.await?;
Ok(())
}
/// Obtains number of network monitor test runs that have occurred within the specified interval.
///
/// # Arguments
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-network-monitor"
version = "1.0.2"
version = "1.1.3"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
+87
View File
Monitors the Nym network by sending itself packets across the mixnet.
Network monitor is running two tokio tasks, one manages mixnet clients and another manages monitoring itself. Monitor is designed to be driven externally, via an HTTP api. This means that it does not do any monitoring unless driven by something like [`locust`](https://locust.io/). This allows us to tailor the load externally, potentially distributing it across multiple monitors.
## Features
- **Continuous Monitoring**: Periodically sends test packets through the network
- **Node Performance**: Tracks individual node reliability metrics
- **Route Performance**: Records route-level success rates through specific node combinations
- **Multi-API Submission**: Capable of submitting metrics to multiple API endpoints (fanout)
- **Force Routing**: Can force packets through all mixnet nodes for comprehensive testing
### Client manager
On start, the network monitor spawns `C` clients (10 by default). Every `T` (60 seconds by default) a random client is dropped and a new one is created. Each client chooses a random gateway to connect to the mixnet, meaning that on average all gateways will be tested in `NUMBER_OF_GATEWAYS/C*T`, assuming at least one request per client per `T`.
@@ -40,8 +48,87 @@ Options:
-m, --mixnet-timeout <MIXNET_TIMEOUT> [default: 10]
--generate-key-pair
--private-key <PRIVATE_KEY>
--database-url <DATABASE_URL> SQLite database URL
--nym-apis <NYM_APIS> Comma-separated list of Nym API URLs
-h, --help Print help
-V, --version Print version
```
## Metrics Collection & Reporting
### Node Metrics
The Network Monitor tracks performance metrics for individual nodes:
- **Reliability**: Percentage of successful packet handling
- **Failure Sequences**: Tracking consecutive failures
- **Volume**: Number of packets handled
### Route Metrics
Since version 1.1.0, the Network Monitor also tracks route-level metrics:
- **Route Success Rates**: Tracking which specific combinations of nodes have successful packet delivery
- **Layer Analysis**: Identifying weak points in specific network layers
- **Path Correction**: Improved algorithms for attributing failures to specific nodes
### Metrics Fanout
The Network Monitor can submit metrics to multiple API endpoints simultaneously:
1. Metrics are collected during each monitoring cycle
2. The collected metrics are submitted to each configured API endpoint
3. This provides redundancy and allows for distributed metrics collection
To enable metrics fanout, use the `--nym-apis` parameter with a comma-separated list of API URLs:
```bash
cargo run -p nym-network-monitor -- --nym-apis https://api1.example.com,https://api2.example.com
```
## Route Data Structure
Route metrics use the following data structure:
```rust
// Route performance data
pub struct RouteResult {
pub layer1: u32, // NodeId of layer 1 mixnode
pub layer2: u32, // NodeId of layer 2 mixnode
pub layer3: u32, // NodeId of layer 3 mixnode
pub gw: u32, // NodeId of gateway
pub success: bool, // Whether the packet was successfully delivered
}
```
## Forced Routing
To ensure comprehensive testing of all nodes in the network, the Monitor supports forcing packets through all available nodes:
- Each node is assigned to a specific layer (1, 2, or 3) deterministically
- This ensures all nodes participate in route testing
- The routing algorithm cycles through all possible node combinations
Since version 1.1.0, Network Monitor automatically forces all available nodes to be active and distributes them evenly across the three layers (Layer 1, Layer 2, and Layer 3). This ensures every node in the network participates in testing, providing more comprehensive coverage and better metrics for all nodes, not just the popular ones.
## Node Performance Calculation
The Network Monitor uses a sophisticated algorithm for attributing failures to specific nodes:
1. For successful packet deliveries, all nodes in the path receive a positive sample
2. For failed deliveries:
- Nodes with more than 2 consecutive failures are considered "guilty", and only those nodes receive a negative sample
- If no node is clearly guilty, all nodes in the path receive negative samples
3. Final node reliability is calculated as: positive_samples / (positive_samples + negative_samples)
## Changelog
### Version 1.1.0
- Added route-level metrics tracking and submission
- Implemented metrics fanout to multiple API endpoints
- Forced routing through all available nodes for comprehensive testing
- Improved reliability corrections with consecutive failure tracking
- Updated data structures for better metrics organization
### Version 1.0.2
- Initial public release with basic monitoring capabilities
+99 -45
View File
@@ -5,11 +5,13 @@ use std::{
use anyhow::Result;
use futures::{pin_mut, stream::FuturesUnordered, StreamExt};
use log::{debug, error, info};
use log::{debug, error, info, warn};
use nym_sphinx::chunking::{monitoring, SentFragment};
use nym_topology::{NymRouteProvider, RoutingNode};
use nym_types::monitoring::{MonitorMessage, NodeResult};
use nym_validator_client::nym_api::routes::{API_VERSION, STATUS, SUBMIT_GATEWAY, SUBMIT_NODE};
use nym_types::monitoring::{MonitorMessage, MonitorResults, NodeResult, RouteResult};
use nym_validator_client::nym_api::routes::{
API_VERSION, STATUS, SUBMIT_GATEWAY, SUBMIT_NODE, SUBMIT_ROUTE,
};
use rand::SeedableRng;
use rand_chacha::ChaCha8Rng;
use serde::{Deserialize, Serialize};
@@ -17,7 +19,7 @@ use tokio::task::JoinHandle;
use tokio_postgres::{binary_copy::BinaryCopyInWriter, types::Type, Client, NoTls};
use utoipa::ToSchema;
use crate::{NYM_API_URL, PRIVATE_KEY, TOPOLOGY};
use crate::{NYM_API_URLS, PRIVATE_KEY, TOPOLOGY};
struct HydratedRoute {
mix_nodes: Vec<RoutingNode>,
@@ -439,6 +441,7 @@ async fn db_connection(database_url: Option<&String>) -> Result<Option<(Client,
Ok(None)
}
}
pub async fn submit_metrics_to_db(database_url: Option<&String>) -> anyhow::Result<()> {
if let Some((client, handle)) = db_connection(database_url).await? {
let client = Arc::new(client);
@@ -491,49 +494,100 @@ pub async fn submit_metrics(database_url: Option<&String>) -> anyhow::Result<()>
}
if let Some(private_key) = PRIVATE_KEY.get() {
let node_stats = monitor_mixnode_results().await?;
let gateway_stats = monitor_gateway_results().await?;
if let Some(nym_api_urls) = NYM_API_URLS.get() {
info!("Submitting metrics to {} nym apis", nym_api_urls.len());
for nym_api_url in nym_api_urls {
info!("Submitting metrics to {}", nym_api_url);
let node_stats = monitor_mixnode_results().await?;
let gateway_stats = monitor_gateway_results().await?;
let client = reqwest::Client::new();
info!("Submitting metrics to {}", *NYM_API_URL);
let client = reqwest::Client::new();
let node_submit_url = format!("{}/{API_VERSION}/{STATUS}/{SUBMIT_NODE}", &*NYM_API_URL);
let gateway_submit_url =
format!("{}/{API_VERSION}/{STATUS}/{SUBMIT_GATEWAY}", &*NYM_API_URL);
info!("Submitting {} mixnode measurements", node_stats.len());
node_stats
.chunks(10)
.map(|chunk| {
let monitor_message = MonitorMessage::new(chunk.to_vec(), private_key);
client.post(&node_submit_url).json(&monitor_message).send()
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<_, _>>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
info!("Submitting {} gateway measurements", gateway_stats.len());
gateway_stats
.chunks(10)
.map(|chunk| {
let monitor_message = MonitorMessage::new(
chunk.to_vec(),
PRIVATE_KEY.get().expect("We've set this!"),
let node_submit_url =
format!("{}/{API_VERSION}/{STATUS}/{SUBMIT_NODE}", nym_api_url);
let gateway_submit_url = format!(
"{}/{API_VERSION}/{STATUS}/{SUBMIT_GATEWAY}",
nym_api_url
);
client
.post(&gateway_submit_url)
.json(&monitor_message)
.send()
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<_, _>>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
let route_submit_url =
format!("{}/{API_VERSION}/{STATUS}/{SUBMIT_ROUTE}", nym_api_url);
info!("Submitting {} mixnode measurements", node_stats.len());
node_stats
.chunks(10)
.map(|chunk| {
let monitor_results = MonitorResults::Node(chunk.to_vec());
let monitor_message = MonitorMessage::new(monitor_results, private_key);
client.post(&node_submit_url).json(&monitor_message).send()
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<_, _>>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
info!("Submitting {} gateway measurements", gateway_stats.len());
gateway_stats
.chunks(10)
.map(|chunk| {
let monitor_results = MonitorResults::Node(chunk.to_vec());
let monitor_message = MonitorMessage::new(
monitor_results,
PRIVATE_KEY.get().expect("We've set this!"),
);
client
.post(&gateway_submit_url)
.json(&monitor_message)
.send()
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<_, _>>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
let network_account = NetworkAccount::finalize()?;
let accounting_routes = network_account.accounting_routes;
info!("Submitting {} accounting routes", accounting_routes.len());
match accounting_routes
.chunks(10)
.map(|chunk| {
let route_results = chunk
.iter()
.map(|route| {
RouteResult::new(
route.mix_nodes.0,
route.mix_nodes.1,
route.mix_nodes.2,
route.gateway_node,
route.success,
)
})
.collect::<Vec<RouteResult>>();
let monitor_results = MonitorResults::Route(route_results);
let monitor_message = MonitorMessage::new(monitor_results, private_key);
client.post(&route_submit_url).json(&monitor_message).send()
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<_, _>>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
{
Ok(_) => info!(
"Successfully submitted accounting routes to {}",
nym_api_url
),
Err(e) => error!(
"Error submitting accounting routes to {}: {}",
nym_api_url, e
),
};
}
}
} else {
warn!("No private key or nym api urls found");
}
NetworkAccount::empty_buffers();
+29 -7
View File
@@ -10,10 +10,9 @@ use nym_network_defaults::setup_env;
use nym_network_defaults::var_names::NYM_API;
use nym_sdk::mixnet::{self, MixnetClient};
use nym_sphinx::chunking::monitoring;
use nym_topology::{HardcodedTopologyProvider, NymTopology};
use nym_topology::{HardcodedTopologyProvider, NymTopology, Role};
use std::fs::File;
use std::io::Write;
use std::sync::LazyLock;
use std::time::Duration;
use std::{
collections::VecDeque,
@@ -25,9 +24,7 @@ use tokio::sync::OnceCell;
use tokio::{signal::ctrl_c, sync::RwLock};
use tokio_util::sync::CancellationToken;
static NYM_API_URL: LazyLock<String> = LazyLock::new(|| {
std::env::var(NYM_API).unwrap_or_else(|_| panic!("{} env var not set", NYM_API))
});
static NYM_API_URLS: OnceCell<Vec<String>> = OnceCell::const_new();
static MIXNET_TIMEOUT: OnceCell<u64> = OnceCell::const_new();
static TOPOLOGY: OnceCell<NymTopology> = OnceCell::const_new();
@@ -138,6 +135,9 @@ struct Args {
#[arg(long, env = "DATABASE_URL")]
database_url: Option<String>,
#[arg(long, env = "NYM_APIS", value_delimiter = ',')]
nym_apis: Option<Vec<String>>,
}
fn generate_key_pair() -> Result<()> {
@@ -155,7 +155,7 @@ fn generate_key_pair() -> Result<()> {
Ok(())
}
async fn nym_topology_from_env() -> anyhow::Result<NymTopology> {
async fn nym_topology_forced_all_from_env() -> anyhow::Result<NymTopology> {
let api_url = std::env::var(NYM_API)?;
info!("Generating topology from {api_url}");
@@ -172,6 +172,23 @@ async fn nym_topology_from_env() -> anyhow::Result<NymTopology> {
let mut topology = NymTopology::new_empty(rewarded_set);
topology.add_skimmed_nodes(&nodes);
let node_ids = topology
.node_details()
.iter()
.filter(|(_node_id, node)| node.supported_roles.mixnode)
.map(|(node_id, _)| *node_id)
.collect::<Vec<_>>();
// Force all nodes to active to participate in route selection
for (idx, node_id) in node_ids.iter().enumerate() {
match idx % 3 {
0 => topology.force_set_active(*node_id, Role::Layer1),
1 => topology.force_set_active(*node_id, Role::Layer2),
2 => topology.force_set_active(*node_id, Role::Layer3),
_ => unreachable!(), // Unreachable since idx % 3 can only be 0, 1, or 2
}
}
Ok(topology)
}
@@ -200,11 +217,16 @@ async fn main() -> Result<()> {
PRIVATE_KEY.set(pk).ok();
}
if let Some(nym_apis) = args.nym_apis {
info!("Using nym apis: {:?}", nym_apis);
NYM_API_URLS.set(nym_apis).ok();
}
TOPOLOGY
.set(if let Some(topology_file) = args.topology {
NymTopology::new_from_file(topology_file)?
} else {
nym_topology_from_env().await?
nym_topology_forced_all_from_env().await?
})
.ok();