feat: introduce on-disk cache persistance for major nym-api caches (#6302)

This includes:
- mixnet contract cache
- described nodes cache
- nodes annotations cache (performance)

those changes include taking some code developed for the purposes of #6277
This commit is contained in:
Jędrzej Stuczyński
2026-02-11 15:57:47 +00:00
committed by GitHub
parent 46b9d5374b
commit 4897cb0ce4
29 changed files with 436 additions and 151 deletions
+2 -4
View File
@@ -5,11 +5,9 @@ use crate::ecash::api_routes::handlers::ecash_routes;
use crate::ecash::error::{EcashError, Result};
use crate::ecash::keys::KeyPairWithEpoch;
use crate::ecash::state::EcashState;
use crate::mixnet_contract_cache::cache::MixnetContractCache;
use crate::network::models::NetworkDetails;
use crate::node_describe_cache::cache::DescribedNodes;
use crate::node_status_api::handlers::unstable;
use crate::node_status_api::NodeStatusCache;
use crate::status::ApiStatusState;
use crate::support::caching::cache::SharedCache;
use crate::support::config;
@@ -1284,8 +1282,8 @@ impl TestFixture {
ecash_signers_cache: Default::default(),
address_info_cache: AddressInfoCache::new(Duration::from_secs(42), 1000),
forced_refresh: ForcedRefresh::new(true),
mixnet_contract_cache: MixnetContractCache::new(),
node_status_cache: NodeStatusCache::new(),
mixnet_contract_cache: SharedCache::new().into(),
node_status_cache: SharedCache::new().into(),
storage,
described_nodes_cache: SharedCache::<DescribedNodes>::new(),
network_details: NetworkDetails::new(
+7 -4
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::epoch_operations::EpochAdvancer;
use crate::support::caching::Cache;
use crate::support::caching::cache::UninitialisedCache;
use cosmwasm_std::{Decimal, Fraction};
use nym_api_requests::models::NodeAnnotation;
use nym_mixnet_contract_common::helpers::IntoBaseDecimal;
@@ -210,10 +210,13 @@ fn determine_per_node_work(
impl EpochAdvancer {
fn load_performance(
status_cache: &Option<RwLockReadGuard<'_, Cache<HashMap<NodeId, NodeAnnotation>>>>,
status_cache: &Result<
RwLockReadGuard<'_, HashMap<NodeId, NodeAnnotation>>,
UninitialisedCache,
>,
node_id: NodeId,
) -> NodeWithPerformance {
let Some(status_cache) = status_cache.as_ref() else {
let Ok(status_cache) = status_cache.as_ref() else {
return NodeWithPerformance::new_zero(node_id);
};
@@ -239,7 +242,7 @@ impl EpochAdvancer {
let standby_node_work_factor = nodes_work.standby;
let status_cache = self.status_cache.node_annotations().await;
if status_cache.is_none() {
if status_cache.is_err() {
error!("there are no node annotations available");
};
@@ -214,7 +214,7 @@ impl EpochAdvancer {
#[allow(clippy::unwrap_used)]
let described_cache = self.described_cache.get().await.unwrap();
let Some(status_cache) = self.status_cache.node_annotations().await else {
let Ok(status_cache) = self.status_cache.node_annotations().await else {
warn!("there are no node annotations available");
return Vec::new();
};
+3 -1
View File
@@ -7,8 +7,9 @@ use nym_mixnet_contract_common::{
NymNodeDetails, RewardingParams,
};
use nym_topology::CachedEpochRewardedSet;
use serde::{Deserialize, Serialize};
#[derive(Clone)]
#[derive(Clone, Serialize, Deserialize)]
pub(crate) struct ConfigScoreData {
pub(crate) config_score_params: ConfigScoreParams,
pub(crate) nym_node_version_history: Vec<HistoricalNymNodeVersionEntry>,
@@ -27,6 +28,7 @@ impl From<ConfigScoreData> for ConfigScoreDataResponse {
}
}
#[derive(Serialize, Deserialize)]
pub(crate) struct MixnetContractCacheData {
pub(crate) rewarding_denom: String,
+10 -2
View File
@@ -14,6 +14,8 @@ use nym_mixnet_contract_common::{
};
use nym_topology::CachedEpochRewardedSet;
use nym_validator_client::nyxd::Coin;
use std::path::Path;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::RwLockReadGuard;
@@ -27,10 +29,16 @@ pub struct MixnetContractCache {
pub(crate) inner: SharedCache<MixnetContractCacheData>,
}
impl From<SharedCache<MixnetContractCacheData>> for MixnetContractCache {
fn from(inner: SharedCache<MixnetContractCacheData>) -> Self {
MixnetContractCache { inner }
}
}
impl MixnetContractCache {
pub(crate) fn new() -> Self {
pub(crate) fn new<P: AsRef<Path>>(store_path: P, max_cache_age: Duration) -> Self {
MixnetContractCache {
inner: SharedCache::new(),
inner: SharedCache::new_with_persistent(store_path, max_cache_age, None),
}
}
+3
View File
@@ -7,6 +7,7 @@ use crate::mixnet_contract_cache::cache::MixnetContractCache;
use crate::support::caching::refresher::CacheRefresher;
use crate::support::{config, nyxd};
use nym_validator_client::nyxd::error::NyxdError;
use std::path::PathBuf;
pub(crate) mod cache;
pub(crate) mod handlers;
@@ -15,6 +16,7 @@ pub(crate) fn build_refresher(
config: &config::MixnetContractCache,
nym_contract_cache_state: &MixnetContractCache,
nyxd_client: nyxd::Client,
on_disk_file: PathBuf,
) -> CacheRefresher<MixnetContractCacheData, NyxdError> {
CacheRefresher::new_with_initial_value(
Box::new(MixnetContractDataProvider::new(nyxd_client)),
@@ -22,4 +24,5 @@ pub(crate) fn build_refresher(
nym_contract_cache_state.inner(),
)
.named("mixnet-contract-cache-refresher")
.with_persistent_cache(on_disk_file)
}
@@ -233,7 +233,7 @@ impl PacketPreparer {
// routes so that they wouldn't be reused
pub(crate) async fn prepare_test_routes(&self, n: usize) -> Option<Vec<TestRoute>> {
let descriptions = self.described_cache.get().await.ok()?;
let statuses = self.node_status_cache.node_annotations().await?;
let statuses = self.node_status_cache.node_annotations().await.ok()?;
let mixing_nym_nodes = descriptions.mixing_nym_nodes();
// last I checked `gatewaying` wasn't a word : )
+2 -1
View File
@@ -3,10 +3,11 @@
use nym_api_requests::models::{DescribedNodeTypeV2, NymNodeDataV2, NymNodeDescriptionV2};
use nym_mixnet_contract_common::NodeId;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::IpAddr;
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DescribedNodes {
pub(crate) nodes: HashMap<NodeId, NymNodeDescriptionV2>,
pub(crate) addresses_cache: HashMap<IpAddr, NodeId>,
+4 -13
View File
@@ -12,6 +12,7 @@ use crate::support::config::DEFAULT_NODE_DESCRIBE_BATCH_SIZE;
use async_trait::async_trait;
use futures::{stream, StreamExt};
use std::collections::HashMap;
use std::path::PathBuf;
use tracing::{error, info};
pub struct NodeDescriptionProvider {
@@ -90,23 +91,11 @@ impl CacheItemProvider for NodeDescriptionProvider {
}
}
// currently dead code : (
#[allow(dead_code)]
pub(crate) fn new_refresher(
config: &config::DescribeCache,
contract_cache: MixnetContractCache,
) -> CacheRefresher<DescribedNodes, NodeDescribeCacheError> {
CacheRefresher::new(
NodeDescriptionProvider::new(contract_cache, config.debug.allow_illegal_ips)
.with_batch_size(config.debug.batch_size),
config.debug.caching_interval,
)
}
pub(crate) fn new_provider_with_initial_value(
config: &config::DescribeCache,
contract_cache: MixnetContractCache,
initial: SharedCache<DescribedNodes>,
on_disk_file: PathBuf,
) -> CacheRefresher<DescribedNodes, NodeDescribeCacheError> {
CacheRefresher::new_with_initial_value(
Box::new(
@@ -116,4 +105,6 @@ pub(crate) fn new_provider_with_initial_value(
config.debug.caching_interval,
initial,
)
.named("node-self-described-data-refresher")
.with_persistent_cache(on_disk_file)
}
@@ -7,8 +7,10 @@ use nym_contracts_common::NaiveFloat;
use nym_mixnet_contract_common::reward_params::Performance;
use nym_mixnet_contract_common::{EpochId, NodeId};
use nym_validator_client::nyxd::contract_traits::performance_query_client::NodePerformance;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
#[derive(Serialize, Deserialize)]
pub(crate) struct PerformanceContractEpochCacheData {
pub(crate) epoch_id: EpochId,
pub(crate) median_performance: HashMap<NodeId, Performance>,
@@ -30,6 +32,7 @@ impl PerformanceContractEpochCacheData {
}
}
#[derive(Serialize, Deserialize)]
pub(crate) struct PerformanceContractCacheData {
pub(crate) epoch_performance: BTreeMap<EpochId, PerformanceContractEpochCacheData>,
}
+6 -6
View File
@@ -1,20 +1,20 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::support::caching::Cache;
use nym_api_requests::models::NodeAnnotation;
use nym_mixnet_contract_common::NodeId;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Default)]
#[derive(Default, Serialize, Deserialize)]
#[allow(deprecated)]
pub(crate) struct NodeStatusCacheData {
/// Basic annotation for nym-nodes
pub(crate) node_annotations: Cache<HashMap<NodeId, NodeAnnotation>>,
pub(crate) node_annotations: HashMap<NodeId, NodeAnnotation>,
}
impl NodeStatusCacheData {
pub fn new() -> Self {
Self::default()
impl From<HashMap<NodeId, NodeAnnotation>> for NodeStatusCacheData {
fn from(node_annotations: HashMap<NodeId, NodeAnnotation>) -> Self {
NodeStatusCacheData { node_annotations }
}
}
+43 -23
View File
@@ -3,19 +3,18 @@
use self::data::NodeStatusCacheData;
use crate::node_performance::provider::PerformanceRetrievalFailure;
use crate::support::caching::cache::UninitialisedCache;
use crate::support::caching::cache::{SharedCache, UninitialisedCache};
use crate::support::caching::Cache;
use nym_api_requests::models::NodeAnnotation;
use nym_mixnet_contract_common::NodeId;
use std::collections::HashMap;
use std::{sync::Arc, time::Duration};
use std::path::Path;
use std::time::Duration;
use thiserror::Error;
use time::OffsetDateTime;
use tokio::sync::RwLockReadGuard;
use tokio::{sync::RwLock, time};
use tracing::error;
const CACHE_TIMEOUT_MS: u64 = 100;
mod config_score;
pub mod data;
pub mod refresher;
@@ -44,43 +43,64 @@ impl From<UninitialisedCache> for NodeStatusCacheError {
/// The cache can be triggered to update on contract cache changes, and/or periodically on a timer.
#[derive(Clone)]
pub struct NodeStatusCache {
inner: Arc<RwLock<NodeStatusCacheData>>,
inner: SharedCache<NodeStatusCacheData>,
}
impl From<SharedCache<NodeStatusCacheData>> for NodeStatusCache {
fn from(inner: SharedCache<NodeStatusCacheData>) -> Self {
NodeStatusCache { inner }
}
}
impl NodeStatusCache {
/// Creates a new cache with no data.
pub(crate) fn new() -> NodeStatusCache {
pub(crate) fn new<P: AsRef<Path>>(store_path: P, max_cache_age: Duration) -> NodeStatusCache {
NodeStatusCache {
inner: Arc::new(RwLock::new(NodeStatusCacheData::new())),
inner: SharedCache::new_with_persistent(
store_path,
max_cache_age,
Some(HashMap::new().into()),
),
}
}
pub async fn cache_timestamp(&self) -> OffsetDateTime {
let Ok(cache) = self.inner.get().await else {
return OffsetDateTime::UNIX_EPOCH;
};
cache.timestamp()
}
/// Updates the cache with the latest data.
async fn update(&self, node_annotations: HashMap<NodeId, NodeAnnotation>) {
match time::timeout(Duration::from_millis(CACHE_TIMEOUT_MS), self.inner.write()).await {
Ok(mut cache) => {
cache.node_annotations.unchecked_update(node_annotations);
}
Err(e) => error!("{e}"),
if self
.inner
.try_overwrite_old_value(node_annotations, "node-status")
.await
.is_err()
{
error!("failed to update node status cache!")
}
}
pub(crate) async fn cache(
&self,
) -> Result<RwLockReadGuard<'_, Cache<NodeStatusCacheData>>, UninitialisedCache> {
self.inner.get().await
}
async fn get<'a, T: 'a>(
&'a self,
fn_arg: impl FnOnce(&NodeStatusCacheData) -> &Cache<T>,
) -> Option<RwLockReadGuard<'a, Cache<T>>> {
match time::timeout(Duration::from_millis(CACHE_TIMEOUT_MS), self.inner.read()).await {
Ok(cache) => Some(RwLockReadGuard::map(cache, |item| fn_arg(item))),
Err(e) => {
error!("{e}");
None
}
}
fn_arg: impl FnOnce(&Cache<NodeStatusCacheData>) -> &T,
) -> Result<RwLockReadGuard<'a, T>, UninitialisedCache> {
let guard = self.inner.get().await?;
Ok(RwLockReadGuard::map(guard, fn_arg))
}
pub(crate) async fn node_annotations(
&self,
) -> Option<RwLockReadGuard<'_, Cache<HashMap<NodeId, NodeAnnotation>>>> {
) -> Result<RwLockReadGuard<'_, HashMap<NodeId, NodeAnnotation>>, UninitialisedCache> {
self.get(|c| &c.node_annotations).await
}
}
+46 -1
View File
@@ -8,6 +8,7 @@ use crate::node_performance::provider::{NodePerformanceProvider, NodesRoutingSco
use crate::node_status_api::cache::config_score::calculate_config_score;
use crate::node_status_api::models::Uptime;
use crate::support::caching::cache::SharedCache;
use crate::support::caching::refresher::RefreshRequester;
use crate::{
mixnet_contract_cache::cache::MixnetContractCache,
node_status_api::cache::NodeStatusCacheError, support::caching::CacheNotification,
@@ -18,10 +19,11 @@ use nym_mixnet_contract_common::{NodeId, NymNodeDetails};
use nym_task::ShutdownToken;
use nym_topology::CachedEpochRewardedSet;
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time;
use tracing::{info, trace, warn};
use tracing::{error, info, trace, warn};
// Long running task responsible for keeping the node status cache up-to-date.
pub struct NodeStatusCacheRefresher {
@@ -32,13 +34,27 @@ pub struct NodeStatusCacheRefresher {
// Sources for when refreshing data
mixnet_contract_cache: MixnetContractCache,
described_cache: SharedCache<DescribedNodes>,
/// channel notifying us when mixnet cache has been refreshed,
/// so that this cache could also be recreated
mixnet_contract_cache_listener: watch::Receiver<CacheNotification>,
/// channel notifying us when the describe cache has been refreshed,
/// so that this cache could also be recreated
describe_cache_listener: watch::Receiver<CacheNotification>,
/// channel explicitly requesting cache refresh. it does not follow the usual rate limiting
refresh_requester: RefreshRequester,
/// Path to an on-disk location where the contents of the retrieved items should be written
/// upon refresh
on_disk_file: PathBuf,
performance_provider: Box<dyn NodePerformanceProvider + Send + Sync>,
}
impl NodeStatusCacheRefresher {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
cache: NodeStatusCache,
fallback_caching_interval: Duration,
@@ -47,6 +63,7 @@ impl NodeStatusCacheRefresher {
contract_cache_listener: watch::Receiver<CacheNotification>,
describe_cache_listener: watch::Receiver<CacheNotification>,
performance_provider: Box<dyn NodePerformanceProvider + Send + Sync>,
on_disk_file: PathBuf,
) -> Self {
Self {
cache,
@@ -55,6 +72,8 @@ impl NodeStatusCacheRefresher {
described_cache,
mixnet_contract_cache_listener: contract_cache_listener,
describe_cache_listener,
refresh_requester: Default::default(),
on_disk_file,
performance_provider,
}
}
@@ -89,6 +108,23 @@ impl NodeStatusCacheRefresher {
}
}
}
// note: `Notify` is not cancellation safe, HOWEVER, there's only one listener,
// so it doesn't matter if we lose our queue position
_ = self.refresh_requester.notified() => {
tokio::select! {
// perform full refresh regardless of the rates
_ = self.refresh() => {
last_update = OffsetDateTime::now_utc();
fallback_interval.reset();
},
_ = shutdown_token.cancelled() => {
trace!("NodeStatusCacheRefresher: Received shutdown");
break;
}
}
}
// ... however, if we don't receive any notifications we fall back to periodic
// refreshes
_ = fallback_interval.tick() => {
@@ -219,6 +255,15 @@ impl NodeStatusCacheRefresher {
// Update the cache
self.cache.update(node_annotations).await;
// attempt to update on-disk cache
let Ok(new_cached) = self.cache.cache().await else {
error!("the node status cache is still not initialised!");
return Ok(());
};
// error reporting is handled by the serialise function itself
let _ = new_cached.try_serialise_to_file(&self.on_disk_file);
Ok(())
}
}
+3
View File
@@ -12,6 +12,7 @@ use crate::{
};
pub(crate) use cache::NodeStatusCache;
use nym_task::ShutdownManager;
use std::path::PathBuf;
use std::time::Duration;
use tokio::sync::watch;
@@ -39,6 +40,7 @@ pub(crate) fn start_cache_refresh(
performance_provider: Box<dyn NodePerformanceProvider + Send + Sync>,
nym_contract_cache_listener: watch::Receiver<support::caching::CacheNotification>,
described_cache_cache_listener: watch::Receiver<support::caching::CacheNotification>,
on_disk_file: PathBuf,
shutdown_manager: &ShutdownManager,
) {
let mut nym_api_cache_refresher = NodeStatusCacheRefresher::new(
@@ -49,6 +51,7 @@ pub(crate) fn start_cache_refresh(
nym_contract_cache_listener,
described_cache_cache_listener,
performance_provider,
on_disk_file,
);
let shutdown_listener = shutdown_manager.clone_shutdown_token();
tokio::spawn(async move { nym_api_cache_refresher.run(shutdown_listener).await });
-7
View File
@@ -346,13 +346,6 @@ impl AxumErrorResponse {
}
}
pub(crate) fn internal() -> Self {
Self {
message: RequestError::new("Internal server error"),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
}
pub(crate) fn not_implemented() -> Self {
Self {
message: RequestError::empty(),
+2 -10
View File
@@ -279,11 +279,7 @@ async fn get_node_annotation(
) -> AxumResult<FormattedResponse<AnnotationResponse>> {
let output = output.output.unwrap_or_default();
let annotations = state
.node_status_cache
.node_annotations()
.await
.ok_or_else(AxumErrorResponse::internal)?;
let annotations = state.node_status_cache().node_annotations().await?;
Ok(output.to_response(AnnotationResponse {
node_id,
@@ -312,11 +308,7 @@ async fn get_current_node_performance(
) -> AxumResult<FormattedResponse<NodePerformanceResponse>> {
let output = output.output.unwrap_or_default();
let annotations = state
.node_status_cache
.node_annotations()
.await
.ok_or_else(AxumErrorResponse::internal)?;
let annotations = state.node_status_cache().node_annotations().await?;
Ok(output.to_response(NodePerformanceResponse {
node_id,
+2
View File
@@ -2,10 +2,12 @@
// SPDX-License-Identifier: GPL-3.0-only
use nym_ecash_signer_check::SignersTestResult;
use serde::{Deserialize, Serialize};
pub(crate) mod data;
pub(crate) mod refresher;
#[derive(Serialize, Deserialize)]
pub(crate) struct SignersCacheData {
pub(crate) signers_results: SignersTestResult,
}
+168 -1
View File
@@ -1,13 +1,19 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use bincode::Options;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::fs;
use std::fs::File;
use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use time::OffsetDateTime;
use tokio::sync::{RwLock, RwLockMappedWriteGuard, RwLockReadGuard, RwLockWriteGuard};
use tracing::debug;
use tracing::{debug, error};
#[derive(Debug, Error)]
#[error("the cache item has not been initialised")]
@@ -32,6 +38,41 @@ impl<T> SharedCache<T> {
SharedCache::default()
}
#[track_caller]
pub(crate) fn new_with_persistent<P: AsRef<Path>>(
store_path: P,
max_cache_age: Duration,
fallback_value: Option<T>,
) -> Self
where
T: DeserializeOwned,
{
// attempt to load data from disk
let Ok(disk_cached) = Cache::<T>::try_deserialise_from_file(store_path) else {
// if failed, fallback to fresh state
// (the file might not have existed, for example on initial run)
return if let Some(fallback_value) = fallback_value {
Self::new_with_value(fallback_value)
} else {
Self::new()
};
};
// check if the entry is not too stale
if disk_cached.has_expired(max_cache_age, None) {
// if too old, fallback to fresh state
debug!("cache has expired");
return if let Some(fallback_value) = fallback_value {
Self::new_with_value(fallback_value)
} else {
Self::new()
};
}
// use loaded value
SharedCache(Arc::new(RwLock::new(CachedItem {
inner: Some(disk_cached),
})))
}
pub(crate) fn new_with_value(value: T) -> Self {
SharedCache(Arc::new(RwLock::new(CachedItem {
inner: Some(Cache::new(value)),
@@ -230,6 +271,35 @@ impl<T> Cache<T> {
pub fn timestamp(&self) -> OffsetDateTime {
self.as_at
}
#[track_caller]
pub(crate) fn try_serialise_to_file<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()>
where
T: Serialize,
{
SerialisableCache {
value: &self.value,
as_at: self.as_at,
}
.try_serialise_to_file(path)
}
#[track_caller]
pub(crate) fn try_deserialise_from_file<P: AsRef<Path>>(path: P) -> std::io::Result<Self>
where
T: DeserializeOwned,
{
let path = path.as_ref();
if !path.exists() {
debug!("cached file does not exist at: {}", path.display());
return Err(std::io::Error::other("cached file does not exist"));
}
DeserialisedCache::try_deserialise_from_file(path).map(|d| Cache {
value: d.value,
as_at: d.as_at,
})
}
}
impl<T> Deref for Cache<T> {
@@ -251,3 +321,100 @@ where
}
}
}
#[derive(Serialize)]
struct SerialisableCache<'a, T> {
value: &'a T,
#[serde(with = "time::serde::rfc3339")]
as_at: OffsetDateTime,
}
impl<'a, T> SerialisableCache<'a, T> {
#[track_caller]
fn try_serialise_to_file<P: AsRef<Path>>(self, path: P) -> std::io::Result<()>
where
T: Serialize,
{
use ::bincode::Options;
let serialiser = make_bincode_serializer();
let path = path.as_ref();
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
let file = match File::create(path) {
Ok(file) => file,
Err(err) => {
error!("failed to create persistent cache file at {path:?}: {err}",);
return Err(err);
}
};
serialiser.serialize_into(file, &self).map_err(|err| {
error!("failed to serialise persistent cache file at {path:?}: {err}");
std::io::Error::other(err)
})
}
}
#[derive(Deserialize)]
struct DeserialisedCache<T> {
value: T,
#[serde(with = "time::serde::rfc3339")]
as_at: OffsetDateTime,
}
impl<T> DeserialisedCache<T> {
#[track_caller]
fn try_deserialise_from_file<P: AsRef<Path>>(path: P) -> std::io::Result<Self>
where
T: DeserializeOwned,
{
use ::bincode::Options;
let serialiser = make_bincode_serializer();
let path = path.as_ref();
let file = match File::open(path) {
Ok(file) => file,
Err(err) => {
error!("failed to open persistent cache file at {path:?}: {err}",);
return Err(err);
}
};
serialiser.deserialize_from(file).map_err(|err| {
error!("failed to deserialised persistent cache file at {path:?}: {err}");
std::io::Error::other(err)
})
}
}
fn make_bincode_serializer() -> impl ::bincode::Options {
::bincode::DefaultOptions::new()
.with_little_endian()
.with_varint_encoding()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn deserialisation_is_reciprocal() {
let tmp = tempfile::NamedTempFile::new().unwrap();
let dummy_data = Cache {
value: "foomp".to_string(),
as_at: OffsetDateTime::now_utc(),
};
dummy_data.try_serialise_to_file(tmp.path()).unwrap();
let de = Cache::<String>::try_deserialise_from_file(tmp.path()).unwrap();
assert_eq!(dummy_data.value, de.value);
assert_eq!(dummy_data.as_at, de.as_at);
}
}
+35 -1
View File
@@ -5,8 +5,11 @@ use crate::support::caching::cache::SharedCache;
use crate::support::caching::CacheNotification;
use async_trait::async_trait;
use nym_task::ShutdownToken;
use serde::Serialize;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::futures::Notified;
use tokio::sync::{watch, Notify};
use tokio::time::interval;
use tracing::{debug, error, info, trace, warn};
@@ -20,6 +23,10 @@ impl RefreshRequester {
pub(crate) fn request_cache_refresh(&self) {
self.0.notify_waiters()
}
pub(crate) fn notified(&self) -> Notified<'_> {
self.0.notified()
}
}
impl Default for RefreshRequester {
@@ -37,6 +44,10 @@ impl Default for RefreshRequester {
/// the entire value, and we might want to just insert a new entry
pub struct CacheRefresher<T, E, S = T> {
name: String,
/// Path to an on-disk location where the contents of the retrieved items should be written
/// upon refresh
on_disk_file: Option<PathBuf>,
refreshing_interval: Duration,
refresh_notification_sender: watch::Sender<CacheNotification>,
@@ -69,6 +80,7 @@ impl<T, E, S> CacheRefresher<T, E, S>
where
E: std::error::Error,
S: Into<T>,
T: Serialize,
{
pub(crate) fn new_boxed(
item_provider: Box<dyn CacheItemProvider<Error = E, Item = S> + Send + Sync>,
@@ -78,6 +90,7 @@ where
CacheRefresher {
name: "GenericCacheRefresher".to_string(),
on_disk_file: None,
refreshing_interval,
refresh_notification_sender,
update_fn: None,
@@ -86,7 +99,6 @@ where
refresh_requester: Default::default(),
}
}
pub(crate) fn new<P>(item_provider: P, refreshing_interval: Duration) -> Self
where
P: CacheItemProvider<Error = E, Item = S> + Send + Sync + 'static,
@@ -103,6 +115,7 @@ where
CacheRefresher {
name: "GenericCacheRefresher".to_string(),
on_disk_file: None,
refreshing_interval,
refresh_notification_sender,
update_fn: None,
@@ -129,6 +142,12 @@ where
self
}
#[must_use]
pub(crate) fn with_persistent_cache(mut self, storage_path: PathBuf) -> Self {
self.on_disk_file = Some(storage_path);
self
}
pub(crate) fn update_watcher(&self) -> CacheUpdateWatcher {
self.refresh_notification_sender.subscribe()
}
@@ -194,6 +213,19 @@ where
}
}
async fn try_flush_to_disk(&self) {
// if specified, attempt to flush data onto disk
let Some(disk_cache) = self.on_disk_file.as_ref() else {
return;
};
let Ok(new_cached) = self.shared_cache.get().await else {
error!("the {} cache is still not initialised!", self.name);
return;
};
// error reporting is handled by the serialise function itself
let _ = new_cached.try_serialise_to_file(disk_cache);
}
async fn do_refresh_cache(&mut self) {
let updated_items = match self.provider.try_refresh().await {
Err(err) => {
@@ -213,6 +245,8 @@ where
self.overwrite_cache(updated_items.into()).await;
}
self.try_flush_to_disk().await;
if !self.refresh_notification_sender.is_closed()
&& self
.refresh_notification_sender
+50 -28
View File
@@ -150,17 +150,60 @@ async fn start_nym_api_tasks(config: &Config) -> anyhow::Result<ShutdownManager>
let router = RouterBuilder::with_default_routes(config.network_monitor.enabled);
let mixnet_contract_cache_state = MixnetContractCache::new();
let node_status_cache_state = NodeStatusCache::new();
let mix_denom = network_details.network.chain_details.mix_denom.base.clone();
let described_nodes_cache = SharedCache::<DescribedNodes>::new();
let node_info_cache = unstable::NodeInfoCache::default();
let storage_cfg = &config.base.storage_paths;
// ===== START: attempt to build up initial caches based on prior data
//
// MIXNET CONTRACT
let mixnet_path = storage_cfg.cache_file("mixnet_contract");
let ttl = config.mixnet_contract_cache.debug.caching_interval;
let mixnet_contract_cache_state = MixnetContractCache::new(&mixnet_path, ttl);
let mixnet_contract_cache_refresher = mixnet_contract_cache::build_refresher(
&config.mixnet_contract_cache,
&mixnet_contract_cache_state.clone(),
nyxd_client.clone(),
mixnet_path,
);
// DESCRIBED NODES
let described_path = storage_cfg.cache_file("described_nodes");
let ttl = config.describe_cache.debug.caching_interval;
let described_nodes_cache =
SharedCache::<DescribedNodes>::new_with_persistent(&described_path, ttl, None);
let describe_cache_refresher = node_describe_cache::provider::new_provider_with_initial_value(
&config.describe_cache,
mixnet_contract_cache_state.clone(),
described_nodes_cache.clone(),
described_path,
);
// NODES ANNOTATIONS
let annotations_path = storage_cfg.cache_file("node_annotations");
let ttl = config.node_status_api.debug.caching_interval;
let node_status_cache_state = NodeStatusCache::new(&annotations_path, ttl);
// note: can't create the cache refresher the same way as above as it's using a different structure
// unfortunately
// ===== END: attempt to build up initial caches based on prior data
// not a 'persistent' cache that's updated on a timer like the above - it's just use for retrieving database information
// for unstable routes regarding test runs.
let node_info_cache = unstable::NodeInfoCache::default();
// not as data sensitive as others
// check if signers cache is enabled, and if so, start the refresher
let ecash_signers_cache = if config.signers_cache.enabled {
signers_cache::start_refresher(
&config.signers_cache,
nyxd_client.clone(),
&shutdown_manager,
)
} else {
SharedCache::new()
};
let mixnet_contract_cache_refresh_requester =
mixnet_contract_cache_refresher.refresh_requester();
@@ -211,17 +254,6 @@ async fn start_nym_api_tasks(config: &Config) -> anyhow::Result<ShutdownManager>
None
};
// check if signers cache is enabled, and if so, start the refresher
let ecash_signers_cache = if config.signers_cache.enabled {
signers_cache::start_refresher(
&config.signers_cache,
nyxd_client.clone(),
&shutdown_manager,
)
} else {
SharedCache::new()
};
ecash_state.spawn_background_cleaner();
let router = router.with_state(AppState {
nyxd_client: nyxd_client.clone(),
@@ -243,17 +275,6 @@ async fn start_nym_api_tasks(config: &Config) -> anyhow::Result<ShutdownManager>
ecash_state: Arc::new(ecash_state),
});
// start note describe cache refresher
// we should be doing the below, but can't due to our current startup structure
// let refresher = node_describe_cache::new_refresher(&config.topology_cacher);
// let cache = refresher.get_shared_cache();
let describe_cache_refresher = node_describe_cache::provider::new_provider_with_initial_value(
&config.describe_cache,
mixnet_contract_cache_state.clone(),
described_nodes_cache.clone(),
)
.named("node-self-described-data-refresher");
let describe_cache_refresh_requester = describe_cache_refresher.refresh_requester();
let describe_cache_watcher =
@@ -300,6 +321,7 @@ async fn start_nym_api_tasks(config: &Config) -> anyhow::Result<ShutdownManager>
performance_provider,
contract_cache_watcher.clone(),
describe_cache_watcher,
annotations_path,
&shutdown_manager,
);
@@ -379,11 +401,11 @@ async fn start_nym_api_tasks(config: &Config) -> anyhow::Result<ShutdownManager>
pub(crate) async fn execute(args: Args) -> anyhow::Result<()> {
// args take precedence over env
let config = try_load_current_config(&args.id)?
let mut config = try_load_current_config(&args.id)?
.override_with_env()
.override_with_args(args);
config.validate()?;
config.validate_and_fixup()?;
let mut shutdown_manager = start_nym_api_tasks(&config).await?;
shutdown_manager.run_until_shutdown().await;
+8 -2
View File
@@ -21,7 +21,7 @@ use std::io;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::{Path, PathBuf};
use std::time::Duration;
use tracing::debug;
use tracing::{debug, warn};
use url::Url;
use zeroize::{Zeroize, ZeroizeOnDrop};
@@ -166,7 +166,7 @@ impl Config {
}
}
pub fn validate(&self) -> anyhow::Result<()> {
pub fn validate_and_fixup(&mut self) -> anyhow::Result<()> {
let can_sign = self.base.mnemonic.is_some();
if !can_sign && self.rewarding.enabled {
@@ -177,6 +177,12 @@ impl Config {
bail!("can't enable coconut signer without providing a mnemonic")
}
if self.base.storage_paths.persistent_cache_directory == PathBuf::default() {
warn!("[base.storage_paths].persistent_cache_directory has not been set correctly - using default value instead");
self.base.storage_paths.persistent_cache_directory =
NymApiPaths::new_default(&self.base.id).persistent_cache_directory;
}
self.ecash_signer.validate()?;
Ok(())
+11 -19
View File
@@ -18,28 +18,10 @@ pub const DEFAULT_DKG_PUBLIC_KEY_WITH_PROOF_FILENAME: &str = "dkg_public_key_wit
// don't want to be changing the defaults in case something breaks..., but it should be called ecash.pem instead
pub const DEFAULT_ECASH_KEY_FILENAME: &str = "coconut.pem";
pub const DEFAULT_CACHES_DIRECTORY: &str = ".cache";
pub const DEFAULT_PRIVATE_IDENTITY_KEY_FILENAME: &str = "private_identity.pem";
pub const DEFAULT_PUBLIC_IDENTITY_KEY_FILENAME: &str = "public_identity.pem";
// #[derive(Debug, Deserialize, PartialEq, Eq, Serialize)]
// pub struct NymApiPathfinder {
// pub network_monitor: NetworkMonitorPathfinder,
//
// pub node_status_api: NodeStatusAPIPathfinder,
//
// pub coconut: CoconutSignerPathfinder,
// }
//
// impl NymApiPathfinder {
// pub fn new_default<P: AsRef<Path>>(id: P) -> Self {
// NymApiPathfinder {
// network_monitor: NetworkMonitorPathfinder::new_default(id.as_ref()),
// node_status_api: NodeStatusAPIPathfinder::new_default(id.as_ref()),
// coconut: CoconutSignerPathfinder::new_default(id.as_ref()),
// }
// }
// }
#[derive(Debug, Deserialize, PartialEq, Eq, Serialize)]
pub struct NetworkMonitorPaths {
// TODO: this should contain the path to the database holding the results, but changing it would break backwards compatibility
@@ -106,6 +88,11 @@ impl EcashSignerPaths {
#[derive(Debug, Default, Deserialize, PartialEq, Eq, Serialize)]
#[serde(default)]
pub struct NymApiPaths {
/// Path to directory containing persistent caches of, for example,
/// the describe information, performance, etc.
/// It is used for restarting the nym-api and preserving the data
pub persistent_cache_directory: PathBuf,
/// Path to file containing private identity key of the nym-api.
pub private_identity_key_file: PathBuf,
@@ -118,11 +105,16 @@ impl NymApiPaths {
let data_dir = default_data_directory(id);
NymApiPaths {
persistent_cache_directory: data_dir.join(DEFAULT_CACHES_DIRECTORY),
private_identity_key_file: data_dir.join(DEFAULT_PRIVATE_IDENTITY_KEY_FILENAME),
public_identity_key_file: data_dir.join(DEFAULT_PUBLIC_IDENTITY_KEY_FILENAME),
}
}
pub fn cache_file(&self, name: impl AsRef<Path>) -> PathBuf {
self.persistent_cache_directory.join(name)
}
pub fn load_identity(&self) -> anyhow::Result<ed25519::KeyPair> {
let keypaths = nym_pemstore::KeyPairPath::new(
&self.private_identity_key_file,
+6
View File
@@ -22,6 +22,12 @@ bind_address = '{{ base.bind_address }}'
mnemonic = '{{ base.mnemonic }}'
[base.storage_paths]
# Path to directory containing persistent caches of, for example,
# the describe information, performance, etc.
# It is used for restarting the nym-api and preserving the data
persistent_cache_directory = '{{ base.storage_paths.persistent_cache_directory }}'
# Path to file containing private identity key of the nym-api.
private_identity_key_file = '{{ base.storage_paths.private_identity_key_file }}'
-13
View File
@@ -20,11 +20,8 @@ use crate::support::storage;
use crate::unstable_routes::v1::account::cache::AddressInfoCache;
use crate::unstable_routes::v1::account::models::NyxAccountDetails;
use axum::extract::FromRef;
use nym_api_requests::models::NodeAnnotation;
use nym_crypto::asymmetric::ed25519;
use nym_mixnet_contract_common::NodeId;
use nym_topology::CachedEpochRewardedSet;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLockReadGuard;
@@ -158,16 +155,6 @@ impl AppState {
Ok(self.nym_contract_cache().cached_rewarded_set().await?)
}
pub(crate) async fn node_annotations(
&self,
) -> Result<RwLockReadGuard<'_, Cache<HashMap<NodeId, NodeAnnotation>>>, AxumErrorResponse>
{
self.node_status_cache()
.node_annotations()
.await
.ok_or_else(AxumErrorResponse::internal)
}
pub(crate) async fn get_address_info(
self,
account_id: nym_validator_client::nyxd::AccountId,
@@ -90,7 +90,8 @@ pub(super) async fn nodes_expanded(
let describe_cache = state.describe_nodes_cache_data().await?;
let all_nym_nodes = describe_cache.all_nym_nodes();
let annotations = state.node_annotations().await?;
let status_cache = &state.node_status_cache();
let annotations = status_cache.node_annotations().await?;
let contract_cache = state.nym_contract_cache();
let current_key_rotation = contract_cache.current_key_rotation_id().await?;
@@ -107,7 +108,7 @@ pub(super) async fn nodes_expanded(
// min of all caches
let refreshed_at = refreshed_at([
rewarded_set.timestamp(),
annotations.timestamp(),
status_cache.cache_timestamp().await,
describe_cache.timestamp(),
]);
@@ -101,7 +101,8 @@ where
let rewarded_set = state.rewarded_set().await?;
// 2. grab all annotations so that we could attach scores to the [nym] nodes
let annotations = state.node_annotations().await?;
let status_cache = &state.node_status_cache();
let annotations = status_cache.node_annotations().await?;
// 3. implicitly grab the relevant described nodes
// (ideally it'd be tied directly to the NI iterator, but I couldn't defeat the compiler)
@@ -138,7 +139,7 @@ where
// min of all caches
let refreshed_at = refreshed_at([
rewarded_set.timestamp(),
annotations.timestamp(),
status_cache.cache_timestamp().await,
describe_cache.timestamp(),
]);
@@ -155,7 +156,7 @@ where
// min of all caches
let refreshed_at = refreshed_at([
rewarded_set.timestamp(),
annotations.timestamp(),
status_cache.cache_timestamp().await,
describe_cache.timestamp(),
]);
@@ -183,7 +184,8 @@ pub(crate) async fn nodes_basic(
let describe_cache = state.describe_nodes_cache_data().await?;
let all_nym_nodes = describe_cache.all_nym_nodes();
let annotations = state.node_annotations().await?;
let status_cache = &state.node_status_cache();
let annotations = status_cache.node_annotations().await?;
let interval = state.nym_contract_cache().current_interval().await?;
let current_key_rotation = state.nym_contract_cache().current_key_rotation_id().await?;
@@ -199,7 +201,7 @@ pub(crate) async fn nodes_basic(
// min of all caches
let refreshed_at = refreshed_at([
rewarded_set.timestamp(),
annotations.timestamp(),
status_cache.cache_timestamp().await,
describe_cache.timestamp(),
]);