Bugfix/cherry pick/waterloo ns api (#6833)

* NS: don't return nodes with 0 performance

* reduce concurrency during quorum check tests

* add additional leniency in ticketbook requests
This commit is contained in:
Jędrzej Stuczyński
2026-05-28 14:03:18 +01:00
committed by GitHub
parent dc64fb622c
commit dd8c0a2521
16 changed files with 208 additions and 96 deletions
@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO pending_issuance\n (deposit_id, serialization_revision, pending_ticketbook_data, expiration_date)\n VALUES ($1, $2, $3, $4)\n ",
"query": "\n INSERT INTO pending_issuance\n (deposit_id, serialization_revision, pending_ticketbook_data, expiration_date, epoch_id, failure_message)\n VALUES ($1, $2, $3, $4, $5, $6)\n ",
"describe": {
"columns": [],
"parameters": {
@@ -8,10 +8,12 @@
"Int4",
"Int2",
"Bytea",
"Date"
"Date",
"Int4",
"Text"
]
},
"nullable": []
},
"hash": "2ee6b058d423a66114d8411e7c287ade31137b30407dc0254d30f60e2d0101cf"
"hash": "7c2f58e63efd85010408f812692ecad1c89d9df3ffaf4b5d00db5adfdef854c4"
}
@@ -3,7 +3,7 @@
[package]
name = "nym-node-status-api"
version = "4.6.2-rc7"
version = "4.6.2-rc10"
authors.workspace = true
edition.workspace = true
license.workspace = true
@@ -30,17 +30,21 @@ impl Storage {
deposit_id: i32,
data: &[u8],
expiration_date: Date,
epoch_id: i32,
failure_message: &str,
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO pending_issuance
(deposit_id, serialization_revision, pending_ticketbook_data, expiration_date)
VALUES ($1, $2, $3, $4)
(deposit_id, serialization_revision, pending_ticketbook_data, expiration_date, epoch_id, failure_message)
VALUES ($1, $2, $3, $4, $5, $6)
"#,
deposit_id,
serialisation_revision,
data,
expiration_date,
epoch_id,
failure_message,
)
.execute(&self.pool)
.await?;
@@ -384,87 +384,87 @@ impl HttpCache {
let socks5_scores = calculate_socks5_percentiles(&gateways);
let res_gws = gateways
.iter()
.filter(|gw| gw.bonded)
.filter_map(|gw| match skimmed_nodes.get(&gw.gateway_identity_key) {
Some(skimmed_node) => Some((gw, skimmed_node)),
None => {
error!(
"CRITICAL: Gateway {} exists in gateways table but not in nym_nodes table! This should not happen.",
gw.gateway_identity_key
);
None
}
})
.filter_map(|(gw, skimmed_node)| {
let family = family_lookup.family_for_node(skimmed_node.node_id).cloned();
let staking = bond_info
.get(&skimmed_node.node_id)
.map(|details| NodeStakeInformation::from(&details.rewarding_details));
match DVpnGateway::new(
gw.clone(),
skimmed_node,
socks5_scores.get(&gw.gateway_identity_key),
family,
staking,
) {
Ok(gw) => Some(gw),
Err(err) => {
error!(
"CRITICAL: Failed to create DVpnGateway for node_id={}, identity_key={}: {}",
skimmed_node.node_id,
skimmed_node.ed25519_identity_pubkey.to_base58_string(),
err
);
// Don't panic here as this might be due to missing fields, but log it loudly
None
}
}
})
.filter(|gw| {
// gateways must have a country
if gw.location.two_letter_iso_country_code.len() == 2 {
true
} else {
warn!(
"Invalid country code: {}",
gw.location.two_letter_iso_country_code
);
false
}
})
// sort by country, then by identity key
.sorted_by_key(|item| {
(
item.location.two_letter_iso_country_code.clone(),
item.identity_key.clone(),
)
})
.collect::<Vec<_>>();
let mut dvpd_gateways = Vec::new();
let bonded_count = gateways.iter().filter(|gw| gw.bonded).count();
for gw in gateways {
let id = gw.gateway_identity_key.clone();
// 1. reject all gateways that are not bonded
if !gw.bonded {
continue;
}
// 2. reject all gateways with zero performance
if gw.performance == 0 {
continue;
}
// 3. get corresponding directory details
let Some(skimmed_node) = skimmed_nodes.get(&id) else {
error!(
"CRITICAL: Gateway {id} exists in gateways table but not in nym_nodes table! This should not happen",
);
continue;
};
let node_id = skimmed_node.node_id;
// 4. get corresponding auxiliary details
let family = family_lookup.family_for_node(skimmed_node.node_id).cloned();
let staking = bond_info
.get(&skimmed_node.node_id)
.map(|details| NodeStakeInformation::from(&details.rewarding_details));
let socks5_score = socks5_scores.get(&id);
// 5. construct the DVpnGateway model
let dvpn_gw = match DVpnGateway::new(gw, skimmed_node, socks5_score, family, staking) {
Ok(gw) => gw,
Err(err) => {
error!(
"CRITICAL: Failed to create DVpnGateway for node_id={node_id}, identity_key={id}: {err}",
);
// Don't panic here as this might be due to missing fields, but log it loudly
continue;
}
};
// 6. filter out nodes without valid country codes
if dvpn_gw.location.two_letter_iso_country_code.len() != 2 {
warn!(
"Invalid country code: {}",
dvpn_gw.location.two_letter_iso_country_code
);
continue;
}
dvpd_gateways.push(dvpn_gw);
}
// 7. finally, sort the nodes by country, then by identity key
dvpd_gateways.sort_by_key(|item| {
(
item.location.two_letter_iso_country_code.clone(),
item.identity_key.clone(),
)
});
tracing::info!(
"DVpn gateway filtering: {} total gateways, {} bonded, {} nym_nodes, {} final DVpn gateways",
started_with,
bonded_count,
"DVpn gateway filtering: {started_with} total gateways, {bonded_count} bonded, {} nym_nodes, {} final DVpn gateways",
skimmed_nodes.len(),
res_gws.len()
dvpd_gateways.len()
);
if res_gws.is_empty() && started_with > 0 {
if dvpd_gateways.is_empty() && started_with > 0 {
tracing::error!(
"CRITICAL: Started with {} gateways but got 0 DVpn gateways!",
started_with
"CRITICAL: Started with {started_with} gateways but got 0 DVpn gateways!"
);
} else {
tracing::info!(
"Successfully loaded {} DVpn gateways into cache",
res_gws.len()
dvpd_gateways.len()
);
}
res_gws
dvpd_gateways
}
pub async fn get_entry_dvpn_gateways(
@@ -4,6 +4,7 @@ use crate::node_scraper::helpers::scrape_and_store_description_by_node_id;
use crate::ticketbook_manager::TicketbookManager;
use crate::ticketbook_manager::state::TicketbookManagerState;
use clap::Parser;
use nym_bin_common::bin_info_owned;
use nym_credential_proxy_lib::quorum_checker::QuorumStateChecker;
use nym_credential_proxy_lib::shared_state::nyxd_client::ChainClient;
use nym_crypto::asymmetric::ed25519::PublicKey;
@@ -11,6 +12,7 @@ use nym_network_defaults::setup_env;
use nym_task::ShutdownManager;
use nym_validator_client::nyxd::NyxdClient;
use std::sync::Arc;
use tracing::info;
mod cli;
mod db;
@@ -27,6 +29,9 @@ mod utils;
async fn main() -> anyhow::Result<()> {
logging::setup_tracing_logger()?;
let bin_info = bin_info_owned!();
info!("using the following version: {bin_info}");
let args = cli::Cli::parse();
if let Some(env_file) = &args.config_env_file {
setup_env(Some(env_file));
@@ -121,11 +121,12 @@ impl TicketbookManager {
{
Err(err) => {
error!("failed to obtain aggregated wallet: {err}");
let failure_message = err.to_string();
self.state
.storage()
.insert_pending_ticketbook(&issuance_data).await.inspect_err(|err| {
.insert_pending_ticketbook(&issuance_data, epoch_id, &failure_message).await.inspect_err(|store_err| {
let deposit = issuance_data.deposit_id();
error!("could not save the recovery data for deposit {deposit}: {err}. the data will unfortunately get lost")
error!("could not save the recovery data for deposit {deposit}: {store_err}. the data will unfortunately get lost")
})?;
return Err(err.into());
}
@@ -43,6 +43,8 @@ impl TicketbookManagerStorage {
pub(crate) async fn insert_pending_ticketbook(
&self,
ticketbook: &IssuanceTicketBook,
epoch_id: EpochId,
failure_message: &str,
) -> anyhow::Result<()> {
let ser = ticketbook.pack();
let data = Zeroizing::new(ser.data);
@@ -54,6 +56,8 @@ impl TicketbookManagerStorage {
ticketbook.deposit_id() as i32,
&data,
ticketbook.expiration_date(),
epoch_id as i32,
failure_message,
)
.await?;