Compare commits

...

11 Commits

Author SHA1 Message Date
Jędrzej Stuczyński cebb768845 fix gateways being penalised for no stress testing 2026-06-01 17:00:58 +01:00
Jędrzej Stuczyński d030444659 fix score inflation for throttled nodes 2026-06-01 13:59:17 +01:00
Jędrzej Stuczyński e6bfa3ea09 enable 'float_roundtrip' serde_json feature to ensure consistent float serialisation 2026-06-01 11:04:24 +01:00
Jędrzej Stuczyński 89839fd2ee split stress testing result submission into batches of maximum size 2026-06-01 10:19:22 +01:00
Jędrzej Stuczyński ec29f4eee0 add additional information upon stress testing data submission failure 2026-06-01 10:00:45 +01:00
Jędrzej Stuczyński 6ae81a81b9 chore: bump up version number 2026-05-29 14:09:44 +01:00
Jędrzej Stuczyński 5b55082e9b reduced log severity for retrieving self-described node information 2026-05-29 14:09:21 +01:00
Jędrzej Stuczyński e1917ff2a3 fixed race condition in mixnet listener creation notification 2026-05-29 14:07:17 +01:00
benedettadavico 761f970912 backport ci to waterloo 2026-05-29 14:40:25 +02:00
benedettadavico 0cdefc5881 Merge remote-tracking branch 'origin/release/2026.10-waterloo' into release/2026.10-waterloo 2026-05-27 16:53:10 +02:00
benedettadavico 85454dc431 fix crates bump 2026-05-27 16:53:00 +02:00
19 changed files with 468 additions and 62 deletions
@@ -0,0 +1,63 @@
name: ci-build-upload-network-monitor-agent
on:
workflow_dispatch:
jobs:
build-and-upload:
strategy:
fail-fast: false
matrix:
platform: [arc-ubuntu-22.04]
runs-on: ${{ matrix.platform }}
env:
CARGO_TERM_COLOR: always
RUSTUP_PERMIT_COPY_RENAME: 1
steps:
- uses: actions/checkout@v6
- name: Prepare build output directory
shell: bash
env:
OUTPUT_DIR: ci-builds/${{ github.ref_name }}
run: |
rm -rf ci-builds || true
mkdir -p "$OUTPUT_DIR"
- name: Install Dependencies (Linux)
run: sudo apt-get update && sudo apt-get -y install libudev-dev
- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ vars.REQUIRED_RUSTC_VERSION }}
- name: Build nym-network-monitor-agent
shell: bash
run: cargo build -p nym-network-monitor-agent --release
- name: Upload artifact
uses: actions/upload-artifact@v6
with:
name: nym-network-monitor-agent
path: target/release/nym-network-monitor-agent
retention-days: 30
- name: Prepare build output
shell: bash
env:
OUTPUT_DIR: ci-builds/${{ github.ref_name }}
run: cp target/release/nym-network-monitor-agent "$OUTPUT_DIR"
- name: Deploy to CI www
uses: easingthemes/ssh-deploy@main
env:
SSH_PRIVATE_KEY: ${{ secrets.CI_WWW_SSH_PRIVATE_KEY }}
ARGS: "-avzr"
SOURCE: "ci-builds/"
REMOTE_HOST: ${{ secrets.CI_WWW_REMOTE_HOST }}
REMOTE_USER: ${{ secrets.CI_WWW_REMOTE_USER }}
TARGET: ${{ secrets.CI_WWW_REMOTE_TARGET }}/builds/
EXCLUDE: "/dist/, /node_modules/"
+16 -6
View File
@@ -19,6 +19,7 @@ jobs:
RUSTUP_PERMIT_COPY_RENAME: 1
permissions:
contents: write
pull-requests: write
steps:
- name: Checkout repo
uses: actions/checkout@v6
@@ -66,11 +67,20 @@ jobs:
--no-git-commit \
--yes
- name: Commit and push version bump
run: |
git add -A
git commit -m "crates release: bump version to ${{ inputs.version }}"
git push
- name: Create pull request
uses: peter-evans/create-pull-request@v7
with:
token: ${{ secrets.GITHUB_TOKEN }}
branch: "chore/bump-version-${{ inputs.version }}"
base: ${{ github.ref_name }}
commit-message: "crates release: bump version to ${{ inputs.version }}"
title: "chore: bump crate versions to ${{ inputs.version }}"
body: |
Automated version bump from `${{ steps.current_version.outputs.version }}` → `${{ inputs.version }}`.
Triggered by @${{ github.actor }} via workflow dispatch.
labels: "automated, crates-version-bump"
delete-branch: true
- name: Show package versions
run: cargo workspaces list --long
run: cargo workspaces list --long
Generated
+3 -3
View File
@@ -5726,7 +5726,7 @@ dependencies = [
[[package]]
name = "nym-api"
version = "1.1.80"
version = "1.1.80-no-gw-penalty"
dependencies = [
"anyhow",
"async-trait",
@@ -7464,7 +7464,7 @@ dependencies = [
[[package]]
name = "nym-network-monitor-agent"
version = "1.0.2"
version = "1.0.4"
dependencies = [
"anyhow",
"arrayref",
@@ -7497,7 +7497,7 @@ dependencies = [
[[package]]
name = "nym-network-monitor-orchestrator"
version = "1.0.2"
version = "1.0.5"
dependencies = [
"anyhow",
"axum 0.7.9",
+1 -1
View File
@@ -346,7 +346,7 @@ semver = "1.0.26"
serde = "1.0.219"
serde_bytes = "0.11.17"
serde_derive = "1.0"
serde_json = "1.0.140"
serde_json = { version = "1.0.140", features = ["float_roundtrip"] }
serde_json_path = "0.7.2"
serde_repr = "0.1"
serde_with = "3.9.0"
+2 -1
View File
@@ -3,7 +3,7 @@
[package]
name = "nym-api"
version = "1.1.80"
version = "1.1.80-no-gw-penalty"
authors.workspace = true
edition = "2021"
license = "GPL-3.0"
@@ -122,6 +122,7 @@ sqlx = { workspace = true, features = [
] }
[dev-dependencies]
nym-api-requests = { workspace = true, features = ["mock-fixtures"] }
axum-test = { workspace = true }
reqwest = { workspace = true, features = ["json", "query"] }
tempfile = { workspace = true }
+4
View File
@@ -54,6 +54,8 @@ nym-network-defaults = { workspace = true }
nym-ticketbooks-merkle = { workspace = true }
nym-ecash-signer-check-types = { workspace = true }
nym-kkt-ciphersuite = { workspace = true }
# only pulled in by the `mock-fixtures` feature; also a dev-dependency for this crate's own tests
nym-test-utils = { workspace = true, optional = true }
[dev-dependencies]
nym-crypto = { workspace = true, features = ["rand"] }
@@ -62,3 +64,5 @@ nym-test-utils = { workspace = true }
[features]
default = []
generate-ts = ["ts-rs", "nym-mixnet-contract-common/generate-ts"]
# exposes deterministic data mocks (e.g. `mock_nym_node_description`) to downstream crates' tests
mock-fixtures = ["dep:nym-test-utils", "nym-crypto/rand"]
@@ -267,7 +267,7 @@ impl From<NymNodeDescriptionV1> for NymNodeDescriptionV2 {
}
}
#[cfg(test)]
#[cfg(any(test, feature = "mock-fixtures"))]
pub fn mock_nym_node_description(seed: u64) -> NymNodeDescriptionV2 {
use nym_node_requests::api::v1::lewes_protocol::models::{LPHashFunction, LPKEM};
use nym_test_utils::helpers::{u64_seeded_rng, RngCore};
@@ -181,4 +181,168 @@ pub mod v3 {
/// as an authorised network monitor permitted to submit stress testing results.
pub authorised: bool,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::signable::SignableMessageBody;
use nym_test_utils::helpers::deterministic_rng;
use time::macros::datetime;
fn dummy_results() -> Vec<StressTestResult> {
// Order-distinguishable entries: if deserialisation ever permuted the array, the
// re-serialised body would no longer match the signed bytes, and `verify_signature`
// would return false. `testrun_id` is the order witness.
vec![
StressTestResult {
testrun_id: 1,
node_id: 42,
is_mixnode: true,
test_timestamp: datetime!(2026-06-01 12:34:56.123456789 UTC),
test_performance: 0.6666666666666666,
was_reachable: true,
},
StressTestResult {
testrun_id: 2,
node_id: 7,
is_mixnode: true,
test_timestamp: datetime!(2026-06-01 12:34:56 UTC),
test_performance: 0.0,
was_reachable: false,
},
StressTestResult {
testrun_id: 3,
node_id: u32::MAX,
is_mixnode: true,
test_timestamp: datetime!(2026-06-01 12:34:56.999999999 UTC),
test_performance: 1.0,
was_reachable: true,
},
]
}
// Integrity check on the wire is `serde_json::to_vec(deserialize(serde_json::to_vec(body)))
// == serde_json::to_vec(body)`. If JSON serialisation isn't a fixed point, every batch
// submission would fail nym-api's signature verification. Cover the timestamp shapes the
// orchestrator actually produces, including the `+1ns` bump from the monotonicity safeguard.
#[test]
fn signed_batch_submission_roundtrips_through_json() {
let mut rng = deterministic_rng();
let keys = ed25519::KeyPair::new(&mut rng);
let timestamps = [
datetime!(2026-06-01 12:34:56 UTC),
datetime!(2026-06-01 12:34:56.000000001 UTC),
datetime!(2026-06-01 12:34:56.999999999 UTC),
datetime!(2026-06-01 12:34:56.123456789 UTC),
OffsetDateTime::now_utc(),
OffsetDateTime::now_utc() + time::Duration::NANOSECOND,
];
for timestamp in timestamps {
let body = StressTestBatchSubmissionContent {
signer: *keys.public_key(),
timestamp,
results: dummy_results(),
};
let signed = body.clone().sign(keys.private_key());
let bytes = serde_json::to_vec(&signed).unwrap();
let deserialised: StressTestBatchSubmission =
serde_json::from_slice(&bytes).unwrap();
// The handler verifies against `body.body.signer` — match that exactly.
assert!(
deserialised.verify_signature(&deserialised.body.signer),
"signature failed to verify after JSON round-trip for timestamp {timestamp}",
);
assert_eq!(deserialised.body.timestamp, timestamp);
}
}
// Every f64 that the orchestrator's `received as f64 / sent as f64` formula can produce
// (storage/models.rs) must round-trip byte-exactly through JSON. Exhaustively cover the
// range and exercise sent values that produce non-terminating fractions (1/3, 1/7, ...).
#[test]
fn computed_test_performance_values_roundtrip() {
for sent in 1u64..=200 {
for received in 0u64..=(sent * 2) {
let perf = received as f64 / sent as f64;
let s = serde_json::to_string(&perf).unwrap();
let perf2: f64 = serde_json::from_str(&s).unwrap();
let s2 = serde_json::to_string(&perf2).unwrap();
assert_eq!(
s, s2,
"f64 round-trip mismatch for {received}/{sent} = {perf}: {s} -> {s2}",
);
}
}
}
// serde_json serialises non-finite f64 as `null`. Confirm what the deserialiser does with
// `null` for a struct field typed as f64 - if it succeeds with a default value (rather than
// erroring), a NaN/Infinity test_performance could silently break signature verification
// because the re-serialised body would no longer have `null` at that position.
#[test]
fn non_finite_test_performance_breaks_loudly_not_silently() {
let nan_result = StressTestResult {
testrun_id: 1,
node_id: 1,
is_mixnode: true,
test_timestamp: datetime!(2026-06-01 12:34:56 UTC),
test_performance: f64::NAN,
was_reachable: true,
};
let json = serde_json::to_string(&nan_result).unwrap();
// NaN serialises as `null` - this is the dangerous shape
assert!(
json.contains(r#""test_performance":null"#),
"expected NaN to serialise as null: {json}",
);
// ...and `null` MUST fail to deserialise rather than silently becoming 0.0 / default;
// if this ever changes, NaN would silently corrupt signature verification.
let deserialised: Result<StressTestResult, _> = serde_json::from_str(&json);
assert!(
deserialised.is_err(),
"deserialising null into f64 unexpectedly succeeded - signature verification \
would silently fail for any submission containing a non-finite test_performance",
);
}
// Specifically pin the two hypotheses we want to rule out:
// 1. Vec<StressTestResult> serialisation/deserialisation preserves order.
// 2. The body bytes serialised standalone (= what gets signed) are byte-identical to
// the body sub-object bytes embedded in the outer SignedMessage JSON (= what the
// server sees after parsing). Re-serialising the deserialised body must reproduce
// the signed bytes verbatim, otherwise no signature could ever verify.
#[test]
fn batch_body_serialisation_is_a_byte_exact_fixed_point() {
let mut rng = deterministic_rng();
let keys = ed25519::KeyPair::new(&mut rng);
let body = StressTestBatchSubmissionContent {
signer: *keys.public_key(),
timestamp: datetime!(2026-06-01 12:34:56.123456789 UTC),
results: dummy_results(),
};
let signed_bytes = body.plaintext();
let body_str = std::str::from_utf8(&signed_bytes).unwrap();
// (1) array order preserved on the wire
let pos1 = body_str.find(r#""testrun_id":1"#).unwrap();
let pos2 = body_str.find(r#""testrun_id":2"#).unwrap();
let pos3 = body_str.find(r#""testrun_id":3"#).unwrap();
assert!(pos1 < pos2 && pos2 < pos3, "JSON: {body_str}");
// (2) round-trip is byte-exact
let deserialised: StressTestBatchSubmissionContent =
serde_json::from_slice(&signed_bytes).unwrap();
let resigned_bytes = deserialised.plaintext();
assert_eq!(
signed_bytes, resigned_bytes,
"deserialise-then-re-serialise was not a fixed point"
);
}
}
}
+131 -24
View File
@@ -16,7 +16,7 @@ use crate::{
node_status_api::cache::NodeStatusCacheError, support::caching::CacheNotification,
};
use ::time::OffsetDateTime;
use nym_api_requests::models::{DetailedNodePerformanceV2, NodeAnnotationV2};
use nym_api_requests::models::{DetailedNodePerformanceV2, NodeAnnotationV2, NymNodeDescriptionV2};
use nym_mixnet_contract_common::{NodeId, NymNodeDetails};
use nym_task::ShutdownToken;
use nym_topology::CachedEpochRewardedSet;
@@ -218,15 +218,23 @@ impl NodeStatusCacheRefresher {
let use_stress_testing_scores = self.config.use_stress_testing_data;
let threshold = self.config.minimum_available_stress_testing_results;
let available_ratio =
stress_testing_scores.available_count() as f32 / nym_nodes.len() as f32;
// Guard against an orchestrator outage silently slashing every node's performance:
// if too few nodes have a reachable stress-test sample for the configured window we
// Only mixnodes are currently stress-tested: the orchestrator selects test targets by
// self-described mixnode capability (see `NodeType::from_roles`), so the availability ratio
// must be taken over stress-test-eligible nodes only. Counting gateways in the denominator
// would let the network's mixnode:gateway composition - rather than orchestrator health -
// decide whether the data is used at all.
let eligible_count = nym_nodes
.iter()
.filter(|n| stress_test_eligible(described_nodes.get_node(&n.node_id())))
.count();
let available_ratio =
stress_availability_ratio(stress_testing_scores.available_count(), eligible_count);
// Guard against an orchestrator outage silently slashing every eligible node's performance:
// if too few mixnodes have a reachable stress-test sample for the configured window we
// assume the orchestrator (rather than the network) is at fault and fall back to the
// routing × config score alone. The threshold is a network-wide ratio, not per-node —
// once it is met, individual nodes without data still score 0 in the weighted formula
// by design, on the assumption that the orchestrator tried to test them and failed.
// routing × config score alone.
let include_stress_testing = use_stress_testing_scores && available_ratio >= threshold;
if use_stress_testing_scores && !include_stress_testing {
@@ -236,30 +244,28 @@ impl NodeStatusCacheRefresher {
);
}
// stress testing
let sw = self.config.stress_testing_score_weight;
// not stress testing
let nsw = 1.0 - sw;
for nym_node in nym_nodes {
let node_id = nym_node.node_id();
let described = described_nodes.get_node(&node_id);
let routing_score = routing_scores.get_or_log(node_id);
let config_score =
calculate_config_score(config_score_data, described_nodes.get_node(&node_id));
let config_score = calculate_config_score(config_score_data, described);
let stress_testing_score = stress_testing_scores.get_or_log(node_id);
let performance = if include_stress_testing {
// use weighted arithmetic mean (we don't want a single 0 to cause the whole thing to be 0)
sw * stress_testing_score.score + nsw * routing_score.score * config_score.score
} else {
routing_score.score * config_score.score
};
// a node only takes the stress-testing component if it is actually stress-tested (i.e.
// it is a mixnode); gateways have no stress data and must not be penalised for it.
let apply_stress = include_stress_testing && stress_test_eligible(described);
let performance = node_performance(
apply_stress,
self.config.stress_testing_score_weight,
stress_testing_score.score,
routing_score.score,
config_score.score,
);
annotations.insert(
nym_node.node_id(),
node_id,
NodeAnnotationV2 {
current_role: rewarded_set.role(nym_node.node_id()).map(|r| r.into()),
current_role: rewarded_set.role(node_id).map(|r| r.into()),
detailed_performance: DetailedNodePerformanceV2::new(
performance,
routing_score,
@@ -333,3 +339,104 @@ impl NodeStatusCacheRefresher {
Ok(())
}
}
/// Whether `node` is currently in scope for stress testing, and therefore expected to have a
/// stress-test sample. This is the single source of truth for stress-test scope and must stay in
/// sync with the orchestrator's test-target selection (`NodeType::from_roles`, which keys off the
/// self-described role capabilities).
///
/// A node that is *not* in scope has no stress data by design - not because it failed a test - so
/// it must never have the stress component folded into its performance score (otherwise gateways
/// would be silently penalised for a test they were never subjected to). Conversely, an in-scope
/// node with no sample legitimately scores 0 for stress, guarded network-wide by the availability
/// threshold against orchestrator outages.
///
/// Today only mixnodes are stress-tested; when gateway stress testing lands, widen this predicate
/// (e.g. to also accept `entry`/`exit` capable nodes) and nothing else in the scoring path needs
/// to change.
fn stress_test_eligible(described: Option<&NymNodeDescriptionV2>) -> bool {
described
.map(|n| n.description.declared_role.mixnode)
.unwrap_or(false)
}
/// Fraction of stress-test-eligible nodes for which the orchestrator produced a reachable sample.
/// The denominator is the eligible count, not the total node count, so the network's role
/// composition cannot drag the ratio below the orchestrator-health threshold. Returns 0 when there
/// are no eligible nodes (nothing to base a judgement on, so the data is treated as unavailable).
fn stress_availability_ratio(available_count: usize, eligible_count: usize) -> f32 {
if eligible_count == 0 {
0.0
} else {
available_count as f32 / eligible_count as f32
}
}
/// Overall node performance. When the stress-testing component applies, it is a weighted arithmetic
/// mean of the stress score and routing × config (so a single 0 doesn't zero the whole thing);
/// otherwise it is simply routing × config.
fn node_performance(
apply_stress: bool,
stress_weight: f64,
stress_score: f64,
routing_score: f64,
config_score: f64,
) -> f64 {
if apply_stress {
stress_weight * stress_score + (1.0 - stress_weight) * routing_score * config_score
} else {
routing_score * config_score
}
}
#[cfg(test)]
mod tests {
use super::*;
use nym_api_requests::models::mock_nym_node_description;
#[test]
fn ineligible_nodes_are_not_penalised_for_missing_stress_data() {
let sw = 0.2;
let stress = 0.0; // no stress sample -> unreachable() score
let routing = 0.9;
let config = 1.0;
// an out-of-scope node (e.g. a gateway) is scored on routing × config alone - no haircut
let out_of_scope = node_performance(false, sw, stress, routing, config);
assert_eq!(out_of_scope, routing * config);
// an in-scope node (a mixnode) with no/zero stress sample does take the weighted hit
let in_scope = node_performance(true, sw, stress, routing, config);
assert_eq!(in_scope, sw * stress + (1.0 - sw) * routing * config);
assert!(
in_scope < out_of_scope,
"in-scope node with a 0 stress score should score strictly lower than an out-of-scope one"
);
}
#[test]
fn availability_ratio_uses_eligible_denominator() {
// every eligible node reachable -> full ratio, no matter how many ineligible nodes
// (gateways) also exist in the network.
assert_eq!(stress_availability_ratio(5, 5), 1.0);
// half the eligible nodes reachable
assert_eq!(stress_availability_ratio(3, 6), 0.5);
// no eligible nodes -> 0, never a division by zero / NaN
assert_eq!(stress_availability_ratio(0, 0), 0.0);
}
#[test]
fn only_in_scope_node_types_are_stress_test_eligible() {
let mut mixnode = mock_nym_node_description(1);
mixnode.description.declared_role.mixnode = true;
assert!(stress_test_eligible(Some(&mixnode)));
// a gateway-only node (not a mixnode) is currently out of scope
let mut gateway = mock_nym_node_description(2);
gateway.description.declared_role.mixnode = false;
assert!(!stress_test_eligible(Some(&gateway)));
// a node with no self-described data is out of scope (the orchestrator can't classify it)
assert!(!stress_test_eligible(None));
}
}
@@ -1,7 +1,7 @@
[package]
name = "nym-network-monitor-agent"
description = "Agent used for stress testing Nym mixnodes"
version = "1.0.2"
version = "1.0.4"
authors.workspace = true
edition.workspace = true
license.workspace = true
@@ -350,6 +350,13 @@ impl NodeStressTester {
"did not manage to send all required packets within the sending window. sent {sent}/{total_packets}"
);
}
// Report `total_packets` (= expected) rather than `sent` so the orchestrator's
// `received / sent` score formula effectively becomes `received / expected` -
// a node that throttled us via TCP back-pressure into not pushing all packets
// through is correctly penalised. Per-batch `set_packets_sent(sent)` updates
// above remain in place for the `Ok(false)` early-exit (send error) path, so
// partial-progress visibility is preserved when the test aborts mid-run.
result.set_packets_sent(total_packets);
Ok(true)
}
@@ -144,7 +144,7 @@ impl MixnetListener {
/// Returns `self` so that the caller can inspect fields such as
/// [`last_noise_handshake_duration`](Self::last_noise_handshake_duration) after the run.
pub(crate) async fn run(mut self, on_start: Arc<Notify>) -> Self {
on_start.notify_waiters();
on_start.notify_one();
// only handle a single connection at once
// (we don't need more than that)
loop {
@@ -1,7 +1,7 @@
[package]
name = "nym-network-monitor-orchestrator"
description = "Orchestrator for performing Nym network stress testing"
version = "1.0.2"
version = "1.0.5"
authors.workspace = true
edition.workspace = true
license.workspace = true
@@ -37,4 +37,7 @@ pub mod vars {
"NYM_NETWORK_MONITOR_CHAIN_AUTH_CHECK_RETRY_DELAY";
pub const NYM_NETWORK_MONITOR_RESULT_SUBMISSION_INTERVAL_ARG: &str =
"NYM_NETWORK_MONITOR_RESULT_SUBMISSION_INTERVAL";
pub const NYM_NETWORK_MONITOR_RESULT_SUBMISSION_BATCH_SIZE_ARG: &str =
"NYM_NETWORK_MONITOR_RESULT_SUBMISSION_BATCH_SIZE";
}
@@ -9,7 +9,7 @@ use nym_crypto::asymmetric::ed25519;
use nym_validator_client::nyxd::bip39;
use std::mem;
use std::net::SocketAddr;
use std::num::NonZeroU32;
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
@@ -109,6 +109,10 @@ pub(crate) struct Args {
/// batch submission (e.g. `15m`, `1h`).
#[clap(long, env = NYM_NETWORK_MONITOR_RESULT_SUBMISSION_INTERVAL_ARG, value_parser = humantime::parse_duration, default_value = "15m")]
result_submission_interval: Duration,
/// Maximum number of stress testing results to submit in a single POST request
#[clap(long, env = NYM_NETWORK_MONITOR_RESULT_SUBMISSION_BATCH_SIZE_ARG, default_value = "50")]
result_submission_batch_size: NonZeroUsize,
}
impl Args {
@@ -145,6 +149,7 @@ impl Args {
chain_authorisation_check_max_attempts: self.chain_authorisation_check_max_attempts,
chain_authorisation_check_retry_delay: self.chain_authorisation_check_retry_delay,
result_submission_interval: self.result_submission_interval,
result_submission_batch_size: self.result_submission_batch_size.get(),
})
}
@@ -71,6 +71,9 @@ pub(crate) struct Config {
/// How often the orchestrator flushes accumulated test results to the nym-api as a signed
/// batch submission (e.g. `15m`, `1h`).
pub(crate) result_submission_interval: Duration,
/// Maximum number of stress testing results to submit in a single POST request
pub(crate) result_submission_batch_size: usize,
}
impl Config {
@@ -23,7 +23,7 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::time::Duration;
use tokio::time::{Instant, interval};
use tracing::{error, info, warn};
use tracing::{debug, error, info};
pub(crate) struct NodeRefresher {
pub(crate) client: QueryHttpRpcNyxdClient,
@@ -142,13 +142,13 @@ impl NodeRefresher {
.await
{
Err(_timeout) => {
warn!(
debug!(
"timed out while attempting to retrieve self-described node details for node {node_id}"
);
return node_update;
}
Ok(Err(err)) => {
error!("failed to retrieve self-described node details for node {node_id}: {err}");
debug!("failed to retrieve self-described node details for node {node_id}: {err}");
return node_update;
}
Ok(Ok(info)) => info,
@@ -12,8 +12,9 @@ use nym_validator_client::nym_api::NymApiClientExt;
use nym_validator_client::signable::SignableMessageBody;
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::time::{Instant, MissedTickBehavior, interval_at};
use tracing::{debug, info};
use tracing::info;
/// Background task that periodically drains freshly-completed test run results from the local
/// storage, wraps them into a signed [`StressTestBatchSubmission`][batch], and POSTs the batch to
@@ -38,6 +39,9 @@ pub(crate) struct ResultSubmitter {
/// Cadence at which [`Self::run`] attempts a submission sweep.
submission_interval: Duration,
/// Maximum number of stress testing results to submit in a single POST request
result_submission_batch_size: usize,
shutdown_token: ShutdownToken,
}
@@ -54,6 +58,7 @@ impl ResultSubmitter {
storage,
identity_keys,
submission_interval: config.result_submission_interval,
result_submission_batch_size: config.result_submission_batch_size,
shutdown_token,
}
}
@@ -74,7 +79,7 @@ impl ResultSubmitter {
///
/// [batch]: nym_api_requests::models::network_monitor::StressTestBatchSubmission
async fn submit_pending_results(&self) -> anyhow::Result<()> {
info!("submitting stress-test results to nym-api");
info!("attempting to submit stress-test results to nym-api");
let last_submitted = self.storage.get_last_submitted_testrun_id().await?;
// `None` means "never submitted" - treat as 0, which pulls everything currently in the
// table (testrun.id is AUTOINCREMENT, so always >= 1).
@@ -82,29 +87,55 @@ impl ResultSubmitter {
let pending = self.storage.get_testruns_after(after_id).await?;
if pending.is_empty() {
debug!("stress-test result submission sweep: no new results");
info!("stress-test result submission sweep: no new results");
return Ok(());
}
// `get_testruns_after` returns rows ordered by id ASC, so the last row carries the
// highest id and is what we advance the watermark to once the batch is accepted.
#[allow(clippy::expect_used)]
let max_id = pending.last().expect("pending is non-empty").id;
let batch_size = pending.len();
let results: Vec<StressTestResult> = pending.into_iter().map(Into::into).collect();
info!("{} pending stress test results to submit", pending.len());
let signer = *self.identity_keys.public_key();
let body = StressTestBatchSubmissionContent::new(signer, results);
let signed = body.sign(self.identity_keys.private_key());
// nym-api requires each submission's timestamp to be strictly greater than the previous one
// for a given signer (replay protection). Within a single sweep, two consecutive chunks
// could otherwise share a `now_utc()` reading if the host clock has too-coarse resolution
// or steps backwards, which would get the second chunk rejected. Track the last timestamp
// we used and bump by a nanosecond if `now_utc()` hasn't advanced past it.
let mut last_timestamp = OffsetDateTime::now_utc();
self.client
.submit_stress_testing_results(&signed)
.await
.context("failed to POST stress-test batch submission to nym-api")?;
for chunk in pending.chunks(self.result_submission_batch_size) {
// `get_testruns_after` returns rows ordered by id ASC, so the last row carries the
// highest id and is what we advance the watermark to once the batch is accepted.
#[allow(clippy::expect_used)]
let max_id = chunk.last().expect("chunk is non-empty").id;
let batch_size = chunk.len();
let results: Vec<StressTestResult> = chunk.iter().map(Into::into).collect();
let now = OffsetDateTime::now_utc();
let timestamp = if now > last_timestamp {
now
} else {
last_timestamp + time::Duration::NANOSECOND
};
last_timestamp = timestamp;
let body = StressTestBatchSubmissionContent {
signer,
timestamp,
results,
};
let signed = body.sign(self.identity_keys.private_key());
self.client
.submit_stress_testing_results(&signed)
.await
.context("failed to POST stress-test batch submission to nym-api")?;
self.storage.set_last_submitted_testrun_id(max_id).await?;
info!(
"submitted {batch_size} stress-test results batch to nym-api (testrun ids up to {max_id})"
);
}
self.storage.set_last_submitted_testrun_id(max_id).await?;
info!("submitted {batch_size} stress-test results to nym-api (testrun ids up to {max_id})");
Ok(())
}
@@ -130,7 +161,7 @@ impl ResultSubmitter {
// Submission errors shouldn't kill the task - local storage retains the
// pending rows until the retention window expires, so the next tick will
// retry and eventually catch up once the nym-api is reachable again.
tracing::error!("failed to submit stress-test results: {err}");
tracing::error!("failed to submit stress-test results: {err:#}");
}
}
}
@@ -274,6 +274,14 @@ impl From<TestRun> for StressTestResult {
}
}
impl From<&TestRun> for StressTestResult {
fn from(run: &TestRun) -> Self {
// the data is small enough that cloning is negligible
// (since we're going to be converting at most couple dozen a minute rather than a few billion...)
run.clone().into()
}
}
/// The data required to insert or update a row in `nym_node`. Does not carry `last_testrun`
/// since that is managed separately via [`StorageManager::set_node_last_testrun`].
#[derive(Debug, Clone, sqlx::FromRow)]