feat: introduce shared contract caches within Nym API (#6760)

it has been extracted from the mix stress testing branch and it is going to be used within node families branch
This commit is contained in:
Jędrzej Stuczyński
2026-05-11 13:02:37 +01:00
committed by GitHub
parent 412657f773
commit 2d72b1b201
4 changed files with 644 additions and 337 deletions
+15 -66
View File
@@ -2,40 +2,32 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::node_status_api::models::AxumErrorResponse;
use crate::support::http::state::helpers::ChainSharedCacheWithTtl;
use crate::support::nyxd::Client;
use nym_api_requests::models::DetailedChainStatus;
use std::sync::Arc;
use nym_validator_client::nyxd::error::NyxdError;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::RwLock;
#[derive(Clone)]
pub(crate) struct ChainStatusCache {
cache_ttl: Duration,
inner: Arc<RwLock<Option<ChainStatusCacheInner>>>,
}
pub(crate) struct ChainStatusCache(ChainSharedCacheWithTtl<DetailedChainStatus>);
impl ChainStatusCache {
pub(crate) fn new(cache_ttl: Duration) -> Self {
ChainStatusCache {
cache_ttl,
inner: Arc::new(Default::default()),
}
ChainStatusCache(ChainSharedCacheWithTtl::new(cache_ttl))
}
}
struct ChainStatusCacheInner {
last_refreshed_at: OffsetDateTime,
cache_value: DetailedChainStatus,
}
async fn refresh(client: &Client) -> Result<DetailedChainStatus, NyxdError> {
// 3. attempt to query the chain for the chain data
let abci = client.abci_info().await?;
let block = client
.block_info(abci.last_block_height.value() as u32)
.await?;
impl ChainStatusCacheInner {
fn is_valid(&self, ttl: Duration) -> bool {
if self.last_refreshed_at + ttl > OffsetDateTime::now_utc() {
return true;
}
false
}
Ok(DetailedChainStatus {
abci: abci.into(),
latest_block: block.into(),
})
}
impl ChainStatusCache {
@@ -43,49 +35,6 @@ impl ChainStatusCache {
&self,
client: &Client,
) -> Result<DetailedChainStatus, AxumErrorResponse> {
if let Some(cached) = self.check_cache().await {
return Ok(cached);
}
self.refresh(client).await
}
async fn check_cache(&self) -> Option<DetailedChainStatus> {
let guard = self.inner.read().await;
let inner = guard.as_ref()?;
if inner.is_valid(self.cache_ttl) {
return Some(inner.cache_value.clone());
}
None
}
async fn refresh(&self, client: &Client) -> Result<DetailedChainStatus, AxumErrorResponse> {
// 1. attempt to get write lock permit
let mut guard = self.inner.write().await;
// 2. check if another task hasn't already updated the cache whilst we were waiting for the permit
if let Some(cached) = guard.as_ref() {
if cached.is_valid(self.cache_ttl) {
return Ok(cached.cache_value.clone());
}
}
// 3. attempt to query the chain for the chain data
let abci = client.abci_info().await?;
let block = client
.block_info(abci.last_block_height.value() as u32)
.await?;
let status = DetailedChainStatus {
abci: abci.into(),
latest_block: block.into(),
};
*guard = Some(ChainStatusCacheInner {
last_refreshed_at: OffsetDateTime::now_utc(),
cache_value: status.clone(),
});
Ok(status)
self.0.get_or_refresh(client, refresh).await
}
}
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::node_status_api::models::AxumErrorResponse;
use crate::support::http::state::helpers::ChainSharedCacheWithTtl;
use crate::support::nyxd::Client;
use nym_contracts_common::ContractBuildInformation;
use nym_validator_client::nyxd::contract_traits::{
@@ -10,10 +11,7 @@ use nym_validator_client::nyxd::contract_traits::{
use nym_validator_client::nyxd::error::NyxdError;
use nym_validator_client::nyxd::AccountId;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::RwLock;
type ContractAddress = String;
@@ -41,142 +39,81 @@ impl CachedContractInfo {
}
#[derive(Clone)]
pub(crate) struct ContractDetailsCache {
cache_ttl: Duration,
inner: Arc<RwLock<ContractDetailsCacheInner>>,
pub(crate) struct ContractDetailsCache(ChainSharedCacheWithTtl<CachedContractsInfo>);
async fn refresh(nyxd_client: &Client) -> Result<CachedContractsInfo, NyxdError> {
use crate::query_guard;
let mut updated = HashMap::new();
let client_guard = nyxd_client.read().await;
let mixnet = query_guard!(client_guard, mixnet_contract_address());
let vesting = query_guard!(client_guard, vesting_contract_address());
let coconut_dkg = query_guard!(client_guard, dkg_contract_address());
let group = query_guard!(client_guard, group_contract_address());
let multisig = query_guard!(client_guard, multisig_contract_address());
let ecash = query_guard!(client_guard, ecash_contract_address());
let performance = query_guard!(client_guard, performance_contract_address());
for (address, name) in [
(mixnet, "nym-mixnet-contract"),
(vesting, "nym-vesting-contract"),
(coconut_dkg, "nym-coconut-dkg-contract"),
(group, "nym-cw4-group-contract"),
(multisig, "nym-cw3-multisig-contract"),
(ecash, "nym-ecash-contract"),
(performance, "nym-performance-contract"),
] {
let (cw2, build_info) = if let Some(address) = address {
let cw2 = query_guard!(client_guard, try_get_cw2_contract_version(address).await);
let mut build_info = query_guard!(
client_guard,
try_get_contract_build_information(address).await
);
// for backwards compatibility until we migrate the contracts
if build_info.is_none() {
match name {
"nym-mixnet-contract" => {
build_info = Some(query_guard!(
client_guard,
get_mixnet_contract_version().await
)?)
}
"nym-vesting-contract" => {
build_info = Some(query_guard!(
client_guard,
get_vesting_contract_version().await
)?)
}
_ => (),
}
}
(cw2, build_info)
} else {
(None, None)
};
updated.insert(
name.to_string(),
CachedContractInfo::new(address, cw2, build_info),
);
}
Ok(updated)
}
impl ContractDetailsCache {
pub(crate) fn new(cache_ttl: Duration) -> Self {
ContractDetailsCache {
cache_ttl,
inner: Arc::new(RwLock::new(ContractDetailsCacheInner::new())),
}
}
}
struct ContractDetailsCacheInner {
last_refreshed_at: OffsetDateTime,
cache_value: CachedContractsInfo,
}
impl ContractDetailsCacheInner {
pub(crate) fn new() -> Self {
ContractDetailsCacheInner {
last_refreshed_at: OffsetDateTime::UNIX_EPOCH,
cache_value: Default::default(),
}
ContractDetailsCache(ChainSharedCacheWithTtl::new(cache_ttl))
}
fn is_valid(&self, ttl: Duration) -> bool {
if self.last_refreshed_at + ttl > OffsetDateTime::now_utc() {
return true;
}
false
}
async fn retrieve_nym_contracts_info(
&self,
nyxd_client: &Client,
) -> Result<CachedContractsInfo, NyxdError> {
use crate::query_guard;
let mut updated = HashMap::new();
let client_guard = nyxd_client.read().await;
let mixnet = query_guard!(client_guard, mixnet_contract_address());
let vesting = query_guard!(client_guard, vesting_contract_address());
let coconut_dkg = query_guard!(client_guard, dkg_contract_address());
let group = query_guard!(client_guard, group_contract_address());
let multisig = query_guard!(client_guard, multisig_contract_address());
let ecash = query_guard!(client_guard, ecash_contract_address());
let performance = query_guard!(client_guard, performance_contract_address());
for (address, name) in [
(mixnet, "nym-mixnet-contract"),
(vesting, "nym-vesting-contract"),
(coconut_dkg, "nym-coconut-dkg-contract"),
(group, "nym-cw4-group-contract"),
(multisig, "nym-cw3-multisig-contract"),
(ecash, "nym-ecash-contract"),
(performance, "nym-performance-contract"),
] {
let (cw2, build_info) = if let Some(address) = address {
let cw2 = query_guard!(client_guard, try_get_cw2_contract_version(address).await);
let mut build_info = query_guard!(
client_guard,
try_get_contract_build_information(address).await
);
// for backwards compatibility until we migrate the contracts
if build_info.is_none() {
match name {
"nym-mixnet-contract" => {
build_info = Some(query_guard!(
client_guard,
get_mixnet_contract_version().await
)?)
}
"nym-vesting-contract" => {
build_info = Some(query_guard!(
client_guard,
get_vesting_contract_version().await
)?)
}
_ => (),
}
}
(cw2, build_info)
} else {
(None, None)
};
updated.insert(
name.to_string(),
CachedContractInfo::new(address, cw2, build_info),
);
}
Ok(updated)
}
}
impl ContractDetailsCache {
pub(crate) async fn get_or_refresh(
&self,
client: &Client,
) -> Result<CachedContractsInfo, AxumErrorResponse> {
if let Some(cached) = self.check_cache().await {
return Ok(cached);
}
self.refresh(client).await
}
async fn check_cache(&self) -> Option<CachedContractsInfo> {
let guard = self.inner.read().await;
if guard.is_valid(self.cache_ttl) {
return Some(guard.cache_value.clone());
}
None
}
async fn refresh(&self, client: &Client) -> Result<CachedContractsInfo, AxumErrorResponse> {
// 1. attempt to get write lock permit
let mut guard = self.inner.write().await;
// 2. check if another task hasn't already updated the cache whilst we were waiting for the permit
if guard.is_valid(self.cache_ttl) {
return Ok(guard.cache_value.clone());
}
// 3. attempt to query the chain for the contracts data
let updated_values = guard.retrieve_nym_contracts_info(client).await?;
guard.last_refreshed_at = OffsetDateTime::now_utc();
guard.cache_value = updated_values.clone();
Ok(updated_values)
self.0.get_or_refresh(client, refresh).await
}
}
+133 -1
View File
@@ -1,15 +1,27 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node_status_api::models::AxumErrorResponse;
use crate::support::caching::refresher::RefreshRequester;
use crate::support::nyxd::Client;
use nym_validator_client::nyxd::error::NyxdError;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::RwLock;
/// Handle for on-demand cache refreshes driven by external triggers (e.g. an HTTP endpoint).
///
/// Wraps a [`RefreshRequester`] alongside the timestamp of the most recent refresh request so
/// callers can rate-limit or expose "last refreshed" information without reaching into the
/// underlying cache.
#[derive(Clone)]
pub(crate) struct Refreshing {
handle: RefreshRequester,
last_requested: Arc<AtomicI64>, // unix timestamp
/// Unix timestamp of the last refresh request; stored atomically so multiple request handlers
/// can update it concurrently without taking a lock.
last_requested: Arc<AtomicI64>,
}
impl Refreshing {
@@ -36,3 +48,123 @@ impl Refreshing {
self.handle.request_cache_refresh();
}
}
/// Shared, TTL-gated cache for values that are (re)hydrated from the nyxd chain on demand.
///
/// The cache collapses the common "check cache, otherwise refresh" pattern used across the various
/// chain-backed state caches (chain status, contract details, ...) into a single generic type.
/// Callers plug in a type-specific `refresh_fn` that knows how to fetch `T` from the chain; this
/// type handles the locking, TTL check, and single-flight behavior.
///
/// Concurrency model:
/// - Reads happen under a read lock; if the cached value is present and within TTL it is returned
/// immediately.
/// - If the cached value is missing or stale, a single writer takes the write lock, re-checks the
/// TTL (so a refresh that completed while we were waiting isn't redundantly repeated) and then
/// invokes `refresh_fn`. Other concurrent callers will block on the write lock and observe the
/// freshly populated value instead of each running their own query against the chain.
#[derive(Clone)]
pub(crate) struct ChainSharedCacheWithTtl<T> {
cache_ttl: Duration,
inner: Arc<RwLock<Option<ChainSharedCacheWithTtlInner<T>>>>,
}
impl<T> ChainSharedCacheWithTtl<T>
where
T: Clone,
{
pub(crate) fn new(cache_ttl: Duration) -> Self {
ChainSharedCacheWithTtl {
cache_ttl,
inner: Arc::new(RwLock::new(None)),
}
}
/// Return the cached value if it is still fresh, otherwise refresh it via `refresh_fn`.
///
/// Takes the read-only fast path when the cache is warm and only escalates to a write-locked
/// refresh when the value is missing or expired.
pub(crate) async fn get_or_refresh<F>(
&self,
client: &Client,
refresh_fn: F,
) -> Result<T, AxumErrorResponse>
where
F: AsyncFn(&Client) -> Result<T, NyxdError>,
{
if let Some(cached) = self.check_cache().await {
return Ok(cached);
}
self.refresh(client, refresh_fn).await
}
/// Return the cached value if present and within TTL without attempting a refresh.
async fn check_cache(&self) -> Option<T>
where
T: Clone,
{
let guard = self.inner.read().await;
let inner = guard.as_ref()?;
if inner.is_valid(self.cache_ttl) {
return Some(inner.value.clone());
}
None
}
/// Forcibly re-query the chain via `refresh_fn` and replace the cached value.
///
/// The double-checked TTL guard after acquiring the write lock prevents the common
/// thundering-herd case where many concurrent callers all observe a stale cache at once - only
/// the first one to acquire the write lock will actually hit the chain.
async fn refresh<F>(&self, client: &Client, refresh_fn: F) -> Result<T, AxumErrorResponse>
where
F: AsyncFn(&Client) -> Result<T, NyxdError>,
T: Clone,
{
// 1. attempt to get write lock permit
let mut guard = self.inner.write().await;
// 2. check if another task hasn't already updated the cache whilst we were waiting for the permit
if let Some(cached) = guard.as_ref() {
if cached.is_valid(self.cache_ttl) {
return Ok(cached.clone_value());
}
}
let refresh_res = refresh_fn(client).await?;
*guard = Self::new_inner(refresh_res.clone());
Ok(refresh_res)
}
fn new_inner(value: T) -> Option<ChainSharedCacheWithTtlInner<T>> {
Some(ChainSharedCacheWithTtlInner::new(value))
}
}
/// Cached value alongside the timestamp at which it was fetched, used to evaluate the TTL.
struct ChainSharedCacheWithTtlInner<T> {
last_refreshed_at: OffsetDateTime,
value: T,
}
impl<T> ChainSharedCacheWithTtlInner<T> {
fn new(value: T) -> Self {
ChainSharedCacheWithTtlInner {
last_refreshed_at: OffsetDateTime::now_utc(),
value,
}
}
fn is_valid(&self, ttl: Duration) -> bool {
self.last_refreshed_at + ttl > OffsetDateTime::now_utc()
}
fn clone_value(&self) -> T
where
T: Clone,
{
self.value.clone()
}
}
+429 -140
View File
File diff suppressed because it is too large Load Diff