Compare commits

...

16 Commits

Author SHA1 Message Date
benedettadavico 48c0130f32 add multiple target host options 2026-04-30 09:26:36 +02:00
benedettadavico 2b237dec1f Remove in-prove
remove in-probe test, it isn't needed.
2026-04-30 08:52:46 +02:00
benedettadavico 1dc3e94bdc bugsfixes 2026-04-29 19:17:38 +02:00
benedettadavico 54efd30fb8 cleanup and remove unannounced node option 2026-04-29 12:05:31 +02:00
benedettadavico a095d975fb bump NS versions 2026-04-28 16:53:40 +02:00
benedettadavico 4c8ee91b84 probe arg fix 2026-04-28 15:38:50 +02:00
benedettadavico 07a5e91da2 migration update 2026-04-28 15:38:49 +02:00
benedettadavico 88749b5bc8 test port check in probe results 2026-04-28 15:38:49 +02:00
benedettadavico 0df359639a address comments 2026-04-28 15:38:49 +02:00
benedettadavico a43dd38e76 add support for not registered nodes
...
2026-04-28 15:38:49 +02:00
benedettadavico 8aaa8dc8d3 testing port checks 2026-04-28 15:38:49 +02:00
benedettadavico 35515fbf8c remove unregistered nodes 2026-04-28 15:38:49 +02:00
benedettadavico 5ea5cd13a2 address comments 2026-04-28 15:38:49 +02:00
benedettadavico 2874f26f57 add support for not registered nodes
...
2026-04-28 15:38:49 +02:00
benedettadavico 802417ccec add no-log to anywhere 2026-04-28 15:38:48 +02:00
benedettadavico fdb79eb5d6 testing port checks 2026-04-28 15:38:48 +02:00
53 changed files with 2269 additions and 185 deletions
Generated
+2 -2
View File
@@ -7560,7 +7560,7 @@ dependencies = [
[[package]]
name = "nym-node-status-agent"
version = "2.0.0"
version = "2.0.1"
dependencies = [
"anyhow",
"clap",
@@ -7581,7 +7581,7 @@ dependencies = [
[[package]]
name = "nym-node-status-api"
version = "4.6.1"
version = "4.6.2"
dependencies = [
"ammonia",
"anyhow",
+1 -1
View File
@@ -17,7 +17,7 @@ anyhow = { workspace = true }
base64 = { workspace = true }
bs58 = { workspace = true }
bytes = { workspace = true }
clap = { workspace = true, features = ["cargo", "derive"] }
clap = { workspace = true, features = ["cargo", "derive", "env"] }
futures = { workspace = true }
hex = { workspace = true }
tracing = { workspace = true }
+91
View File
@@ -9,6 +9,7 @@ use vergen_gitcl::{BuildBuilder, CargoBuilder, Emitter, GitclBuilder, RustcBuild
fn main() -> anyhow::Result<()> {
build_go()?;
generate_exit_policy_ports()?;
Emitter::default()
.add_instructions(&BuildBuilder::all_build()?)?
@@ -18,6 +19,96 @@ fn main() -> anyhow::Result<()> {
.emit()
}
/// Parse PORT_MAPPINGS from network-tunnel-manager.sh and generate a sorted
/// Rust const with every unique port. Ranges are represented by their start
/// and end values so a single TCP check can confirm the iptables rule exists.
fn generate_exit_policy_ports() -> anyhow::Result<()> {
use std::collections::BTreeMap;
let script_path = PathBuf::from("../scripts/nym-node-setup/network-tunnel-manager.sh");
let out_dir = PathBuf::from(std::env::var("OUT_DIR").context("OUT_DIR not set")?);
println!("cargo::rerun-if-changed={}", script_path.display());
let content = std::fs::read_to_string(&script_path).context(
"failed to read network-tunnel-manager.sh — is it present at ../scripts/nym-node-setup/ ?",
)?;
// port → service name (BTreeMap keeps them sorted)
let mut port_map: BTreeMap<u16, String> = BTreeMap::new();
let mut in_mappings = false;
for line in content.lines() {
let trimmed = line.trim();
if trimmed.starts_with("declare -A PORT_MAPPINGS=(") {
in_mappings = true;
continue;
}
if in_mappings && trimmed == ")" {
break;
}
if !in_mappings {
continue;
}
// strip comment prefix so we still pick up ports that are opened
// via a separate mechanism (e.g. SMTP/465 with rate limiting)
let stripped = trimmed.trim_start_matches('#').trim();
// match ["ServiceName"]="port-or-range"
let Some(name_start) = stripped.find("[\"") else {
continue;
};
let Some(name_end) = stripped.find("\"]=") else {
continue;
};
let service = &stripped[name_start + 2..name_end];
let value = stripped[name_end + 3..].trim_matches('"');
if value.contains('-') {
let parts: Vec<&str> = value.split('-').collect();
if parts.len() == 2 {
if let (Ok(lo), Ok(hi)) = (parts[0].parse::<u16>(), parts[1].parse::<u16>()) {
port_map
.entry(lo)
.or_insert_with(|| format!("{service} (range start)"));
port_map
.entry(hi)
.or_insert_with(|| format!("{service} (range end)"));
}
}
} else if let Ok(port) = value.parse::<u16>() {
port_map.entry(port).or_insert_with(|| service.to_string());
}
}
if port_map.is_empty() {
bail!("No ports found in PORT_MAPPINGS — is network-tunnel-manager.sh correct?");
}
// write generated Rust source
let mut out = String::new();
out.push_str(
"// Auto-generated from scripts/nym-node-setup/network-tunnel-manager.sh PORT_MAPPINGS.\n",
);
out.push_str("// Do not edit — changes are overwritten on rebuild.\n");
out.push_str("// To add or remove ports, update PORT_MAPPINGS in the shell script.\n\n");
out.push_str(&format!(
"/// {} unique ports parsed from the canonical exit policy at build time.\n",
port_map.len()
));
out.push_str("pub const EXIT_POLICY_PORTS: &[u16] = &[\n");
for (port, service) in &port_map {
let entry = format!("{port},");
out.push_str(&format!(" {entry:<7}// {service}\n"));
}
out.push_str("];\n");
std::fs::write(out_dir.join("exit_policy_ports.rs"), out)?;
Ok(())
}
fn build_go() -> anyhow::Result<()> {
const LIB_NAME: &str = "netstack_ping";
+119 -14
View File
@@ -62,21 +62,27 @@ type NetstackRequestGo struct {
DownloadTimeoutSec uint64 `json:"download_timeout_sec"`
MetadataTimeoutSec uint64 `json:"metadata_timeout_sec"`
AwgArgs string `json:"awg_args"`
// exit policy port check
PortCheckTarget string `json:"port_check_target"`
PortCheckPorts []uint16 `json:"port_check_ports"`
PortCheckOnly bool `json:"port_check_only"`
PortCheckTimeoutSec uint64 `json:"port_check_timeout_sec"`
}
type NetstackResponse struct {
CanHandshake bool `json:"can_handshake"`
CanQueryMetadata bool `json:"can_query_metadata"`
SentIps uint16 `json:"sent_ips"`
ReceivedIps uint16 `json:"received_ips"`
SentHosts uint16 `json:"sent_hosts"`
ReceivedHosts uint16 `json:"received_hosts"`
CanResolveDns bool `json:"can_resolve_dns"`
DownloadedFile string `json:"downloaded_file"`
DownloadedFileSizeBytes uint64 `json:"downloaded_file_size_bytes"`
DownloadDurationSec uint64 `json:"download_duration_sec"`
DownloadDurationMilliseconds uint64 `json:"download_duration_milliseconds"`
DownloadError string `json:"download_error"`
CanHandshake bool `json:"can_handshake"`
CanQueryMetadata bool `json:"can_query_metadata"`
SentIps uint16 `json:"sent_ips"`
ReceivedIps uint16 `json:"received_ips"`
SentHosts uint16 `json:"sent_hosts"`
ReceivedHosts uint16 `json:"received_hosts"`
CanResolveDns bool `json:"can_resolve_dns"`
DownloadedFile string `json:"downloaded_file"`
DownloadedFileSizeBytes uint64 `json:"downloaded_file_size_bytes"`
DownloadDurationSec uint64 `json:"download_duration_sec"`
DownloadDurationMilliseconds uint64 `json:"download_duration_milliseconds"`
DownloadError string `json:"download_error"`
PortCheckResults map[string]bool `json:"port_check_results,omitempty"`
}
type SuccessResult = struct {
@@ -201,7 +207,7 @@ func pingTwoHop(req TwoHopNetstackRequest) (NetstackResponse, error) {
log.Printf("Exit WG IP: %s", req.ExitWgIp)
log.Printf("IP version: %d", req.IpVersion)
response := NetstackResponse{false, false, 0, 0, 0, 0, false, "", 0, 0, 0, ""}
response := NetstackResponse{}
// Parse the exit endpoint to determine IP version for forwarder
exitEndpoint, err := netip.ParseAddrPort(req.ExitEndpoint)
@@ -439,7 +445,7 @@ func ping(req NetstackRequestGo) (NetstackResponse, error) {
ipc.WriteString("\nallowed_ip=::/0\n")
}
response := NetstackResponse{false, false, 0, 0, 0, 0, false, "", 0, 0, 0, ""}
response := NetstackResponse{}
err = dev.IpcSet(ipc.String())
if err != nil {
@@ -463,6 +469,19 @@ func ping(req NetstackRequestGo) (NetstackResponse, error) {
response.CanHandshake = true
// port-check-only mode: skip pings/download, only test TCP port reachability
if req.PortCheckOnly && len(req.PortCheckPorts) > 0 {
log.Printf("=== Port Check Only Mode ===")
response.PortCheckResults = checkPorts(req.PortCheckTarget, req.PortCheckPorts, req.PortCheckTimeoutSec, tnet)
return response, nil
}
// run port checks alongside normal tests if ports were requested
if len(req.PortCheckPorts) > 0 {
log.Printf("=== Running Port Checks (alongside normal tests) ===")
response.PortCheckResults = checkPorts(req.PortCheckTarget, req.PortCheckPorts, req.PortCheckTimeoutSec, tnet)
}
// Skip metadata query if endpoint is empty (e.g., for IPv6 where the IPv4 metadata endpoint is not reachable)
if req.MetadataEndpoint != "" {
version, duration, err := queryMetadata(req.MetadataEndpoint, req.MetadataTimeoutSec, tnet)
@@ -569,6 +588,92 @@ func ping(req NetstackRequestGo) (NetstackResponse, error) {
return response, nil
}
// checkPorts tests TCP connectivity to a target on each port through the WG
// tunnel. blocked ports (NYM-EXIT iptables) will timeout or get reset.
// runs concurrently.
func checkPorts(target string, ports []uint16, timeoutSec uint64, tnet *netstack.Net) map[string]bool {
if target == "" {
target = "portquiz.net"
}
if timeoutSec == 0 {
timeoutSec = 5
}
// resolve target once via host DNS (we only care about TCP reachability
// through the tunnel, not DNS-through-tunnel behaviour)
targetIP := target
if net.ParseIP(target) == nil {
addrs, err := net.LookupHost(target)
if err != nil || len(addrs) == 0 {
log.Printf("Port check: DNS lookup for %s failed (%v), using hostname as-is", target, err)
} else {
targetIP = addrs[0]
log.Printf("Port check: resolved %s -> %s", target, targetIP)
}
}
timeout := time.Duration(timeoutSec) * time.Second
// batching: portquiz.net enforces a per-source-IP connection limit.
// testing all 114 ports in one burst trips it after ~29 connections.
// small batches with a cooldown in between keep us under the limit.
const batchSize = 20
const batchDelay = 25 * time.Second
const dialGap = 200 * time.Millisecond
results := make(map[string]bool)
totalBatches := (len(ports) + batchSize - 1) / batchSize
for batchIdx := 0; batchIdx < totalBatches; batchIdx++ {
start := batchIdx * batchSize
end := start + batchSize
if end > len(ports) {
end = len(ports)
}
batch := ports[start:end]
if batchIdx > 0 {
log.Printf("Batch %d/%d: cooldown %v before next batch...",
batchIdx+1, totalBatches, batchDelay)
time.Sleep(batchDelay)
}
log.Printf("Batch %d/%d: testing %d ports (%d%d)...",
batchIdx+1, totalBatches, len(batch), batch[0], batch[len(batch)-1])
for i, p := range batch {
if i > 0 {
time.Sleep(dialGap)
}
addr := fmt.Sprintf("%s:%d", targetIP, p)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
conn, err := tnet.DialContext(ctx, "tcp", addr)
cancel()
if err != nil {
log.Printf("Port %d: CLOSED (%v)", p, err)
results[fmt.Sprintf("%d", p)] = false
} else {
conn.Close()
log.Printf("Port %d: OPEN", p)
results[fmt.Sprintf("%d", p)] = true
}
}
}
openCount := 0
for _, open := range results {
if open {
openCount++
}
}
log.Printf("Port check complete: %d/%d ports open", openCount, len(ports))
return results
}
func sendPing(address string, seq uint8, sendTtimeoutSecs uint64, receiveTimoutSecs uint64, tnet *netstack.Net, ipVersion uint8) (time.Duration, error) {
maxPingRetries := 2
baseTimeout := receiveTimoutSecs
+26
View File
@@ -4,6 +4,7 @@
use crate::config::NetstackArgs;
use anyhow::Context;
use serde::Deserialize;
use std::collections::BTreeMap;
use std::ffi::{CStr, CString};
mod sys {
@@ -25,6 +26,11 @@ pub struct NetstackRequest {
v6_ping_config: PingConfig,
download_timeout_sec: u64,
awg_args: String,
// exit policy port check
port_check_target: String,
port_check_ports: Vec<u16>,
port_check_only: bool,
port_check_timeout_sec: u64,
}
#[derive(serde::Serialize)]
@@ -76,6 +82,7 @@ impl NetstackRequest {
download_timeout_sec: u64,
awg_args: &str,
netstack_args: NetstackArgs,
port_check_only: bool,
) -> Self {
Self {
private_key: private_key.to_string(),
@@ -86,6 +93,10 @@ impl NetstackRequest {
v4_ping_config: PingConfig::from_netstack_args_v4(wg_ip4, &netstack_args),
v6_ping_config: PingConfig::from_netstack_args_v6(wg_ip6, &netstack_args),
download_timeout_sec,
port_check_target: netstack_args.port_check_target.clone(),
port_check_ports: netstack_args.port_check_ports.clone(),
port_check_only,
port_check_timeout_sec: netstack_args.port_check_timeout_sec,
}
}
@@ -116,6 +127,11 @@ pub struct NetstackRequestGo {
recv_timeout_sec: u64,
download_timeout_sec: u64,
awg_args: String,
// exit policy port check
port_check_target: String,
port_check_ports: Vec<u16>,
port_check_only: bool,
port_check_timeout_sec: u64,
}
impl NetstackRequestGo {
@@ -135,6 +151,10 @@ impl NetstackRequestGo {
recv_timeout_sec: req.v4_ping_config.recv_timeout_sec,
download_timeout_sec: req.download_timeout_sec,
awg_args: req.awg_args.clone(),
port_check_target: req.port_check_target.clone(),
port_check_ports: req.port_check_ports.clone(),
port_check_only: req.port_check_only,
port_check_timeout_sec: req.port_check_timeout_sec,
}
}
@@ -155,6 +175,10 @@ impl NetstackRequestGo {
recv_timeout_sec: req.v6_ping_config.recv_timeout_sec,
download_timeout_sec: req.download_timeout_sec,
awg_args: req.awg_args.clone(),
port_check_target: req.port_check_target.clone(),
port_check_ports: req.port_check_ports.clone(),
port_check_only: req.port_check_only,
port_check_timeout_sec: req.port_check_timeout_sec,
}
}
}
@@ -173,6 +197,8 @@ pub struct NetstackResponse {
pub downloaded_file_size_bytes: u64,
pub download_duration_milliseconds: u64,
pub download_error: String,
#[serde(default)]
pub port_check_results: Option<BTreeMap<String, bool>>,
}
#[derive(Clone, Debug, serde::Deserialize)]
@@ -46,6 +46,7 @@ pub async fn wg_probe(
auth_version: AuthenticatorVersion,
awg_args: Option<String>,
netstack_args: NetstackArgs,
port_check_only: bool,
// TODO: update type
credential: CredentialSpendingData,
) -> anyhow::Result<WgProbeResults> {
@@ -149,6 +150,7 @@ pub async fn wg_probe(
&tunnel_config,
&netstack_args,
&awg_args.unwrap_or_default(),
port_check_only,
&mut wg_outcome,
);
+59
View File
@@ -1,15 +1,57 @@
use nym_connection_monitor::ConnectionStatusEvent;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
pub use super::bandwidth_helpers::{AttachedTicket, AttachedTicketMaterials};
pub use super::socks5_test::HttpsConnectivityResult;
pub use nym_credentials::ecash::bandwidth::serialiser::VersionedSerialise;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PortsCheckSummary {
pub all_pass: bool,
pub error: Option<String>,
pub port_check_target: String,
pub failed_ports: Vec<String>,
}
impl PortsCheckSummary {
pub fn from_port_map(
can_register: bool,
port_check_target: impl Into<String>,
ports: &BTreeMap<String, bool>,
) -> Self {
let failed_ports: Vec<String> = ports
.iter()
.filter_map(|(port, open)| if *open { None } else { Some(port.clone()) })
.collect();
let all_pass = can_register && failed_ports.is_empty() && !ports.is_empty();
Self {
all_pass,
error: None,
port_check_target: port_check_target.into(),
failed_ports,
}
}
pub fn probe_error(port_check_target: impl Into<String>, message: impl Into<String>) -> Self {
Self {
all_pass: false,
error: Some(message.into()),
port_check_target: port_check_target.into(),
failed_ports: vec![],
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProbeResult {
pub node: String,
pub used_entry: String,
pub outcome: ProbeOutcome,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub ports_check: Option<PortsCheckSummary>,
}
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
@@ -50,6 +92,10 @@ pub struct WgProbeResults {
pub download_duration_milliseconds_v6: u64,
pub downloaded_file_v6: String,
pub download_error_v6: String,
/// Per-port exit-policy check
#[serde(default, skip_serializing_if = "Option::is_none")]
pub port_check_results: Option<BTreeMap<String, bool>>,
}
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
@@ -195,6 +241,19 @@ impl Socks5ProbeResults {
}
}
/// Output of the `run-ports` subcommand — per-port TCP reachability through
/// the WG exit tunnel, without the full probe outcome.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PortCheckResult {
pub gateway: String,
pub can_register: bool,
pub port_check_target: String,
/// port → open/closed (BTreeMap for deterministic bincode serialization in signed requests)
pub ports: BTreeMap<String, bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
#[derive(Debug, Clone, Default)]
pub struct IpPingReplies {
pub ipr_tun_ip_v4: bool,
+16 -1
View File
@@ -63,6 +63,7 @@ impl WgTunnelConfig {
/// - DNS resolution
/// - ICMP ping to specified hosts and IPs
/// - Optional download test
/// - Optional exit policy port check (TCP connect through tunnel)
///
/// Results are written directly into the provided `wg_outcome` to avoid field-by-field
/// copying at call sites.
@@ -71,6 +72,7 @@ impl WgTunnelConfig {
/// * `config` - WireGuard tunnel configuration
/// * `netstack_args` - Netstack test parameters (DNS, hosts to ping, timeouts, etc.)
/// * `awg_args` - Amnezia WireGuard arguments (empty string for standard WG)
/// * `port_check_only` - If true, skip pings/download and only run TCP port checks
/// * `wg_outcome` - Mutable reference to write test results into
// This function extracts the shared netstack testing logic from
// wg_probe() and wg_probe_lp() to eliminate code duplication.
@@ -78,6 +80,7 @@ pub fn run_tunnel_tests(
config: &WgTunnelConfig,
netstack_args: &NetstackArgs,
awg_args: &str,
port_check_only: bool,
wg_outcome: &mut WgProbeResults,
) {
// Build the netstack request
@@ -91,9 +94,10 @@ pub fn run_tunnel_tests(
netstack_args.netstack_download_timeout_sec,
awg_args,
netstack_args.clone(),
port_check_only,
);
// Perform IPv4 ping test
// Perform IPv4 ping test (also carries port check results in port-check-only mode)
info!("Testing IPv4 tunnel connectivity...");
let ipv4_request = NetstackRequestGo::from_rust_v4(&netstack_request);
@@ -122,6 +126,11 @@ pub fn run_tunnel_tests(
netstack_response_v4.downloaded_file_size_bytes;
wg_outcome.downloaded_file_v4 = netstack_response_v4.downloaded_file;
wg_outcome.download_error_v4 = netstack_response_v4.download_error;
// capture port check results (present when ports were requested)
if netstack_response_v4.port_check_results.is_some() {
wg_outcome.port_check_results = netstack_response_v4.port_check_results;
}
}
Ok(NetstackResult::Error { error }) => {
error!("Netstack runtime error (IPv4): {error}")
@@ -131,6 +140,12 @@ pub fn run_tunnel_tests(
}
}
// in port-check-only mode, skip IPv6 tests — port checks ran through IPv4 above
if port_check_only {
info!("Port-check-only mode: skipping IPv6 tunnel tests");
return;
}
// Perform IPv6 ping test
info!("Testing IPv6 tunnel connectivity...");
let ipv6_request = NetstackRequestGo::from_rust_v6(&netstack_request);
+2 -2
View File
@@ -9,7 +9,7 @@ mod socks5;
mod test_mode;
pub use credentials::{CredentialArgs, CredentialMode};
pub use netstack::NetstackArgs;
pub use netstack::{EXIT_POLICY_PORTS, NetstackArgs};
pub use socks5::Socks5Args;
pub use test_mode::TestMode;
@@ -22,7 +22,7 @@ pub struct ProbeConfig {
/// Test mode - explicitly specify which tests to run
///
/// Modes:
/// core. - Traditional mixnet testing (entry/exit pings + WireGuard via authenticator)
/// core - Traditional mixnet testing (entry/exit pings + WireGuard via authenticator)
/// wg-mix - Wireguard via authenticator
/// wg-lp - Entry LP + Exit LP (nested forwarding) + WireGuard
/// lp-only - LP registration only (no WireGuard)
+67
View File
@@ -3,6 +3,11 @@
use clap::Args;
// EXIT_POLICY_PORTS is generated at build time by parsing PORT_MAPPINGS
// from scripts/nym-node-setup/network-tunnel-manager.sh.
// To add or remove ports, update PORT_MAPPINGS in the shell script and rebuild.
include!(concat!(env!("OUT_DIR"), "/exit_policy_ports.rs"));
#[derive(Args, Clone, Debug)]
pub struct NetstackArgs {
#[arg(long, hide = true, env = "PROBE_NETSTACK_DOWNLOAD_TIMEOUT_SEC", default_value_t = NetstackArgs::default().netstack_download_timeout_sec)]
@@ -37,6 +42,22 @@ pub struct NetstackArgs {
#[arg(long, hide = true, env = "PROBE_NETSTACK_PING_IPS_V6", default_values_t = NetstackArgs::default().netstack_ping_ips_v6)]
pub netstack_ping_ips_v6: Vec<String>,
/// Target host for exit policy port checks (must listen on all tested ports)
#[arg(
long = "use-target",
env = "PROBE_NETSTACK_PORT_CHECK_TARGET",
default_value = "portquiz.net"
)]
pub port_check_target: String,
/// List ports to check, separated by a comma.
#[arg(long = "check-ports", value_delimiter = ',', default_values_t = Vec::<u16>::new())]
pub port_check_ports: Vec<u16>,
/// Timeout in seconds for each individual port check attempt
#[arg(long, default_value_t = NetstackArgs::default().port_check_timeout_sec)]
pub port_check_timeout_sec: u64,
}
impl Default for NetstackArgs {
@@ -57,6 +78,9 @@ impl Default for NetstackArgs {
"2606:4700:4700::1111".to_string(),
"2620:fe::fe".to_string(),
],
port_check_target: "portquiz.net".to_string(),
port_check_ports: vec![],
port_check_timeout_sec: 5,
}
}
}
@@ -87,6 +111,9 @@ mod tests {
"2606:4700:4700::1111".to_string(),
"2620:fe::fe".to_string(),
],
port_check_target: "portquiz.net".to_string(),
port_check_ports: vec![],
port_check_timeout_sec: 5,
};
// Test IPv4 defaults
@@ -111,6 +138,11 @@ mod tests {
assert_eq!(args.netstack_num_ping, 5);
assert_eq!(args.netstack_send_timeout_sec, 3);
assert_eq!(args.netstack_recv_timeout_sec, 3);
// Test port check defaults
assert_eq!(args.port_check_target, "portquiz.net");
assert!(args.port_check_ports.is_empty());
assert_eq!(args.port_check_timeout_sec, 5);
}
#[test]
@@ -128,6 +160,9 @@ mod tests {
netstack_ping_ips_v4: vec!["8.8.8.8".to_string()],
netstack_ping_hosts_v6: vec!["ipv6.example.com".to_string()],
netstack_ping_ips_v6: vec!["2001:4860:4860::8888".to_string()],
port_check_target: "portquiz.net".to_string(),
port_check_ports: vec![80, 443, 8332],
port_check_timeout_sec: 10,
};
assert_eq!(args.netstack_ping_hosts_v4, vec!["example.com"]);
@@ -163,6 +198,9 @@ mod tests {
"2001:4860:4860::8888".to_string(),
"2606:4700:4700::1111".to_string(),
],
port_check_target: "portquiz.net".to_string(),
port_check_ports: vec![],
port_check_timeout_sec: 5,
};
assert_eq!(args.netstack_ping_hosts_v4, vec!["nym.com", "example.com"]);
@@ -192,6 +230,9 @@ mod tests {
netstack_ping_ips_v4: vec![],
netstack_ping_hosts_v6: vec![],
netstack_ping_ips_v6: vec![],
port_check_target: "portquiz.net".to_string(),
port_check_ports: vec![],
port_check_timeout_sec: 0,
};
assert_eq!(args.netstack_num_ping, 0);
@@ -219,6 +260,9 @@ mod tests {
netstack_ping_ips_v4: vec!["1.1.1.1".to_string()],
netstack_ping_hosts_v6: vec!["cloudflare.com".to_string()],
netstack_ping_ips_v6: vec!["2001:4860:4860::8888".to_string()],
port_check_target: "portquiz.net".to_string(),
port_check_ports: vec![],
port_check_timeout_sec: 5,
};
assert!(args.netstack_ping_hosts_v4[0].contains("nym"));
@@ -228,4 +272,27 @@ mod tests {
assert_eq!(args.netstack_v4_dns, "1.1.1.1");
assert_eq!(args.netstack_v6_dns, "2606:4700:4700::1111");
}
#[test]
fn test_exit_policy_ports_no_duplicates_and_sorted() {
let ports = EXIT_POLICY_PORTS;
assert!(!ports.is_empty(), "EXIT_POLICY_PORTS should not be empty");
// verify sorted
for window in ports.windows(2) {
assert!(
window[0] < window[1],
"EXIT_POLICY_PORTS out of order or duplicate: {} >= {}",
window[0],
window[1]
);
}
// spot-check a few well-known ports
assert!(ports.contains(&22), "should contain SSH (22)");
assert!(ports.contains(&443), "should contain HTTPS (443)");
assert!(ports.contains(&22021), "should contain Session (22021)");
assert!(ports.contains(&8332), "should contain Bitcoin (8332)");
assert!(ports.contains(&9735), "should contain Lightning (9735)");
}
}
@@ -10,6 +10,9 @@
//! - LpOnly: LP registration only, no WireGuard
//! - Socks5Only: Socks5 test
//! - All: Mixnet, wireguard over authenticator and LP registration
//!
//! Note: Exit policy port checking is handled by the `run-ports` subcommand,
//! not via a test mode.
/// Test mode for the gateway probe.
///
+452 -3
View File
@@ -9,7 +9,9 @@ use crate::common::probe_tests::{
};
use crate::common::types::{Entry, LpProbeResults};
use crate::config::{CredentialArgs, CredentialMode, NetstackArgs, ProbeConfig};
use nym_authenticator_client::{AuthClientMixnetListener, AuthenticatorClient};
use nym_authenticator_client::{
AuthClientMixnetListener, AuthClientMixnetListenerHandle, AuthenticatorClient,
};
use nym_bandwidth_controller::BandwidthTicketProvider;
use nym_client_core::config::ForgetMe;
use nym_config::defaults::NymNetworkDetails;
@@ -20,13 +22,16 @@ use nym_sdk::mixnet::{
};
use nym_topology::{HardcodedTopologyProvider, NymTopology};
use rand::rngs::OsRng;
use std::collections::BTreeMap;
use std::net::IpAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::*;
pub use crate::common::nodes::{NymApiDirectory, query_gateway_by_ip};
pub use crate::common::types::{ProbeOutcome, ProbeResult};
pub use crate::common::types::{PortCheckResult, PortsCheckSummary, ProbeOutcome, ProbeResult};
mod common;
pub use common::types;
@@ -45,7 +50,280 @@ pub struct Probe {
topology: Option<NymTopology>,
}
#[derive(Debug, Clone)]
pub struct RunPortsConfig {
pub min_gateway_mixnet_performance: Option<u8>,
pub ignore_egress_epoch_role: bool,
pub netstack_args: NetstackArgs,
}
// Port checks always target a bonded gateway. There are two entry points:
// - `run_ports` : local CLI, on-disk storage + mnemonic.
// - `run_ports_for_agent`: NS agent, ephemeral storage + ticket materials.
struct PortScanRun {
can_register: bool,
port_results: BTreeMap<String, bool>,
last_error: Option<String>,
}
/// Validated info needed to run a WG port-check via the mixnet.
struct PortCheckSetup {
exit_node: TestedNodeDetails,
exit_identity: String,
authenticator: nym_sdk::mixnet::Recipient,
ip_address: IpAddr,
port_check_target: String,
ports_count: usize,
}
impl PortCheckSetup {
fn new(exit_node: TestedNodeDetails, config: &RunPortsConfig) -> anyhow::Result<Self> {
let exit_identity = exit_node.identity.to_string();
let (authenticator, ip_address) =
match (exit_node.authenticator_address, exit_node.ip_address) {
(Some(auth), Some(ip)) => (auth, ip),
_ => anyhow::bail!(
"Gateway {} missing authenticator address or IP — not a functional exit",
exit_identity
),
};
let ports_count = config.netstack_args.port_check_ports.len();
if ports_count == 0 {
anyhow::bail!(
"No ports specified. Use --check-ports 80,443,22021 or --check-all-ports"
);
}
Ok(Self {
exit_node,
exit_identity,
authenticator,
ip_address,
port_check_target: config.netstack_args.port_check_target.clone(),
ports_count,
})
}
fn failed_to_connect(&self, err: impl std::fmt::Display) -> PortCheckResult {
PortCheckResult {
gateway: self.exit_identity.clone(),
can_register: false,
port_check_target: self.port_check_target.clone(),
ports: BTreeMap::new(),
error: Some(format!("Failed to connect to mixnet: {err}")),
}
}
}
impl Probe {
fn parse_port_check_targets(raw: &str) -> Vec<String> {
raw.split(',')
.map(str::trim)
.filter(|s| !s.is_empty())
.map(ToOwned::to_owned)
.collect()
}
async fn is_target_reachable(host: &str) -> bool {
let timeout = Duration::from_secs(2);
for port in [80u16, 443u16] {
if tokio::time::timeout(timeout, tokio::net::TcpStream::connect((host, port)))
.await
.is_ok_and(|res| res.is_ok())
{
return true;
}
}
false
}
async fn run_port_scan_with_retries(
mixnet_listener_task: &AuthClientMixnetListenerHandle,
nym_address: nym_sdk::mixnet::Recipient,
authenticator: nym_sdk::mixnet::Recipient,
authenticator_version: nym_authenticator_requests::AuthenticatorVersion,
ip_address: IpAddr,
bandwidth_provider: &dyn BandwidthTicketProvider,
wg_ticket_type: TicketType,
credential_provider: nym_sdk::mixnet::NodeIdentity,
netstack_args: NetstackArgs,
awg_args: Option<String>,
) -> PortScanRun {
let mut port_results: BTreeMap<String, bool> = BTreeMap::new();
let mut can_register = false;
let mut last_error = None;
let max_attempts = 3;
for attempt in 1..=max_attempts {
if attempt > 1 {
info!("Retrying authenticator registration (attempt {attempt}/{max_attempts})...");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
let credential = match bandwidth_provider
.get_ecash_ticket(wg_ticket_type, credential_provider, 1)
.await
{
Ok(ticket) => ticket.data,
Err(e) => {
error!("Failed to get ecash ticket: {e}");
last_error = Some(format!("Failed to get ecash ticket: {e}"));
break;
}
};
let mut rng = rand::thread_rng();
let auth_client = AuthenticatorClient::new(
mixnet_listener_task.subscribe(),
mixnet_listener_task.mixnet_sender(),
nym_address,
authenticator,
authenticator_version,
Arc::new(x25519::KeyPair::new(&mut rng)),
ip_address,
);
match wg_probe(
auth_client,
ip_address,
authenticator_version,
awg_args.clone(),
netstack_args.clone(),
true, // port_check_only
credential,
)
.await
{
Ok(outcome) => {
if outcome.can_register {
can_register = true;
port_results = outcome
.port_check_results
.unwrap_or_default()
.into_iter()
.collect();
let open = port_results.values().filter(|&&v| v).count();
info!(
"Port check complete: {}/{} ports open",
open,
port_results.len()
);
break;
}
warn!(
"Auth registration returned but can_register=false (attempt {attempt}/{max_attempts})"
);
last_error = Some("Auth registration did not complete".into());
}
Err(e) => {
warn!("WG probe error: {e} (attempt {attempt}/{max_attempts})");
last_error = Some(format!("WG probe error: {e}"));
}
}
}
PortScanRun {
can_register,
port_results,
last_error,
}
}
/// Warm up routes, register with the authenticator, run the port scan and tear down.
async fn port_check_after_connect(
mixnet_client: MixnetClient,
setup: PortCheckSetup,
bandwidth_provider: &dyn BandwidthTicketProvider,
netstack_args: NetstackArgs,
) -> PortCheckResult {
let targets = Self::parse_port_check_targets(&setup.port_check_target);
let mut selected_target = targets
.first()
.cloned()
.unwrap_or_else(|| setup.port_check_target.clone());
if targets.len() > 1 {
let mut found = false;
for candidate in &targets {
if Self::is_target_reachable(candidate).await {
selected_target = candidate.clone();
found = true;
break;
}
warn!(
"Port-check target '{}' is unreachable, trying next",
candidate
);
}
if !found {
warn!(
"All port-check targets unreachable; falling back to first: '{}'",
selected_target
);
} else if selected_target != *targets.first().unwrap() {
info!(
"Port check: selected target '{}' (first reachable from list)",
selected_target
);
}
}
let mut netstack_args = netstack_args;
netstack_args.port_check_target = selected_target.clone();
info!("Warming up mixnet routes...");
let nym_address = *mixnet_client.nym_address();
let (warmup_result, mixnet_client) = do_ping(
mixnet_client,
nym_address,
setup.exit_node.exit_router_address,
false,
)
.await;
match warmup_result {
Ok(_) => info!("Mixnet warmup done"),
Err(e) => warn!("Warmup had issues ({e}), auth may be less reliable"),
}
let nym_address = *mixnet_client.nym_address();
let mixnet_listener_task =
AuthClientMixnetListener::new(mixnet_client, CancellationToken::new()).start();
let scan = Self::run_port_scan_with_retries(
&mixnet_listener_task,
nym_address,
setup.authenticator,
setup.exit_node.authenticator_version,
setup.ip_address,
bandwidth_provider,
TicketType::V1WireguardExit,
setup.exit_node.identity,
netstack_args,
None,
)
.await;
mixnet_listener_task.stop().await;
PortCheckResult {
gateway: setup.exit_identity,
can_register: scan.can_register,
port_check_target: selected_target,
ports: scan.port_results,
error: if scan.can_register {
None
} else {
scan.last_error
},
}
}
/// Create a probe with pre-queried gateway nodes
pub fn new(
entry_node: TestedNodeDetails,
@@ -90,7 +368,6 @@ impl Probe {
})
}
/// Run a probe as an NS agent (orchestrator for multiple probe runs for NS API)
pub async fn probe_run_agent(
mut self,
credential_args: CredentialArgs,
@@ -256,6 +533,174 @@ impl Probe {
self.do_probe_test(mixnet_client, bandwidth_provider).await
}
pub async fn run_ports(
entry_node: TestedNodeDetails,
exit_node: Option<TestedNodeDetails>,
network: NymNetworkDetails,
config: &RunPortsConfig,
config_dir: &PathBuf,
credential: CredentialMode,
) -> anyhow::Result<PortCheckResult> {
let exit_node = exit_node.unwrap_or(entry_node.clone());
let setup = PortCheckSetup::new(exit_node, config)?;
info!(
"Port check: testing {} ports on gateway {} via {}",
setup.ports_count, setup.exit_identity, setup.port_check_target
);
let storage_paths = StoragePaths::new_from_dir(config_dir)?;
let storage = storage_paths
.initialise_default_persistent_storage()
.await?;
let mixnet_debug_config = helpers::mixnet_debug_config(
config.min_gateway_mixnet_performance,
config.ignore_egress_epoch_role,
);
let topology = helpers::fetch_topology(&network, &mixnet_debug_config)
.await
.inspect_err(|e| warn!("Failed to fetch topology: {e}"))
.ok();
let mut mixnet_client_builder = MixnetClientBuilder::new_with_storage(storage.clone())
.request_gateway(entry_node.identity.to_string())
.network_details(network.clone())
.debug_config(mixnet_debug_config)
.with_forget_me(ForgetMe::new_stats())
.credentials_mode(!credential.use_mock_ecash);
if let Some(topology) = &topology {
mixnet_client_builder = mixnet_client_builder.custom_topology_provider(Box::new(
HardcodedTopologyProvider::new(topology.clone()),
));
}
let disconnected_mixnet_client = mixnet_client_builder.build()?;
// make sure identity keys exist before credential acquisition
// (acquire_bandwidth → create_bandwidth_client needs them on disk)
let key_store = storage.key_store();
if key_store.load_keys().await.is_err() {
debug!("Generating new client keys");
let mut rng = OsRng;
nym_client_core::init::generate_new_client_keys(&mut rng, key_store).await?;
}
credential
.acquire(&disconnected_mixnet_client, &storage)
.await?;
let bandwidth_provider = build_bandwidth_controller(
&network,
storage.credential_store().clone(),
credential.use_mock_ecash,
)?;
let mixnet_client = match disconnected_mixnet_client.connect_to_mixnet().await {
Ok(client) => {
info!(
"Connected to mixnet via entry gateway: {}",
entry_node.identity
);
info!("Our nym address: {}", *client.nym_address());
client
}
Err(e) => return Ok(setup.failed_to_connect(e)),
};
Ok(Self::port_check_after_connect(
mixnet_client,
setup,
bandwidth_provider.as_ref(),
config.netstack_args.clone(),
)
.await)
}
/// Bonded gateway port-check, run by the NS agent. Uses ephemeral storage and ticket
/// materials provided by the NS API instead of mnemonic-based acquisition.
pub async fn run_ports_for_agent(
entry_gateway: nym_sdk::mixnet::ed25519::PublicKey,
network: NymNetworkDetails,
config: &RunPortsConfig,
credential_args: CredentialArgs,
) -> anyhow::Result<PortCheckResult> {
let api_url = network
.endpoints
.first()
.and_then(|ep| ep.api_url())
.ok_or(anyhow::anyhow!("missing api url"))?;
let directory = NymApiDirectory::new(api_url).await?;
let entry_node = directory
.entry_gateway(&entry_gateway)?
.to_testable_node()?;
// agent always uses the entry gateway as the exit
let setup = PortCheckSetup::new(entry_node.clone(), config)?;
info!(
"Port check (agent): testing {} ports on gateway {} via {}",
setup.ports_count, setup.exit_identity, setup.port_check_target
);
let storage = Ephemeral::default();
let mixnet_debug_config = helpers::mixnet_debug_config(
config.min_gateway_mixnet_performance,
config.ignore_egress_epoch_role,
);
let topology = helpers::fetch_topology(&network, &mixnet_debug_config)
.await
.inspect_err(|e| warn!("Failed to fetch topology: {e}"))
.ok();
let mut mixnet_client_builder = MixnetClientBuilder::new_with_storage(storage.clone())
.request_gateway(entry_node.identity.to_string())
.network_details(network.clone())
.debug_config(mixnet_debug_config)
.with_forget_me(ForgetMe::new_stats())
.credentials_mode(true);
if let Some(topology) = &topology {
mixnet_client_builder = mixnet_client_builder.custom_topology_provider(Box::new(
HardcodedTopologyProvider::new(topology.clone()),
));
}
let disconnected_mixnet_client = mixnet_client_builder.build()?;
credential_args
.import_credential(&disconnected_mixnet_client)
.await?;
let bandwidth_provider =
build_bandwidth_controller(&network, storage.credential_store().clone(), false)?;
let mixnet_client = match disconnected_mixnet_client.connect_to_mixnet().await {
Ok(client) => {
info!(
"Connected to mixnet via entry gateway: {}",
entry_node.identity
);
info!("Our nym address: {}", *client.nym_address());
client
}
Err(e) => return Ok(setup.failed_to_connect(e)),
};
Ok(Self::port_check_after_connect(
mixnet_client,
setup,
bandwidth_provider.as_ref(),
config.netstack_args.clone(),
)
.await)
}
#[allow(clippy::too_many_arguments)]
pub async fn do_probe_test(
self,
@@ -276,6 +721,7 @@ impl Probe {
lp: None,
socks5: None,
},
ports_check: None,
};
let mixnet_client = match mixnet_client {
@@ -356,6 +802,7 @@ impl Probe {
let mixnet_listener_task =
AuthClientMixnetListener::new(mixnet_client, CancellationToken::new())
.start();
let mut rng = rand::thread_rng();
let auth_client = AuthenticatorClient::new(
mixnet_listener_task.subscribe(),
@@ -384,6 +831,7 @@ impl Probe {
exit_node.authenticator_version,
self.config.amnezia_args.clone(),
self.config.netstack_args.clone(),
false,
credential,
)
.await
@@ -391,6 +839,7 @@ impl Probe {
// Add wg results to probe result
probe_result.outcome.wg = Some(outcome);
mixnet_listener_task.stop().await;
} else {
warn!("Not enough information to run WireGuard via mixnet registration tests");
+140 -9
View File
@@ -1,12 +1,15 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use clap::{Parser, Subcommand};
use clap::{Args, Parser, Subcommand};
use nym_bin_common::bin_info;
use nym_config::defaults::setup_env;
use nym_gateway_probe::config::{CredentialArgs, CredentialMode, ProbeConfig};
use nym_gateway_probe::{NymApiDirectory, ProbeResult, query_gateway_by_ip};
use nym_gateway_probe::config::{CredentialArgs, CredentialMode, NetstackArgs, ProbeConfig};
use nym_gateway_probe::{
NymApiDirectory, PortCheckResult, ProbeResult, RunPortsConfig, query_gateway_by_ip,
};
use nym_sdk::mixnet::NodeIdentity;
use serde::Serialize;
use std::path::Path;
use std::{path::PathBuf, sync::OnceLock};
use tracing::*;
@@ -28,7 +31,7 @@ struct CliArgs {
config_env_file: Option<PathBuf>,
/// Disable logging during probe
#[arg(long)]
#[arg(long, global = true)]
no_log: bool,
}
@@ -81,6 +84,35 @@ enum Commands {
probe_config: ProbeConfig,
},
/// Check WG exit policy ports on a bonded gateway.
/// Tests TCP connectivity through the WG tunnel for each port.
/// Use --check-ports to pick specific ports, or --check-all-ports for the full exit policy list.
RunPorts {
/// Directory for credential and mixnet storage
#[arg(long)]
config_dir: Option<PathBuf>,
/// Bonded gateway identity.
#[arg(long, short = 'g', alias = "gateway")]
entry_gateway: NodeIdentity,
/// Separate exit gateway to test (entry_gateway is used for mixnet entry).
#[arg(long)]
exit_gateway: Option<NodeIdentity>,
/// Test every port in the canonical exit policy (network-tunnel-manager.sh PORT_MAPPINGS).
/// Overrides --check-ports.
#[arg(long)]
check_all_ports: bool,
/// Arguments to manage credentials
#[command(flatten)]
credential_mode: CredentialMode,
#[command(flatten)]
probe_config: RunPortsProbeConfig,
},
/// Run the probe by NS agents
RunAgent {
/// The specific gateway specified by ID.
@@ -96,6 +128,29 @@ enum Commands {
},
}
#[derive(Debug, Args, Clone)]
struct RunPortsProbeConfig {
/// Only choose gateway with that minimum performance
#[arg(long)]
min_gateway_mixnet_performance: Option<u8>,
/// Ignore egress epoch role constraints
#[arg(long, global = true)]
ignore_egress_epoch_role: bool,
/// Arguments to manage netstack downloads and port checks
#[command(flatten)]
netstack_args: NetstackArgs,
}
/// CLI output wrapper — either a standard probe result or a port-check result
#[derive(Serialize)]
#[serde(untagged)]
pub(crate) enum ProbeOutput {
Standard(ProbeResult),
PortCheck(PortCheckResult),
}
fn setup_logging() {
// SAFETY: those are valid directives
#[allow(clippy::unwrap_used)]
@@ -112,7 +167,7 @@ fn setup_logging() {
.init();
}
pub(crate) async fn run() -> anyhow::Result<ProbeResult> {
pub(crate) async fn run() -> anyhow::Result<ProbeOutput> {
let args = CliArgs::parse();
if !args.no_log {
setup_logging();
@@ -122,7 +177,7 @@ pub(crate) async fn run() -> anyhow::Result<ProbeResult> {
setup_env(args.config_env_file.as_ref());
let network = nym_sdk::NymNetworkDetails::new_from_env();
info!("{:#?}", network);
debug!("{:#?}", network);
match args.command {
Commands::RunLocal {
@@ -165,7 +220,9 @@ pub(crate) async fn run() -> anyhow::Result<ProbeResult> {
let trial =
nym_gateway_probe::Probe::new(entry_details, exit_details, network, probe_config);
Box::pin(trial.probe_run_locally(&config_dir, credential_mode)).await
Box::pin(trial.probe_run_locally(&config_dir, credential_mode))
.await
.map(ProbeOutput::Standard)
}
Commands::Run {
entry_gateway,
@@ -209,7 +266,79 @@ pub(crate) async fn run() -> anyhow::Result<ProbeResult> {
let trial =
nym_gateway_probe::Probe::new(entry_details, exit_details, network, probe_config);
Box::pin(trial.probe_run(&config_dir, credential_mode)).await
Box::pin(trial.probe_run(&config_dir, credential_mode))
.await
.map(ProbeOutput::Standard)
}
Commands::RunPorts {
entry_gateway,
exit_gateway,
config_dir,
check_all_ports,
credential_mode,
probe_config,
} => {
let mut run_ports_config = RunPortsConfig {
min_gateway_mixnet_performance: probe_config.min_gateway_mixnet_performance,
ignore_egress_epoch_role: probe_config.ignore_egress_epoch_role,
netstack_args: probe_config.netstack_args,
};
// --check-all-ports overrides --check-ports with the full exit policy list
if check_all_ports {
use nym_gateway_probe::config::EXIT_POLICY_PORTS;
run_ports_config.netstack_args.port_check_ports = EXIT_POLICY_PORTS.to_vec();
info!(
"Using full exit policy port list ({} ports)",
EXIT_POLICY_PORTS.len()
);
}
let api_url = network
.endpoints
.first()
.and_then(|ep| ep.api_url())
.ok_or(anyhow::anyhow!("missing api url"))?;
let directory = NymApiDirectory::new(api_url).await?;
let entry_details = directory
.entry_gateway(&entry_gateway)?
.to_testable_node()?;
let exit_details = exit_gateway
.map(|id_key| directory.exit_gateway(&id_key))
.transpose()?
.map(|node| node.to_testable_node())
.transpose()?;
let config_dir = config_dir
.clone()
.unwrap_or_else(|| Path::new(DEFAULT_CONFIG_DIR).join(&network.network_name));
if config_dir.is_file() {
anyhow::bail!("provided configuration directory is a file");
}
if !config_dir.exists() {
std::fs::create_dir_all(config_dir.clone())?;
}
info!(
"using the following directory for the probe config: {}",
config_dir.display()
);
Box::pin(nym_gateway_probe::Probe::run_ports(
entry_details,
exit_details,
network,
&run_ports_config,
&config_dir,
credential_mode,
))
.await
.map(ProbeOutput::PortCheck)
}
Commands::RunAgent {
entry_gateway,
@@ -219,7 +348,9 @@ pub(crate) async fn run() -> anyhow::Result<ProbeResult> {
let trial =
nym_gateway_probe::Probe::new_for_agent(entry_gateway, network, probe_config)
.await?;
Box::pin(trial.probe_run_agent(credential_args)).await
Box::pin(trial.probe_run_agent(credential_args))
.await
.map(ProbeOutput::Standard)
}
}
}
@@ -3,7 +3,7 @@
[package]
name = "nym-node-status-agent"
version = "2.0.0"
version = "2.0.1"
authors.workspace = true
edition.workspace = true
license.workspace = true
@@ -0,0 +1,42 @@
use crate::cli::{ServerConfig, parse_server_config};
use anyhow::anyhow;
use nym_gateway_probe::config::CredentialArgs;
use nym_gateway_probe::types::{AttachedTicketMaterials, VersionedSerialise};
use nym_node_status_client::NsApiClient;
use nym_sdk::mixnet::ed25519::PublicKey;
pub(crate) fn parse_servers(raw: &[String]) -> anyhow::Result<Vec<ServerConfig>> {
raw.iter()
.map(|s| {
parse_server_config(s).map_err(|e| {
tracing::error!("Invalid server config '{}': {}", s, e);
anyhow!("Invalid server config '{}': {}", s, e)
})
})
.collect()
}
pub(crate) fn primary(servers: &[ServerConfig]) -> anyhow::Result<&ServerConfig> {
servers
.first()
.ok_or_else(|| anyhow!("No servers configured"))
}
pub(crate) fn build_client(server: &ServerConfig) -> NsApiClient {
let auth_key =
nym_crypto::asymmetric::ed25519::PrivateKey::from_bytes(&server.auth_key.to_bytes())
.expect("Failed to clone auth key");
NsApiClient::new(&server.address, server.port, auth_key)
}
pub(crate) fn parse_gateway_pubkey(key: &str) -> anyhow::Result<PublicKey> {
PublicKey::from_base58_string(key).map_err(|e| anyhow!("Failed to parse GW identity key: {e}"))
}
pub(crate) fn credential_args_from(materials: AttachedTicketMaterials) -> CredentialArgs {
CredentialArgs {
ticket_materials: materials.to_serialised_string(),
ticket_materials_revision:
<AttachedTicketMaterials as VersionedSerialise>::CURRENT_SERIALISATION_REVISION,
}
}
@@ -4,7 +4,9 @@ use nym_bin_common::bin_info;
use nym_crypto::asymmetric::ed25519::PrivateKey;
use std::{env, sync::OnceLock};
pub(crate) mod common;
pub(crate) mod generate_keypair;
pub(crate) mod run_ports_check;
pub(crate) mod run_probe;
#[derive(Debug)]
@@ -20,7 +22,7 @@ fn pretty_build_info_static() -> &'static str {
PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print())
}
fn parse_server_config(s: &str) -> Result<ServerConfig, String> {
pub(super) fn parse_server_config(s: &str) -> Result<ServerConfig, String> {
let parts: Vec<&str> = s.split('|').collect();
if parts.len() != 2 {
return Err("Server config must be in format 'address|port'".to_string());
@@ -51,6 +53,7 @@ pub(crate) struct Args {
#[derive(Subcommand, Debug)]
pub(crate) enum Command {
RunProbe(RunProbeArgs),
RunPortsCheck(RunPortsCheckArgs),
GenerateKeypair {
#[arg(long)]
@@ -71,21 +74,31 @@ pub(crate) struct RunProbeArgs {
pub probe_config: nym_gateway_probe::config::ProbeConfig,
}
#[derive(clap::Args, Debug)]
pub(crate) struct RunPortsCheckArgs {
/// Server configurations in format "address|port"
/// Can be specified multiple times for multiple servers
#[arg(short, long, required = true)]
pub server: Vec<String>,
/// Only choose gateway with that minimum performance
#[arg(long)]
pub min_gateway_mixnet_performance: Option<u8>,
/// Ignore egress epoch role constraints
#[arg(long)]
pub ignore_egress_epoch_role: bool,
/// Arguments to manage netstack downloads and port checks
#[command(flatten)]
pub netstack_args: nym_gateway_probe::config::NetstackArgs,
}
impl Args {
pub(crate) async fn execute(self, log_capture: LogCapture) -> anyhow::Result<()> {
match self.command {
Command::RunProbe(args) => {
// Parse server configs
let mut servers = Vec::new();
for s in &args.server {
match parse_server_config(s) {
Ok(config) => servers.push(config),
Err(e) => {
tracing::error!("Invalid server config '{}': {}", s, e);
anyhow::bail!("Invalid server config '{}': {}", s, e);
}
}
}
let servers = common::parse_servers(&args.server)?;
run_probe::run_probe(&servers, args.probe_config, log_capture)
.await
@@ -93,6 +106,21 @@ impl Args {
tracing::error!("{err}");
})?
}
Command::RunPortsCheck(args) => {
let servers = common::parse_servers(&args.server)?;
run_ports_check::run_ports_check(
&servers,
args.min_gateway_mixnet_performance,
args.ignore_egress_epoch_role,
args.netstack_args,
log_capture,
)
.await
.inspect_err(|err| {
tracing::error!("{err}");
})?
}
Command::GenerateKeypair { path } => {
let path = path
.to_owned()
@@ -0,0 +1,131 @@
use crate::cli::ServerConfig;
use crate::cli::common;
use crate::log_capture::LogCapture;
use nym_gateway_probe::RunPortsConfig;
use tracing::instrument;
pub(crate) async fn run_ports_check(
servers: &[ServerConfig],
min_gateway_mixnet_performance: Option<u8>,
ignore_egress_epoch_role: bool,
mut netstack_args: nym_gateway_probe::config::NetstackArgs,
log_capture: LogCapture,
) -> anyhow::Result<()> {
let primary_server = common::primary(servers)?;
tracing::info!(
"Requesting ports-check testrun from primary server: {}:{}",
primary_server.address,
primary_server.port
);
let ns_api_client = common::build_client(primary_server);
let testrun = match ns_api_client.request_ports_check_testrun().await {
Ok(Some(testrun)) => testrun,
Ok(None) => {
tracing::info!("No ports-check testruns available from primary server");
return Ok(());
}
Err(err) => {
tracing::error!("Failed to contact primary server: {err}");
return Err(err);
}
};
let testrun_id = testrun.assignment.testrun_id;
let testrun_assigned_at = testrun.assignment.assigned_at_utc;
let gateway_identity_key = testrun.assignment.gateway_identity_key.clone();
let gateway_identity_pubkey = common::parse_gateway_pubkey(&gateway_identity_key)?;
tracing::info!(
"Received ports-check testrun {testrun_id} for gateway {gateway_identity_key} from primary",
);
let network = nym_sdk::NymNetworkDetails::new_from_env();
// Force full exit policy list for this job kind
netstack_args.port_check_ports = nym_gateway_probe::config::EXIT_POLICY_PORTS.to_vec();
let run_ports_config = RunPortsConfig {
min_gateway_mixnet_performance,
ignore_egress_epoch_role,
netstack_args,
};
let credentials_args = common::credential_args_from(testrun.ticket_materials);
log_capture.start();
let port_check_result = nym_gateway_probe::Probe::run_ports_for_agent(
gateway_identity_pubkey,
network,
&run_ports_config,
credentials_args,
)
.await?;
let probe_log = log_capture.stop_and_drain();
submit_ports_check_results_to_servers(
servers,
testrun_id,
testrun_assigned_at,
&gateway_identity_key,
port_check_result,
probe_log,
)
.await;
Ok(())
}
#[instrument(level = "info", skip_all, fields(gateway_id = %gateway_identity_key, testrun = testrun_id))]
async fn submit_ports_check_results_to_servers(
servers: &[ServerConfig],
testrun_id: i32,
testrun_assigned_at: i64,
gateway_identity_key: &str,
port_check_result: nym_gateway_probe::PortCheckResult,
probe_log: String,
) {
let handles = servers
.iter()
.enumerate()
.map(|(idx, server)| {
let port_check_result = port_check_result.clone();
let probe_log = probe_log.clone();
let gateway_identity_key = gateway_identity_key.to_string();
async move {
let client = common::build_client(server);
let result = client
.submit_ports_check_results_with_context(
testrun_id,
port_check_result,
probe_log,
testrun_assigned_at,
gateway_identity_key,
)
.await;
(idx, server.address.clone(), server.port, result)
}
})
.collect::<Vec<_>>();
let results = futures::future::join_all(handles).await;
for (index, server_address, server_port, result) in results {
match result {
Ok(()) => {
tracing::info!(
"✅ Successfully submitted ports-check to server[{index}] {server_address}:{server_port}",
);
}
Err(e) => {
tracing::warn!(
"❌ Failed to submit ports-check to server[{index}] {server_address}:{server_port} - {e}"
);
}
}
}
}
@@ -1,9 +1,6 @@
use crate::cli::ServerConfig;
use crate::cli::common;
use crate::log_capture::LogCapture;
use anyhow::anyhow;
use nym_gateway_probe::config::CredentialArgs;
use nym_gateway_probe::types::{AttachedTicketMaterials, VersionedSerialise};
use nym_sdk::mixnet::ed25519::PublicKey;
use tracing::instrument;
pub(crate) async fn run_probe(
@@ -11,27 +8,14 @@ pub(crate) async fn run_probe(
probe_config: nym_gateway_probe::config::ProbeConfig,
log_capture: LogCapture,
) -> anyhow::Result<()> {
if servers.is_empty() {
anyhow::bail!("No servers configured");
}
// Always use first server as primary
let primary_server = &servers[0];
let primary_server = common::primary(servers)?;
tracing::info!(
"Requesting testrun from primary server: {}:{}",
primary_server.address,
primary_server.port
);
let auth_key = nym_crypto::asymmetric::ed25519::PrivateKey::from_bytes(
&primary_server.auth_key.to_bytes(),
)
.expect("Failed to clone auth key");
let ns_api_client = nym_node_status_client::NsApiClient::new(
&primary_server.address,
primary_server.port,
auth_key,
);
let ns_api_client = common::build_client(primary_server);
let testrun = match ns_api_client.request_testrun().await {
Ok(Some(testrun)) => testrun,
@@ -48,8 +32,7 @@ pub(crate) async fn run_probe(
let testrun_id = testrun.assignment.testrun_id;
let testrun_assigned_at = testrun.assignment.assigned_at_utc;
let gateway_identity_key = testrun.assignment.gateway_identity_key.clone();
let gateway_identity_pubkey = PublicKey::from_base58_string(gateway_identity_key.clone())
.map_err(|e| anyhow!("Failed to parse GW identity key: {e}"))?;
let gateway_identity_pubkey = common::parse_gateway_pubkey(&gateway_identity_key)?;
tracing::info!("Received testrun {testrun_id} for gateway {gateway_identity_key} from primary",);
@@ -61,12 +44,7 @@ pub(crate) async fn run_probe(
// probe constructor might modify config to suit the testing mode, so log afterwards
tracing::info!("Using probe config:\n{:#?}", &probe.config());
let serialized_ticket_materials = testrun.ticket_materials.to_serialised_string();
let credentials_args = CredentialArgs {
ticket_materials: serialized_ticket_materials,
ticket_materials_revision:
<AttachedTicketMaterials as VersionedSerialise>::CURRENT_SERIALISATION_REVISION,
};
let credentials_args = common::credential_args_from(testrun.ticket_materials);
// Run the probe, capturing all tracing output
log_capture.start();
@@ -110,15 +88,7 @@ async fn submit_results_to_servers(
let gateway_identity_key = gateway_identity_key.to_string();
async move {
let auth_key = nym_crypto::asymmetric::ed25519::PrivateKey::from_bytes(
&server.auth_key.to_bytes(),
)
.expect("Failed to clone auth key");
let client = nym_node_status_client::NsApiClient::new(
&server.address,
server.port,
auth_key,
);
let client = common::build_client(server);
let result = if idx == 0 {
// Primary server: submit regular results without context
@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "SELECT\n id,\n gateway_id,\n status,\n created_utc,\n ip_address,\n log,\n last_assigned_utc\n FROM testruns\n WHERE id = $1",
"query": "SELECT\n id,\n gateway_id,\n status,\n kind,\n created_utc,\n ip_address,\n log,\n last_assigned_utc\n FROM testruns\n WHERE gateway_id = $1 AND status != 2 AND kind = $2\n ORDER BY id DESC\n LIMIT 1",
"describe": {
"columns": [
{
@@ -20,28 +20,34 @@
},
{
"ordinal": 3,
"name": "kind",
"type_info": "Int2"
},
{
"ordinal": 4,
"name": "created_utc",
"type_info": "Int8"
},
{
"ordinal": 4,
"ordinal": 5,
"name": "ip_address",
"type_info": "Varchar"
},
{
"ordinal": 5,
"ordinal": 6,
"name": "log",
"type_info": "Varchar"
},
{
"ordinal": 6,
"ordinal": 7,
"name": "last_assigned_utc",
"type_info": "Int8"
}
],
"parameters": {
"Left": [
"Int4"
"Int4",
"Int2"
]
},
"nullable": [
@@ -51,8 +57,9 @@
false,
false,
false,
false,
true
]
},
"hash": "ddf003e7e13653388f487e3adfc1aad0a285e29c797f99ec00bcccb063f76b64"
"hash": "0c635c539930c10a9be35a12ba3f2a66aae5be1c37af2eca521bf75261cecf28"
}
@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO testruns (\n id,\n gateway_id,\n status,\n created_utc,\n last_assigned_utc,\n ip_address,\n log\n ) VALUES ($1, $2, $3, $4, $5, $6, $7)",
"query": "INSERT INTO testruns (\n id,\n gateway_id,\n status,\n kind,\n created_utc,\n last_assigned_utc,\n ip_address,\n log\n ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
"describe": {
"columns": [],
"parameters": {
@@ -8,6 +8,7 @@
"Int4",
"Int4",
"Int4",
"Int2",
"Int8",
"Int8",
"Varchar",
@@ -16,5 +17,5 @@
},
"nullable": []
},
"hash": "88a4554c2857288c314768c56648a5f1811d2053582380ca602335a122cef8db"
"hash": "1dc7d1e6c2173cd1ca70d699124c2b71a1a91ef114ffdd0572a738b577fab15d"
}
@@ -1,30 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n WITH oldest_queued AS (\n SELECT id\n FROM testruns\n WHERE status = $1\n ORDER BY created_utc asc\n LIMIT 1\n FOR UPDATE SKIP LOCKED\n )\n UPDATE testruns\n SET\n status = $3,\n last_assigned_utc = $2\n FROM oldest_queued\n WHERE testruns.id = oldest_queued.id\n RETURNING\n testruns.id,\n testruns.gateway_id\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Int4"
},
{
"ordinal": 1,
"name": "gateway_id",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"Int4",
"Int8",
"Int4"
]
},
"nullable": [
false,
false
]
},
"hash": "38439a6c33bf21b90032659797105b6af747c3ae6f48bc41e5ec509a2d87abcc"
}
@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE gateways SET last_ports_check_utc = $1, last_updated_utc = $1 WHERE id = $2",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8",
"Int4"
]
},
"nullable": []
},
"hash": "3c6da5c1faace026222bbb28553d8ea15b957dd19b097101dcda2c05aee21d20"
}
@@ -0,0 +1,18 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO testruns (gateway_id, status, kind, created_utc, last_assigned_utc, ip_address, log)\n SELECT\n gw.id,\n $1,\n $2,\n $3,\n NULL,\n 'ports_check_scheduler',\n ''\n FROM gateways gw\n WHERE gw.bonded = true\n AND (gw.last_ports_check_utc IS NULL OR gw.last_ports_check_utc < $4)\n AND NOT EXISTS (\n SELECT 1\n FROM testruns t\n WHERE t.gateway_id = gw.id\n AND t.kind = $2\n AND t.status IN ($1, $5)\n )\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int4",
"Int2",
"Int8",
"Int8",
"Int4"
]
},
"nullable": []
},
"hash": "65b5a3ed9a73f1304badfa8d9bba57fb24f6b21655039a9bfa21c44409c87910"
}
@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "SELECT\n id as \"id!\",\n gateway_id as \"gateway_id!\",\n status as \"status!\",\n created_utc as \"created_utc!\",\n ip_address as \"ip_address!\",\n log as \"log!\",\n last_assigned_utc\n FROM testruns\n WHERE\n id = $1\n AND\n status = $2\n ORDER BY created_utc\n LIMIT 1",
"query": "SELECT\n id as \"id!\",\n gateway_id as \"gateway_id!\",\n status as \"status!\",\n kind as \"kind!\",\n created_utc as \"created_utc!\",\n ip_address as \"ip_address!\",\n log as \"log!\",\n last_assigned_utc\n FROM testruns\n WHERE\n id = $1\n AND\n status = $2\n ORDER BY created_utc\n LIMIT 1",
"describe": {
"columns": [
{
@@ -20,21 +20,26 @@
},
{
"ordinal": 3,
"name": "kind!",
"type_info": "Int2"
},
{
"ordinal": 4,
"name": "created_utc!",
"type_info": "Int8"
},
{
"ordinal": 4,
"ordinal": 5,
"name": "ip_address!",
"type_info": "Varchar"
},
{
"ordinal": 5,
"ordinal": 6,
"name": "log!",
"type_info": "Varchar"
},
{
"ordinal": 6,
"ordinal": 7,
"name": "last_assigned_utc",
"type_info": "Int8"
}
@@ -52,8 +57,9 @@
false,
false,
false,
false,
true
]
},
"hash": "d41cafc76cb49c03df7452c405a4e2e5e3951c41dc35c20261c1d959c0d6403f"
"hash": "75bdccbff8ea6a59d56230075163783249a03f2ae04ec89f734e67ceb897a385"
}
@@ -0,0 +1,31 @@
{
"db_name": "PostgreSQL",
"query": "\n WITH oldest_queued AS (\n SELECT id\n FROM testruns\n WHERE status = $1 AND kind = $4\n ORDER BY created_utc asc\n LIMIT 1\n FOR UPDATE SKIP LOCKED\n )\n UPDATE testruns\n SET\n status = $3,\n last_assigned_utc = $2\n FROM oldest_queued\n WHERE testruns.id = oldest_queued.id\n RETURNING\n testruns.id,\n testruns.gateway_id\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Int4"
},
{
"ordinal": 1,
"name": "gateway_id",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"Int4",
"Int8",
"Int4",
"Int2"
]
},
"nullable": [
false,
false
]
},
"hash": "ad78bf2f7ed4666b4e5f60811f04cb8bf8ebb1c26839f915780d0fc0d1939973"
}
@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "SELECT\n id,\n gateway_id,\n status,\n created_utc,\n ip_address,\n log,\n last_assigned_utc\n FROM testruns\n WHERE gateway_id = $1 AND status != 2\n ORDER BY id DESC\n LIMIT 1",
"query": "SELECT\n id,\n gateway_id,\n status,\n kind,\n created_utc,\n ip_address,\n log,\n last_assigned_utc\n FROM testruns\n WHERE id = $1",
"describe": {
"columns": [
{
@@ -20,21 +20,26 @@
},
{
"ordinal": 3,
"name": "kind",
"type_info": "Int2"
},
{
"ordinal": 4,
"name": "created_utc",
"type_info": "Int8"
},
{
"ordinal": 4,
"ordinal": 5,
"name": "ip_address",
"type_info": "Varchar"
},
{
"ordinal": 5,
"ordinal": 6,
"name": "log",
"type_info": "Varchar"
},
{
"ordinal": 6,
"ordinal": 7,
"name": "last_assigned_utc",
"type_info": "Int8"
}
@@ -51,8 +56,9 @@
false,
false,
false,
false,
true
]
},
"hash": "f343df183767af9815847cb94ccbd484010a7346de03f1e0959a09a964344de8"
"hash": "b32d29ca3cedae93fa2fcd44cc301614edc4e967f348c57f485f736b8f2f03e8"
}
@@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "SELECT last_probe_result FROM gateways WHERE id = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "last_probe_result",
"type_info": "Varchar"
}
],
"parameters": {
"Left": [
"Int4"
]
},
"nullable": [
true
]
},
"hash": "cc84f8a284c9aba4494152ff4e6087af53e9504a8bb13280dde5c37b9b418370"
}
@@ -3,7 +3,7 @@
[package]
name = "nym-node-status-api"
version = "4.6.1"
version = "4.6.2"
authors.workspace = true
edition.workspace = true
license.workspace = true
@@ -39,6 +39,19 @@ test-db-wait: ## Wait for the PostgreSQL database to be healthy
sleep 1; \
done; \
echo " Database is healthy!"
@echo "Waiting for host port to be ready..."
@for i in $$(seq 1 30); do \
if pg_isready -h localhost -p 5433 -U testuser -d nym_node_status_api_test >/dev/null 2>&1; then \
echo " Host port is ready!"; \
break; \
fi; \
if [ $$i -eq 30 ]; then \
echo " Timed out waiting for host port"; \
exit 1; \
fi; \
echo -n "."; \
sleep 1; \
done
.PHONY: test-db-down
test-db-down: ## Stop and remove the test database
@@ -0,0 +1,8 @@
ALTER TABLE gateways
ADD COLUMN IF NOT EXISTS last_ports_check_utc BIGINT;
ALTER TABLE testruns
ADD COLUMN IF NOT EXISTS kind SMALLINT NOT NULL DEFAULT 0;
CREATE INDEX IF NOT EXISTS idx_testruns_kind_status_created
ON testruns (kind, status, created_utc);
@@ -0,0 +1,7 @@
-- Persistent port-scan results (e.g. exit policy) live beside last_probe_result in the API,
-- not embedded in the probe JSON blob, so frequent probe runs do not wipe 14-day port checks.
ALTER TABLE gateways
ADD COLUMN IF NOT EXISTS ports_check JSONB;
ALTER TABLE gateways
ADD COLUMN IF NOT EXISTS last_ports_check_utc BIGINT;
@@ -0,0 +1,14 @@
-- Exit port check summary lives in `ports_check`, not inside `last_probe_result` JSON.
ALTER TABLE gateways ADD COLUMN IF NOT EXISTS ports_check JSONB;
UPDATE gateways
SET ports_check = (last_probe_result::jsonb -> 'ports_check')
WHERE last_probe_result IS NOT NULL
AND btrim(last_probe_result) <> ''
AND last_probe_result::jsonb ? 'ports_check';
UPDATE gateways
SET last_probe_result = (last_probe_result::jsonb - 'ports_check')::text
WHERE last_probe_result IS NOT NULL
AND btrim(last_probe_result) <> ''
AND last_probe_result::jsonb ? 'ports_check';
@@ -0,0 +1,6 @@
-- `20260416000000` may have copied `ports_check` from legacy JSON without setting `last_ports_check_utc`.
UPDATE gateways
SET last_ports_check_utc = last_updated_utc
WHERE ports_check IS NOT NULL
AND jsonb_typeof(ports_check) = 'object'
AND last_ports_check_utc IS NULL;
@@ -82,6 +82,15 @@ pub(crate) struct Cli {
#[arg(value_parser = parse_duration_std)]
pub(crate) testruns_refresh_interval: Duration,
/// 2 hour safety net for test runs which include a port check
#[clap(
long,
default_value = "7200",
env = "NODE_STATUS_API_TESTRUN_STALE_IN_PROGRESS"
)]
#[arg(value_parser = parse_duration_std)]
pub(crate) testruns_stale_in_progress: Duration,
#[clap(long, default_value = "86400", env = "NODE_STATUS_API_GEODATA_TTL")]
#[arg(value_parser = parse_duration_std)]
pub(crate) geodata_ttl: Duration,
@@ -18,6 +18,123 @@ use strum_macros::{EnumString, FromRepr};
use time::{Date, OffsetDateTime, UtcDateTime};
use utoipa::ToSchema;
pub(crate) fn detach_ports_check_from_probe_json(
mut probe: serde_json::Value,
) -> (serde_json::Value, Option<serde_json::Value>) {
let nested = match &mut probe {
serde_json::Value::Object(map) => map.remove("ports_check").filter(|v| !v.is_null()),
_ => None,
};
(probe, nested)
}
fn strip_wg_port_check_results_from_last_probe(value: &mut serde_json::Value) {
let Some(outcome) = value.get_mut("outcome").and_then(|o| o.as_object_mut()) else {
return;
};
let Some(wg) = outcome.get_mut("wg").and_then(|w| w.as_object_mut()) else {
return;
};
wg.remove("port_check_results");
}
fn build_ports_check_summary_json(
can_register: bool,
port_check_target: Option<String>,
ports: Option<&serde_json::Map<String, serde_json::Value>>,
error: Option<String>,
) -> serde_json::Value {
let failed_ports = ports
.map(|ports| {
ports
.iter()
.filter_map(|(port, open)| open.as_bool().filter(|is_open| !is_open).map(|_| port))
.cloned()
.collect::<Vec<_>>()
})
.unwrap_or_default();
let has_ports = ports.is_some_and(|p| !p.is_empty());
let all_pass = can_register && failed_ports.is_empty() && has_ports;
serde_json::json!({
"all_pass": all_pass,
"error": error,
"port_check_target": port_check_target,
"failed_ports": failed_ports,
})
}
pub(crate) fn ports_check_summary_json_from_result(
result: &nym_gateway_probe::PortCheckResult,
) -> serde_json::Value {
let ports = result
.ports
.iter()
.map(|(k, v)| (k.clone(), serde_json::Value::Bool(*v)))
.collect::<serde_json::Map<_, _>>();
build_ports_check_summary_json(
result.can_register,
Some(result.port_check_target.clone()),
Some(&ports),
result.error.clone(),
)
}
pub(crate) fn normalize_ports_check_payload(value: serde_json::Value) -> Option<serde_json::Value> {
let serde_json::Value::Object(map) = value else {
return None;
};
// New shape is already in place; pass through untouched.
if map.contains_key("all_pass")
&& map.contains_key("error")
&& map.contains_key("port_check_target")
&& map.contains_key("failed_ports")
{
return Some(serde_json::Value::Object(map));
}
// Legacy dedicated shape: { gateway, can_register, port_check_target, ports, error }
if map.contains_key("can_register") && map.contains_key("ports") {
let can_register = map
.get("can_register")
.and_then(serde_json::Value::as_bool)
.unwrap_or(false);
let port_check_target = map
.get("port_check_target")
.and_then(serde_json::Value::as_str)
.map(ToOwned::to_owned);
let ports = map.get("ports").and_then(serde_json::Value::as_object);
let error = map
.get("error")
.and_then(serde_json::Value::as_str)
.map(ToOwned::to_owned);
return Some(build_ports_check_summary_json(
can_register,
port_check_target,
ports,
error,
));
}
if map.contains_key("all_pass") && map.contains_key("failed_ports") {
let mut normalized = map;
normalized.remove("ports_tested");
normalized
.entry("port_check_target".to_string())
.or_insert(serde_json::Value::Null);
normalized
.entry("error".to_string())
.or_insert(serde_json::Value::Null);
return Some(serde_json::Value::Object(normalized));
}
Some(serde_json::Value::Object(map))
}
macro_rules! serialize_opt_to_value {
($var:expr) => {{
match $var {
@@ -48,6 +165,8 @@ pub(crate) struct GatewayDto {
pub(crate) explorer_pretty_bond: Option<String>,
pub(crate) last_probe_result: Option<String>,
pub(crate) last_probe_log: Option<String>,
pub(crate) ports_check: Option<serde_json::Value>,
pub(crate) last_ports_check_utc: Option<i64>,
pub(crate) last_testrun_utc: Option<i64>,
pub(crate) last_updated_utc: i64,
pub(crate) moniker: String,
@@ -73,7 +192,7 @@ impl TryFrom<GatewayDto> for http::models::Gateway {
.explorer_pretty_bond
.clone()
.unwrap_or("null".to_string());
let last_probe_result = value
let last_probe_result_raw = value
.last_probe_result
.clone()
.unwrap_or("null".to_string());
@@ -81,7 +200,27 @@ impl TryFrom<GatewayDto> for http::models::Gateway {
let self_described = serde_json::from_str(&self_described).unwrap_or(None);
let explorer_pretty_bond = serde_json::from_str(&explorer_pretty_bond).unwrap_or(None);
let last_probe_result = serde_json::from_str(&last_probe_result).unwrap_or(None);
let last_probe_parsed =
serde_json::from_str::<serde_json::Value>(&last_probe_result_raw).ok();
let (last_probe_result, nested_ports) = last_probe_parsed
.map(detach_ports_check_from_probe_json)
.map(|(v, n)| {
let mut v = v;
strip_wg_port_check_results_from_last_probe(&mut v);
let v = (!v.is_null()).then_some(v);
(v, n)
})
.unwrap_or((None, None));
let ports_check = value
.ports_check
.clone()
.or(nested_ports)
.filter(|v| !v.is_null())
.and_then(normalize_ports_check_payload);
let last_ports_check_utc = value
.last_ports_check_utc
.map(unix_timestamp_to_utc_rfc3339);
let bonded = value.bonded;
let performance = value.performance as u8;
@@ -104,6 +243,8 @@ impl TryFrom<GatewayDto> for http::models::Gateway {
description,
last_probe_result,
last_probe_log,
ports_check,
last_ports_check_utc,
routing_score,
config_score,
last_testrun_utc,
@@ -292,6 +433,7 @@ pub struct TestRunDto {
pub id: i32,
pub gateway_id: i32,
pub status: i32,
pub kind: i16,
pub created_utc: i64,
pub ip_address: String,
pub log: String,
@@ -306,6 +448,13 @@ pub(crate) enum TestRunStatus {
Queued = 0,
}
#[derive(Debug, Clone, Copy, strum_macros::Display, EnumString, FromRepr, PartialEq, Eq)]
#[repr(i16)]
pub(crate) enum TestRunKind {
Probe = 0,
PortsCheck = 1,
}
#[derive(Debug, Clone)]
pub struct GatewayIdentityDto {
pub gateway_identity_key: String,
@@ -77,23 +77,24 @@ pub(crate) async fn update_bonded_gateways(
pub(crate) async fn get_all_gateways(pool: &DbPool) -> anyhow::Result<Vec<Gateway>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query_as!(
GatewayDto,
let items = sqlx::query_as::<_, GatewayDto>(
r#"SELECT
gw.gateway_identity_key as "gateway_identity_key!",
gw.bonded as "bonded: bool",
gw.performance as "performance!",
gw.self_described as "self_described?",
gw.explorer_pretty_bond as "explorer_pretty_bond?",
gw.last_probe_result as "last_probe_result?",
gw.last_probe_log as "last_probe_log?",
gw.last_testrun_utc as "last_testrun_utc?",
gw.last_updated_utc as "last_updated_utc!",
gw.bridges as "bridges?: serde_json::Value",
COALESCE(gd.moniker, 'NA') as "moniker!",
COALESCE(gd.website, 'NA') as "website!",
COALESCE(gd.security_contact, 'NA') as "security_contact!",
COALESCE(gd.details, 'NA') as "details!"
gw.gateway_identity_key,
gw.bonded,
gw.performance,
gw.self_described,
gw.explorer_pretty_bond,
gw.last_probe_result,
gw.last_probe_log,
gw.ports_check,
gw.last_ports_check_utc,
gw.last_testrun_utc,
gw.last_updated_utc,
COALESCE(gd.moniker, 'NA') AS moniker,
COALESCE(gd.security_contact, 'NA') AS security_contact,
COALESCE(gd.details, 'NA') AS details,
COALESCE(gd.website, 'NA') AS website,
gw.bridges
FROM gateways gw
LEFT JOIN gateway_description gd
ON gw.gateway_identity_key = gd.gateway_identity_key
@@ -1,8 +1,10 @@
use crate::db::DbConnection;
use crate::db::DbPool;
use crate::db::models::{TestRunDto, TestRunStatus};
use crate::db::models::{TestRunDto, TestRunKind, TestRunStatus};
use crate::http::models::TestrunAssignment;
use crate::utils::now_utc;
use sqlx::Row;
use sqlx::types::Json;
use time::Duration;
pub(crate) async fn count_testruns_in_progress(
@@ -32,6 +34,7 @@ pub(crate) async fn get_in_progress_testrun_by_id(
id as "id!",
gateway_id as "gateway_id!",
status as "status!",
kind as "kind!",
created_utc as "created_utc!",
ip_address as "ip_address!",
log as "log!",
@@ -90,6 +93,19 @@ pub(crate) async fn update_testruns_assigned_before(
pub(crate) async fn assign_oldest_testrun(
conn: &mut DbConnection,
) -> anyhow::Result<Option<TestrunAssignment>> {
assign_oldest_testrun_by_kind(conn, TestRunKind::Probe).await
}
pub(crate) async fn assign_oldest_ports_check_testrun(
conn: &mut DbConnection,
) -> anyhow::Result<Option<TestrunAssignment>> {
assign_oldest_testrun_by_kind(conn, TestRunKind::PortsCheck).await
}
async fn assign_oldest_testrun_by_kind(
conn: &mut DbConnection,
kind: TestRunKind,
) -> anyhow::Result<Option<TestrunAssignment>> {
let now = now_utc().unix_timestamp();
// find & mark as "In progress" in the same transaction to avoid race conditions
@@ -99,7 +115,7 @@ pub(crate) async fn assign_oldest_testrun(
WITH oldest_queued AS (
SELECT id
FROM testruns
WHERE status = $1
WHERE status = $1 AND kind = $4
ORDER BY created_utc asc
LIMIT 1
FOR UPDATE SKIP LOCKED
@@ -117,28 +133,27 @@ pub(crate) async fn assign_oldest_testrun(
TestRunStatus::Queued as i32,
now,
TestRunStatus::InProgress as i32,
kind as i16,
)
.fetch_optional(conn.as_mut())
.await?;
if let Some(testrun) = returning {
let gw_identity = sqlx::query!(
r#"
SELECT
id,
gateway_identity_key
FROM gateways
WHERE id = $1
LIMIT 1"#,
testrun.gateway_id
let row = sqlx::query(
r#"SELECT gateway_identity_key, last_ports_check_utc FROM gateways WHERE id = $1 LIMIT 1"#,
)
.bind(testrun.gateway_id)
.fetch_one(conn.as_mut())
.await?;
let gateway_identity_key: String = row.try_get("gateway_identity_key")?;
let last_ports_check_utc: Option<i64> = row.try_get("last_ports_check_utc")?;
Ok(Some(TestrunAssignment {
testrun_id: testrun.id,
gateway_identity_key: gw_identity.gateway_identity_key,
gateway_identity_key,
assigned_at_utc: now,
last_ports_check_utc,
}))
} else {
Ok(None)
@@ -183,17 +198,138 @@ pub(crate) async fn update_gateway_last_probe_result(
gateway_pk: i32,
result: &str,
) -> anyhow::Result<()> {
sqlx::query!(
"UPDATE gateways SET last_probe_result = $1 WHERE id = $2",
result,
gateway_pk,
use crate::db::models::detach_ports_check_from_probe_json;
let value: serde_json::Value = serde_json::from_str(result)
.map_err(|e| anyhow::anyhow!("Invalid probe result JSON: {e}"))?;
let (stripped_json, ports_check) = detach_ports_check_from_probe_json(value);
let stripped = serde_json::to_string(&stripped_json)?;
let now_ts = crate::utils::now_utc().unix_timestamp();
let ports_check_ts = ports_check.as_ref().map(|_| now_ts);
sqlx::query(
r#"UPDATE gateways SET
last_probe_result = $1,
ports_check = COALESCE($2, ports_check),
last_ports_check_utc = CASE WHEN $2 IS NOT NULL THEN $3 ELSE last_ports_check_utc END
WHERE id = $4"#,
)
.bind(&stripped)
.bind(ports_check.map(Json))
.bind(ports_check_ts)
.bind(gateway_pk)
.execute(conn.as_mut())
.await
.map(drop)
.map_err(From::from)
}
// NOTE: port-check submissions must not re-embed `ports_check` into `last_probe_result`.
pub(crate) async fn update_gateway_ports_check_only(
conn: &mut DbConnection,
gateway_pk: i32,
port_check_result: &nym_gateway_probe::PortCheckResult,
) -> anyhow::Result<()> {
use crate::db::models::ports_check_summary_json_from_result;
let now_ts = now_utc().unix_timestamp();
let value = ports_check_summary_json_from_result(port_check_result);
sqlx::query(
r#"UPDATE gateways SET
ports_check = $1,
last_ports_check_utc = $2
WHERE id = $3"#,
)
.bind(Json(value))
.bind(now_ts)
.bind(gateway_pk)
.execute(conn.as_mut())
.await
.map(drop)
.map_err(From::from)
}
pub(crate) async fn insert_external_ports_check_testrun(
conn: &mut DbConnection,
testrun_id: i32,
gateway_id: i32,
assigned_at_utc: i64,
) -> anyhow::Result<()> {
let now = now_utc().unix_timestamp();
sqlx::query!(
r#"INSERT INTO testruns (
id,
gateway_id,
status,
kind,
created_utc,
last_assigned_utc,
ip_address,
log
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"#,
testrun_id,
gateway_id,
TestRunStatus::InProgress as i32,
TestRunKind::PortsCheck as i16,
now,
assigned_at_utc,
"external",
""
)
.execute(conn.as_mut())
.await?;
tracing::debug!(
"Created external ports-check testrun {} for gateway {}",
testrun_id,
gateway_id
);
Ok(())
}
pub(crate) async fn enqueue_due_ports_check_testruns(db: &DbPool) -> anyhow::Result<u64> {
let mut conn = db.acquire().await?;
let now = now_utc().unix_timestamp();
// 14 days soft TTL for dedicated ports-check queueing.
let cutoff = now - time::Duration::days(14).whole_seconds();
let res = sqlx::query!(
r#"
INSERT INTO testruns (gateway_id, status, kind, created_utc, last_assigned_utc, ip_address, log)
SELECT
gw.id,
$1,
$2,
$3,
NULL,
'ports_check_scheduler',
''
FROM gateways gw
WHERE gw.bonded = true
AND (gw.last_ports_check_utc IS NULL OR gw.last_ports_check_utc < $4)
AND NOT EXISTS (
SELECT 1
FROM testruns t
WHERE t.gateway_id = gw.id
AND t.kind = $2
AND t.status IN ($1, $5)
)
"#,
TestRunStatus::Queued as i32,
TestRunKind::PortsCheck as i16,
now,
cutoff,
TestRunStatus::InProgress as i32,
)
.execute(conn.as_mut())
.await?;
Ok(res.rows_affected())
}
pub(crate) async fn update_gateway_score(
conn: &mut DbConnection,
gateway_pk: i32,
@@ -221,6 +357,7 @@ pub(crate) async fn get_testrun_by_id(
id,
gateway_id,
status,
kind,
created_utc,
ip_address,
log,
@@ -247,14 +384,16 @@ pub(crate) async fn insert_external_testrun(
id,
gateway_id,
status,
kind,
created_utc,
last_assigned_utc,
ip_address,
log
) VALUES ($1, $2, $3, $4, $5, $6, $7)"#,
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"#,
testrun_id,
gateway_id,
TestRunStatus::InProgress as i32,
TestRunKind::Probe as i16,
now,
assigned_at_utc,
"external", // Marker for external origin
@@ -11,6 +11,8 @@ mod db_tests {
explorer_pretty_bond: Some("{\"key\":\"value\"}".to_string()),
last_probe_result: Some("{\"key\":\"value\"}".to_string()),
last_probe_log: Some("log".to_string()),
ports_check: None,
last_ports_check_utc: None,
last_testrun_utc: Some(1672531200),
last_updated_utc: 1672531200,
moniker: "moniker".to_string(),
@@ -37,6 +39,138 @@ mod db_tests {
assert_eq!(http_gateway.description.details, "details");
}
#[test]
fn test_gateway_dto_nested_ports_check_detached_to_top_level() {
let ports_payload = serde_json::json!({"https": 443, "wg": 51822});
let mut probe = serde_json::json!({
"node": "n1",
"used_entry": "e1",
"outcome": {"as_entry": "NotTested"},
"ports_check": ports_payload
});
let gateway_dto = crate::db::models::GatewayDto {
gateway_identity_key: "id1".to_string(),
bonded: true,
performance: 50,
self_described: Some("{}".to_string()),
explorer_pretty_bond: Some("{}".to_string()),
last_probe_result: Some(probe.to_string()),
last_probe_log: None,
ports_check: None,
last_ports_check_utc: None,
last_testrun_utc: None,
last_updated_utc: 1672531200,
moniker: "m".to_string(),
security_contact: "c".to_string(),
details: "d".to_string(),
website: "w".to_string(),
bridges: None,
};
let http_gateway: crate::http::models::Gateway = gateway_dto.try_into().unwrap();
assert_eq!(http_gateway.ports_check, Some(ports_payload));
assert!(http_gateway.last_ports_check_utc.is_none());
probe.as_object_mut().unwrap().remove("ports_check");
assert_eq!(http_gateway.last_probe_result, Some(probe));
}
#[test]
fn test_gateway_dto_normalizes_legacy_dedicated_ports_check_shape() {
let legacy_ports = serde_json::json!({
"gateway": "gw1",
"can_register": true,
"port_check_target": "portquiz.net",
"ports": {
"20": false,
"21": true,
"43": false
},
"error": null
});
let gateway_dto = crate::db::models::GatewayDto {
gateway_identity_key: "id1".to_string(),
bonded: true,
performance: 50,
self_described: Some("{}".to_string()),
explorer_pretty_bond: Some("{}".to_string()),
last_probe_result: None,
last_probe_log: None,
ports_check: Some(legacy_ports),
last_ports_check_utc: Some(1672531200),
last_testrun_utc: None,
last_updated_utc: 1672531200,
moniker: "m".to_string(),
security_contact: "c".to_string(),
details: "d".to_string(),
website: "w".to_string(),
bridges: None,
};
let http_gateway: crate::http::models::Gateway = gateway_dto.try_into().unwrap();
let normalized = http_gateway.ports_check.unwrap();
assert_eq!(
normalized.get("all_pass"),
Some(&serde_json::Value::Bool(false))
);
assert_eq!(
normalized.get("port_check_target"),
Some(&serde_json::Value::String("portquiz.net".to_string()))
);
assert_eq!(normalized.get("error"), Some(&serde_json::Value::Null));
assert_eq!(
normalized.get("failed_ports"),
Some(&serde_json::json!(["20", "43"]))
);
assert!(normalized.get("gateway").is_none());
assert!(normalized.get("ports").is_none());
}
#[test]
fn test_gateway_dto_normalizes_legacy_in_probe_summary_shape() {
let legacy_summary = serde_json::json!({
"all_pass": true,
"failed_ports": [],
"error": null,
"ports_tested": 32
});
let gateway_dto = crate::db::models::GatewayDto {
gateway_identity_key: "id2".to_string(),
bonded: true,
performance: 50,
self_described: Some("{}".to_string()),
explorer_pretty_bond: Some("{}".to_string()),
last_probe_result: None,
last_probe_log: None,
ports_check: Some(legacy_summary),
last_ports_check_utc: Some(1672531200),
last_testrun_utc: None,
last_updated_utc: 1672531200,
moniker: "m".to_string(),
security_contact: "c".to_string(),
details: "d".to_string(),
website: "w".to_string(),
bridges: None,
};
let http_gateway: crate::http::models::Gateway = gateway_dto.try_into().unwrap();
let normalized = http_gateway.ports_check.unwrap();
assert_eq!(
normalized.get("all_pass"),
Some(&serde_json::Value::Bool(true))
);
assert_eq!(normalized.get("failed_ports"), Some(&serde_json::json!([])));
assert_eq!(normalized.get("error"), Some(&serde_json::Value::Null));
assert_eq!(
normalized.get("port_check_target"),
Some(&serde_json::Value::Null)
);
assert!(normalized.get("ports_tested").is_none());
}
#[test]
fn test_mixnode_dto_try_from() {
let mixnode_dto = crate::db::models::MixnodeDto {
@@ -267,6 +401,8 @@ fn test_gateway_dto_with_null_values() {
explorer_pretty_bond: None,
last_probe_result: None,
last_probe_log: None,
ports_check: None,
last_ports_check_utc: None,
last_testrun_utc: None,
last_updated_utc: 0,
moniker: "".to_string(),
@@ -85,14 +85,14 @@ async fn get_gateway(
) -> HttpResult<Json<Gateway>> {
let db = state.db_pool();
let res = state.cache().get_gateway_list(db).await;
match res
.iter()
.find(|item| item.gateway_identity_key == identity_key)
{
Some(res) => Ok(Json(res.clone())),
None => Err(HttpError::invalid_input(identity_key)),
}
res.into_iter()
.find(|g| g.gateway_identity_key == identity_key)
.map(Json)
.ok_or_else(|| {
HttpError::invalid_input(format!(
"gateway not found in local gateway cache: {identity_key}"
))
})
}
// Extract filtering logic for testing
@@ -106,6 +106,8 @@ fn filter_bonded_gateways_to_skinny(gateways: Vec<Gateway>) -> Vec<GatewaySkinny
performance: g.performance,
explorer_pretty_bond: g.explorer_pretty_bond.clone(),
last_probe_result: g.last_probe_result.clone(),
ports_check: g.ports_check.clone(),
last_ports_check_utc: g.last_ports_check_utc.clone(),
last_testrun_utc: g.last_testrun_utc.clone(),
last_updated_utc: g.last_updated_utc.clone(),
routing_score: g.routing_score,
@@ -135,6 +137,8 @@ mod tests {
},
last_probe_result: Some(serde_json::json!({"result": "ok"})),
last_probe_log: None,
ports_check: None,
last_ports_check_utc: None,
last_testrun_utc: Some("2024-01-20T10:00:00Z".to_string()),
last_updated_utc: "2024-01-20T11:00:00Z".to_string(),
routing_score: 0.95,
@@ -79,6 +79,8 @@ mod tests {
},
last_probe_result: None,
last_probe_log: None,
ports_check: None,
last_ports_check_utc: None,
last_testrun_utc: None,
last_updated_utc: "2025-01-01T12:00:00Z".to_string(),
routing_score: 1.0,
@@ -191,6 +191,8 @@ mod tests {
},
last_probe_result: None,
last_probe_log: None,
ports_check: None,
last_ports_check_utc: None,
last_testrun_utc: Some("2024-01-20T10:00:00Z".to_string()),
last_updated_utc: "2024-01-20T11:00:00Z".to_string(),
routing_score: 0.95,
@@ -1,5 +1,5 @@
use crate::db::DbConnection;
use crate::db::models::{TestRunDto, TestRunStatus};
use crate::db::models::{TestRunDto, TestRunKind, TestRunStatus};
use crate::db::queries;
use crate::utils::{now_utc, unix_timestamp_to_utc_rfc3339};
use crate::{
@@ -16,7 +16,8 @@ use axum::{
extract::{Path, State},
};
use nym_node_status_client::models::{
TestrunAssignmentWithTickets, get_testrun, submit_results, submit_results_v2,
TestrunAssignmentWithTickets, get_testrun, submit_ports_check_results_v2, submit_results,
submit_results_v2,
};
use reqwest::StatusCode;
use tracing::error;
@@ -26,8 +27,16 @@ use tracing::error;
pub(crate) fn routes() -> Router<AppState> {
Router::new()
.route("/", axum::routing::get(request_testrun))
.route(
"/ports-check",
axum::routing::get(request_ports_check_testrun),
)
.route("/:testrun_id", axum::routing::post(submit_testrun))
.route("/:testrun_id/v2", axum::routing::post(submit_testrun_v2))
.route(
"/:testrun_id/ports-check/v2",
axum::routing::post(submit_ports_check_testrun_v2),
)
.layer(DefaultBodyLimit::max(1024 * 1024 * 5))
}
@@ -90,6 +99,64 @@ async fn request_testrun(
}
}
#[tracing::instrument(level = "debug", skip_all)]
async fn request_ports_check_testrun(
State(state): State<AppState>,
Json(request): Json<get_testrun::GetTestrunRequest>,
) -> HttpResult<Json<TestrunAssignmentWithTickets>> {
state.authenticate_agent_submission(&request)?;
state.is_fresh(&request.payload.timestamp)?;
tracing::debug!("Agent requested ports-check testrun");
let db = state.db_pool();
let mut conn = db
.acquire()
.await
.map_err(HttpError::internal_with_logging)?;
let active_testruns = db::queries::testruns::count_testruns_in_progress(&mut conn)
.await
.map_err(HttpError::internal_with_logging)?
.unwrap_or_default();
let max_count = state.agent_max_count();
if active_testruns >= max_count {
tracing::warn!("{active_testruns}/{max_count} testruns in progress, rejecting",);
return Err(HttpError::no_testruns_available());
}
match db::queries::testruns::assign_oldest_ports_check_testrun(&mut conn).await {
Ok(res) => {
let Some(assignment) = res else {
tracing::debug!("No ports-check testruns available");
return Err(HttpError::no_testruns_available());
};
tracing::info!(
"🏃‍ Assigned ports-check testrun row_id {} gateway {} to agent",
&assignment.testrun_id,
assignment.gateway_identity_key,
);
let materials = state
.ticketbook_manager_state()
.attempt_assign_ticket_materials(assignment.testrun_id)
.await
.map_err(|err| {
error!(
"failed to get ticket materials for runner {}: {err}",
assignment.testrun_id
);
HttpError::internal_with_logging(format!(
"could not retrieve needed tickets: {err}"
))
})?;
Ok(Json(assignment.with_ticket_materials(materials)))
}
Err(err) => Err(HttpError::internal_with_logging(err)),
}
}
#[tracing::instrument(level = "debug", skip_all)]
async fn submit_testrun(
Path(submitted_testrun_id): Path<i32>,
@@ -338,3 +405,148 @@ async fn process_testrun_submission_by_gateway(
Ok(StatusCode::CREATED)
}
#[tracing::instrument(level = "debug", skip_all)]
async fn submit_ports_check_testrun_v2(
Path(submitted_testrun_id): Path<i32>,
State(state): State<AppState>,
Json(submission): Json<submit_ports_check_results_v2::SubmitPortsCheckResultsV2>,
) -> HttpResult<StatusCode> {
state.authenticate_agent_submission(&submission)?;
let db = state.db_pool();
let mut conn = db
.acquire()
.await
.map_err(HttpError::internal_with_logging)?;
match queries::testruns::get_testrun_by_id(&mut conn, submitted_testrun_id).await {
Ok(testrun) => {
if testrun.kind != TestRunKind::PortsCheck as i16 {
tracing::warn!(
"Testrun {} has wrong kind for ports-check submit: {}",
submitted_testrun_id,
testrun.kind
);
return Err(HttpError::invalid_input(format!(
"Testrun {} is not a ports-check run",
submitted_testrun_id
)));
}
let gw_identity = queries::select_gateway_identity(&mut conn, testrun.gateway_id)
.await
.map_err(HttpError::internal_with_logging)?;
if gw_identity != submission.payload.gateway_identity_key {
tracing::warn!(
"Gateway mismatch for ports-check testrun {}: expected {}, got {}",
submitted_testrun_id,
gw_identity,
submission.payload.gateway_identity_key
);
return Err(HttpError::invalid_input("Gateway identity mismatch"));
}
if Some(submission.payload.assigned_at_utc) != testrun.last_assigned_utc {
tracing::warn!(
"Submitted ports-check testrun timestamp mismatch: {} != {:?}, rejecting",
submission.payload.assigned_at_utc,
testrun.last_assigned_utc
);
return Err(HttpError::invalid_input(format!(
"Ports-check testrun {} timestamp mismatch: expected {:?}, got {}",
submitted_testrun_id,
testrun.last_assigned_utc,
submission.payload.assigned_at_utc
)));
}
process_ports_check_submission(
submitted_testrun_id,
testrun.gateway_id,
submission.payload,
&mut conn,
)
.await
}
Err(_) => {
tracing::info!(
"Creating external ports-check testrun {} for gateway {}",
submitted_testrun_id,
submission.payload.gateway_identity_key
);
let gateway_id =
queries::get_or_create_gateway(&mut conn, &submission.payload.gateway_identity_key)
.await
.map_err(HttpError::internal_with_logging)?;
queries::testruns::insert_external_ports_check_testrun(
&mut conn,
submitted_testrun_id,
gateway_id,
submission.payload.assigned_at_utc,
)
.await
.map_err(HttpError::internal_with_logging)?;
process_ports_check_submission(
submitted_testrun_id,
gateway_id,
submission.payload,
&mut conn,
)
.await
}
}
}
async fn process_ports_check_submission(
testrun_id: i32,
gateway_id: i32,
payload: submit_ports_check_results_v2::Payload,
conn: &mut DbConnection,
) -> HttpResult<StatusCode> {
let gw_identity = &payload.gateway_identity_key;
let open_ports = payload
.port_check_result
.ports
.values()
.filter(|&&v| v)
.count();
let total_ports = payload.port_check_result.ports.len();
tracing::debug!(
"Processing ports-check submission for gateway {} ({} bytes, {}/{} ports open, can_register={})",
gw_identity,
payload.probe_log.len(),
open_ports,
total_ports,
payload.port_check_result.can_register,
);
queries::testruns::update_gateway_ports_check_only(
conn,
gateway_id,
&payload.port_check_result,
)
.await
.map_err(HttpError::internal_with_logging)?;
queries::testruns::update_testrun_status(conn, testrun_id, TestRunStatus::Complete)
.await
.map_err(HttpError::internal_with_logging)?;
let assigned_at = unix_timestamp_to_utc_rfc3339(payload.assigned_at_utc);
let now = now_utc();
tracing::info!(
"✅ Ports-check testrun {} for gateway {} complete (assigned at {}, current time {})",
testrun_id,
gw_identity,
assigned_at,
now
);
Ok(StatusCode::CREATED)
}
@@ -20,6 +20,8 @@ fn test_weighted_score_calculation() {
},
last_probe_result: None,
last_probe_log: None,
ports_check: None,
last_ports_check_utc: None,
last_testrun_utc: None,
last_updated_utc: "2025-10-10T00:00:00Z".to_string(),
routing_score: 0.0,
@@ -163,10 +165,12 @@ fn conversion_from_gw_probe_latest() {
downloaded_file_size_bytes_v6: 20971520,
downloaded_file_v6: "test-file-v6.bin".to_string(),
download_error_v6: "none-v6".to_string(),
port_check_results: None,
};
let probe_latest = ProbeResultLatest {
node: "test-node-identity-key".to_string(),
used_entry: "test-entry-node".to_string(),
ports_check: None,
outcome: ProbeOutcomeLatest {
as_entry: EntryLatest::Tested(EntryTestResultLatest {
can_connect: true,
@@ -43,6 +43,8 @@ pub struct Gateway {
pub description: NodeDescription,
pub last_probe_result: Option<serde_json::Value>,
pub last_probe_log: Option<String>,
pub ports_check: Option<serde_json::Value>,
pub last_ports_check_utc: Option<String>,
pub last_testrun_utc: Option<String>,
pub last_updated_utc: String,
pub routing_score: f32,
@@ -566,6 +568,8 @@ mod test {
self_described: Some(serde_json::json!({"test": "value"})),
explorer_pretty_bond: None,
last_probe_result: Some(serde_json::json!({"status": "ok"})),
ports_check: None,
last_ports_check_utc: None,
last_testrun_utc: Some("2024-01-20T10:00:00Z".to_string()),
last_updated_utc: "2024-01-20T11:00:00Z".to_string(),
routing_score: 0.95,
@@ -628,6 +632,8 @@ pub struct GatewaySkinny {
pub self_described: Option<serde_json::Value>,
pub explorer_pretty_bond: Option<serde_json::Value>,
pub last_probe_result: Option<serde_json::Value>,
pub ports_check: Option<serde_json::Value>,
pub last_ports_check_utc: Option<String>,
pub last_testrun_utc: Option<String>,
pub last_updated_utc: String,
pub routing_score: f32,
@@ -9,6 +9,7 @@ use core::net::SocketAddr;
use nym_crypto::asymmetric::ed25519::PublicKey;
use nym_task::ShutdownTracker;
use std::sync::Arc;
use tokio::time::MissedTickBehavior;
use tokio::{net::TcpListener, sync::RwLock};
/// Return handles that allow for graceful shutdown of server + awaiting its
@@ -27,6 +28,7 @@ pub(crate) async fn start_http_api(
shutdown_tracker: &ShutdownTracker,
) -> anyhow::Result<()> {
let router_builder = RouterBuilder::with_default_routes();
let db_pool_for_scheduler = db_pool.clone();
let state = AppState::new(
db_pool,
@@ -46,6 +48,34 @@ pub(crate) async fn start_http_api(
let server = router.build_server(bind_addr).await?;
let shutdown = shutdown_tracker.clone_shutdown_token().cancelled_owned();
let ports_check_scheduler_enabled = std::env::var("PORTS_CHECK_SCHEDULER_ENABLED")
.ok()
.map(|v| matches!(v.as_str(), "1" | "true" | "TRUE" | "yes" | "YES"))
.unwrap_or(true);
if ports_check_scheduler_enabled {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60 * 10));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
let scheduler_shutdown = shutdown_tracker.clone_shutdown_token().cancelled_owned();
shutdown_tracker.spawn(async move {
tokio::select! {
_ = async {
loop {
interval.tick().await;
match crate::db::queries::testruns::enqueue_due_ports_check_testruns(&db_pool_for_scheduler).await {
Ok(enqueued) if enqueued > 0 => tracing::info!("Enqueued {enqueued} due ports-check testruns"),
Ok(_) => {}
Err(e) => tracing::warn!("Failed to enqueue due ports-check testruns: {e}"),
}
}
} => {},
_ = scheduler_shutdown => {}
}
});
} else {
tracing::info!("Ports-check scheduler disabled (PORTS_CHECK_SCHEDULER_ENABLED=false)");
}
shutdown_tracker.spawn(async move {
axum::serve(
server.listener,
@@ -122,10 +122,10 @@ impl AppState {
return Err(HttpError::unauthorized());
};
request.verify_signature().map_err(|_| {
if request.verify_signature().is_err() {
tracing::warn!("Signature verification failed, rejecting");
HttpError::unauthorized()
})?;
return Err(HttpError::unauthorized());
}
Ok(())
}
@@ -125,7 +125,12 @@ async fn main() -> anyhow::Result<()> {
let pool = storage.pool_owned();
shutdown_manager.spawn_with_shutdown(async move {
testruns::start(pool, args.testruns_refresh_interval).await
testruns::start(
pool,
args.testruns_refresh_interval,
args.testruns_stale_in_progress,
)
.await
});
let db_pool_scraper = storage.pool_owned();
@@ -29,10 +29,13 @@ pub mod builders {
},
last_probe_result: None,
last_probe_log: None,
ports_check: None,
last_ports_check_utc: None,
last_testrun_utc: None,
last_updated_utc: "2024-01-01T00:00:00Z".to_string(),
routing_score: 0.95,
config_score: 100,
bridges: None,
},
}
}
@@ -8,9 +8,13 @@ use tracing::instrument;
pub(crate) mod models;
mod queue;
pub(crate) async fn start(pool: DbPool, refresh_interval: Duration) {
pub(crate) async fn start(
pool: DbPool,
refresh_interval: Duration,
stale_in_progress_after: Duration,
) {
loop {
if let Err(e) = refresh_stale_testruns(&pool, refresh_interval).await {
if let Err(e) = refresh_stale_testruns(&pool, stale_in_progress_after).await {
tracing::error!("{e}");
}
@@ -1,5 +1,5 @@
use crate::db::DbConnection;
use crate::db::models::{GatewayInfoDto, TestRunDto, TestRunStatus};
use crate::db::models::{GatewayInfoDto, TestRunDto, TestRunKind, TestRunStatus};
use crate::testruns::models::TestRun;
use crate::utils::now_utc;
use anyhow::anyhow;
@@ -53,15 +53,17 @@ pub(crate) async fn try_queue_testrun(
id,
gateway_id,
status,
kind,
created_utc,
ip_address,
log,
last_assigned_utc
FROM testruns
WHERE gateway_id = $1 AND status != 2
WHERE gateway_id = $1 AND status != 2 AND kind = $2
ORDER BY id DESC
LIMIT 1"#,
gateway_id
gateway_id,
TestRunKind::Probe as i16
)
.fetch(conn.as_mut())
.try_collect::<Vec<_>>()
@@ -12,6 +12,10 @@ impl ApiPaths {
format!("{}/internal/testruns", self.server_address)
}
pub(super) fn request_ports_check_testrun(&self) -> String {
format!("{}/internal/testruns/ports-check", self.server_address)
}
pub(super) fn submit_results(&self, testrun_id: impl Display) -> String {
format!("{}/internal/testruns/{}", self.server_address, testrun_id)
}
@@ -22,4 +26,11 @@ impl ApiPaths {
self.server_address, testrun_id
)
}
pub(super) fn submit_ports_check_results_v2(&self, testrun_id: impl Display) -> String {
format!(
"{}/internal/testruns/{}/ports-check/v2",
self.server_address, testrun_id
)
}
}
@@ -1,4 +1,7 @@
use crate::models::{TestrunAssignmentWithTickets, get_testrun, submit_results, submit_results_v2};
use crate::models::{
TestrunAssignmentWithTickets, get_testrun, submit_ports_check_results_v2, submit_results,
submit_results_v2,
};
use anyhow::bail;
use api::ApiPaths;
use nym_crypto::asymmetric::ed25519::{PrivateKey, Signature};
@@ -76,6 +79,52 @@ impl NsApiClient {
})
}
#[instrument(level = "info", skip_all)]
pub async fn request_ports_check_testrun(
&self,
) -> anyhow::Result<Option<TestrunAssignmentWithTickets>> {
let target_url = self.api.request_ports_check_testrun();
let payload = get_testrun::Payload {
agent_public_key: self.auth_key.public_key(),
timestamp: time::UtcDateTime::now().unix_timestamp(),
};
let signature = self.sign_message(&payload)?;
let request = get_testrun::GetTestrunRequest { payload, signature };
let res = self.client.get(target_url).json(&request).send().await?;
let status = res.status();
let response_text = res.text().await?;
if status.is_client_error() {
if matches!(status, reqwest::StatusCode::NOT_FOUND) {
tracing::warn!(
"Ports-check request endpoint not available on server (404), skipping iteration"
);
return Ok(None);
}
bail!("{}: {}", status, response_text);
} else if status.is_server_error() {
if matches!(status, reqwest::StatusCode::SERVICE_UNAVAILABLE)
&& response_text.contains("No testruns available")
{
return Ok(None);
} else {
bail!("{}: {}", status, response_text);
}
}
serde_json::from_str(&response_text)
.map(|testrun| {
tracing::info!("Received ports-check testrun assignment: {:?}", testrun);
testrun
})
.map_err(|err| {
tracing::error!("err");
err.into()
})
}
#[instrument(level = "info", fields(testrun_id, assigned_at_utc), skip_all)]
pub async fn submit_results(
&self,
@@ -140,6 +189,40 @@ impl NsApiClient {
Ok(())
}
#[instrument(level = "info", skip(self, port_check_result))]
pub async fn submit_ports_check_results_with_context(
&self,
testrun_id: i32,
port_check_result: nym_gateway_probe::PortCheckResult,
probe_log: String,
assigned_at_utc: i64,
gateway_identity_key: String,
) -> anyhow::Result<()> {
let target_url = self.api.submit_ports_check_results_v2(testrun_id);
let payload = submit_ports_check_results_v2::Payload {
port_check_result,
probe_log,
agent_public_key: self.auth_key.public_key(),
assigned_at_utc,
gateway_identity_key,
};
let signature = self.sign_message(&payload)?;
let submit =
submit_ports_check_results_v2::SubmitPortsCheckResultsV2 { payload, signature };
let res = self
.client
.post(target_url)
.json(&submit)
.send()
.await
.and_then(|response| response.error_for_status())?;
tracing::debug!("Submitted ports-check results: {})", res.status());
Ok(())
}
fn sign_message<T>(&self, message: &T) -> anyhow::Result<Signature>
where
T: serde::Serialize,
@@ -41,6 +41,9 @@ pub struct TestrunAssignment {
pub testrun_id: i32,
pub assigned_at_utc: i64,
pub gateway_identity_key: String,
/// Unix timestamp (seconds) of the last persisted exit-policy port check for this gateway.
#[serde(default)]
pub last_ports_check_utc: Option<i64>,
}
impl TestrunAssignment {
@@ -143,3 +146,39 @@ pub mod submit_results_v2 {
}
}
}
pub mod submit_ports_check_results_v2 {
use crate::auth::SignedRequest;
use super::*;
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Payload {
pub port_check_result: nym_gateway_probe::PortCheckResult,
pub probe_log: String,
pub agent_public_key: PublicKey,
pub assigned_at_utc: i64,
pub gateway_identity_key: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SubmitPortsCheckResultsV2 {
pub payload: Payload,
pub signature: Signature,
}
impl SignedRequest for SubmitPortsCheckResultsV2 {
type Payload = Payload;
fn public_key(&self) -> &PublicKey {
&self.payload.agent_public_key
}
fn signature(&self) -> &Signature {
&self.signature
}
fn payload(&self) -> &Self::Payload {
&self.payload
}
}
}