fix cache eviction if node loses its address
This commit is contained in:
+86
-25
@@ -75,8 +75,10 @@ struct CachedChainCapabilities {
|
||||
/// Only successful lookups are stored. Nodes that don't advertise a usable on-chain address, and
|
||||
/// nodes whose query failed, are intentionally absent: they're cheaply re-derived from the
|
||||
/// described data on every refresh and retried as needed, rather than being pinned to a stale
|
||||
/// `false` for a whole TTL window. This map is purely in-memory and never persisted, so a restart
|
||||
/// simply triggers a one-off full re-query on the first refresh.
|
||||
/// `false` for a whole TTL window. Entries for nodes that unbond or later drop their address are
|
||||
/// evicted on the next refresh, so a stale value can't outlive the address it was derived from.
|
||||
/// This map is purely in-memory and never persisted, so a restart simply triggers a one-off full
|
||||
/// re-query on the first refresh.
|
||||
#[derive(Default)]
|
||||
struct ChainCapabilitiesCache {
|
||||
entries: HashMap<NodeId, CachedChainCapabilities>,
|
||||
@@ -115,9 +117,10 @@ impl ChainCapabilitiesCache {
|
||||
);
|
||||
}
|
||||
|
||||
/// Drops entries for nodes not in `live`, keeping the map bounded as nodes unbond.
|
||||
fn retain_live(&mut self, live: &HashSet<NodeId>) {
|
||||
self.entries.retain(|node_id, _| live.contains(node_id));
|
||||
/// Drops every entry whose node id is not in `keep`. Used to evict nodes that have unbonded or
|
||||
/// no longer advertise a usable on-chain address, so their stale capabilities stop being served.
|
||||
fn retain_only(&mut self, keep: &HashSet<NodeId>) {
|
||||
self.entries.retain(|node_id, _| keep.contains(node_id));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -296,35 +299,31 @@ impl NodeStatusCacheRefresher {
|
||||
/// Refreshes cached chain capabilities (balance + feegrant) for described nodes that need it:
|
||||
/// those with no cached value or whose value is older than the configured TTL. Nodes that
|
||||
/// don't advertise a usable on-chain address are skipped entirely - there's nothing to query,
|
||||
/// so we neither store nor retry them. Only successful lookups are recorded; a failed query
|
||||
/// leaves any previous value untouched and is retried on the next refresh.
|
||||
/// so we neither store nor retry them, and any value cached from a previously-advertised address
|
||||
/// is evicted. Only successful lookups are recorded; a failed query leaves any previous value
|
||||
/// untouched and is retried on the next refresh.
|
||||
// SAFETY: unwrap is fine as if the mutex got poisoned we'd be experiencing some UB anyway
|
||||
#[allow(clippy::unwrap_used)]
|
||||
async fn refresh_chain_capabilities(&mut self, nodes: &DescribedNodes) {
|
||||
// the describe cache only ever contains bonded nym-nodes, so pruning to its current
|
||||
// membership drops entries for nodes that have since unbonded and keeps the map bounded.
|
||||
let live_ids = nodes.nodes.keys().copied().collect::<HashSet<_>>();
|
||||
self.chain_capabilities.retain_live(&live_ids);
|
||||
// resolve the current usable on-chain account for every described node (parsing once so the
|
||||
// result is shared by both the eviction and the query paths below).
|
||||
let addressed = usable_chain_accounts(nodes);
|
||||
|
||||
// evict cached entries for any node not in this set: those that have unbonded (the describe
|
||||
// cache only ever holds bonded nym-nodes) and those that no longer advertise a usable
|
||||
// address, so a stale value can't keep flowing into scoring after its address is gone.
|
||||
let usable_ids = addressed.iter().map(|(id, _)| *id).collect::<HashSet<_>>();
|
||||
self.chain_capabilities.retain_only(&usable_ids);
|
||||
|
||||
let ttl = self.config.chain_capabilities_refresh_interval;
|
||||
let denom = self.config.minimum_on_chain_balance.denom.clone();
|
||||
|
||||
// collect (node_id, account_id) for nodes with a valid address that are due a refresh.
|
||||
// of the addressed nodes, query only those due a refresh (no cached value or past TTL).
|
||||
// materialised into a Vec so the immutable borrow on `self.chain_capabilities` ends before
|
||||
// the async queries (and before we record the results back into it).
|
||||
let to_query = nodes
|
||||
.nodes
|
||||
.values()
|
||||
.filter(|n| self.chain_capabilities.needs_refresh(n.node_id, ttl))
|
||||
.filter_map(|n| {
|
||||
let addr = n.description.auxiliary_details.address.as_ref()?;
|
||||
AccountId::from_str(addr)
|
||||
.inspect_err(|_| {
|
||||
warn!("node {} has provided an invalid account address", n.node_id)
|
||||
})
|
||||
.ok()
|
||||
.map(|account_id| (n.node_id, account_id))
|
||||
})
|
||||
let to_query = addressed
|
||||
.into_iter()
|
||||
.filter(|(node_id, _)| self.chain_capabilities.needs_refresh(*node_id, ttl))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if to_query.is_empty() {
|
||||
@@ -513,6 +512,25 @@ impl NodeStatusCacheRefresher {
|
||||
}
|
||||
}
|
||||
|
||||
/// Resolves the current usable on-chain account for each described node, returning `(node_id,
|
||||
/// account_id)` pairs. Nodes that advertise no address (e.g. running an old version) or an
|
||||
/// unparseable one are excluded - there's nothing to query for them.
|
||||
fn usable_chain_accounts(nodes: &DescribedNodes) -> Vec<(NodeId, AccountId)> {
|
||||
nodes
|
||||
.nodes
|
||||
.values()
|
||||
.filter_map(|n| {
|
||||
let addr = n.description.auxiliary_details.address.as_ref()?;
|
||||
AccountId::from_str(addr)
|
||||
.inspect_err(|_| {
|
||||
warn!("node {} has provided an invalid account address", n.node_id)
|
||||
})
|
||||
.ok()
|
||||
.map(|account_id| (n.node_id, account_id))
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
async fn retrieve_chain_capabilities(
|
||||
query_client: &QueryHttpRpcNyxdClient,
|
||||
node_id: NodeId,
|
||||
@@ -646,4 +664,47 @@ mod tests {
|
||||
// a node with no self-described data is out of scope (the orchestrator can't classify it)
|
||||
assert!(!stress_test_eligible(None));
|
||||
}
|
||||
|
||||
fn described_nodes(nodes: impl IntoIterator<Item = NymNodeDescriptionV3>) -> DescribedNodes {
|
||||
DescribedNodes {
|
||||
nodes: nodes.into_iter().map(|n| (n.node_id, n)).collect(),
|
||||
addresses_cache: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cached_capabilities_are_evicted_when_a_node_loses_its_usable_address() {
|
||||
let (with_addr, without_addr, unbonded) = (1, 2, 3);
|
||||
|
||||
let mut keeps_address = mock_nym_node_description(0);
|
||||
keeps_address.node_id = with_addr;
|
||||
|
||||
// still bonded/described, but its self-described address is gone (e.g. downgraded to a
|
||||
// version that only exposes v1 auxiliary details)
|
||||
let mut drops_address = mock_nym_node_description(1);
|
||||
drops_address.node_id = without_addr;
|
||||
drops_address.description.auxiliary_details.address = None;
|
||||
|
||||
let described = described_nodes([keeps_address, drops_address]);
|
||||
|
||||
let mut cache = ChainCapabilitiesCache::default();
|
||||
let caps = ChainInteractionCapabilitiesDetailed {
|
||||
on_chain_balance: coin(1_000000, "unym"),
|
||||
is_feegrant_grantee: true,
|
||||
};
|
||||
let now = Instant::now();
|
||||
cache.record(with_addr, caps.clone(), now);
|
||||
cache.record(without_addr, caps.clone(), now);
|
||||
cache.record(unbonded, caps, now); // no longer in the describe cache at all
|
||||
|
||||
let usable_ids = usable_chain_accounts(&described)
|
||||
.into_iter()
|
||||
.map(|(id, _)| id)
|
||||
.collect::<HashSet<_>>();
|
||||
cache.retain_only(&usable_ids);
|
||||
|
||||
assert!(cache.get(with_addr).is_some()); // still advertises a usable address -> kept
|
||||
assert!(cache.get(without_addr).is_none()); // address dropped -> evicted
|
||||
assert!(cache.get(unbonded).is_none()); // no longer described -> evicted
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user