Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| cebb768845 | |||
| d030444659 | |||
| e6bfa3ea09 | |||
| 89839fd2ee | |||
| ec29f4eee0 | |||
| 6ae81a81b9 | |||
| 5b55082e9b | |||
| e1917ff2a3 | |||
| 761f970912 | |||
| 0cdefc5881 | |||
| 85454dc431 |
@@ -0,0 +1,63 @@
|
||||
name: ci-build-upload-network-monitor-agent
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
build-and-upload:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
platform: [arc-ubuntu-22.04]
|
||||
|
||||
runs-on: ${{ matrix.platform }}
|
||||
env:
|
||||
CARGO_TERM_COLOR: always
|
||||
RUSTUP_PERMIT_COPY_RENAME: 1
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
|
||||
- name: Prepare build output directory
|
||||
shell: bash
|
||||
env:
|
||||
OUTPUT_DIR: ci-builds/${{ github.ref_name }}
|
||||
run: |
|
||||
rm -rf ci-builds || true
|
||||
mkdir -p "$OUTPUT_DIR"
|
||||
|
||||
- name: Install Dependencies (Linux)
|
||||
run: sudo apt-get update && sudo apt-get -y install libudev-dev
|
||||
|
||||
- name: Install Rust toolchain
|
||||
uses: dtolnay/rust-toolchain@master
|
||||
with:
|
||||
toolchain: ${{ vars.REQUIRED_RUSTC_VERSION }}
|
||||
|
||||
- name: Build nym-network-monitor-agent
|
||||
shell: bash
|
||||
run: cargo build -p nym-network-monitor-agent --release
|
||||
|
||||
- name: Upload artifact
|
||||
uses: actions/upload-artifact@v6
|
||||
with:
|
||||
name: nym-network-monitor-agent
|
||||
path: target/release/nym-network-monitor-agent
|
||||
retention-days: 30
|
||||
|
||||
- name: Prepare build output
|
||||
shell: bash
|
||||
env:
|
||||
OUTPUT_DIR: ci-builds/${{ github.ref_name }}
|
||||
run: cp target/release/nym-network-monitor-agent "$OUTPUT_DIR"
|
||||
|
||||
- name: Deploy to CI www
|
||||
uses: easingthemes/ssh-deploy@main
|
||||
env:
|
||||
SSH_PRIVATE_KEY: ${{ secrets.CI_WWW_SSH_PRIVATE_KEY }}
|
||||
ARGS: "-avzr"
|
||||
SOURCE: "ci-builds/"
|
||||
REMOTE_HOST: ${{ secrets.CI_WWW_REMOTE_HOST }}
|
||||
REMOTE_USER: ${{ secrets.CI_WWW_REMOTE_USER }}
|
||||
TARGET: ${{ secrets.CI_WWW_REMOTE_TARGET }}/builds/
|
||||
EXCLUDE: "/dist/, /node_modules/"
|
||||
@@ -19,6 +19,7 @@ jobs:
|
||||
RUSTUP_PERMIT_COPY_RENAME: 1
|
||||
permissions:
|
||||
contents: write
|
||||
pull-requests: write
|
||||
steps:
|
||||
- name: Checkout repo
|
||||
uses: actions/checkout@v6
|
||||
@@ -66,11 +67,20 @@ jobs:
|
||||
--no-git-commit \
|
||||
--yes
|
||||
|
||||
- name: Commit and push version bump
|
||||
run: |
|
||||
git add -A
|
||||
git commit -m "crates release: bump version to ${{ inputs.version }}"
|
||||
git push
|
||||
- name: Create pull request
|
||||
uses: peter-evans/create-pull-request@v7
|
||||
with:
|
||||
token: ${{ secrets.GITHUB_TOKEN }}
|
||||
branch: "chore/bump-version-${{ inputs.version }}"
|
||||
base: ${{ github.ref_name }}
|
||||
commit-message: "crates release: bump version to ${{ inputs.version }}"
|
||||
title: "chore: bump crate versions to ${{ inputs.version }}"
|
||||
body: |
|
||||
Automated version bump from `${{ steps.current_version.outputs.version }}` → `${{ inputs.version }}`.
|
||||
|
||||
Triggered by @${{ github.actor }} via workflow dispatch.
|
||||
labels: "automated, crates-version-bump"
|
||||
delete-branch: true
|
||||
|
||||
- name: Show package versions
|
||||
run: cargo workspaces list --long
|
||||
run: cargo workspaces list --long
|
||||
Generated
+3
-3
@@ -5726,7 +5726,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-api"
|
||||
version = "1.1.80"
|
||||
version = "1.1.80-no-gw-penalty"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
@@ -7464,7 +7464,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-network-monitor-agent"
|
||||
version = "1.0.2"
|
||||
version = "1.0.4"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arrayref",
|
||||
@@ -7497,7 +7497,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-network-monitor-orchestrator"
|
||||
version = "1.0.2"
|
||||
version = "1.0.5"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"axum 0.7.9",
|
||||
|
||||
+1
-1
@@ -346,7 +346,7 @@ semver = "1.0.26"
|
||||
serde = "1.0.219"
|
||||
serde_bytes = "0.11.17"
|
||||
serde_derive = "1.0"
|
||||
serde_json = "1.0.140"
|
||||
serde_json = { version = "1.0.140", features = ["float_roundtrip"] }
|
||||
serde_json_path = "0.7.2"
|
||||
serde_repr = "0.1"
|
||||
serde_with = "3.9.0"
|
||||
|
||||
+2
-1
@@ -3,7 +3,7 @@
|
||||
|
||||
[package]
|
||||
name = "nym-api"
|
||||
version = "1.1.80"
|
||||
version = "1.1.80-no-gw-penalty"
|
||||
authors.workspace = true
|
||||
edition = "2021"
|
||||
license = "GPL-3.0"
|
||||
@@ -122,6 +122,7 @@ sqlx = { workspace = true, features = [
|
||||
] }
|
||||
|
||||
[dev-dependencies]
|
||||
nym-api-requests = { workspace = true, features = ["mock-fixtures"] }
|
||||
axum-test = { workspace = true }
|
||||
reqwest = { workspace = true, features = ["json", "query"] }
|
||||
tempfile = { workspace = true }
|
||||
|
||||
@@ -54,6 +54,8 @@ nym-network-defaults = { workspace = true }
|
||||
nym-ticketbooks-merkle = { workspace = true }
|
||||
nym-ecash-signer-check-types = { workspace = true }
|
||||
nym-kkt-ciphersuite = { workspace = true }
|
||||
# only pulled in by the `mock-fixtures` feature; also a dev-dependency for this crate's own tests
|
||||
nym-test-utils = { workspace = true, optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
nym-crypto = { workspace = true, features = ["rand"] }
|
||||
@@ -62,3 +64,5 @@ nym-test-utils = { workspace = true }
|
||||
[features]
|
||||
default = []
|
||||
generate-ts = ["ts-rs", "nym-mixnet-contract-common/generate-ts"]
|
||||
# exposes deterministic data mocks (e.g. `mock_nym_node_description`) to downstream crates' tests
|
||||
mock-fixtures = ["dep:nym-test-utils", "nym-crypto/rand"]
|
||||
|
||||
@@ -267,7 +267,7 @@ impl From<NymNodeDescriptionV1> for NymNodeDescriptionV2 {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[cfg(any(test, feature = "mock-fixtures"))]
|
||||
pub fn mock_nym_node_description(seed: u64) -> NymNodeDescriptionV2 {
|
||||
use nym_node_requests::api::v1::lewes_protocol::models::{LPHashFunction, LPKEM};
|
||||
use nym_test_utils::helpers::{u64_seeded_rng, RngCore};
|
||||
|
||||
@@ -181,4 +181,168 @@ pub mod v3 {
|
||||
/// as an authorised network monitor permitted to submit stress testing results.
|
||||
pub authorised: bool,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::signable::SignableMessageBody;
|
||||
use nym_test_utils::helpers::deterministic_rng;
|
||||
use time::macros::datetime;
|
||||
|
||||
fn dummy_results() -> Vec<StressTestResult> {
|
||||
// Order-distinguishable entries: if deserialisation ever permuted the array, the
|
||||
// re-serialised body would no longer match the signed bytes, and `verify_signature`
|
||||
// would return false. `testrun_id` is the order witness.
|
||||
vec![
|
||||
StressTestResult {
|
||||
testrun_id: 1,
|
||||
node_id: 42,
|
||||
is_mixnode: true,
|
||||
test_timestamp: datetime!(2026-06-01 12:34:56.123456789 UTC),
|
||||
test_performance: 0.6666666666666666,
|
||||
was_reachable: true,
|
||||
},
|
||||
StressTestResult {
|
||||
testrun_id: 2,
|
||||
node_id: 7,
|
||||
is_mixnode: true,
|
||||
test_timestamp: datetime!(2026-06-01 12:34:56 UTC),
|
||||
test_performance: 0.0,
|
||||
was_reachable: false,
|
||||
},
|
||||
StressTestResult {
|
||||
testrun_id: 3,
|
||||
node_id: u32::MAX,
|
||||
is_mixnode: true,
|
||||
test_timestamp: datetime!(2026-06-01 12:34:56.999999999 UTC),
|
||||
test_performance: 1.0,
|
||||
was_reachable: true,
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
// Integrity check on the wire is `serde_json::to_vec(deserialize(serde_json::to_vec(body)))
|
||||
// == serde_json::to_vec(body)`. If JSON serialisation isn't a fixed point, every batch
|
||||
// submission would fail nym-api's signature verification. Cover the timestamp shapes the
|
||||
// orchestrator actually produces, including the `+1ns` bump from the monotonicity safeguard.
|
||||
#[test]
|
||||
fn signed_batch_submission_roundtrips_through_json() {
|
||||
let mut rng = deterministic_rng();
|
||||
let keys = ed25519::KeyPair::new(&mut rng);
|
||||
|
||||
let timestamps = [
|
||||
datetime!(2026-06-01 12:34:56 UTC),
|
||||
datetime!(2026-06-01 12:34:56.000000001 UTC),
|
||||
datetime!(2026-06-01 12:34:56.999999999 UTC),
|
||||
datetime!(2026-06-01 12:34:56.123456789 UTC),
|
||||
OffsetDateTime::now_utc(),
|
||||
OffsetDateTime::now_utc() + time::Duration::NANOSECOND,
|
||||
];
|
||||
|
||||
for timestamp in timestamps {
|
||||
let body = StressTestBatchSubmissionContent {
|
||||
signer: *keys.public_key(),
|
||||
timestamp,
|
||||
results: dummy_results(),
|
||||
};
|
||||
let signed = body.clone().sign(keys.private_key());
|
||||
|
||||
let bytes = serde_json::to_vec(&signed).unwrap();
|
||||
let deserialised: StressTestBatchSubmission =
|
||||
serde_json::from_slice(&bytes).unwrap();
|
||||
|
||||
// The handler verifies against `body.body.signer` — match that exactly.
|
||||
assert!(
|
||||
deserialised.verify_signature(&deserialised.body.signer),
|
||||
"signature failed to verify after JSON round-trip for timestamp {timestamp}",
|
||||
);
|
||||
assert_eq!(deserialised.body.timestamp, timestamp);
|
||||
}
|
||||
}
|
||||
|
||||
// Every f64 that the orchestrator's `received as f64 / sent as f64` formula can produce
|
||||
// (storage/models.rs) must round-trip byte-exactly through JSON. Exhaustively cover the
|
||||
// range and exercise sent values that produce non-terminating fractions (1/3, 1/7, ...).
|
||||
#[test]
|
||||
fn computed_test_performance_values_roundtrip() {
|
||||
for sent in 1u64..=200 {
|
||||
for received in 0u64..=(sent * 2) {
|
||||
let perf = received as f64 / sent as f64;
|
||||
let s = serde_json::to_string(&perf).unwrap();
|
||||
let perf2: f64 = serde_json::from_str(&s).unwrap();
|
||||
let s2 = serde_json::to_string(&perf2).unwrap();
|
||||
assert_eq!(
|
||||
s, s2,
|
||||
"f64 round-trip mismatch for {received}/{sent} = {perf}: {s} -> {s2}",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// serde_json serialises non-finite f64 as `null`. Confirm what the deserialiser does with
|
||||
// `null` for a struct field typed as f64 - if it succeeds with a default value (rather than
|
||||
// erroring), a NaN/Infinity test_performance could silently break signature verification
|
||||
// because the re-serialised body would no longer have `null` at that position.
|
||||
#[test]
|
||||
fn non_finite_test_performance_breaks_loudly_not_silently() {
|
||||
let nan_result = StressTestResult {
|
||||
testrun_id: 1,
|
||||
node_id: 1,
|
||||
is_mixnode: true,
|
||||
test_timestamp: datetime!(2026-06-01 12:34:56 UTC),
|
||||
test_performance: f64::NAN,
|
||||
was_reachable: true,
|
||||
};
|
||||
let json = serde_json::to_string(&nan_result).unwrap();
|
||||
// NaN serialises as `null` - this is the dangerous shape
|
||||
assert!(
|
||||
json.contains(r#""test_performance":null"#),
|
||||
"expected NaN to serialise as null: {json}",
|
||||
);
|
||||
// ...and `null` MUST fail to deserialise rather than silently becoming 0.0 / default;
|
||||
// if this ever changes, NaN would silently corrupt signature verification.
|
||||
let deserialised: Result<StressTestResult, _> = serde_json::from_str(&json);
|
||||
assert!(
|
||||
deserialised.is_err(),
|
||||
"deserialising null into f64 unexpectedly succeeded - signature verification \
|
||||
would silently fail for any submission containing a non-finite test_performance",
|
||||
);
|
||||
}
|
||||
|
||||
// Specifically pin the two hypotheses we want to rule out:
|
||||
// 1. Vec<StressTestResult> serialisation/deserialisation preserves order.
|
||||
// 2. The body bytes serialised standalone (= what gets signed) are byte-identical to
|
||||
// the body sub-object bytes embedded in the outer SignedMessage JSON (= what the
|
||||
// server sees after parsing). Re-serialising the deserialised body must reproduce
|
||||
// the signed bytes verbatim, otherwise no signature could ever verify.
|
||||
#[test]
|
||||
fn batch_body_serialisation_is_a_byte_exact_fixed_point() {
|
||||
let mut rng = deterministic_rng();
|
||||
let keys = ed25519::KeyPair::new(&mut rng);
|
||||
|
||||
let body = StressTestBatchSubmissionContent {
|
||||
signer: *keys.public_key(),
|
||||
timestamp: datetime!(2026-06-01 12:34:56.123456789 UTC),
|
||||
results: dummy_results(),
|
||||
};
|
||||
|
||||
let signed_bytes = body.plaintext();
|
||||
let body_str = std::str::from_utf8(&signed_bytes).unwrap();
|
||||
|
||||
// (1) array order preserved on the wire
|
||||
let pos1 = body_str.find(r#""testrun_id":1"#).unwrap();
|
||||
let pos2 = body_str.find(r#""testrun_id":2"#).unwrap();
|
||||
let pos3 = body_str.find(r#""testrun_id":3"#).unwrap();
|
||||
assert!(pos1 < pos2 && pos2 < pos3, "JSON: {body_str}");
|
||||
|
||||
// (2) round-trip is byte-exact
|
||||
let deserialised: StressTestBatchSubmissionContent =
|
||||
serde_json::from_slice(&signed_bytes).unwrap();
|
||||
let resigned_bytes = deserialised.plaintext();
|
||||
assert_eq!(
|
||||
signed_bytes, resigned_bytes,
|
||||
"deserialise-then-re-serialise was not a fixed point"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+131
-24
@@ -16,7 +16,7 @@ use crate::{
|
||||
node_status_api::cache::NodeStatusCacheError, support::caching::CacheNotification,
|
||||
};
|
||||
use ::time::OffsetDateTime;
|
||||
use nym_api_requests::models::{DetailedNodePerformanceV2, NodeAnnotationV2};
|
||||
use nym_api_requests::models::{DetailedNodePerformanceV2, NodeAnnotationV2, NymNodeDescriptionV2};
|
||||
use nym_mixnet_contract_common::{NodeId, NymNodeDetails};
|
||||
use nym_task::ShutdownToken;
|
||||
use nym_topology::CachedEpochRewardedSet;
|
||||
@@ -218,15 +218,23 @@ impl NodeStatusCacheRefresher {
|
||||
|
||||
let use_stress_testing_scores = self.config.use_stress_testing_data;
|
||||
let threshold = self.config.minimum_available_stress_testing_results;
|
||||
let available_ratio =
|
||||
stress_testing_scores.available_count() as f32 / nym_nodes.len() as f32;
|
||||
|
||||
// Guard against an orchestrator outage silently slashing every node's performance:
|
||||
// if too few nodes have a reachable stress-test sample for the configured window we
|
||||
// Only mixnodes are currently stress-tested: the orchestrator selects test targets by
|
||||
// self-described mixnode capability (see `NodeType::from_roles`), so the availability ratio
|
||||
// must be taken over stress-test-eligible nodes only. Counting gateways in the denominator
|
||||
// would let the network's mixnode:gateway composition - rather than orchestrator health -
|
||||
// decide whether the data is used at all.
|
||||
let eligible_count = nym_nodes
|
||||
.iter()
|
||||
.filter(|n| stress_test_eligible(described_nodes.get_node(&n.node_id())))
|
||||
.count();
|
||||
let available_ratio =
|
||||
stress_availability_ratio(stress_testing_scores.available_count(), eligible_count);
|
||||
|
||||
// Guard against an orchestrator outage silently slashing every eligible node's performance:
|
||||
// if too few mixnodes have a reachable stress-test sample for the configured window we
|
||||
// assume the orchestrator (rather than the network) is at fault and fall back to the
|
||||
// routing × config score alone. The threshold is a network-wide ratio, not per-node —
|
||||
// once it is met, individual nodes without data still score 0 in the weighted formula
|
||||
// by design, on the assumption that the orchestrator tried to test them and failed.
|
||||
// routing × config score alone.
|
||||
let include_stress_testing = use_stress_testing_scores && available_ratio >= threshold;
|
||||
|
||||
if use_stress_testing_scores && !include_stress_testing {
|
||||
@@ -236,30 +244,28 @@ impl NodeStatusCacheRefresher {
|
||||
);
|
||||
}
|
||||
|
||||
// stress testing
|
||||
let sw = self.config.stress_testing_score_weight;
|
||||
|
||||
// not stress testing
|
||||
let nsw = 1.0 - sw;
|
||||
|
||||
for nym_node in nym_nodes {
|
||||
let node_id = nym_node.node_id();
|
||||
let described = described_nodes.get_node(&node_id);
|
||||
let routing_score = routing_scores.get_or_log(node_id);
|
||||
let config_score =
|
||||
calculate_config_score(config_score_data, described_nodes.get_node(&node_id));
|
||||
let config_score = calculate_config_score(config_score_data, described);
|
||||
let stress_testing_score = stress_testing_scores.get_or_log(node_id);
|
||||
|
||||
let performance = if include_stress_testing {
|
||||
// use weighted arithmetic mean (we don't want a single 0 to cause the whole thing to be 0)
|
||||
sw * stress_testing_score.score + nsw * routing_score.score * config_score.score
|
||||
} else {
|
||||
routing_score.score * config_score.score
|
||||
};
|
||||
// a node only takes the stress-testing component if it is actually stress-tested (i.e.
|
||||
// it is a mixnode); gateways have no stress data and must not be penalised for it.
|
||||
let apply_stress = include_stress_testing && stress_test_eligible(described);
|
||||
let performance = node_performance(
|
||||
apply_stress,
|
||||
self.config.stress_testing_score_weight,
|
||||
stress_testing_score.score,
|
||||
routing_score.score,
|
||||
config_score.score,
|
||||
);
|
||||
|
||||
annotations.insert(
|
||||
nym_node.node_id(),
|
||||
node_id,
|
||||
NodeAnnotationV2 {
|
||||
current_role: rewarded_set.role(nym_node.node_id()).map(|r| r.into()),
|
||||
current_role: rewarded_set.role(node_id).map(|r| r.into()),
|
||||
detailed_performance: DetailedNodePerformanceV2::new(
|
||||
performance,
|
||||
routing_score,
|
||||
@@ -333,3 +339,104 @@ impl NodeStatusCacheRefresher {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Whether `node` is currently in scope for stress testing, and therefore expected to have a
|
||||
/// stress-test sample. This is the single source of truth for stress-test scope and must stay in
|
||||
/// sync with the orchestrator's test-target selection (`NodeType::from_roles`, which keys off the
|
||||
/// self-described role capabilities).
|
||||
///
|
||||
/// A node that is *not* in scope has no stress data by design - not because it failed a test - so
|
||||
/// it must never have the stress component folded into its performance score (otherwise gateways
|
||||
/// would be silently penalised for a test they were never subjected to). Conversely, an in-scope
|
||||
/// node with no sample legitimately scores 0 for stress, guarded network-wide by the availability
|
||||
/// threshold against orchestrator outages.
|
||||
///
|
||||
/// Today only mixnodes are stress-tested; when gateway stress testing lands, widen this predicate
|
||||
/// (e.g. to also accept `entry`/`exit` capable nodes) and nothing else in the scoring path needs
|
||||
/// to change.
|
||||
fn stress_test_eligible(described: Option<&NymNodeDescriptionV2>) -> bool {
|
||||
described
|
||||
.map(|n| n.description.declared_role.mixnode)
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
/// Fraction of stress-test-eligible nodes for which the orchestrator produced a reachable sample.
|
||||
/// The denominator is the eligible count, not the total node count, so the network's role
|
||||
/// composition cannot drag the ratio below the orchestrator-health threshold. Returns 0 when there
|
||||
/// are no eligible nodes (nothing to base a judgement on, so the data is treated as unavailable).
|
||||
fn stress_availability_ratio(available_count: usize, eligible_count: usize) -> f32 {
|
||||
if eligible_count == 0 {
|
||||
0.0
|
||||
} else {
|
||||
available_count as f32 / eligible_count as f32
|
||||
}
|
||||
}
|
||||
|
||||
/// Overall node performance. When the stress-testing component applies, it is a weighted arithmetic
|
||||
/// mean of the stress score and routing × config (so a single 0 doesn't zero the whole thing);
|
||||
/// otherwise it is simply routing × config.
|
||||
fn node_performance(
|
||||
apply_stress: bool,
|
||||
stress_weight: f64,
|
||||
stress_score: f64,
|
||||
routing_score: f64,
|
||||
config_score: f64,
|
||||
) -> f64 {
|
||||
if apply_stress {
|
||||
stress_weight * stress_score + (1.0 - stress_weight) * routing_score * config_score
|
||||
} else {
|
||||
routing_score * config_score
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use nym_api_requests::models::mock_nym_node_description;
|
||||
|
||||
#[test]
|
||||
fn ineligible_nodes_are_not_penalised_for_missing_stress_data() {
|
||||
let sw = 0.2;
|
||||
let stress = 0.0; // no stress sample -> unreachable() score
|
||||
let routing = 0.9;
|
||||
let config = 1.0;
|
||||
|
||||
// an out-of-scope node (e.g. a gateway) is scored on routing × config alone - no haircut
|
||||
let out_of_scope = node_performance(false, sw, stress, routing, config);
|
||||
assert_eq!(out_of_scope, routing * config);
|
||||
|
||||
// an in-scope node (a mixnode) with no/zero stress sample does take the weighted hit
|
||||
let in_scope = node_performance(true, sw, stress, routing, config);
|
||||
assert_eq!(in_scope, sw * stress + (1.0 - sw) * routing * config);
|
||||
assert!(
|
||||
in_scope < out_of_scope,
|
||||
"in-scope node with a 0 stress score should score strictly lower than an out-of-scope one"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn availability_ratio_uses_eligible_denominator() {
|
||||
// every eligible node reachable -> full ratio, no matter how many ineligible nodes
|
||||
// (gateways) also exist in the network.
|
||||
assert_eq!(stress_availability_ratio(5, 5), 1.0);
|
||||
// half the eligible nodes reachable
|
||||
assert_eq!(stress_availability_ratio(3, 6), 0.5);
|
||||
// no eligible nodes -> 0, never a division by zero / NaN
|
||||
assert_eq!(stress_availability_ratio(0, 0), 0.0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn only_in_scope_node_types_are_stress_test_eligible() {
|
||||
let mut mixnode = mock_nym_node_description(1);
|
||||
mixnode.description.declared_role.mixnode = true;
|
||||
assert!(stress_test_eligible(Some(&mixnode)));
|
||||
|
||||
// a gateway-only node (not a mixnode) is currently out of scope
|
||||
let mut gateway = mock_nym_node_description(2);
|
||||
gateway.description.declared_role.mixnode = false;
|
||||
assert!(!stress_test_eligible(Some(&gateway)));
|
||||
|
||||
// a node with no self-described data is out of scope (the orchestrator can't classify it)
|
||||
assert!(!stress_test_eligible(None));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "nym-network-monitor-agent"
|
||||
description = "Agent used for stress testing Nym mixnodes"
|
||||
version = "1.0.2"
|
||||
version = "1.0.4"
|
||||
authors.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
@@ -350,6 +350,13 @@ impl NodeStressTester {
|
||||
"did not manage to send all required packets within the sending window. sent {sent}/{total_packets}"
|
||||
);
|
||||
}
|
||||
// Report `total_packets` (= expected) rather than `sent` so the orchestrator's
|
||||
// `received / sent` score formula effectively becomes `received / expected` -
|
||||
// a node that throttled us via TCP back-pressure into not pushing all packets
|
||||
// through is correctly penalised. Per-batch `set_packets_sent(sent)` updates
|
||||
// above remain in place for the `Ok(false)` early-exit (send error) path, so
|
||||
// partial-progress visibility is preserved when the test aborts mid-run.
|
||||
result.set_packets_sent(total_packets);
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
|
||||
@@ -144,7 +144,7 @@ impl MixnetListener {
|
||||
/// Returns `self` so that the caller can inspect fields such as
|
||||
/// [`last_noise_handshake_duration`](Self::last_noise_handshake_duration) after the run.
|
||||
pub(crate) async fn run(mut self, on_start: Arc<Notify>) -> Self {
|
||||
on_start.notify_waiters();
|
||||
on_start.notify_one();
|
||||
// only handle a single connection at once
|
||||
// (we don't need more than that)
|
||||
loop {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "nym-network-monitor-orchestrator"
|
||||
description = "Orchestrator for performing Nym network stress testing"
|
||||
version = "1.0.2"
|
||||
version = "1.0.5"
|
||||
authors.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
@@ -37,4 +37,7 @@ pub mod vars {
|
||||
"NYM_NETWORK_MONITOR_CHAIN_AUTH_CHECK_RETRY_DELAY";
|
||||
pub const NYM_NETWORK_MONITOR_RESULT_SUBMISSION_INTERVAL_ARG: &str =
|
||||
"NYM_NETWORK_MONITOR_RESULT_SUBMISSION_INTERVAL";
|
||||
|
||||
pub const NYM_NETWORK_MONITOR_RESULT_SUBMISSION_BATCH_SIZE_ARG: &str =
|
||||
"NYM_NETWORK_MONITOR_RESULT_SUBMISSION_BATCH_SIZE";
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ use nym_crypto::asymmetric::ed25519;
|
||||
use nym_validator_client::nyxd::bip39;
|
||||
use std::mem;
|
||||
use std::net::SocketAddr;
|
||||
use std::num::NonZeroU32;
|
||||
use std::num::{NonZeroU32, NonZeroUsize};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
@@ -109,6 +109,10 @@ pub(crate) struct Args {
|
||||
/// batch submission (e.g. `15m`, `1h`).
|
||||
#[clap(long, env = NYM_NETWORK_MONITOR_RESULT_SUBMISSION_INTERVAL_ARG, value_parser = humantime::parse_duration, default_value = "15m")]
|
||||
result_submission_interval: Duration,
|
||||
|
||||
/// Maximum number of stress testing results to submit in a single POST request
|
||||
#[clap(long, env = NYM_NETWORK_MONITOR_RESULT_SUBMISSION_BATCH_SIZE_ARG, default_value = "50")]
|
||||
result_submission_batch_size: NonZeroUsize,
|
||||
}
|
||||
|
||||
impl Args {
|
||||
@@ -145,6 +149,7 @@ impl Args {
|
||||
chain_authorisation_check_max_attempts: self.chain_authorisation_check_max_attempts,
|
||||
chain_authorisation_check_retry_delay: self.chain_authorisation_check_retry_delay,
|
||||
result_submission_interval: self.result_submission_interval,
|
||||
result_submission_batch_size: self.result_submission_batch_size.get(),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -71,6 +71,9 @@ pub(crate) struct Config {
|
||||
/// How often the orchestrator flushes accumulated test results to the nym-api as a signed
|
||||
/// batch submission (e.g. `15m`, `1h`).
|
||||
pub(crate) result_submission_interval: Duration,
|
||||
|
||||
/// Maximum number of stress testing results to submit in a single POST request
|
||||
pub(crate) result_submission_batch_size: usize,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
|
||||
+3
-3
@@ -23,7 +23,7 @@ use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Duration;
|
||||
use tokio::time::{Instant, interval};
|
||||
use tracing::{error, info, warn};
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
pub(crate) struct NodeRefresher {
|
||||
pub(crate) client: QueryHttpRpcNyxdClient,
|
||||
@@ -142,13 +142,13 @@ impl NodeRefresher {
|
||||
.await
|
||||
{
|
||||
Err(_timeout) => {
|
||||
warn!(
|
||||
debug!(
|
||||
"timed out while attempting to retrieve self-described node details for node {node_id}"
|
||||
);
|
||||
return node_update;
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
error!("failed to retrieve self-described node details for node {node_id}: {err}");
|
||||
debug!("failed to retrieve self-described node details for node {node_id}: {err}");
|
||||
return node_update;
|
||||
}
|
||||
Ok(Ok(info)) => info,
|
||||
|
||||
+50
-19
@@ -12,8 +12,9 @@ use nym_validator_client::nym_api::NymApiClientExt;
|
||||
use nym_validator_client::signable::SignableMessageBody;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use time::OffsetDateTime;
|
||||
use tokio::time::{Instant, MissedTickBehavior, interval_at};
|
||||
use tracing::{debug, info};
|
||||
use tracing::info;
|
||||
|
||||
/// Background task that periodically drains freshly-completed test run results from the local
|
||||
/// storage, wraps them into a signed [`StressTestBatchSubmission`][batch], and POSTs the batch to
|
||||
@@ -38,6 +39,9 @@ pub(crate) struct ResultSubmitter {
|
||||
/// Cadence at which [`Self::run`] attempts a submission sweep.
|
||||
submission_interval: Duration,
|
||||
|
||||
/// Maximum number of stress testing results to submit in a single POST request
|
||||
result_submission_batch_size: usize,
|
||||
|
||||
shutdown_token: ShutdownToken,
|
||||
}
|
||||
|
||||
@@ -54,6 +58,7 @@ impl ResultSubmitter {
|
||||
storage,
|
||||
identity_keys,
|
||||
submission_interval: config.result_submission_interval,
|
||||
result_submission_batch_size: config.result_submission_batch_size,
|
||||
shutdown_token,
|
||||
}
|
||||
}
|
||||
@@ -74,7 +79,7 @@ impl ResultSubmitter {
|
||||
///
|
||||
/// [batch]: nym_api_requests::models::network_monitor::StressTestBatchSubmission
|
||||
async fn submit_pending_results(&self) -> anyhow::Result<()> {
|
||||
info!("submitting stress-test results to nym-api");
|
||||
info!("attempting to submit stress-test results to nym-api");
|
||||
let last_submitted = self.storage.get_last_submitted_testrun_id().await?;
|
||||
// `None` means "never submitted" - treat as 0, which pulls everything currently in the
|
||||
// table (testrun.id is AUTOINCREMENT, so always >= 1).
|
||||
@@ -82,29 +87,55 @@ impl ResultSubmitter {
|
||||
|
||||
let pending = self.storage.get_testruns_after(after_id).await?;
|
||||
if pending.is_empty() {
|
||||
debug!("stress-test result submission sweep: no new results");
|
||||
info!("stress-test result submission sweep: no new results");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// `get_testruns_after` returns rows ordered by id ASC, so the last row carries the
|
||||
// highest id and is what we advance the watermark to once the batch is accepted.
|
||||
#[allow(clippy::expect_used)]
|
||||
let max_id = pending.last().expect("pending is non-empty").id;
|
||||
let batch_size = pending.len();
|
||||
|
||||
let results: Vec<StressTestResult> = pending.into_iter().map(Into::into).collect();
|
||||
info!("{} pending stress test results to submit", pending.len());
|
||||
|
||||
let signer = *self.identity_keys.public_key();
|
||||
let body = StressTestBatchSubmissionContent::new(signer, results);
|
||||
let signed = body.sign(self.identity_keys.private_key());
|
||||
// nym-api requires each submission's timestamp to be strictly greater than the previous one
|
||||
// for a given signer (replay protection). Within a single sweep, two consecutive chunks
|
||||
// could otherwise share a `now_utc()` reading if the host clock has too-coarse resolution
|
||||
// or steps backwards, which would get the second chunk rejected. Track the last timestamp
|
||||
// we used and bump by a nanosecond if `now_utc()` hasn't advanced past it.
|
||||
let mut last_timestamp = OffsetDateTime::now_utc();
|
||||
|
||||
self.client
|
||||
.submit_stress_testing_results(&signed)
|
||||
.await
|
||||
.context("failed to POST stress-test batch submission to nym-api")?;
|
||||
for chunk in pending.chunks(self.result_submission_batch_size) {
|
||||
// `get_testruns_after` returns rows ordered by id ASC, so the last row carries the
|
||||
// highest id and is what we advance the watermark to once the batch is accepted.
|
||||
#[allow(clippy::expect_used)]
|
||||
let max_id = chunk.last().expect("chunk is non-empty").id;
|
||||
let batch_size = chunk.len();
|
||||
|
||||
let results: Vec<StressTestResult> = chunk.iter().map(Into::into).collect();
|
||||
|
||||
let now = OffsetDateTime::now_utc();
|
||||
let timestamp = if now > last_timestamp {
|
||||
now
|
||||
} else {
|
||||
last_timestamp + time::Duration::NANOSECOND
|
||||
};
|
||||
last_timestamp = timestamp;
|
||||
|
||||
let body = StressTestBatchSubmissionContent {
|
||||
signer,
|
||||
timestamp,
|
||||
results,
|
||||
};
|
||||
let signed = body.sign(self.identity_keys.private_key());
|
||||
|
||||
self.client
|
||||
.submit_stress_testing_results(&signed)
|
||||
.await
|
||||
.context("failed to POST stress-test batch submission to nym-api")?;
|
||||
|
||||
self.storage.set_last_submitted_testrun_id(max_id).await?;
|
||||
info!(
|
||||
"submitted {batch_size} stress-test results batch to nym-api (testrun ids up to {max_id})"
|
||||
);
|
||||
}
|
||||
|
||||
self.storage.set_last_submitted_testrun_id(max_id).await?;
|
||||
info!("submitted {batch_size} stress-test results to nym-api (testrun ids up to {max_id})");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -130,7 +161,7 @@ impl ResultSubmitter {
|
||||
// Submission errors shouldn't kill the task - local storage retains the
|
||||
// pending rows until the retention window expires, so the next tick will
|
||||
// retry and eventually catch up once the nym-api is reachable again.
|
||||
tracing::error!("failed to submit stress-test results: {err}");
|
||||
tracing::error!("failed to submit stress-test results: {err:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -274,6 +274,14 @@ impl From<TestRun> for StressTestResult {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&TestRun> for StressTestResult {
|
||||
fn from(run: &TestRun) -> Self {
|
||||
// the data is small enough that cloning is negligible
|
||||
// (since we're going to be converting at most couple dozen a minute rather than a few billion...)
|
||||
run.clone().into()
|
||||
}
|
||||
}
|
||||
|
||||
/// The data required to insert or update a row in `nym_node`. Does not carry `last_testrun`
|
||||
/// since that is managed separately via [`StorageManager::set_node_last_testrun`].
|
||||
#[derive(Debug, Clone, sqlx::FromRow)]
|
||||
|
||||
Reference in New Issue
Block a user