Compare commits

...

10 Commits

Author SHA1 Message Date
dynco-nym 452ce3da8c self described field not null 2024-10-30 17:37:53 +01:00
dynco-nym f6105c9483 Point to internal deps + rebase + v0.1.5 2024-10-30 17:33:31 +01:00
dynco-nym 52004b3e13 Compatible with directory v2 2024-10-30 16:54:04 +01:00
dynco-nym f46332d24a Sqlx offline query data + clippy 2024-10-30 16:54:04 +01:00
dynco-nym 7629c5b3a0 Agent 0.1.4 2024-10-30 16:54:04 +01:00
dynco-nym 4d22ddd75a Agent 0.1.3 2024-10-30 16:54:04 +01:00
dynco-nym e21b37b1a9 Testrun stores gw identity key instead of gw pk 2024-10-30 16:54:04 +01:00
dynco-nym 493aaf7bb4 Better logging on agent 2024-10-30 16:54:04 +01:00
dynco-nym bdecab0205 Clean up stale testruns & logging
- log gw identity key
- better agent testrun logging
- log responses
- change response code for agents
2024-10-30 16:54:04 +01:00
dynco-nym 19d1939fb7 Use unstable explorer client 2024-10-30 16:54:04 +01:00
50 changed files with 1389 additions and 840 deletions
Generated
+338 -726
View File
File diff suppressed because it is too large Load Diff
+1 -2
View File
@@ -2,7 +2,6 @@ use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TestrunAssignment {
/// has nothing to do with GW identity key. This is PK from `gateways` table
pub testrun_id: i64,
pub gateway_pk_id: i64,
pub gateway_identity_key: String,
}
+1 -1
View File
@@ -4,7 +4,7 @@
[package]
name = "nym-node-status-agent"
version = "0.1.0"
version = "0.1.4"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
+10 -4
View File
@@ -2,7 +2,11 @@
set -eu
export RUST_LOG=${RUST_LOG:-debug}
environment="qa"
source ../envs/${environment}.env
export RUST_LOG="debug"
crate_root=$(dirname $(realpath "$0"))
gateway_probe_src=$(dirname $(dirname "$crate_root"))/nym-vpn-client/nym-vpn-core
@@ -14,6 +18,8 @@ export NODE_STATUS_AGENT_PROBE_PATH="$crate_root/nym-gateway-probe"
# build & copy over GW probe
function copy_gw_probe() {
pushd $gateway_probe_src
git switch main
git pull
cargo build --release --package nym-gateway-probe
cp target/release/nym-gateway-probe "$crate_root"
$crate_root/nym-gateway-probe --version
@@ -30,8 +36,8 @@ function swarm() {
build_agent
for ((i=1; i<=$workers; i++)); do
../target/release/nym-node-status-agent run-probe &
for ((i = 1; i <= $workers; i++)); do
../target/release/nym-node-status-agent run-probe &
done
wait
@@ -44,6 +50,6 @@ export NODE_STATUS_AGENT_SERVER_PORT="8000"
copy_gw_probe
swarm 30
swarm 8
# cargo run -- run-probe
+4 -8
View File
@@ -31,24 +31,19 @@ pub(crate) enum Command {
/// path of binary to run
#[arg(long, env = "NODE_STATUS_AGENT_PROBE_PATH")]
probe_path: String,
#[arg(short, long, env = "NODE_STATUS_AGENT_GATEWAY_ID")]
gateway_id: Option<String>,
},
}
impl Args {
pub(crate) async fn execute(&self) -> anyhow::Result<()> {
match &self.command {
Command::RunProbe {
probe_path,
gateway_id,
} => self.run_probe(probe_path, gateway_id).await?,
Command::RunProbe { probe_path } => self.run_probe(probe_path).await?,
}
Ok(())
}
async fn run_probe(&self, probe_path: &str, gateway_id: &Option<String>) -> anyhow::Result<()> {
async fn run_probe(&self, probe_path: &str) -> anyhow::Result<()> {
let server_address = format!("{}:{}", &self.server_address, self.server_port);
let probe = GwProbe::new(probe_path.to_string());
@@ -58,7 +53,7 @@ impl Args {
let testrun = request_testrun(&server_address).await?;
let log = probe.run_and_get_log(gateway_id);
let log = probe.run_and_get_log(&Some(testrun.gateway_identity_key));
submit_results(&server_address, testrun.testrun_id, log).await?;
@@ -97,6 +92,7 @@ async fn submit_results(
) -> anyhow::Result<()> {
let target_url = format!("{}/{}/{}", server_addr, URL_BASE, testrun_id);
let client = reqwest::Client::new();
let res = client
.post(target_url)
.body(probe_outcome)
+6
View File
@@ -40,6 +40,12 @@ impl GwProbe {
match command.spawn() {
Ok(child) => {
if let Ok(output) = child.wait_with_output() {
if !output.status.success() {
let out = String::from_utf8_lossy(&output.stdout);
let err = String::from_utf8_lossy(&output.stderr);
tracing::error!("Probe exited with {:?}:\n{}\n{}", output.status, out, err);
}
return String::from_utf8(output.stdout)
.unwrap_or("Unable to get log from test run".to_string());
}
@@ -0,0 +1,20 @@
{
"db_name": "SQLite",
"query": "SELECT\n gateway_identity_key\n FROM\n gateways\n WHERE\n id = ?",
"describe": {
"columns": [
{
"name": "gateway_identity_key",
"ordinal": 0,
"type_info": "Text"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false
]
},
"hash": "06b17d1e5f61201a1b7542896ba55c69cd5c1a7e7d87073c94600c783a0a3984"
}
@@ -0,0 +1,32 @@
{
"db_name": "SQLite",
"query": "SELECT\n key as \"key!\",\n value_json as \"value_json!\",\n last_updated_utc as \"last_updated_utc!\"\n FROM summary",
"describe": {
"columns": [
{
"name": "key!",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "value_json!",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "last_updated_utc!",
"ordinal": 2,
"type_info": "Int64"
}
],
"parameters": {
"Right": 0
},
"nullable": [
true,
true,
false
]
},
"hash": "1327b5118f9144dddbcf8edb11f7dc549cf503409fd6dfedcdc02dbcd61d5454"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "UPDATE mixnodes\n SET bonded = ?, last_updated_utc = ?\n WHERE id = ?;",
"describe": {
"columns": [],
"parameters": {
"Right": 3
},
"nullable": []
},
"hash": "18abc8fde56cf86baed7b4afa38f2c63cdf90f2f3b6d81afb9000bb0968dcaea"
}
@@ -0,0 +1,26 @@
{
"db_name": "SQLite",
"query": "\n SELECT\n id,\n gateway_identity_key\n FROM gateways\n WHERE id = ?\n LIMIT 1",
"describe": {
"columns": [
{
"name": "id",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "gateway_identity_key",
"ordinal": 1,
"type_info": "Text"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false,
false
]
},
"hash": "2236299f9f691376db54cbd58ec5ceb89b9925cba46efcf4ed79ef0759a01129"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "INSERT INTO testruns (gateway_id, status, ip_address, timestamp_utc, log) VALUES (?, ?, ?, ?, ?)",
"describe": {
"columns": [],
"parameters": {
"Right": 5
},
"nullable": []
},
"hash": "3c584e211d07c511644c8079187965acf3bcfb3f84ba8d24ed645d79976cf784"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "UPDATE gateways\n SET bonded = ?, last_updated_utc = ?\n WHERE id = ?;",
"describe": {
"columns": [],
"parameters": {
"Right": 3
},
"nullable": []
},
"hash": "3d3a1fa429e3090741c6b6a8e82e692afc04b51e8782bcbf59f1eb4116112536"
}
@@ -0,0 +1,38 @@
{
"db_name": "SQLite",
"query": "SELECT\n id as \"id!\",\n gateway_identity_key as \"gateway_identity_key!\",\n self_described as \"self_described?\",\n explorer_pretty_bond as \"explorer_pretty_bond?\"\n FROM gateways\n WHERE gateway_identity_key = ?\n ORDER BY gateway_identity_key\n LIMIT 1",
"describe": {
"columns": [
{
"name": "id!",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "gateway_identity_key!",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "self_described?",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "explorer_pretty_bond?",
"ordinal": 3,
"type_info": "Text"
}
],
"parameters": {
"Right": 1
},
"nullable": [
true,
false,
true,
true
]
},
"hash": "3d5fc502f976f5081f01352856b8632c29c81bfafb043bb8744129cf9e0266ad"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "UPDATE testruns SET status = ? WHERE id = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 2
},
"nullable": []
},
"hash": "418944f2eccb838cb3882f34469203c8569f03fdd39ce09d7b74177896e52a8c"
}
@@ -0,0 +1,50 @@
{
"db_name": "SQLite",
"query": "SELECT\n id as \"id!\",\n gateway_id as \"gateway_id!\",\n status as \"status!\",\n timestamp_utc as \"timestamp_utc!\",\n ip_address as \"ip_address!\",\n log as \"log!\"\n FROM testruns\n WHERE gateway_id = ? AND status != 2\n ORDER BY id DESC\n LIMIT 1",
"describe": {
"columns": [
{
"name": "id!",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "gateway_id!",
"ordinal": 1,
"type_info": "Int64"
},
{
"name": "status!",
"ordinal": 2,
"type_info": "Int64"
},
{
"name": "timestamp_utc!",
"ordinal": 3,
"type_info": "Int64"
},
{
"name": "ip_address!",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "log!",
"ordinal": 5,
"type_info": "Text"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false,
false,
false,
false,
false,
false
]
},
"hash": "46d76bc6d3fba2dae3b21511a36289dd776749dd7a20cda61b0480f2fba60889"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "UPDATE gateways SET last_probe_log = ? WHERE id = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 2
},
"nullable": []
},
"hash": "4afcc6673890f795c2793f1e2f8570ee787fc7daf00fcb916f18d1cb7d6c8f08"
}
@@ -0,0 +1,32 @@
{
"db_name": "SQLite",
"query": "SELECT\n id as \"id!\",\n gateway_identity_key as \"identity_key!\",\n bonded as \"bonded: bool\"\n FROM gateways\n WHERE bonded = ?",
"describe": {
"columns": [
{
"name": "id!",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "identity_key!",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "bonded: bool",
"ordinal": 2,
"type_info": "Int64"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false,
false,
false
]
},
"hash": "4b61a4bc32333c92a8f5ad4ad0017b40dc01845f554b5479f37855d89b309e6f"
}
@@ -0,0 +1,20 @@
{
"db_name": "SQLite",
"query": "SELECT count(id) FROM mixnodes",
"describe": {
"columns": [
{
"name": "count(id)",
"ordinal": 0,
"type_info": "Int"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false
]
},
"hash": "670b7ed7d57a6986181b24be24ca667e8cacdf677ccb906415b3fe92be0c436b"
}
@@ -0,0 +1,50 @@
{
"db_name": "SQLite",
"query": "SELECT\n id as \"id!\",\n gateway_id as \"gateway_id!\",\n status as \"status!\",\n timestamp_utc as \"timestamp_utc!\",\n ip_address as \"ip_address!\",\n log as \"log!\"\n FROM testruns\n WHERE\n id = ?\n AND\n status = ?\n ORDER BY timestamp_utc",
"describe": {
"columns": [
{
"name": "id!",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "gateway_id!",
"ordinal": 1,
"type_info": "Int64"
},
{
"name": "status!",
"ordinal": 2,
"type_info": "Int64"
},
{
"name": "timestamp_utc!",
"ordinal": 3,
"type_info": "Int64"
},
{
"name": "ip_address!",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "log!",
"ordinal": 5,
"type_info": "Text"
}
],
"parameters": {
"Right": 2
},
"nullable": [
false,
false,
false,
false,
false,
false
]
},
"hash": "6d7967b831b355d5f2c77950abc56f816956b0824c66a25da611dce688105d36"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "INSERT INTO mixnodes\n (mix_id, identity_key, bonded, total_stake,\n host, http_api_port, blacklisted, full_details,\n self_described, last_updated_utc, is_dp_delegatee)\n VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)\n ON CONFLICT(mix_id) DO UPDATE SET\n bonded=excluded.bonded,\n total_stake=excluded.total_stake, host=excluded.host,\n http_api_port=excluded.http_api_port,blacklisted=excluded.blacklisted,\n full_details=excluded.full_details,self_described=excluded.self_described,\n last_updated_utc=excluded.last_updated_utc,\n is_dp_delegatee = excluded.is_dp_delegatee;",
"describe": {
"columns": [],
"parameters": {
"Right": 11
},
"nullable": []
},
"hash": "6eb1a682cf13205cf701590021cdf795147ac3724e89df5b2f24f7215d87dce1"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "UPDATE gateways SET last_probe_result = ? WHERE id = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 2
},
"nullable": []
},
"hash": "6ef3efde571d46961244cd90420f3de5949a5ff2083453cb879af8a1689efe2f"
}
@@ -0,0 +1,98 @@
{
"db_name": "SQLite",
"query": "SELECT\n gw.gateway_identity_key as \"gateway_identity_key!\",\n gw.bonded as \"bonded: bool\",\n gw.blacklisted as \"blacklisted: bool\",\n gw.performance as \"performance!\",\n gw.self_described as \"self_described?\",\n gw.explorer_pretty_bond as \"explorer_pretty_bond?\",\n gw.last_probe_result as \"last_probe_result?\",\n gw.last_probe_log as \"last_probe_log?\",\n gw.last_testrun_utc as \"last_testrun_utc?\",\n gw.last_updated_utc as \"last_updated_utc!\",\n COALESCE(gd.moniker, \"NA\") as \"moniker!\",\n COALESCE(gd.website, \"NA\") as \"website!\",\n COALESCE(gd.security_contact, \"NA\") as \"security_contact!\",\n COALESCE(gd.details, \"NA\") as \"details!\"\n FROM gateways gw\n LEFT JOIN gateway_description gd\n ON gw.gateway_identity_key = gd.gateway_identity_key\n ORDER BY gw.gateway_identity_key",
"describe": {
"columns": [
{
"name": "gateway_identity_key!",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "bonded: bool",
"ordinal": 1,
"type_info": "Int64"
},
{
"name": "blacklisted: bool",
"ordinal": 2,
"type_info": "Int64"
},
{
"name": "performance!",
"ordinal": 3,
"type_info": "Int64"
},
{
"name": "self_described?",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "explorer_pretty_bond?",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "last_probe_result?",
"ordinal": 6,
"type_info": "Text"
},
{
"name": "last_probe_log?",
"ordinal": 7,
"type_info": "Text"
},
{
"name": "last_testrun_utc?",
"ordinal": 8,
"type_info": "Int64"
},
{
"name": "last_updated_utc!",
"ordinal": 9,
"type_info": "Int64"
},
{
"name": "moniker!",
"ordinal": 10,
"type_info": "Text"
},
{
"name": "website!",
"ordinal": 11,
"type_info": "Text"
},
{
"name": "security_contact!",
"ordinal": 12,
"type_info": "Text"
},
{
"name": "details!",
"ordinal": 13,
"type_info": "Text"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false,
false,
false,
false,
true,
true,
true,
true,
true,
false,
false,
false,
false,
false
]
},
"hash": "71a455c705f9c25d3843ff2fb8629d1320a5eb10797cdb5a435455d22c6aeac1"
}
@@ -0,0 +1,38 @@
{
"db_name": "SQLite",
"query": "SELECT\n id as \"id!\",\n date as \"date!\",\n timestamp_utc as \"timestamp_utc!\",\n value_json as \"value_json!\"\n FROM summary_history\n ORDER BY date DESC\n LIMIT 30",
"describe": {
"columns": [
{
"name": "id!",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "date!",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "timestamp_utc!",
"ordinal": 2,
"type_info": "Int64"
},
{
"name": "value_json!",
"ordinal": 3,
"type_info": "Text"
}
],
"parameters": {
"Right": 0
},
"nullable": [
true,
false,
false,
true
]
},
"hash": "7600823da7ce80b8ffda933608603a2752e28df775d1af8fd943a5fc8d7dc00d"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "INSERT INTO summary_history\n (date, timestamp_utc, value_json)\n VALUES (?, ?, ?)\n ON CONFLICT(date) DO UPDATE SET\n timestamp_utc=excluded.timestamp_utc,\n value_json=excluded.value_json;",
"describe": {
"columns": [],
"parameters": {
"Right": 3
},
"nullable": []
},
"hash": "788515c34588aec352773df4b6e6c5e41f3c0bb56a27648b5e25466b8634a578"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "UPDATE gateways\n SET blacklisted = true\n WHERE gateway_identity_key = ?;",
"describe": {
"columns": [],
"parameters": {
"Right": 1
},
"nullable": []
},
"hash": "8571faad2f66e08f24acfbfe036d17ca6eb090df7f6d52ef89c5d51564f8b45c"
}
@@ -0,0 +1,20 @@
{
"db_name": "SQLite",
"query": "SELECT count(id) FROM gateways",
"describe": {
"columns": [
{
"name": "count(id)",
"ordinal": 0,
"type_info": "Int"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false
]
},
"hash": "86ff64db477a1d6235179b0b88d86b86d1b9be62336c9eac0eef44987a5451b5"
}
@@ -0,0 +1,26 @@
{
"db_name": "SQLite",
"query": "SELECT\n gateway_identity_key as \"gateway_identity_key!\",\n bonded as \"bonded: bool\"\n FROM gateways\n ORDER BY last_testrun_utc",
"describe": {
"columns": [
{
"name": "gateway_identity_key!",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "bonded: bool",
"ordinal": 1,
"type_info": "Int64"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false,
false
]
},
"hash": "930a41e612b4e964ae214843da190f6c66c14d4267a2cc2ca73354becc2c8bb8"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "UPDATE gateways SET last_testrun_utc = ?, last_updated_utc = ? WHERE id = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 3
},
"nullable": []
},
"hash": "c214c001acbbf79fa499816f36ec586c4c29c03efb4cf0c40b73a5c76159cf5c"
}
@@ -0,0 +1,44 @@
{
"db_name": "SQLite",
"query": "\n SELECT\n date_utc as \"date_utc!\",\n packets_received as \"total_packets_received!: i64\",\n packets_sent as \"total_packets_sent!: i64\",\n packets_dropped as \"total_packets_dropped!: i64\",\n total_stake as \"total_stake!: i64\"\n FROM (\n SELECT\n date_utc,\n SUM(packets_received) as packets_received,\n SUM(packets_sent) as packets_sent,\n SUM(packets_dropped) as packets_dropped,\n SUM(total_stake) as total_stake\n FROM mixnode_daily_stats\n GROUP BY date_utc\n ORDER BY date_utc DESC\n LIMIT 30\n )\n GROUP BY date_utc\n ORDER BY date_utc\n ",
"describe": {
"columns": [
{
"name": "date_utc!",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "total_packets_received!: i64",
"ordinal": 1,
"type_info": "Int64"
},
{
"name": "total_packets_sent!: i64",
"ordinal": 2,
"type_info": "Int64"
},
{
"name": "total_packets_dropped!: i64",
"ordinal": 3,
"type_info": "Int64"
},
{
"name": "total_stake!: i64",
"ordinal": 4,
"type_info": "Int64"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false,
true,
true,
true,
false
]
},
"hash": "c5e3cd7284b334df5aa979b1627ea1f6dc2aed00cedde25f2be3567e47064351"
}
@@ -0,0 +1,32 @@
{
"db_name": "SQLite",
"query": "SELECT\n id as \"id!\",\n identity_key as \"identity_key!\",\n bonded as \"bonded: bool\"\n FROM mixnodes\n WHERE bonded = ?",
"describe": {
"columns": [
{
"name": "id!",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "identity_key!",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "bonded: bool",
"ordinal": 2,
"type_info": "Int64"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false,
false,
false
]
},
"hash": "c7ba2621becb9ac4b5dee0ce303dadfcf19095935867a51cbd5b8362d1505fcc"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "INSERT INTO gateways\n (gateway_identity_key, bonded, blacklisted,\n self_described, explorer_pretty_bond,\n last_updated_utc, performance)\n VALUES (?, ?, ?, ?, ?, ?, ?)\n ON CONFLICT(gateway_identity_key) DO UPDATE SET\n bonded=excluded.bonded,\n blacklisted=excluded.blacklisted,\n self_described=excluded.self_described,\n explorer_pretty_bond=excluded.explorer_pretty_bond,\n last_updated_utc=excluded.last_updated_utc,\n performance = excluded.performance;",
"describe": {
"columns": [],
"parameters": {
"Right": 7
},
"nullable": []
},
"hash": "d8ea93e781666e6267902170709ee2aa37f6163525bbdce1a4cebef4a285f8d9"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "INSERT INTO summary\n (key, value_json, last_updated_utc)\n VALUES (?, ?, ?)\n ON CONFLICT(key) DO UPDATE SET\n value_json=excluded.value_json,\n last_updated_utc=excluded.last_updated_utc;",
"describe": {
"columns": [],
"parameters": {
"Right": 3
},
"nullable": []
},
"hash": "e0c76a959276e3b0f44c720af9c74a5bf4912ee73468e62e7d0d96b1d9074cbe"
}
@@ -0,0 +1,86 @@
{
"db_name": "SQLite",
"query": "SELECT\n mn.mix_id as \"mix_id!\",\n mn.bonded as \"bonded: bool\",\n mn.blacklisted as \"blacklisted: bool\",\n mn.is_dp_delegatee as \"is_dp_delegatee: bool\",\n mn.total_stake as \"total_stake!\",\n mn.full_details as \"full_details!\",\n mn.self_described as \"self_described\",\n mn.last_updated_utc as \"last_updated_utc!\",\n COALESCE(md.moniker, \"NA\") as \"moniker!\",\n COALESCE(md.website, \"NA\") as \"website!\",\n COALESCE(md.security_contact, \"NA\") as \"security_contact!\",\n COALESCE(md.details, \"NA\") as \"details!\"\n FROM mixnodes mn\n LEFT JOIN mixnode_description md ON mn.mix_id = md.mix_id\n ORDER BY mn.mix_id",
"describe": {
"columns": [
{
"name": "mix_id!",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "bonded: bool",
"ordinal": 1,
"type_info": "Int64"
},
{
"name": "blacklisted: bool",
"ordinal": 2,
"type_info": "Int64"
},
{
"name": "is_dp_delegatee: bool",
"ordinal": 3,
"type_info": "Int64"
},
{
"name": "total_stake!",
"ordinal": 4,
"type_info": "Int64"
},
{
"name": "full_details!",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "self_described",
"ordinal": 6,
"type_info": "Text"
},
{
"name": "last_updated_utc!",
"ordinal": 7,
"type_info": "Int64"
},
{
"name": "moniker!",
"ordinal": 8,
"type_info": "Text"
},
{
"name": "website!",
"ordinal": 9,
"type_info": "Text"
},
{
"name": "security_contact!",
"ordinal": 10,
"type_info": "Text"
},
{
"name": "details!",
"ordinal": 11,
"type_info": "Text"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false,
false,
false,
false,
false,
true,
true,
false,
false,
false,
false,
false
]
},
"hash": "f0a4316081d1be9444a87b95d933d31cb4bcc4071d31d8d2f7755e2d2c2e3e35"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "UPDATE\n testruns\n SET\n status = ?\n WHERE\n status = ?\n AND\n timestamp_utc < ?\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 3
},
"nullable": []
},
"hash": "f5048d9926a5f5329f7f3b96d43b925e033ceec4f8112258feb4ac9e96fc5924"
}
@@ -0,0 +1,26 @@
{
"db_name": "SQLite",
"query": "UPDATE testruns\n SET status = ?\n WHERE rowid =\n (\n SELECT rowid\n FROM testruns\n WHERE status = ?\n ORDER BY timestamp_utc asc\n LIMIT 1\n )\n RETURNING\n id as \"id!\",\n gateway_id\n ",
"describe": {
"columns": [
{
"name": "id!",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "gateway_id",
"ordinal": 1,
"type_info": "Int64"
}
],
"parameters": {
"Right": 2
},
"nullable": [
true,
false
]
},
"hash": "ff9334ba7b670b218b2f9100e9ab5d2f2d08b2e53203aab9f07ea9b52acbd407"
}
+4 -8
View File
@@ -3,7 +3,7 @@
[package]
name = "nym-node-status-api"
version = "0.1.0"
version = "0.1.5"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
@@ -24,14 +24,10 @@ moka = { workspace = true, features = ["future"] }
nym-bin-common = { path = "../common/bin-common", features = ["models"]}
nym-common-models = { path = "../common/models" }
nym-explorer-client = { path = "../explorer-api/explorer-client" }
# TODO dz: before Nym API client breaking changes. Update to latest develop once new Nym API is live
nym-network-defaults = { git = "https://github.com/nymtech/nym", branch = "pre-dir-v2-fork" }
nym-validator-client = { git = "https://github.com/nymtech/nym", branch = "pre-dir-v2-fork" }
# nym-network-defaults = { path = "../common/network-defaults" }
# nym-validator-client = { path = "../common/client-libs/validator-client" }
nym-network-defaults = { path = "../common/network-defaults" }
nym-validator-client = { path = "../common/client-libs/validator-client" }
nym-task = { path = "../common/task" }
nym-node-requests = { git = "https://github.com/nymtech/nym", branch = "pre-dir-v2-fork" }
# nym-node-requests = { path = "../nym-node/nym-node-requests", features = ["openapi"] }
nym-node-requests = { path = "../nym-node/nym-node-requests", features = ["openapi"] }
regex = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true, features = ["derive"] }
-6
View File
@@ -25,7 +25,6 @@ async fn main() -> Result<()> {
// not a valid windows path... but hey, it works...
println!("cargo::rustc-env=DATABASE_URL=sqlite:///{}", &database_path);
rerun_if_changed();
Ok(())
}
@@ -33,11 +32,6 @@ fn read_env_var(var: &str) -> Result<String> {
std::env::var(var).map_err(|_| anyhow!("You need to set {} env var", var))
}
fn rerun_if_changed() {
println!("cargo::rerun-if-changed=migrations");
println!("cargo::rerun-if-changed=src/db/queries");
}
/// use `./enter_db.sh` to inspect DB
async fn write_db_path_to_file(out_dir: &str, db_filename: &str) -> anyhow::Result<()> {
let mut file = File::create("enter_db.sh").await?;
@@ -6,8 +6,9 @@ export RUST_LOG=${RUST_LOG:-debug}
export NYM_API_CLIENT_TIMEOUT=60
export EXPLORER_CLIENT_TIMEOUT=60
export NODE_STATUS_API_TESTRUN_REFRESH_INTERVAL=60
export ENVIRONMENT="mainnet.env"
export ENVIRONMENT="qa.env"
function run_bare() {
# export necessary env vars
+2 -2
View File
@@ -2,7 +2,7 @@ CREATE TABLE gateways
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
gateway_identity_key VARCHAR NOT NULL UNIQUE,
self_described VARCHAR,
self_described VARCHAR NOT NULL,
explorer_pretty_bond VARCHAR,
last_probe_result VARCHAR,
last_probe_log VARCHAR,
@@ -103,7 +103,7 @@ CREATE TABLE
CREATE TABLE testruns
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
gateway_id INTEGER,
gateway_id INTEGER NOT NULL,
status INTEGER NOT NULL, -- 0=pending, 1=in-progress, 2=complete
timestamp_utc INTEGER NOT NULL,
ip_address VARCHAR NOT NULL,
+3 -2
View File
@@ -11,7 +11,7 @@ pub(crate) struct GatewayRecord {
pub(crate) identity_key: String,
pub(crate) bonded: bool,
pub(crate) blacklisted: bool,
pub(crate) self_described: Option<String>,
pub(crate) self_described: String,
pub(crate) explorer_pretty_bond: Option<String>,
pub(crate) last_updated_utc: i64,
pub(crate) performance: u8,
@@ -300,6 +300,7 @@ pub(crate) mod gateway {
}
}
#[allow(dead_code)] // not dead code, this is SQL data model
#[derive(Debug, Clone)]
pub struct TestRunDto {
pub id: i64,
@@ -315,7 +316,7 @@ pub struct TestRunDto {
pub(crate) enum TestRunStatus {
Complete = 2,
InProgress = 1,
Pending = 0,
Queued = 0,
}
#[derive(Debug, Clone)]
+23 -3
View File
@@ -6,9 +6,29 @@ use crate::{
http::models::Gateway,
};
use futures_util::TryStreamExt;
use nym_validator_client::models::DescribedGateway;
use nym_validator_client::models::NymNodeDescription;
use sqlx::{pool::PoolConnection, Sqlite};
use tracing::error;
pub(crate) async fn select_gateway_identity(
conn: &mut PoolConnection<Sqlite>,
gateway_pk: i64,
) -> anyhow::Result<String> {
let record = sqlx::query!(
r#"SELECT
gateway_identity_key
FROM
gateways
WHERE
id = ?"#,
gateway_pk
)
.fetch_one(conn.as_mut())
.await?;
Ok(record.gateway_identity_key)
}
pub(crate) async fn insert_gateways(
pool: &DbPool,
gateways: Vec<GatewayRecord>,
@@ -68,13 +88,13 @@ where
/// Ensure all gateways that are set as bonded, are still bonded
pub(crate) async fn ensure_gateways_still_bonded(
pool: &DbPool,
gateways: &[DescribedGateway],
gateways: &[&NymNodeDescription],
) -> anyhow::Result<usize> {
let bonded_gateways_rows = get_all_bonded_gateways_row_ids_by_status(pool, true).await?;
let unbonded_gateways_rows = bonded_gateways_rows.iter().filter(|v| {
!gateways
.iter()
.any(|bonded| *bonded.bond.identity() == v.identity_key)
.any(|bonded| *bonded.ed25519_identity_key().to_base58_string() == v.identity_key)
});
let recently_unbonded_gateways = unbonded_gateways_rows.to_owned().count();
+1 -1
View File
@@ -5,7 +5,7 @@ mod summary;
pub(crate) mod testruns;
pub(crate) use gateways::{
ensure_gateways_still_bonded, get_all_gateways, insert_gateways,
ensure_gateways_still_bonded, get_all_gateways, insert_gateways, select_gateway_identity,
write_blacklisted_gateways_to_db,
};
pub(crate) use misc::insert_summaries;
+71 -13
View File
@@ -1,12 +1,14 @@
use crate::db::DbPool;
use crate::http::models::TestrunAssignment;
use crate::{
db::models::{TestRunDto, TestRunStatus},
testruns::now_utc,
};
use anyhow::Context;
use chrono::Duration;
use sqlx::{pool::PoolConnection, Sqlite};
pub(crate) async fn get_testrun_by_id(
pub(crate) async fn get_in_progress_testrun_by_id(
conn: &mut PoolConnection<Sqlite>,
testrun_id: i64,
) -> anyhow::Result<TestRunDto> {
@@ -20,22 +22,58 @@ pub(crate) async fn get_testrun_by_id(
ip_address as "ip_address!",
log as "log!"
FROM testruns
WHERE id = ?
WHERE
id = ?
AND
status = ?
ORDER BY timestamp_utc"#,
testrun_id
testrun_id,
TestRunStatus::InProgress as i64,
)
.fetch_one(conn.as_mut())
.await
.context(format!("Couldn't retrieve testrun {testrun_id}"))
}
pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> anyhow::Result<u64> {
let mut conn = db.acquire().await?;
let previous_run = now_utc() - age;
let cutoff_timestamp = previous_run.timestamp();
let res = sqlx::query!(
r#"UPDATE
testruns
SET
status = ?
WHERE
status = ?
AND
timestamp_utc < ?
"#,
TestRunStatus::Queued as i64,
TestRunStatus::InProgress as i64,
cutoff_timestamp
)
.execute(conn.as_mut())
.await?;
let stale_testruns = res.rows_affected();
if stale_testruns > 0 {
tracing::debug!(
"Refreshed {} stale testruns, scheduled before {} but not yet finished",
stale_testruns,
previous_run
);
}
Ok(stale_testruns)
}
pub(crate) async fn get_oldest_testrun_and_make_it_pending(
// TODO dz accept mut reference, repeat in all similar functions
conn: PoolConnection<Sqlite>,
conn: &mut PoolConnection<Sqlite>,
) -> anyhow::Result<Option<TestrunAssignment>> {
let mut conn = conn;
let assignment = sqlx::query_as!(
TestrunAssignment,
// find & mark as "In progress" in the same transaction to avoid race conditions
let returning = sqlx::query!(
r#"UPDATE testruns
SET status = ?
WHERE rowid =
@@ -47,16 +85,36 @@ pub(crate) async fn get_oldest_testrun_and_make_it_pending(
LIMIT 1
)
RETURNING
id as "testrun_id!",
gateway_id as "gateway_pk_id!"
id as "id!",
gateway_id
"#,
TestRunStatus::InProgress as i64,
TestRunStatus::Pending as i64,
TestRunStatus::Queued as i64,
)
.fetch_optional(&mut *conn)
.fetch_optional(conn.as_mut())
.await?;
Ok(assignment)
if let Some(testrun) = returning {
let gw_identity = sqlx::query!(
r#"
SELECT
id,
gateway_identity_key
FROM gateways
WHERE id = ?
LIMIT 1"#,
testrun.gateway_id
)
.fetch_one(conn.as_mut())
.await?;
Ok(Some(TestrunAssignment {
testrun_id: testrun.id,
gateway_identity_key: gw_identity.gateway_identity_key,
}))
} else {
Ok(None)
}
}
pub(crate) async fn update_testrun_status(
+8 -2
View File
@@ -1,7 +1,10 @@
use anyhow::anyhow;
use axum::{response::Redirect, Router};
use tokio::net::ToSocketAddrs;
use tower_http::{cors::CorsLayer, trace::TraceLayer};
use tower_http::{
cors::CorsLayer,
trace::{DefaultOnResponse, TraceLayer},
};
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
@@ -58,7 +61,10 @@ impl RouterBuilder {
// CORS layer needs to wrap other API layers
.layer(setup_cors())
// logger should be outermost layer
.layer(TraceLayer::new_for_http())
.layer(
TraceLayer::new_for_http()
.on_response(DefaultOnResponse::new().level(tracing::Level::DEBUG)),
)
}
}
+25 -16
View File
@@ -1,3 +1,4 @@
use axum::extract::DefaultBodyLimit;
use axum::Json;
use axum::{
extract::{Path, State},
@@ -23,31 +24,32 @@ pub(crate) fn routes() -> Router<AppState> {
Router::new()
.route("/", axum::routing::get(request_testrun))
.route("/:testrun_id", axum::routing::post(submit_testrun))
.layer(DefaultBodyLimit::max(1024 * 1024 * 5))
}
#[tracing::instrument(level = "debug", skip_all)]
async fn request_testrun(State(state): State<AppState>) -> HttpResult<Json<TestrunAssignment>> {
// TODO dz log agent's key
// TODO dz log agent's network probe version
tracing::debug!("Agent X requested testrun");
tracing::debug!("Agent requested testrun");
let db = state.db_pool();
let conn = db
let mut conn = db
.acquire()
.await
.map_err(HttpError::internal_with_logging)?;
return match db::queries::testruns::get_oldest_testrun_and_make_it_pending(conn).await {
return match db::queries::testruns::get_oldest_testrun_and_make_it_pending(&mut conn).await {
Ok(res) => {
if let Some(testrun) = res {
// TODO dz consider adding a column to testruns table with agent's public key
tracing::debug!(
"🏃‍ Assigned testrun row_id {} to agent X",
&testrun.testrun_id
"🏃‍ Assigned testrun row_id {} gateway {} to agent",
&testrun.testrun_id,
testrun.gateway_identity_key
);
Ok(Json(testrun))
} else {
Err(HttpError::not_found("No testruns available"))
Err(HttpError::no_available_testruns())
}
}
Err(err) => Err(HttpError::internal_with_logging(err)),
@@ -61,25 +63,32 @@ async fn submit_testrun(
State(state): State<AppState>,
body: String,
) -> HttpResult<StatusCode> {
tracing::debug!(
"Agent submitted testrun {}. Total length: {}",
testrun_id,
body.len(),
);
// TODO dz store testrun results
let db = state.db_pool();
let mut conn = db
.acquire()
.await
.map_err(HttpError::internal_with_logging)?;
let testrun = queries::testruns::get_testrun_by_id(&mut conn, testrun_id)
let testrun = queries::testruns::get_in_progress_testrun_by_id(&mut conn, testrun_id)
.await
.map_err(|e| {
tracing::error!("{e}");
HttpError::not_found(testrun_id)
})?;
let gw_identity = db::queries::select_gateway_identity(&mut conn, testrun.gateway_id)
.await
.map_err(|_| {
// should never happen:
HttpError::internal_with_logging("No gateway found for testrun")
})?;
tracing::debug!(
"Agent submitted testrun {} for gateway {} ({} bytes)",
testrun_id,
gw_identity,
body.len(),
);
// TODO dz this should be part of a single transaction: commit after everything is done
queries::testruns::update_testrun_status(&mut conn, testrun_id, TestRunStatus::Complete)
.await
@@ -99,7 +108,7 @@ async fn submit_testrun(
tracing::info!(
"✅ Testrun row_id {} for gateway {} complete",
testrun.id,
testrun.gateway_id
gw_identity
);
Ok(StatusCode::CREATED)
+8 -2
View File
@@ -8,9 +8,9 @@ pub(crate) struct HttpError {
}
impl HttpError {
pub(crate) fn invalid_input(message: String) -> Self {
pub(crate) fn invalid_input(msg: impl Display) -> Self {
Self {
message,
message: serde_json::json!({"message": msg.to_string()}).to_string(),
status: axum::http::StatusCode::BAD_REQUEST,
}
}
@@ -27,6 +27,12 @@ impl HttpError {
}
}
pub(crate) fn no_available_testruns() -> Self {
Self {
message: serde_json::json!({"message": "No available testruns"}).to_string(),
status: axum::http::StatusCode::SERVICE_UNAVAILABLE,
}
}
pub(crate) fn not_found(msg: impl Display) -> Self {
Self {
message: serde_json::json!({"message": msg.to_string()}).to_string(),
-10
View File
@@ -1,4 +1,3 @@
use crate::db::models::TestRunDto;
use nym_node_requests::api::v1::node::models::NodeDescription;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
@@ -75,12 +74,3 @@ pub(crate) struct SummaryHistory {
pub value_json: serde_json::Value,
pub timestamp_utc: String,
}
impl From<TestRunDto> for TestrunAssignment {
fn from(value: TestRunDto) -> Self {
Self {
gateway_pk_id: value.gateway_id,
testrun_id: value.id,
}
}
}
+73 -28
View File
@@ -10,7 +10,9 @@ use cosmwasm_std::Decimal;
use nym_explorer_client::{ExplorerClient, PrettyDetailedGatewayBond};
use nym_network_defaults::NymNetworkDetails;
use nym_validator_client::client::NymApiClientExt;
use nym_validator_client::models::{DescribedGateway, DescribedMixNode, MixNodeBondAnnotated};
use nym_validator_client::models::{
LegacyDescribedMixNode, MixNodeBondAnnotated, NymNodeDescription,
};
use nym_validator_client::nym_nodes::SkimmedNode;
use nym_validator_client::nyxd::contract_traits::PagedMixnetQueryClient;
use nym_validator_client::nyxd::{AccountId, NyxdClient};
@@ -91,20 +93,39 @@ async fn run(
let explorer_client =
ExplorerClient::new_with_timeout(default_explorer_url, explorer_client_timeout)?;
let explorer_gateways = explorer_client
.get_gateways()
.unstable_get_gateways()
.await
.log_error("get_gateways")?;
.log_error("unstable_get_gateways")?;
let api_client = NymApiClient::new_with_timeout(default_api_url, nym_api_client_timeout);
let gateways = api_client
.get_cached_described_gateways()
let all_nodes = api_client
.get_all_described_nodes()
.await
.log_error("get_described_gateways")?;
tracing::debug!("Fetched {} gateways", gateways.len());
let skimmed_gateways = api_client
.get_basic_gateways(None)
.log_error("get_all_described_nodes")?;
tracing::debug!("Fetched {} total nodes", all_nodes.len());
let gateways = all_nodes
.iter()
.filter(|node| node.description.declared_role.entry)
.collect::<Vec<_>>();
tracing::debug!("Of those, {} gateways", gateways.len());
for gw in gateways.iter() {
tracing::debug!("{}", gw.ed25519_identity_key().to_base58_string());
}
let mixnodes = all_nodes
.iter()
.filter(|node| node.description.declared_role.mixnode)
.collect::<Vec<_>>();
tracing::debug!("Of those, {} mixnodes", mixnodes.len());
log_gw_in_explorer_not_api(explorer_gateways.as_slice(), gateways.as_slice());
let all_skimmed_nodes = api_client
.get_all_basic_nodes(None)
.await
.log_error("get_basic_gateways")?;
.log_error("get_all_basic_nodes")?;
let mixnodes = api_client
.get_cached_mixnodes()
@@ -112,11 +133,12 @@ async fn run(
.log_error("get_cached_mixnodes")?;
tracing::debug!("Fetched {} mixnodes", mixnodes.len());
let gateways_blacklisted = skimmed_gateways
// let gateways_blacklisted = gateways.iter().filter(|gw|gw.)
let gateways_blacklisted = all_skimmed_nodes
.iter()
.filter_map(|gw| {
if gw.performance.round_to_integer() <= 50 {
Some(gw.ed25519_identity_pubkey.to_owned())
.filter_map(|node| {
if node.performance.round_to_integer() <= 50 && node.supported_roles.entry {
Some(node.ed25519_identity_pubkey.to_base58_string())
} else {
None
}
@@ -153,7 +175,7 @@ async fn run(
&gateways,
&gateways_blacklisted,
explorer_gateways,
skimmed_gateways,
all_skimmed_nodes,
)?;
queries::insert_gateways(pool, gateway_records)
.await
@@ -165,8 +187,8 @@ async fn run(
let count_gateways_blacklisted = gateways
.iter()
.filter(|gw| {
let gw_identity = gw.bond.identity();
gateways_blacklisted.contains(gw_identity)
let gw_identity = gw.ed25519_identity_key().to_base58_string();
gateways_blacklisted.contains(&gw_identity)
})
.count();
@@ -277,7 +299,7 @@ async fn run(
}
fn prepare_gateway_data(
gateways: &[DescribedGateway],
gateways: &[&NymNodeDescription],
gateways_blacklisted: &HashSet<String>,
explorer_gateways: Vec<PrettyDetailedGatewayBond>,
skimmed_gateways: Vec<SkimmedNode>,
@@ -285,24 +307,25 @@ fn prepare_gateway_data(
let mut gateway_records = Vec::new();
for gateway in gateways {
let identity_key = gateway.bond.identity();
let identity_key = gateway.ed25519_identity_key().to_base58_string();
let bonded = true;
let last_updated_utc = chrono::offset::Utc::now().timestamp();
let blacklisted = gateways_blacklisted.contains(identity_key);
let blacklisted = gateways_blacklisted.contains(&identity_key);
let self_described = gateway
.self_described
.as_ref()
.and_then(|v| serde_json::to_string(&v).ok());
let self_described = serde_json::to_string(&gateway.description)?;
let explorer_pretty_bond = explorer_gateways
.iter()
.find(|g| g.gateway.identity_key.eq(identity_key));
.find(|g| g.gateway.identity_key.eq(&identity_key));
let explorer_pretty_bond = explorer_pretty_bond.and_then(|g| serde_json::to_string(g).ok());
let performance = skimmed_gateways
.iter()
.find(|g| g.ed25519_identity_pubkey.eq(identity_key))
.find(|g| {
g.ed25519_identity_pubkey
.to_base58_string()
.eq(&identity_key)
})
.map(|g| g.performance)
.unwrap_or_default()
.round_to_integer();
@@ -323,7 +346,7 @@ fn prepare_gateway_data(
fn prepare_mixnode_data(
mixnodes: &[MixNodeBondAnnotated],
mixnodes_described: Vec<DescribedMixNode>,
mixnodes_described: Vec<LegacyDescribedMixNode>,
delegation_program_members: Vec<u32>,
) -> anyhow::Result<Vec<MixnodeRecord>> {
let mut mixnode_records = Vec::new();
@@ -364,6 +387,28 @@ fn prepare_mixnode_data(
Ok(mixnode_records)
}
fn log_gw_in_explorer_not_api(
explorer: &[PrettyDetailedGatewayBond],
api_gateways: &[&NymNodeDescription],
) {
let api_gateways = api_gateways
.iter()
.map(|gw| gw.ed25519_identity_key().to_base58_string())
.collect::<HashSet<_>>();
let explorer_only = explorer
.iter()
.filter(|gw| !api_gateways.contains(&gw.gateway.identity_key.to_string()))
.collect::<Vec<_>>();
tracing::debug!(
"Gateways listed by explorer but not by Nym API: {}",
explorer_only.len()
);
for gw in explorer_only.iter() {
tracing::debug!("{}", gw.gateway.identity_key.to_string());
}
}
// TODO dz is there a common monorepo place this can be put?
pub trait NumericalCheckedCast<T>
where
@@ -423,7 +468,7 @@ async fn get_delegation_program_details(
let mix_ids: Vec<u32> = delegations
.iter()
.map(|delegation| delegation.mix_id)
.map(|delegation| delegation.node_id)
.collect();
Ok(mix_ids)
+13 -3
View File
@@ -11,10 +11,12 @@ pub(crate) use queue::now_utc;
pub(crate) async fn spawn(pool: DbPool, refresh_interval: Duration) {
tokio::spawn(async move {
loop {
tracing::info!("Spawning testruns...");
if let Err(e) = refresh_stale_testruns(&pool, refresh_interval).await {
tracing::error!("{e}");
}
if let Err(e) = run(&pool).await {
tracing::error!("Cron job failed: {}", e);
tracing::error!("Assigning testruns failed: {}", e);
}
tracing::debug!("Sleeping for {}s...", refresh_interval.as_secs());
tokio::time::sleep(refresh_interval).await;
@@ -24,9 +26,9 @@ pub(crate) async fn spawn(pool: DbPool, refresh_interval: Duration) {
// TODO dz make number of max agents configurable
// TODO dz periodically clean up stale pending testruns
#[instrument(level = "debug", name = "testrun_queue", skip_all)]
async fn run(pool: &DbPool) -> anyhow::Result<()> {
tracing::info!("Spawning testruns...");
if pool.is_closed() {
tracing::debug!("DB pool closed, returning early");
return Ok(());
@@ -74,3 +76,11 @@ async fn run(pool: &DbPool) -> anyhow::Result<()> {
Ok(())
}
#[instrument(level = "debug", skip_all)]
async fn refresh_stale_testruns(pool: &DbPool, refresh_interval: Duration) -> anyhow::Result<()> {
let chrono_duration = chrono::Duration::from_std(refresh_interval)?;
crate::db::queries::testruns::update_testruns_older_than(pool, chrono_duration).await?;
Ok(())
}
+2 -2
View File
@@ -82,7 +82,7 @@ pub(crate) async fn try_queue_testrun(
//
// save test run
//
let status = TestRunStatus::Pending as u32;
let status = TestRunStatus::Queued as u32;
let log = format!(
"Test for {identity_key} requested at {} UTC\n\n",
timestamp_pretty
@@ -103,7 +103,7 @@ pub(crate) async fn try_queue_testrun(
Ok(TestRun {
id: id as u32,
identity_key,
status: format!("{}", TestRunStatus::Pending),
status: format!("{}", TestRunStatus::Queued),
log,
})
}