Bugfix/cherry pick/waterloo stres testing floats (#6841)

* add additional information upon stress testing data submission failure

* split stress testing result submission into batches of maximum size

* enable 'float_roundtrip' serde_json feature to ensure consistent float serialisation
This commit is contained in:
Jędrzej Stuczyński
2026-06-01 11:44:31 +01:00
committed by GitHub
parent 11320e3f6a
commit 14a85901b4
10 changed files with 239 additions and 25 deletions
@@ -1,7 +1,7 @@
[package]
name = "nym-network-monitor-orchestrator"
description = "Orchestrator for performing Nym network stress testing"
version = "1.0.3"
version = "1.0.5"
authors.workspace = true
edition.workspace = true
license.workspace = true
@@ -37,4 +37,7 @@ pub mod vars {
"NYM_NETWORK_MONITOR_CHAIN_AUTH_CHECK_RETRY_DELAY";
pub const NYM_NETWORK_MONITOR_RESULT_SUBMISSION_INTERVAL_ARG: &str =
"NYM_NETWORK_MONITOR_RESULT_SUBMISSION_INTERVAL";
pub const NYM_NETWORK_MONITOR_RESULT_SUBMISSION_BATCH_SIZE_ARG: &str =
"NYM_NETWORK_MONITOR_RESULT_SUBMISSION_BATCH_SIZE";
}
@@ -9,7 +9,7 @@ use nym_crypto::asymmetric::ed25519;
use nym_validator_client::nyxd::bip39;
use std::mem;
use std::net::SocketAddr;
use std::num::NonZeroU32;
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
@@ -109,6 +109,10 @@ pub(crate) struct Args {
/// batch submission (e.g. `15m`, `1h`).
#[clap(long, env = NYM_NETWORK_MONITOR_RESULT_SUBMISSION_INTERVAL_ARG, value_parser = humantime::parse_duration, default_value = "15m")]
result_submission_interval: Duration,
/// Maximum number of stress testing results to submit in a single POST request
#[clap(long, env = NYM_NETWORK_MONITOR_RESULT_SUBMISSION_BATCH_SIZE_ARG, default_value = "50")]
result_submission_batch_size: NonZeroUsize,
}
impl Args {
@@ -145,6 +149,7 @@ impl Args {
chain_authorisation_check_max_attempts: self.chain_authorisation_check_max_attempts,
chain_authorisation_check_retry_delay: self.chain_authorisation_check_retry_delay,
result_submission_interval: self.result_submission_interval,
result_submission_batch_size: self.result_submission_batch_size.get(),
})
}
@@ -71,6 +71,9 @@ pub(crate) struct Config {
/// How often the orchestrator flushes accumulated test results to the nym-api as a signed
/// batch submission (e.g. `15m`, `1h`).
pub(crate) result_submission_interval: Duration,
/// Maximum number of stress testing results to submit in a single POST request
pub(crate) result_submission_batch_size: usize,
}
impl Config {
@@ -12,8 +12,9 @@ use nym_validator_client::nym_api::NymApiClientExt;
use nym_validator_client::signable::SignableMessageBody;
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::time::{Instant, MissedTickBehavior, interval_at};
use tracing::{debug, info};
use tracing::info;
/// Background task that periodically drains freshly-completed test run results from the local
/// storage, wraps them into a signed [`StressTestBatchSubmission`][batch], and POSTs the batch to
@@ -38,6 +39,9 @@ pub(crate) struct ResultSubmitter {
/// Cadence at which [`Self::run`] attempts a submission sweep.
submission_interval: Duration,
/// Maximum number of stress testing results to submit in a single POST request
result_submission_batch_size: usize,
shutdown_token: ShutdownToken,
}
@@ -54,6 +58,7 @@ impl ResultSubmitter {
storage,
identity_keys,
submission_interval: config.result_submission_interval,
result_submission_batch_size: config.result_submission_batch_size,
shutdown_token,
}
}
@@ -74,7 +79,7 @@ impl ResultSubmitter {
///
/// [batch]: nym_api_requests::models::network_monitor::StressTestBatchSubmission
async fn submit_pending_results(&self) -> anyhow::Result<()> {
info!("submitting stress-test results to nym-api");
info!("attempting to submit stress-test results to nym-api");
let last_submitted = self.storage.get_last_submitted_testrun_id().await?;
// `None` means "never submitted" - treat as 0, which pulls everything currently in the
// table (testrun.id is AUTOINCREMENT, so always >= 1).
@@ -82,29 +87,55 @@ impl ResultSubmitter {
let pending = self.storage.get_testruns_after(after_id).await?;
if pending.is_empty() {
debug!("stress-test result submission sweep: no new results");
info!("stress-test result submission sweep: no new results");
return Ok(());
}
// `get_testruns_after` returns rows ordered by id ASC, so the last row carries the
// highest id and is what we advance the watermark to once the batch is accepted.
#[allow(clippy::expect_used)]
let max_id = pending.last().expect("pending is non-empty").id;
let batch_size = pending.len();
let results: Vec<StressTestResult> = pending.into_iter().map(Into::into).collect();
info!("{} pending stress test results to submit", pending.len());
let signer = *self.identity_keys.public_key();
let body = StressTestBatchSubmissionContent::new(signer, results);
let signed = body.sign(self.identity_keys.private_key());
// nym-api requires each submission's timestamp to be strictly greater than the previous one
// for a given signer (replay protection). Within a single sweep, two consecutive chunks
// could otherwise share a `now_utc()` reading if the host clock has too-coarse resolution
// or steps backwards, which would get the second chunk rejected. Track the last timestamp
// we used and bump by a nanosecond if `now_utc()` hasn't advanced past it.
let mut last_timestamp = OffsetDateTime::now_utc();
self.client
.submit_stress_testing_results(&signed)
.await
.context("failed to POST stress-test batch submission to nym-api")?;
for chunk in pending.chunks(self.result_submission_batch_size) {
// `get_testruns_after` returns rows ordered by id ASC, so the last row carries the
// highest id and is what we advance the watermark to once the batch is accepted.
#[allow(clippy::expect_used)]
let max_id = chunk.last().expect("chunk is non-empty").id;
let batch_size = chunk.len();
let results: Vec<StressTestResult> = chunk.iter().map(Into::into).collect();
let now = OffsetDateTime::now_utc();
let timestamp = if now > last_timestamp {
now
} else {
last_timestamp + time::Duration::NANOSECOND
};
last_timestamp = timestamp;
let body = StressTestBatchSubmissionContent {
signer,
timestamp,
results,
};
let signed = body.sign(self.identity_keys.private_key());
self.client
.submit_stress_testing_results(&signed)
.await
.context("failed to POST stress-test batch submission to nym-api")?;
self.storage.set_last_submitted_testrun_id(max_id).await?;
info!(
"submitted {batch_size} stress-test results batch to nym-api (testrun ids up to {max_id})"
);
}
self.storage.set_last_submitted_testrun_id(max_id).await?;
info!("submitted {batch_size} stress-test results to nym-api (testrun ids up to {max_id})");
Ok(())
}
@@ -130,7 +161,7 @@ impl ResultSubmitter {
// Submission errors shouldn't kill the task - local storage retains the
// pending rows until the retention window expires, so the next tick will
// retry and eventually catch up once the nym-api is reachable again.
tracing::error!("failed to submit stress-test results: {err}");
tracing::error!("failed to submit stress-test results: {err:#}");
}
}
}
@@ -274,6 +274,14 @@ impl From<TestRun> for StressTestResult {
}
}
impl From<&TestRun> for StressTestResult {
fn from(run: &TestRun) -> Self {
// the data is small enough that cloning is negligible
// (since we're going to be converting at most couple dozen a minute rather than a few billion...)
run.clone().into()
}
}
/// The data required to insert or update a row in `nym_node`. Does not carry `last_testrun`
/// since that is managed separately via [`StorageManager::set_node_last_testrun`].
#[derive(Debug, Clone, sqlx::FromRow)]