Compare commits
13 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d9a6db6c2f | |||
| 277305e68d | |||
| e7d88c3c28 | |||
| 0c97b51a4c | |||
| b326993da6 | |||
| 12ed915069 | |||
| 65166bfd61 | |||
| 987e890e99 | |||
| 692cd32ac2 | |||
| 0e544da06f | |||
| 6ceb6c4ebe | |||
| 1fadf8c289 | |||
| 76195158e2 |
Generated
+4
@@ -6241,6 +6241,7 @@ dependencies = [
|
||||
"nym-crypto",
|
||||
"nym-gateway",
|
||||
"nym-gateway-stats-storage",
|
||||
"nym-http-api-client",
|
||||
"nym-http-api-common",
|
||||
"nym-ip-packet-router",
|
||||
"nym-metrics",
|
||||
@@ -6932,6 +6933,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"nym-api-requests",
|
||||
"nym-client-core-config-types",
|
||||
"nym-config",
|
||||
"nym-crypto",
|
||||
"nym-mixnet-contract-common",
|
||||
@@ -6943,6 +6945,8 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror 2.0.12",
|
||||
"time",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tsify",
|
||||
"wasm-bindgen",
|
||||
|
||||
@@ -22,7 +22,7 @@ use crate::client::replies::reply_controller::{ReplyControllerReceiver, ReplyCon
|
||||
use crate::client::replies::reply_storage::{
|
||||
CombinedReplyStorage, PersistentReplyStorage, ReplyStorageBackend, SentReplyKeys,
|
||||
};
|
||||
use crate::client::topology_control::nym_api_provider::NymApiTopologyProvider;
|
||||
use crate::client::topology_control::smart_api_provider::NymApiTopologyProvider;
|
||||
use crate::client::topology_control::{
|
||||
TopologyAccessor, TopologyRefresher, TopologyRefresherConfig,
|
||||
};
|
||||
@@ -54,8 +54,7 @@ use nym_statistics_common::clients::ClientStatsSender;
|
||||
use nym_statistics_common::generate_client_stats_id;
|
||||
use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender, LaneQueueLengths};
|
||||
use nym_task::{TaskClient, TaskHandle};
|
||||
use nym_topology::provider_trait::TopologyProvider;
|
||||
use nym_topology::HardcodedTopologyProvider;
|
||||
use nym_topology::providers::{HardcodedTopologyProvider, TopologyProvider};
|
||||
use nym_validator_client::{nyxd::contract_traits::DkgQueryClient, UserAgent};
|
||||
use rand::rngs::OsRng;
|
||||
use std::fmt::Debug;
|
||||
@@ -557,6 +556,7 @@ where
|
||||
config_topology,
|
||||
nym_api_urls,
|
||||
user_agent,
|
||||
None,
|
||||
)),
|
||||
config::TopologyStructure::GeoAware(group_by) => {
|
||||
warn!("using deprecated 'GeoAware' topology provider - this option will be removed very soon");
|
||||
|
||||
@@ -3,7 +3,7 @@ use log::{debug, error};
|
||||
use nym_explorer_client::{ExplorerClient, PrettyDetailedMixNodeBond};
|
||||
use nym_network_defaults::var_names::EXPLORER_API;
|
||||
use nym_topology::{
|
||||
provider_trait::{async_trait, TopologyProvider},
|
||||
providers::{async_trait, TopologyProvider},
|
||||
NymTopology,
|
||||
};
|
||||
use nym_validator_client::client::NodeId;
|
||||
|
||||
@@ -19,11 +19,12 @@ use wasmtimer::tokio::sleep;
|
||||
mod accessor;
|
||||
pub mod geo_aware_provider;
|
||||
pub mod nym_api_provider;
|
||||
pub mod smart_api_provider;
|
||||
|
||||
#[allow(deprecated)]
|
||||
pub use geo_aware_provider::GeoAwareTopologyProvider;
|
||||
pub use nym_api_provider::{Config as NymApiTopologyProviderConfig, NymApiTopologyProvider};
|
||||
pub use nym_topology::provider_trait::TopologyProvider;
|
||||
pub use nym_topology::providers::TopologyProvider;
|
||||
pub use smart_api_provider::{Config as NymApiTopologyProviderConfig, NymApiTopologyProvider};
|
||||
|
||||
// TODO: move it to config later
|
||||
const MAX_FAILURE_COUNT: usize = 10;
|
||||
|
||||
@@ -3,8 +3,7 @@
|
||||
|
||||
use async_trait::async_trait;
|
||||
use log::{debug, error, warn};
|
||||
use nym_topology::provider_trait::TopologyProvider;
|
||||
use nym_topology::NymTopology;
|
||||
use nym_topology::{NymTopology, TopologyProvider};
|
||||
use nym_validator_client::UserAgent;
|
||||
use rand::prelude::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
|
||||
@@ -0,0 +1,230 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
//! Caching, piecewise API Topology Provider
|
||||
//!
|
||||
|
||||
#![warn(missing_docs)]
|
||||
|
||||
use async_trait::async_trait;
|
||||
use log::{debug, error, warn};
|
||||
pub use nym_topology::providers::piecewise::Config;
|
||||
use nym_topology::{
|
||||
providers::piecewise::{NymTopologyProvider, PiecewiseTopologyProvider},
|
||||
EpochRewardedSet, NymTopology, RoutingNode, TopologyProvider,
|
||||
};
|
||||
use nym_validator_client::UserAgent;
|
||||
use rand::{prelude::SliceRandom, thread_rng};
|
||||
use url::Url;
|
||||
|
||||
/// Topology Provider build around a cached piecewise provider that uses the Nym API to
|
||||
/// fetch changes and node details.
|
||||
#[derive(Clone)]
|
||||
pub struct NymApiTopologyProvider {
|
||||
inner: NymTopologyProvider<NymApiPiecewiseProvider>,
|
||||
}
|
||||
|
||||
impl NymApiTopologyProvider {
|
||||
/// Construct a new thread safe Cached topology provider using the Nym API
|
||||
pub fn new(
|
||||
config: impl Into<Config>,
|
||||
nym_api_urls: Vec<Url>,
|
||||
user_agent: Option<UserAgent>,
|
||||
initial_topology: Option<NymTopology>,
|
||||
) -> Self {
|
||||
let manager = NymApiPiecewiseProvider::new(nym_api_urls, user_agent);
|
||||
let inner = NymTopologyProvider::new(manager, config.into(), initial_topology);
|
||||
|
||||
Self { inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<NymTopologyProvider<NymApiPiecewiseProvider>> for NymApiTopologyProvider {
|
||||
fn as_ref(&self) -> &NymTopologyProvider<NymApiPiecewiseProvider> {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl AsMut<NymTopologyProvider<NymApiPiecewiseProvider>> for NymApiTopologyProvider {
|
||||
fn as_mut(&mut self) -> &mut NymTopologyProvider<NymApiPiecewiseProvider> {
|
||||
&mut self.inner
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
#[async_trait]
|
||||
impl TopologyProvider for NymApiTopologyProvider {
|
||||
async fn get_new_topology(&mut self) -> Option<NymTopology> {
|
||||
self.as_mut().get_new_topology().await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
#[async_trait(?Send)]
|
||||
impl TopologyProvider for NymApiTopologyProvider {
|
||||
async fn get_new_topology(&mut self) -> Option<NymTopology> {
|
||||
self.as_mut().get_new_topology().await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct NymApiPiecewiseProvider {
|
||||
validator_client: nym_validator_client::client::NymApiClient,
|
||||
nym_api_urls: Vec<Url>,
|
||||
currently_used_api: usize,
|
||||
}
|
||||
|
||||
impl NymApiPiecewiseProvider {
|
||||
fn new(mut nym_api_urls: Vec<Url>, user_agent: Option<UserAgent>) -> Self {
|
||||
nym_api_urls.shuffle(&mut thread_rng());
|
||||
|
||||
let validator_client = if let Some(user_agent) = user_agent {
|
||||
nym_validator_client::client::NymApiClient::new_with_user_agent(
|
||||
nym_api_urls[0].clone(),
|
||||
user_agent,
|
||||
)
|
||||
} else {
|
||||
nym_validator_client::client::NymApiClient::new(nym_api_urls[0].clone())
|
||||
};
|
||||
|
||||
Self {
|
||||
validator_client,
|
||||
nym_api_urls,
|
||||
currently_used_api: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn use_next_nym_api(&mut self) {
|
||||
if self.nym_api_urls.len() == 1 {
|
||||
warn!("There's only a single nym API available - it won't be possible to use a different one");
|
||||
return;
|
||||
}
|
||||
|
||||
self.currently_used_api = (self.currently_used_api + 1) % self.nym_api_urls.len();
|
||||
self.validator_client
|
||||
.change_nym_api(self.nym_api_urls[self.currently_used_api].clone())
|
||||
}
|
||||
|
||||
async fn get_full_topology_inner(&mut self) -> Option<NymTopology> {
|
||||
let layer_assignments = self.get_layer_assignments().await?;
|
||||
|
||||
let mut topology = NymTopology::new_empty(layer_assignments);
|
||||
|
||||
let all_nodes = self
|
||||
.validator_client
|
||||
.get_all_basic_nodes()
|
||||
.await
|
||||
.inspect_err(|err| {
|
||||
self.use_next_nym_api();
|
||||
error!("failed to get network nodes: {err}");
|
||||
})
|
||||
.ok()?;
|
||||
|
||||
debug!("there are {} nodes on the network", all_nodes.len());
|
||||
topology.add_additional_nodes(all_nodes.iter());
|
||||
|
||||
if !topology.is_minimally_routable() {
|
||||
error!("the current filtered active topology can't be used to construct any packets");
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(topology)
|
||||
}
|
||||
|
||||
async fn get_descriptor_batch_inner(&mut self, ids: &[u32]) -> Option<Vec<RoutingNode>> {
|
||||
// Does this need to return a hashmap of RoutingNodes? that is moderately inconvenient
|
||||
// especially when the nodes themselves contain their node_id unless we expect to directly
|
||||
// use the result of this fn for lookups where we would otherwise for example, have to
|
||||
// iterate over a whole vec to find a specific node_id.
|
||||
let descriptor_vec = self
|
||||
.validator_client
|
||||
.retrieve_basic_nodes_batch(ids)
|
||||
.await
|
||||
.inspect_err(|err| {
|
||||
self.use_next_nym_api();
|
||||
error!("failed to get current rewarded set: {err}");
|
||||
})
|
||||
.ok()?;
|
||||
|
||||
let mut out = Vec::new();
|
||||
for node in descriptor_vec {
|
||||
if let Ok(routing_node) = RoutingNode::try_from(&node) {
|
||||
out.push(routing_node);
|
||||
}
|
||||
}
|
||||
Some(out)
|
||||
}
|
||||
|
||||
async fn get_layer_assignments_inner(&mut self) -> Option<EpochRewardedSet> {
|
||||
self.validator_client
|
||||
.get_current_rewarded_set()
|
||||
.await
|
||||
.inspect_err(|err| {
|
||||
self.use_next_nym_api();
|
||||
error!("failed to get current rewarded set: {err}");
|
||||
})
|
||||
.ok()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
#[async_trait]
|
||||
impl PiecewiseTopologyProvider for NymApiPiecewiseProvider {
|
||||
async fn get_full_topology(&mut self) -> Option<NymTopology> {
|
||||
self.get_full_topology_inner().await
|
||||
}
|
||||
|
||||
async fn get_descriptor_batch(&mut self, ids: &[u32]) -> Option<Vec<RoutingNode>> {
|
||||
self.get_descriptor_batch_inner(ids).await
|
||||
}
|
||||
|
||||
async fn get_layer_assignments(&mut self) -> Option<EpochRewardedSet> {
|
||||
self.get_layer_assignments_inner().await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
#[async_trait(?Send)]
|
||||
impl PiecewiseTopologyProvider for NymApiPiecewiseProvider {
|
||||
async fn get_full_topology(&mut self) -> Option<NymTopology> {
|
||||
self.get_full_topology_inner().await
|
||||
}
|
||||
|
||||
async fn get_descriptor_batch(&mut self, ids: &[u32]) -> Option<Vec<RoutingNode>> {
|
||||
self.get_descriptor_batch_inner(ids).await
|
||||
}
|
||||
|
||||
async fn get_layer_assignments(&mut self) -> Option<EpochRewardedSet> {
|
||||
self.get_layer_assignments_inner().await
|
||||
}
|
||||
}
|
||||
|
||||
// // Test requires running a local instance of the nym-api binary, for example using:
|
||||
// // `RUST_LOG="info" ./target/debug/nym-api run --nyxd-validator "https://rpc.nymtech.net"`
|
||||
|
||||
// #[cfg(test)]
|
||||
// mod test {
|
||||
// use std::time::Duration;
|
||||
|
||||
// use super::*;
|
||||
// use nym_bin_common::logging::setup_tracing_logger;
|
||||
|
||||
// #[tokio::test]
|
||||
// async fn local_api_provider_test() {
|
||||
// setup_tracing_logger();
|
||||
// let mut provider = NymApiTopologyProvider::new(
|
||||
// Config::default(),
|
||||
// vec!["http://localhost:8000"
|
||||
// .parse()
|
||||
// .expect("failed to parse api url")],
|
||||
// None,
|
||||
// None,
|
||||
// );
|
||||
|
||||
// for _ in 0..180 {
|
||||
// let topo = provider.get_new_topology().await;
|
||||
// assert!(topo.is_some());
|
||||
// tokio::time::sleep(Duration::from_secs(30)).await;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
@@ -14,7 +14,8 @@ pub mod error;
|
||||
pub mod init;
|
||||
|
||||
pub use nym_topology::{
|
||||
HardcodedTopologyProvider, NymRouteProvider, NymTopology, NymTopologyError, TopologyProvider,
|
||||
providers::HardcodedTopologyProvider, NymRouteProvider, NymTopology, NymTopologyError,
|
||||
TopologyProvider,
|
||||
};
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
|
||||
@@ -498,6 +498,21 @@ impl NymApiClient {
|
||||
Ok(nodes)
|
||||
}
|
||||
|
||||
/// Batch request for node descriptors in the current topology.
|
||||
///
|
||||
/// Given the set of node IDs included in the request body, provide the descriptor for each
|
||||
/// associated node in if it is available in the current topology.
|
||||
pub async fn retrieve_basic_nodes_batch(
|
||||
&self,
|
||||
node_ids: &[u32],
|
||||
) -> Result<Vec<SkimmedNode>, ValidatorClientError> {
|
||||
Ok(self
|
||||
.nym_api
|
||||
.retrieve_basic_nodes_batch(node_ids)
|
||||
.await?
|
||||
.nodes)
|
||||
}
|
||||
|
||||
pub async fn health(&self) -> Result<ApiHealthResponse, ValidatorClientError> {
|
||||
Ok(self.nym_api.health().await?)
|
||||
}
|
||||
|
||||
@@ -270,10 +270,10 @@ pub trait NymApiClientExt: ApiClient {
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"unstable",
|
||||
routes::UNSTABLE,
|
||||
routes::NYM_NODES_ROUTES,
|
||||
"mixnodes",
|
||||
"skimmed",
|
||||
routes::SKIMMED,
|
||||
],
|
||||
NO_PARAMS,
|
||||
)
|
||||
@@ -286,10 +286,10 @@ pub trait NymApiClientExt: ApiClient {
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"unstable",
|
||||
routes::UNSTABLE,
|
||||
routes::NYM_NODES_ROUTES,
|
||||
"gateways",
|
||||
"skimmed",
|
||||
routes::SKIMMED,
|
||||
],
|
||||
NO_PARAMS,
|
||||
)
|
||||
@@ -335,9 +335,9 @@ pub trait NymApiClientExt: ApiClient {
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"unstable",
|
||||
routes::UNSTABLE,
|
||||
routes::NYM_NODES_ROUTES,
|
||||
"skimmed",
|
||||
routes::SKIMMED,
|
||||
"entry-gateways",
|
||||
"all",
|
||||
],
|
||||
@@ -372,9 +372,9 @@ pub trait NymApiClientExt: ApiClient {
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"unstable",
|
||||
routes::UNSTABLE,
|
||||
routes::NYM_NODES_ROUTES,
|
||||
"skimmed",
|
||||
routes::SKIMMED,
|
||||
"mixnodes",
|
||||
"active",
|
||||
],
|
||||
@@ -409,9 +409,9 @@ pub trait NymApiClientExt: ApiClient {
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"unstable",
|
||||
routes::UNSTABLE,
|
||||
routes::NYM_NODES_ROUTES,
|
||||
"skimmed",
|
||||
routes::SKIMMED,
|
||||
"mixnodes",
|
||||
"all",
|
||||
],
|
||||
@@ -420,6 +420,31 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Send a Post request with a set of node ids. A successful response will contain descriptors
|
||||
/// for all nodes associated with those node IDs available in the current full topology.
|
||||
///
|
||||
/// If a provided node ID is not present there will be no descriptor for that node in the response.
|
||||
///
|
||||
/// If no node IDs are provided the response will contain no descriptors.
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn retrieve_basic_nodes_batch(
|
||||
&self,
|
||||
node_ids: &[NodeId],
|
||||
) -> Result<CachedNodesResponse<SkimmedNode>, NymAPIError> {
|
||||
self.post_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
routes::UNSTABLE,
|
||||
routes::NYM_NODES_ROUTES,
|
||||
routes::SKIMMED,
|
||||
routes::BATCH,
|
||||
],
|
||||
NO_PARAMS,
|
||||
node_ids,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_basic_nodes(
|
||||
&self,
|
||||
@@ -444,9 +469,9 @@ pub trait NymApiClientExt: ApiClient {
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"unstable",
|
||||
routes::UNSTABLE,
|
||||
routes::NYM_NODES_ROUTES,
|
||||
"skimmed",
|
||||
routes::SKIMMED,
|
||||
],
|
||||
¶ms,
|
||||
)
|
||||
|
||||
@@ -74,3 +74,7 @@ pub const SERVICE_PROVIDERS: &str = "services";
|
||||
pub const DETAILS: &str = "details";
|
||||
pub const CHAIN_STATUS: &str = "chain-status";
|
||||
pub const NETWORK: &str = "network";
|
||||
|
||||
pub const UNSTABLE: &str = "unstable";
|
||||
pub const SKIMMED: &str = "skimmed";
|
||||
pub const BATCH: &str = "batch";
|
||||
|
||||
@@ -17,6 +17,9 @@ rand = { workspace = true }
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
thiserror = { workspace = true }
|
||||
time = { workspace = true }
|
||||
tokio = { workspace = true, features = ["macros", "sync"] }
|
||||
|
||||
|
||||
# 'serde' feature
|
||||
serde_json = { workspace = true, optional = true }
|
||||
@@ -35,7 +38,9 @@ nym-sphinx-types = { path = "../nymsphinx/types", features = [
|
||||
"outfox",
|
||||
] }
|
||||
nym-sphinx-routing = { path = "../nymsphinx/routing" }
|
||||
|
||||
nym-client-core-config-types = { path = "../client-core/config-types", features = [
|
||||
"disk-persistence",
|
||||
] }
|
||||
|
||||
# I'm not sure how to feel about pulling in this dependency here...
|
||||
nym-api-requests = { path = "../../nym-api/nym-api-requests" }
|
||||
@@ -45,8 +50,8 @@ nym-api-requests = { path = "../../nym-api/nym-api-requests" }
|
||||
wasm-utils = { path = "../wasm/utils", default-features = false, optional = true }
|
||||
|
||||
[features]
|
||||
default = ["provider-trait"]
|
||||
provider-trait = ["async-trait"]
|
||||
default = ["providers"]
|
||||
providers = ["async-trait"]
|
||||
wasm-serde-types = ["tsify", "wasm-bindgen", "wasm-utils"]
|
||||
persistence = ["serde_json"]
|
||||
outfox = []
|
||||
|
||||
+31
-16
@@ -23,13 +23,13 @@ pub mod error;
|
||||
pub mod node;
|
||||
pub mod rewarded_set;
|
||||
|
||||
#[cfg(feature = "provider-trait")]
|
||||
pub mod provider_trait;
|
||||
#[cfg(feature = "wasm-serde-types")]
|
||||
pub mod wasm_helpers;
|
||||
|
||||
#[cfg(feature = "provider-trait")]
|
||||
pub use provider_trait::{HardcodedTopologyProvider, TopologyProvider};
|
||||
#[cfg(feature = "providers")]
|
||||
pub mod providers;
|
||||
#[cfg(feature = "providers")]
|
||||
pub use providers::TopologyProvider;
|
||||
|
||||
#[deprecated]
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -91,18 +91,6 @@ mod deprecated_network_address_impls {
|
||||
|
||||
pub type MixLayer = u8;
|
||||
|
||||
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
|
||||
pub struct NymTopology {
|
||||
// for the purposes of future VRF, everyone will need the same view of the network, regardless of performance filtering
|
||||
// so we use the same 'master' rewarded set information for that
|
||||
//
|
||||
// how do we solve the problem of "we have to go through a node that we want to filter out?"
|
||||
// ¯\_(ツ)_/¯ that's a future problem
|
||||
rewarded_set: CachedEpochRewardedSet,
|
||||
|
||||
node_details: HashMap<NodeId, RoutingNode>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct NymRouteProvider {
|
||||
pub topology: NymTopology,
|
||||
@@ -189,6 +177,18 @@ impl NymRouteProvider {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
|
||||
pub struct NymTopology {
|
||||
// for the purposes of future VRF, everyone will need the same view of the network, regardless of performance filtering
|
||||
// so we use the same 'master' rewarded set information for that
|
||||
//
|
||||
// how do we solve the problem of "we have to go through a node that we want to filter out?"
|
||||
// ¯\_(ツ)_/¯ that's a future problem
|
||||
rewarded_set: CachedEpochRewardedSet,
|
||||
|
||||
node_details: HashMap<NodeId, RoutingNode>,
|
||||
}
|
||||
|
||||
impl NymTopology {
|
||||
pub fn new_empty(rewarded_set: impl Into<CachedEpochRewardedSet>) -> Self {
|
||||
NymTopology {
|
||||
@@ -546,4 +546,19 @@ impl NymTopology {
|
||||
.values()
|
||||
.filter(|n| self.rewarded_set.is_active_mixnode(&n.node_id))
|
||||
}
|
||||
|
||||
pub fn all_nodes(&self) -> impl Iterator<Item = &RoutingNode> {
|
||||
self.node_details.values()
|
||||
}
|
||||
|
||||
pub fn all_node_ids(&self) -> impl Iterator<Item = &NodeId> {
|
||||
self.node_details.keys()
|
||||
}
|
||||
|
||||
pub fn gateways(&self) -> impl Iterator<Item = &RoutingNode> {
|
||||
self.node_details.values().filter(|n| {
|
||||
self.rewarded_set.entry_gateways.contains(&n.node_id)
|
||||
|| self.rewarded_set.exit_gateways.contains(&n.node_id)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ use serde::{Deserialize, Serialize};
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use thiserror::Error;
|
||||
|
||||
pub use nym_mixnet_contract_common::reward_params::Performance;
|
||||
pub use nym_mixnet_contract_common::LegacyMixLayer;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
@@ -19,7 +20,7 @@ pub enum RoutingNodeError {
|
||||
NoIpAddressesProvided { node_id: NodeId, identity: String },
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
pub struct EntryDetails {
|
||||
// to allow client to choose ipv6 preference, if available
|
||||
pub ip_addresses: Vec<IpAddr>,
|
||||
@@ -28,7 +29,7 @@ pub struct EntryDetails {
|
||||
pub clients_wss_port: Option<u16>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
|
||||
pub struct SupportedRoles {
|
||||
pub mixnode: bool,
|
||||
pub mixnet_entry: bool,
|
||||
@@ -45,7 +46,7 @@ impl From<DeclaredRoles> for SupportedRoles {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
pub struct RoutingNode {
|
||||
pub node_id: NodeId,
|
||||
|
||||
@@ -56,6 +57,7 @@ pub struct RoutingNode {
|
||||
pub sphinx_key: x25519::PublicKey,
|
||||
|
||||
pub supported_roles: SupportedRoles,
|
||||
pub performance: Performance,
|
||||
}
|
||||
|
||||
impl RoutingNode {
|
||||
@@ -109,6 +111,12 @@ impl<'a> From<&'a RoutingNode> for SphinxNode {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<&'a RoutingNode> for RoutingNode {
|
||||
fn from(node: &'a RoutingNode) -> Self {
|
||||
node.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> TryFrom<&'a SkimmedNode> for RoutingNode {
|
||||
type Error = RoutingNodeError;
|
||||
|
||||
@@ -138,6 +146,7 @@ impl<'a> TryFrom<&'a SkimmedNode> for RoutingNode {
|
||||
identity_key: value.ed25519_identity_pubkey,
|
||||
sphinx_key: value.x25519_sphinx_pubkey,
|
||||
supported_roles: value.supported_roles.into(),
|
||||
performance: value.performance,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,9 @@
|
||||
use crate::NymTopology;
|
||||
pub use async_trait::async_trait;
|
||||
|
||||
// hehe, wasm
|
||||
/// Cached Topology Provider built using efficient piecewise requests.
|
||||
pub mod piecewise;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
#[async_trait]
|
||||
pub trait TopologyProvider: Send {
|
||||
@@ -0,0 +1,482 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
//!
|
||||
//!
|
||||
#![warn(missing_docs)]
|
||||
|
||||
use crate::{EpochRewardedSet, NymTopology, RoutingNode, TopologyProvider};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use time::OffsetDateTime;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
use std::{cmp::min, collections::HashSet, sync::Arc, time::Duration};
|
||||
|
||||
/// Topology filtering and caching configuration
|
||||
#[derive(Debug)]
|
||||
pub struct Config {
|
||||
/// Specifies a minimum performance of a mixnode that is used on route construction.
|
||||
/// This setting is only applicable when `NymApi` topology is used.
|
||||
pub min_mixnode_performance: u8,
|
||||
|
||||
/// Specifies a minimum performance of a gateway that is used on route construction.
|
||||
/// This setting is only applicable when `NymApi` topology is used.
|
||||
pub min_gateway_performance: u8,
|
||||
|
||||
/// Specifies whether this client should attempt to retrieve all available network nodes
|
||||
/// as opposed to just active mixnodes/gateways.
|
||||
pub use_extended_topology: bool,
|
||||
|
||||
/// Specifies whether this client should ignore the current epoch role of the target egress node
|
||||
/// when constructing the final hop packets.
|
||||
pub ignore_egress_epoch_role: bool,
|
||||
|
||||
/// Minimum duration during which querying the topology will NOT attempt to re-fetch data, and
|
||||
/// will be served from cache.
|
||||
pub cache_ttl: Duration,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
min_mixnode_performance: 50,
|
||||
min_gateway_performance: 0,
|
||||
use_extended_topology: false,
|
||||
ignore_egress_epoch_role: true,
|
||||
cache_ttl: Self::DEFAULT_TOPOLOGY_CACHE_TTL,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<nym_client_core_config_types::Topology> for Config {
|
||||
fn from(value: nym_client_core_config_types::Topology) -> Self {
|
||||
Config {
|
||||
min_mixnode_performance: value.minimum_mixnode_performance,
|
||||
min_gateway_performance: value.minimum_gateway_performance,
|
||||
use_extended_topology: value.use_extended_topology,
|
||||
ignore_egress_epoch_role: value.ignore_egress_epoch_role,
|
||||
cache_ttl: value.topology_refresh_rate,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Config {
|
||||
/// Default duration during which the topology will be reproduced from cache.
|
||||
pub const DEFAULT_TOPOLOGY_CACHE_TTL: Duration = Duration::from_secs(120);
|
||||
|
||||
// if we're using 'extended' topology, filter the nodes based on the lowest set performance
|
||||
fn min_node_performance(&self) -> u8 {
|
||||
min(self.min_mixnode_performance, self.min_gateway_performance)
|
||||
}
|
||||
}
|
||||
|
||||
/// Topology Provider build around a cached piecewise provider that uses the Nym API to
|
||||
/// fetch changes and node details.
|
||||
#[derive(Clone)]
|
||||
pub struct NymTopologyProvider<M: PiecewiseTopologyProvider> {
|
||||
inner: Arc<Mutex<NymTopologyProviderInner<M>>>,
|
||||
}
|
||||
|
||||
impl<M: PiecewiseTopologyProvider> NymTopologyProvider<M> {
|
||||
/// Construct a new thread safe Cached topology provider
|
||||
pub fn new(
|
||||
manager: M,
|
||||
config: Config,
|
||||
initial_topology: Option<NymTopology>,
|
||||
) -> NymTopologyProvider<M> {
|
||||
let inner = NymTopologyProviderInner::new(config, manager, initial_topology);
|
||||
|
||||
NymTopologyProvider {
|
||||
inner: Arc::new(Mutex::new(inner)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Bypass the caching for the topology and force a check for the latest updates next time the
|
||||
/// topology is requested. This fn requires async to get lock in case other threads have access
|
||||
/// to the cached topology state.
|
||||
pub async fn force_refresh(&self) {
|
||||
let mut guard = self.inner.lock().await;
|
||||
guard.cached_at = OffsetDateTime::UNIX_EPOCH;
|
||||
}
|
||||
|
||||
/// Remove all stored topology state. The next time the topology is requested this will force a
|
||||
/// pull of all topology information. This fn requires async to get lock in case other threads
|
||||
/// have access to the cached topology state.
|
||||
///
|
||||
/// WARNING: This may be slow / require non-trivial bandwidth.
|
||||
pub async fn force_clear(&self) {
|
||||
let mut guard = self.inner.lock().await;
|
||||
guard.cached = None;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
#[async_trait]
|
||||
impl<M: PiecewiseTopologyProvider> TopologyProvider for NymTopologyProvider<M> {
|
||||
async fn get_new_topology(&mut self) -> Option<NymTopology> {
|
||||
let mut guard = self.inner.lock().await;
|
||||
// check the cache
|
||||
if let Some(cached) = guard.get_current_compatible_topology().await {
|
||||
return Some(cached);
|
||||
}
|
||||
|
||||
// not cached, or cache expired. try update.
|
||||
guard.update_cache().await;
|
||||
guard.get_current_compatible_topology().await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
#[async_trait(?Send)]
|
||||
impl<M: PiecewiseTopologyProvider> TopologyProvider for NymTopologyProvider<M> {
|
||||
async fn get_new_topology(&mut self) -> Option<NymTopology> {
|
||||
let mut guard = self.inner.lock().await;
|
||||
// check the cache
|
||||
if let Some(cached) = guard.get_current_compatible_topology().await {
|
||||
return Some(cached);
|
||||
}
|
||||
|
||||
// not cached, or cache expired. try update.
|
||||
guard.update_cache().await;
|
||||
guard.get_current_compatible_topology().await
|
||||
}
|
||||
}
|
||||
|
||||
struct NymTopologyProviderInner<M: PiecewiseTopologyProvider> {
|
||||
config: Config,
|
||||
|
||||
cached: Option<NymTopology>,
|
||||
cached_at: OffsetDateTime,
|
||||
|
||||
topology_manager: M,
|
||||
}
|
||||
|
||||
impl<M: PiecewiseTopologyProvider> NymTopologyProviderInner<M> {
|
||||
pub fn new(
|
||||
config: impl Into<Config>,
|
||||
manager: M,
|
||||
initial_topology: Option<NymTopology>,
|
||||
) -> Self {
|
||||
Self {
|
||||
config: config.into(),
|
||||
cached_at: OffsetDateTime::UNIX_EPOCH,
|
||||
cached: initial_topology,
|
||||
topology_manager: manager,
|
||||
}
|
||||
}
|
||||
|
||||
fn cached_topology(&self) -> Option<NymTopology> {
|
||||
if let Some(cached_topology) = &self.cached {
|
||||
if self.cached_at + self.config.cache_ttl > OffsetDateTime::now_utc() {
|
||||
return Some(cached_topology.clone());
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
async fn update_cache(&mut self) {
|
||||
if let Some(ref mut cached_topology) = self.cached {
|
||||
// get layer assignment map
|
||||
let response = self.topology_manager.get_layer_assignments().await;
|
||||
if response.is_none() {
|
||||
warn!("pulled layer assignments and got no response");
|
||||
self.cached_at = OffsetDateTime::now_utc();
|
||||
return;
|
||||
}
|
||||
|
||||
let layer_assignments = response.unwrap();
|
||||
// Check if we already know about the epoch
|
||||
if cached_topology.rewarded_set.epoch_id == layer_assignments.epoch_id {
|
||||
debug!("pulled layer assignments, epoch already known");
|
||||
self.cached_at = OffsetDateTime::now_utc();
|
||||
return;
|
||||
}
|
||||
|
||||
cached_topology.rewarded_set = layer_assignments.into();
|
||||
|
||||
// get the set of node IDs
|
||||
let new_id_set = cached_topology.rewarded_set.all_ids();
|
||||
let known_id_set = HashSet::<u32>::from_iter(cached_topology.all_node_ids().copied());
|
||||
let unknown_node_ids: Vec<_> = new_id_set.difference(&known_id_set).copied().collect();
|
||||
|
||||
// Pull node descriptors for unknown IDs
|
||||
let response = self
|
||||
.topology_manager
|
||||
.get_descriptor_batch(&unknown_node_ids[..])
|
||||
.await;
|
||||
|
||||
// Add the new nodes to our cached topology
|
||||
if let Some(new_descriptors) = response {
|
||||
cached_topology.add_routing_nodes(new_descriptors);
|
||||
}
|
||||
|
||||
// double check that we have the expected nodes
|
||||
let known_id_set = HashSet::<u32>::from_iter(cached_topology.all_node_ids().copied());
|
||||
let unknown_node_ids: Vec<_> = new_id_set.difference(&known_id_set).collect();
|
||||
if !unknown_node_ids.is_empty() {
|
||||
warn!(
|
||||
"still missing descriptors for nodes in the assigned set: {:?}",
|
||||
unknown_node_ids
|
||||
);
|
||||
}
|
||||
} else {
|
||||
self.cached = self.topology_manager.get_full_topology().await;
|
||||
}
|
||||
|
||||
self.cached_at = OffsetDateTime::now_utc();
|
||||
}
|
||||
|
||||
/// Gets the current topology state using `Self::cached_topology` and then applies any filters
|
||||
/// defined in the provided Config.
|
||||
async fn get_current_compatible_topology(&mut self) -> Option<NymTopology> {
|
||||
let full_topology = self.cached_topology()?;
|
||||
|
||||
let mut topology = NymTopology::new_empty(full_topology.rewarded_set().clone());
|
||||
|
||||
if self.config.use_extended_topology {
|
||||
topology.add_additional_nodes(full_topology.all_nodes().filter(|n| {
|
||||
n.performance.round_to_integer() >= self.config.min_node_performance()
|
||||
}));
|
||||
|
||||
return Some(full_topology);
|
||||
}
|
||||
|
||||
topology.add_additional_nodes(
|
||||
full_topology.mixnodes().filter(|m| {
|
||||
m.performance.round_to_integer() >= self.config.min_mixnode_performance
|
||||
}),
|
||||
);
|
||||
topology.add_additional_nodes(
|
||||
full_topology.gateways().filter(|m| {
|
||||
m.performance.round_to_integer() >= self.config.min_gateway_performance
|
||||
}),
|
||||
);
|
||||
|
||||
Some(topology)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
#[async_trait]
|
||||
impl<P: PiecewiseTopologyProvider> TopologyProvider for NymTopologyProviderInner<P> {
|
||||
async fn get_new_topology(&mut self) -> Option<NymTopology> {
|
||||
self.get_current_compatible_topology().await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
#[async_trait(?Send)]
|
||||
impl<P: PiecewiseTopologyProvider> TopologyProvider for NymTopologyProviderInner<P> {
|
||||
async fn get_new_topology(&mut self) -> Option<NymTopology> {
|
||||
self.get_current_compatible_topology().await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
/// Trait allowing construction and upkeep of a
|
||||
#[async_trait]
|
||||
pub trait PiecewiseTopologyProvider: Send {
|
||||
/// Pull a copy of the full topology.
|
||||
///
|
||||
/// This is intended to be used sparingly as repeated usage could result in fetching duplicate
|
||||
/// information more often than necessary.
|
||||
async fn get_full_topology(&mut self) -> Option<NymTopology>;
|
||||
|
||||
/// Fetch a node descriptors for the set of provided IDs if available.
|
||||
async fn get_descriptor_batch(&mut self, ids: &[u32]) -> Option<Vec<RoutingNode>>;
|
||||
|
||||
/// Fetch the latest mapping of node IDs to Nym Network layer.
|
||||
async fn get_layer_assignments(&mut self) -> Option<EpochRewardedSet>;
|
||||
}
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
/// Trait allowing construction and upkeep of a
|
||||
#[async_trait(?Send)]
|
||||
pub trait PiecewiseTopologyProvider: Send {
|
||||
/// Pull a copy of the full topology.
|
||||
///
|
||||
/// This is intended to be used sparingly as repeated usage could result in fetching duplicate
|
||||
/// information more often than necessary.
|
||||
async fn get_full_topology(&mut self) -> Option<NymTopology>;
|
||||
|
||||
/// Fetch a node descriptors for the set of provided IDs if available.
|
||||
async fn get_descriptor_batch(&mut self, ids: &[u32]) -> Option<Vec<RoutingNode>>;
|
||||
|
||||
/// Fetch the latest mapping of node IDs to Nym Network layer.
|
||||
async fn get_layer_assignments(&mut self) -> Option<EpochRewardedSet>;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
mod test {
|
||||
use super::*;
|
||||
use crate::SupportedRoles;
|
||||
use nym_crypto::asymmetric::encryption::PublicKey as SphinxPubkey;
|
||||
use nym_crypto::asymmetric::identity::PublicKey as IdentityPubkey;
|
||||
use nym_mixnet_contract_common::Percent;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct PassthroughPiecewiseTopologyProvider {
|
||||
topo: NymTopology,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl PiecewiseTopologyProvider for PassthroughPiecewiseTopologyProvider {
|
||||
async fn get_full_topology(&mut self) -> Option<NymTopology> {
|
||||
Some(self.topo.clone())
|
||||
}
|
||||
|
||||
async fn get_descriptor_batch(&mut self, ids: &[u32]) -> Option<Vec<RoutingNode>> {
|
||||
let mut nodes = Vec::new();
|
||||
ids.iter().for_each(|id| {
|
||||
if let Some(node) = self.topo.node_details.get(id) {
|
||||
nodes.push(node.clone());
|
||||
}
|
||||
});
|
||||
|
||||
Some(nodes)
|
||||
}
|
||||
|
||||
async fn get_layer_assignments(&mut self) -> Option<EpochRewardedSet> {
|
||||
return Some(self.topo.rewarded_set.clone().into());
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_topology_provider() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let mut topo_mgr = PassthroughPiecewiseTopologyProvider {
|
||||
topo: NymTopology::default(),
|
||||
};
|
||||
|
||||
let mut topo_provider =
|
||||
NymTopologyProviderInner::new(Config::default(), topo_mgr.clone(), None);
|
||||
|
||||
// No initial topology was provided, No update has run yet, None should be returned
|
||||
assert_eq!(topo_provider.cached_topology(), None);
|
||||
|
||||
// force an update of the cached topology
|
||||
topo_provider.update_cache().await;
|
||||
|
||||
let topo = topo_provider.cached_topology();
|
||||
assert!(topo.is_some());
|
||||
let topo = topo.unwrap();
|
||||
assert!(topo.is_empty());
|
||||
|
||||
// create a change in the manager to make sure it is propogated to the provider cache on update
|
||||
topo_mgr.topo.rewarded_set.epoch_id += 1;
|
||||
topo_mgr.topo.rewarded_set.entry_gateways = HashSet::from([123]);
|
||||
assert_eq!(topo_mgr.topo.node_details.insert(123, fake_node(123)), None);
|
||||
topo_provider.topology_manager = topo_mgr.clone();
|
||||
|
||||
// force an update of the cached topology
|
||||
topo_provider.update_cache().await;
|
||||
|
||||
let topo = topo_provider.cached_topology();
|
||||
assert!(topo.is_some());
|
||||
let topo1 = topo.unwrap();
|
||||
assert!(!topo1.is_empty());
|
||||
assert!(topo1.node_details.contains_key(&123));
|
||||
|
||||
// try forcing an update even though the epoch has not changed. Should result in no change
|
||||
topo_provider.update_cache().await;
|
||||
let topo2 = topo_provider.cached_topology().unwrap();
|
||||
assert_eq!(topo1, topo2);
|
||||
|
||||
// Add a node without a descriptor to make sure warning is printed.
|
||||
topo_mgr.topo.rewarded_set.epoch_id += 1;
|
||||
topo_mgr.topo.rewarded_set.entry_gateways = HashSet::from([123, 456]);
|
||||
topo_provider.topology_manager = topo_mgr.clone();
|
||||
|
||||
// try forcing an update even though the epoch has not changed. Should result in no change
|
||||
topo_provider.update_cache().await;
|
||||
let _ = topo_provider.cached_topology().unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_topology_provider_by_trait() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let mut topo_mgr = PassthroughPiecewiseTopologyProvider {
|
||||
topo: NymTopology::default(),
|
||||
};
|
||||
|
||||
let mut topo_provider = NymTopologyProvider::new(topo_mgr.clone(), Config::default(), None);
|
||||
|
||||
// No initial topology was provided, the NymTopologyProvider should do an update from the
|
||||
// manager to build its cache. This should be our empty topology initialized in the manage
|
||||
// above
|
||||
let maybe_topo = topo_provider.get_new_topology().await;
|
||||
assert!(maybe_topo.is_some());
|
||||
let topo1 = maybe_topo.unwrap();
|
||||
assert!(topo1.is_empty());
|
||||
|
||||
// Try pulling again, should give response from cache because we are under ttl
|
||||
let maybe_topo = topo_provider.get_new_topology().await;
|
||||
assert!(maybe_topo.is_some());
|
||||
let topo2 = maybe_topo.unwrap();
|
||||
assert_eq!(topo1, topo2);
|
||||
|
||||
// create a change in the manager
|
||||
topo_mgr.topo.rewarded_set.epoch_id += 1;
|
||||
topo_mgr.topo.rewarded_set.entry_gateways = HashSet::from([123]);
|
||||
assert_eq!(topo_mgr.topo.node_details.insert(123, fake_node(123)), None);
|
||||
{
|
||||
let mut guard = topo_provider.inner.lock().await;
|
||||
guard.topology_manager = topo_mgr.clone();
|
||||
drop(guard)
|
||||
}
|
||||
|
||||
// The NymTopologyProvider should still serve from cache because we haven't crossed ttl
|
||||
// despite updates being available in the manager
|
||||
let maybe_topo = topo_provider.get_new_topology().await;
|
||||
assert!(maybe_topo.is_some());
|
||||
let topo3 = maybe_topo.unwrap();
|
||||
assert_eq!(topo2, topo3);
|
||||
|
||||
// force ttl timeout should allow refresh that includes latest changes from manager
|
||||
topo_provider.force_refresh().await;
|
||||
let maybe_topo = topo_provider.get_new_topology().await;
|
||||
assert!(maybe_topo.is_some());
|
||||
let topo4 = maybe_topo.unwrap();
|
||||
assert_ne!(topo3, topo4);
|
||||
assert!(topo4.node_details.contains_key(&123));
|
||||
|
||||
// create another change in the manager
|
||||
topo_mgr.topo.rewarded_set.epoch_id += 1;
|
||||
topo_mgr.topo.rewarded_set.entry_gateways = HashSet::from([123, 456]);
|
||||
assert_eq!(topo_mgr.topo.node_details.insert(456, fake_node(456)), None);
|
||||
{
|
||||
let mut guard = topo_provider.inner.lock().await;
|
||||
guard.topology_manager = topo_mgr.clone();
|
||||
drop(guard)
|
||||
}
|
||||
|
||||
// force clear cache should also pull latest full topology
|
||||
topo_provider.force_clear().await;
|
||||
let maybe_topo = topo_provider.get_new_topology().await;
|
||||
assert!(maybe_topo.is_some());
|
||||
let topo5 = maybe_topo.unwrap();
|
||||
assert!(topo5.node_details.contains_key(&456));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn fake_node(node_id: u32) -> RoutingNode {
|
||||
RoutingNode {
|
||||
node_id,
|
||||
mix_host: "127.0.0.1:2345".parse().unwrap(),
|
||||
entry: None,
|
||||
identity_key: IdentityPubkey::from_bytes(&[0u8; 32][..]).unwrap(),
|
||||
sphinx_key: SphinxPubkey::from_bytes(&[0u8; 32][..]).unwrap(),
|
||||
supported_roles: SupportedRoles {
|
||||
mixnode: true,
|
||||
mixnet_entry: true,
|
||||
mixnet_exit: true,
|
||||
},
|
||||
performance: Percent::hundred(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6,7 +6,7 @@ use nym_mixnet_contract_common::{EpochId, EpochRewardedSet, NodeId, RewardedSet}
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashSet;
|
||||
|
||||
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
|
||||
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
|
||||
pub struct CachedEpochRewardedSet {
|
||||
pub epoch_id: EpochId,
|
||||
|
||||
@@ -119,4 +119,22 @@ impl CachedEpochRewardedSet {
|
||||
mixnodes.extend(&self.layer3);
|
||||
mixnodes
|
||||
}
|
||||
|
||||
pub fn all_ids(&self) -> HashSet<NodeId> {
|
||||
let mut mixnodes = HashSet::with_capacity(
|
||||
self.entry_gateways.len()
|
||||
+ self.exit_gateways.len()
|
||||
+ self.layer1.len()
|
||||
+ self.layer2.len()
|
||||
+ self.layer3.len()
|
||||
+ self.standby.len(),
|
||||
);
|
||||
mixnodes.extend(&self.entry_gateways);
|
||||
mixnodes.extend(&self.exit_gateways);
|
||||
mixnodes.extend(&self.layer1);
|
||||
mixnodes.extend(&self.layer2);
|
||||
mixnodes.extend(&self.layer3);
|
||||
mixnodes.extend(&self.standby);
|
||||
mixnodes
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
use crate::node::{EntryDetails, RoutingNode, RoutingNodeError, SupportedRoles};
|
||||
use crate::{CachedEpochRewardedSet, NymTopology};
|
||||
use nym_mixnet_contract_common::reward_params::Performance;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
@@ -105,6 +106,8 @@ impl TryFrom<WasmFriendlyRoutingNode> for RoutingNode {
|
||||
}
|
||||
})?,
|
||||
supported_roles: value.supported_roles,
|
||||
|
||||
performance: Performance::hundred(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,7 +28,9 @@ pub use nym_sphinx::{
|
||||
};
|
||||
pub use nym_statistics_common::clients::ClientStatsSender;
|
||||
pub use nym_task;
|
||||
pub use nym_topology::{HardcodedTopologyProvider, MixLayer, NymTopology, TopologyProvider};
|
||||
pub use nym_topology::{
|
||||
providers::HardcodedTopologyProvider, MixLayer, NymTopology, TopologyProvider,
|
||||
};
|
||||
pub use nym_validator_client::nym_api::Client as ApiClient;
|
||||
pub use nym_validator_client::{DirectSigningReqwestRpcNyxdClient, QueryReqwestRpcNyxdClient};
|
||||
// TODO: that's a very nasty import path. it should come from contracts instead!
|
||||
|
||||
@@ -10,7 +10,7 @@ use crate::support::caching::cache::SharedCache;
|
||||
use crate::support::legacy_helpers::legacy_host_to_ips_and_hostname;
|
||||
use nym_api_requests::legacy::{LegacyGatewayBondWithId, LegacyMixNodeBondWithLayer};
|
||||
use nym_api_requests::models::{NodeAnnotation, NymNodeDescription};
|
||||
use nym_contracts_common::NaiveFloat;
|
||||
use nym_contracts_common::{NaiveFloat, Percent};
|
||||
use nym_crypto::asymmetric::{encryption, identity};
|
||||
use nym_mixnet_contract_common::{LegacyMixLayer, NodeId};
|
||||
use nym_node_tester_utils::node::{NodeType, TestableNode};
|
||||
@@ -231,6 +231,8 @@ impl PacketPreparer {
|
||||
mixnet_entry: false,
|
||||
mixnet_exit: false,
|
||||
},
|
||||
// We have no information about performance in legacy node formats
|
||||
performance: Percent::hundred(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -263,6 +265,8 @@ impl PacketPreparer {
|
||||
mixnet_entry: true,
|
||||
mixnet_exit: false,
|
||||
},
|
||||
// We have no information about performance in legacy node formats
|
||||
performance: Percent::hundred(),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -26,7 +26,7 @@ use crate::nym_nodes::handlers::unstable::semi_skimmed::nodes_expanded;
|
||||
use crate::nym_nodes::handlers::unstable::skimmed::{
|
||||
entry_gateways_basic_active, entry_gateways_basic_all, exit_gateways_basic_active,
|
||||
exit_gateways_basic_all, mixnodes_basic_active, mixnodes_basic_all, nodes_basic_active,
|
||||
nodes_basic_all,
|
||||
nodes_basic_all, nodes_basic_batch,
|
||||
};
|
||||
use crate::support::http::helpers::PaginationRequest;
|
||||
use crate::support::http::state::AppState;
|
||||
@@ -52,6 +52,7 @@ pub(crate) fn nym_node_routes_unstable() -> Router<AppState> {
|
||||
"/skimmed",
|
||||
Router::new()
|
||||
.route("/", get(nodes_basic_all))
|
||||
.route("/batch", post(nodes_basic_batch))
|
||||
.route("/active", get(nodes_basic_active))
|
||||
.nest(
|
||||
"/mixnodes",
|
||||
|
||||
@@ -25,6 +25,7 @@ use tracing::trace;
|
||||
use utoipa::ToSchema;
|
||||
|
||||
pub type PaginatedSkimmedNodes = AxumResult<Json<PaginatedCachedNodesResponse<SkimmedNode>>>;
|
||||
type SkimmedNodes = AxumResult<Json<CachedNodesResponse<SkimmedNode>>>;
|
||||
|
||||
/// Given all relevant caches, build part of response for JUST Nym Nodes
|
||||
fn build_nym_nodes_response<'a, NI>(
|
||||
@@ -196,7 +197,7 @@ where
|
||||
pub(super) async fn deprecated_gateways_basic(
|
||||
state: State<AppState>,
|
||||
query_params: Query<NodesParams>,
|
||||
) -> AxumResult<Json<CachedNodesResponse<SkimmedNode>>> {
|
||||
) -> SkimmedNodes {
|
||||
// 1. call '/v1/unstable/skimmed/entry-gateways/all'
|
||||
let all_gateways = entry_gateways_basic_all(state, query_params).await?;
|
||||
|
||||
@@ -223,7 +224,7 @@ pub(super) async fn deprecated_gateways_basic(
|
||||
pub(super) async fn deprecated_mixnodes_basic(
|
||||
state: State<AppState>,
|
||||
query_params: Query<NodesParams>,
|
||||
) -> AxumResult<Json<CachedNodesResponse<SkimmedNode>>> {
|
||||
) -> SkimmedNodes {
|
||||
// 1. call '/v1/unstable/nym-nodes/skimmed/mixnodes/active'
|
||||
let active_mixnodes = mixnodes_basic_active(state, query_params).await?;
|
||||
|
||||
@@ -239,7 +240,7 @@ async fn nodes_basic(
|
||||
state: State<AppState>,
|
||||
Query(_query_params): Query<NodesParams>,
|
||||
active_only: bool,
|
||||
) -> PaginatedSkimmedNodes {
|
||||
) -> AxumResult<CachedNodesResponse<SkimmedNode>> {
|
||||
// unfortunately we have to build the response semi-manually here as we need to add two sources of legacy nodes
|
||||
|
||||
// 1. grab all relevant described nym-nodes
|
||||
@@ -281,10 +282,10 @@ async fn nodes_basic(
|
||||
legacy_gateways.timestamp(),
|
||||
]);
|
||||
|
||||
Ok(Json(PaginatedCachedNodesResponse::new_full(
|
||||
Ok(CachedNodesResponse {
|
||||
refreshed_at,
|
||||
nodes,
|
||||
)))
|
||||
})
|
||||
}
|
||||
|
||||
#[allow(dead_code)] // not dead, used in OpenAPI docs
|
||||
@@ -326,7 +327,47 @@ pub(super) async fn nodes_basic_all(
|
||||
};
|
||||
}
|
||||
|
||||
nodes_basic(state, Query(query_params.into()), false).await
|
||||
let nodes = nodes_basic(state, Query(query_params.into()), false).await?;
|
||||
// We are never using pagination (always one page) anyways so just build it here.
|
||||
Ok(Json(PaginatedCachedNodesResponse::new_full(
|
||||
nodes.refreshed_at,
|
||||
nodes.nodes,
|
||||
)))
|
||||
}
|
||||
|
||||
/// Post request handler taking a json array of NodeId (u32) values and returning descriptors for
|
||||
/// the provided NodeId values. A successful response will contain descriptors for all nodes
|
||||
/// associated with those node IDs available in the current full topology.
|
||||
///
|
||||
/// If a provided node ID is not present in the current topology there will be no descriptor for
|
||||
/// that node in the response.
|
||||
///
|
||||
/// If no node IDs are provided the response will contain no descriptors.
|
||||
#[utoipa::path(
|
||||
tag = "Unstable Nym Nodes batch by Node ID",
|
||||
get,
|
||||
params(NodesParamsWithRole),
|
||||
path = "batch",
|
||||
context_path = "/v1/unstable/nym-nodes/skimmed",
|
||||
responses(
|
||||
(status = 200, body = PaginatedCachedNodesResponseSchema)
|
||||
)
|
||||
)]
|
||||
pub(super) async fn nodes_basic_batch(
|
||||
state: State<AppState>,
|
||||
Query(query_params): Query<NodesParamsWithRole>,
|
||||
Json(ids): Json<Vec<u32>>,
|
||||
) -> SkimmedNodes {
|
||||
let nodes = nodes_basic(state, Query(query_params.into()), false).await?;
|
||||
let requested_nodes = nodes
|
||||
.nodes
|
||||
.into_iter()
|
||||
.filter(|node| ids.contains(&node.node_id))
|
||||
.collect();
|
||||
Ok(Json(CachedNodesResponse {
|
||||
nodes: requested_nodes,
|
||||
refreshed_at: nodes.refreshed_at,
|
||||
}))
|
||||
}
|
||||
|
||||
/// Return Nym Nodes and optionally legacy mixnodes/gateways (if `no-legacy` flag is not used)
|
||||
@@ -359,7 +400,12 @@ pub(super) async fn nodes_basic_active(
|
||||
};
|
||||
}
|
||||
|
||||
nodes_basic(state, Query(query_params.into()), true).await
|
||||
let nodes = nodes_basic(state, Query(query_params.into()), true).await?;
|
||||
// We are never using pagination (always one page) anyways so just build it here.
|
||||
Ok(Json(PaginatedCachedNodesResponse::new_full(
|
||||
nodes.refreshed_at,
|
||||
nodes.nodes,
|
||||
)))
|
||||
}
|
||||
|
||||
async fn mixnodes_basic(
|
||||
|
||||
@@ -10,7 +10,7 @@ use nym_network_defaults::setup_env;
|
||||
use nym_network_defaults::var_names::NYM_API;
|
||||
use nym_sdk::mixnet::{self, MixnetClient};
|
||||
use nym_sphinx::chunking::monitoring;
|
||||
use nym_topology::{HardcodedTopologyProvider, NymTopology};
|
||||
use nym_topology::{providers::HardcodedTopologyProvider, NymTopology};
|
||||
use std::fs::File;
|
||||
use std::io::Write;
|
||||
use std::sync::LazyLock;
|
||||
|
||||
@@ -46,6 +46,7 @@ use nym_node_requests::api::v1::node::models::{AnnouncePorts, NodeDescription};
|
||||
use nym_sphinx_acknowledgements::AckKey;
|
||||
use nym_sphinx_addressing::Recipient;
|
||||
use nym_task::{ShutdownManager, ShutdownToken, TaskClient};
|
||||
use nym_topology::node::Performance;
|
||||
use nym_validator_client::client::NymApiClientExt;
|
||||
use nym_validator_client::models::NodeRefreshBody;
|
||||
use nym_validator_client::{NymApiClient, UserAgent};
|
||||
@@ -579,6 +580,8 @@ impl NymNode {
|
||||
mixnet_entry: true,
|
||||
mixnet_exit: true,
|
||||
},
|
||||
// Perf metrics are not meaningful in this context.
|
||||
performance: Performance::hundred(),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
use nym_sdk::mixnet;
|
||||
use nym_sdk::mixnet::MixnetMessageSender;
|
||||
use nym_topology::provider_trait::{async_trait, TopologyProvider};
|
||||
use nym_topology::providers::{async_trait, TopologyProvider};
|
||||
use nym_topology::NymTopology;
|
||||
use url::Url;
|
||||
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
use async_trait::async_trait;
|
||||
use nym_crypto::asymmetric::ed25519::PublicKey;
|
||||
use nym_gateway_requests::SharedSymmetricKey;
|
||||
use nym_sdk::mixnet::{
|
||||
self, ActiveGateway, BadGateway, ClientKeys, EmptyReplyStorage, EphemeralCredentialStorage,
|
||||
GatewayRegistration, GatewaysDetailsStore, KeyStore, MixnetClientStorage, MixnetMessageSender,
|
||||
};
|
||||
use nym_topology::provider_trait::async_trait;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
use nym_sdk::mixnet;
|
||||
use nym_sdk::mixnet::MixnetMessageSender;
|
||||
use nym_topology::{NymTopology, RoutingNode, SupportedRoles};
|
||||
use nym_topology::{node::Performance, NymTopology, RoutingNode, SupportedRoles};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
@@ -30,6 +30,7 @@ async fn main() {
|
||||
mixnet_entry: false,
|
||||
mixnet_exit: false,
|
||||
},
|
||||
performance: Performance::hundred(),
|
||||
},
|
||||
RoutingNode {
|
||||
node_id: 23,
|
||||
@@ -46,6 +47,7 @@ async fn main() {
|
||||
mixnet_entry: false,
|
||||
mixnet_exit: false,
|
||||
},
|
||||
performance: Performance::hundred(),
|
||||
},
|
||||
RoutingNode {
|
||||
node_id: 66,
|
||||
@@ -62,6 +64,7 @@ async fn main() {
|
||||
mixnet_entry: false,
|
||||
mixnet_exit: false,
|
||||
},
|
||||
performance: Performance::hundred(),
|
||||
},
|
||||
];
|
||||
|
||||
|
||||
@@ -83,7 +83,7 @@ pub use nym_statistics_common::clients::{
|
||||
connection::ConnectionStatsEvent, ClientStatsEvents, ClientStatsSender,
|
||||
};
|
||||
pub use nym_task::connections::{LaneQueueLengths, TransmissionLane};
|
||||
pub use nym_topology::{provider_trait::TopologyProvider, NymTopology};
|
||||
pub use nym_topology::{providers::TopologyProvider, NymTopology};
|
||||
pub use paths::StoragePaths;
|
||||
pub use socks5_client::Socks5MixnetClient;
|
||||
pub use traits::MixnetMessageSender;
|
||||
|
||||
@@ -32,7 +32,7 @@ use nym_credentials_interface::TicketType;
|
||||
use nym_crypto::hkdf::DerivationMaterial;
|
||||
use nym_socks5_client_core::config::Socks5;
|
||||
use nym_task::{TaskClient, TaskHandle, TaskStatus};
|
||||
use nym_topology::provider_trait::TopologyProvider;
|
||||
use nym_topology::providers::TopologyProvider;
|
||||
use nym_validator_client::{nyxd, QueryHttpRpcNyxdClient, UserAgent};
|
||||
use rand::rngs::OsRng;
|
||||
use std::path::Path;
|
||||
|
||||
Reference in New Issue
Block a user