change chain capabilities refresh behaviour to not tie new nodes to global timer

This commit is contained in:
Jędrzej Stuczyński
2026-06-05 16:26:17 +01:00
parent c24814179f
commit 7b8753e529
2 changed files with 121 additions and 75 deletions
+1 -16
View File
@@ -5,7 +5,7 @@ use self::data::NodeStatusCacheData;
use crate::node_performance::provider::PerformanceRetrievalFailure;
use crate::support::caching::cache::{SharedCache, UninitialisedCache};
use crate::support::caching::Cache;
use nym_api_requests::models::{ChainInteractionCapabilitiesDetailed, NodeAnnotationV2};
use nym_api_requests::models::NodeAnnotationV2;
use nym_mixnet_contract_common::NodeId;
use std::collections::HashMap;
use std::path::Path;
@@ -103,19 +103,4 @@ impl NodeStatusCache {
) -> Result<RwLockReadGuard<'_, HashMap<NodeId, NodeAnnotationV2>>, UninitialisedCache> {
self.get(|c| &c.node_annotations).await
}
async fn chain_information(
&self,
) -> Result<HashMap<NodeId, Option<ChainInteractionCapabilitiesDetailed>>, NodeStatusCacheError>
{
Ok(self
.cache()
.await?
.node_annotations
.iter()
.map(|(node_id, annotation)| {
(*node_id, annotation.chain_interaction_capabilities.clone())
})
.collect::<HashMap<_, _>>())
}
}
+120 -59
View File
@@ -29,7 +29,7 @@ use nym_topology::CachedEpochRewardedSet;
use nym_validator_client::nyxd::module_traits::feegrant::query::FeegrantQueryClient;
use nym_validator_client::nyxd::{AccountId, CosmWasmClient};
use nym_validator_client::QueryHttpRpcNyxdClient;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
@@ -41,8 +41,9 @@ pub(crate) struct NodeStatusCacheConfig {
pub(crate) minimum_on_chain_balance: Coin,
pub(crate) chain_capabilities_retrieval_concurrency: usize,
/// Indicates how often should the chain balances (and feegrants) of known nodes be refreshed.
/// (it is an overkill to do it every single iteration)
/// How long a node's cached chain capabilities (balance + feegrant) stay valid before being
/// re-queried. Evaluated per node, so lookups are spread out over time rather than refreshed
/// in a single burst.
pub(crate) chain_capabilities_refresh_interval: Duration,
pub(crate) fallback_caching_interval: Duration,
@@ -62,12 +63,71 @@ pub(crate) struct NodeStatusCacheConfig {
pub(crate) stress_testing_score_weight: f64,
}
/// A successfully-retrieved chain-capability lookup for a single node, tagged with the instant it
/// was fetched so its freshness can be evaluated against a TTL.
struct CachedChainCapabilities {
capabilities: ChainInteractionCapabilitiesDetailed,
fetched_at: Instant,
}
/// In-memory cache of successful chain-capability lookups, keyed by node id.
///
/// Only successful lookups are stored. Nodes that don't advertise a usable on-chain address, and
/// nodes whose query failed, are intentionally absent: they're cheaply re-derived from the
/// described data on every refresh and retried as needed, rather than being pinned to a stale
/// `false` for a whole TTL window. This map is purely in-memory and never persisted, so a restart
/// simply triggers a one-off full re-query on the first refresh.
#[derive(Default)]
struct ChainCapabilitiesCache {
entries: HashMap<NodeId, CachedChainCapabilities>,
}
impl ChainCapabilitiesCache {
/// Whether the node should be (re)queried: true if we have no cached value, or the cached value
/// is older than `ttl`.
fn needs_refresh(&self, node_id: NodeId, ttl: Duration) -> bool {
match self.entries.get(&node_id) {
None => true,
Some(entry) => entry.fetched_at.elapsed() > ttl,
}
}
/// Last known capabilities for a node, regardless of age. `None` if it has never been
/// successfully queried (e.g. it advertises no address, or every query so far has failed).
fn get(&self, node_id: NodeId) -> Option<ChainInteractionCapabilitiesDetailed> {
self.entries
.get(&node_id)
.map(|entry| entry.capabilities.clone())
}
fn record(
&mut self,
node_id: NodeId,
capabilities: ChainInteractionCapabilitiesDetailed,
fetched_at: Instant,
) {
self.entries.insert(
node_id,
CachedChainCapabilities {
capabilities,
fetched_at,
},
);
}
/// Drops entries for nodes not in `live`, keeping the map bounded as nodes unbond.
fn retain_live(&mut self, live: &HashSet<NodeId>) {
self.entries.retain(|node_id, _| live.contains(node_id));
}
}
// Long running task responsible for keeping the node status cache up-to-date.
pub struct NodeStatusCacheRefresher {
config: NodeStatusCacheConfig,
/// Indicates the last time chain capabilities of known nodes were refreshed.
last_refreshed_chain_capabilities: Option<Instant>,
/// Successful chain-capability lookups (balance + feegrant) cached per node, each with its own
/// TTL so they're re-queried independently rather than all at once.
chain_capabilities: ChainCapabilitiesCache,
// Main stored data
cache: NodeStatusCache,
@@ -117,7 +177,7 @@ impl NodeStatusCacheRefresher {
Self {
cache,
config,
last_refreshed_chain_capabilities: None,
chain_capabilities: ChainCapabilitiesCache::default(),
mixnet_contract_cache: contract_cache,
described_cache,
mixnet_contract_cache_listener: contract_cache_listener,
@@ -233,30 +293,43 @@ impl NodeStatusCacheRefresher {
fallback_interval.reset();
}
/// Refreshes cached chain capabilities (balance + feegrant) for described nodes that need it:
/// those with no cached value or whose value is older than the configured TTL. Nodes that
/// don't advertise a usable on-chain address are skipped entirely - there's nothing to query,
/// so we neither store nor retry them. Only successful lookups are recorded; a failed query
/// leaves any previous value untouched and is retried on the next refresh.
// SAFETY: unwrap is fine as if the mutex got poisoned we'd be experiencing some UB anyway
#[allow(clippy::unwrap_used)]
async fn retrieve_chain_info(
&self,
nodes: &DescribedNodes,
) -> Result<HashMap<NodeId, Option<ChainInteractionCapabilitiesDetailed>>, NodeStatusCacheError>
{
async fn refresh_chain_capabilities(&mut self, nodes: &DescribedNodes) {
// the describe cache only ever contains bonded nym-nodes, so pruning to its current
// membership drops entries for nodes that have since unbonded and keeps the map bounded.
let live_ids = nodes.nodes.keys().copied().collect::<HashSet<_>>();
self.chain_capabilities.retain_live(&live_ids);
let ttl = self.config.chain_capabilities_refresh_interval;
let denom = self.config.minimum_on_chain_balance.denom.clone();
// create an iterator of node ids with valid associated account addresses
let to_check = nodes.nodes.values().filter_map(|n| {
n.description
.auxiliary_details
.address
.as_ref()
.and_then(|addr| {
AccountId::from_str(addr)
.inspect_err(|_| {
warn!("node {} has provided an invalid account address", n.node_id)
})
.ok()
.map(|account_id| (n.node_id, account_id))
})
});
// collect (node_id, account_id) for nodes with a valid address that are due a refresh.
// materialised into a Vec so the immutable borrow on `self.chain_capabilities` ends before
// the async queries (and before we record the results back into it).
let to_query = nodes
.nodes
.values()
.filter(|n| self.chain_capabilities.needs_refresh(n.node_id, ttl))
.filter_map(|n| {
let addr = n.description.auxiliary_details.address.as_ref()?;
AccountId::from_str(addr)
.inspect_err(|_| {
warn!("node {} has provided an invalid account address", n.node_id)
})
.ok()
.map(|account_id| (n.node_id, account_id))
})
.collect::<Vec<_>>();
if to_query.is_empty() {
return;
}
// note: we use `for_each_concurrent` rather than `stream::iter(..).buffer_unordered(..)`.
// The latter yields a `Stream` whose `Send` bound gets over-generalised once chained into
@@ -264,28 +337,29 @@ impl NodeStatusCacheRefresher {
let concurrency = self.config.chain_capabilities_retrieval_concurrency.max(1);
// std Mutex is fine because we don't hold it across await points
let capabilities = std::sync::Mutex::new(HashMap::<
NodeId,
Option<ChainInteractionCapabilitiesDetailed>,
>::new());
futures::stream::iter(to_check)
let fresh = std::sync::Mutex::new(Vec::new());
futures::stream::iter(to_query)
.for_each_concurrent(concurrency, |(node_id, account_id)| {
let denom = denom.clone();
let query_client = &self.query_client;
let capabilities = &capabilities;
let fresh = &fresh;
async move {
let chain_info =
retrieve_chain_capabilities(query_client, node_id, account_id, denom).await;
capabilities.lock().unwrap().insert(node_id, chain_info);
if let Some(caps) =
retrieve_chain_capabilities(query_client, node_id, account_id, denom).await
{
fresh.lock().unwrap().push((node_id, caps));
}
}
})
.await;
Ok(capabilities.into_inner().unwrap())
let now = Instant::now();
for (node_id, caps) in fresh.into_inner().unwrap() {
self.chain_capabilities.record(node_id, caps, now);
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn produce_node_annotations(
async fn produce_node_annotations(
&self,
config_score_data: &ConfigScoreData,
routing_scores: &NodesRoutingScores,
@@ -293,7 +367,6 @@ impl NodeStatusCacheRefresher {
nym_nodes: &[NymNodeDetails],
rewarded_set: &CachedEpochRewardedSet,
described_nodes: &DescribedNodes,
chain_capabilities: HashMap<NodeId, Option<ChainInteractionCapabilitiesDetailed>>,
) -> HashMap<NodeId, NodeAnnotationV2> {
let mut annotations = HashMap::new();
if nym_nodes.is_empty() {
@@ -334,7 +407,7 @@ impl NodeStatusCacheRefresher {
let described = described_nodes.get_node(&node_id);
let routing_score = routing_scores.get_or_log(node_id);
let stress_testing_score = stress_testing_scores.get_or_log(node_id);
let node_chain_cap = chain_capabilities.get(&node_id).unwrap_or(&None).clone();
let node_chain_cap = self.chain_capabilities.get(node_id);
let config_score = calculate_config_score(
minimum_balance,
@@ -372,13 +445,6 @@ impl NodeStatusCacheRefresher {
annotations
}
fn should_refresh_chain_interaction(&self) -> bool {
let Some(last_refresh) = self.last_refreshed_chain_capabilities else {
return true;
};
last_refresh.elapsed() > self.config.chain_capabilities_refresh_interval
}
/// Refreshes the node status cache by fetching the latest data from the contract cache
#[allow(deprecated)]
async fn refresh(&mut self) -> Result<(), NodeStatusCacheError> {
@@ -390,7 +456,10 @@ impl NodeStatusCacheRefresher {
let nym_nodes = self.mixnet_contract_cache.nym_nodes().await;
let config_score_data = self.mixnet_contract_cache.maybe_config_score_data().await?;
let Ok(described) = self.described_cache.get().await else {
// clone the cache handle (cheap Arc clone) so the read guard borrows the local rather than
// `self`, leaving us free to take `&mut self` for the chain-capability refresh below.
let described_cache = self.described_cache.clone();
let Ok(described) = described_cache.get().await else {
return Err(NodeStatusCacheError::UnavailableDescribedCache);
};
@@ -413,16 +482,9 @@ impl NodeStatusCacheRefresher {
)
.await?;
// decide whether to refresh cache of node balances
let chain_info = if self.should_refresh_chain_interaction() {
let info = self.retrieve_chain_info(&described).await?;
self.last_refreshed_chain_capabilities = Some(Instant::now());
info
} else {
// use the currently cached values instead
self.cache.chain_information().await?
};
// refresh chain capabilities (balance + feegrant) for nodes that are due (new, previously
// failed, or past their TTL), querying only the delta rather than the whole network.
self.refresh_chain_capabilities(&described).await;
// Create annotated data
let node_annotations = self
@@ -433,7 +495,6 @@ impl NodeStatusCacheRefresher {
&nym_nodes,
&rewarded_set,
&described,
chain_info,
)
.await;