NS API with directory v2 (#5068)

* Use unstable explorer client

* Clean up stale testruns & logging
- log gw identity key
- better agent testrun logging
- log responses
- change response code for agents

* Better logging on agent

* Testrun stores gw identity key instead of gw pk

* Agent 0.1.3

* Agent 0.1.4

* Sqlx offline query data + clippy

* Compatible with directory v2

* Point to internal deps + rebase + v0.1.5

* self described field not null

* Fix build.rs typo
This commit is contained in:
Dinko Zdravac
2024-10-31 13:52:20 +01:00
committed by GitHub
parent 9f8bf2d080
commit cf4fe5f875
49 changed files with 3079 additions and 0 deletions
+7
View File
@@ -0,0 +1,7 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TestrunAssignment {
pub testrun_id: i64,
pub gateway_identity_key: String,
}
+27
View File
@@ -0,0 +1,27 @@
# Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
# SPDX-License-Identifier: Apache-2.0
[package]
name = "nym-node-status-agent"
version = "0.1.4"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
readme.workspace = true
[dependencies]
anyhow = { workspace = true}
clap = { workspace = true, features = ["derive", "env"] }
nym-bin-common = { path = "../common/bin-common", features = ["models"]}
nym-common-models = { path = "../common/models" }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "process"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
reqwest = { workspace = true, features = ["json"] }
serde_json = { workspace = true }
+55
View File
@@ -0,0 +1,55 @@
#!/bin/bash
set -eu
environment="qa"
source ../envs/${environment}.env
export RUST_LOG="debug"
crate_root=$(dirname $(realpath "$0"))
gateway_probe_src=$(dirname $(dirname "$crate_root"))/nym-vpn-client/nym-vpn-core
echo "gateway_probe_src=$gateway_probe_src"
echo "crate_root=$crate_root"
export NODE_STATUS_AGENT_PROBE_PATH="$crate_root/nym-gateway-probe"
# build & copy over GW probe
function copy_gw_probe() {
pushd $gateway_probe_src
git switch main
git pull
cargo build --release --package nym-gateway-probe
cp target/release/nym-gateway-probe "$crate_root"
$crate_root/nym-gateway-probe --version
popd
}
function build_agent() {
cargo build --package nym-node-status-agent --release
}
function swarm() {
local workers=$1
echo "Running $workers in parallel"
build_agent
for ((i = 1; i <= $workers; i++)); do
../target/release/nym-node-status-agent run-probe &
done
wait
echo "All agents completed"
}
export NODE_STATUS_AGENT_SERVER_ADDRESS="http://127.0.0.1"
export NODE_STATUS_AGENT_SERVER_PORT="8000"
copy_gw_probe
swarm 8
# cargo run -- run-probe
+105
View File
@@ -0,0 +1,105 @@
use clap::{Parser, Subcommand};
use nym_bin_common::bin_info;
use nym_common_models::ns_api::TestrunAssignment;
use std::sync::OnceLock;
use tracing::instrument;
use crate::probe::GwProbe;
// Helper for passing LONG_VERSION to clap
fn pretty_build_info_static() -> &'static str {
static PRETTY_BUILD_INFORMATION: OnceLock<String> = OnceLock::new();
PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print())
}
#[derive(Parser, Debug)]
#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)]
pub(crate) struct Args {
#[command(subcommand)]
pub(crate) command: Command,
#[arg(short, long, env = "NODE_STATUS_AGENT_SERVER_ADDRESS")]
pub(crate) server_address: String,
#[arg(short = 'p', long, env = "NODE_STATUS_AGENT_SERVER_PORT")]
pub(crate) server_port: u16,
// TODO dz accept keypair for identification / auth
}
#[derive(Subcommand, Debug)]
pub(crate) enum Command {
RunProbe {
/// path of binary to run
#[arg(long, env = "NODE_STATUS_AGENT_PROBE_PATH")]
probe_path: String,
},
}
impl Args {
pub(crate) async fn execute(&self) -> anyhow::Result<()> {
match &self.command {
Command::RunProbe { probe_path } => self.run_probe(probe_path).await?,
}
Ok(())
}
async fn run_probe(&self, probe_path: &str) -> anyhow::Result<()> {
let server_address = format!("{}:{}", &self.server_address, self.server_port);
let probe = GwProbe::new(probe_path.to_string());
let version = probe.version().await;
tracing::info!("Probe version:\n{}", version);
let testrun = request_testrun(&server_address).await?;
let log = probe.run_and_get_log(&Some(testrun.gateway_identity_key));
submit_results(&server_address, testrun.testrun_id, log).await?;
Ok(())
}
}
const URL_BASE: &str = "internal/testruns";
#[instrument(level = "debug", skip_all)]
async fn request_testrun(server_addr: &str) -> anyhow::Result<TestrunAssignment> {
let target_url = format!("{}/{}", server_addr, URL_BASE);
let client = reqwest::Client::new();
let res = client
.get(target_url)
.send()
.await
.and_then(|response| response.error_for_status())?;
res.json()
.await
.map(|testrun| {
tracing::info!("Received testrun assignment: {:?}", testrun);
testrun
})
.map_err(|err| {
tracing::error!("err");
err.into()
})
}
#[instrument(level = "debug", skip(probe_outcome))]
async fn submit_results(
server_addr: &str,
testrun_id: i64,
probe_outcome: String,
) -> anyhow::Result<()> {
let target_url = format!("{}/{}/{}", server_addr, URL_BASE, testrun_id);
let client = reqwest::Client::new();
let res = client
.post(target_url)
.body(probe_outcome)
.send()
.await
.and_then(|response| response.error_for_status())?;
tracing::debug!("Submitted results: {})", res.status());
Ok(())
}
+60
View File
@@ -0,0 +1,60 @@
use tracing::error;
pub(crate) struct GwProbe {
path: String,
}
impl GwProbe {
pub(crate) fn new(probe_path: String) -> Self {
Self { path: probe_path }
}
pub(crate) async fn version(&self) -> String {
let mut command = tokio::process::Command::new(&self.path);
command.stdout(std::process::Stdio::piped());
command.arg("--version");
match command.spawn() {
Ok(child) => {
if let Ok(output) = child.wait_with_output().await {
return String::from_utf8(output.stdout)
.unwrap_or("Unable to get log from test run".to_string());
}
"Unable to get probe version".to_string()
}
Err(e) => {
error!("Failed to get probe version: {}", e);
"Failed to get probe version".to_string()
}
}
}
pub(crate) fn run_and_get_log(&self, gateway_key: &Option<String>) -> String {
let mut command = std::process::Command::new(&self.path);
command.stdout(std::process::Stdio::piped());
if let Some(gateway_id) = gateway_key {
command.arg("--gateway").arg(gateway_id);
}
match command.spawn() {
Ok(child) => {
if let Ok(output) = child.wait_with_output() {
if !output.status.success() {
let out = String::from_utf8_lossy(&output.stdout);
let err = String::from_utf8_lossy(&output.stderr);
tracing::error!("Probe exited with {:?}:\n{}\n{}", output.status, out, err);
}
return String::from_utf8(output.stdout)
.unwrap_or("Unable to get log from test run".to_string());
}
"Unable to get log from test run".to_string()
}
Err(e) => {
error!("Failed to spawn test: {}", e);
"Failed to spawn test run task".to_string()
}
}
}
}
@@ -0,0 +1,20 @@
{
"db_name": "SQLite",
"query": "SELECT\n gateway_identity_key\n FROM\n gateways\n WHERE\n id = ?",
"describe": {
"columns": [
{
"name": "gateway_identity_key",
"ordinal": 0,
"type_info": "Text"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false
]
},
"hash": "06b17d1e5f61201a1b7542896ba55c69cd5c1a7e7d87073c94600c783a0a3984"
}
@@ -0,0 +1,32 @@
{
"db_name": "SQLite",
"query": "SELECT\n key as \"key!\",\n value_json as \"value_json!\",\n last_updated_utc as \"last_updated_utc!\"\n FROM summary",
"describe": {
"columns": [
{
"name": "key!",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "value_json!",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "last_updated_utc!",
"ordinal": 2,
"type_info": "Int64"
}
],
"parameters": {
"Right": 0
},
"nullable": [
true,
true,
false
]
},
"hash": "1327b5118f9144dddbcf8edb11f7dc549cf503409fd6dfedcdc02dbcd61d5454"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "UPDATE mixnodes\n SET bonded = ?, last_updated_utc = ?\n WHERE id = ?;",
"describe": {
"columns": [],
"parameters": {
"Right": 3
},
"nullable": []
},
"hash": "18abc8fde56cf86baed7b4afa38f2c63cdf90f2f3b6d81afb9000bb0968dcaea"
}
@@ -0,0 +1,26 @@
{
"db_name": "SQLite",
"query": "\n SELECT\n id,\n gateway_identity_key\n FROM gateways\n WHERE id = ?\n LIMIT 1",
"describe": {
"columns": [
{
"name": "id",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "gateway_identity_key",
"ordinal": 1,
"type_info": "Text"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false,
false
]
},
"hash": "2236299f9f691376db54cbd58ec5ceb89b9925cba46efcf4ed79ef0759a01129"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "INSERT INTO testruns (gateway_id, status, ip_address, timestamp_utc, log) VALUES (?, ?, ?, ?, ?)",
"describe": {
"columns": [],
"parameters": {
"Right": 5
},
"nullable": []
},
"hash": "3c584e211d07c511644c8079187965acf3bcfb3f84ba8d24ed645d79976cf784"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "UPDATE gateways\n SET bonded = ?, last_updated_utc = ?\n WHERE id = ?;",
"describe": {
"columns": [],
"parameters": {
"Right": 3
},
"nullable": []
},
"hash": "3d3a1fa429e3090741c6b6a8e82e692afc04b51e8782bcbf59f1eb4116112536"
}
@@ -0,0 +1,38 @@
{
"db_name": "SQLite",
"query": "SELECT\n id as \"id!\",\n gateway_identity_key as \"gateway_identity_key!\",\n self_described as \"self_described?\",\n explorer_pretty_bond as \"explorer_pretty_bond?\"\n FROM gateways\n WHERE gateway_identity_key = ?\n ORDER BY gateway_identity_key\n LIMIT 1",
"describe": {
"columns": [
{
"name": "id!",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "gateway_identity_key!",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "self_described?",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "explorer_pretty_bond?",
"ordinal": 3,
"type_info": "Text"
}
],
"parameters": {
"Right": 1
},
"nullable": [
true,
false,
true,
true
]
},
"hash": "3d5fc502f976f5081f01352856b8632c29c81bfafb043bb8744129cf9e0266ad"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "UPDATE testruns SET status = ? WHERE id = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 2
},
"nullable": []
},
"hash": "418944f2eccb838cb3882f34469203c8569f03fdd39ce09d7b74177896e52a8c"
}
@@ -0,0 +1,50 @@
{
"db_name": "SQLite",
"query": "SELECT\n id as \"id!\",\n gateway_id as \"gateway_id!\",\n status as \"status!\",\n timestamp_utc as \"timestamp_utc!\",\n ip_address as \"ip_address!\",\n log as \"log!\"\n FROM testruns\n WHERE gateway_id = ? AND status != 2\n ORDER BY id DESC\n LIMIT 1",
"describe": {
"columns": [
{
"name": "id!",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "gateway_id!",
"ordinal": 1,
"type_info": "Int64"
},
{
"name": "status!",
"ordinal": 2,
"type_info": "Int64"
},
{
"name": "timestamp_utc!",
"ordinal": 3,
"type_info": "Int64"
},
{
"name": "ip_address!",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "log!",
"ordinal": 5,
"type_info": "Text"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false,
false,
false,
false,
false,
false
]
},
"hash": "46d76bc6d3fba2dae3b21511a36289dd776749dd7a20cda61b0480f2fba60889"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "UPDATE gateways SET last_probe_log = ? WHERE id = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 2
},
"nullable": []
},
"hash": "4afcc6673890f795c2793f1e2f8570ee787fc7daf00fcb916f18d1cb7d6c8f08"
}
@@ -0,0 +1,32 @@
{
"db_name": "SQLite",
"query": "SELECT\n id as \"id!\",\n gateway_identity_key as \"identity_key!\",\n bonded as \"bonded: bool\"\n FROM gateways\n WHERE bonded = ?",
"describe": {
"columns": [
{
"name": "id!",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "identity_key!",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "bonded: bool",
"ordinal": 2,
"type_info": "Int64"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false,
false,
false
]
},
"hash": "4b61a4bc32333c92a8f5ad4ad0017b40dc01845f554b5479f37855d89b309e6f"
}
@@ -0,0 +1,20 @@
{
"db_name": "SQLite",
"query": "SELECT count(id) FROM mixnodes",
"describe": {
"columns": [
{
"name": "count(id)",
"ordinal": 0,
"type_info": "Int"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false
]
},
"hash": "670b7ed7d57a6986181b24be24ca667e8cacdf677ccb906415b3fe92be0c436b"
}
@@ -0,0 +1,50 @@
{
"db_name": "SQLite",
"query": "SELECT\n id as \"id!\",\n gateway_id as \"gateway_id!\",\n status as \"status!\",\n timestamp_utc as \"timestamp_utc!\",\n ip_address as \"ip_address!\",\n log as \"log!\"\n FROM testruns\n WHERE\n id = ?\n AND\n status = ?\n ORDER BY timestamp_utc",
"describe": {
"columns": [
{
"name": "id!",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "gateway_id!",
"ordinal": 1,
"type_info": "Int64"
},
{
"name": "status!",
"ordinal": 2,
"type_info": "Int64"
},
{
"name": "timestamp_utc!",
"ordinal": 3,
"type_info": "Int64"
},
{
"name": "ip_address!",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "log!",
"ordinal": 5,
"type_info": "Text"
}
],
"parameters": {
"Right": 2
},
"nullable": [
false,
false,
false,
false,
false,
false
]
},
"hash": "6d7967b831b355d5f2c77950abc56f816956b0824c66a25da611dce688105d36"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "INSERT INTO mixnodes\n (mix_id, identity_key, bonded, total_stake,\n host, http_api_port, blacklisted, full_details,\n self_described, last_updated_utc, is_dp_delegatee)\n VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)\n ON CONFLICT(mix_id) DO UPDATE SET\n bonded=excluded.bonded,\n total_stake=excluded.total_stake, host=excluded.host,\n http_api_port=excluded.http_api_port,blacklisted=excluded.blacklisted,\n full_details=excluded.full_details,self_described=excluded.self_described,\n last_updated_utc=excluded.last_updated_utc,\n is_dp_delegatee = excluded.is_dp_delegatee;",
"describe": {
"columns": [],
"parameters": {
"Right": 11
},
"nullable": []
},
"hash": "6eb1a682cf13205cf701590021cdf795147ac3724e89df5b2f24f7215d87dce1"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "UPDATE gateways SET last_probe_result = ? WHERE id = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 2
},
"nullable": []
},
"hash": "6ef3efde571d46961244cd90420f3de5949a5ff2083453cb879af8a1689efe2f"
}
@@ -0,0 +1,98 @@
{
"db_name": "SQLite",
"query": "SELECT\n gw.gateway_identity_key as \"gateway_identity_key!\",\n gw.bonded as \"bonded: bool\",\n gw.blacklisted as \"blacklisted: bool\",\n gw.performance as \"performance!\",\n gw.self_described as \"self_described?\",\n gw.explorer_pretty_bond as \"explorer_pretty_bond?\",\n gw.last_probe_result as \"last_probe_result?\",\n gw.last_probe_log as \"last_probe_log?\",\n gw.last_testrun_utc as \"last_testrun_utc?\",\n gw.last_updated_utc as \"last_updated_utc!\",\n COALESCE(gd.moniker, \"NA\") as \"moniker!\",\n COALESCE(gd.website, \"NA\") as \"website!\",\n COALESCE(gd.security_contact, \"NA\") as \"security_contact!\",\n COALESCE(gd.details, \"NA\") as \"details!\"\n FROM gateways gw\n LEFT JOIN gateway_description gd\n ON gw.gateway_identity_key = gd.gateway_identity_key\n ORDER BY gw.gateway_identity_key",
"describe": {
"columns": [
{
"name": "gateway_identity_key!",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "bonded: bool",
"ordinal": 1,
"type_info": "Int64"
},
{
"name": "blacklisted: bool",
"ordinal": 2,
"type_info": "Int64"
},
{
"name": "performance!",
"ordinal": 3,
"type_info": "Int64"
},
{
"name": "self_described?",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "explorer_pretty_bond?",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "last_probe_result?",
"ordinal": 6,
"type_info": "Text"
},
{
"name": "last_probe_log?",
"ordinal": 7,
"type_info": "Text"
},
{
"name": "last_testrun_utc?",
"ordinal": 8,
"type_info": "Int64"
},
{
"name": "last_updated_utc!",
"ordinal": 9,
"type_info": "Int64"
},
{
"name": "moniker!",
"ordinal": 10,
"type_info": "Text"
},
{
"name": "website!",
"ordinal": 11,
"type_info": "Text"
},
{
"name": "security_contact!",
"ordinal": 12,
"type_info": "Text"
},
{
"name": "details!",
"ordinal": 13,
"type_info": "Text"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false,
false,
false,
false,
true,
true,
true,
true,
true,
false,
false,
false,
false,
false
]
},
"hash": "71a455c705f9c25d3843ff2fb8629d1320a5eb10797cdb5a435455d22c6aeac1"
}
@@ -0,0 +1,38 @@
{
"db_name": "SQLite",
"query": "SELECT\n id as \"id!\",\n date as \"date!\",\n timestamp_utc as \"timestamp_utc!\",\n value_json as \"value_json!\"\n FROM summary_history\n ORDER BY date DESC\n LIMIT 30",
"describe": {
"columns": [
{
"name": "id!",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "date!",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "timestamp_utc!",
"ordinal": 2,
"type_info": "Int64"
},
{
"name": "value_json!",
"ordinal": 3,
"type_info": "Text"
}
],
"parameters": {
"Right": 0
},
"nullable": [
true,
false,
false,
true
]
},
"hash": "7600823da7ce80b8ffda933608603a2752e28df775d1af8fd943a5fc8d7dc00d"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "INSERT INTO summary_history\n (date, timestamp_utc, value_json)\n VALUES (?, ?, ?)\n ON CONFLICT(date) DO UPDATE SET\n timestamp_utc=excluded.timestamp_utc,\n value_json=excluded.value_json;",
"describe": {
"columns": [],
"parameters": {
"Right": 3
},
"nullable": []
},
"hash": "788515c34588aec352773df4b6e6c5e41f3c0bb56a27648b5e25466b8634a578"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "UPDATE gateways\n SET blacklisted = true\n WHERE gateway_identity_key = ?;",
"describe": {
"columns": [],
"parameters": {
"Right": 1
},
"nullable": []
},
"hash": "8571faad2f66e08f24acfbfe036d17ca6eb090df7f6d52ef89c5d51564f8b45c"
}
@@ -0,0 +1,20 @@
{
"db_name": "SQLite",
"query": "SELECT count(id) FROM gateways",
"describe": {
"columns": [
{
"name": "count(id)",
"ordinal": 0,
"type_info": "Int"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false
]
},
"hash": "86ff64db477a1d6235179b0b88d86b86d1b9be62336c9eac0eef44987a5451b5"
}
@@ -0,0 +1,26 @@
{
"db_name": "SQLite",
"query": "SELECT\n gateway_identity_key as \"gateway_identity_key!\",\n bonded as \"bonded: bool\"\n FROM gateways\n ORDER BY last_testrun_utc",
"describe": {
"columns": [
{
"name": "gateway_identity_key!",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "bonded: bool",
"ordinal": 1,
"type_info": "Int64"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false,
false
]
},
"hash": "930a41e612b4e964ae214843da190f6c66c14d4267a2cc2ca73354becc2c8bb8"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "UPDATE gateways SET last_testrun_utc = ?, last_updated_utc = ? WHERE id = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 3
},
"nullable": []
},
"hash": "c214c001acbbf79fa499816f36ec586c4c29c03efb4cf0c40b73a5c76159cf5c"
}
@@ -0,0 +1,44 @@
{
"db_name": "SQLite",
"query": "\n SELECT\n date_utc as \"date_utc!\",\n packets_received as \"total_packets_received!: i64\",\n packets_sent as \"total_packets_sent!: i64\",\n packets_dropped as \"total_packets_dropped!: i64\",\n total_stake as \"total_stake!: i64\"\n FROM (\n SELECT\n date_utc,\n SUM(packets_received) as packets_received,\n SUM(packets_sent) as packets_sent,\n SUM(packets_dropped) as packets_dropped,\n SUM(total_stake) as total_stake\n FROM mixnode_daily_stats\n GROUP BY date_utc\n ORDER BY date_utc DESC\n LIMIT 30\n )\n GROUP BY date_utc\n ORDER BY date_utc\n ",
"describe": {
"columns": [
{
"name": "date_utc!",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "total_packets_received!: i64",
"ordinal": 1,
"type_info": "Int64"
},
{
"name": "total_packets_sent!: i64",
"ordinal": 2,
"type_info": "Int64"
},
{
"name": "total_packets_dropped!: i64",
"ordinal": 3,
"type_info": "Int64"
},
{
"name": "total_stake!: i64",
"ordinal": 4,
"type_info": "Int64"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false,
true,
true,
true,
false
]
},
"hash": "c5e3cd7284b334df5aa979b1627ea1f6dc2aed00cedde25f2be3567e47064351"
}
@@ -0,0 +1,32 @@
{
"db_name": "SQLite",
"query": "SELECT\n id as \"id!\",\n identity_key as \"identity_key!\",\n bonded as \"bonded: bool\"\n FROM mixnodes\n WHERE bonded = ?",
"describe": {
"columns": [
{
"name": "id!",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "identity_key!",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "bonded: bool",
"ordinal": 2,
"type_info": "Int64"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false,
false,
false
]
},
"hash": "c7ba2621becb9ac4b5dee0ce303dadfcf19095935867a51cbd5b8362d1505fcc"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "INSERT INTO gateways\n (gateway_identity_key, bonded, blacklisted,\n self_described, explorer_pretty_bond,\n last_updated_utc, performance)\n VALUES (?, ?, ?, ?, ?, ?, ?)\n ON CONFLICT(gateway_identity_key) DO UPDATE SET\n bonded=excluded.bonded,\n blacklisted=excluded.blacklisted,\n self_described=excluded.self_described,\n explorer_pretty_bond=excluded.explorer_pretty_bond,\n last_updated_utc=excluded.last_updated_utc,\n performance = excluded.performance;",
"describe": {
"columns": [],
"parameters": {
"Right": 7
},
"nullable": []
},
"hash": "d8ea93e781666e6267902170709ee2aa37f6163525bbdce1a4cebef4a285f8d9"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "INSERT INTO summary\n (key, value_json, last_updated_utc)\n VALUES (?, ?, ?)\n ON CONFLICT(key) DO UPDATE SET\n value_json=excluded.value_json,\n last_updated_utc=excluded.last_updated_utc;",
"describe": {
"columns": [],
"parameters": {
"Right": 3
},
"nullable": []
},
"hash": "e0c76a959276e3b0f44c720af9c74a5bf4912ee73468e62e7d0d96b1d9074cbe"
}
@@ -0,0 +1,86 @@
{
"db_name": "SQLite",
"query": "SELECT\n mn.mix_id as \"mix_id!\",\n mn.bonded as \"bonded: bool\",\n mn.blacklisted as \"blacklisted: bool\",\n mn.is_dp_delegatee as \"is_dp_delegatee: bool\",\n mn.total_stake as \"total_stake!\",\n mn.full_details as \"full_details!\",\n mn.self_described as \"self_described\",\n mn.last_updated_utc as \"last_updated_utc!\",\n COALESCE(md.moniker, \"NA\") as \"moniker!\",\n COALESCE(md.website, \"NA\") as \"website!\",\n COALESCE(md.security_contact, \"NA\") as \"security_contact!\",\n COALESCE(md.details, \"NA\") as \"details!\"\n FROM mixnodes mn\n LEFT JOIN mixnode_description md ON mn.mix_id = md.mix_id\n ORDER BY mn.mix_id",
"describe": {
"columns": [
{
"name": "mix_id!",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "bonded: bool",
"ordinal": 1,
"type_info": "Int64"
},
{
"name": "blacklisted: bool",
"ordinal": 2,
"type_info": "Int64"
},
{
"name": "is_dp_delegatee: bool",
"ordinal": 3,
"type_info": "Int64"
},
{
"name": "total_stake!",
"ordinal": 4,
"type_info": "Int64"
},
{
"name": "full_details!",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "self_described",
"ordinal": 6,
"type_info": "Text"
},
{
"name": "last_updated_utc!",
"ordinal": 7,
"type_info": "Int64"
},
{
"name": "moniker!",
"ordinal": 8,
"type_info": "Text"
},
{
"name": "website!",
"ordinal": 9,
"type_info": "Text"
},
{
"name": "security_contact!",
"ordinal": 10,
"type_info": "Text"
},
{
"name": "details!",
"ordinal": 11,
"type_info": "Text"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false,
false,
false,
false,
false,
true,
true,
false,
false,
false,
false,
false
]
},
"hash": "f0a4316081d1be9444a87b95d933d31cb4bcc4071d31d8d2f7755e2d2c2e3e35"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "UPDATE\n testruns\n SET\n status = ?\n WHERE\n status = ?\n AND\n timestamp_utc < ?\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 3
},
"nullable": []
},
"hash": "f5048d9926a5f5329f7f3b96d43b925e033ceec4f8112258feb4ac9e96fc5924"
}
@@ -0,0 +1,26 @@
{
"db_name": "SQLite",
"query": "UPDATE testruns\n SET status = ?\n WHERE rowid =\n (\n SELECT rowid\n FROM testruns\n WHERE status = ?\n ORDER BY timestamp_utc asc\n LIMIT 1\n )\n RETURNING\n id as \"id!\",\n gateway_id\n ",
"describe": {
"columns": [
{
"name": "id!",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "gateway_id",
"ordinal": 1,
"type_info": "Int64"
}
],
"parameters": {
"Right": 2
},
"nullable": [
true,
false
]
},
"hash": "ff9334ba7b670b218b2f9100e9ab5d2f2d08b2e53203aab9f07ea9b52acbd407"
}
+62
View File
@@ -0,0 +1,62 @@
# Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
# SPDX-License-Identifier: Apache-2.0
[package]
name = "nym-node-status-api"
version = "0.1.5"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
[dependencies]
anyhow = { workspace = true }
axum = { workspace = true, features = ["tokio", "macros"] }
chrono = { workspace = true }
clap = { workspace = true, features = ["cargo", "derive", "env", "string"] }
cosmwasm-std = { workspace = true }
envy = { workspace = true }
futures-util = { workspace = true }
moka = { workspace = true, features = ["future"] }
nym-bin-common = { path = "../common/bin-common", features = ["models"]}
nym-common-models = { path = "../common/models" }
nym-explorer-client = { path = "../explorer-api/explorer-client" }
nym-network-defaults = { path = "../common/network-defaults" }
nym-validator-client = { path = "../common/client-libs/validator-client" }
nym-task = { path = "../common/task" }
nym-node-requests = { path = "../nym-node/nym-node-requests", features = ["openapi"] }
regex = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
serde_json_path = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
tracing-log = { workspace = true }
tower-http = { workspace = true, features = ["cors", "trace"] }
utoipa = { workspace = true, features = ["axum_extras", "time"] }
utoipa-swagger-ui = { workspace = true, features = ["axum"] }
# TODO dz `cargo update async-trait`
# for automatic schema detection, which was merged, but not released yet
# https://github.com/ProbablyClem/utoipauto/pull/38
# utoipauto = { git = "https://github.com/ProbablyClem/utoipauto", rev = "eb04cba" }
utoipauto = { workspace = true }
[build-dependencies]
anyhow = { workspace = true }
tokio = { workspace = true, features = ["macros"] }
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"sqlite",
"macros",
"migrate",
] }
+45
View File
@@ -0,0 +1,45 @@
use anyhow::{anyhow, Result};
use sqlx::{Connection, SqliteConnection};
use std::fs::Permissions;
use std::os::unix::fs::PermissionsExt;
use tokio::{fs::File, io::AsyncWriteExt};
const SQLITE_DB_FILENAME: &str = "nym-node-status-api.sqlite";
/// If you need to re-run migrations or reset the db, just run
/// cargo clean -p nym-node-status-api
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
let out_dir = read_env_var("OUT_DIR")?;
let database_path = format!("{}/{}?mode=rwc", out_dir, SQLITE_DB_FILENAME);
write_db_path_to_file(&out_dir, SQLITE_DB_FILENAME).await?;
let mut conn = SqliteConnection::connect(&database_path).await?;
sqlx::migrate!("./migrations").run(&mut conn).await?;
#[cfg(target_family = "unix")]
println!("cargo::rustc-env=DATABASE_URL=sqlite://{}", &database_path);
#[cfg(target_family = "windows")]
// for some strange reason we need to add a leading `/` to the windows path even though it's
// not a valid windows path... but hey, it works...
println!("cargo::rustc-env=DATABASE_URL=sqlite:///{}", &database_path);
Ok(())
}
fn read_env_var(var: &str) -> Result<String> {
std::env::var(var).map_err(|_| anyhow!("You need to set {} env var", var))
}
/// use `./enter_db.sh` to inspect DB
async fn write_db_path_to_file(out_dir: &str, db_filename: &str) -> anyhow::Result<()> {
let mut file = File::create("enter_db.sh").await?;
let _ = file.write(b"#!/bin/bash\n").await?;
file.write_all(format!("sqlite3 {}/{}", out_dir, db_filename).as_bytes())
.await?;
file.set_permissions(Permissions::from_mode(0o755))
.await
.map_err(From::from)
}
+40
View File
@@ -0,0 +1,40 @@
#!/bin/bash
set -e
export RUST_LOG=${RUST_LOG:-debug}
export NYM_API_CLIENT_TIMEOUT=60
export EXPLORER_CLIENT_TIMEOUT=60
export NODE_STATUS_API_TESTRUN_REFRESH_INTERVAL=60
export ENVIRONMENT="qa.env"
function run_bare() {
# export necessary env vars
set -a
source ../envs/$ENVIRONMENT
set +a
export RUST_LOG=debug
# --conection-url is provided in build.rs
cargo run --package nym-node-status-api
}
function run_docker() {
cargo build --package nym-node-status-api --release
cp ../target/release/nym-node-status-api .
cd ..
docker build -t node-status-api -f nym-node-status-api/Dockerfile.dev .
docker run --env-file envs/${ENVIRONMENT} \
-e EXPLORER_CLIENT_TIMEOUT=$EXPLORER_CLIENT_TIMEOUT \
-e NYM_API_CLIENT_TIMEOUT=$NYM_API_CLIENT_TIMEOUT \
-e DATABASE_URL="sqlite://node-status-api.sqlite?mode=rwc" \
-e RUST_LOG=${RUST_LOG} node-status-api
}
run_bare
# run_docker
+112
View File
@@ -0,0 +1,112 @@
CREATE TABLE gateways
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
gateway_identity_key VARCHAR NOT NULL UNIQUE,
self_described VARCHAR NOT NULL,
explorer_pretty_bond VARCHAR,
last_probe_result VARCHAR,
last_probe_log VARCHAR,
config_score INTEGER NOT NULL DEFAULT (0),
config_score_successes REAL NOT NULL DEFAULT (0),
config_score_samples REAL NOT NULL DEFAULT (0),
routing_score INTEGER NOT NULL DEFAULT (0),
routing_score_successes REAL NOT NULL DEFAULT (0),
routing_score_samples REAL NOT NULL DEFAULT (0),
test_run_samples REAL NOT NULL DEFAULT (0),
last_testrun_utc INTEGER,
last_updated_utc INTEGER NOT NULL,
bonded INTEGER CHECK (bonded in (0, 1)) NOT NULL DEFAULT 0,
blacklisted INTEGER CHECK (bonded in (0, 1)) NOT NULL DEFAULT 0,
performance INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX idx_gateway_description_gateway_identity_key ON gateways (gateway_identity_key);
CREATE TABLE mixnodes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
identity_key VARCHAR NOT NULL UNIQUE,
mix_id INTEGER NOT NULL UNIQUE,
bonded INTEGER CHECK (bonded in (0, 1)) NOT NULL DEFAULT 0,
total_stake INTEGER NOT NULL,
host VARCHAR NOT NULL,
http_api_port INTEGER NOT NULL,
blacklisted INTEGER CHECK (blacklisted in (0, 1)) NOT NULL DEFAULT 0,
full_details VARCHAR,
self_described VARCHAR,
last_updated_utc INTEGER NOT NULL
, is_dp_delegatee INTEGER CHECK (is_dp_delegatee IN (0, 1)) NOT NULL DEFAULT 0);
CREATE INDEX idx_mixnodes_mix_id ON mixnodes (mix_id);
CREATE INDEX idx_mixnodes_identity_key ON mixnodes (identity_key);
CREATE TABLE
mixnode_description (
id INTEGER PRIMARY KEY AUTOINCREMENT,
mix_id INTEGER UNIQUE NOT NULL,
moniker VARCHAR,
website VARCHAR,
security_contact VARCHAR,
details VARCHAR,
last_updated_utc INTEGER NOT NULL,
FOREIGN KEY (mix_id) REFERENCES mixnodes (mix_id)
);
-- Indexes for description table
CREATE INDEX idx_mixnode_description_mix_id ON mixnode_description (mix_id);
CREATE TABLE summary
(
key VARCHAR PRIMARY KEY,
value_json VARCHAR,
last_updated_utc INTEGER NOT NULL
);
CREATE TABLE summary_history
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
date VARCHAR UNIQUE NOT NULL,
timestamp_utc INTEGER NOT NULL,
value_json VARCHAR
);
CREATE INDEX idx_summary_history_timestamp_utc ON summary_history (timestamp_utc);
CREATE INDEX idx_summary_history_date ON summary_history (date);
CREATE TABLE gateway_description (
id INTEGER PRIMARY KEY AUTOINCREMENT,
gateway_identity_key VARCHAR UNIQUE NOT NULL,
moniker VARCHAR,
website VARCHAR,
security_contact VARCHAR,
details VARCHAR,
last_updated_utc INTEGER NOT NULL,
FOREIGN KEY (gateway_identity_key) REFERENCES gateways (gateway_identity_key)
);
CREATE TABLE
mixnode_daily_stats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
mix_id INTEGER NOT NULL,
total_stake BIGINT NOT NULL,
date_utc VARCHAR NOT NULL,
packets_received INTEGER DEFAULT 0,
packets_sent INTEGER DEFAULT 0,
packets_dropped INTEGER DEFAULT 0,
FOREIGN KEY (mix_id) REFERENCES mixnodes (mix_id),
UNIQUE (mix_id, date_utc) -- This constraint automatically creates an index
);
CREATE TABLE testruns
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
gateway_id INTEGER NOT NULL,
status INTEGER NOT NULL, -- 0=pending, 1=in-progress, 2=complete
timestamp_utc INTEGER NOT NULL,
ip_address VARCHAR NOT NULL,
log VARCHAR NOT NULL,
FOREIGN KEY (gateway_id) REFERENCES gateways (id)
);
+335
View File
@@ -0,0 +1,335 @@
use crate::{
http::{self, models::SummaryHistory},
monitor::NumericalCheckedCast,
};
use nym_node_requests::api::v1::node::models::NodeDescription;
use serde::{Deserialize, Serialize};
use strum_macros::{EnumString, FromRepr};
use utoipa::ToSchema;
pub(crate) struct GatewayRecord {
pub(crate) identity_key: String,
pub(crate) bonded: bool,
pub(crate) blacklisted: bool,
pub(crate) self_described: String,
pub(crate) explorer_pretty_bond: Option<String>,
pub(crate) last_updated_utc: i64,
pub(crate) performance: u8,
}
#[derive(Debug, Clone)]
pub(crate) struct GatewayDto {
pub(crate) gateway_identity_key: String,
pub(crate) bonded: bool,
pub(crate) blacklisted: bool,
pub(crate) performance: i64,
pub(crate) self_described: Option<String>,
pub(crate) explorer_pretty_bond: Option<String>,
pub(crate) last_probe_result: Option<String>,
pub(crate) last_probe_log: Option<String>,
pub(crate) last_testrun_utc: Option<i64>,
pub(crate) last_updated_utc: i64,
pub(crate) moniker: String,
pub(crate) security_contact: String,
pub(crate) details: String,
pub(crate) website: String,
}
impl TryFrom<GatewayDto> for http::models::Gateway {
type Error = anyhow::Error;
fn try_from(value: GatewayDto) -> Result<Self, Self::Error> {
// Instead of using routing_score_successes / routing_score_samples, we use the
// number of successful testruns in the last 24h.
let routing_score = 0f32;
let config_score = 0u32;
let last_updated_utc =
timestamp_as_utc(value.last_updated_utc.cast_checked()?).to_rfc3339();
let last_testrun_utc = value
.last_testrun_utc
.and_then(|i| i.cast_checked().ok())
.map(|t| timestamp_as_utc(t).to_rfc3339());
let self_described = value.self_described.clone().unwrap_or("null".to_string());
let explorer_pretty_bond = value
.explorer_pretty_bond
.clone()
.unwrap_or("null".to_string());
let last_probe_result = value
.last_probe_result
.clone()
.unwrap_or("null".to_string());
let last_probe_log = value.last_probe_log.clone();
let self_described = serde_json::from_str(&self_described).unwrap_or(None);
let explorer_pretty_bond = serde_json::from_str(&explorer_pretty_bond).unwrap_or(None);
let last_probe_result = serde_json::from_str(&last_probe_result).unwrap_or(None);
let bonded = value.bonded;
let blacklisted = value.blacklisted;
let performance = value.performance as u8;
let description = NodeDescription {
moniker: value.moniker.clone(),
website: value.website.clone(),
security_contact: value.security_contact.clone(),
details: value.details.clone(),
};
Ok(http::models::Gateway {
gateway_identity_key: value.gateway_identity_key.clone(),
bonded,
blacklisted,
performance,
self_described,
explorer_pretty_bond,
description,
last_probe_result,
last_probe_log,
routing_score,
config_score,
last_testrun_utc,
last_updated_utc,
})
}
}
fn timestamp_as_utc(unix_timestamp: u64) -> chrono::DateTime<chrono::Utc> {
let d = std::time::UNIX_EPOCH + std::time::Duration::from_secs(unix_timestamp);
d.into()
}
pub(crate) struct MixnodeRecord {
pub(crate) mix_id: u32,
pub(crate) identity_key: String,
pub(crate) bonded: bool,
pub(crate) total_stake: i64,
pub(crate) host: String,
pub(crate) http_port: u16,
pub(crate) blacklisted: bool,
pub(crate) full_details: String,
pub(crate) self_described: Option<String>,
pub(crate) last_updated_utc: i64,
pub(crate) is_dp_delegatee: bool,
}
#[derive(Debug, Clone)]
pub(crate) struct MixnodeDto {
pub(crate) mix_id: i64,
pub(crate) bonded: bool,
pub(crate) blacklisted: bool,
pub(crate) is_dp_delegatee: bool,
pub(crate) total_stake: i64,
pub(crate) full_details: String,
pub(crate) self_described: Option<String>,
pub(crate) last_updated_utc: i64,
pub(crate) moniker: String,
pub(crate) website: String,
pub(crate) security_contact: String,
pub(crate) details: String,
}
impl TryFrom<MixnodeDto> for http::models::Mixnode {
type Error = anyhow::Error;
fn try_from(value: MixnodeDto) -> Result<Self, Self::Error> {
let mix_id = value.mix_id.cast_checked()?;
let full_details = value.full_details.clone();
let full_details = serde_json::from_str(&full_details).unwrap_or(None);
let self_described = value
.self_described
.clone()
.map(|v| serde_json::from_str(&v).unwrap_or(serde_json::Value::Null));
let last_updated_utc =
timestamp_as_utc(value.last_updated_utc.cast_checked()?).to_rfc3339();
let blacklisted = value.blacklisted;
let is_dp_delegatee = value.is_dp_delegatee;
let moniker = value.moniker.clone();
let website = value.website.clone();
let security_contact = value.security_contact.clone();
let details = value.details.clone();
Ok(http::models::Mixnode {
mix_id,
bonded: value.bonded,
blacklisted,
is_dp_delegatee,
total_stake: value.total_stake,
full_details,
description: NodeDescription {
moniker,
website,
security_contact,
details,
},
self_described,
last_updated_utc,
})
}
}
#[allow(unused)]
#[derive(Debug, Clone)]
pub(crate) struct BondedStatusDto {
pub(crate) id: i64,
pub(crate) identity_key: String,
pub(crate) bonded: bool,
}
#[allow(unused)]
#[derive(Debug, Clone, Default)]
pub(crate) struct SummaryDto {
pub(crate) key: String,
pub(crate) value_json: String,
pub(crate) last_updated_utc: i64,
}
#[derive(Debug, Clone, Default)]
pub(crate) struct SummaryHistoryDto {
#[allow(dead_code)]
pub id: i64,
pub date: String,
pub value_json: String,
pub timestamp_utc: i64,
}
impl TryFrom<SummaryHistoryDto> for SummaryHistory {
type Error = anyhow::Error;
fn try_from(value: SummaryHistoryDto) -> Result<Self, Self::Error> {
let value_json = serde_json::from_str(&value.value_json).unwrap_or_default();
Ok(SummaryHistory {
value_json,
date: value.date.clone(),
timestamp_utc: timestamp_as_utc(value.timestamp_utc.cast_checked()?).to_rfc3339(),
})
}
}
pub(crate) const MIXNODES_BONDED_COUNT: &str = "mixnodes.bonded.count";
pub(crate) const MIXNODES_BONDED_ACTIVE: &str = "mixnodes.bonded.active";
pub(crate) const MIXNODES_BONDED_INACTIVE: &str = "mixnodes.bonded.inactive";
pub(crate) const MIXNODES_BONDED_RESERVE: &str = "mixnodes.bonded.reserve";
pub(crate) const MIXNODES_BLACKLISTED_COUNT: &str = "mixnodes.blacklisted.count";
pub(crate) const GATEWAYS_BONDED_COUNT: &str = "gateways.bonded.count";
pub(crate) const GATEWAYS_EXPLORER_COUNT: &str = "gateways.explorer.count";
pub(crate) const GATEWAYS_BLACKLISTED_COUNT: &str = "gateways.blacklisted.count";
pub(crate) const MIXNODES_HISTORICAL_COUNT: &str = "mixnodes.historical.count";
pub(crate) const GATEWAYS_HISTORICAL_COUNT: &str = "gateways.historical.count";
// `utoipa`` goes crazy if you use module-qualified prefix as field type so we
// have to import it
use gateway::GatewaySummary;
use mixnode::MixnodeSummary;
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct NetworkSummary {
pub(crate) mixnodes: MixnodeSummary,
pub(crate) gateways: GatewaySummary,
}
pub(crate) mod mixnode {
use super::*;
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct MixnodeSummary {
pub(crate) bonded: MixnodeSummaryBonded,
pub(crate) blacklisted: MixnodeSummaryBlacklisted,
pub(crate) historical: MixnodeSummaryHistorical,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct MixnodeSummaryBonded {
pub(crate) count: i32,
pub(crate) active: i32,
pub(crate) inactive: i32,
pub(crate) reserve: i32,
pub(crate) last_updated_utc: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct MixnodeSummaryBlacklisted {
pub(crate) count: i32,
pub(crate) last_updated_utc: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct MixnodeSummaryHistorical {
pub(crate) count: i32,
pub(crate) last_updated_utc: String,
}
}
pub(crate) mod gateway {
use super::*;
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct GatewaySummary {
pub(crate) bonded: GatewaySummaryBonded,
pub(crate) blacklisted: GatewaySummaryBlacklisted,
pub(crate) historical: GatewaySummaryHistorical,
pub(crate) explorer: GatewaySummaryExplorer,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct GatewaySummaryExplorer {
pub(crate) count: i32,
pub(crate) last_updated_utc: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct GatewaySummaryBonded {
pub(crate) count: i32,
pub(crate) last_updated_utc: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct GatewaySummaryHistorical {
pub(crate) count: i32,
pub(crate) last_updated_utc: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct GatewaySummaryBlacklisted {
pub(crate) count: i32,
pub(crate) last_updated_utc: String,
}
}
#[allow(dead_code)] // not dead code, this is SQL data model
#[derive(Debug, Clone)]
pub struct TestRunDto {
pub id: i64,
pub gateway_id: i64,
pub status: i64,
pub timestamp_utc: i64,
pub ip_address: String,
pub log: String,
}
#[derive(Debug, Clone, strum_macros::Display, EnumString, FromRepr, PartialEq)]
#[repr(u8)]
pub(crate) enum TestRunStatus {
Complete = 2,
InProgress = 1,
Queued = 0,
}
#[derive(Debug, Clone)]
pub struct GatewayIdentityDto {
pub gateway_identity_key: String,
pub bonded: bool,
}
#[allow(dead_code)] // it's not dead code but clippy doesn't detect usage in sqlx macros
#[derive(Debug, Clone)]
pub struct GatewayInfoDto {
pub id: i64,
pub gateway_identity_key: String,
pub self_described: Option<String>,
pub explorer_pretty_bond: Option<String>,
}
@@ -0,0 +1,180 @@
use crate::{
db::{
models::{BondedStatusDto, GatewayDto, GatewayRecord},
DbPool,
},
http::models::Gateway,
};
use futures_util::TryStreamExt;
use nym_validator_client::models::NymNodeDescription;
use sqlx::{pool::PoolConnection, Sqlite};
use tracing::error;
pub(crate) async fn select_gateway_identity(
conn: &mut PoolConnection<Sqlite>,
gateway_pk: i64,
) -> anyhow::Result<String> {
let record = sqlx::query!(
r#"SELECT
gateway_identity_key
FROM
gateways
WHERE
id = ?"#,
gateway_pk
)
.fetch_one(conn.as_mut())
.await?;
Ok(record.gateway_identity_key)
}
pub(crate) async fn insert_gateways(
pool: &DbPool,
gateways: Vec<GatewayRecord>,
) -> anyhow::Result<()> {
let mut db = pool.acquire().await?;
for record in gateways {
sqlx::query!(
"INSERT INTO gateways
(gateway_identity_key, bonded, blacklisted,
self_described, explorer_pretty_bond,
last_updated_utc, performance)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(gateway_identity_key) DO UPDATE SET
bonded=excluded.bonded,
blacklisted=excluded.blacklisted,
self_described=excluded.self_described,
explorer_pretty_bond=excluded.explorer_pretty_bond,
last_updated_utc=excluded.last_updated_utc,
performance = excluded.performance;",
record.identity_key,
record.bonded,
record.blacklisted,
record.self_described,
record.explorer_pretty_bond,
record.last_updated_utc,
record.performance
)
.execute(&mut *db)
.await?;
}
Ok(())
}
pub(crate) async fn write_blacklisted_gateways_to_db<'a, I>(
pool: &DbPool,
gateways: I,
) -> anyhow::Result<()>
where
I: Iterator<Item = &'a String>,
{
let mut conn = pool.acquire().await?;
for gateway_identity_key in gateways {
sqlx::query!(
"UPDATE gateways
SET blacklisted = true
WHERE gateway_identity_key = ?;",
gateway_identity_key,
)
.execute(&mut *conn)
.await?;
}
Ok(())
}
/// Ensure all gateways that are set as bonded, are still bonded
pub(crate) async fn ensure_gateways_still_bonded(
pool: &DbPool,
gateways: &[&NymNodeDescription],
) -> anyhow::Result<usize> {
let bonded_gateways_rows = get_all_bonded_gateways_row_ids_by_status(pool, true).await?;
let unbonded_gateways_rows = bonded_gateways_rows.iter().filter(|v| {
!gateways
.iter()
.any(|bonded| *bonded.ed25519_identity_key().to_base58_string() == v.identity_key)
});
let recently_unbonded_gateways = unbonded_gateways_rows.to_owned().count();
let last_updated_utc = chrono::offset::Utc::now().timestamp();
let mut transaction = pool.begin().await?;
for row in unbonded_gateways_rows {
sqlx::query!(
"UPDATE gateways
SET bonded = ?, last_updated_utc = ?
WHERE id = ?;",
false,
last_updated_utc,
row.id,
)
.execute(&mut *transaction)
.await?;
}
transaction.commit().await?;
Ok(recently_unbonded_gateways)
}
async fn get_all_bonded_gateways_row_ids_by_status(
pool: &DbPool,
status: bool,
) -> anyhow::Result<Vec<BondedStatusDto>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query_as!(
BondedStatusDto,
r#"SELECT
id as "id!",
gateway_identity_key as "identity_key!",
bonded as "bonded: bool"
FROM gateways
WHERE bonded = ?"#,
status,
)
.fetch(&mut *conn)
.try_collect::<Vec<_>>()
.await?;
Ok(items)
}
pub(crate) async fn get_all_gateways(pool: &DbPool) -> anyhow::Result<Vec<Gateway>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query_as!(
GatewayDto,
r#"SELECT
gw.gateway_identity_key as "gateway_identity_key!",
gw.bonded as "bonded: bool",
gw.blacklisted as "blacklisted: bool",
gw.performance as "performance!",
gw.self_described as "self_described?",
gw.explorer_pretty_bond as "explorer_pretty_bond?",
gw.last_probe_result as "last_probe_result?",
gw.last_probe_log as "last_probe_log?",
gw.last_testrun_utc as "last_testrun_utc?",
gw.last_updated_utc as "last_updated_utc!",
COALESCE(gd.moniker, "NA") as "moniker!",
COALESCE(gd.website, "NA") as "website!",
COALESCE(gd.security_contact, "NA") as "security_contact!",
COALESCE(gd.details, "NA") as "details!"
FROM gateways gw
LEFT JOIN gateway_description gd
ON gw.gateway_identity_key = gd.gateway_identity_key
ORDER BY gw.gateway_identity_key"#,
)
.fetch(&mut *conn)
.try_collect::<Vec<_>>()
.await?;
let items: Vec<Gateway> = items
.into_iter()
.map(|item| item.try_into())
.collect::<anyhow::Result<Vec<_>>>()
.map_err(|e| {
error!("Conversion from DTO failed: {e}. Invalidly stored data?");
e
})?;
tracing::trace!("Fetched {} gateways from DB", items.len());
Ok(items)
}
+15
View File
@@ -0,0 +1,15 @@
mod gateways;
mod misc;
mod mixnodes;
mod summary;
pub(crate) mod testruns;
pub(crate) use gateways::{
ensure_gateways_still_bonded, get_all_gateways, insert_gateways, select_gateway_identity,
write_blacklisted_gateways_to_db,
};
pub(crate) use misc::insert_summaries;
pub(crate) use mixnodes::{
ensure_mixnodes_still_bonded, get_all_mixnodes, get_daily_stats, insert_mixnodes,
};
pub(crate) use summary::{get_summary, get_summary_history};
@@ -0,0 +1,184 @@
use crate::db::DbPool;
use crate::http::models::TestrunAssignment;
use crate::{
db::models::{TestRunDto, TestRunStatus},
testruns::now_utc,
};
use anyhow::Context;
use chrono::Duration;
use sqlx::{pool::PoolConnection, Sqlite};
pub(crate) async fn get_in_progress_testrun_by_id(
conn: &mut PoolConnection<Sqlite>,
testrun_id: i64,
) -> anyhow::Result<TestRunDto> {
sqlx::query_as!(
TestRunDto,
r#"SELECT
id as "id!",
gateway_id as "gateway_id!",
status as "status!",
timestamp_utc as "timestamp_utc!",
ip_address as "ip_address!",
log as "log!"
FROM testruns
WHERE
id = ?
AND
status = ?
ORDER BY timestamp_utc"#,
testrun_id,
TestRunStatus::InProgress as i64,
)
.fetch_one(conn.as_mut())
.await
.context(format!("Couldn't retrieve testrun {testrun_id}"))
}
pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> anyhow::Result<u64> {
let mut conn = db.acquire().await?;
let previous_run = now_utc() - age;
let cutoff_timestamp = previous_run.timestamp();
let res = sqlx::query!(
r#"UPDATE
testruns
SET
status = ?
WHERE
status = ?
AND
timestamp_utc < ?
"#,
TestRunStatus::Queued as i64,
TestRunStatus::InProgress as i64,
cutoff_timestamp
)
.execute(conn.as_mut())
.await?;
let stale_testruns = res.rows_affected();
if stale_testruns > 0 {
tracing::debug!(
"Refreshed {} stale testruns, scheduled before {} but not yet finished",
stale_testruns,
previous_run
);
}
Ok(stale_testruns)
}
pub(crate) async fn get_oldest_testrun_and_make_it_pending(
conn: &mut PoolConnection<Sqlite>,
) -> anyhow::Result<Option<TestrunAssignment>> {
// find & mark as "In progress" in the same transaction to avoid race conditions
let returning = sqlx::query!(
r#"UPDATE testruns
SET status = ?
WHERE rowid =
(
SELECT rowid
FROM testruns
WHERE status = ?
ORDER BY timestamp_utc asc
LIMIT 1
)
RETURNING
id as "id!",
gateway_id
"#,
TestRunStatus::InProgress as i64,
TestRunStatus::Queued as i64,
)
.fetch_optional(conn.as_mut())
.await?;
if let Some(testrun) = returning {
let gw_identity = sqlx::query!(
r#"
SELECT
id,
gateway_identity_key
FROM gateways
WHERE id = ?
LIMIT 1"#,
testrun.gateway_id
)
.fetch_one(conn.as_mut())
.await?;
Ok(Some(TestrunAssignment {
testrun_id: testrun.id,
gateway_identity_key: gw_identity.gateway_identity_key,
}))
} else {
Ok(None)
}
}
pub(crate) async fn update_testrun_status(
conn: &mut PoolConnection<Sqlite>,
testrun_id: i64,
status: TestRunStatus,
) -> anyhow::Result<()> {
let status = status as u32;
sqlx::query!(
"UPDATE testruns SET status = ? WHERE id = ?",
status,
testrun_id
)
.execute(conn.as_mut())
.await?;
Ok(())
}
pub(crate) async fn update_gateway_last_probe_log(
conn: &mut PoolConnection<Sqlite>,
gateway_pk: i64,
log: &str,
) -> anyhow::Result<()> {
sqlx::query!(
"UPDATE gateways SET last_probe_log = ? WHERE id = ?",
log,
gateway_pk
)
.execute(conn.as_mut())
.await
.map(drop)
.map_err(From::from)
}
pub(crate) async fn update_gateway_last_probe_result(
conn: &mut PoolConnection<Sqlite>,
gateway_pk: i64,
result: &str,
) -> anyhow::Result<()> {
sqlx::query!(
"UPDATE gateways SET last_probe_result = ? WHERE id = ?",
result,
gateway_pk
)
.execute(conn.as_mut())
.await
.map(drop)
.map_err(From::from)
}
pub(crate) async fn update_gateway_score(
conn: &mut PoolConnection<Sqlite>,
gateway_pk: i64,
) -> anyhow::Result<()> {
let now = now_utc().timestamp();
sqlx::query!(
"UPDATE gateways SET last_testrun_utc = ?, last_updated_utc = ? WHERE id = ?",
now,
now,
gateway_pk
)
.execute(conn.as_mut())
.await
.map(drop)
.map_err(From::from)
}
+94
View File
@@ -0,0 +1,94 @@
use anyhow::anyhow;
use axum::{response::Redirect, Router};
use tokio::net::ToSocketAddrs;
use tower_http::{
cors::CorsLayer,
trace::{DefaultOnResponse, TraceLayer},
};
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
use crate::http::{server::HttpServer, state::AppState};
pub(crate) mod gateways;
pub(crate) mod mixnodes;
pub(crate) mod services;
pub(crate) mod summary;
pub(crate) mod testruns;
pub(crate) struct RouterBuilder {
unfinished_router: Router<AppState>,
}
impl RouterBuilder {
pub(crate) fn with_default_routes() -> Self {
let router = Router::new()
.merge(
SwaggerUi::new("/swagger")
.url("/api-docs/openapi.json", super::api_docs::ApiDoc::openapi()),
)
.route(
"/",
axum::routing::get(|| async { Redirect::permanent("/swagger") }),
)
.nest(
"/v2",
Router::new()
.nest("/gateways", gateways::routes())
.nest("/mixnodes", mixnodes::routes())
.nest("/services", services::routes())
.nest("/summary", summary::routes()),
)
.nest(
"/internal",
Router::new().nest("/testruns", testruns::routes()),
);
Self {
unfinished_router: router,
}
}
pub(crate) fn with_state(self, state: AppState) -> RouterWithState {
RouterWithState {
router: self.finalize_routes().with_state(state),
}
}
fn finalize_routes(self) -> Router<AppState> {
// layers added later wrap earlier layers
self.unfinished_router
// CORS layer needs to wrap other API layers
.layer(setup_cors())
// logger should be outermost layer
.layer(
TraceLayer::new_for_http()
.on_response(DefaultOnResponse::new().level(tracing::Level::DEBUG)),
)
}
}
pub(crate) struct RouterWithState {
router: Router,
}
impl RouterWithState {
pub(crate) async fn build_server<A: ToSocketAddrs>(
self,
bind_address: A,
) -> anyhow::Result<HttpServer> {
tokio::net::TcpListener::bind(bind_address)
.await
.map(|listener| HttpServer::new(self.router, listener))
.map_err(|err| anyhow!("Couldn't bind to address due to {}", err))
}
}
fn setup_cors() -> CorsLayer {
use axum::http::Method;
CorsLayer::new()
.allow_origin(tower_http::cors::Any)
.allow_methods([Method::POST, Method::GET, Method::PATCH, Method::OPTIONS])
.allow_headers(tower_http::cors::Any)
.allow_credentials(false)
}
@@ -0,0 +1,125 @@
use axum::extract::DefaultBodyLimit;
use axum::Json;
use axum::{
extract::{Path, State},
Router,
};
use reqwest::StatusCode;
use crate::db::models::TestRunStatus;
use crate::db::queries;
use crate::{
db,
http::{
error::{HttpError, HttpResult},
models::TestrunAssignment,
state::AppState,
},
};
// TODO dz consider adding endpoint to trigger testrun scan for a given gateway_id
// like in H< src/http/testruns.rs
pub(crate) fn routes() -> Router<AppState> {
Router::new()
.route("/", axum::routing::get(request_testrun))
.route("/:testrun_id", axum::routing::post(submit_testrun))
.layer(DefaultBodyLimit::max(1024 * 1024 * 5))
}
#[tracing::instrument(level = "debug", skip_all)]
async fn request_testrun(State(state): State<AppState>) -> HttpResult<Json<TestrunAssignment>> {
// TODO dz log agent's key
// TODO dz log agent's network probe version
tracing::debug!("Agent requested testrun");
let db = state.db_pool();
let mut conn = db
.acquire()
.await
.map_err(HttpError::internal_with_logging)?;
return match db::queries::testruns::get_oldest_testrun_and_make_it_pending(&mut conn).await {
Ok(res) => {
if let Some(testrun) = res {
tracing::debug!(
"🏃‍ Assigned testrun row_id {} gateway {} to agent",
&testrun.testrun_id,
testrun.gateway_identity_key
);
Ok(Json(testrun))
} else {
Err(HttpError::no_available_testruns())
}
}
Err(err) => Err(HttpError::internal_with_logging(err)),
};
}
// TODO dz accept testrun_id as query parameter
#[tracing::instrument(level = "debug", skip_all)]
async fn submit_testrun(
Path(testrun_id): Path<i64>,
State(state): State<AppState>,
body: String,
) -> HttpResult<StatusCode> {
let db = state.db_pool();
let mut conn = db
.acquire()
.await
.map_err(HttpError::internal_with_logging)?;
let testrun = queries::testruns::get_in_progress_testrun_by_id(&mut conn, testrun_id)
.await
.map_err(|e| {
tracing::error!("{e}");
HttpError::not_found(testrun_id)
})?;
let gw_identity = db::queries::select_gateway_identity(&mut conn, testrun.gateway_id)
.await
.map_err(|_| {
// should never happen:
HttpError::internal_with_logging("No gateway found for testrun")
})?;
tracing::debug!(
"Agent submitted testrun {} for gateway {} ({} bytes)",
testrun_id,
gw_identity,
body.len(),
);
// TODO dz this should be part of a single transaction: commit after everything is done
queries::testruns::update_testrun_status(&mut conn, testrun_id, TestRunStatus::Complete)
.await
.map_err(HttpError::internal_with_logging)?;
queries::testruns::update_gateway_last_probe_log(&mut conn, testrun.gateway_id, &body)
.await
.map_err(HttpError::internal_with_logging)?;
let result = get_result_from_log(&body);
queries::testruns::update_gateway_last_probe_result(&mut conn, testrun.gateway_id, &result)
.await
.map_err(HttpError::internal_with_logging)?;
queries::testruns::update_gateway_score(&mut conn, testrun.gateway_id)
.await
.map_err(HttpError::internal_with_logging)?;
// TODO dz log gw identity key
tracing::info!(
"✅ Testrun row_id {} for gateway {} complete",
testrun.id,
gw_identity
);
Ok(StatusCode::CREATED)
}
fn get_result_from_log(log: &str) -> String {
let re = regex::Regex::new(r"\n\{\s").unwrap();
let result: Vec<_> = re.splitn(log, 2).collect();
if result.len() == 2 {
let res = format!("{} {}", "{", result[1]).to_string();
return res;
}
"".to_string()
}
+48
View File
@@ -0,0 +1,48 @@
use std::fmt::Display;
pub(crate) type HttpResult<T> = Result<T, HttpError>;
pub(crate) struct HttpError {
message: String,
status: axum::http::StatusCode,
}
impl HttpError {
pub(crate) fn invalid_input(msg: impl Display) -> Self {
Self {
message: serde_json::json!({"message": msg.to_string()}).to_string(),
status: axum::http::StatusCode::BAD_REQUEST,
}
}
pub(crate) fn internal_with_logging(msg: impl Display) -> Self {
tracing::error!("{}", msg.to_string());
Self::internal()
}
pub(crate) fn internal() -> Self {
Self {
message: serde_json::json!({"message": "Internal server error"}).to_string(),
status: axum::http::StatusCode::INTERNAL_SERVER_ERROR,
}
}
pub(crate) fn no_available_testruns() -> Self {
Self {
message: serde_json::json!({"message": "No available testruns"}).to_string(),
status: axum::http::StatusCode::SERVICE_UNAVAILABLE,
}
}
pub(crate) fn not_found(msg: impl Display) -> Self {
Self {
message: serde_json::json!({"message": msg.to_string()}).to_string(),
status: axum::http::StatusCode::NOT_FOUND,
}
}
}
impl axum::response::IntoResponse for HttpError {
fn into_response(self) -> axum::response::Response {
(self.status, self.message).into_response()
}
}
+76
View File
@@ -0,0 +1,76 @@
use nym_node_requests::api::v1::node::models::NodeDescription;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
pub(crate) use nym_common_models::ns_api::TestrunAssignment;
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct Gateway {
pub gateway_identity_key: String,
pub bonded: bool,
pub blacklisted: bool,
pub performance: u8,
pub self_described: Option<serde_json::Value>,
pub explorer_pretty_bond: Option<serde_json::Value>,
pub description: NodeDescription,
pub last_probe_result: Option<serde_json::Value>,
pub last_probe_log: Option<String>,
pub last_testrun_utc: Option<String>,
pub last_updated_utc: String,
pub routing_score: f32,
pub config_score: u32,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct GatewaySkinny {
pub gateway_identity_key: String,
pub self_described: Option<serde_json::Value>,
pub explorer_pretty_bond: Option<serde_json::Value>,
pub last_probe_result: Option<serde_json::Value>,
pub last_testrun_utc: Option<String>,
pub last_updated_utc: String,
pub routing_score: f32,
pub config_score: u32,
pub performance: u8,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct Mixnode {
pub mix_id: u32,
pub bonded: bool,
pub blacklisted: bool,
pub is_dp_delegatee: bool,
pub total_stake: i64,
pub full_details: Option<serde_json::Value>,
pub self_described: Option<serde_json::Value>,
pub description: NodeDescription,
pub last_updated_utc: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct DailyStats {
pub date_utc: String,
pub total_packets_received: i64,
pub total_packets_sent: i64,
pub total_packets_dropped: i64,
pub total_stake: i64,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct Service {
pub gateway_identity_key: String,
pub last_updated_utc: String,
pub routing_score: f32,
pub service_provider_client_id: Option<String>,
pub ip_address: Option<String>,
pub hostname: Option<String>,
pub mixnet_websockets: Option<serde_json::Value>,
pub last_successful_ping_utc: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct SummaryHistory {
pub date: String,
pub value_json: serde_json::Value,
pub timestamp_utc: String,
}
+511
View File
@@ -0,0 +1,511 @@
use crate::db::models::{
gateway, mixnode, GatewayRecord, MixnodeRecord, NetworkSummary, GATEWAYS_BLACKLISTED_COUNT,
GATEWAYS_BONDED_COUNT, GATEWAYS_EXPLORER_COUNT, GATEWAYS_HISTORICAL_COUNT,
MIXNODES_BLACKLISTED_COUNT, MIXNODES_BONDED_ACTIVE, MIXNODES_BONDED_COUNT,
MIXNODES_BONDED_INACTIVE, MIXNODES_BONDED_RESERVE, MIXNODES_HISTORICAL_COUNT,
};
use crate::db::{queries, DbPool};
use anyhow::anyhow;
use cosmwasm_std::Decimal;
use nym_explorer_client::{ExplorerClient, PrettyDetailedGatewayBond};
use nym_network_defaults::NymNetworkDetails;
use nym_validator_client::client::NymApiClientExt;
use nym_validator_client::models::{
LegacyDescribedMixNode, MixNodeBondAnnotated, NymNodeDescription,
};
use nym_validator_client::nym_nodes::SkimmedNode;
use nym_validator_client::nyxd::contract_traits::PagedMixnetQueryClient;
use nym_validator_client::nyxd::{AccountId, NyxdClient};
use nym_validator_client::NymApiClient;
use reqwest::Url;
use std::collections::HashSet;
use std::str::FromStr;
use tokio::time::Duration;
use tracing::instrument;
// TODO dz should be configurable
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60);
static DELEGATION_PROGRAM_WALLET: &str = "n1rnxpdpx3kldygsklfft0gech7fhfcux4zst5lw";
// TODO dz: query many NYM APIs:
// multiple instances running directory cache, ask sachin
#[instrument(level = "debug", name = "data_monitor", skip_all)]
pub(crate) async fn spawn_in_background(
db_pool: DbPool,
explorer_client_timeout: Duration,
nym_api_client_timeout: Duration,
nyxd_addr: &Url,
refresh_interval: Duration,
) {
let network_defaults = nym_network_defaults::NymNetworkDetails::new_from_env();
loop {
tracing::info!("Refreshing node info...");
if let Err(e) = run(
&db_pool,
&network_defaults,
explorer_client_timeout,
nym_api_client_timeout,
nyxd_addr,
)
.await
{
tracing::error!(
"Monitor run failed: {e}, retrying in {}s...",
FAILURE_RETRY_DELAY.as_secs()
);
// TODO dz implement some sort of backoff
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
} else {
tracing::info!(
"Info successfully collected, sleeping for {}s...",
refresh_interval.as_secs()
);
tokio::time::sleep(refresh_interval).await;
}
}
}
async fn run(
pool: &DbPool,
network_details: &NymNetworkDetails,
explorer_client_timeout: Duration,
nym_api_client_timeout: Duration,
nyxd_addr: &Url,
) -> anyhow::Result<()> {
let default_api_url = network_details
.endpoints
.first()
.expect("rust sdk mainnet default incorrectly configured")
.api_url()
.clone()
.expect("rust sdk mainnet default missing api_url");
let default_explorer_url = network_details.explorer_api.clone().map(|url| {
url.parse()
.expect("rust sdk mainnet default explorer url not parseable")
});
// TODO dz replace explorer api with ipinfo.io
let default_explorer_url =
default_explorer_url.expect("explorer url missing in network config");
let explorer_client =
ExplorerClient::new_with_timeout(default_explorer_url, explorer_client_timeout)?;
let explorer_gateways = explorer_client
.unstable_get_gateways()
.await
.log_error("unstable_get_gateways")?;
let api_client = NymApiClient::new_with_timeout(default_api_url, nym_api_client_timeout);
let all_nodes = api_client
.get_all_described_nodes()
.await
.log_error("get_all_described_nodes")?;
tracing::debug!("Fetched {} total nodes", all_nodes.len());
let gateways = all_nodes
.iter()
.filter(|node| node.description.declared_role.entry)
.collect::<Vec<_>>();
tracing::debug!("Of those, {} gateways", gateways.len());
for gw in gateways.iter() {
tracing::debug!("{}", gw.ed25519_identity_key().to_base58_string());
}
let mixnodes = all_nodes
.iter()
.filter(|node| node.description.declared_role.mixnode)
.collect::<Vec<_>>();
tracing::debug!("Of those, {} mixnodes", mixnodes.len());
log_gw_in_explorer_not_api(explorer_gateways.as_slice(), gateways.as_slice());
let all_skimmed_nodes = api_client
.get_all_basic_nodes(None)
.await
.log_error("get_all_basic_nodes")?;
let mixnodes = api_client
.get_cached_mixnodes()
.await
.log_error("get_cached_mixnodes")?;
tracing::debug!("Fetched {} mixnodes", mixnodes.len());
// let gateways_blacklisted = gateways.iter().filter(|gw|gw.)
let gateways_blacklisted = all_skimmed_nodes
.iter()
.filter_map(|node| {
if node.performance.round_to_integer() <= 50 && node.supported_roles.entry {
Some(node.ed25519_identity_pubkey.to_base58_string())
} else {
None
}
})
.collect::<HashSet<_>>();
// Cached mixnodes don't include blacklisted nodes
// We need that to calculate the total locked tokens later
let mixnodes = api_client
.nym_api
.get_mixnodes_detailed_unfiltered()
.await
.log_error("get_mixnodes_detailed_unfiltered")?;
let mixnodes_described = api_client
.nym_api
.get_mixnodes_described()
.await
.log_error("get_mixnodes_described")?;
let mixnodes_active = api_client
.nym_api
.get_active_mixnodes()
.await
.log_error("get_active_mixnodes")?;
let delegation_program_members =
get_delegation_program_details(network_details, nyxd_addr).await?;
// keep stats for later
let count_bonded_mixnodes = mixnodes.len();
let count_bonded_gateways = gateways.len();
let count_explorer_gateways = explorer_gateways.len();
let count_bonded_mixnodes_active = mixnodes_active.len();
let gateway_records = prepare_gateway_data(
&gateways,
&gateways_blacklisted,
explorer_gateways,
all_skimmed_nodes,
)?;
queries::insert_gateways(pool, gateway_records)
.await
.map(|_| {
tracing::debug!("Gateway info written to DB!");
})?;
// instead of counting blacklisted GWs returned from API cache, count from the active set
let count_gateways_blacklisted = gateways
.iter()
.filter(|gw| {
let gw_identity = gw.ed25519_identity_key().to_base58_string();
gateways_blacklisted.contains(&gw_identity)
})
.count();
if count_gateways_blacklisted > 0 {
queries::write_blacklisted_gateways_to_db(pool, gateways_blacklisted.iter())
.await
.map(|_| {
tracing::debug!(
"Gateway blacklist info written to DB! {} blacklisted by Nym API",
count_gateways_blacklisted
)
})?;
}
let mixnode_records =
prepare_mixnode_data(&mixnodes, mixnodes_described, delegation_program_members)?;
queries::insert_mixnodes(pool, mixnode_records)
.await
.map(|_| {
tracing::debug!("Mixnode info written to DB!");
})?;
let count_mixnodes_blacklisted = mixnodes.iter().filter(|elem| elem.blacklisted).count();
let recently_unbonded_gateways = queries::ensure_gateways_still_bonded(pool, &gateways).await?;
let recently_unbonded_mixnodes = queries::ensure_mixnodes_still_bonded(pool, &mixnodes).await?;
let count_bonded_mixnodes_reserve = 0; // TODO: NymAPI doesn't report the reserve set size
let count_bonded_mixnodes_inactive = count_bonded_mixnodes - count_bonded_mixnodes_active;
let (all_historical_gateways, all_historical_mixnodes) = calculate_stats(pool).await?;
//
// write summary keys and values to table
//
let nodes_summary = vec![
(MIXNODES_BONDED_COUNT, &count_bonded_mixnodes),
(MIXNODES_BONDED_ACTIVE, &count_bonded_mixnodes_active),
(MIXNODES_BONDED_INACTIVE, &count_bonded_mixnodes_inactive),
(MIXNODES_BONDED_RESERVE, &count_bonded_mixnodes_reserve),
(MIXNODES_BLACKLISTED_COUNT, &count_mixnodes_blacklisted),
(GATEWAYS_BONDED_COUNT, &count_bonded_gateways),
(GATEWAYS_EXPLORER_COUNT, &count_explorer_gateways),
(MIXNODES_HISTORICAL_COUNT, &all_historical_mixnodes),
(GATEWAYS_HISTORICAL_COUNT, &all_historical_gateways),
(GATEWAYS_BLACKLISTED_COUNT, &count_gateways_blacklisted),
];
let last_updated = chrono::offset::Utc::now();
let last_updated_utc = last_updated.timestamp().to_string();
let network_summary = NetworkSummary {
mixnodes: mixnode::MixnodeSummary {
bonded: mixnode::MixnodeSummaryBonded {
count: count_bonded_mixnodes.cast_checked()?,
active: count_bonded_mixnodes_active.cast_checked()?,
inactive: count_bonded_mixnodes_inactive.cast_checked()?,
reserve: count_bonded_mixnodes_reserve.cast_checked()?,
last_updated_utc: last_updated_utc.to_owned(),
},
blacklisted: mixnode::MixnodeSummaryBlacklisted {
count: count_mixnodes_blacklisted.cast_checked()?,
last_updated_utc: last_updated_utc.to_owned(),
},
historical: mixnode::MixnodeSummaryHistorical {
count: all_historical_mixnodes.cast_checked()?,
last_updated_utc: last_updated_utc.to_owned(),
},
},
gateways: gateway::GatewaySummary {
bonded: gateway::GatewaySummaryBonded {
count: count_bonded_gateways.cast_checked()?,
last_updated_utc: last_updated_utc.to_owned(),
},
blacklisted: gateway::GatewaySummaryBlacklisted {
count: count_gateways_blacklisted.cast_checked()?,
last_updated_utc: last_updated_utc.to_owned(),
},
historical: gateway::GatewaySummaryHistorical {
count: all_historical_gateways.cast_checked()?,
last_updated_utc: last_updated_utc.to_owned(),
},
explorer: gateway::GatewaySummaryExplorer {
count: count_explorer_gateways.cast_checked()?,
last_updated_utc: last_updated_utc.to_owned(),
},
},
};
queries::insert_summaries(pool, &nodes_summary, &network_summary, last_updated).await?;
let mut log_lines: Vec<String> = vec![];
for (key, value) in nodes_summary.iter() {
log_lines.push(format!("{} = {}", key, value));
}
log_lines.push(format!(
"recently_unbonded_mixnodes = {}",
recently_unbonded_mixnodes
));
log_lines.push(format!(
"recently_unbonded_gateways = {}",
recently_unbonded_gateways
));
tracing::info!("Directory summary: \n{}", log_lines.join("\n"));
Ok(())
}
fn prepare_gateway_data(
gateways: &[&NymNodeDescription],
gateways_blacklisted: &HashSet<String>,
explorer_gateways: Vec<PrettyDetailedGatewayBond>,
skimmed_gateways: Vec<SkimmedNode>,
) -> anyhow::Result<Vec<GatewayRecord>> {
let mut gateway_records = Vec::new();
for gateway in gateways {
let identity_key = gateway.ed25519_identity_key().to_base58_string();
let bonded = true;
let last_updated_utc = chrono::offset::Utc::now().timestamp();
let blacklisted = gateways_blacklisted.contains(&identity_key);
let self_described = serde_json::to_string(&gateway.description)?;
let explorer_pretty_bond = explorer_gateways
.iter()
.find(|g| g.gateway.identity_key.eq(&identity_key));
let explorer_pretty_bond = explorer_pretty_bond.and_then(|g| serde_json::to_string(g).ok());
let performance = skimmed_gateways
.iter()
.find(|g| {
g.ed25519_identity_pubkey
.to_base58_string()
.eq(&identity_key)
})
.map(|g| g.performance)
.unwrap_or_default()
.round_to_integer();
gateway_records.push(GatewayRecord {
identity_key: identity_key.to_owned(),
bonded,
blacklisted,
self_described,
explorer_pretty_bond,
last_updated_utc,
performance,
});
}
Ok(gateway_records)
}
fn prepare_mixnode_data(
mixnodes: &[MixNodeBondAnnotated],
mixnodes_described: Vec<LegacyDescribedMixNode>,
delegation_program_members: Vec<u32>,
) -> anyhow::Result<Vec<MixnodeRecord>> {
let mut mixnode_records = Vec::new();
for mixnode in mixnodes {
let mix_id = mixnode.mix_id();
let identity_key = mixnode.identity_key();
let bonded = true;
let total_stake = decimal_to_i64(mixnode.mixnode_details.total_stake());
let blacklisted = mixnode.blacklisted;
let node_info = mixnode.mix_node();
let host = node_info.host.clone();
let http_port = node_info.http_api_port;
// Contains all the information including what's above
let full_details = serde_json::to_string(&mixnode)?;
let mixnode_described = mixnodes_described.iter().find(|m| m.bond.mix_id == mix_id);
let self_described = mixnode_described.and_then(|v| serde_json::to_string(v).ok());
let is_dp_delegatee = delegation_program_members.contains(&mix_id);
let last_updated_utc = chrono::offset::Utc::now().timestamp();
mixnode_records.push(MixnodeRecord {
mix_id,
identity_key: identity_key.to_owned(),
bonded,
total_stake,
host,
http_port,
blacklisted,
full_details,
self_described,
last_updated_utc,
is_dp_delegatee,
});
}
Ok(mixnode_records)
}
fn log_gw_in_explorer_not_api(
explorer: &[PrettyDetailedGatewayBond],
api_gateways: &[&NymNodeDescription],
) {
let api_gateways = api_gateways
.iter()
.map(|gw| gw.ed25519_identity_key().to_base58_string())
.collect::<HashSet<_>>();
let explorer_only = explorer
.iter()
.filter(|gw| !api_gateways.contains(&gw.gateway.identity_key.to_string()))
.collect::<Vec<_>>();
tracing::debug!(
"Gateways listed by explorer but not by Nym API: {}",
explorer_only.len()
);
for gw in explorer_only.iter() {
tracing::debug!("{}", gw.gateway.identity_key.to_string());
}
}
// TODO dz is there a common monorepo place this can be put?
pub trait NumericalCheckedCast<T>
where
T: TryFrom<Self>,
<T as TryFrom<Self>>::Error: std::error::Error,
Self: std::fmt::Display + Copy,
{
fn cast_checked(self) -> anyhow::Result<T> {
T::try_from(self).map_err(|e| {
anyhow::anyhow!(
"Couldn't cast {} to {}: {}",
self,
std::any::type_name::<T>(),
e
)
})
}
}
impl<T, U> NumericalCheckedCast<U> for T
where
U: TryFrom<T>,
<U as TryFrom<T>>::Error: std::error::Error,
T: std::fmt::Display + Copy,
{
}
async fn calculate_stats(pool: &DbPool) -> anyhow::Result<(usize, usize)> {
let mut conn = pool.acquire().await?;
let all_historical_gateways = sqlx::query_scalar!(r#"SELECT count(id) FROM gateways"#)
.fetch_one(&mut *conn)
.await?
.cast_checked()?;
let all_historical_mixnodes = sqlx::query_scalar!(r#"SELECT count(id) FROM mixnodes"#)
.fetch_one(&mut *conn)
.await?
.cast_checked()?;
Ok((all_historical_gateways, all_historical_mixnodes))
}
async fn get_delegation_program_details(
network_details: &NymNetworkDetails,
nyxd_addr: &Url,
) -> anyhow::Result<Vec<u32>> {
let config = nym_validator_client::nyxd::Config::try_from_nym_network_details(network_details)?;
let client = NyxdClient::connect(config, nyxd_addr.as_str())
.map_err(|err| anyhow::anyhow!("Couldn't connect: {}", err))?;
let account_id = AccountId::from_str(DELEGATION_PROGRAM_WALLET)
.map_err(|e| anyhow!("Invalid bech32 address: {}", e))?;
let delegations = client.get_all_delegator_delegations(&account_id).await?;
let mix_ids: Vec<u32> = delegations
.iter()
.map(|delegation| delegation.node_id)
.collect();
Ok(mix_ids)
}
fn decimal_to_i64(decimal: Decimal) -> i64 {
// Convert the underlying Uint128 to a u128
let atomics = decimal.atomics().u128();
let precision = 1_000_000_000_000_000_000u128;
// Get the fractional part
let fractional = atomics % precision;
// Get the integer part
let integer = atomics / precision;
// Combine them into a float
let float_value = integer as f64 + (fractional as f64 / 1_000_000_000_000_000_000_f64);
// Limit to 6 decimal places
let rounded_value = (float_value * 1_000_000.0).round() / 1_000_000.0;
rounded_value as i64
}
trait LogError<T, E> {
fn log_error(self, msg: &str) -> Result<T, E>;
}
impl<T, E> LogError<T, E> for anyhow::Result<T, E>
where
E: std::error::Error,
{
fn log_error(self, msg: &str) -> Result<T, E> {
if let Err(e) = &self {
tracing::error!("[{msg}]:\t{e}");
}
self
}
}
+86
View File
@@ -0,0 +1,86 @@
use crate::db::models::GatewayIdentityDto;
use crate::db::DbPool;
use futures_util::TryStreamExt;
use std::time::Duration;
use tracing::instrument;
pub(crate) mod models;
mod queue;
pub(crate) use queue::now_utc;
pub(crate) async fn spawn(pool: DbPool, refresh_interval: Duration) {
tokio::spawn(async move {
loop {
if let Err(e) = refresh_stale_testruns(&pool, refresh_interval).await {
tracing::error!("{e}");
}
if let Err(e) = run(&pool).await {
tracing::error!("Assigning testruns failed: {}", e);
}
tracing::debug!("Sleeping for {}s...", refresh_interval.as_secs());
tokio::time::sleep(refresh_interval).await;
}
});
}
// TODO dz make number of max agents configurable
#[instrument(level = "debug", name = "testrun_queue", skip_all)]
async fn run(pool: &DbPool) -> anyhow::Result<()> {
tracing::info!("Spawning testruns...");
if pool.is_closed() {
tracing::debug!("DB pool closed, returning early");
return Ok(());
}
let mut conn = pool.acquire().await?;
let gateways = sqlx::query_as!(
GatewayIdentityDto,
r#"SELECT
gateway_identity_key as "gateway_identity_key!",
bonded as "bonded: bool"
FROM gateways
ORDER BY last_testrun_utc"#,
)
.fetch(&mut *conn)
.try_collect::<Vec<_>>()
.await?;
// TODO dz this filtering could be done in SQL
let gateways: Vec<GatewayIdentityDto> = gateways.into_iter().filter(|g| g.bonded).collect();
tracing::debug!("Trying to queue {} testruns", gateways.len());
let mut testruns_created = 0;
for gateway in gateways {
if let Err(e) = queue::try_queue_testrun(
&mut conn,
gateway.gateway_identity_key.clone(),
// TODO dz read from config
"127.0.0.1".to_string(),
)
.await
// TODO dz measure how many were actually inserted and how many were skipped
{
tracing::debug!(
"Skipping test for identity {} with error {}",
&gateway.gateway_identity_key,
e
);
} else {
testruns_created += 1;
}
}
tracing::debug!("{} testruns queued in total", testruns_created);
Ok(())
}
#[instrument(level = "debug", skip_all)]
async fn refresh_stale_testruns(pool: &DbPool, refresh_interval: Duration) -> anyhow::Result<()> {
let chrono_duration = chrono::Duration::from_std(refresh_interval)?;
crate::db::queries::testruns::update_testruns_older_than(pool, chrono_duration).await?;
Ok(())
}
+118
View File
@@ -0,0 +1,118 @@
use crate::db::models::{GatewayInfoDto, TestRunDto, TestRunStatus};
use crate::testruns::models::TestRun;
use anyhow::anyhow;
use chrono::DateTime;
use futures_util::TryStreamExt;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use std::time::SystemTime;
pub(crate) async fn try_queue_testrun(
conn: &mut PoolConnection<Sqlite>,
identity_key: String,
ip_address: String,
) -> anyhow::Result<TestRun> {
let timestamp = now_utc().timestamp();
let timestamp_pretty = now_utc_as_rfc3339();
let items = sqlx::query_as!(
GatewayInfoDto,
r#"SELECT
id as "id!",
gateway_identity_key as "gateway_identity_key!",
self_described as "self_described?",
explorer_pretty_bond as "explorer_pretty_bond?"
FROM gateways
WHERE gateway_identity_key = ?
ORDER BY gateway_identity_key
LIMIT 1"#,
identity_key,
)
// TODO dz shoudl call .fetch_one
// TODO dz replace this in other queries as well
.fetch(conn.as_mut())
.try_collect::<Vec<_>>()
.await?;
let gateway = items
.iter()
.find(|g| g.gateway_identity_key == identity_key);
// TODO dz if let Some() = gateway.first() ...
if gateway.is_none() {
return Err(anyhow!("Unknown gateway {identity_key}"));
}
let gateway_id = gateway.unwrap().id;
//
// check if there is already a test run for this gateway
//
let items = sqlx::query_as!(
TestRunDto,
r#"SELECT
id as "id!",
gateway_id as "gateway_id!",
status as "status!",
timestamp_utc as "timestamp_utc!",
ip_address as "ip_address!",
log as "log!"
FROM testruns
WHERE gateway_id = ? AND status != 2
ORDER BY id DESC
LIMIT 1"#,
gateway_id,
)
.fetch(conn.as_mut())
.try_collect::<Vec<_>>()
.await?;
if !items.is_empty() {
let testrun = items.first().unwrap();
return Ok(TestRun {
id: testrun.id as u32,
identity_key,
status: format!(
"{}",
TestRunStatus::from_repr(testrun.status as u8).unwrap()
),
log: testrun.log.clone(),
});
}
//
// save test run
//
let status = TestRunStatus::Queued as u32;
let log = format!(
"Test for {identity_key} requested at {} UTC\n\n",
timestamp_pretty
);
let id = sqlx::query!(
"INSERT INTO testruns (gateway_id, status, ip_address, timestamp_utc, log) VALUES (?, ?, ?, ?, ?)",
gateway_id,
status,
ip_address,
timestamp,
log,
)
.execute(conn.as_mut())
.await?
.last_insert_rowid();
Ok(TestRun {
id: id as u32,
identity_key,
status: format!("{}", TestRunStatus::Queued),
log,
})
}
// TODO dz do we need these?
pub fn now_utc() -> DateTime<chrono::Utc> {
SystemTime::now().into()
}
pub fn now_utc_as_rfc3339() -> String {
now_utc().to_rfc3339()
}