testing port checks

add no-log to anywhere

add support for not registered nodes

...
address comments

remove unregistered nodes

testing port checks
add support for not registered nodes

...
address comments

test port check in probe results
migration update

probe arg fix

bump NS versions

cleanup and remove unannounced node option

bugsfixes

Remove in-prove

remove in-probe test, it isn't needed.

add multiple target host options

cleanup

change default target, and use batch only for portquiz

Revert "change default target, and use batch only for portquiz"

This reverts commit 8b38969964e7808b9c4e50a920ee5bc51438c7bf.

ded line

bugfixes

batch fix

batch limits

force ipv4
This commit is contained in:
benedettadavico
2026-02-13 10:31:55 +01:00
parent 43a1bd38e8
commit d37b4226d0
49 changed files with 2195 additions and 121 deletions
+1 -1
View File
@@ -17,7 +17,7 @@ anyhow = { workspace = true }
base64 = { workspace = true }
bs58 = { workspace = true }
bytes = { workspace = true }
clap = { workspace = true, features = ["cargo", "derive"] }
clap = { workspace = true, features = ["cargo", "derive", "env"] }
futures = { workspace = true }
hex = { workspace = true }
tracing = { workspace = true }
+92
View File
@@ -9,6 +9,7 @@ use vergen_gitcl::{BuildBuilder, CargoBuilder, Emitter, GitclBuilder, RustcBuild
fn main() -> anyhow::Result<()> {
build_go()?;
generate_exit_policy_ports()?;
Emitter::default()
.add_instructions(&BuildBuilder::all_build()?)?
@@ -18,6 +19,97 @@ fn main() -> anyhow::Result<()> {
.emit()
}
/// Parse PORT_MAPPINGS from network-tunnel-manager.sh and generate a sorted
/// Rust const with every unique port. Ranges are represented by their start
/// and end values so a single TCP check can confirm the iptables rule exists.
/// TODO: consider runtime fetch from NS API exit policy endpoint instead of parsing the script
fn generate_exit_policy_ports() -> anyhow::Result<()> {
use std::collections::BTreeMap;
let script_path = PathBuf::from("../scripts/nym-node-setup/network-tunnel-manager.sh");
let out_dir = PathBuf::from(std::env::var("OUT_DIR").context("OUT_DIR not set")?);
println!("cargo::rerun-if-changed={}", script_path.display());
let content = std::fs::read_to_string(&script_path).context(
"failed to read network-tunnel-manager.sh — is it present at ../scripts/nym-node-setup/ ?",
)?;
// port → service name (BTreeMap keeps them sorted)
let mut port_map: BTreeMap<u16, String> = BTreeMap::new();
let mut in_mappings = false;
for line in content.lines() {
let trimmed = line.trim();
if trimmed.starts_with("declare -A PORT_MAPPINGS=(") {
in_mappings = true;
continue;
}
if in_mappings && trimmed == ")" {
break;
}
if !in_mappings {
continue;
}
// strip comment prefix so we still pick up ports that are opened
// via a separate mechanism (e.g. SMTP/465 with rate limiting)
let stripped = trimmed.trim_start_matches('#').trim();
// match ["ServiceName"]="port-or-range"
let Some(name_start) = stripped.find("[\"") else {
continue;
};
let Some(name_end) = stripped.find("\"]=") else {
continue;
};
let service = &stripped[name_start + 2..name_end];
let value = stripped[name_end + 3..].trim_matches('"');
if value.contains('-') {
let parts: Vec<&str> = value.split('-').collect();
if parts.len() == 2 {
if let (Ok(lo), Ok(hi)) = (parts[0].parse::<u16>(), parts[1].parse::<u16>()) {
port_map
.entry(lo)
.or_insert_with(|| format!("{service} (range start)"));
port_map
.entry(hi)
.or_insert_with(|| format!("{service} (range end)"));
}
}
} else if let Ok(port) = value.parse::<u16>() {
port_map.entry(port).or_insert_with(|| service.to_string());
}
}
if port_map.is_empty() {
bail!("No ports found in PORT_MAPPINGS — is network-tunnel-manager.sh correct?");
}
// write generated Rust source
let mut out = String::new();
out.push_str(
"// Auto-generated from scripts/nym-node-setup/network-tunnel-manager.sh PORT_MAPPINGS.\n",
);
out.push_str("// Do not edit — changes are overwritten on rebuild.\n");
out.push_str("// To add or remove ports, update PORT_MAPPINGS in the shell script.\n\n");
out.push_str(&format!(
"/// {} unique ports parsed from the canonical exit policy at build time.\n",
port_map.len()
));
out.push_str("pub const EXIT_POLICY_PORTS: &[u16] = &[\n");
for (port, service) in &port_map {
let entry = format!("{port},");
out.push_str(&format!(" {entry:<7}// {service}\n"));
}
out.push_str("];\n");
std::fs::write(out_dir.join("exit_policy_ports.rs"), out)?;
Ok(())
}
fn build_go() -> anyhow::Result<()> {
const LIB_NAME: &str = "netstack_ping";
+148 -14
View File
@@ -23,6 +23,7 @@ import (
netUrl "net/url"
"os"
"strings"
"sync"
"time"
"unsafe"
@@ -62,21 +63,27 @@ type NetstackRequestGo struct {
DownloadTimeoutSec uint64 `json:"download_timeout_sec"`
MetadataTimeoutSec uint64 `json:"metadata_timeout_sec"`
AwgArgs string `json:"awg_args"`
// exit policy port check
PortCheckTarget string `json:"port_check_target"`
PortCheckPorts []uint16 `json:"port_check_ports"`
PortCheckOnly bool `json:"port_check_only"`
PortCheckTimeoutSec uint64 `json:"port_check_timeout_sec"`
}
type NetstackResponse struct {
CanHandshake bool `json:"can_handshake"`
CanQueryMetadata bool `json:"can_query_metadata"`
SentIps uint16 `json:"sent_ips"`
ReceivedIps uint16 `json:"received_ips"`
SentHosts uint16 `json:"sent_hosts"`
ReceivedHosts uint16 `json:"received_hosts"`
CanResolveDns bool `json:"can_resolve_dns"`
DownloadedFile string `json:"downloaded_file"`
DownloadedFileSizeBytes uint64 `json:"downloaded_file_size_bytes"`
DownloadDurationSec uint64 `json:"download_duration_sec"`
DownloadDurationMilliseconds uint64 `json:"download_duration_milliseconds"`
DownloadError string `json:"download_error"`
CanHandshake bool `json:"can_handshake"`
CanQueryMetadata bool `json:"can_query_metadata"`
SentIps uint16 `json:"sent_ips"`
ReceivedIps uint16 `json:"received_ips"`
SentHosts uint16 `json:"sent_hosts"`
ReceivedHosts uint16 `json:"received_hosts"`
CanResolveDns bool `json:"can_resolve_dns"`
DownloadedFile string `json:"downloaded_file"`
DownloadedFileSizeBytes uint64 `json:"downloaded_file_size_bytes"`
DownloadDurationSec uint64 `json:"download_duration_sec"`
DownloadDurationMilliseconds uint64 `json:"download_duration_milliseconds"`
DownloadError string `json:"download_error"`
PortCheckResults map[string]bool `json:"port_check_results,omitempty"`
}
type SuccessResult = struct {
@@ -201,7 +208,7 @@ func pingTwoHop(req TwoHopNetstackRequest) (NetstackResponse, error) {
log.Printf("Exit WG IP: %s", req.ExitWgIp)
log.Printf("IP version: %d", req.IpVersion)
response := NetstackResponse{false, false, 0, 0, 0, 0, false, "", 0, 0, 0, ""}
response := NetstackResponse{}
// Parse the exit endpoint to determine IP version for forwarder
exitEndpoint, err := netip.ParseAddrPort(req.ExitEndpoint)
@@ -439,7 +446,7 @@ func ping(req NetstackRequestGo) (NetstackResponse, error) {
ipc.WriteString("\nallowed_ip=::/0\n")
}
response := NetstackResponse{false, false, 0, 0, 0, 0, false, "", 0, 0, 0, ""}
response := NetstackResponse{}
err = dev.IpcSet(ipc.String())
if err != nil {
@@ -463,6 +470,19 @@ func ping(req NetstackRequestGo) (NetstackResponse, error) {
response.CanHandshake = true
// port-check-only mode: skip pings/download, only test TCP port reachability
if req.PortCheckOnly && len(req.PortCheckPorts) > 0 {
log.Printf("=== Port Check Only Mode ===")
response.PortCheckResults = checkPorts(req.PortCheckTarget, req.PortCheckPorts, req.PortCheckTimeoutSec, tnet)
return response, nil
}
// run port checks alongside normal tests if ports were requested
if len(req.PortCheckPorts) > 0 {
log.Printf("=== Running Port Checks (alongside normal tests) ===")
response.PortCheckResults = checkPorts(req.PortCheckTarget, req.PortCheckPorts, req.PortCheckTimeoutSec, tnet)
}
// Skip metadata query if endpoint is empty (e.g., for IPv6 where the IPv4 metadata endpoint is not reachable)
if req.MetadataEndpoint != "" {
version, duration, err := queryMetadata(req.MetadataEndpoint, req.MetadataTimeoutSec, tnet)
@@ -569,6 +589,120 @@ func ping(req NetstackRequestGo) (NetstackResponse, error) {
return response, nil
}
const portquizBatchSize = 20
const portquizBatchDelay = 25 * time.Second
const portquizDialGap = 20 * time.Millisecond
func checkPorts(target string, ports []uint16, timeoutSec uint64, tnet *netstack.Net) map[string]bool {
if target == "" {
target = "portquiz.net"
}
if timeoutSec == 0 {
timeoutSec = 5
}
targetIP := target
if net.ParseIP(target) == nil {
addrs, err := net.LookupHost(target)
if err != nil || len(addrs) == 0 {
log.Printf("Port check: DNS lookup for %s failed (%v), using hostname as-is", target, err)
} else {
chosen := addrs[0]
for _, a := range addrs {
if net.ParseIP(a).To4() != nil {
chosen = a
break
}
}
targetIP = chosen
log.Printf("Port check: resolved %s -> %s", target, targetIP)
}
}
timeout := time.Duration(timeoutSec) * time.Second
results := make(map[string]bool, len(ports))
if strings.Contains(target, "portquiz.net") {
// portquiz.net rate-limits after ~29 connections per window; use large batches with a long cooldown
log.Printf("Port check: testing %d ports on %s in batches of %d with %v cooldown (timeout %v each)",
len(ports), target, portquizBatchSize, portquizBatchDelay, timeout)
for batchIdx := 0; batchIdx < len(ports); batchIdx += portquizBatchSize {
end := batchIdx + portquizBatchSize
if end > len(ports) {
end = len(ports)
}
batch := ports[batchIdx:end]
for i, p := range batch {
addr := net.JoinHostPort(targetIP, fmt.Sprintf("%d", p))
ctx, cancel := context.WithTimeout(context.Background(), timeout)
c, err := tnet.DialContext(ctx, "tcp", addr)
cancel()
key := fmt.Sprintf("%d", p)
if err != nil {
log.Printf("Port %d: CLOSED (%v)", p, err)
results[key] = false
} else {
c.Close()
log.Printf("Port %d: OPEN", p)
results[key] = true
}
if i < len(batch)-1 {
time.Sleep(portquizDialGap)
}
}
if batchIdx+portquizBatchSize < len(ports) {
time.Sleep(portquizBatchDelay)
}
}
} else {
// All other targets can handle concurrent connections, probably
log.Printf("Port check: testing %d ports on %s concurrently (timeout %v each)",
len(ports), target, timeout)
var (
mu sync.Mutex
wg sync.WaitGroup
)
for _, p := range ports {
wg.Add(1)
go func(port uint16) {
defer wg.Done()
addr := net.JoinHostPort(targetIP, fmt.Sprintf("%d", port))
ctx, cancel := context.WithTimeout(context.Background(), timeout)
c, err := tnet.DialContext(ctx, "tcp", addr)
cancel()
key := fmt.Sprintf("%d", port)
if err != nil {
log.Printf("Port %d: CLOSED (%v)", port, err)
mu.Lock()
results[key] = false
mu.Unlock()
} else {
c.Close()
log.Printf("Port %d: OPEN", port)
mu.Lock()
results[key] = true
mu.Unlock()
}
}(p)
}
wg.Wait()
}
openCount := 0
for _, open := range results {
if open {
openCount++
}
}
log.Printf("Port check complete: %d/%d ports open", openCount, len(ports))
return results
}
func sendPing(address string, seq uint8, sendTtimeoutSecs uint64, receiveTimoutSecs uint64, tnet *netstack.Net, ipVersion uint8) (time.Duration, error) {
maxPingRetries := 2
baseTimeout := receiveTimoutSecs
+29
View File
@@ -4,6 +4,7 @@
use crate::config::NetstackArgs;
use anyhow::Context;
use serde::Deserialize;
use std::collections::BTreeMap;
use std::ffi::{CStr, CString};
mod sys {
@@ -15,6 +16,19 @@ mod sys {
}
}
/// Port-check fields shared between `NetstackRequest` and `NetstackRequestGo`
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct PortCheckConfig {
#[serde(rename = "port_check_target")]
pub target: String,
#[serde(rename = "port_check_ports")]
pub ports: Vec<u16>,
#[serde(rename = "port_check_only")]
pub only: bool,
#[serde(rename = "port_check_timeout_sec")]
pub timeout_sec: u64,
}
#[derive(serde::Serialize)]
pub struct NetstackRequest {
private_key: String,
@@ -25,6 +39,8 @@ pub struct NetstackRequest {
v6_ping_config: PingConfig,
download_timeout_sec: u64,
awg_args: String,
#[serde(flatten)]
pub port_check: PortCheckConfig,
}
#[derive(serde::Serialize)]
@@ -76,6 +92,7 @@ impl NetstackRequest {
download_timeout_sec: u64,
awg_args: &str,
netstack_args: NetstackArgs,
port_check_only: bool,
) -> Self {
Self {
private_key: private_key.to_string(),
@@ -86,6 +103,12 @@ impl NetstackRequest {
v4_ping_config: PingConfig::from_netstack_args_v4(wg_ip4, &netstack_args),
v6_ping_config: PingConfig::from_netstack_args_v6(wg_ip6, &netstack_args),
download_timeout_sec,
port_check: PortCheckConfig {
target: netstack_args.port_check_target.clone(),
ports: netstack_args.port_check_ports.clone(),
only: port_check_only,
timeout_sec: netstack_args.port_check_timeout_sec,
},
}
}
@@ -116,6 +139,8 @@ pub struct NetstackRequestGo {
recv_timeout_sec: u64,
download_timeout_sec: u64,
awg_args: String,
#[serde(flatten)]
pub port_check: PortCheckConfig,
}
impl NetstackRequestGo {
@@ -135,6 +160,7 @@ impl NetstackRequestGo {
recv_timeout_sec: req.v4_ping_config.recv_timeout_sec,
download_timeout_sec: req.download_timeout_sec,
awg_args: req.awg_args.clone(),
port_check: req.port_check.clone(),
}
}
@@ -155,6 +181,7 @@ impl NetstackRequestGo {
recv_timeout_sec: req.v6_ping_config.recv_timeout_sec,
download_timeout_sec: req.download_timeout_sec,
awg_args: req.awg_args.clone(),
port_check: req.port_check.clone(),
}
}
}
@@ -173,6 +200,8 @@ pub struct NetstackResponse {
pub downloaded_file_size_bytes: u64,
pub download_duration_milliseconds: u64,
pub download_error: String,
#[serde(default)]
pub port_check_results: Option<BTreeMap<String, bool>>,
}
#[derive(Clone, Debug, serde::Deserialize)]
@@ -46,6 +46,7 @@ pub async fn wg_probe(
auth_version: AuthenticatorVersion,
awg_args: Option<String>,
netstack_args: NetstackArgs,
port_check_only: bool,
// TODO: update type
credential: CredentialSpendingData,
) -> anyhow::Result<WgProbeResults> {
@@ -149,6 +150,7 @@ pub async fn wg_probe(
&tunnel_config,
&netstack_args,
&awg_args.unwrap_or_default(),
port_check_only,
&mut wg_outcome,
);
+67
View File
@@ -1,10 +1,50 @@
use nym_connection_monitor::ConnectionStatusEvent;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
pub use super::bandwidth_helpers::{AttachedTicket, AttachedTicketMaterials};
pub use super::socks5_test::HttpsConnectivityResult;
pub use nym_credentials::ecash::bandwidth::serialiser::VersionedSerialise;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PortsCheckSummary {
pub all_pass: bool,
pub error: Option<String>,
pub port_check_target: String,
pub failed_ports: Vec<String>,
}
impl PortsCheckSummary {
pub fn from_port_map(
can_register: bool,
port_check_target: impl Into<String>,
ports: &BTreeMap<String, bool>,
) -> Self {
let failed_ports: Vec<String> = ports
.iter()
.filter_map(|(port, open)| if *open { None } else { Some(port.clone()) })
.collect();
let all_pass = can_register && failed_ports.is_empty() && !ports.is_empty();
Self {
all_pass,
error: None,
port_check_target: port_check_target.into(),
failed_ports,
}
}
pub fn probe_error(port_check_target: impl Into<String>, message: impl Into<String>) -> Self {
Self {
all_pass: false,
error: Some(message.into()),
port_check_target: port_check_target.into(),
failed_ports: vec![],
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProbeResult {
pub node: String,
@@ -50,6 +90,10 @@ pub struct WgProbeResults {
pub download_duration_milliseconds_v6: u64,
pub downloaded_file_v6: String,
pub download_error_v6: String,
/// Per-port exit-policy check
#[serde(default, skip_serializing_if = "Option::is_none")]
pub port_check_results: Option<BTreeMap<String, bool>>,
}
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
@@ -195,6 +239,29 @@ impl Socks5ProbeResults {
}
}
/// Output of the `run-ports` subcommand — per-port TCP reachability through
/// the WG exit tunnel, without the full probe outcome.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PortCheckResult {
pub gateway: String,
pub can_register: bool,
pub port_check_target: String,
/// port → open/closed (BTreeMap for deterministic bincode serialization in signed requests)
pub ports: BTreeMap<String, bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
impl PortCheckResult {
/// Returns the list of ports that were found **closed** on this gateway.
pub fn closed_ports(&self) -> Vec<u16> {
self.ports
.iter()
.filter_map(|(k, &open)| if !open { k.parse().ok() } else { None })
.collect()
}
}
#[derive(Debug, Clone, Default)]
pub struct IpPingReplies {
pub ipr_tun_ip_v4: bool,
+16 -1
View File
@@ -63,6 +63,7 @@ impl WgTunnelConfig {
/// - DNS resolution
/// - ICMP ping to specified hosts and IPs
/// - Optional download test
/// - Optional exit policy port check (TCP connect through tunnel)
///
/// Results are written directly into the provided `wg_outcome` to avoid field-by-field
/// copying at call sites.
@@ -71,6 +72,7 @@ impl WgTunnelConfig {
/// * `config` - WireGuard tunnel configuration
/// * `netstack_args` - Netstack test parameters (DNS, hosts to ping, timeouts, etc.)
/// * `awg_args` - Amnezia WireGuard arguments (empty string for standard WG)
/// * `port_check_only` - If true, skip pings/download and only run TCP port checks
/// * `wg_outcome` - Mutable reference to write test results into
// This function extracts the shared netstack testing logic from
// wg_probe() and wg_probe_lp() to eliminate code duplication.
@@ -78,6 +80,7 @@ pub fn run_tunnel_tests(
config: &WgTunnelConfig,
netstack_args: &NetstackArgs,
awg_args: &str,
port_check_only: bool,
wg_outcome: &mut WgProbeResults,
) {
// Build the netstack request
@@ -91,9 +94,10 @@ pub fn run_tunnel_tests(
netstack_args.netstack_download_timeout_sec,
awg_args,
netstack_args.clone(),
port_check_only,
);
// Perform IPv4 ping test
// Perform IPv4 ping test (also carries port check results in port-check-only mode)
info!("Testing IPv4 tunnel connectivity...");
let ipv4_request = NetstackRequestGo::from_rust_v4(&netstack_request);
@@ -122,6 +126,11 @@ pub fn run_tunnel_tests(
netstack_response_v4.downloaded_file_size_bytes;
wg_outcome.downloaded_file_v4 = netstack_response_v4.downloaded_file;
wg_outcome.download_error_v4 = netstack_response_v4.download_error;
// capture port check results (present when ports were requested)
if netstack_response_v4.port_check_results.is_some() {
wg_outcome.port_check_results = netstack_response_v4.port_check_results;
}
}
Ok(NetstackResult::Error { error }) => {
error!("Netstack runtime error (IPv4): {error}")
@@ -131,6 +140,12 @@ pub fn run_tunnel_tests(
}
}
// in port-check-only mode, skip IPv6 tests — port checks ran through IPv4 above
if port_check_only {
info!("Port-check-only mode: skipping IPv6 tunnel tests");
return;
}
// Perform IPv6 ping test
info!("Testing IPv6 tunnel connectivity...");
let ipv6_request = NetstackRequestGo::from_rust_v6(&netstack_request);
+2 -2
View File
@@ -9,7 +9,7 @@ mod socks5;
mod test_mode;
pub use credentials::{CredentialArgs, CredentialMode};
pub use netstack::NetstackArgs;
pub use netstack::{EXIT_POLICY_PORTS, NetstackArgs};
pub use socks5::Socks5Args;
pub use test_mode::TestMode;
@@ -22,7 +22,7 @@ pub struct ProbeConfig {
/// Test mode - explicitly specify which tests to run
///
/// Modes:
/// core. - Traditional mixnet testing (entry/exit pings + WireGuard via authenticator)
/// core - Traditional mixnet testing (entry/exit pings + WireGuard via authenticator)
/// wg-mix - Wireguard via authenticator
/// wg-lp - Entry LP + Exit LP (nested forwarding) + WireGuard
/// lp-only - LP registration only (no WireGuard)
+67
View File
@@ -3,6 +3,11 @@
use clap::Args;
// EXIT_POLICY_PORTS is generated at build time by parsing PORT_MAPPINGS
// from scripts/nym-node-setup/network-tunnel-manager.sh.
// To add or remove ports, update PORT_MAPPINGS in the shell script and rebuild.
include!(concat!(env!("OUT_DIR"), "/exit_policy_ports.rs"));
#[derive(Args, Clone, Debug)]
pub struct NetstackArgs {
#[arg(long, hide = true, env = "PROBE_NETSTACK_DOWNLOAD_TIMEOUT_SEC", default_value_t = NetstackArgs::default().netstack_download_timeout_sec)]
@@ -37,6 +42,22 @@ pub struct NetstackArgs {
#[arg(long, hide = true, env = "PROBE_NETSTACK_PING_IPS_V6", default_values_t = NetstackArgs::default().netstack_ping_ips_v6)]
pub netstack_ping_ips_v6: Vec<String>,
/// Target host for exit policy port checks (must listen on all tested ports)
#[arg(
long = "use-target",
env = "PROBE_NETSTACK_PORT_CHECK_TARGET",
default_value = "portquiz.net"
)]
pub port_check_target: String,
/// List ports to check, separated by a comma.
#[arg(long = "check-ports", value_delimiter = ',', default_values_t = Vec::<u16>::new())]
pub port_check_ports: Vec<u16>,
/// Timeout in seconds for each individual port check attempt
#[arg(long, default_value_t = NetstackArgs::default().port_check_timeout_sec)]
pub port_check_timeout_sec: u64,
}
impl Default for NetstackArgs {
@@ -57,6 +78,9 @@ impl Default for NetstackArgs {
"2606:4700:4700::1111".to_string(),
"2620:fe::fe".to_string(),
],
port_check_target: "portquiz.net".to_string(),
port_check_ports: vec![],
port_check_timeout_sec: 5,
}
}
}
@@ -87,6 +111,9 @@ mod tests {
"2606:4700:4700::1111".to_string(),
"2620:fe::fe".to_string(),
],
port_check_target: "portquiz.net".to_string(),
port_check_ports: vec![],
port_check_timeout_sec: 5,
};
// Test IPv4 defaults
@@ -111,6 +138,11 @@ mod tests {
assert_eq!(args.netstack_num_ping, 5);
assert_eq!(args.netstack_send_timeout_sec, 3);
assert_eq!(args.netstack_recv_timeout_sec, 3);
// Test port check defaults
assert_eq!(args.port_check_target, "portquiz.net");
assert!(args.port_check_ports.is_empty());
assert_eq!(args.port_check_timeout_sec, 5);
}
#[test]
@@ -128,6 +160,9 @@ mod tests {
netstack_ping_ips_v4: vec!["8.8.8.8".to_string()],
netstack_ping_hosts_v6: vec!["ipv6.example.com".to_string()],
netstack_ping_ips_v6: vec!["2001:4860:4860::8888".to_string()],
port_check_target: "portquiz.net".to_string(),
port_check_ports: vec![80, 443, 8332],
port_check_timeout_sec: 10,
};
assert_eq!(args.netstack_ping_hosts_v4, vec!["example.com"]);
@@ -163,6 +198,9 @@ mod tests {
"2001:4860:4860::8888".to_string(),
"2606:4700:4700::1111".to_string(),
],
port_check_target: "portquiz.net".to_string(),
port_check_ports: vec![],
port_check_timeout_sec: 5,
};
assert_eq!(args.netstack_ping_hosts_v4, vec!["nym.com", "example.com"]);
@@ -192,6 +230,9 @@ mod tests {
netstack_ping_ips_v4: vec![],
netstack_ping_hosts_v6: vec![],
netstack_ping_ips_v6: vec![],
port_check_target: "portquiz.net".to_string(),
port_check_ports: vec![],
port_check_timeout_sec: 0,
};
assert_eq!(args.netstack_num_ping, 0);
@@ -219,6 +260,9 @@ mod tests {
netstack_ping_ips_v4: vec!["1.1.1.1".to_string()],
netstack_ping_hosts_v6: vec!["cloudflare.com".to_string()],
netstack_ping_ips_v6: vec!["2001:4860:4860::8888".to_string()],
port_check_target: "portquiz.net".to_string(),
port_check_ports: vec![],
port_check_timeout_sec: 5,
};
assert!(args.netstack_ping_hosts_v4[0].contains("nym"));
@@ -228,4 +272,27 @@ mod tests {
assert_eq!(args.netstack_v4_dns, "1.1.1.1");
assert_eq!(args.netstack_v6_dns, "2606:4700:4700::1111");
}
#[test]
fn test_exit_policy_ports_no_duplicates_and_sorted() {
let ports = EXIT_POLICY_PORTS;
assert!(!ports.is_empty(), "EXIT_POLICY_PORTS should not be empty");
// verify sorted
for window in ports.windows(2) {
assert!(
window[0] < window[1],
"EXIT_POLICY_PORTS out of order or duplicate: {} >= {}",
window[0],
window[1]
);
}
// spot-check a few well-known ports
assert!(ports.contains(&22), "should contain SSH (22)");
assert!(ports.contains(&443), "should contain HTTPS (443)");
assert!(ports.contains(&22021), "should contain Session (22021)");
assert!(ports.contains(&8332), "should contain Bitcoin (8332)");
assert!(ports.contains(&9735), "should contain Lightning (9735)");
}
}
@@ -10,6 +10,9 @@
//! - LpOnly: LP registration only, no WireGuard
//! - Socks5Only: Socks5 test
//! - All: Mixnet, wireguard over authenticator and LP registration
//!
//! Note: Exit policy port checking is handled by the `run-ports` subcommand,
//! not via a test mode.
/// Test mode for the gateway probe.
///
+451 -3
View File
@@ -9,7 +9,9 @@ use crate::common::probe_tests::{
};
use crate::common::types::{Entry, LpProbeResults};
use crate::config::{CredentialArgs, CredentialMode, NetstackArgs, ProbeConfig};
use nym_authenticator_client::{AuthClientMixnetListener, AuthenticatorClient};
use nym_authenticator_client::{
AuthClientMixnetListener, AuthClientMixnetListenerHandle, AuthenticatorClient,
};
use nym_bandwidth_controller::BandwidthTicketProvider;
use nym_client_core::config::ForgetMe;
use nym_config::defaults::NymNetworkDetails;
@@ -20,13 +22,16 @@ use nym_sdk::mixnet::{
};
use nym_topology::{HardcodedTopologyProvider, NymTopology};
use rand::rngs::OsRng;
use std::collections::BTreeMap;
use std::net::IpAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::*;
pub use crate::common::nodes::{NymApiDirectory, query_gateway_by_ip};
pub use crate::common::types::{ProbeOutcome, ProbeResult};
pub use crate::common::types::{PortCheckResult, PortsCheckSummary, ProbeOutcome, ProbeResult};
mod common;
pub use common::types;
@@ -45,7 +50,280 @@ pub struct Probe {
topology: Option<NymTopology>,
}
#[derive(Debug, Clone)]
pub struct RunPortsConfig {
pub min_gateway_mixnet_performance: Option<u8>,
pub ignore_egress_epoch_role: bool,
pub netstack_args: NetstackArgs,
}
// Port checks always target a bonded gateway. There are two entry points:
// - `run_ports` : local CLI, on-disk storage + mnemonic.
// - `run_ports_for_agent`: NS agent, ephemeral storage + ticket materials.
struct PortScanRun {
can_register: bool,
port_results: BTreeMap<String, bool>,
last_error: Option<String>,
}
/// Validated info needed to run a WG port-check via the mixnet.
struct PortCheckSetup {
exit_node: TestedNodeDetails,
exit_identity: String,
authenticator: nym_sdk::mixnet::Recipient,
ip_address: IpAddr,
port_check_target: String,
ports_count: usize,
}
impl PortCheckSetup {
fn new(exit_node: TestedNodeDetails, config: &RunPortsConfig) -> anyhow::Result<Self> {
let exit_identity = exit_node.identity.to_string();
let (authenticator, ip_address) =
match (exit_node.authenticator_address, exit_node.ip_address) {
(Some(auth), Some(ip)) => (auth, ip),
_ => anyhow::bail!(
"Gateway {} missing authenticator address or IP — not a functional exit",
exit_identity
),
};
let ports_count = config.netstack_args.port_check_ports.len();
if ports_count == 0 {
anyhow::bail!(
"No ports specified. Use --check-ports 80,443,22021 or --check-all-ports"
);
}
Ok(Self {
exit_node,
exit_identity,
authenticator,
ip_address,
port_check_target: config.netstack_args.port_check_target.clone(),
ports_count,
})
}
fn failed_to_connect(&self, err: impl std::fmt::Display) -> PortCheckResult {
PortCheckResult {
gateway: self.exit_identity.clone(),
can_register: false,
port_check_target: self.port_check_target.clone(),
ports: BTreeMap::new(),
error: Some(format!("Failed to connect to mixnet: {err}")),
}
}
}
impl Probe {
fn parse_port_check_targets(raw: &str) -> Vec<String> {
raw.split(',')
.map(str::trim)
.filter(|s| !s.is_empty())
.map(ToOwned::to_owned)
.collect()
}
async fn is_target_reachable(host: &str) -> bool {
let timeout = Duration::from_secs(2);
for port in [80u16, 443u16] {
if tokio::time::timeout(timeout, tokio::net::TcpStream::connect((host, port)))
.await
.is_ok_and(|res| res.is_ok())
{
return true;
}
}
false
}
async fn run_port_scan_with_retries(
mixnet_listener_task: &AuthClientMixnetListenerHandle,
nym_address: nym_sdk::mixnet::Recipient,
authenticator: nym_sdk::mixnet::Recipient,
authenticator_version: nym_authenticator_requests::AuthenticatorVersion,
ip_address: IpAddr,
bandwidth_provider: &dyn BandwidthTicketProvider,
wg_ticket_type: TicketType,
credential_provider: nym_sdk::mixnet::NodeIdentity,
netstack_args: NetstackArgs,
awg_args: Option<String>,
) -> PortScanRun {
let mut port_results: BTreeMap<String, bool> = BTreeMap::new();
let mut can_register = false;
let mut last_error = None;
let max_attempts = 3;
for attempt in 1..=max_attempts {
if attempt > 1 {
info!("Retrying authenticator registration (attempt {attempt}/{max_attempts})...");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
let credential = match bandwidth_provider
.get_ecash_ticket(wg_ticket_type, credential_provider, 1)
.await
{
Ok(ticket) => ticket.data,
Err(e) => {
error!("Failed to get ecash ticket: {e}");
last_error = Some(format!("Failed to get ecash ticket: {e}"));
break;
}
};
let mut rng = rand::thread_rng();
let auth_client = AuthenticatorClient::new(
mixnet_listener_task.subscribe(),
mixnet_listener_task.mixnet_sender(),
nym_address,
authenticator,
authenticator_version,
Arc::new(x25519::KeyPair::new(&mut rng)),
ip_address,
);
match wg_probe(
auth_client,
ip_address,
authenticator_version,
awg_args.clone(),
netstack_args.clone(),
true, // port_check_only
credential,
)
.await
{
Ok(outcome) => {
if outcome.can_register {
can_register = true;
port_results = outcome
.port_check_results
.unwrap_or_default()
.into_iter()
.collect();
let open = port_results.values().filter(|&&v| v).count();
info!(
"Port check complete: {}/{} ports open",
open,
port_results.len()
);
break;
}
warn!(
"Auth registration returned but can_register=false (attempt {attempt}/{max_attempts})"
);
last_error = Some("Auth registration did not complete".into());
}
Err(e) => {
warn!("WG probe error: {e} (attempt {attempt}/{max_attempts})");
last_error = Some(format!("WG probe error: {e}"));
}
}
}
PortScanRun {
can_register,
port_results,
last_error,
}
}
/// Warm up routes, register with the authenticator, run the port scan and tear down.
async fn port_check_after_connect(
mixnet_client: MixnetClient,
setup: PortCheckSetup,
bandwidth_provider: &dyn BandwidthTicketProvider,
netstack_args: NetstackArgs,
) -> PortCheckResult {
let targets = Self::parse_port_check_targets(&setup.port_check_target);
let mut selected_target = targets
.first()
.cloned()
.unwrap_or_else(|| setup.port_check_target.clone());
if targets.len() > 1 {
let mut found = false;
for candidate in &targets {
if Self::is_target_reachable(candidate).await {
selected_target = candidate.clone();
found = true;
break;
}
warn!(
"Port-check target '{}' is unreachable, trying next",
candidate
);
}
if !found {
warn!(
"All port-check targets unreachable; falling back to first: '{}'",
selected_target
);
} else if selected_target != *targets.first().unwrap() {
info!(
"Port check: selected target '{}' (first reachable from list)",
selected_target
);
}
}
let mut netstack_args = netstack_args;
netstack_args.port_check_target = selected_target.clone();
info!("Warming up mixnet routes...");
let nym_address = *mixnet_client.nym_address();
let (warmup_result, mixnet_client) = do_ping(
mixnet_client,
nym_address,
setup.exit_node.exit_router_address,
false,
)
.await;
match warmup_result {
Ok(_) => info!("Mixnet warmup done"),
Err(e) => warn!("Warmup had issues ({e}), auth may be less reliable"),
}
let nym_address = *mixnet_client.nym_address();
let mixnet_listener_task =
AuthClientMixnetListener::new(mixnet_client, CancellationToken::new()).start();
let scan = Self::run_port_scan_with_retries(
&mixnet_listener_task,
nym_address,
setup.authenticator,
setup.exit_node.authenticator_version,
setup.ip_address,
bandwidth_provider,
TicketType::V1WireguardExit,
setup.exit_node.identity,
netstack_args,
None,
)
.await;
mixnet_listener_task.stop().await;
PortCheckResult {
gateway: setup.exit_identity,
can_register: scan.can_register,
port_check_target: selected_target,
ports: scan.port_results,
error: if scan.can_register {
None
} else {
scan.last_error
},
}
}
/// Create a probe with pre-queried gateway nodes
pub fn new(
entry_node: TestedNodeDetails,
@@ -90,7 +368,6 @@ impl Probe {
})
}
/// Run a probe as an NS agent (orchestrator for multiple probe runs for NS API)
pub async fn probe_run_agent(
mut self,
credential_args: CredentialArgs,
@@ -269,6 +546,174 @@ impl Probe {
self.do_probe_test(mixnet_client, bandwidth_provider).await
}
pub async fn run_ports(
entry_node: TestedNodeDetails,
exit_node: Option<TestedNodeDetails>,
network: NymNetworkDetails,
config: &RunPortsConfig,
config_dir: &PathBuf,
credential: CredentialMode,
) -> anyhow::Result<PortCheckResult> {
let exit_node = exit_node.unwrap_or(entry_node.clone());
let setup = PortCheckSetup::new(exit_node, config)?;
info!(
"Port check: testing {} ports on gateway {} via {}",
setup.ports_count, setup.exit_identity, setup.port_check_target
);
let storage_paths = StoragePaths::new_from_dir(config_dir)?;
let storage = storage_paths
.initialise_default_persistent_storage()
.await?;
let mixnet_debug_config = helpers::mixnet_debug_config(
config.min_gateway_mixnet_performance,
config.ignore_egress_epoch_role,
);
let topology = helpers::fetch_topology(&network, &mixnet_debug_config)
.await
.inspect_err(|e| warn!("Failed to fetch topology: {e}"))
.ok();
let mut mixnet_client_builder = MixnetClientBuilder::new_with_storage(storage.clone())
.request_gateway(entry_node.identity.to_string())
.network_details(network.clone())
.debug_config(mixnet_debug_config)
.with_forget_me(ForgetMe::new_stats())
.credentials_mode(!credential.use_mock_ecash);
if let Some(topology) = &topology {
mixnet_client_builder = mixnet_client_builder.custom_topology_provider(Box::new(
HardcodedTopologyProvider::new(topology.clone()),
));
}
let disconnected_mixnet_client = mixnet_client_builder.build()?;
// make sure identity keys exist before credential acquisition
// (acquire_bandwidth → create_bandwidth_client needs them on disk)
let key_store = storage.key_store();
if key_store.load_keys().await.is_err() {
debug!("Generating new client keys");
let mut rng = OsRng;
nym_client_core::init::generate_new_client_keys(&mut rng, key_store).await?;
}
credential
.acquire(&disconnected_mixnet_client, &storage)
.await?;
let bandwidth_provider = build_bandwidth_controller(
&network,
storage.credential_store().clone(),
credential.use_mock_ecash,
)?;
let mixnet_client = match disconnected_mixnet_client.connect_to_mixnet().await {
Ok(client) => {
info!(
"Connected to mixnet via entry gateway: {}",
entry_node.identity
);
info!("Our nym address: {}", *client.nym_address());
client
}
Err(e) => return Ok(setup.failed_to_connect(e)),
};
Ok(Self::port_check_after_connect(
mixnet_client,
setup,
bandwidth_provider.as_ref(),
config.netstack_args.clone(),
)
.await)
}
/// Bonded gateway port-check, run by the NS agent. Uses ephemeral storage and ticket
/// materials provided by the NS API instead of mnemonic-based acquisition.
pub async fn run_ports_for_agent(
entry_gateway: nym_sdk::mixnet::ed25519::PublicKey,
network: NymNetworkDetails,
config: &RunPortsConfig,
credential_args: CredentialArgs,
) -> anyhow::Result<PortCheckResult> {
let api_url = network
.endpoints
.first()
.and_then(|ep| ep.api_url())
.ok_or(anyhow::anyhow!("missing api url"))?;
let directory = NymApiDirectory::new(api_url).await?;
let entry_node = directory
.entry_gateway(&entry_gateway)?
.to_testable_node()?;
// agent always uses the entry gateway as the exit
let setup = PortCheckSetup::new(entry_node.clone(), config)?;
info!(
"Port check (agent): testing {} ports on gateway {} via {}",
setup.ports_count, setup.exit_identity, setup.port_check_target
);
let storage = Ephemeral::default();
let mixnet_debug_config = helpers::mixnet_debug_config(
config.min_gateway_mixnet_performance,
config.ignore_egress_epoch_role,
);
let topology = helpers::fetch_topology(&network, &mixnet_debug_config)
.await
.inspect_err(|e| warn!("Failed to fetch topology: {e}"))
.ok();
let mut mixnet_client_builder = MixnetClientBuilder::new_with_storage(storage.clone())
.request_gateway(entry_node.identity.to_string())
.network_details(network.clone())
.debug_config(mixnet_debug_config)
.with_forget_me(ForgetMe::new_stats())
.credentials_mode(true);
if let Some(topology) = &topology {
mixnet_client_builder = mixnet_client_builder.custom_topology_provider(Box::new(
HardcodedTopologyProvider::new(topology.clone()),
));
}
let disconnected_mixnet_client = mixnet_client_builder.build()?;
credential_args
.import_credential(&disconnected_mixnet_client)
.await?;
let bandwidth_provider =
build_bandwidth_controller(&network, storage.credential_store().clone(), false)?;
let mixnet_client = match disconnected_mixnet_client.connect_to_mixnet().await {
Ok(client) => {
info!(
"Connected to mixnet via entry gateway: {}",
entry_node.identity
);
info!("Our nym address: {}", *client.nym_address());
client
}
Err(e) => return Ok(setup.failed_to_connect(e)),
};
Ok(Self::port_check_after_connect(
mixnet_client,
setup,
bandwidth_provider.as_ref(),
config.netstack_args.clone(),
)
.await)
}
#[allow(clippy::too_many_arguments)]
pub async fn do_probe_test(
self,
@@ -369,6 +814,7 @@ impl Probe {
let mixnet_listener_task =
AuthClientMixnetListener::new(mixnet_client, CancellationToken::new())
.start();
let mut rng = rand::thread_rng();
let auth_client = AuthenticatorClient::new(
mixnet_listener_task.subscribe(),
@@ -397,6 +843,7 @@ impl Probe {
exit_node.authenticator_version,
self.config.amnezia_args.clone(),
self.config.netstack_args.clone(),
false,
credential,
)
.await
@@ -404,6 +851,7 @@ impl Probe {
// Add wg results to probe result
probe_result.outcome.wg = Some(outcome);
mixnet_listener_task.stop().await;
} else {
warn!("Not enough information to run WireGuard via mixnet registration tests");
+140 -9
View File
@@ -1,12 +1,15 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use clap::{Parser, Subcommand};
use clap::{Args, Parser, Subcommand};
use nym_bin_common::bin_info;
use nym_config::defaults::setup_env;
use nym_gateway_probe::config::{CredentialArgs, CredentialMode, ProbeConfig};
use nym_gateway_probe::{NymApiDirectory, ProbeResult, query_gateway_by_ip};
use nym_gateway_probe::config::{CredentialArgs, CredentialMode, NetstackArgs, ProbeConfig};
use nym_gateway_probe::{
NymApiDirectory, PortCheckResult, ProbeResult, RunPortsConfig, query_gateway_by_ip,
};
use nym_sdk::mixnet::NodeIdentity;
use serde::Serialize;
use std::path::Path;
use std::{path::PathBuf, sync::OnceLock};
use tracing::*;
@@ -28,7 +31,7 @@ struct CliArgs {
config_env_file: Option<PathBuf>,
/// Disable logging during probe
#[arg(long)]
#[arg(long, global = true)]
no_log: bool,
}
@@ -81,6 +84,35 @@ enum Commands {
probe_config: ProbeConfig,
},
/// Check WG exit policy ports on a bonded gateway.
/// Tests TCP connectivity through the WG tunnel for each port.
/// Use --check-ports to pick specific ports, or --check-all-ports for the full exit policy list.
RunPorts {
/// Directory for credential and mixnet storage
#[arg(long)]
config_dir: Option<PathBuf>,
/// Bonded gateway identity.
#[arg(long, short = 'g', alias = "gateway")]
entry_gateway: NodeIdentity,
/// Separate exit gateway to test (entry_gateway is used for mixnet entry).
#[arg(long)]
exit_gateway: Option<NodeIdentity>,
/// Test every port in the canonical exit policy (network-tunnel-manager.sh PORT_MAPPINGS).
/// Overrides --check-ports.
#[arg(long)]
check_all_ports: bool,
/// Arguments to manage credentials
#[command(flatten)]
credential_mode: CredentialMode,
#[command(flatten)]
probe_config: RunPortsProbeConfig,
},
/// Run the probe by NS agents
RunAgent {
/// The specific gateway specified by ID.
@@ -96,6 +128,29 @@ enum Commands {
},
}
#[derive(Debug, Args, Clone)]
struct RunPortsProbeConfig {
/// Only choose gateway with that minimum performance
#[arg(long)]
min_gateway_mixnet_performance: Option<u8>,
/// Ignore egress epoch role constraints
#[arg(long, global = true)]
ignore_egress_epoch_role: bool,
/// Arguments to manage netstack downloads and port checks
#[command(flatten)]
netstack_args: NetstackArgs,
}
/// CLI output wrapper — either a standard probe result or a port-check result
#[derive(Serialize)]
#[serde(untagged)]
pub(crate) enum ProbeOutput {
Standard(ProbeResult),
PortCheck(PortCheckResult),
}
fn setup_logging() {
// SAFETY: those are valid directives
#[allow(clippy::unwrap_used)]
@@ -112,7 +167,7 @@ fn setup_logging() {
.init();
}
pub(crate) async fn run() -> anyhow::Result<ProbeResult> {
pub(crate) async fn run() -> anyhow::Result<ProbeOutput> {
let args = CliArgs::parse();
if !args.no_log {
setup_logging();
@@ -122,7 +177,7 @@ pub(crate) async fn run() -> anyhow::Result<ProbeResult> {
setup_env(args.config_env_file.as_ref());
let network = nym_sdk::NymNetworkDetails::new_from_env();
info!("{:#?}", network);
debug!("{:#?}", network);
match args.command {
Commands::RunLocal {
@@ -165,7 +220,9 @@ pub(crate) async fn run() -> anyhow::Result<ProbeResult> {
let trial =
nym_gateway_probe::Probe::new(entry_details, exit_details, network, probe_config);
Box::pin(trial.probe_run_locally(&config_dir, credential_mode)).await
Box::pin(trial.probe_run_locally(&config_dir, credential_mode))
.await
.map(ProbeOutput::Standard)
}
Commands::Run {
entry_gateway,
@@ -209,7 +266,79 @@ pub(crate) async fn run() -> anyhow::Result<ProbeResult> {
let trial =
nym_gateway_probe::Probe::new(entry_details, exit_details, network, probe_config);
Box::pin(trial.probe_run(&config_dir, credential_mode)).await
Box::pin(trial.probe_run(&config_dir, credential_mode))
.await
.map(ProbeOutput::Standard)
}
Commands::RunPorts {
entry_gateway,
exit_gateway,
config_dir,
check_all_ports,
credential_mode,
probe_config,
} => {
let mut run_ports_config = RunPortsConfig {
min_gateway_mixnet_performance: probe_config.min_gateway_mixnet_performance,
ignore_egress_epoch_role: probe_config.ignore_egress_epoch_role,
netstack_args: probe_config.netstack_args,
};
// --check-all-ports overrides --check-ports with the full exit policy list
if check_all_ports {
use nym_gateway_probe::config::EXIT_POLICY_PORTS;
run_ports_config.netstack_args.port_check_ports = EXIT_POLICY_PORTS.to_vec();
info!(
"Using full exit policy port list ({} ports)",
EXIT_POLICY_PORTS.len()
);
}
let api_url = network
.endpoints
.first()
.and_then(|ep| ep.api_url())
.ok_or(anyhow::anyhow!("missing api url"))?;
let directory = NymApiDirectory::new(api_url).await?;
let entry_details = directory
.entry_gateway(&entry_gateway)?
.to_testable_node()?;
let exit_details = exit_gateway
.map(|id_key| directory.exit_gateway(&id_key))
.transpose()?
.map(|node| node.to_testable_node())
.transpose()?;
let config_dir = config_dir
.clone()
.unwrap_or_else(|| Path::new(DEFAULT_CONFIG_DIR).join(&network.network_name));
if config_dir.is_file() {
anyhow::bail!("provided configuration directory is a file");
}
if !config_dir.exists() {
std::fs::create_dir_all(config_dir.clone())?;
}
info!(
"using the following directory for the probe config: {}",
config_dir.display()
);
Box::pin(nym_gateway_probe::Probe::run_ports(
entry_details,
exit_details,
network,
&run_ports_config,
&config_dir,
credential_mode,
))
.await
.map(ProbeOutput::PortCheck)
}
Commands::RunAgent {
entry_gateway,
@@ -219,7 +348,9 @@ pub(crate) async fn run() -> anyhow::Result<ProbeResult> {
let trial =
nym_gateway_probe::Probe::new_for_agent(entry_gateway, network, probe_config)
.await?;
Box::pin(trial.probe_run_agent(credential_args)).await
Box::pin(trial.probe_run_agent(credential_args))
.await
.map(ProbeOutput::Standard)
}
}
}