Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d474a2840f | |||
| bacf68697c | |||
| 6c4333a962 | |||
| 03f40a9ce2 | |||
| 65f4c08050 | |||
| 1b52856fe5 | |||
| e4df6416f5 | |||
| f5ab647a7a | |||
| ff53cd6e1d |
Generated
+20
-8
@@ -3822,9 +3822,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.155"
|
||||
version = "0.2.162"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
|
||||
checksum = "18d287de67fe55fd7e1581fe933d965a5a9477b38e949cfa9f8574ef01506398"
|
||||
|
||||
[[package]]
|
||||
name = "libm"
|
||||
@@ -5010,6 +5010,9 @@ dependencies = [
|
||||
name = "nym-common-models"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bincode",
|
||||
"nym-crypto",
|
||||
"serde",
|
||||
]
|
||||
|
||||
@@ -6049,14 +6052,20 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-node-status-agent"
|
||||
version = "0.1.6"
|
||||
version = "1.0.0-rc.1"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bincode",
|
||||
"chrono",
|
||||
"clap 4.5.20",
|
||||
"nym-bin-common",
|
||||
"nym-common-models",
|
||||
"nym-crypto",
|
||||
"rand",
|
||||
"reqwest 0.12.4",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
@@ -6065,10 +6074,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-node-status-api"
|
||||
version = "0.1.6"
|
||||
version = "1.0.0-rc.1"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"axum 0.7.7",
|
||||
"bincode",
|
||||
"chrono",
|
||||
"clap 4.5.20",
|
||||
"cosmwasm-std",
|
||||
@@ -6077,6 +6087,7 @@ dependencies = [
|
||||
"moka",
|
||||
"nym-bin-common",
|
||||
"nym-common-models",
|
||||
"nym-crypto",
|
||||
"nym-explorer-client",
|
||||
"nym-network-defaults",
|
||||
"nym-node-requests",
|
||||
@@ -8277,9 +8288,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rustix"
|
||||
version = "0.38.34"
|
||||
version = "0.38.40"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f"
|
||||
checksum = "99e4ea3e1cdc4b559b8e5650f9c8e5998e3e5c1343b4eaf034565f32318d63c0"
|
||||
dependencies = [
|
||||
"bitflags 2.5.0",
|
||||
"errno",
|
||||
@@ -9514,12 +9525,13 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tempfile"
|
||||
version = "3.10.1"
|
||||
version = "3.14.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1"
|
||||
checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"fastrand 2.1.1",
|
||||
"once_cell",
|
||||
"rustix",
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
+1
-1
@@ -329,7 +329,7 @@ syn = "1"
|
||||
sysinfo = "0.30.13"
|
||||
tap = "1.0.1"
|
||||
tar = "0.4.42"
|
||||
tempfile = "3.5.0"
|
||||
tempfile = "3.14"
|
||||
thiserror = "1.0.64"
|
||||
time = "0.3.30"
|
||||
tokio = "1.39"
|
||||
|
||||
@@ -15,7 +15,9 @@ mixnet-contract-common = { path = "../mixnet-contract", package = "nym-mixnet-co
|
||||
contracts-common = { path = "../contracts-common", package = "nym-contracts-common", version = "0.5.0" }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
thiserror = { workspace = true }
|
||||
ts-rs = { workspace = true, optional = true}
|
||||
# without this feature, cargo clippy emits a ton of incompatibility warnings
|
||||
# https://docs.rs/ts-rs/latest/ts_rs/#serde-compatability
|
||||
ts-rs = { workspace = true, optional = true, features = ["no-serde-warnings"] }
|
||||
|
||||
[features]
|
||||
schema = ["cw2"]
|
||||
|
||||
@@ -11,4 +11,7 @@ rust-version.workspace = true
|
||||
readme.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow = { workspace = true }
|
||||
bincode = { workspace = true }
|
||||
nym-crypto = { path = "../crypto", features = ["asymmetric", "serde"] }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
|
||||
@@ -1,7 +1,101 @@
|
||||
use nym_crypto::asymmetric::ed25519::{PublicKey, Signature, SignatureError};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub mod get_testrun {
|
||||
use super::*;
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct Payload {
|
||||
pub agent_public_key: PublicKey,
|
||||
pub timestamp: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct GetTestrunRequest {
|
||||
pub payload: Payload,
|
||||
pub signature: Signature,
|
||||
}
|
||||
|
||||
impl SignedRequest for GetTestrunRequest {
|
||||
type Payload = Payload;
|
||||
|
||||
fn public_key(&self) -> &PublicKey {
|
||||
&self.payload.agent_public_key
|
||||
}
|
||||
|
||||
fn signature(&self) -> &Signature {
|
||||
&self.signature
|
||||
}
|
||||
|
||||
fn payload(&self) -> &Self::Payload {
|
||||
&self.payload
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct TestrunAssignment {
|
||||
pub testrun_id: i64,
|
||||
pub assigned_at_utc: i64,
|
||||
pub gateway_identity_key: String,
|
||||
}
|
||||
|
||||
pub mod submit_results {
|
||||
use super::*;
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct Payload {
|
||||
pub probe_result: String,
|
||||
pub agent_public_key: PublicKey,
|
||||
pub assigned_at_utc: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct SubmitResults {
|
||||
pub payload: Payload,
|
||||
pub signature: Signature,
|
||||
}
|
||||
|
||||
impl SignedRequest for SubmitResults {
|
||||
type Payload = Payload;
|
||||
|
||||
fn public_key(&self) -> &PublicKey {
|
||||
&self.payload.agent_public_key
|
||||
}
|
||||
|
||||
fn signature(&self) -> &Signature {
|
||||
&self.signature
|
||||
}
|
||||
|
||||
fn payload(&self) -> &Self::Payload {
|
||||
&self.payload
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait SignedRequest {
|
||||
type Payload: serde::Serialize;
|
||||
|
||||
fn public_key(&self) -> &PublicKey;
|
||||
fn signature(&self) -> &Signature;
|
||||
fn payload(&self) -> &Self::Payload;
|
||||
}
|
||||
|
||||
pub trait VerifiableRequest: SignedRequest {
|
||||
type Error: From<bincode::Error> + From<SignatureError>;
|
||||
|
||||
fn verify_signature(&self) -> Result<(), Self::Error> {
|
||||
bincode::serialize(self.payload())
|
||||
.map_err(Self::Error::from)
|
||||
.and_then(|serialized| {
|
||||
self.public_key()
|
||||
.verify(serialized, self.signature())
|
||||
.map_err(Self::Error::from)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> VerifiableRequest for T
|
||||
where
|
||||
T: SignedRequest,
|
||||
{
|
||||
type Error = anyhow::Error;
|
||||
}
|
||||
|
||||
@@ -1 +1,2 @@
|
||||
nym-gateway-probe
|
||||
keys/
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
|
||||
[package]
|
||||
name = "nym-node-status-agent"
|
||||
version = "0.1.6"
|
||||
version = "1.0.0-rc.1"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
@@ -16,12 +16,20 @@ readme.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow = { workspace = true}
|
||||
bincode = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
clap = { workspace = true, features = ["derive", "env"] }
|
||||
nym-bin-common = { path = "../common/bin-common", features = ["models"]}
|
||||
nym-common-models = { path = "../common/models" }
|
||||
nym-crypto = { path = "../common/crypto", features = ["asymmetric", "rand"] }
|
||||
rand = { workspace = true }
|
||||
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "process"] }
|
||||
tokio-util = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { workspace = true, features = ["env-filter"] }
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = { workspace = true }
|
||||
|
||||
Executable
+4
@@ -0,0 +1,4 @@
|
||||
#!/bin/bash
|
||||
|
||||
mkdir -p keys
|
||||
cargo run --package nym-node-status-agent -- generate-keypair --path keys/private
|
||||
@@ -1,25 +1,35 @@
|
||||
#!/bin/bash
|
||||
|
||||
set -eu
|
||||
export ENVIRONMENT=${ENVIRONMENT:-"sandbox"}
|
||||
|
||||
environment="qa"
|
||||
|
||||
source ../envs/${environment}.env
|
||||
|
||||
export RUST_LOG="debug"
|
||||
probe_git_ref="nym-vpn-core-v1.0.0-rc.6"
|
||||
|
||||
crate_root=$(dirname $(realpath "$0"))
|
||||
gateway_probe_src=$(dirname $(dirname "$crate_root"))/nym-vpn-client/nym-vpn-core
|
||||
monorepo_root=$(dirname "${crate_root}")
|
||||
echo "Expecting nym-vpn-client repo at a sibling level of nym monorepo dir"
|
||||
gateway_probe_src=$(dirname "${monorepo_root}")/nym-vpn-client/nym-vpn-core
|
||||
echo "gateway_probe_src=$gateway_probe_src"
|
||||
echo "crate_root=$crate_root"
|
||||
|
||||
set -a
|
||||
source "${monorepo_root}/envs/${ENVIRONMENT}.env"
|
||||
set +a
|
||||
|
||||
export RUST_LOG="info"
|
||||
export NODE_STATUS_AGENT_SERVER_ADDRESS="http://127.0.0.1"
|
||||
export NODE_STATUS_AGENT_SERVER_PORT="8000"
|
||||
export NODE_STATUS_AGENT_PROBE_PATH="$crate_root/nym-gateway-probe"
|
||||
export NODE_STATUS_AGENT_AUTH_KEY="BjyC9SsHAZUzPRkQR4sPTvVrp4GgaquTh5YfSJksvvWT"
|
||||
|
||||
workers=${1:-1}
|
||||
echo "Running $workers workers in parallel"
|
||||
|
||||
# build & copy over GW probe
|
||||
function copy_gw_probe() {
|
||||
pushd $gateway_probe_src
|
||||
git switch main
|
||||
git pull
|
||||
git fetch -a
|
||||
git checkout $probe_git_ref
|
||||
cargo build --release --package nym-gateway-probe
|
||||
cp target/release/nym-gateway-probe "$crate_root"
|
||||
$crate_root/nym-gateway-probe --version
|
||||
@@ -32,11 +42,8 @@ function build_agent() {
|
||||
|
||||
function swarm() {
|
||||
local workers=$1
|
||||
echo "Running $workers in parallel"
|
||||
|
||||
build_agent
|
||||
|
||||
for ((i = 1; i <= $workers; i++)); do
|
||||
for ((i = 1; i <= workers; i++)); do
|
||||
../target/release/nym-node-status-agent run-probe &
|
||||
done
|
||||
|
||||
@@ -45,11 +52,7 @@ function swarm() {
|
||||
echo "All agents completed"
|
||||
}
|
||||
|
||||
export NODE_STATUS_AGENT_SERVER_ADDRESS="http://127.0.0.1"
|
||||
export NODE_STATUS_AGENT_SERVER_PORT="8000"
|
||||
|
||||
copy_gw_probe
|
||||
build_agent
|
||||
|
||||
swarm 8
|
||||
|
||||
# cargo run -- run-probe
|
||||
swarm $workers
|
||||
|
||||
@@ -1,118 +0,0 @@
|
||||
use anyhow::bail;
|
||||
use clap::{Parser, Subcommand};
|
||||
use nym_bin_common::bin_info;
|
||||
use nym_common_models::ns_api::TestrunAssignment;
|
||||
use std::sync::OnceLock;
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::probe::GwProbe;
|
||||
|
||||
// Helper for passing LONG_VERSION to clap
|
||||
fn pretty_build_info_static() -> &'static str {
|
||||
static PRETTY_BUILD_INFORMATION: OnceLock<String> = OnceLock::new();
|
||||
PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print())
|
||||
}
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)]
|
||||
pub(crate) struct Args {
|
||||
#[command(subcommand)]
|
||||
pub(crate) command: Command,
|
||||
#[arg(short, long, env = "NODE_STATUS_AGENT_SERVER_ADDRESS")]
|
||||
pub(crate) server_address: String,
|
||||
|
||||
#[arg(short = 'p', long, env = "NODE_STATUS_AGENT_SERVER_PORT")]
|
||||
pub(crate) server_port: u16,
|
||||
// TODO dz accept keypair for identification / auth
|
||||
}
|
||||
|
||||
#[derive(Subcommand, Debug)]
|
||||
pub(crate) enum Command {
|
||||
RunProbe {
|
||||
/// path of binary to run
|
||||
#[arg(long, env = "NODE_STATUS_AGENT_PROBE_PATH")]
|
||||
probe_path: String,
|
||||
},
|
||||
}
|
||||
|
||||
impl Args {
|
||||
pub(crate) async fn execute(&self) -> anyhow::Result<()> {
|
||||
match &self.command {
|
||||
Command::RunProbe { probe_path } => self.run_probe(probe_path).await?,
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_probe(&self, probe_path: &str) -> anyhow::Result<()> {
|
||||
let server_address = format!("{}:{}", &self.server_address, self.server_port);
|
||||
|
||||
let probe = GwProbe::new(probe_path.to_string());
|
||||
|
||||
let version = probe.version().await;
|
||||
tracing::info!("Probe version:\n{}", version);
|
||||
|
||||
if let Some(testrun) = request_testrun(&server_address).await? {
|
||||
let log = probe.run_and_get_log(&Some(testrun.gateway_identity_key));
|
||||
|
||||
submit_results(&server_address, testrun.testrun_id, log).await?;
|
||||
} else {
|
||||
tracing::info!("No testruns available, exiting")
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
const URL_BASE: &str = "internal/testruns";
|
||||
|
||||
#[instrument(level = "debug", skip_all)]
|
||||
async fn request_testrun(server_addr: &str) -> anyhow::Result<Option<TestrunAssignment>> {
|
||||
let target_url = format!("{}/{}", server_addr, URL_BASE);
|
||||
let client = reqwest::Client::new();
|
||||
let res = client.get(target_url).send().await?;
|
||||
let status = res.status();
|
||||
let response_text = res.text().await?;
|
||||
|
||||
if status.is_client_error() {
|
||||
bail!("{}: {}", status, response_text);
|
||||
} else if status.is_server_error() {
|
||||
if matches!(status, reqwest::StatusCode::SERVICE_UNAVAILABLE)
|
||||
&& response_text.contains("No testruns available")
|
||||
{
|
||||
return Ok(None);
|
||||
} else {
|
||||
bail!("{}: {}", status, response_text);
|
||||
}
|
||||
}
|
||||
|
||||
serde_json::from_str(&response_text)
|
||||
.map(|testrun| {
|
||||
tracing::info!("Received testrun assignment: {:?}", testrun);
|
||||
testrun
|
||||
})
|
||||
.map_err(|err| {
|
||||
tracing::error!("err");
|
||||
err.into()
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(probe_outcome))]
|
||||
async fn submit_results(
|
||||
server_addr: &str,
|
||||
testrun_id: i64,
|
||||
probe_outcome: String,
|
||||
) -> anyhow::Result<()> {
|
||||
let target_url = format!("{}/{}/{}", server_addr, URL_BASE, testrun_id);
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
let res = client
|
||||
.post(target_url)
|
||||
.body(probe_outcome)
|
||||
.send()
|
||||
.await
|
||||
.and_then(|response| response.error_for_status())?;
|
||||
|
||||
tracing::debug!("Submitted results: {})", res.status());
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,55 @@
|
||||
use std::{fs::File, io::Write, path::Path};
|
||||
use tracing::info;
|
||||
|
||||
pub(crate) fn generate_key_pair(path: impl AsRef<Path>) -> anyhow::Result<()> {
|
||||
let priv_key_path = path.as_ref();
|
||||
let mut rng = rand::thread_rng();
|
||||
let keypair = nym_crypto::asymmetric::identity::KeyPair::new(&mut rng);
|
||||
info!("Generated keypair as Base58-encoded string");
|
||||
|
||||
let mut private_key_file = File::create(priv_key_path)?;
|
||||
private_key_file.write_all(keypair.private_key().to_base58_string().as_bytes())?;
|
||||
|
||||
let pub_key_path = priv_key_path.with_extension("public");
|
||||
let mut public_key_file = File::create(&pub_key_path)?;
|
||||
public_key_file.write_all(keypair.public_key().to_base58_string().as_bytes())?;
|
||||
|
||||
info!(
|
||||
"Saved Base58-encoded keypair, private key to {}, public key to {}",
|
||||
priv_key_path.display(),
|
||||
pub_key_path.display()
|
||||
);
|
||||
info!("Public key should be whitelisted with NS API");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use nym_crypto::asymmetric::ed25519::PrivateKey;
|
||||
use tempfile::TempDir;
|
||||
|
||||
use super::*;
|
||||
|
||||
use std::{fs, path::PathBuf};
|
||||
|
||||
#[test]
|
||||
fn can_generate_valid_keypair() {
|
||||
let tmp_dir = TempDir::new().unwrap();
|
||||
let pkey_file = PathBuf::from_iter(&[
|
||||
tmp_dir.path().to_path_buf(),
|
||||
PathBuf::from("agent-key-private"),
|
||||
]);
|
||||
generate_key_pair(&pkey_file).expect("Failed to generate keypair");
|
||||
|
||||
let pkey_raw = fs::read_to_string(&pkey_file).expect("Failed to read file");
|
||||
let key = PrivateKey::from_base58_string(pkey_raw).expect("Failed to load key");
|
||||
|
||||
let msg = "hello, world";
|
||||
|
||||
let signature = key.sign(msg);
|
||||
key.public_key()
|
||||
.verify(msg, &signature)
|
||||
.expect("Failed to verify signature");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,74 @@
|
||||
use crate::probe::GwProbe;
|
||||
use clap::{Parser, Subcommand};
|
||||
use nym_bin_common::bin_info;
|
||||
use std::sync::OnceLock;
|
||||
|
||||
pub(crate) mod generate_keypair;
|
||||
pub(crate) mod run_probe;
|
||||
|
||||
// Helper for passing LONG_VERSION to clap
|
||||
fn pretty_build_info_static() -> &'static str {
|
||||
static PRETTY_BUILD_INFORMATION: OnceLock<String> = OnceLock::new();
|
||||
PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print())
|
||||
}
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)]
|
||||
pub(crate) struct Args {
|
||||
#[command(subcommand)]
|
||||
pub(crate) command: Command,
|
||||
}
|
||||
|
||||
#[derive(Subcommand, Debug)]
|
||||
pub(crate) enum Command {
|
||||
RunProbe {
|
||||
#[arg(short, long, env = "NODE_STATUS_AGENT_SERVER_ADDRESS")]
|
||||
server_address: String,
|
||||
|
||||
#[arg(short = 'p', long, env = "NODE_STATUS_AGENT_SERVER_PORT")]
|
||||
server_port: u16,
|
||||
|
||||
/// base58-encoded private key
|
||||
#[arg(long, env = "NODE_STATUS_AGENT_AUTH_KEY")]
|
||||
ns_api_auth_key: String,
|
||||
|
||||
/// path of binary to run
|
||||
#[arg(long, env = "NODE_STATUS_AGENT_PROBE_PATH")]
|
||||
probe_path: String,
|
||||
},
|
||||
|
||||
GenerateKeypair {
|
||||
#[arg(long)]
|
||||
path: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
impl Args {
|
||||
pub(crate) async fn execute(&self) -> anyhow::Result<()> {
|
||||
match &self.command {
|
||||
Command::RunProbe {
|
||||
server_address,
|
||||
server_port,
|
||||
ns_api_auth_key,
|
||||
probe_path,
|
||||
} => run_probe::run_probe(
|
||||
server_address,
|
||||
server_port.to_owned(),
|
||||
ns_api_auth_key,
|
||||
probe_path,
|
||||
)
|
||||
.await
|
||||
.inspect_err(|err| {
|
||||
tracing::error!("{err}");
|
||||
})?,
|
||||
Command::GenerateKeypair { path } => {
|
||||
let path = path
|
||||
.to_owned()
|
||||
.unwrap_or_else(|| String::from("private-key"));
|
||||
generate_keypair::generate_key_pair(path)?
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,140 @@
|
||||
use anyhow::{bail, Context};
|
||||
use nym_common_models::ns_api::{get_testrun, submit_results, TestrunAssignment};
|
||||
use nym_crypto::asymmetric::ed25519::{PrivateKey, Signature};
|
||||
use std::fmt::Display;
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::cli::GwProbe;
|
||||
|
||||
const INTERNAL_TESTRUNS: &str = "internal/testruns";
|
||||
|
||||
pub(crate) async fn run_probe(
|
||||
server_ip: &str,
|
||||
server_port: u16,
|
||||
ns_api_auth_key: &str,
|
||||
probe_path: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
let auth_key = PrivateKey::from_base58_string(ns_api_auth_key)
|
||||
.context("Couldn't parse auth key, exiting")?;
|
||||
let ns_api_client = Client::new(server_ip, server_port, auth_key);
|
||||
|
||||
let probe = GwProbe::new(probe_path.to_string());
|
||||
|
||||
let version = probe.version().await;
|
||||
tracing::info!("Probe version:\n{}", version);
|
||||
|
||||
if let Some(testrun) = ns_api_client.request_testrun().await? {
|
||||
let log = probe.run_and_get_log(&Some(testrun.gateway_identity_key));
|
||||
|
||||
ns_api_client
|
||||
.submit_results(testrun.testrun_id, log, testrun.assigned_at_utc)
|
||||
.await?;
|
||||
} else {
|
||||
tracing::info!("No testruns available, exiting")
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct Client {
|
||||
server_address: String,
|
||||
client: reqwest::Client,
|
||||
auth_key: PrivateKey,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn new(server_ip: &str, server_port: u16, auth_key: PrivateKey) -> Self {
|
||||
let server_address = format!("{}:{}", server_ip, server_port);
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
Self {
|
||||
server_address,
|
||||
client,
|
||||
auth_key,
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip_all)]
|
||||
pub(crate) async fn request_testrun(&self) -> anyhow::Result<Option<TestrunAssignment>> {
|
||||
let target_url = self.api_with_subpath(None::<String>);
|
||||
|
||||
let payload = get_testrun::Payload {
|
||||
agent_public_key: self.auth_key.public_key(),
|
||||
timestamp: chrono::offset::Utc::now().timestamp(),
|
||||
};
|
||||
let signature = self.sign_message(&payload)?;
|
||||
let request = get_testrun::GetTestrunRequest { payload, signature };
|
||||
|
||||
let res = self.client.get(target_url).json(&request).send().await?;
|
||||
let status = res.status();
|
||||
let response_text = res.text().await?;
|
||||
|
||||
if status.is_client_error() {
|
||||
bail!("{}: {}", status, response_text);
|
||||
} else if status.is_server_error() {
|
||||
if matches!(status, reqwest::StatusCode::SERVICE_UNAVAILABLE)
|
||||
&& response_text.contains("No testruns available")
|
||||
{
|
||||
return Ok(None);
|
||||
} else {
|
||||
bail!("{}: {}", status, response_text);
|
||||
}
|
||||
}
|
||||
|
||||
serde_json::from_str(&response_text)
|
||||
.map(|testrun| {
|
||||
tracing::info!("Received testrun assignment: {:?}", testrun);
|
||||
testrun
|
||||
})
|
||||
.map_err(|err| {
|
||||
tracing::error!("err");
|
||||
err.into()
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self, probe_result))]
|
||||
pub(crate) async fn submit_results(
|
||||
&self,
|
||||
testrun_id: i64,
|
||||
probe_result: String,
|
||||
assigned_at_utc: i64,
|
||||
) -> anyhow::Result<()> {
|
||||
let target_url = self.api_with_subpath(Some(testrun_id));
|
||||
|
||||
let payload = submit_results::Payload {
|
||||
probe_result,
|
||||
agent_public_key: self.auth_key.public_key(),
|
||||
assigned_at_utc,
|
||||
};
|
||||
let signature = self.sign_message(&payload)?;
|
||||
let submit_results = submit_results::SubmitResults { payload, signature };
|
||||
|
||||
let res = self
|
||||
.client
|
||||
.post(target_url)
|
||||
.json(&submit_results)
|
||||
.send()
|
||||
.await
|
||||
.and_then(|response| response.error_for_status())?;
|
||||
|
||||
tracing::debug!("Submitted results: {})", res.status());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn sign_message<T>(&self, message: &T) -> anyhow::Result<Signature>
|
||||
where
|
||||
T: serde::Serialize,
|
||||
{
|
||||
let serialized = bincode::serialize(message)?;
|
||||
let signed = self.auth_key.sign(&serialized);
|
||||
Ok(signed)
|
||||
}
|
||||
|
||||
fn api_with_subpath(&self, subpath: Option<impl Display>) -> String {
|
||||
if let Some(subpath) = subpath {
|
||||
format!("{}/{}/{}", self.server_address, INTERNAL_TESTRUNS, subpath)
|
||||
} else {
|
||||
format!("{}/{}", self.server_address, INTERNAL_TESTRUNS)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -11,26 +11,11 @@ async fn main() -> anyhow::Result<()> {
|
||||
setup_tracing();
|
||||
let args = Args::parse();
|
||||
|
||||
let server_addr = format!("{}:{}", args.server_address, args.server_port);
|
||||
test_ns_api_conn(&server_addr).await?;
|
||||
|
||||
args.execute().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn test_ns_api_conn(server_addr: &str) -> anyhow::Result<()> {
|
||||
reqwest::get(server_addr)
|
||||
.await
|
||||
.map(|res| {
|
||||
tracing::info!(
|
||||
"Testing connection to NS API at {server_addr}: {}",
|
||||
res.status()
|
||||
);
|
||||
})
|
||||
.map_err(|err| anyhow::anyhow!("Couldn't connect to server on {}: {}", server_addr, err))
|
||||
}
|
||||
|
||||
pub(crate) fn setup_tracing() {
|
||||
fn directive_checked(directive: impl Into<String>) -> Directive {
|
||||
directive
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
[package]
|
||||
name = "nym-node-status-api"
|
||||
version = "0.1.6"
|
||||
version = "1.0.0-rc.1"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
@@ -15,6 +15,7 @@ rust-version.workspace = true
|
||||
[dependencies]
|
||||
anyhow = { workspace = true }
|
||||
axum = { workspace = true, features = ["tokio", "macros"] }
|
||||
bincode = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
clap = { workspace = true, features = ["cargo", "derive", "env", "string"] }
|
||||
cosmwasm-std = { workspace = true }
|
||||
@@ -23,6 +24,7 @@ futures-util = { workspace = true }
|
||||
moka = { workspace = true, features = ["future"] }
|
||||
nym-bin-common = { path = "../common/bin-common", features = ["models"]}
|
||||
nym-common-models = { path = "../common/models" }
|
||||
nym-crypto = { path = "../common/crypto", features = ["asymmetric", "serde"] }
|
||||
nym-explorer-client = { path = "../explorer-api/explorer-client" }
|
||||
nym-network-defaults = { path = "../common/network-defaults" }
|
||||
nym-validator-client = { path = "../common/client-libs/validator-client" }
|
||||
|
||||
@@ -6,14 +6,25 @@ export RUST_LOG=${RUST_LOG:-debug}
|
||||
|
||||
export NYM_API_CLIENT_TIMEOUT=60
|
||||
export EXPLORER_CLIENT_TIMEOUT=60
|
||||
export NODE_STATUS_API_TESTRUN_REFRESH_INTERVAL=60
|
||||
export NODE_STATUS_API_TESTRUN_REFRESH_INTERVAL=120
|
||||
# public keys corresponding to the agents NS API is expecting to be contacted from
|
||||
export NODE_STATUS_API_AGENT_KEY_LIST="H4z8kx5Kkf5JMQHhxaW1MwYndjKCDHC7HsVhHTFfBZ4J,
|
||||
5c2GW61135DEr73DxGrR4DR22BLEujvm1k8GYEjRB9at,
|
||||
3PSFDH2iSJ61KoDNyJpAiw42xS5smV5iBXWnRGTmk2du,
|
||||
2AH7pJL5PErbSFhZdu3uH8cKa1h1tyCUfSRUm6E5EBz8,
|
||||
6wQ9ifPFm2EB73BrwpGSd3Ek7GFA5kiAMQDP2ox6JKZw,
|
||||
G1tevJBnzaQ6zCUsFsxtGJf45BqCTDgzpEz6Sgxks8EH,
|
||||
FwjL2nGrtgQQ48fPqAUzUZ8UkQZtMtgehqTqj4PQopvh,
|
||||
Eujj4GmvwQBgHZaNSyqUbjMFSsnXWPSjEYUPgAsKmx1A,
|
||||
5ZnfSGxW6EKcFxB8jftb9V3f897VpwpZtf7kCPYzB595,
|
||||
H9kuRd8BGjEUD8Grh5U9YUPN5ZaQmSYz8U44R72AffKM"
|
||||
|
||||
export ENVIRONMENT="qa.env"
|
||||
export ENVIRONMENT=${ENVIRONMENT:-"sandbox"}
|
||||
|
||||
function run_bare() {
|
||||
# export necessary env vars
|
||||
set -a
|
||||
source ../envs/$ENVIRONMENT
|
||||
source ../envs/${ENVIRONMENT}.env
|
||||
set +a
|
||||
export RUST_LOG=debug
|
||||
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
ALTER TABLE testruns
|
||||
RENAME COLUMN timestamp_utc TO created_utc;
|
||||
|
||||
ALTER TABLE testruns
|
||||
ADD COLUMN last_assigned_utc INTEGER;
|
||||
@@ -56,7 +56,7 @@ pub(crate) struct Cli {
|
||||
|
||||
#[clap(
|
||||
long,
|
||||
default_value = "600",
|
||||
default_value = "300",
|
||||
env = "NODE_STATUS_API_MONITOR_REFRESH_INTERVAL"
|
||||
)]
|
||||
#[arg(value_parser = parse_duration)]
|
||||
@@ -64,11 +64,22 @@ pub(crate) struct Cli {
|
||||
|
||||
#[clap(
|
||||
long,
|
||||
default_value = "600",
|
||||
default_value = "300",
|
||||
env = "NODE_STATUS_API_TESTRUN_REFRESH_INTERVAL"
|
||||
)]
|
||||
#[arg(value_parser = parse_duration)]
|
||||
pub(crate) testruns_refresh_interval: Duration,
|
||||
|
||||
#[clap(env = "NODE_STATUS_API_AGENT_KEY_LIST")]
|
||||
#[arg(value_delimiter = ',')]
|
||||
pub(crate) agent_key_list: Vec<String>,
|
||||
|
||||
#[clap(
|
||||
long,
|
||||
default_value_t = 40,
|
||||
env = "NYM_NODE_STATUS_API_NYM_HTTP_CACHE_TTL"
|
||||
)]
|
||||
pub(crate) max_agent_count: i64,
|
||||
}
|
||||
|
||||
fn parse_duration(arg: &str) -> Result<std::time::Duration, std::num::ParseIntError> {
|
||||
|
||||
@@ -306,9 +306,10 @@ pub struct TestRunDto {
|
||||
pub id: i64,
|
||||
pub gateway_id: i64,
|
||||
pub status: i64,
|
||||
pub timestamp_utc: i64,
|
||||
pub created_utc: i64,
|
||||
pub ip_address: String,
|
||||
pub log: String,
|
||||
pub last_assigned_utc: Option<i64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, strum_macros::Display, EnumString, FromRepr, PartialEq)]
|
||||
|
||||
@@ -4,10 +4,26 @@ use crate::{
|
||||
db::models::{TestRunDto, TestRunStatus},
|
||||
testruns::now_utc,
|
||||
};
|
||||
use anyhow::Context;
|
||||
use chrono::Duration;
|
||||
use sqlx::{pool::PoolConnection, Sqlite};
|
||||
|
||||
pub(crate) async fn count_testruns_in_progress(
|
||||
conn: &mut PoolConnection<Sqlite>,
|
||||
) -> anyhow::Result<i64> {
|
||||
sqlx::query_scalar!(
|
||||
r#"SELECT
|
||||
COUNT(id) as "count: i64"
|
||||
FROM testruns
|
||||
WHERE
|
||||
status = ?
|
||||
"#,
|
||||
TestRunStatus::InProgress as i64,
|
||||
)
|
||||
.fetch_one(conn.as_mut())
|
||||
.await
|
||||
.map_err(anyhow::Error::from)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_in_progress_testrun_by_id(
|
||||
conn: &mut PoolConnection<Sqlite>,
|
||||
testrun_id: i64,
|
||||
@@ -18,26 +34,31 @@ pub(crate) async fn get_in_progress_testrun_by_id(
|
||||
id as "id!",
|
||||
gateway_id as "gateway_id!",
|
||||
status as "status!",
|
||||
timestamp_utc as "timestamp_utc!",
|
||||
created_utc as "created_utc!",
|
||||
ip_address as "ip_address!",
|
||||
log as "log!"
|
||||
log as "log!",
|
||||
last_assigned_utc
|
||||
FROM testruns
|
||||
WHERE
|
||||
id = ?
|
||||
AND
|
||||
status = ?
|
||||
ORDER BY timestamp_utc"#,
|
||||
ORDER BY created_utc
|
||||
LIMIT 1"#,
|
||||
testrun_id,
|
||||
TestRunStatus::InProgress as i64,
|
||||
)
|
||||
.fetch_one(conn.as_mut())
|
||||
.await
|
||||
.context(format!("Couldn't retrieve testrun {testrun_id}"))
|
||||
.map_err(|e| anyhow::anyhow!("Couldn't retrieve testrun {testrun_id}: {e}"))
|
||||
}
|
||||
|
||||
pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> anyhow::Result<u64> {
|
||||
pub(crate) async fn update_testruns_assigned_before(
|
||||
db: &DbPool,
|
||||
max_age: Duration,
|
||||
) -> anyhow::Result<u64> {
|
||||
let mut conn = db.acquire().await?;
|
||||
let previous_run = now_utc() - age;
|
||||
let previous_run = now_utc() - max_age;
|
||||
let cutoff_timestamp = previous_run.timestamp();
|
||||
|
||||
let res = sqlx::query!(
|
||||
@@ -48,7 +69,7 @@ pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> an
|
||||
WHERE
|
||||
status = ?
|
||||
AND
|
||||
timestamp_utc < ?
|
||||
last_assigned_utc < ?
|
||||
"#,
|
||||
TestRunStatus::Queued as i64,
|
||||
TestRunStatus::InProgress as i64,
|
||||
@@ -59,8 +80,8 @@ pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> an
|
||||
|
||||
let stale_testruns = res.rows_affected();
|
||||
if stale_testruns > 0 {
|
||||
tracing::debug!(
|
||||
"Refreshed {} stale testruns, scheduled before {} but not yet finished",
|
||||
tracing::info!(
|
||||
"Refreshed {} stale testruns, assigned before {} but not yet finished",
|
||||
stale_testruns,
|
||||
previous_run
|
||||
);
|
||||
@@ -69,19 +90,22 @@ pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> an
|
||||
Ok(stale_testruns)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_oldest_testrun_and_make_it_pending(
|
||||
pub(crate) async fn assign_oldest_testrun(
|
||||
conn: &mut PoolConnection<Sqlite>,
|
||||
) -> anyhow::Result<Option<TestrunAssignment>> {
|
||||
let now = now_utc().timestamp();
|
||||
// find & mark as "In progress" in the same transaction to avoid race conditions
|
||||
let returning = sqlx::query!(
|
||||
r#"UPDATE testruns
|
||||
SET status = ?
|
||||
SET
|
||||
status = ?,
|
||||
last_assigned_utc = ?
|
||||
WHERE rowid =
|
||||
(
|
||||
SELECT rowid
|
||||
FROM testruns
|
||||
WHERE status = ?
|
||||
ORDER BY timestamp_utc asc
|
||||
ORDER BY created_utc asc
|
||||
LIMIT 1
|
||||
)
|
||||
RETURNING
|
||||
@@ -89,6 +113,7 @@ pub(crate) async fn get_oldest_testrun_and_make_it_pending(
|
||||
gateway_id
|
||||
"#,
|
||||
TestRunStatus::InProgress as i64,
|
||||
now,
|
||||
TestRunStatus::Queued as i64,
|
||||
)
|
||||
.fetch_optional(conn.as_mut())
|
||||
@@ -111,6 +136,7 @@ pub(crate) async fn get_oldest_testrun_and_make_it_pending(
|
||||
Ok(Some(TestrunAssignment {
|
||||
testrun_id: testrun.id,
|
||||
gateway_identity_key: gw_identity.gateway_identity_key,
|
||||
assigned_at_utc: now,
|
||||
}))
|
||||
} else {
|
||||
Ok(None)
|
||||
|
||||
@@ -4,10 +4,12 @@ use axum::{
|
||||
extract::{Path, State},
|
||||
Router,
|
||||
};
|
||||
use nym_common_models::ns_api::{get_testrun, submit_results, VerifiableRequest};
|
||||
use reqwest::StatusCode;
|
||||
|
||||
use crate::db::models::TestRunStatus;
|
||||
use crate::db::queries;
|
||||
use crate::testruns::now_utc;
|
||||
use crate::{
|
||||
db,
|
||||
http::{
|
||||
@@ -28,9 +30,14 @@ pub(crate) fn routes() -> Router<AppState> {
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
async fn request_testrun(State(state): State<AppState>) -> HttpResult<Json<TestrunAssignment>> {
|
||||
// TODO dz log agent's key
|
||||
async fn request_testrun(
|
||||
State(state): State<AppState>,
|
||||
Json(request): Json<get_testrun::GetTestrunRequest>,
|
||||
) -> HttpResult<Json<TestrunAssignment>> {
|
||||
// TODO dz log agent's network probe version
|
||||
authenticate(&request, &state)?;
|
||||
is_fresh(&request.payload.timestamp)?;
|
||||
|
||||
tracing::debug!("Agent requested testrun");
|
||||
|
||||
let db = state.db_pool();
|
||||
@@ -39,17 +46,29 @@ async fn request_testrun(State(state): State<AppState>) -> HttpResult<Json<Testr
|
||||
.await
|
||||
.map_err(HttpError::internal_with_logging)?;
|
||||
|
||||
return match db::queries::testruns::get_oldest_testrun_and_make_it_pending(&mut conn).await {
|
||||
let active_testruns = db::queries::testruns::count_testruns_in_progress(&mut conn)
|
||||
.await
|
||||
.map_err(HttpError::internal_with_logging)?;
|
||||
if active_testruns >= state.agent_max_count() {
|
||||
tracing::warn!(
|
||||
"{}/{} testruns in progress, rejecting",
|
||||
active_testruns,
|
||||
state.agent_max_count()
|
||||
);
|
||||
return Err(HttpError::no_testruns_available());
|
||||
}
|
||||
|
||||
return match db::queries::testruns::assign_oldest_testrun(&mut conn).await {
|
||||
Ok(res) => {
|
||||
if let Some(testrun) = res {
|
||||
tracing::debug!(
|
||||
tracing::info!(
|
||||
"🏃 Assigned testrun row_id {} gateway {} to agent",
|
||||
&testrun.testrun_id,
|
||||
testrun.gateway_identity_key
|
||||
testrun.gateway_identity_key,
|
||||
);
|
||||
Ok(Json(testrun))
|
||||
} else {
|
||||
tracing::debug!("No testruns available for agent");
|
||||
tracing::debug!("No testruns available");
|
||||
Err(HttpError::no_testruns_available())
|
||||
}
|
||||
}
|
||||
@@ -57,64 +76,127 @@ async fn request_testrun(State(state): State<AppState>) -> HttpResult<Json<Testr
|
||||
};
|
||||
}
|
||||
|
||||
// TODO dz accept testrun_id as query parameter
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
async fn submit_testrun(
|
||||
Path(testrun_id): Path<i64>,
|
||||
Path(submitted_testrun_id): Path<i64>,
|
||||
State(state): State<AppState>,
|
||||
body: String,
|
||||
Json(submitted_result): Json<submit_results::SubmitResults>,
|
||||
) -> HttpResult<StatusCode> {
|
||||
authenticate(&submitted_result, &state)?;
|
||||
|
||||
let db = state.db_pool();
|
||||
let mut conn = db
|
||||
.acquire()
|
||||
.await
|
||||
.map_err(HttpError::internal_with_logging)?;
|
||||
|
||||
let testrun = queries::testruns::get_in_progress_testrun_by_id(&mut conn, testrun_id)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tracing::error!("{e}");
|
||||
HttpError::not_found(testrun_id)
|
||||
})?;
|
||||
let assigned_testrun =
|
||||
queries::testruns::get_in_progress_testrun_by_id(&mut conn, submitted_testrun_id)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
tracing::warn!(
|
||||
"No testruns in progress for testrun_id {}: {}",
|
||||
submitted_testrun_id,
|
||||
err
|
||||
);
|
||||
HttpError::invalid_input("Invalid testrun submitted")
|
||||
})?;
|
||||
if Some(submitted_result.payload.assigned_at_utc) != assigned_testrun.last_assigned_utc {
|
||||
tracing::warn!(
|
||||
"Submitted testrun timestamp mismatch: {} != {:?}, rejecting",
|
||||
submitted_result.payload.assigned_at_utc,
|
||||
assigned_testrun.last_assigned_utc
|
||||
);
|
||||
return Err(HttpError::invalid_input("Invalid testrun submitted"));
|
||||
}
|
||||
|
||||
let gw_identity = db::queries::select_gateway_identity(&mut conn, testrun.gateway_id)
|
||||
let gw_identity = db::queries::select_gateway_identity(&mut conn, assigned_testrun.gateway_id)
|
||||
.await
|
||||
.map_err(|_| {
|
||||
// should never happen:
|
||||
HttpError::internal_with_logging("No gateway found for testrun")
|
||||
HttpError::internal_with_logging(format!(
|
||||
"No gateway found for testrun {submitted_testrun_id}"
|
||||
))
|
||||
})?;
|
||||
tracing::debug!(
|
||||
"Agent submitted testrun {} for gateway {} ({} bytes)",
|
||||
testrun_id,
|
||||
submitted_testrun_id,
|
||||
gw_identity,
|
||||
body.len(),
|
||||
&submitted_result.payload.probe_result.len(),
|
||||
);
|
||||
|
||||
// TODO dz this should be part of a single transaction: commit after everything is done
|
||||
queries::testruns::update_testrun_status(&mut conn, testrun_id, TestRunStatus::Complete)
|
||||
queries::testruns::update_testrun_status(
|
||||
&mut conn,
|
||||
submitted_testrun_id,
|
||||
TestRunStatus::Complete,
|
||||
)
|
||||
.await
|
||||
.map_err(HttpError::internal_with_logging)?;
|
||||
queries::testruns::update_gateway_last_probe_log(
|
||||
&mut conn,
|
||||
assigned_testrun.gateway_id,
|
||||
&submitted_result.payload.probe_result,
|
||||
)
|
||||
.await
|
||||
.map_err(HttpError::internal_with_logging)?;
|
||||
let result = get_result_from_log(&submitted_result.payload.probe_result);
|
||||
queries::testruns::update_gateway_last_probe_result(
|
||||
&mut conn,
|
||||
assigned_testrun.gateway_id,
|
||||
&result,
|
||||
)
|
||||
.await
|
||||
.map_err(HttpError::internal_with_logging)?;
|
||||
queries::testruns::update_gateway_score(&mut conn, assigned_testrun.gateway_id)
|
||||
.await
|
||||
.map_err(HttpError::internal_with_logging)?;
|
||||
queries::testruns::update_gateway_last_probe_log(&mut conn, testrun.gateway_id, &body)
|
||||
.await
|
||||
.map_err(HttpError::internal_with_logging)?;
|
||||
let result = get_result_from_log(&body);
|
||||
queries::testruns::update_gateway_last_probe_result(&mut conn, testrun.gateway_id, &result)
|
||||
.await
|
||||
.map_err(HttpError::internal_with_logging)?;
|
||||
queries::testruns::update_gateway_score(&mut conn, testrun.gateway_id)
|
||||
.await
|
||||
.map_err(HttpError::internal_with_logging)?;
|
||||
// TODO dz log gw identity key
|
||||
|
||||
let created_at = chrono::DateTime::from_timestamp(assigned_testrun.created_utc, 0)
|
||||
.map(|d| d.to_rfc3339())
|
||||
.unwrap_or_default();
|
||||
let last_assigned = assigned_testrun
|
||||
.last_assigned_utc
|
||||
.and_then(|d| chrono::DateTime::from_timestamp(d, 0))
|
||||
.map(|d| d.to_rfc3339())
|
||||
.unwrap_or_default();
|
||||
tracing::info!(
|
||||
"✅ Testrun row_id {} for gateway {} complete",
|
||||
testrun.id,
|
||||
gw_identity
|
||||
"✅ Testrun row_id {} for gateway {} complete (last assigned at {}, created at {})",
|
||||
assigned_testrun.id,
|
||||
gw_identity,
|
||||
last_assigned,
|
||||
created_at
|
||||
);
|
||||
|
||||
Ok(StatusCode::CREATED)
|
||||
}
|
||||
|
||||
// TODO dz this should be middleware
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
fn authenticate(request: &impl VerifiableRequest, state: &AppState) -> HttpResult<()> {
|
||||
if !state.is_registered(request.public_key()) {
|
||||
tracing::warn!("Public key not registered with NS API, rejecting");
|
||||
return Err(HttpError::unauthorized());
|
||||
};
|
||||
|
||||
request.verify_signature().map_err(|_| {
|
||||
tracing::warn!("Signature verification failed, rejecting");
|
||||
HttpError::unauthorized()
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn is_fresh(request_time: &i64) -> HttpResult<()> {
|
||||
// if a request took longer than N minutes to reach NS API, something is very wrong
|
||||
let freshness_cutoff = chrono::Duration::minutes(1);
|
||||
let cutoff_timestamp = (now_utc() - freshness_cutoff).timestamp();
|
||||
if *request_time < cutoff_timestamp {
|
||||
tracing::warn!("Request older than {}s, rejecting", cutoff_timestamp);
|
||||
return Err(HttpError::unauthorized());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_result_from_log(log: &str) -> String {
|
||||
let re = regex::Regex::new(r"\n\{\s").unwrap();
|
||||
let result: Vec<_> = re.splitn(log, 2).collect();
|
||||
|
||||
@@ -15,6 +15,13 @@ impl HttpError {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn unauthorized() -> Self {
|
||||
Self {
|
||||
message: serde_json::json!({"message": "Make sure your public key is registered with NS API"}).to_string(),
|
||||
status: axum::http::StatusCode::UNAUTHORIZED,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn internal_with_logging(msg: impl Display) -> Self {
|
||||
tracing::error!("{}", msg.to_string());
|
||||
Self::internal()
|
||||
@@ -33,12 +40,6 @@ impl HttpError {
|
||||
status: axum::http::StatusCode::SERVICE_UNAVAILABLE,
|
||||
}
|
||||
}
|
||||
pub(crate) fn not_found(msg: impl Display) -> Self {
|
||||
Self {
|
||||
message: serde_json::json!({"message": msg.to_string()}).to_string(),
|
||||
status: axum::http::StatusCode::NOT_FOUND,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl axum::response::IntoResponse for HttpError {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use axum::Router;
|
||||
use core::net::SocketAddr;
|
||||
use nym_crypto::asymmetric::ed25519::PublicKey;
|
||||
use tokio::{net::TcpListener, task::JoinHandle};
|
||||
use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};
|
||||
|
||||
@@ -14,10 +15,12 @@ pub(crate) async fn start_http_api(
|
||||
db_pool: DbPool,
|
||||
http_port: u16,
|
||||
nym_http_cache_ttl: u64,
|
||||
agent_key_list: Vec<PublicKey>,
|
||||
agent_max_count: i64,
|
||||
) -> anyhow::Result<ShutdownHandles> {
|
||||
let router_builder = RouterBuilder::with_default_routes();
|
||||
|
||||
let state = AppState::new(db_pool, nym_http_cache_ttl);
|
||||
let state = AppState::new(db_pool, nym_http_cache_ttl, agent_key_list, agent_max_count);
|
||||
let router = router_builder.with_state(state);
|
||||
|
||||
// TODO dz do we need this to be configurable?
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use moka::{future::Cache, Entry};
|
||||
use nym_crypto::asymmetric::ed25519::PublicKey;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::{
|
||||
@@ -12,13 +13,22 @@ use crate::{
|
||||
pub(crate) struct AppState {
|
||||
db_pool: DbPool,
|
||||
cache: HttpCache,
|
||||
agent_key_list: Vec<PublicKey>,
|
||||
agent_max_count: i64,
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
pub(crate) fn new(db_pool: DbPool, cache_ttl: u64) -> Self {
|
||||
pub(crate) fn new(
|
||||
db_pool: DbPool,
|
||||
cache_ttl: u64,
|
||||
agent_key_list: Vec<PublicKey>,
|
||||
agent_max_count: i64,
|
||||
) -> Self {
|
||||
Self {
|
||||
db_pool,
|
||||
cache: HttpCache::new(cache_ttl),
|
||||
agent_key_list,
|
||||
agent_max_count,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,6 +39,14 @@ impl AppState {
|
||||
pub(crate) fn cache(&self) -> &HttpCache {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
pub(crate) fn is_registered(&self, agent_pubkey: &PublicKey) -> bool {
|
||||
self.agent_key_list.contains(agent_pubkey)
|
||||
}
|
||||
|
||||
pub(crate) fn agent_max_count(&self) -> i64 {
|
||||
self.agent_max_count
|
||||
}
|
||||
}
|
||||
|
||||
static GATEWAYS_LIST_KEY: &str = "gateways";
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use clap::Parser;
|
||||
use nym_crypto::asymmetric::ed25519::PublicKey;
|
||||
use nym_task::signal::wait_for_signal;
|
||||
|
||||
mod cli;
|
||||
@@ -14,6 +15,13 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
let args = cli::Cli::parse();
|
||||
|
||||
let agent_key_list = args
|
||||
.agent_key_list
|
||||
.iter()
|
||||
.map(|value| PublicKey::from_base58_string(value.trim()).map_err(anyhow::Error::from))
|
||||
.collect::<anyhow::Result<Vec<_>>>()?;
|
||||
tracing::info!("Registered {} agent keys", agent_key_list.len());
|
||||
|
||||
let connection_url = args.database_url.clone();
|
||||
tracing::debug!("Using config:\n{:#?}", args);
|
||||
|
||||
@@ -31,12 +39,15 @@ async fn main() -> anyhow::Result<()> {
|
||||
.await;
|
||||
tracing::info!("Started monitor task");
|
||||
});
|
||||
|
||||
testruns::spawn(storage.pool_owned(), args.testruns_refresh_interval).await;
|
||||
|
||||
let shutdown_handles = http::server::start_http_api(
|
||||
storage.pool_owned(),
|
||||
args.http_port,
|
||||
args.nym_http_cache_ttl,
|
||||
agent_key_list.to_owned(),
|
||||
args.max_agent_count,
|
||||
)
|
||||
.await
|
||||
.expect("Failed to start server");
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::db::models::GatewayIdentityDto;
|
||||
|
||||
use crate::db::DbPool;
|
||||
use futures_util::TryStreamExt;
|
||||
use std::time::Duration;
|
||||
@@ -24,8 +25,6 @@ pub(crate) async fn spawn(pool: DbPool, refresh_interval: Duration) {
|
||||
});
|
||||
}
|
||||
|
||||
// TODO dz make number of max agents configurable
|
||||
|
||||
#[instrument(level = "debug", name = "testrun_queue", skip_all)]
|
||||
async fn run(pool: &DbPool) -> anyhow::Result<()> {
|
||||
tracing::info!("Spawning testruns...");
|
||||
@@ -72,15 +71,15 @@ async fn run(pool: &DbPool) -> anyhow::Result<()> {
|
||||
testruns_created += 1;
|
||||
}
|
||||
}
|
||||
tracing::debug!("{} testruns queued in total", testruns_created);
|
||||
tracing::info!("{} testruns queued in total", testruns_created);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip_all)]
|
||||
async fn refresh_stale_testruns(pool: &DbPool, refresh_interval: Duration) -> anyhow::Result<()> {
|
||||
let chrono_duration = chrono::Duration::from_std(refresh_interval)?;
|
||||
crate::db::queries::testruns::update_testruns_older_than(pool, chrono_duration).await?;
|
||||
let refresh_interval = chrono::Duration::from_std(refresh_interval)?;
|
||||
crate::db::queries::testruns::update_testruns_assigned_before(pool, refresh_interval).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ pub(crate) async fn try_queue_testrun(
|
||||
LIMIT 1"#,
|
||||
identity_key,
|
||||
)
|
||||
// TODO dz shoudl call .fetch_one
|
||||
// TODO dz should call .fetch_one
|
||||
// TODO dz replace this in other queries as well
|
||||
.fetch(conn.as_mut())
|
||||
.try_collect::<Vec<_>>()
|
||||
@@ -53,9 +53,10 @@ pub(crate) async fn try_queue_testrun(
|
||||
id as "id!",
|
||||
gateway_id as "gateway_id!",
|
||||
status as "status!",
|
||||
timestamp_utc as "timestamp_utc!",
|
||||
created_utc as "created_utc!",
|
||||
ip_address as "ip_address!",
|
||||
log as "log!"
|
||||
log as "log!",
|
||||
last_assigned_utc
|
||||
FROM testruns
|
||||
WHERE gateway_id = ? AND status != 2
|
||||
ORDER BY id DESC
|
||||
@@ -89,7 +90,7 @@ pub(crate) async fn try_queue_testrun(
|
||||
);
|
||||
|
||||
let id = sqlx::query!(
|
||||
"INSERT INTO testruns (gateway_id, status, ip_address, timestamp_utc, log) VALUES (?, ?, ?, ?, ?)",
|
||||
"INSERT INTO testruns (gateway_id, status, ip_address, created_utc, log) VALUES (?, ?, ?, ?, ?)",
|
||||
gateway_id,
|
||||
status,
|
||||
ip_address,
|
||||
|
||||
Reference in New Issue
Block a user