Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| bb23785d41 |
@@ -90,6 +90,12 @@ pub(crate) struct Cli {
|
||||
#[arg(value_delimiter = ',')]
|
||||
pub(crate) agent_key_list: Vec<String>,
|
||||
|
||||
#[clap(long, env = "NODE_STATUS_API_AGENT_REGION_MAP")]
|
||||
pub(crate) agent_region_map: Option<String>,
|
||||
|
||||
#[clap(long, env = "NODE_STATUS_API_REGION_CENTROIDS")]
|
||||
pub(crate) region_centroids: Option<String>,
|
||||
|
||||
#[clap(long, default_value = "120s", env = "AGENT_REQUEST_FRESHNESS")]
|
||||
#[arg(value_parser = parse_duration_humantime)]
|
||||
pub(crate) agent_request_freshness: time::Duration,
|
||||
|
||||
@@ -145,6 +145,87 @@ pub(crate) async fn assign_oldest_testrun(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn assign_nearest_testrun(
|
||||
conn: &mut DbConnection,
|
||||
agent_lat: f64,
|
||||
agent_lon: f64,
|
||||
) -> anyhow::Result<Option<TestrunAssignment>> {
|
||||
let now = now_utc().unix_timestamp();
|
||||
// We rank queued testruns by distance between agent centroid and gateway coordinates.
|
||||
// Coordinates are read from explorer_pretty_bond.location.{latitude,longitude}.
|
||||
// Missing or malformed gateway coordinates pushes that node into FIFO fallback ordering by created_utc.
|
||||
let returning = sqlx::query!(
|
||||
r#"
|
||||
WITH ranked_queued AS (
|
||||
SELECT
|
||||
t.id,
|
||||
t.gateway_id,
|
||||
t.created_utc,
|
||||
CASE
|
||||
WHEN g.explorer_pretty_bond IS NULL THEN 1e12::double precision
|
||||
WHEN ((g.explorer_pretty_bond::jsonb -> 'location' ->> 'latitude') ~ '^-?[0-9]+(\.[0-9]+)?$')
|
||||
AND ((g.explorer_pretty_bond::jsonb -> 'location' ->> 'longitude') ~ '^-?[0-9]+(\.[0-9]+)?$')
|
||||
AND (((g.explorer_pretty_bond::jsonb -> 'location' ->> 'latitude')::double precision) BETWEEN -90.0 AND 90.0)
|
||||
AND (((g.explorer_pretty_bond::jsonb -> 'location' ->> 'longitude')::double precision) BETWEEN -180.0 AND 180.0)
|
||||
THEN 6371.0 * 2.0 * ASIN(
|
||||
LEAST(1.0, SQRT(
|
||||
POWER(SIN(RADIANS((((g.explorer_pretty_bond::jsonb -> 'location' ->> 'latitude')::double precision) - $2) / 2.0)), 2)
|
||||
+ COS(RADIANS($2)) * COS(RADIANS((g.explorer_pretty_bond::jsonb -> 'location' ->> 'latitude')::double precision))
|
||||
* POWER(SIN(RADIANS((((g.explorer_pretty_bond::jsonb -> 'location' ->> 'longitude')::double precision) - $3) / 2.0)), 2)
|
||||
))
|
||||
)
|
||||
ELSE 1e12::double precision
|
||||
END AS distance
|
||||
FROM testruns t
|
||||
JOIN gateways g ON g.id = t.gateway_id
|
||||
WHERE t.status = $1
|
||||
ORDER BY distance ASC, t.created_utc ASC
|
||||
LIMIT 1
|
||||
FOR UPDATE OF t SKIP LOCKED
|
||||
)
|
||||
UPDATE testruns
|
||||
SET
|
||||
status = $4,
|
||||
last_assigned_utc = $5
|
||||
FROM ranked_queued
|
||||
WHERE testruns.id = ranked_queued.id
|
||||
RETURNING
|
||||
testruns.id,
|
||||
testruns.gateway_id
|
||||
"#,
|
||||
TestRunStatus::Queued as i32,
|
||||
agent_lat,
|
||||
agent_lon,
|
||||
TestRunStatus::InProgress as i32,
|
||||
now,
|
||||
)
|
||||
.fetch_optional(conn.as_mut())
|
||||
.await?;
|
||||
|
||||
if let Some(testrun) = returning {
|
||||
let gw_identity = sqlx::query!(
|
||||
r#"
|
||||
SELECT
|
||||
id,
|
||||
gateway_identity_key
|
||||
FROM gateways
|
||||
WHERE id = $1
|
||||
LIMIT 1"#,
|
||||
testrun.gateway_id
|
||||
)
|
||||
.fetch_one(conn.as_mut())
|
||||
.await?;
|
||||
|
||||
Ok(Some(TestrunAssignment {
|
||||
testrun_id: testrun.id,
|
||||
gateway_identity_key: gw_identity.gateway_identity_key,
|
||||
assigned_at_utc: now,
|
||||
}))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn update_testrun_status(
|
||||
conn: &mut DbConnection,
|
||||
testrun_id: i32,
|
||||
@@ -288,3 +369,83 @@ pub(crate) async fn update_testrun_status_by_gateway(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use serde_json::Value;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Candidate {
|
||||
created_utc: i64,
|
||||
explorer_pretty_bond: Option<String>,
|
||||
}
|
||||
|
||||
fn distance_or_fallback_km(
|
||||
explorer_pretty_bond: Option<&str>,
|
||||
agent_lat: f64,
|
||||
agent_lon: f64,
|
||||
) -> f64 {
|
||||
let Some(raw) = explorer_pretty_bond else {
|
||||
return 1e12;
|
||||
};
|
||||
let Ok(value) = serde_json::from_str::<Value>(raw) else {
|
||||
return 1e12;
|
||||
};
|
||||
let Some(location) = value.get("location") else {
|
||||
return 1e12;
|
||||
};
|
||||
let Some(lat) = location.get("latitude").and_then(Value::as_f64) else {
|
||||
return 1e12;
|
||||
};
|
||||
let Some(lon) = location.get("longitude").and_then(Value::as_f64) else {
|
||||
return 1e12;
|
||||
};
|
||||
|
||||
let dlat = (lat - agent_lat).to_radians();
|
||||
let dlon = (lon - agent_lon).to_radians();
|
||||
let a = (dlat / 2.0).sin().powi(2)
|
||||
+ agent_lat.to_radians().cos() * lat.to_radians().cos() * (dlon / 2.0).sin().powi(2);
|
||||
6371.0 * 2.0 * a.sqrt().asin()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn nearest_assignment_falls_back_behind_valid_geo_when_geo_missing() {
|
||||
let agent_lat = 50.1109;
|
||||
let agent_lon = 8.6821;
|
||||
let mut candidates = vec![
|
||||
Candidate {
|
||||
created_utc: 1,
|
||||
explorer_pretty_bond: None,
|
||||
},
|
||||
Candidate {
|
||||
created_utc: 2,
|
||||
explorer_pretty_bond: Some(
|
||||
r#"{"location":{"latitude":50.1109,"longitude":8.6821}}"#.to_string(),
|
||||
),
|
||||
},
|
||||
Candidate {
|
||||
created_utc: 0,
|
||||
explorer_pretty_bond: None,
|
||||
},
|
||||
];
|
||||
|
||||
candidates.sort_by(|a, b| {
|
||||
let da =
|
||||
distance_or_fallback_km(a.explorer_pretty_bond.as_deref(), agent_lat, agent_lon);
|
||||
let db =
|
||||
distance_or_fallback_km(b.explorer_pretty_bond.as_deref(), agent_lat, agent_lon);
|
||||
da.total_cmp(&db).then(a.created_utc.cmp(&b.created_utc))
|
||||
});
|
||||
|
||||
assert!(
|
||||
candidates[0].explorer_pretty_bond.is_some(),
|
||||
"expected valid-geo candidate first, got {:?}",
|
||||
candidates[0]
|
||||
);
|
||||
|
||||
// Missing geo fallback goes to FIFO order.
|
||||
assert!(candidates[1].explorer_pretty_bond.is_none());
|
||||
assert!(candidates[2].explorer_pretty_bond.is_none());
|
||||
assert!(candidates[1].created_utc < candidates[2].created_utc);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,7 +58,21 @@ async fn request_testrun(
|
||||
return Err(HttpError::no_testruns_available());
|
||||
}
|
||||
|
||||
match db::queries::testruns::assign_oldest_testrun(&mut conn).await {
|
||||
let assignment_result = if let Some((region, centroid)) =
|
||||
state.agent_region_and_centroid(&request.payload.agent_public_key)
|
||||
{
|
||||
tracing::debug!(
|
||||
"Resolved agent region '{region}' (lat={}, lon={}), assigning nearest testrun",
|
||||
centroid.lat,
|
||||
centroid.lon
|
||||
);
|
||||
db::queries::testruns::assign_nearest_testrun(&mut conn, centroid.lat, centroid.lon).await
|
||||
} else {
|
||||
tracing::debug!("Agent region not configured, falling back to FIFO assignment");
|
||||
db::queries::testruns::assign_oldest_testrun(&mut conn).await
|
||||
};
|
||||
|
||||
match assignment_result {
|
||||
Ok(res) => {
|
||||
let Some(assignment) = res else {
|
||||
tracing::debug!("No testruns available");
|
||||
|
||||
@@ -8,7 +8,7 @@ use axum::Router;
|
||||
use core::net::SocketAddr;
|
||||
use nym_crypto::asymmetric::ed25519::PublicKey;
|
||||
use nym_task::ShutdownTracker;
|
||||
use std::sync::Arc;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tokio::{net::TcpListener, sync::RwLock};
|
||||
|
||||
/// Return handles that allow for graceful shutdown of server + awaiting its
|
||||
@@ -19,6 +19,8 @@ pub(crate) async fn start_http_api(
|
||||
http_port: u16,
|
||||
nym_http_cache_ttl: u64,
|
||||
agent_key_list: Vec<PublicKey>,
|
||||
agent_region_map: HashMap<PublicKey, String>,
|
||||
region_centroids: HashMap<String, crate::http::state::RegionCentroid>,
|
||||
agent_max_count: i64,
|
||||
agent_request_freshness_requirement: time::Duration,
|
||||
node_geocache: NodeGeoCache,
|
||||
@@ -32,6 +34,8 @@ pub(crate) async fn start_http_api(
|
||||
db_pool,
|
||||
nym_http_cache_ttl,
|
||||
agent_key_list,
|
||||
agent_region_map,
|
||||
region_centroids,
|
||||
agent_max_count,
|
||||
agent_request_freshness_requirement,
|
||||
node_geocache,
|
||||
|
||||
@@ -36,6 +36,8 @@ pub(crate) struct AppState {
|
||||
db_pool: DbPool,
|
||||
cache: HttpCache,
|
||||
agent_key_list: Vec<PublicKey>,
|
||||
agent_region_map: HashMap<PublicKey, String>,
|
||||
region_centroids: HashMap<String, RegionCentroid>,
|
||||
agent_max_count: i64,
|
||||
agent_request_freshness_requirement: time::Duration,
|
||||
node_geocache: NodeGeoCache,
|
||||
@@ -50,6 +52,8 @@ impl AppState {
|
||||
db_pool: DbPool,
|
||||
cache_ttl: u64,
|
||||
agent_key_list: Vec<PublicKey>,
|
||||
agent_region_map: HashMap<PublicKey, String>,
|
||||
region_centroids: HashMap<String, RegionCentroid>,
|
||||
agent_max_count: i64,
|
||||
agent_request_freshness_requirement: time::Duration,
|
||||
node_geocache: NodeGeoCache,
|
||||
@@ -60,6 +64,8 @@ impl AppState {
|
||||
db_pool,
|
||||
cache: HttpCache::new(cache_ttl).await,
|
||||
agent_key_list,
|
||||
agent_region_map,
|
||||
region_centroids,
|
||||
agent_max_count,
|
||||
agent_request_freshness_requirement,
|
||||
node_geocache,
|
||||
@@ -85,6 +91,15 @@ impl AppState {
|
||||
self.agent_max_count
|
||||
}
|
||||
|
||||
pub(crate) fn agent_region_and_centroid(
|
||||
&self,
|
||||
agent_pubkey: &PublicKey,
|
||||
) -> Option<(&str, RegionCentroid)> {
|
||||
let region = self.agent_region_map.get(agent_pubkey)?;
|
||||
let centroid = self.region_centroids.get(region)?;
|
||||
Some((region.as_str(), *centroid))
|
||||
}
|
||||
|
||||
pub(crate) fn node_geocache(&self) -> NodeGeoCache {
|
||||
self.node_geocache.clone()
|
||||
}
|
||||
@@ -151,6 +166,12 @@ impl AppState {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub(crate) struct RegionCentroid {
|
||||
pub(crate) lat: f64,
|
||||
pub(crate) lon: f64,
|
||||
}
|
||||
|
||||
static GATEWAYS_LIST_KEY: &str = "gateways";
|
||||
static DVPN_GATEWAYS_LIST_KEY: &str = "dvpn_gateways";
|
||||
static DVPN_EXIT_GATEWAY_IPS: &str = "dvpn_exit_gateway_ips";
|
||||
|
||||
@@ -10,7 +10,7 @@ use nym_crypto::asymmetric::ed25519::PublicKey;
|
||||
use nym_network_defaults::setup_env;
|
||||
use nym_task::ShutdownManager;
|
||||
use nym_validator_client::nyxd::NyxdClient;
|
||||
use std::sync::Arc;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
mod cli;
|
||||
mod db;
|
||||
@@ -41,6 +41,13 @@ async fn main() -> anyhow::Result<()> {
|
||||
.map(|value| PublicKey::from_base58_string(value.trim()).map_err(anyhow::Error::from))
|
||||
.collect::<anyhow::Result<Vec<_>>>()?;
|
||||
tracing::info!("Registered {} agent keys", agent_key_list.len());
|
||||
let agent_region_map = parse_agent_region_map(args.agent_region_map.as_deref())?;
|
||||
let region_centroids = parse_region_centroids(args.region_centroids.as_deref())?;
|
||||
tracing::info!(
|
||||
"Configured {} agent region mappings and {} region centroids",
|
||||
agent_region_map.len(),
|
||||
region_centroids.len()
|
||||
);
|
||||
|
||||
let connection_url = args.database_url.clone();
|
||||
if std::env::var("SHOW_CONFIG").ok().is_some() {
|
||||
@@ -186,6 +193,8 @@ async fn main() -> anyhow::Result<()> {
|
||||
args.http_port,
|
||||
args.nym_http_cache_ttl,
|
||||
agent_key_list.to_owned(),
|
||||
agent_region_map,
|
||||
region_centroids,
|
||||
args.max_agent_count,
|
||||
args.agent_request_freshness,
|
||||
geocache,
|
||||
@@ -202,3 +211,133 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn parse_agent_region_map(raw: Option<&str>) -> anyhow::Result<HashMap<PublicKey, String>> {
|
||||
let mut out = HashMap::new();
|
||||
let Some(raw) = raw else {
|
||||
return Ok(out);
|
||||
};
|
||||
|
||||
for entry in raw.split(',').map(str::trim).filter(|s| !s.is_empty()) {
|
||||
let (pubkey_raw, region_raw) = entry.split_once('=').ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"malformed NODE_STATUS_API_AGENT_REGION_MAP entry '{entry}', expected '<pubkey>=<region>'"
|
||||
)
|
||||
})?;
|
||||
let pubkey =
|
||||
PublicKey::from_base58_string(pubkey_raw.trim()).map_err(anyhow::Error::from)?;
|
||||
let region = region_raw.trim();
|
||||
if region.is_empty() {
|
||||
anyhow::bail!("empty region in NODE_STATUS_API_AGENT_REGION_MAP entry '{entry}'");
|
||||
}
|
||||
out.insert(pubkey, region.to_string());
|
||||
}
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
fn parse_region_centroids(
|
||||
raw: Option<&str>,
|
||||
) -> anyhow::Result<HashMap<String, http::state::RegionCentroid>> {
|
||||
let mut out = HashMap::new();
|
||||
let Some(raw) = raw else {
|
||||
return Ok(out);
|
||||
};
|
||||
|
||||
for entry in raw.split(',').map(str::trim).filter(|s| !s.is_empty()) {
|
||||
let (region_raw, lat_lon_raw) = entry.split_once('=').ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"malformed NODE_STATUS_API_REGION_CENTROIDS entry '{entry}', expected '<region>=<lat>:<lon>'"
|
||||
)
|
||||
})?;
|
||||
let region = region_raw.trim();
|
||||
let (lat_raw, lon_raw) = lat_lon_raw.split_once(':').ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"malformed NODE_STATUS_API_REGION_CENTROIDS entry '{entry}', expected '<region>=<lat>:<lon>'"
|
||||
)
|
||||
})?;
|
||||
let lat = lat_raw.trim().parse::<f64>().map_err(|err| {
|
||||
anyhow::anyhow!(
|
||||
"invalid latitude '{}' in entry '{}': {err}",
|
||||
lat_raw.trim(),
|
||||
entry
|
||||
)
|
||||
})?;
|
||||
let lon = lon_raw.trim().parse::<f64>().map_err(|err| {
|
||||
anyhow::anyhow!(
|
||||
"invalid longitude '{}' in entry '{}': {err}",
|
||||
lon_raw.trim(),
|
||||
entry
|
||||
)
|
||||
})?;
|
||||
|
||||
if region.is_empty() {
|
||||
anyhow::bail!("empty region in NODE_STATUS_API_REGION_CENTROIDS entry '{entry}'");
|
||||
}
|
||||
|
||||
out.insert(region.to_string(), http::state::RegionCentroid { lat, lon });
|
||||
}
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn parses_agent_region_map() {
|
||||
let pubkey_a = nym_crypto::asymmetric::ed25519::PublicKey::from_bytes(&[1; 32])
|
||||
.expect("failed to create test public key A")
|
||||
.to_base58_string();
|
||||
let pubkey_b = nym_crypto::asymmetric::ed25519::PublicKey::from_bytes(&[2; 32])
|
||||
.expect("failed to create test public key B")
|
||||
.to_base58_string();
|
||||
let raw = format!("{pubkey_a}=eu-west,{pubkey_b}=asia-tokyo");
|
||||
|
||||
let parsed = parse_agent_region_map(Some(&raw)).expect("failed to parse map");
|
||||
|
||||
assert_eq!(parsed.len(), 2);
|
||||
let key_a = PublicKey::from_base58_string(&pubkey_a).expect("failed to decode key A");
|
||||
let key_b = PublicKey::from_base58_string(&pubkey_b).expect("failed to decode key B");
|
||||
assert_eq!(parsed.get(&key_a).map(String::as_str), Some("eu-west"));
|
||||
assert_eq!(parsed.get(&key_b).map(String::as_str), Some("asia-tokyo"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn malformed_agent_region_map_entry_returns_error() {
|
||||
let err = parse_agent_region_map(Some("this_is_not_valid")).expect_err("expected error");
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("malformed NODE_STATUS_API_AGENT_REGION_MAP entry"),
|
||||
"unexpected error: {err}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parses_region_centroids() {
|
||||
let raw = "eu-west=50.1109:8.6821,asia-tokyo=35.6762:139.6503";
|
||||
|
||||
let parsed = parse_region_centroids(Some(raw)).expect("failed to parse centroids");
|
||||
|
||||
assert_eq!(parsed.len(), 2);
|
||||
let eu = parsed.get("eu-west").expect("missing eu-west centroid");
|
||||
let asia = parsed
|
||||
.get("asia-tokyo")
|
||||
.expect("missing asia-tokyo centroid");
|
||||
assert!((eu.lat - 50.1109).abs() < 1e-9);
|
||||
assert!((eu.lon - 8.6821).abs() < 1e-9);
|
||||
assert!((asia.lat - 35.6762).abs() < 1e-9);
|
||||
assert!((asia.lon - 139.6503).abs() < 1e-9);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn malformed_region_centroids_entry_returns_error() {
|
||||
let err = parse_region_centroids(Some("eu-west=50.1|8.6")).expect_err("expected error");
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("malformed NODE_STATUS_API_REGION_CENTROIDS entry"),
|
||||
"unexpected error: {err}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user