Compare commits

..

1 Commits

Author SHA1 Message Date
Bogdan-Ștefan Neacşu 8e992c3c58 Handle ecash network errors differently (#5378) 2025-01-23 10:48:03 +01:00
605 changed files with 8869 additions and 24379 deletions
-1
View File
@@ -1 +0,0 @@
nym-validator-rewarder/.sqlx/** diff=nodiff
-2
View File
@@ -31,5 +31,3 @@ updates:
update-types:
- "patch"
open-pull-requests-limit: 10
assignees:
- "octol"
@@ -79,6 +79,7 @@ jobs:
target/release/nym-socks5-client
target/release/nym-api
target/release/nym-network-requester
target/release/nym-data-observatory
target/release/nym-cli
target/release/nymvisor
target/release/nym-node
@@ -96,6 +97,7 @@ jobs:
cp target/release/nym-socks5-client $OUTPUT_DIR
cp target/release/nym-api $OUTPUT_DIR
cp target/release/nym-network-requester $OUTPUT_DIR
cp target/release/nym-data-observatory $OUTPUT_DIR
cp target/release/nymvisor $OUTPUT_DIR
cp target/release/nym-node $OUTPUT_DIR
cp target/release/nym-cli $OUTPUT_DIR
+8 -10
View File
@@ -8,18 +8,16 @@ on:
- 'explorer-api/**'
- 'gateway/**'
- 'integrations/**'
- 'nym-api/**'
- 'nym-credential-proxy/**'
- 'nym-network-monitor/**'
- 'nym-node/**'
- 'nym-node-status-api/**'
- 'nym-outfox/**'
- 'nym-validator-rewarder/**'
- 'nyx-chain-watcher/**'
- 'sdk/ffi/**'
- 'mixnode/**'
- 'sdk/rust/**'
- 'sdk/lib/**'
- 'service-providers/**'
- 'nym-browser-extension/storage/**'
- 'nym-network-monitor/**'
- 'nym-api/**'
- 'nym-node/**'
- 'nym-outfox/**'
- 'nym-data-observatory/**'
- 'nym-validator-rewarder/**'
- 'tools/**'
- 'wasm/**'
- 'Cargo.toml'
-1
View File
@@ -1,7 +1,6 @@
name: ci-sdk-wasm
on:
workflow_dispatch:
pull_request:
paths:
- 'wasm/**'
-1
View File
@@ -15,7 +15,6 @@ jobs:
runs-on: ${{ matrix.os }}
env:
CARGO_TERM_COLOR: always
IPINFO_API_TOKEN: ${{ secrets.IPINFO_API_TOKEN }}
continue-on-error: true
steps:
- name: Check out repository code
+1 -1
View File
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.1
uses: mikefarah/yq@v4.44.6
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/nym-credential-proxy/Cargo.toml
+1 -1
View File
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.1
uses: mikefarah/yq@v4.44.6
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
+1 -1
View File
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.1
uses: mikefarah/yq@v4.44.6
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/nym-network-monitor/Cargo.toml
@@ -31,7 +31,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.1
uses: mikefarah/yq@v4.44.6
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
+1 -1
View File
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.1
uses: mikefarah/yq@v4.44.6
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
+1 -1
View File
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.1
uses: mikefarah/yq@v4.44.6
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.1
uses: mikefarah/yq@v4.44.6
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.1
uses: mikefarah/yq@v4.44.6
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
-5
View File
@@ -54,8 +54,3 @@ nym-network-monitor/__pycache__
nym-network-monitor/*.key
nym-network-monitor/.envrc
nym-network-monitor/.envrc
nym-api/redocly/formatted-openapi.json
*.sqlite
.build
Generated
+651 -1333
View File
File diff suppressed because it is too large Load Diff
+65 -65
View File
@@ -53,8 +53,8 @@ members = [
"common/execute",
"common/exit-policy",
"common/gateway-requests",
"common/gateway-stats-storage",
"common/gateway-storage",
"common/gateway-stats-storage",
"common/http-api-client",
"common/http-api-common",
"common/inclusion-probability",
@@ -93,7 +93,6 @@ members = [
"common/topology",
"common/tun",
"common/types",
"common/verloc",
"common/wasm/client-core",
"common/wasm/storage",
"common/wasm/utils",
@@ -105,22 +104,6 @@ members = [
"explorer-api/explorer-client",
"gateway",
"integrations/bity",
"nym-api",
"nym-api/nym-api-requests",
"nym-browser-extension/storage",
"nym-credential-proxy/nym-credential-proxy",
"nym-credential-proxy/nym-credential-proxy-requests",
"nym-credential-proxy/vpn-api-lib-wasm",
"nym-network-monitor",
"nym-node",
"nym-node-status-api/nym-node-status-agent",
"nym-node-status-api/nym-node-status-api",
"nym-node-status-api/nym-node-status-client",
"nym-node/nym-node-metrics",
"nym-node/nym-node-requests",
"nym-outfox",
"nym-validator-rewarder",
"nyx-chain-watcher",
"sdk/ffi/cpp",
"sdk/ffi/go",
"sdk/ffi/shared",
@@ -129,16 +112,26 @@ members = [
"service-providers/common",
"service-providers/ip-packet-router",
"service-providers/network-requester",
"nym-api",
"nym-api/nym-api-requests",
"nym-browser-extension/storage",
"nym-credential-proxy/nym-credential-proxy",
"nym-credential-proxy/nym-credential-proxy-requests",
"nym-credential-proxy/vpn-api-lib-wasm",
"nym-data-observatory",
"nym-network-monitor",
"nym-node",
"nym-node/nym-node-requests",
"nym-node/nym-node-metrics",
"nym-node-status-api/nym-node-status-agent",
"nym-node-status-api/nym-node-status-api",
"nym-node-status-api/nym-node-status-client",
"nym-outfox",
"nym-validator-rewarder",
"tools/echo-server",
"tools/echo-server",
"tools/internal/contract-state-importer/importer-cli",
"tools/internal/contract-state-importer/importer-contract",
"tools/internal/mixnet-connectivity-check",
# "tools/internal/sdk-version-bump",
"tools/internal/ssl-inject",
# "tools/internal/sdk-version-bump",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager/dkg-bypass-contract",
"tools/internal/testnet-manager/dkg-bypass-contract",
"tools/nym-cli",
"tools/nym-id-cli",
@@ -150,6 +143,11 @@ members = [
"wasm/mix-fetch",
"wasm/node-tester",
"wasm/zknym-lib",
"tools/echo-server",
"tools/internal/contract-state-importer/importer-cli",
"tools/internal/contract-state-importer/importer-contract",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager/dkg-bypass-contract", "common/verloc", "tools/internal/mixnet-connectivity-check",
]
default-members = [
@@ -158,11 +156,11 @@ default-members = [
"explorer-api",
"nym-api",
"nym-credential-proxy/nym-credential-proxy",
"nym-data-observatory",
"nym-node",
"nym-node-status-api/nym-node-status-agent",
"nym-node-status-api/nym-node-status-api",
"nym-validator-rewarder",
"nyx-chain-watcher",
"service-providers/authenticator",
"service-providers/ip-packet-router",
"service-providers/network-requester",
@@ -173,6 +171,7 @@ exclude = [
"explorer",
"contracts",
"nym-wallet",
"nym-vpn/ui/src-tauri",
"cpu-cycles",
]
@@ -192,10 +191,9 @@ aes = "0.8.1"
aes-gcm = "0.10.1"
aes-gcm-siv = "0.11.1"
aead = "0.5.2"
anyhow = "1.0.95"
arc-swap = "1.7.1"
anyhow = "1.0.90"
argon2 = "0.5.0"
async-trait = "0.1.85"
async-trait = "0.1.83"
axum-client-ip = "0.6.1"
axum = "0.7.5"
axum-extra = "0.9.4"
@@ -204,7 +202,7 @@ bincode = "1.3.3"
bip39 = { version = "2.0.0", features = ["zeroize"] }
bit-vec = "0.7.0" # can we unify those?
bitvec = "1.0.0"
blake3 = "1.5.5"
blake3 = "1.5.4"
bloomfilter = "1.0.14"
bs58 = "0.5.1"
bytecodec = "0.4.15"
@@ -214,20 +212,20 @@ celes = "2.4.0"
cfg-if = "1.0.0"
chacha20 = "0.9.0"
chacha20poly1305 = "0.10.1"
chrono = "0.4.39"
chrono = "0.4.31"
cipher = "0.4.3"
clap = "4.5.26"
clap = "4.5.20"
clap_complete = "4.5"
clap_complete_fig = "4.5"
colored = "2.0"
comfy-table = "7.1.3"
console = "0.15.10"
comfy-table = "7.1.1"
console = "0.15.8"
console-subscriber = "0.1.1"
console_error_panic_hook = "0.1"
const-str = "0.5.6"
const_format = "0.2.34"
criterion = "0.5"
csv = "1.3.1"
const_format = "0.2.33"
criterion = "0.4"
csv = "1.3.0"
ctr = "0.9.1"
cupid = "0.6.1"
curve25519-dalek = "4.1"
@@ -244,8 +242,8 @@ etherparse = "0.13.0"
envy = "0.4"
eyre = "0.6.9"
fastrand = "2.1.1"
flate2 = "1.0.35"
futures = "0.3.31"
flate2 = "1.0.34"
futures = "0.3.28"
futures-util = "0.3"
generic-array = "0.14.7"
getrandom = "0.2.10"
@@ -254,7 +252,6 @@ handlebars = "3.5.5"
headers = "0.4.0"
hex = "0.4.3"
hex-literal = "0.3.3"
hickory-resolver = "0.24.2"
hkdf = "0.12.3"
hmac = "0.12.1"
http = "1"
@@ -265,7 +262,7 @@ humantime-serde = "1.1.1"
human-repr = "1.1.0"
hyper = "1.4.1"
hyper-util = "0.1"
indicatif = "0.17.9"
indicatif = "0.17.8"
inquire = "0.6.2"
ip_network = "0.4.1"
ipnetwork = "0.20"
@@ -290,7 +287,7 @@ parking_lot = "0.12.3"
pem = "0.8"
petgraph = "0.6.5"
pin-project = "1.1"
pin-project-lite = "0.2.16"
pin-project-lite = "0.2.14"
pretty_env_logger = "0.4.0"
publicsuffix = "2.2.3"
quote = "1"
@@ -308,11 +305,11 @@ rocket_cors = "0.6.0"
rocket_okapi = "0.8.0"
safer-ffi = "0.1.13"
schemars = "0.8.21"
semver = "1.0.24"
serde = "1.0.217"
semver = "1.0.23"
serde = "1.0.211"
serde_bytes = "0.11.15"
serde_derive = "1.0"
serde_json = "1.0.135"
serde_json = "1.0.132"
serde_json_path = "0.7.1"
serde_repr = "0.1"
serde_with = "3.9.0"
@@ -325,36 +322,36 @@ strum = "0.26"
strum_macros = "0.26"
subtle-encoding = "0.5"
syn = "1"
sysinfo = "0.33.0"
sysinfo = "0.30.13"
tap = "1.0.1"
tar = "0.4.43"
tempfile = "3.15"
tar = "0.4.42"
tempfile = "3.14"
thiserror = "1.0.64"
time = "0.3.37"
time = "0.3.30"
tokio = "1.39"
tokio-stream = "0.1.17"
tokio-stream = "0.1.16"
tokio-test = "0.4.4"
tokio-tun = "0.11.5"
tokio-tungstenite = { version = "0.20.1" }
tokio-util = "0.7.13"
toml = "0.8.19"
tokio-util = "0.7.12"
toml = "0.8.14"
tower = "0.4.13"
tower-http = "0.5.2"
tracing = "0.1.41"
tracing = "0.1.37"
tracing-opentelemetry = "0.19.0"
tracing-subscriber = "0.3.19"
tracing-subscriber = "0.3.16"
tracing-tree = "0.2.2"
tracing-log = "0.2"
ts-rs = "10.1.0"
ts-rs = "10.0.0"
tungstenite = { version = "0.20.1", default-features = false }
url = "2.5"
utoipa = "5.2"
utoipa-swagger-ui = "8.0"
utoipauto = "0.2"
utoipa = "4.2"
utoipa-swagger-ui = "7.1"
utoipauto = "0.1"
uuid = "*"
vergen = { version = "=8.3.1", default-features = false }
walkdir = "2"
wasm-bindgen-test = "0.3.49"
wasm-bindgen-test = "0.3.43"
x25519-dalek = "2.0.0"
zeroize = "1.6.0"
@@ -387,17 +384,20 @@ cw-controllers = { version = "=1.1.0" }
# cosmrs-related
bip32 = { version = "0.5.2", default-features = false }
cosmrs = { version = "0.21.0" }
tendermint = "0.40.0"
tendermint-rpc = "0.40.0"
prost = { version = "0.13", default-features = false }
# temporarily using a fork again (yay.) because we need staking and slashing support (which are already on main but not released)
# plus response message parsing (which is, as of the time of writing this message, waiting to get merged)
#cosmrs = { path = "../cosmos-rust-fork/cosmos-rust/cosmrs" }
cosmrs = { git = "https://github.com/cosmos/cosmos-rust", rev = "4b1332e6d8258ac845cef71589c8d362a669675a" } # unfortuntely we need a fork by yours truly to get the staking support
tendermint = "0.37.0" # same version as used by cosmrs
tendermint-rpc = "0.37.0" # same version as used by cosmrs
prost = { version = "0.12", default-features = false }
# wasm-related dependencies
gloo-utils = "0.2.0"
gloo-net = "0.6.0"
indexed_db_futures = "0.6.0"
# TODO: migrate to 0.6+
indexed_db_futures = "0.4.2"
js-sys = "0.3.76"
serde-wasm-bindgen = "0.6.5"
tsify = "0.4.5"
-23
View File
@@ -1,23 +0,0 @@
Boost Software License - Version 1.0 - August 17th, 2003
Permission is hereby granted, free of charge, to any person or organization
obtaining a copy of the software and accompanying documentation covered by
this license (the "Software") to use, reproduce, display, distribute,
execute, and transmit the Software, and to prepare derivative works of the
Software, and to permit third-parties to whom the Software is furnished to
do so, all subject to the following:
The copyright notices in the Software and this entire statement, including
the above license grant, this restriction and the following disclaimer,
must be included in all copies of the Software, in whole or in part, and
all derivative works of the Software, unless such copies or derivative
works are solely in the form of machine-executable object code generated by
a source language processor.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
+7 -7
View File
@@ -13,8 +13,8 @@ The platform is composed of multiple Rust crates. Top-level executable binary cr
* `nym-client` - an executable which you can build into your own applications. Use it for interacting with Nym nodes.
* `nym-socks5-client` - a Socks5 proxy you can run on your machine and use with existing applications.
* `nym-explorer` - a (projected) block explorer and (existing) mixnet viewer.
* `nym-wallet` - a desktop wallet implemented using the [Tauri](https://tauri.app)) framework.
* `nym-cli` - a tool for interacting with the network from the CLI.
* `nym-wallet` - a desktop wallet implemented using the [Tauri](https://tauri.studio/en/docs/about/intro) framework.
* `nym-cli` - a tool for interacting with the network from the CLI.
<!-- coming soon
* `nym-network-monitor` - sends packets through the full system to check that they are working as expected, and stores node uptime histories as the basis of a rewards system ("mixmining" or "proof-of-mixing").
-->
@@ -42,10 +42,10 @@ client ───► Gateway ──┘ mix │ mix ┌─►mix ───►
References for developers:
* [Dev Docs](https://nym.com/docs/developers)
* [SDKs](https://nym.com/docs/developers/rust)
* [Network Docs](https://nym.com/docs/network)
* [Release Cycle - git flow](https://nym.com/docs/operators/release-cycle)
* [Dev Docs](https://nymtech.net/docs/developers)
* [SDKs](https://nymtech.net/docs/developers/rust)
* [Network Docs](https://nymtech.net/docs/network)
* [Release Cycle - git flow](https://nymtech.net/docs/operators/release-cycle)
### Developer chat
@@ -66,4 +66,4 @@ As a general approach, licensing is as follows this pattern:
- libraries and components are Apache 2.0 or MIT
- documentation is Apache 2.0 or CC0-1.0
Nym Node Operators and Validators Terms and Conditions can be found [here](https://nym.com/operators-validators-terms).
Nym Node Operators and Validators Temrs and Conditions can be found [here](https://nymtech.net/terms-and-conditions/operators/v1.0.0).
+2 -2
View File
@@ -3,7 +3,7 @@ name = "nym-client-core"
version = "1.1.15"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
edition = "2021"
rust-version = "1.76"
rust-version = "1.70"
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -45,7 +45,7 @@ nym-nonexhaustive-delayqueue = { path = "../nonexhaustive-delayqueue" }
nym-sphinx = { path = "../nymsphinx" }
nym-statistics-common = { path = "../statistics" }
nym-pemstore = { path = "../pemstore" }
nym-topology = { path = "../topology", features = ["persistence"] }
nym-topology = { path = "../topology", features = ["serializable"] }
nym-mixnet-client = { path = "../client-libs/mixnet-client", default-features = false }
nym-validator-client = { path = "../client-libs/validator-client", default-features = false }
nym-task = { path = "../task" }
@@ -550,14 +550,6 @@ pub struct Topology {
/// Specifies a minimum performance of a gateway that is used on route construction.
/// This setting is only applicable when `NymApi` topology is used.
pub minimum_gateway_performance: u8,
/// Specifies whether this client should attempt to retrieve all available network nodes
/// as opposed to just active mixnodes/gateways.
pub use_extended_topology: bool,
/// Specifies whether this client should ignore the current epoch role of the target egress node
/// when constructing the final hop packets.
pub ignore_egress_epoch_role: bool,
}
#[allow(clippy::large_enum_variant)]
@@ -594,8 +586,6 @@ impl Default for Topology {
topology_structure: TopologyStructure::default(),
minimum_mixnode_performance: DEFAULT_MIN_MIXNODE_PERFORMANCE,
minimum_gateway_performance: DEFAULT_MIN_GATEWAY_PERFORMANCE,
use_extended_topology: false,
ignore_egress_epoch_role: false,
}
}
}
@@ -15,7 +15,6 @@ pub mod error;
mod manager;
mod models;
#[derive(Clone)]
pub struct OnDiskGatewaysDetails {
manager: StorageManager,
}
@@ -20,12 +20,12 @@ pub enum InMemStorageError {
MalformedGateway(#[from] BadGateway),
}
#[derive(Clone, Debug, Default)]
#[derive(Debug, Default)]
pub struct InMemGatewaysDetails {
inner: Arc<RwLock<InMemStorageInner>>,
}
#[derive(Clone, Debug, Default)]
#[derive(Debug, Default)]
struct InMemStorageInner {
active_gateway: Option<String>,
gateways: HashMap<String, GatewayRegistration>,
@@ -112,10 +112,10 @@ where
source,
}
})?;
hardcoded_topology.entry_capable_nodes().cloned().collect()
hardcoded_topology.get_gateways()
} else {
let mut rng = rand::thread_rng();
crate::init::helpers::gateways_for_init(
crate::init::helpers::current_gateways(
&mut rng,
&core.client.nym_api_urls,
user_agent,
@@ -128,7 +128,7 @@ where
// make sure the list of available gateways doesn't overlap the list of known gateways
let available_gateways = available_gateways
.into_iter()
.filter(|g| !registered_gateways.contains(&g.identity()))
.filter(|g| !registered_gateways.contains(g.identity()))
.collect::<Vec<_>>();
if available_gateways.is_empty() {
@@ -167,10 +167,10 @@ where
source,
}
})?;
hardcoded_topology.entry_capable_nodes().cloned().collect()
hardcoded_topology.get_gateways()
} else {
let mut rng = rand::thread_rng();
crate::init::helpers::gateways_for_init(
crate::init::helpers::current_gateways(
&mut rng,
&core.client.nym_api_urls,
user_agent,
@@ -3,6 +3,7 @@
use super::received_buffer::ReceivedBufferMessage;
use super::statistics_control::StatisticsControl;
use super::topology_control::geo_aware_provider::GeoAwareTopologyProvider;
use crate::client::base_client::storage::helpers::store_client_keys;
use crate::client::base_client::storage::MixnetClientStorage;
use crate::client::cover_traffic_stream::LoopCoverTrafficStream;
@@ -23,7 +24,7 @@ use crate::client::replies::reply_storage::{
};
use crate::client::topology_control::nym_api_provider::NymApiTopologyProvider;
use crate::client::topology_control::{
TopologyAccessor, TopologyRefresher, TopologyRefresherConfig,
nym_api_provider, TopologyAccessor, TopologyRefresher, TopologyRefresherConfig,
};
use crate::config::{Config, DebugConfig};
use crate::error::ClientCoreError;
@@ -463,8 +464,8 @@ where
details_store
.upgrade_stored_remote_gateway_key(gateway_client.gateway_identity(), &updated_key)
.await.map_err(|err| {
error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
ClientCoreError::GatewaysDetailsStoreError { source: Box::new(err) }
error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
ClientCoreError::GatewaysDetailsStoreError { source: Box::new(err) }
})?
}
@@ -472,7 +473,6 @@ where
.claim_initial_bandwidth()
.await
.map_err(gateway_failure)?;
gateway_client
.start_listening_for_mixnet_messages()
.map_err(gateway_failure)?;
@@ -539,15 +539,15 @@ where
// if no custom provider was ... provided ..., create one using nym-api
custom_provider.unwrap_or_else(|| match config_topology.topology_structure {
config::TopologyStructure::NymApi => Box::new(NymApiTopologyProvider::new(
config_topology,
nym_api_provider::Config {
min_mixnode_performance: config_topology.minimum_mixnode_performance,
min_gateway_performance: config_topology.minimum_gateway_performance,
},
nym_api_urls,
user_agent,
)),
config::TopologyStructure::GeoAware(group_by) => {
warn!("using deprecated 'GeoAware' topology provider - this option will be removed very soon");
#[allow(deprecated)]
Box::new(crate::client::topology_control::GeoAwareTopologyProvider::new(nym_api_urls, group_by))
Box::new(GeoAwareTopologyProvider::new(nym_api_urls, group_by))
}
})
}
@@ -558,7 +558,7 @@ where
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
topology_config: config::Topology,
topology_accessor: TopologyAccessor,
local_gateway: NodeIdentity,
local_gateway: &NodeIdentity,
wait_for_gateway: bool,
mut shutdown: TaskClient,
) -> Result<(), ClientCoreError> {
@@ -590,7 +590,7 @@ where
};
if let Err(err) = topology_refresher
.ensure_contains_routable_egress(local_gateway)
.ensure_contains_gateway(local_gateway)
.await
{
if let Some(waiting_timeout) = gateway_wait_timeout {
@@ -740,8 +740,7 @@ where
// channels responsible for controlling ack messages
let (ack_sender, ack_receiver) = mpsc::unbounded();
let shared_topology_accessor =
TopologyAccessor::new(self.config.debug.topology.ignore_egress_epoch_role);
let shared_topology_accessor = TopologyAccessor::new();
// Shutdown notifier for signalling tasks to stop
let shutdown = self
@@ -63,7 +63,7 @@ pub trait MixnetClientStorage {
fn gateway_details_store(&self) -> &Self::GatewaysDetailsStore;
}
#[derive(Clone, Default)]
#[derive(Default)]
pub struct Ephemeral {
key_store: InMemEphemeralKeys,
reply_store: reply_storage::Empty,
@@ -114,7 +114,6 @@ impl MixnetClientStorage for Ephemeral {
}
}
#[derive(Clone)]
#[cfg(all(
not(target_arch = "wasm32"),
feature = "fs-surb-storage",
@@ -163,7 +163,6 @@ impl LoopCoverTrafficStream<OsRng> {
// poisson delay, but is it really a problem?
let topology_permit = self.topology_access.get_read_permit().await;
// the ack is sent back to ourselves (and then ignored)
let topology_ref = match topology_permit.try_get_valid_topology_ref(
&self.our_full_destination,
Some(&self.our_full_destination),
@@ -28,6 +28,7 @@ pub enum InputMessage {
recipient: Recipient,
data: Vec<u8>,
lane: TransmissionLane,
mix_hops: Option<u8>,
},
/// Creates a message used for a duplex anonymous communication where the recipient
@@ -43,6 +44,7 @@ pub enum InputMessage {
data: Vec<u8>,
reply_surbs: u32,
lane: TransmissionLane,
mix_hops: Option<u8>,
},
/// Attempt to use our internally received and stored `ReplySurb` to send the message back
@@ -92,6 +94,29 @@ impl InputMessage {
recipient,
data,
lane,
mix_hops: None,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
} else {
message
}
}
// IMHO `new_regular` should take `mix_hops: Option<u8>` as an argument instead of creating
// this function, but that would potentially break backwards compatibility with the current API
pub fn new_regular_with_custom_hops(
recipient: Recipient,
data: Vec<u8>,
lane: TransmissionLane,
packet_type: Option<PacketType>,
mix_hops: Option<u8>,
) -> Self {
let message = InputMessage::Regular {
recipient,
data,
lane,
mix_hops,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
@@ -112,6 +137,7 @@ impl InputMessage {
data,
reply_surbs,
lane,
mix_hops: None,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
@@ -128,12 +154,14 @@ impl InputMessage {
reply_surbs: u32,
lane: TransmissionLane,
packet_type: Option<PacketType>,
mix_hops: Option<u8>,
) -> Self {
let message = InputMessage::Anonymous {
recipient,
data,
reply_surbs,
lane,
mix_hops,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
@@ -4,7 +4,6 @@
use crate::client::key_manager::ClientKeys;
use async_trait::async_trait;
use std::error::Error;
use std::sync::Arc;
use tokio::sync::Mutex;
#[cfg(not(target_arch = "wasm32"))]
@@ -65,7 +64,6 @@ pub enum OnDiskKeysError {
},
}
#[derive(Clone)]
#[cfg(not(target_arch = "wasm32"))]
pub struct OnDiskKeys {
paths: ClientKeysPaths,
@@ -195,9 +193,9 @@ impl KeyStore for OnDiskKeys {
}
}
#[derive(Clone, Default)]
#[derive(Default)]
pub struct InMemEphemeralKeys {
keys: Arc<Mutex<Option<ClientKeys>>>,
keys: Mutex<Option<ClientKeys>>,
}
#[derive(Debug, thiserror::Error)]
@@ -73,10 +73,11 @@ where
content: Vec<u8>,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) {
if let Err(err) = self
.message_handler
.try_send_plain_message(recipient, content, lane, packet_type)
.try_send_plain_message(recipient, content, lane, packet_type, mix_hops)
.await
{
warn!("failed to send a plain message - {err}")
@@ -90,10 +91,18 @@ where
reply_surbs: u32,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) {
if let Err(err) = self
.message_handler
.try_send_message_with_reply_surbs(recipient, content, reply_surbs, lane, packet_type)
.try_send_message_with_reply_surbs(
recipient,
content,
reply_surbs,
lane,
packet_type,
mix_hops,
)
.await
{
warn!("failed to send a repliable message - {err}")
@@ -106,8 +115,9 @@ where
recipient,
data,
lane,
mix_hops,
} => {
self.handle_plain_message(recipient, data, lane, PacketType::Mix)
self.handle_plain_message(recipient, data, lane, PacketType::Mix, mix_hops)
.await
}
InputMessage::Anonymous {
@@ -115,9 +125,17 @@ where
data,
reply_surbs,
lane,
mix_hops,
} => {
self.handle_repliable_message(recipient, data, reply_surbs, lane, PacketType::Mix)
.await
self.handle_repliable_message(
recipient,
data,
reply_surbs,
lane,
PacketType::Mix,
mix_hops,
)
.await
}
InputMessage::Reply {
recipient_tag,
@@ -135,8 +153,9 @@ where
recipient,
data,
lane,
mix_hops,
} => {
self.handle_plain_message(recipient, data, lane, packet_type)
self.handle_plain_message(recipient, data, lane, packet_type, mix_hops)
.await
}
InputMessage::Anonymous {
@@ -144,9 +163,17 @@ where
data,
reply_surbs,
lane,
mix_hops,
} => {
self.handle_repliable_message(recipient, data, reply_surbs, lane, packet_type)
.await
self.handle_repliable_message(
recipient,
data,
reply_surbs,
lane,
packet_type,
mix_hops,
)
.await
}
InputMessage::Reply {
recipient_tag,
@@ -70,6 +70,7 @@ pub(crate) struct PendingAcknowledgement {
message_chunk: Fragment,
delay: SphinxDelay,
destination: PacketDestination,
mix_hops: Option<u8>,
retransmissions: u32,
}
@@ -79,11 +80,13 @@ impl PendingAcknowledgement {
message_chunk: Fragment,
delay: SphinxDelay,
recipient: Recipient,
mix_hops: Option<u8>,
) -> Self {
PendingAcknowledgement {
message_chunk,
delay,
destination: PacketDestination::KnownRecipient(recipient.into()),
mix_hops,
retransmissions: 0,
}
}
@@ -101,6 +104,9 @@ impl PendingAcknowledgement {
recipient_tag,
extra_surb_request,
},
// Messages sent using SURBs are using the number of mix hops set by the recipient when
// they provided the SURBs, so it doesn't make sense to include it here.
mix_hops: None,
retransmissions: 0,
}
}
@@ -52,12 +52,18 @@ where
packet_recipient: Recipient,
chunk_data: Fragment,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<PreparedFragment, PreparationError> {
debug!("retransmitting normal packet...");
// TODO: Figure out retransmission packet type signaling
self.message_handler
.try_prepare_single_chunk_for_sending(packet_recipient, chunk_data, packet_type)
.try_prepare_single_chunk_for_sending(
packet_recipient,
chunk_data,
packet_type,
mix_hops,
)
.await
}
@@ -104,6 +110,7 @@ where
**recipient,
timed_out_ack.message_chunk.clone(),
packet_type,
timed_out_ack.mix_hops,
)
.await
}
@@ -15,11 +15,11 @@ use nym_sphinx::anonymous_replies::requests::{AnonymousSenderTag, RepliableMessa
use nym_sphinx::anonymous_replies::{ReplySurb, SurbEncryptionKey};
use nym_sphinx::chunking::fragment::{Fragment, FragmentIdentifier};
use nym_sphinx::message::NymMessage;
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::params::{PacketSize, PacketType, DEFAULT_NUM_MIX_HOPS};
use nym_sphinx::preparer::{MessagePreparer, PreparedFragment};
use nym_sphinx::Delay;
use nym_task::connections::TransmissionLane;
use nym_topology::{NymRouteProvider, NymTopologyError};
use nym_topology::{NymTopology, NymTopologyError};
use rand::{CryptoRng, Rng};
use std::collections::HashMap;
use std::sync::Arc;
@@ -100,6 +100,10 @@ pub(crate) struct Config {
/// Average delay an acknowledgement packet is going to get delay at a single mixnode.
average_ack_delay: Duration,
/// Number of mix hops each packet ('real' message, ack, reply) is expected to take.
/// Note that it does not include gateway hops.
num_mix_hops: u8,
/// Primary predefined packet size used for the encapsulated messages.
primary_packet_size: PacketSize,
@@ -121,11 +125,19 @@ impl Config {
deterministic_route_selection,
average_packet_delay,
average_ack_delay,
num_mix_hops: DEFAULT_NUM_MIX_HOPS,
primary_packet_size: PacketSize::default(),
secondary_packet_size: None,
}
}
/// Allows setting non-default number of expected mix hops in the network.
#[allow(dead_code)]
pub fn with_mix_hops(mut self, hops: u8) -> Self {
self.num_mix_hops = hops;
self
}
/// Allows setting non-default size of the sphinx packets sent out.
pub fn with_custom_primary_packet_size(mut self, packet_size: PacketSize) -> Self {
self.primary_packet_size = packet_size;
@@ -173,7 +185,9 @@ where
config.sender_address,
config.average_packet_delay,
config.average_ack_delay,
);
)
.with_mix_hops(config.num_mix_hops);
MessageHandler {
config,
rng,
@@ -202,7 +216,7 @@ where
fn get_topology<'a>(
&self,
permit: &'a TopologyReadPermit<'a>,
) -> Result<&'a NymRouteProvider, PreparationError> {
) -> Result<&'a NymTopology, PreparationError> {
match permit.try_get_valid_topology_ref(&self.config.sender_address, None) {
Ok(topology_ref) => Ok(topology_ref),
Err(err) => {
@@ -219,8 +233,9 @@ where
return self.config.primary_packet_size;
};
let primary_count = msg.required_packets(self.config.primary_packet_size);
let secondary_count = msg.required_packets(secondary_packet);
let primary_count =
msg.required_packets(self.config.primary_packet_size, self.config.num_mix_hops);
let secondary_count = msg.required_packets(secondary_packet, self.config.num_mix_hops);
trace!("This message would require: {primary_count} primary packets or {secondary_count} secondary packets...");
// if there would be no benefit in using the secondary packet - use the primary (duh)
@@ -409,9 +424,10 @@ where
message: Vec<u8>,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<(), PreparationError> {
let message = NymMessage::new_plain(message);
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type)
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type, mix_hops)
.await
}
@@ -421,6 +437,7 @@ where
recipient: Recipient,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<(), PreparationError> {
debug!("Sending non-reply message with packet type {packet_type}");
// TODO: I really dislike existence of this assertion, it implies code has to be re-organised
@@ -453,6 +470,7 @@ where
&self.config.ack_key,
&recipient,
packet_type,
mix_hops,
)?;
let real_message = RealMessage::new(
@@ -460,7 +478,8 @@ where
Some(fragment.fragment_identifier()),
);
let delay = prepared_fragment.total_delay;
let pending_ack = PendingAcknowledgement::new_known(fragment, delay, recipient);
let pending_ack =
PendingAcknowledgement::new_known(fragment, delay, recipient, mix_hops);
real_messages.push(real_message);
pending_acks.push(pending_ack);
@@ -477,6 +496,7 @@ where
recipient: Recipient,
amount: u32,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<(), PreparationError> {
debug!("Sending additional reply SURBs with packet type {packet_type}");
let sender_tag = self.get_or_create_sender_tag(&recipient);
@@ -493,6 +513,7 @@ where
recipient,
TransmissionLane::AdditionalReplySurbs,
packet_type,
mix_hops,
)
.await?;
@@ -509,6 +530,7 @@ where
num_reply_surbs: u32,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<(), SurbWrappedPreparationError> {
debug!("Sending message with reply SURBs with packet type {packet_type}");
let sender_tag = self.get_or_create_sender_tag(&recipient);
@@ -519,7 +541,7 @@ where
let message =
NymMessage::new_repliable(RepliableMessage::new_data(message, sender_tag, reply_surbs));
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type)
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type, mix_hops)
.await?;
log::trace!("storing {} reply keys", reply_keys.len());
@@ -533,18 +555,23 @@ where
recipient: Recipient,
chunk: Fragment,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<PreparedFragment, PreparationError> {
debug!("Sending single chunk with packet type {packet_type}");
let topology_permit = self.topology_access.get_read_permit().await;
let topology = self.get_topology(&topology_permit)?;
let prepared_fragment = self.message_preparer.prepare_chunk_for_sending(
chunk,
topology,
&self.config.ack_key,
&recipient,
packet_type,
)?;
let prepared_fragment = self
.message_preparer
.prepare_chunk_for_sending(
chunk,
topology,
&self.config.ack_key,
&recipient,
packet_type,
mix_hops,
)
.unwrap();
Ok(prepared_fragment)
}
@@ -597,13 +624,16 @@ where
Err(err) => return Err(err.return_surbs(vec![reply_surb])),
};
let prepared_fragment = self.message_preparer.prepare_reply_chunk_for_sending(
chunk,
topology,
&self.config.ack_key,
reply_surb,
PacketType::Mix,
)?;
let prepared_fragment = self
.message_preparer
.prepare_reply_chunk_for_sending(
chunk,
topology,
&self.config.ack_key,
reply_surb,
PacketType::Mix,
)
.unwrap();
Ok(prepared_fragment)
}
@@ -9,12 +9,10 @@ use self::{
acknowledgement_control::AcknowledgementController, real_traffic_stream::OutQueueControl,
};
use crate::client::real_messages_control::message_handler::MessageHandler;
use crate::client::replies::reply_controller;
use crate::client::replies::reply_controller::{
ReplyController, ReplyControllerReceiver, ReplyControllerSender,
};
use crate::client::replies::reply_storage::CombinedReplyStorage;
use crate::config;
use crate::{
client::{
inbound_messages::InputMessageReceiver, mix_traffic::BatchMixMessageSender,
@@ -29,13 +27,16 @@ use nym_gateway_client::AcknowledgementReceiver;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::params::PacketType;
use nym_statistics_common::clients::ClientStatsSender;
use nym_task::connections::{ConnectionCommandReceiver, LaneQueueLengths};
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::sync::Arc;
use crate::client::replies::reply_controller;
use crate::config;
pub(crate) use acknowledgement_control::{AckActionSender, Action};
use nym_statistics_common::clients::ClientStatsSender;
pub(crate) mod acknowledgement_control;
pub(crate) mod message_handler;
pub(crate) mod real_traffic_stream;
@@ -230,7 +230,6 @@ where
// poisson delay, but is it really a problem?
let topology_permit = self.topology_access.get_read_permit().await;
// the ack is sent back to ourselves (and then ignored)
let topology_ref = match topology_permit.try_get_valid_topology_ref(
&self.config.our_full_destination,
Some(&self.config.our_full_destination),
@@ -70,10 +70,7 @@ impl SendingDelayController {
lower_bound,
multiplier_elevated_counter: 0,
time_when_logged_about_elevated_multiplier: now
.checked_sub(Duration::from_secs(
INTERVAL_BETWEEN_WARNING_ABOUT_ELEVATED_MULTIPLIER_SECS,
))
.unwrap_or(now),
- Duration::from_secs(INTERVAL_BETWEEN_WARNING_ABOUT_ELEVATED_MULTIPLIER_SECS),
time_when_changed: now,
time_when_backpressure_detected: now,
}
@@ -516,6 +516,7 @@ where
recipient,
to_send,
nym_sphinx::params::PacketType::Mix,
self.config.reply_surbs.surb_mix_hops,
)
.await
{
@@ -16,14 +16,14 @@
#![warn(clippy::todo)]
#![warn(clippy::dbg_macro)]
use futures::StreamExt;
use std::time::Duration;
use nym_client_core_config_types::StatsReporting;
use nym_sphinx::addressing::Recipient;
use nym_statistics_common::clients::{
ClientStatsController, ClientStatsReceiver, ClientStatsSender,
};
use nym_task::connections::TransmissionLane;
use std::time::Duration;
use crate::{
client::inbound_messages::{InputMessage, InputMessageSender},
@@ -94,32 +94,10 @@ impl StatisticsControl {
async fn run_with_shutdown(&mut self, mut task_client: nym_task::TaskClient) {
log::debug!("Started StatisticsControl with graceful shutdown support");
#[cfg(not(target_arch = "wasm32"))]
let mut stats_report_interval = tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval(self.reporting_config.reporting_interval),
);
#[cfg(not(target_arch = "wasm32"))]
let mut local_report_interval = tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval(LOCAL_REPORT_INTERVAL),
);
#[cfg(not(target_arch = "wasm32"))]
let mut snapshot_interval =
tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(SNAPSHOT_INTERVAL));
#[cfg(target_arch = "wasm32")]
let mut stats_report_interval = gloo_timers::future::IntervalStream::new(
self.reporting_config.reporting_interval.as_millis() as u32,
);
#[cfg(target_arch = "wasm32")]
let mut local_report_interval =
gloo_timers::future::IntervalStream::new(LOCAL_REPORT_INTERVAL.as_millis() as u32);
#[cfg(target_arch = "wasm32")]
let mut snapshot_interval =
gloo_timers::future::IntervalStream::new(SNAPSHOT_INTERVAL.as_millis() as u32);
let mut stats_report_interval =
tokio::time::interval(self.reporting_config.reporting_interval);
let mut local_report_interval = tokio::time::interval(LOCAL_REPORT_INTERVAL);
let mut snapshot_interval = tokio::time::interval(SNAPSHOT_INTERVAL);
loop {
tokio::select! {
@@ -130,20 +108,16 @@ impl StatisticsControl {
break;
}
},
_ = snapshot_interval.next() => {
_ = snapshot_interval.tick() => {
self.stats.snapshot();
}
_ = stats_report_interval.next() => {
let Some(recipient) = self.reporting_config.provider_address else {
continue
};
if self.reporting_config.enabled {
self.report_stats(recipient).await;
}
_ = stats_report_interval.tick(), if self.reporting_config.enabled && self.reporting_config.provider_address.is_some() => {
// SAFTEY : this branch executes only if reporting is not none, so unwrapp is fine
#[allow(clippy::unwrap_used)]
self.report_stats(self.reporting_config.provider_address.unwrap()).await;
}
_ = local_report_interval.next() => {
_ = local_report_interval.tick() => {
self.stats.local_report(&mut task_client);
}
_ = task_client.recv_with_delay() => {
@@ -2,7 +2,8 @@
// SPDX-License-Identifier: Apache-2.0
use nym_sphinx::addressing::clients::Recipient;
use nym_topology::{NymRouteProvider, NymTopology, NymTopologyError};
use nym_sphinx::params::DEFAULT_NUM_MIX_HOPS;
use nym_topology::{NymTopology, NymTopologyError};
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -16,36 +17,29 @@ pub struct TopologyAccessorInner {
// few seconds, while reads are needed every single packet generated.
// However, proper benchmarks will be needed to determine if `RwLock` is indeed a better
// approach than a `Mutex`
topology: RwLock<NymRouteProvider>,
topology: RwLock<Option<NymTopology>>,
}
impl TopologyAccessorInner {
fn new(initial: NymRouteProvider) -> Self {
fn new() -> Self {
TopologyAccessorInner {
controlled_manually: AtomicBool::new(false),
released_manual_control: Notify::new(),
topology: RwLock::new(initial),
topology: RwLock::new(None),
}
}
async fn update(&self, new: Option<NymTopology>) {
let mut guard = self.topology.write().await;
match new {
Some(updated) => {
guard.update(updated);
}
None => guard.clear_topology(),
}
*self.topology.write().await = new;
}
}
pub struct TopologyReadPermit<'a> {
permit: RwLockReadGuard<'a, NymRouteProvider>,
permit: RwLockReadGuard<'a, Option<NymTopology>>,
}
impl Deref for TopologyReadPermit<'_> {
type Target = NymRouteProvider;
type Target = Option<NymTopology>;
fn deref(&self) -> &Self::Target {
&self.permit
@@ -59,31 +53,43 @@ impl<'a> TopologyReadPermit<'a> {
&'a self,
ack_recipient: &Recipient,
packet_recipient: Option<&Recipient>,
) -> Result<&'a NymRouteProvider, NymTopologyError> {
let route_provider = self.permit.deref();
let topology = &route_provider.topology;
) -> Result<&'a NymTopology, NymTopologyError> {
// 1. Have we managed to get anything from the refresher, i.e. have the nym-api queries gone through?
topology.ensure_not_empty()?;
let topology = self
.permit
.as_ref()
.ok_or(NymTopologyError::EmptyNetworkTopology)?;
// 2. does the topology have a node on each mixing layer?
topology.ensure_minimally_routable()?;
// 2. does it have any mixnode at all?
// 3. does it have any gateways at all?
// 4. does it have a mixnode on each layer?
topology.ensure_can_construct_path_through(DEFAULT_NUM_MIX_HOPS)?;
// 3. does it contain OUR gateway (so that we could create an ack packet)?
let _ = route_provider.egress_by_identity(ack_recipient.gateway())?;
// 4. for our target recipient, does it contain THEIR gateway (so that we send anything over?)
if let Some(recipient) = packet_recipient {
let _ = route_provider.egress_by_identity(recipient.gateway())?;
// 5. does it contain OUR gateway (so that we could create an ack packet)?
if !topology.gateway_exists(ack_recipient.gateway()) {
return Err(NymTopologyError::NonExistentGatewayError {
identity_key: ack_recipient.gateway().to_base58_string(),
});
}
Ok(route_provider)
// 6. for our target recipient, does it contain THEIR gateway (so that we could create
if let Some(recipient) = packet_recipient {
if !topology.gateway_exists(recipient.gateway()) {
return Err(NymTopologyError::NonExistentGatewayError {
identity_key: recipient.gateway().to_base58_string(),
});
}
}
Ok(topology)
}
}
impl<'a> From<RwLockReadGuard<'a, NymRouteProvider>> for TopologyReadPermit<'a> {
fn from(permit: RwLockReadGuard<'a, NymRouteProvider>) -> Self {
TopologyReadPermit { permit }
impl<'a> From<RwLockReadGuard<'a, Option<NymTopology>>> for TopologyReadPermit<'a> {
fn from(read_permit: RwLockReadGuard<'a, Option<NymTopology>>) -> Self {
TopologyReadPermit {
permit: read_permit,
}
}
}
@@ -93,11 +99,9 @@ pub struct TopologyAccessor {
}
impl TopologyAccessor {
pub fn new(ignore_egress_epoch_roles: bool) -> Self {
pub fn new() -> Self {
TopologyAccessor {
inner: Arc::new(TopologyAccessorInner::new(NymRouteProvider::new_empty(
ignore_egress_epoch_roles,
))),
inner: Arc::new(TopologyAccessorInner::new()),
}
}
@@ -117,21 +121,8 @@ impl TopologyAccessor {
self.inner.released_manual_control.notified().await
}
#[deprecated(note = "use .current_route_provider instead")]
pub async fn current_topology(&self) -> Option<NymTopology> {
self.current_route_provider()
.await
.as_ref()
.map(|p| p.topology.clone())
}
pub async fn current_route_provider(&self) -> Option<RwLockReadGuard<NymRouteProvider>> {
let provider = self.inner.topology.read().await;
if provider.topology.is_empty() {
None
} else {
Some(provider)
}
self.inner.topology.read().await.clone()
}
pub async fn manually_change_topology(&self, new_topology: NymTopology) {
@@ -149,11 +140,15 @@ impl TopologyAccessor {
// only used by the client at startup to get a slightly more reasonable error message
// (currently displays as unused because health checker is disabled due to required changes)
pub async fn ensure_is_routable(&self) -> Result<(), NymTopologyError> {
self.inner
.topology
.read()
.await
.topology
.ensure_minimally_routable()
match self.inner.topology.read().await.deref() {
None => Err(NymTopologyError::EmptyNetworkTopology),
Some(ref topology) => topology.ensure_can_construct_path_through(DEFAULT_NUM_MIX_HOPS),
}
}
}
impl Default for TopologyAccessor {
fn default() -> Self {
TopologyAccessor::new()
}
}
@@ -3,6 +3,7 @@ use log::{debug, error};
use nym_explorer_client::{ExplorerClient, PrettyDetailedMixNodeBond};
use nym_network_defaults::var_names::EXPLORER_API;
use nym_topology::{
nym_topology_from_basic_info,
provider_trait::{async_trait, TopologyProvider},
NymTopology,
};
@@ -14,6 +15,8 @@ use url::Url;
pub use nym_country_group::CountryGroup;
const MIN_NODES_PER_LAYER: usize = 1;
fn create_explorer_client() -> Option<ExplorerClient> {
let Ok(explorer_api_url) = std::env::var(EXPLORER_API) else {
error!("Missing EXPLORER_API");
@@ -60,20 +63,30 @@ fn log_mixnode_distribution(mixnodes: &HashMap<CountryGroup, Vec<NodeId>>) {
}
fn check_layer_integrity(topology: NymTopology) -> Result<(), ()> {
if topology.ensure_minimally_routable().is_err() {
let mixes = topology.mixes();
if mixes.keys().len() < 3 {
error!("Layer is missing in topology!");
return Err(());
}
for (layer, mixnodes) in mixes {
debug!("Layer {:?} has {} mixnodes", layer, mixnodes.len());
if mixnodes.len() < MIN_NODES_PER_LAYER {
error!(
"There are only {} mixnodes in layer {:?}",
mixnodes.len(),
layer
);
return Err(());
}
}
Ok(())
}
#[deprecated(note = "use NymApiTopologyProvider instead as explorer API will soon be removed")]
pub struct GeoAwareTopologyProvider {
validator_client: nym_validator_client::client::NymApiClient,
filter_on: GroupBy,
}
#[allow(deprecated)]
impl GeoAwareTopologyProvider {
pub fn new(mut nym_api_urls: Vec<Url>, filter_on: GroupBy) -> GeoAwareTopologyProvider {
log::info!(
@@ -91,15 +104,6 @@ impl GeoAwareTopologyProvider {
}
async fn get_topology(&self) -> Option<NymTopology> {
let rewarded_set = self
.validator_client
.get_current_rewarded_set()
.await
.inspect_err(|err| error!("failed to get current rewarded set: {err}"))
.ok()?;
let mut topology = NymTopology::new_empty(rewarded_set);
let mixnodes = match self
.validator_client
.get_all_basic_active_mixing_assigned_nodes()
@@ -183,8 +187,7 @@ impl GeoAwareTopologyProvider {
.filter(|m| filtered_mixnode_ids.contains(&m.node_id))
.collect::<Vec<_>>();
topology.add_skimmed_nodes(&mixnodes);
topology.add_skimmed_nodes(&gateways);
let topology = nym_topology_from_basic_info(&mixnodes, &gateways);
// TODO: return real error type
check_layer_integrity(topology.clone()).ok()?;
@@ -193,7 +196,6 @@ impl GeoAwareTopologyProvider {
}
}
#[allow(deprecated)]
#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
impl TopologyProvider for GeoAwareTopologyProvider {
@@ -203,7 +205,6 @@ impl TopologyProvider for GeoAwareTopologyProvider {
}
}
#[allow(deprecated)]
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
impl TopologyProvider for GeoAwareTopologyProvider {
@@ -19,7 +19,6 @@ mod accessor;
pub mod geo_aware_provider;
pub mod nym_api_provider;
#[allow(deprecated)]
pub use geo_aware_provider::GeoAwareTopologyProvider;
pub use nym_api_provider::{Config as NymApiTopologyProviderConfig, NymApiTopologyProvider};
pub use nym_topology::provider_trait::TopologyProvider;
@@ -28,7 +27,7 @@ pub use nym_topology::provider_trait::TopologyProvider;
const MAX_FAILURE_COUNT: usize = 10;
pub struct TopologyRefresherConfig {
pub refresh_rate: Duration,
refresh_rate: Duration,
}
impl TopologyRefresherConfig {
@@ -97,24 +96,28 @@ impl TopologyRefresher {
self.topology_accessor.ensure_is_routable().await
}
pub async fn ensure_contains_routable_egress(
pub async fn ensure_contains_gateway(
&self,
egress: NodeIdentity,
gateway: &NodeIdentity,
) -> Result<(), NymTopologyError> {
let topology = self
.topology_accessor
.current_route_provider()
.current_topology()
.await
.ok_or(NymTopologyError::EmptyNetworkTopology)?;
let _ = topology.egress_by_identity(egress)?;
if !topology.gateway_exists(gateway) {
return Err(NymTopologyError::NonExistentGatewayError {
identity_key: gateway.to_base58_string(),
});
}
Ok(())
}
pub async fn wait_for_gateway(
&mut self,
gateway: NodeIdentity,
gateway: &NodeIdentity,
timeout_duration: Duration,
) -> Result<(), NymTopologyError> {
info!(
@@ -132,7 +135,7 @@ impl TopologyRefresher {
})
}
_ = self.try_refresh() => {
if self.ensure_contains_routable_egress(gateway).await.is_ok() {
if self.ensure_contains_gateway(gateway).await.is_ok() {
return Ok(())
}
info!("gateway '{gateway}' is still not online...");
@@ -4,39 +4,32 @@
use async_trait::async_trait;
use log::{debug, error, warn};
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::NymTopology;
use nym_topology::{NymTopology, NymTopologyError};
use nym_validator_client::UserAgent;
use rand::prelude::SliceRandom;
use rand::thread_rng;
use std::cmp::min;
use url::Url;
// the same values as our current (10.06.24) blacklist
pub const DEFAULT_MIN_MIXNODE_PERFORMANCE: u8 = 50;
pub const DEFAULT_MIN_GATEWAY_PERFORMANCE: u8 = 50;
#[derive(Debug)]
pub struct Config {
pub min_mixnode_performance: u8,
pub min_gateway_performance: u8,
pub use_extended_topology: bool,
pub ignore_egress_epoch_role: bool,
}
impl From<nym_client_core_config_types::Topology> for Config {
fn from(value: nym_client_core_config_types::Topology) -> Self {
impl Default for Config {
fn default() -> Self {
// old values that decided on blacklist membership
Config {
min_mixnode_performance: value.minimum_mixnode_performance,
min_gateway_performance: value.minimum_gateway_performance,
use_extended_topology: value.use_extended_topology,
ignore_egress_epoch_role: value.ignore_egress_epoch_role,
min_mixnode_performance: DEFAULT_MIN_MIXNODE_PERFORMANCE,
min_gateway_performance: DEFAULT_MIN_GATEWAY_PERFORMANCE,
}
}
}
impl Config {
// if we're using 'extended' topology, filter the nodes based on the lowest set performance
fn min_node_performance(&self) -> u8 {
min(self.min_mixnode_performance, self.min_gateway_performance)
}
}
pub struct NymApiTopologyProvider {
config: Config,
@@ -46,11 +39,7 @@ pub struct NymApiTopologyProvider {
}
impl NymApiTopologyProvider {
pub fn new(
config: impl Into<Config>,
mut nym_api_urls: Vec<Url>,
user_agent: Option<UserAgent>,
) -> Self {
pub fn new(config: Config, mut nym_api_urls: Vec<Url>, user_agent: Option<UserAgent>) -> Self {
nym_api_urls.shuffle(&mut thread_rng());
let validator_client = if let Some(user_agent) = user_agent {
@@ -63,7 +52,7 @@ impl NymApiTopologyProvider {
};
NymApiTopologyProvider {
config: config.into(),
config,
validator_client,
nym_api_urls,
currently_used_api: 0,
@@ -81,69 +70,70 @@ impl NymApiTopologyProvider {
.change_nym_api(self.nym_api_urls[self.currently_used_api].clone())
}
/// Verifies whether nodes a reasonably distributed among all mix layers.
///
/// In ideal world we would have 33% nodes on layer 1, 33% on layer 2 and 33% on layer 3.
/// However, this is a rather unrealistic expectation, instead we check whether there exists
/// a layer with more than 66% of nodes or with fewer than 15% and if so, we trigger a failure.
///
/// # Arguments
///
/// * `topology`: active topology constructed from validator api data
fn check_layer_distribution(
&self,
active_topology: &NymTopology,
) -> Result<(), NymTopologyError> {
let lower_threshold = 0.15;
let upper_threshold = 0.66;
active_topology.ensure_even_layer_distribution(lower_threshold, upper_threshold)
}
async fn get_current_compatible_topology(&mut self) -> Option<NymTopology> {
let rewarded_set = self
let mixnodes = match self
.validator_client
.get_current_rewarded_set()
.get_all_basic_active_mixing_assigned_nodes()
.await
.inspect_err(|err| error!("failed to get current rewarded set: {err}"))
.ok()?;
let mut topology = NymTopology::new_empty(rewarded_set);
if self.config.use_extended_topology {
let all_nodes = self
.validator_client
.get_all_basic_nodes()
.await
.inspect_err(|err| error!("failed to get network nodes: {err}"))
.ok()?;
debug!(
"there are {} nodes on the network (before filtering)",
all_nodes.len()
);
topology.add_additional_nodes(all_nodes.iter().filter(|n| {
n.performance.round_to_integer() >= self.config.min_node_performance()
}));
} else {
// if we're not using extended topology, we're only getting active set mixnodes and gateways
let mixnodes = self
.validator_client
.get_all_basic_active_mixing_assigned_nodes()
.await
.inspect_err(|err| error!("failed to get network mixnodes: {err}"))
.ok()?;
// TODO: we really should be getting ACTIVE gateways only
let gateways = self
.validator_client
.get_all_basic_entry_assigned_nodes()
.await
.inspect_err(|err| error!("failed to get network gateways: {err}"))
.ok()?;
debug!(
"there are {} mixnodes and {} gateways in total (before performance filtering)",
mixnodes.len(),
gateways.len()
);
topology.add_additional_nodes(mixnodes.iter().filter(|m| {
m.performance.round_to_integer() >= self.config.min_mixnode_performance
}));
topology.add_additional_nodes(gateways.iter().filter(|m| {
m.performance.round_to_integer() >= self.config.min_gateway_performance
}));
{
Err(err) => {
error!("failed to get network mixnodes - {err}");
return None;
}
Ok(mixes) => mixes,
};
if !topology.is_minimally_routable() {
error!("the current filtered active topology can't be used to construct any packets");
return None;
}
let gateways = match self
.validator_client
.get_all_basic_entry_assigned_nodes()
.await
{
Err(err) => {
error!("failed to get network gateways - {err}");
return None;
}
Ok(gateways) => gateways,
};
Some(topology)
debug!(
"there are {} mixnodes and {} gateways in total (before performance filtering)",
mixnodes.len(),
gateways.len()
);
let topology = NymTopology::from_unordered(
mixnodes.iter().filter(|m| {
m.performance.round_to_integer() >= self.config.min_mixnode_performance
}),
gateways.iter().filter(|g| {
g.performance.round_to_integer() >= self.config.min_gateway_performance
}),
);
if let Err(err) = self.check_layer_distribution(&topology) {
warn!("The current filtered active topology has extremely skewed layer distribution. It cannot be used: {err}");
self.use_next_nym_api();
None
} else {
Some(topology)
}
}
}
@@ -152,11 +142,7 @@ impl NymApiTopologyProvider {
#[async_trait]
impl TopologyProvider for NymApiTopologyProvider {
async fn get_new_topology(&mut self) -> Option<NymTopology> {
let Some(topology) = self.get_current_compatible_topology().await else {
self.use_next_nym_api();
return None;
};
Some(topology)
self.get_current_compatible_topology().await
}
}
@@ -164,10 +150,6 @@ impl TopologyProvider for NymApiTopologyProvider {
#[async_trait(?Send)]
impl TopologyProvider for NymApiTopologyProvider {
async fn get_new_topology(&mut self) -> Option<NymTopology> {
let Some(topology) = self.get_current_compatible_topology().await else {
self.use_next_nym_api();
return None;
};
Some(topology)
self.get_current_compatible_topology().await
}
}
+4 -7
View File
@@ -4,8 +4,8 @@
use crate::client::mix_traffic::transceiver::ErasedGatewayError;
use nym_crypto::asymmetric::identity::Ed25519RecoveryError;
use nym_gateway_client::error::GatewayClientError;
use nym_topology::node::RoutingNodeError;
use nym_topology::{NodeId, NymTopologyError};
use nym_topology::gateway::GatewayConversionError;
use nym_topology::NymTopologyError;
use nym_validator_client::ValidatorClientError;
use std::error::Error;
use std::path::PathBuf;
@@ -74,10 +74,10 @@ pub enum ClientCoreError {
#[error("the gateway id is invalid - {0}")]
UnableToCreatePublicKeyFromGatewayId(Ed25519RecoveryError),
#[error("the node is malformed: {source}")]
#[error("The gateway is malformed: {source}")]
MalformedGateway {
#[from]
source: Box<RoutingNodeError>,
source: GatewayConversionError,
},
#[error("failed to establish connection to gateway: {source}")]
@@ -159,9 +159,6 @@ pub enum ClientCoreError {
#[error("the specified gateway '{gateway}' does not support the wss protocol")]
UnsupportedWssProtocol { gateway: String },
#[error("node {id} ({identity}) does not support mixnet entry mode")]
UnsupportedEntry { id: NodeId, identity: String },
#[error(
"failed to load custom topology using path '{}'. detailed message: {source}", file_path.display()
)]
+20 -43
View File
@@ -7,7 +7,7 @@ use futures::{SinkExt, StreamExt};
use log::{debug, info, trace, warn};
use nym_crypto::asymmetric::identity;
use nym_gateway_client::GatewayClient;
use nym_topology::node::RoutingNode;
use nym_topology::gateway;
use nym_validator_client::client::IdentityKeyRef;
use nym_validator_client::UserAgent;
use rand::{seq::SliceRandom, Rng};
@@ -15,7 +15,6 @@ use std::{sync::Arc, time::Duration};
use tungstenite::Message;
use url::Url;
use nym_topology::NodeId;
#[cfg(not(target_arch = "wasm32"))]
use tokio::net::TcpStream;
#[cfg(not(target_arch = "wasm32"))]
@@ -26,6 +25,7 @@ use tokio::time::Instant;
use tokio_tungstenite::connect_async;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
#[cfg(target_arch = "wasm32")]
use wasm_utils::websocket::JSWebsocket;
#[cfg(target_arch = "wasm32")]
@@ -48,30 +48,22 @@ const PING_TIMEOUT: Duration = Duration::from_millis(1000);
// The abstraction that some of these helpers use
pub trait ConnectableGateway {
fn node_id(&self) -> NodeId;
fn identity(&self) -> identity::PublicKey;
fn clients_address(&self, prefer_ipv6: bool) -> Option<String>;
fn identity(&self) -> &identity::PublicKey;
fn clients_address(&self) -> String;
fn is_wss(&self) -> bool;
}
impl ConnectableGateway for RoutingNode {
fn node_id(&self) -> NodeId {
self.node_id
impl ConnectableGateway for gateway::LegacyNode {
fn identity(&self) -> &identity::PublicKey {
self.identity()
}
fn identity(&self) -> identity::PublicKey {
self.identity_key
}
fn clients_address(&self, prefer_ipv6: bool) -> Option<String> {
self.ws_entry_address(prefer_ipv6)
fn clients_address(&self) -> String {
self.clients_address()
}
fn is_wss(&self) -> bool {
self.entry
.as_ref()
.map(|e| e.clients_wss_port.is_some())
.unwrap_or_default()
self.clients_wss_port.is_some()
}
}
@@ -86,12 +78,12 @@ impl<'a, G: ConnectableGateway> GatewayWithLatency<'a, G> {
}
}
pub async fn gateways_for_init<R: Rng>(
pub async fn current_gateways<R: Rng>(
rng: &mut R,
nym_apis: &[Url],
user_agent: Option<UserAgent>,
minimum_performance: u8,
) -> Result<Vec<RoutingNode>, ClientCoreError> {
) -> Result<Vec<gateway::LegacyNode>, ClientCoreError> {
let nym_api = nym_apis
.choose(rng)
.ok_or(ClientCoreError::ListOfNymApisIsEmpty)?;
@@ -108,14 +100,11 @@ pub async fn gateways_for_init<R: Rng>(
log::trace!("Gateways: {:#?}", gateways);
// filter out gateways below minimum performance and ones that could operate as a mixnode
// (we don't want instability)
let valid_gateways = gateways
.iter()
.filter(|g| !g.supported_roles.mixnode)
.filter(|g| g.performance.round_to_integer() >= minimum_performance)
.filter_map(|gateway| gateway.try_into().ok())
.collect::<Vec<_>>();
.collect::<Vec<gateway::LegacyNode>>();
log::debug!("After checking validity: {}", valid_gateways.len());
log::trace!("Valid gateways: {:#?}", valid_gateways);
@@ -145,12 +134,7 @@ async fn measure_latency<G>(gateway: &G) -> Result<GatewayWithLatency<G>, Client
where
G: ConnectableGateway,
{
let Some(addr) = gateway.clients_address(false) else {
return Err(ClientCoreError::UnsupportedEntry {
id: gateway.node_id(),
identity: gateway.identity().to_string(),
});
};
let addr = gateway.clients_address();
trace!(
"establishing connection to {} ({addr})...",
gateway.identity(),
@@ -221,7 +205,7 @@ pub async fn choose_gateway_by_latency<R: Rng, G: ConnectableGateway + Clone>(
let gateways_with_latency = Arc::new(tokio::sync::Mutex::new(Vec::new()));
futures::stream::iter(gateways)
.for_each_concurrent(CONCURRENT_GATEWAYS_MEASURED, |gateway| async {
let id = gateway.identity();
let id = *gateway.identity();
trace!("measuring latency to {id}...");
match measure_latency(gateway).await {
Ok(with_latency) => {
@@ -268,9 +252,9 @@ fn filter_by_tls<G: ConnectableGateway>(
pub(super) fn uniformly_random_gateway<R: Rng>(
rng: &mut R,
gateways: &[RoutingNode],
gateways: &[gateway::LegacyNode],
must_use_tls: bool,
) -> Result<RoutingNode, ClientCoreError> {
) -> Result<gateway::LegacyNode, ClientCoreError> {
filter_by_tls(gateways, must_use_tls)?
.choose(rng)
.ok_or(ClientCoreError::NoGatewaysOnNetwork)
@@ -279,9 +263,9 @@ pub(super) fn uniformly_random_gateway<R: Rng>(
pub(super) fn get_specified_gateway(
gateway_identity: IdentityKeyRef,
gateways: &[RoutingNode],
gateways: &[gateway::LegacyNode],
must_use_tls: bool,
) -> Result<RoutingNode, ClientCoreError> {
) -> Result<gateway::LegacyNode, ClientCoreError> {
log::debug!("Requesting specified gateway: {}", gateway_identity);
let user_gateway = identity::PublicKey::from_base58_string(gateway_identity)
.map_err(ClientCoreError::UnableToCreatePublicKeyFromGatewayId)?;
@@ -291,14 +275,7 @@ pub(super) fn get_specified_gateway(
.find(|gateway| gateway.identity_key == user_gateway)
.ok_or_else(|| ClientCoreError::NoGatewayWithId(gateway_identity.to_string()))?;
let Some(entry_details) = gateway.entry.as_ref() else {
return Err(ClientCoreError::UnsupportedEntry {
id: gateway.node_id,
identity: gateway.identity().to_string(),
});
};
if must_use_tls && entry_details.clients_wss_port.is_none() {
if must_use_tls && gateway.clients_wss_port.is_none() {
return Err(ClientCoreError::UnsupportedWssProtocol {
gateway: gateway_identity.to_string(),
});
+2 -2
View File
@@ -19,7 +19,7 @@ use crate::init::types::{
use nym_client_core_gateways_storage::GatewaysDetailsStore;
use nym_client_core_gateways_storage::{GatewayDetails, GatewayRegistration};
use nym_gateway_client::client::InitGatewayClient;
use nym_topology::node::RoutingNode;
use nym_topology::gateway;
use rand::rngs::OsRng;
use rand::{CryptoRng, RngCore};
use serde::Serialize;
@@ -50,7 +50,7 @@ async fn setup_new_gateway<K, D>(
key_store: &K,
details_store: &D,
selection_specification: GatewaySelectionSpecification,
available_gateways: Vec<RoutingNode>,
available_gateways: Vec<gateway::LegacyNode>,
) -> Result<InitialisationResult, ClientCoreError>
where
K: KeyStore,
+6 -41
View File
@@ -13,11 +13,11 @@ use nym_crypto::asymmetric::identity;
use nym_gateway_client::client::InitGatewayClient;
use nym_gateway_requests::shared_key::SharedGatewayKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_topology::node::RoutingNode;
use nym_topology::gateway;
use nym_validator_client::client::IdentityKey;
use nym_validator_client::nyxd::AccountId;
use serde::Serialize;
use std::fmt::{Debug, Display};
use std::fmt::Display;
use std::sync::Arc;
use time::OffsetDateTime;
use url::Url;
@@ -38,23 +38,16 @@ pub enum SelectedGateway {
impl SelectedGateway {
pub fn from_topology_node(
node: RoutingNode,
node: gateway::LegacyNode,
must_use_tls: bool,
) -> Result<Self, ClientCoreError> {
// for now, let's use 'old' behaviour, if you want to change it, you can pass it up the enum stack yourself : )
let prefer_ipv6 = false;
let gateway_listener = if must_use_tls {
node.ws_entry_address_tls()
node.clients_address_tls()
.ok_or(ClientCoreError::UnsupportedWssProtocol {
gateway: node.identity_key.to_base58_string(),
})?
} else {
node.ws_entry_address(prefer_ipv6)
.ok_or(ClientCoreError::UnsupportedEntry {
id: node.node_id,
identity: node.identity_key.to_base58_string(),
})?
node.clients_address()
};
let gateway_listener =
@@ -207,7 +200,7 @@ pub enum GatewaySetup {
specification: GatewaySelectionSpecification,
// TODO: seems to be a bit inefficient to pass them by value
available_gateways: Vec<RoutingNode>,
available_gateways: Vec<gateway::LegacyNode>,
},
ReuseConnection {
@@ -221,34 +214,6 @@ pub enum GatewaySetup {
},
}
impl Debug for GatewaySetup {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
GatewaySetup::MustLoad { gateway_id } => f
.debug_struct("GatewaySetup::MustLoad")
.field("gateway_id", gateway_id)
.finish(),
GatewaySetup::New {
specification,
available_gateways,
} => f
.debug_struct("GatewaySetup::New")
.field("specification", specification)
.field("available_gateways", available_gateways)
.field("gateways", specification)
.finish(),
GatewaySetup::ReuseConnection {
gateway_details, ..
} => f
.debug_struct("GatewaySetup::ReuseConnection")
.field("authenticated_ephemeral_client", &"***")
.field("gateway_details", gateway_details)
.field("client_keys", &"***")
.finish(),
}
}
}
impl GatewaySetup {
pub fn try_reuse_connection(init_res: InitialisationResult) -> Result<Self, ClientCoreError> {
if let Some(authenticated_ephemeral_client) = init_res.authenticated_ephemeral_client {
+2 -1
View File
@@ -14,7 +14,8 @@ pub mod error;
pub mod init;
pub use nym_topology::{
HardcodedTopologyProvider, NymRouteProvider, NymTopology, NymTopologyError, TopologyProvider,
HardcodedTopologyProvider, NymTopology, NymTopologyError, SerializableNymTopology,
SerializableTopologyError, TopologyProvider,
};
#[cfg(target_arch = "wasm32")]
@@ -22,7 +22,7 @@ mod error;
mod manager;
mod models;
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct Backend {
temporary_old_path: Option<PathBuf>,
database_path: PathBuf,
@@ -19,7 +19,7 @@ pub mod fs_backend;
#[error("no information provided")]
pub struct UndefinedError;
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct Empty {
// we need to keep 'basic' metadata here to "load" the CombinedReplyStorage
pub min_surb_threshold: usize,
@@ -19,9 +19,8 @@ use nym_api_requests::ecash::{
PartialExpirationDateSignatureResponse, VerificationKeyResponse,
};
use nym_api_requests::models::{
ApiHealthResponse, GatewayBondAnnotated, GatewayCoreStatusResponse,
HistoricalPerformanceResponse, MixnodeCoreStatusResponse, MixnodeStatusResponse,
NymNodeDescription, RewardEstimationResponse, StakeSaturationResponse,
ApiHealthResponse, GatewayBondAnnotated, GatewayCoreStatusResponse, MixnodeCoreStatusResponse,
MixnodeStatusResponse, NymNodeDescription, RewardEstimationResponse, StakeSaturationResponse,
};
use nym_api_requests::models::{LegacyDescribedGateway, MixNodeBondAnnotated};
use nym_api_requests::nym_nodes::SkimmedNode;
@@ -33,10 +32,10 @@ use time::Date;
use url::Url;
pub use crate::nym_api::NymApiClientExt;
use nym_mixnet_contract_common::EpochRewardedSet;
pub use nym_mixnet_contract_common::{
mixnode::MixNodeDetails, GatewayBond, IdentityKey, IdentityKeyRef, NodeId, NymNodeDetails,
};
// re-export the type to not break existing imports
pub use crate::coconut::EcashApiClient;
@@ -265,31 +264,6 @@ impl<C, S> Client<C, S> {
Ok(self.nym_api.get_gateways_detailed_unfiltered().await?)
}
pub async fn get_full_node_performance_history(
&self,
node_id: NodeId,
) -> Result<Vec<HistoricalPerformanceResponse>, ValidatorClientError> {
// TODO: deal with paging in macro or some helper function or something, because it's the same pattern everywhere
let mut page = 0;
let mut history = Vec::new();
loop {
let mut res = self
.nym_api
.get_node_performance_history(node_id, Some(page), None)
.await?;
history.append(&mut res.history.data);
if history.len() < res.history.pagination.total {
page += 1
} else {
break;
}
}
Ok(history)
}
// TODO: combine with NymApiClient...
pub async fn get_all_cached_described_nodes(
&self,
@@ -393,10 +367,6 @@ impl NymApiClient {
Ok(self.nym_api.get_basic_gateways().await?.nodes)
}
pub async fn get_current_rewarded_set(&self) -> Result<EpochRewardedSet, ValidatorClientError> {
Ok(self.nym_api.get_rewarded_set().await?.into())
}
/// retrieve basic information for nodes are capable of operating as an entry gateway
/// this includes legacy gateways and nym-nodes
pub async fn get_all_basic_entry_assigned_nodes(
@@ -65,6 +65,12 @@ pub enum EcashApiError {
#[from]
source: cosmrs::ErrorReport,
},
#[error("nym api error")]
NymApi {
#[from]
source: crate::ValidatorClientError,
},
}
impl TryFrom<ContractVKShare> for EcashApiClient {
@@ -13,7 +13,7 @@ use nym_api_requests::ecash::models::{
use nym_api_requests::ecash::VerificationKeyResponse;
use nym_api_requests::models::{
AnnotationResponse, ApiHealthResponse, LegacyDescribedMixNode, NodePerformanceResponse,
NodeRefreshBody, NymNodeDescription, PerformanceHistoryResponse, RewardedSetResponse,
NodeRefreshBody, NymNodeDescription,
};
use nym_api_requests::nym_nodes::PaginatedCachedNodesResponse;
use nym_api_requests::pagination::PaginatedResponse;
@@ -163,35 +163,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[tracing::instrument(level = "debug", skip_all)]
async fn get_node_performance_history(
&self,
node_id: NodeId,
page: Option<u32>,
per_page: Option<u32>,
) -> Result<PerformanceHistoryResponse, NymAPIError> {
let mut params = Vec::new();
if let Some(page) = page {
params.push(("page", page.to_string()))
}
if let Some(per_page) = per_page {
params.push(("per_page", per_page.to_string()))
}
self.get_json(
&[
routes::API_VERSION,
routes::NYM_NODES_ROUTES,
routes::NYM_NODES_PERFORMANCE_HISTORY,
&*node_id.to_string(),
],
&params,
)
.await
}
#[tracing::instrument(level = "debug", skip_all)]
async fn get_nodes_described(
&self,
@@ -208,15 +179,8 @@ pub trait NymApiClientExt: ApiClient {
params.push(("per_page", per_page.to_string()))
}
self.get_json(
&[
routes::API_VERSION,
routes::NYM_NODES_ROUTES,
routes::NYM_NODES_DESCRIBED,
],
&params,
)
.await
self.get_json(&[routes::API_VERSION, "nym-nodes", "described"], &params)
.await
}
#[tracing::instrument(level = "debug", skip_all)]
@@ -235,15 +199,8 @@ pub trait NymApiClientExt: ApiClient {
params.push(("per_page", per_page.to_string()))
}
self.get_json(
&[
routes::API_VERSION,
routes::NYM_NODES_ROUTES,
routes::NYM_NODES_BONDED,
],
&params,
)
.await
self.get_json(&[routes::API_VERSION, "nym-nodes", "bonded"], &params)
.await
}
#[deprecated]
@@ -253,7 +210,7 @@ pub trait NymApiClientExt: ApiClient {
&[
routes::API_VERSION,
"unstable",
routes::NYM_NODES_ROUTES,
"nym-nodes",
"mixnodes",
"skimmed",
],
@@ -269,7 +226,7 @@ pub trait NymApiClientExt: ApiClient {
&[
routes::API_VERSION,
"unstable",
routes::NYM_NODES_ROUTES,
"nym-nodes",
"gateways",
"skimmed",
],
@@ -278,19 +235,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_rewarded_set(&self) -> Result<RewardedSetResponse, NymAPIError> {
self.get_json(
&[
routes::API_VERSION,
routes::NYM_NODES_ROUTES,
routes::NYM_NODES_REWARDED_SET,
],
NO_PARAMS,
)
.await
}
/// retrieve basic information for nodes are capable of operating as an entry gateway
/// this includes legacy gateways and nym-nodes
#[instrument(level = "debug", skip(self))]
@@ -318,7 +262,7 @@ pub trait NymApiClientExt: ApiClient {
&[
routes::API_VERSION,
"unstable",
routes::NYM_NODES_ROUTES,
"nym-nodes",
"skimmed",
"entry-gateways",
"all",
@@ -355,7 +299,7 @@ pub trait NymApiClientExt: ApiClient {
&[
routes::API_VERSION,
"unstable",
routes::NYM_NODES_ROUTES,
"nym-nodes",
"skimmed",
"mixnodes",
"active",
@@ -392,7 +336,7 @@ pub trait NymApiClientExt: ApiClient {
&[
routes::API_VERSION,
"unstable",
routes::NYM_NODES_ROUTES,
"nym-nodes",
"skimmed",
"mixnodes",
"all",
@@ -424,12 +368,7 @@ pub trait NymApiClientExt: ApiClient {
}
self.get_json(
&[
routes::API_VERSION,
"unstable",
routes::NYM_NODES_ROUTES,
"skimmed",
],
&[routes::API_VERSION, "unstable", "nym-nodes", "skimmed"],
&params,
)
.await
@@ -738,8 +677,8 @@ pub trait NymApiClientExt: ApiClient {
self.get_json(
&[
routes::API_VERSION,
routes::NYM_NODES_ROUTES,
routes::NYM_NODES_PERFORMANCE,
"nym-nodes",
"performance",
&node_id.to_string(),
],
NO_PARAMS,
@@ -754,8 +693,8 @@ pub trait NymApiClientExt: ApiClient {
self.get_json(
&[
routes::API_VERSION,
routes::NYM_NODES_ROUTES,
routes::NYM_NODES_ANNOTATION,
"nym-nodes",
"annotation",
&node_id.to_string(),
],
NO_PARAMS,
@@ -973,24 +912,18 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn force_refresh_describe_cache(
&self,
request: &NodeRefreshBody,
) -> Result<(), NymAPIError> {
self.post_json(
&[
routes::API_VERSION,
routes::NYM_NODES_ROUTES,
routes::NYM_NODES_REFRESH_DESCRIBED,
],
&[routes::API_VERSION, "nym-nodes", "refresh-described"],
NO_PARAMS,
request,
)
.await
}
#[instrument(level = "debug", skip(self))]
async fn issued_ticketbooks_for(
&self,
expiration_date: Date,
@@ -1007,7 +940,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn issued_ticketbooks_challenge(
&self,
expiration_date: Date,
@@ -34,19 +34,6 @@ pub mod ecash {
pub const EPOCH_ID_PARAM: &str = "epoch_id";
}
pub const NYM_NODES_ROUTES: &str = "nym-nodes";
pub use nym_nodes::*;
pub mod nym_nodes {
pub const NYM_NODES_PERFORMANCE_HISTORY: &str = "performance-history";
pub const NYM_NODES_PERFORMANCE: &str = "performance";
pub const NYM_NODES_ANNOTATION: &str = "annotation";
pub const NYM_NODES_DESCRIBED: &str = "described";
pub const NYM_NODES_BONDED: &str = "bonded";
pub const NYM_NODES_REWARDED_SET: &str = "rewarded-set";
pub const NYM_NODES_REFRESH_DESCRIBED: &str = "refresh-described";
}
pub const STATUS_ROUTES: &str = "status";
pub const API_STATUS_ROUTES: &str = "api-status";
pub const HEALTH: &str = "health";
@@ -26,10 +26,10 @@ use nym_mixnet_contract_common::{
reward_params::{Performance, RewardingParams},
rewarding::{EstimatedCurrentEpochRewardResponse, PendingRewardResponse},
ContractBuildInformation, ContractState, ContractStateParams, CurrentIntervalResponse,
CurrentNymNodeVersionResponse, Delegation, EpochEventId, EpochRewardedSet, EpochStatus,
GatewayBond, GatewayBondResponse, GatewayOwnershipResponse, HistoricalNymNodeVersionEntry,
IdentityKey, IdentityKeyRef, IntervalEventId, MixNodeBond, MixNodeDetails,
MixOwnershipResponse, MixnodeDetailsByIdentityResponse, MixnodeDetailsResponse, NodeId,
CurrentNymNodeVersionResponse, Delegation, EpochEventId, EpochStatus, GatewayBond,
GatewayBondResponse, GatewayOwnershipResponse, HistoricalNymNodeVersionEntry, IdentityKey,
IdentityKeyRef, IntervalEventId, MixNodeBond, MixNodeDetails, MixOwnershipResponse,
MixnodeDetailsByIdentityResponse, MixnodeDetailsResponse, NodeId,
NumberOfPendingEventsResponse, NymNodeBond, NymNodeDetails, NymNodeVersionHistoryResponse,
PagedAllDelegationsResponse, PagedDelegatorDelegationsResponse, PagedGatewayResponse,
PagedMixnodeBondsResponse, PagedNodeDelegationsResponse, PendingEpochEvent,
@@ -670,7 +670,7 @@ impl<T> PagedMixnetQueryClient for T where T: MixnetQueryClient {}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait MixnetQueryClientExt: MixnetQueryClient {
async fn get_rewarded_set(&self) -> Result<EpochRewardedSet, NyxdError> {
async fn get_rewarded_set(&self) -> Result<RewardedSet, NyxdError> {
let error_response = |message| Err(NyxdError::extension_query_failure("mixnet", message));
let metadata = self.get_rewarded_set_metadata().await?;
@@ -711,16 +711,13 @@ pub trait MixnetQueryClientExt: MixnetQueryClient {
return error_response("the nodes assigned for 'standby' returned unexpected epoch_id");
}
Ok(EpochRewardedSet {
epoch_id: expected_epoch_id,
assignment: RewardedSet {
entry_gateways: entry.nodes,
exit_gateways: exit.nodes,
layer1: layer1.nodes,
layer2: layer2.nodes,
layer3: layer3.nodes,
standby: standby.nodes,
},
Ok(RewardedSet {
entry_gateways: entry.nodes,
exit_gateways: exit.nodes,
layer1: layer1.nodes,
layer2: layer2.nodes,
layer3: layer3.nodes,
standby: standby.nodes,
})
}
}
@@ -153,20 +153,13 @@ pub trait CosmWasmClient: TendermintRpcClient {
let req = QueryAllBalancesRequest {
address: address.to_string(),
pagination,
resolve_denom: false,
};
let mut res = self
.make_abci_query::<_, QueryAllBalancesResponse>(path.clone(), req)
.await?;
let early_break = res.balances.is_empty();
raw_balances.append(&mut res.balances);
if early_break {
break;
}
if let Some(next_key) = next_page_key(res.pagination) {
pagination = Some(create_pagination(next_key))
} else {
@@ -194,13 +187,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
.make_abci_query::<_, QueryTotalSupplyResponse>(path.clone(), req)
.await?;
let early_break = res.supply.is_empty();
supply.append(&mut res.supply);
if early_break {
break;
}
if let Some(next_key) = next_page_key(res.pagination) {
pagination = Some(create_pagination(next_key))
} else {
@@ -341,13 +328,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
.make_abci_query::<_, QueryCodesResponse>(path.clone(), req)
.await?;
let early_break = res.code_infos.is_empty();
raw_codes.append(&mut res.code_infos);
if early_break {
break;
}
if let Some(next_key) = next_page_key(res.pagination) {
pagination = Some(create_pagination(next_key))
} else {
@@ -392,13 +373,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
.make_abci_query::<_, QueryContractsByCodeResponse>(path.clone(), req)
.await?;
let early_break = res.contracts.is_empty();
raw_contracts.append(&mut res.contracts);
if early_break {
break;
}
if let Some(next_key) = next_page_key(res.pagination) {
pagination = Some(create_pagination(next_key))
} else {
@@ -454,13 +429,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
.make_abci_query::<_, QueryContractHistoryResponse>(path.clone(), req)
.await?;
let early_break = res.entries.is_empty();
raw_entries.append(&mut res.entries);
if early_break {
break;
}
if let Some(next_key) = next_page_key(res.pagination) {
pagination = Some(create_pagination(next_key))
} else {
@@ -4,11 +4,9 @@
use crate::rpc::TendermintRpcClient;
use async_trait::async_trait;
use base64::Engine;
use cosmrs::tendermint;
use cosmrs::tendermint::{block::Height, evidence::Evidence, Hash};
use reqwest::header::HeaderMap;
use reqwest::{header, RequestBuilder};
use tendermint_rpc::dialect::{v0_34, v0_37, v0_38, LatestDialect};
use tendermint_rpc::{
client::CompatMode,
dialect::{self, Dialect},
@@ -23,21 +21,8 @@ macro_rules! perform_with_compat {
($self:expr, $request:expr) => {{
let request = $request;
match $self.compat {
CompatMode::V0_38 => {
$self
.perform_request_with_dialect(request, dialect::v0_38::Dialect)
.await
}
CompatMode::V0_37 => {
$self
.perform_request_with_dialect(request, dialect::v0_37::Dialect)
.await
}
CompatMode::V0_34 => {
$self
.perform_request_with_dialect(request, dialect::v0_34::Dialect)
.await
}
CompatMode::V0_37 => $self.perform_v0_37(request).await,
CompatMode::V0_34 => $self.perform_v0_34(request).await,
}
}};
}
@@ -85,11 +70,7 @@ impl ReqwestRpcClient {
.headers(headers)
}
async fn perform_request_with_dialect<R, S>(
&self,
request: R,
_dialect: S,
) -> Result<R::Output, Error>
async fn perform_request<R, S>(&self, request: R) -> Result<R::Output, Error>
where
R: SimpleRequest<S>,
S: Dialect,
@@ -100,25 +81,26 @@ impl ReqwestRpcClient {
.send()
.await
.map_err(TendermintRpcErrorMap::into_rpc_err)?;
let response_status = response.status();
let bytes = response
.bytes()
.await
.map_err(TendermintRpcErrorMap::into_rpc_err)?;
// Successful JSON-RPC requests are expected to return a 200 OK HTTP status.
// Otherwise, this means that the HTTP request failed as a whole,
// as opposed to the JSON-RPC request returning an error,
// and we cannot expect the response body to be a valid JSON-RPC response.
if response_status != reqwest::StatusCode::OK {
// hehe, that's so nasty but we have to somehow convert between different versions of the same lib
return Err(Error::http_request_failed(
response_status.as_u16().try_into().unwrap(),
));
}
R::Response::from_string(bytes).map(Into::into)
}
async fn perform_v0_34<R>(&self, request: R) -> Result<R::Output, Error>
where
R: SimpleRequest<dialect::v0_34::Dialect>,
{
self.perform_request(request).await
}
async fn perform_v0_37<R>(&self, request: R) -> Result<R::Output, Error>
where
R: SimpleRequest<dialect::v0_37::Dialect>,
{
self.perform_request(request).await
}
}
trait TendermintRpcErrorMap {
@@ -138,50 +120,18 @@ impl TendermintRpcClient for ReqwestRpcClient {
where
R: SimpleRequest,
{
self.perform_request_with_dialect(request, LatestDialect)
.await
self.perform_request(request).await
}
async fn block<H>(&self, height: H) -> Result<endpoint::block::Response, Error>
async fn block_results<H>(&self, height: H) -> Result<block_results::Response, Error>
where
H: Into<Height> + Send,
{
perform_with_compat!(self, endpoint::block::Request::new(height.into()))
perform_with_compat!(self, block_results::Request::new(height.into()))
}
async fn block_by_hash(
&self,
hash: tendermint::Hash,
) -> Result<endpoint::block_by_hash::Response, Error> {
perform_with_compat!(self, endpoint::block_by_hash::Request::new(hash))
}
async fn latest_block(&self) -> Result<endpoint::block::Response, Error> {
perform_with_compat!(self, endpoint::block::Request::default())
}
async fn block_results<H>(&self, height: H) -> Result<endpoint::block_results::Response, Error>
where
H: Into<Height> + Send,
{
perform_with_compat!(self, endpoint::block_results::Request::new(height.into()))
}
async fn latest_block_results(&self) -> Result<endpoint::block_results::Response, Error> {
perform_with_compat!(self, endpoint::block_results::Request::default())
}
async fn block_search(
&self,
query: Query,
page: u32,
per_page: u8,
order: Order,
) -> Result<endpoint::block_search::Response, Error> {
perform_with_compat!(
self,
endpoint::block_search::Request::new(query, page, per_page, order)
)
async fn latest_block_results(&self) -> Result<block_results::Response, Error> {
perform_with_compat!(self, block_results::Request::default())
}
async fn header<H>(&self, height: H) -> Result<endpoint::header::Response, Error>
@@ -190,26 +140,11 @@ impl TendermintRpcClient for ReqwestRpcClient {
{
let height = height.into();
match self.compat {
CompatMode::V0_38 => {
self.perform_request_with_dialect(
endpoint::header::Request::new(height),
v0_38::Dialect,
)
.await
}
CompatMode::V0_37 => {
self.perform_request_with_dialect(
endpoint::header::Request::new(height),
v0_37::Dialect,
)
.await
}
CompatMode::V0_37 => self.perform(endpoint::header::Request::new(height)).await,
CompatMode::V0_34 => {
// Back-fill with a request to /block endpoint and
// taking just the header from the response.
let resp = self
.perform_request_with_dialect(block::Request::new(height), v0_34::Dialect)
.await?;
let resp = self.perform_v0_34(block::Request::new(height)).await?;
Ok(resp.into())
}
}
@@ -217,25 +152,12 @@ impl TendermintRpcClient for ReqwestRpcClient {
async fn header_by_hash(&self, hash: Hash) -> Result<header_by_hash::Response, Error> {
match self.compat {
CompatMode::V0_38 => {
self.perform_request_with_dialect(
header_by_hash::Request::new(hash),
v0_38::Dialect,
)
.await
}
CompatMode::V0_37 => {
self.perform_request_with_dialect(
header_by_hash::Request::new(hash),
v0_37::Dialect,
)
.await
}
CompatMode::V0_37 => self.perform(header_by_hash::Request::new(hash)).await,
CompatMode::V0_34 => {
// Back-fill with a request to /block_by_hash endpoint and
// taking just the header from the response.
let resp = self
.perform_request_with_dialect(block_by_hash::Request::new(hash), v0_34::Dialect)
.perform_v0_34(block_by_hash::Request::new(hash))
.await?;
Ok(resp.into())
}
@@ -245,18 +167,8 @@ impl TendermintRpcClient for ReqwestRpcClient {
/// `/broadcast_evidence`: broadcast an evidence.
async fn broadcast_evidence(&self, e: Evidence) -> Result<evidence::Response, Error> {
match self.compat {
CompatMode::V0_38 => {
self.perform_request_with_dialect(evidence::Request::new(e), v0_38::Dialect)
.await
}
CompatMode::V0_37 => {
self.perform_request_with_dialect(evidence::Request::new(e), v0_37::Dialect)
.await
}
CompatMode::V0_34 => {
self.perform_request_with_dialect(evidence::Request::new(e), v0_34::Dialect)
.await
}
CompatMode::V0_37 => self.perform(evidence::Request::new(e)).await,
CompatMode::V0_34 => self.perform_v0_34(evidence::Request::new(e)).await,
}
}
+1 -2
View File
@@ -12,8 +12,7 @@ dirs = { workspace = true, optional = true }
handlebars = { workspace = true }
log = { workspace = true }
serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
toml = { workspace = true, features = ["display"] }
toml = { workspace = true }
url = { workspace = true }
nym-network-defaults = { path = "../network-defaults", features = ["utoipa"] }
-10
View File
@@ -1,10 +0,0 @@
use std::io;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum NymConfigTomlError {
#[error(transparent)]
FileIoFailure(#[from] io::Error),
#[error(transparent)]
TomlSerializeFailure(#[from] toml::ser::Error),
}
-37
View File
@@ -13,7 +13,6 @@ pub use helpers::{parse_urls, OptionalSet};
pub use toml::de::Error as TomlDeError;
pub mod defaults;
pub mod error;
pub mod helpers;
pub mod legacy_helpers;
pub mod serde_helpers;
@@ -96,42 +95,6 @@ where
config.format_to_writer(file)
}
pub fn save_unformatted_config_to_file<C, P>(
config: &C,
path: P,
) -> Result<(), error::NymConfigTomlError>
where
C: Serialize + ?Sized,
P: AsRef<Path>,
{
let path = path.as_ref();
log::info!("saving config file to {}", path.display());
if let Some(parent) = path.parent() {
create_dir_all(parent)?;
}
let mut file = File::create(path)?;
// TODO: check for whether any of our configs store anything sensitive
// and change that to 0o644 instead
#[cfg(target_family = "unix")]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = fs::metadata(path)?.permissions();
perms.set_mode(0o600);
fs::set_permissions(path, perms)?;
}
// let serde format the TOML in whatever ugly way it chooses
// TODO: in https://docs.rs/toml/latest/toml/fn.to_string_pretty.html it recommends using
// https://docs.rs/toml_edit/latest/toml_edit/struct.DocumentMut.html to preserve formatting
let toml_string = toml::to_string_pretty(config)?;
Ok(file.write_all(toml_string.as_bytes())?)
}
pub fn deserialize_config_from_toml_str<C>(raw: &str) -> Result<C, TomlDeError>
where
C: DeserializeOwned,
@@ -13,7 +13,6 @@ cosmwasm-std = { workspace = true }
cosmwasm-schema = { workspace = true }
cw-storage-plus = { workspace = true }
schemars = { workspace = true }
utoipa = { workspace = true, optional = true }
serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
@@ -24,5 +23,4 @@ serde_json = { workspace = true }
vergen = { workspace = true, features = ["build", "git", "gitcl", "rustc", "cargo"] }
[features]
naive_float = []
utoipa = ["dep:utoipa"]
naive_float = []
@@ -221,7 +221,6 @@ fn default_unknown() -> String {
// TODO: there's no reason this couldn't be used for proper binaries, but in that case
// perhaps the struct should get renamed and moved to a "more" common crate
#[cw_serde]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct ContractBuildInformation {
/// Provides the name of the binary, i.e. the content of `CARGO_PKG_NAME` environmental variable.
#[serde(default = "default_unknown")]
@@ -42,11 +42,9 @@ pub struct Gateway {
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct GatewayBond {
/// Original amount pledged by the operator of this node.
#[cfg_attr(feature = "utoipa", schema(value_type = crate::CoinSchema))]
pub pledge_amount: Coin,
/// Address of the owner of this gateway.
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub owner: Addr,
/// Block height at which this gateway has been bonded.
@@ -57,7 +55,6 @@ pub struct GatewayBond {
/// Entity who bonded this gateway on behalf of the owner.
/// If exists, it's most likely the address of the vesting contract.
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub proxy: Option<Addr>,
}
@@ -7,7 +7,6 @@
use crate::constants::{TOKEN_SUPPLY, UNIT_DELEGATION_BASE};
use crate::error::MixnetContractError;
use crate::helpers::IntoBaseDecimal;
use crate::nym_node::Role;
use crate::reward_params::{NodeRewardingParameters, RewardingParams};
use crate::rewarding::helpers::truncate_reward;
use crate::rewarding::RewardDistribution;
@@ -82,25 +81,20 @@ impl MixNodeDetails {
// currently this struct is shared between mixnodes and nymnodes
#[cw_serde]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct NodeRewarding {
/// Information provided by the operator that influence the cost function.
pub cost_params: NodeCostParams,
/// Total pledge and compounded reward earned by the node operator.
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub operator: Decimal,
/// Total delegation and compounded reward earned by all node delegators.
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub delegates: Decimal,
/// Cumulative reward earned by the "unit delegation" since the block 0.
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub total_unit_reward: Decimal,
/// Value of the theoretical "unit delegation" that has delegated to this node at block 0.
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub unit_delegation: Decimal,
/// Marks the epoch when this node was last rewarded so that we wouldn't accidentally attempt
@@ -497,17 +491,14 @@ impl NodeRewarding {
::cosmwasm_schema::schemars::JsonSchema,
)]
#[schemars(crate = "::cosmwasm_schema::schemars")]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct MixNodeBond {
/// Unique id assigned to the bonded mixnode.
pub mix_id: NodeId,
/// Address of the owner of this mixnode.
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub owner: Addr,
/// Original amount pledged by the operator of this node.
#[cfg_attr(feature = "utoipa", schema(value_type = crate::CoinSchema))]
pub original_pledge: Coin,
// REMOVED (but might be needed due to legacy things, idk yet)
@@ -518,7 +509,6 @@ pub struct MixNodeBond {
/// Entity who bonded this mixnode on behalf of the owner.
/// If exists, it's most likely the address of the vesting contract.
#[cfg_attr(feature = "utoipa", schema(value_type = Option<String>))]
pub proxy: Option<Addr>,
/// Block height at which this mixnode has been bonded.
@@ -554,7 +544,6 @@ impl MixNodeBond {
feature = "generate-ts",
ts(export, export_to = "ts-packages/types/src/types/rust/Mixnode.ts")
)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct MixNode {
/// Network address of this mixnode, for example 1.1.1.1 or foo.mixnode.com
pub host: String,
@@ -581,14 +570,11 @@ pub struct MixNode {
/// The cost parameters, or the cost function, defined for the particular mixnode that influences
/// how the rewards should be split between the node operator and its delegators.
#[cw_serde]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct NodeCostParams {
/// The profit margin of the associated node, i.e. the desired percent of the reward to be distributed to the operator.
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub profit_margin_percent: Percent,
/// Operating cost of the associated node per the entire interval.
#[cfg_attr(feature = "utoipa", schema(value_type = crate::CoinSchema))]
pub interval_operating_cost: Coin,
}
@@ -625,16 +611,6 @@ pub enum LegacyMixLayer {
Three = 3,
}
impl From<LegacyMixLayer> for Role {
fn from(layer: LegacyMixLayer) -> Self {
match layer {
LegacyMixLayer::One => Role::Layer1,
LegacyMixLayer::Two => Role::Layer2,
LegacyMixLayer::Three => Role::Layer3,
}
}
}
impl From<LegacyMixLayer> for String {
fn from(layer: LegacyMixLayer) -> Self {
(layer as u8).to_string()
@@ -693,9 +669,7 @@ pub struct PendingMixNodeChanges {
}
#[derive(Default, Copy, Clone, Debug, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct LegacyPendingMixNodeChanges {
#[cfg_attr(feature = "utoipa", schema(value_type = Option<u32>))]
pub pledge_change: Option<EpochEventId>,
}
@@ -113,10 +113,6 @@ impl Role {
pub fn is_standby(&self) -> bool {
matches!(self, Role::Standby)
}
pub fn is_mixnode(&self) -> bool {
matches!(self, Role::Layer1 | Role::Layer2 | Role::Layer3)
}
}
impl Display for Role {
@@ -235,7 +231,6 @@ pub struct RoleMetadata {
/// Full details associated with given node.
#[cw_serde]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct NymNodeDetails {
/// Basic bond information of this node, such as owner address, original pledge, etc.
pub bond_information: NymNodeBond,
@@ -293,19 +288,14 @@ impl NymNodeDetails {
}
#[cw_serde]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct NymNodeBond {
/// Unique id assigned to the bonded node.
#[cfg_attr(feature = "utoipa", schema(value_type = u32))]
pub node_id: NodeId,
/// Address of the owner of this nym-node.
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub owner: Addr,
/// Original amount pledged by the operator of this node.
#[cfg_attr(feature = "utoipa", schema(value_type = crate::CoinSchema))]
pub original_pledge: Coin,
/// Block height at which this nym-node has been bonded.
@@ -358,7 +348,6 @@ impl NymNodeBond {
feature = "generate-ts",
ts(export, export_to = "ts-packages/types/src/types/rust/NymNode.ts")
)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct NymNode {
/// Network address of this nym-node, for example 1.1.1.1 or foo.mixnode.com
/// that is used to discover other capabilities of this node.
@@ -369,7 +358,6 @@ pub struct NymNode {
pub custom_http_port: Option<u16>,
/// Base58-encoded ed25519 EdDSA public key.
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub identity_key: IdentityKey,
// TODO: I don't think we want to include sphinx keys here,
// given we want to rotate them and keeping that in sync with contract will be a PITA
@@ -447,11 +435,8 @@ pub struct NodeConfigUpdate {
export_to = "ts-packages/types/src/types/rust/PendingNodeChanges.ts"
)
)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct PendingNodeChanges {
#[cfg_attr(feature = "utoipa", schema(value_type = Option<u32>))]
pub pledge_change: Option<EpochEventId>,
#[cfg_attr(feature = "utoipa", schema(value_type = Option<u32>))]
pub cost_params_change: Option<IntervalEventId>,
}
@@ -21,37 +21,31 @@ pub type WorkFactor = Decimal;
)]
#[cw_serde]
#[derive(Copy)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct IntervalRewardParams {
/// Current value of the rewarding pool.
/// It is expected to be constant throughout the interval.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub reward_pool: Decimal,
/// Current value of the staking supply.
/// It is expected to be constant throughout the interval.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub staking_supply: Decimal,
/// Defines the percentage of stake needed to reach saturation for all of the nodes in the rewarded set.
/// Also known as `beta`.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub staking_supply_scale_factor: Percent,
// computed values
/// Current value of the computed reward budget per epoch, per node.
/// It is expected to be constant throughout the interval.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub epoch_reward_budget: Decimal,
/// Current value of the stake saturation point.
/// It is expected to be constant throughout the interval.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub stake_saturation_point: Decimal,
// constants(-ish)
@@ -60,7 +54,6 @@ pub struct IntervalRewardParams {
/// It is not really expected to be changing very often.
/// As a matter of fact, unless there's a very specific reason, it should remain constant.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub sybil_resistance: Percent,
// default: 10
@@ -68,7 +61,6 @@ pub struct IntervalRewardParams {
/// It is not really expected to be changing very often.
/// As a matter of fact, unless there's a very specific reason, it should remain constant.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub active_set_work_factor: Decimal,
// default: 2%
@@ -78,7 +70,6 @@ pub struct IntervalRewardParams {
/// It is not really expected to be changing very often.
/// As a matter of fact, unless there's a very specific reason, it should remain constant.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub interval_pool_emission: Percent,
}
@@ -99,7 +90,6 @@ impl IntervalRewardParams {
)]
#[cw_serde]
#[derive(Copy)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct RewardingParams {
/// Parameters that should remain unchanged throughout an interval.
pub interval: IntervalRewardParams,
@@ -264,7 +254,6 @@ impl RewardingParams {
)]
#[cw_serde]
#[derive(Copy)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct RewardedSetParams {
/// The expected number of nodes assigned entry gateway role (i.e. [`Role::EntryGateway`])
pub entry_gateways: u32,
@@ -17,12 +17,10 @@ pub mod simulator;
)]
#[cw_serde]
#[derive(Copy, Default)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct RewardEstimate {
/// The amount of **decimal** coins that are going to get distributed to the node,
/// i.e. the operator and all its delegators.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub total_node_reward: Decimal,
// note that operator reward includes the operating_cost,
@@ -30,17 +28,14 @@ pub struct RewardEstimate {
// in that case the operator reward would still be `1nym` as opposed to 0
/// The share of the reward that is going to get distributed to the node operator.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub operator: Decimal,
/// The share of the reward that is going to get distributed among the node delegators.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub delegates: Decimal,
/// The operating cost of this node. Note: it's already included in the operator reward.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub operating_cost: Decimal,
}
@@ -3,7 +3,6 @@
use crate::config_score::{ConfigScoreParams, OutdatedVersionWeights, VersionScoreFormulaParams};
use crate::nym_node::Role;
use crate::EpochId;
use contracts_common::Percent;
use cosmwasm_schema::cw_serde;
use cosmwasm_std::Coin;
@@ -33,23 +32,6 @@ impl RoleAssignment {
}
}
#[cw_serde]
#[derive(Default)]
pub struct EpochRewardedSet {
pub epoch_id: EpochId,
pub assignment: RewardedSet,
}
impl From<(EpochId, RewardedSet)> for EpochRewardedSet {
fn from((epoch_id, assignment): (EpochId, RewardedSet)) -> Self {
EpochRewardedSet {
epoch_id,
assignment,
}
}
}
#[cw_serde]
#[derive(Default)]
pub struct RewardedSet {
@@ -87,29 +69,6 @@ impl RewardedSet {
pub fn rewarded_set_size(&self) -> usize {
self.active_set_size() + self.standby.len()
}
pub fn get_role(&self, node_id: NodeId) -> Option<Role> {
// given each role has ~100 entries in them, doing linear lookup with vec should be fine
if self.entry_gateways.contains(&node_id) {
return Some(Role::EntryGateway);
}
if self.exit_gateways.contains(&node_id) {
return Some(Role::ExitGateway);
}
if self.layer1.contains(&node_id) {
return Some(Role::Layer1);
}
if self.layer2.contains(&node_id) {
return Some(Role::Layer2);
}
if self.layer3.contains(&node_id) {
return Some(Role::Layer3);
}
if self.standby.contains(&node_id) {
return Some(Role::Standby);
}
None
}
}
#[cw_serde]
@@ -175,14 +134,6 @@ where
}
}
#[cfg(feature = "utoipa")]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[cfg_attr(feature = "utoipa", schema(title = "Coin"))]
pub struct CoinSchema {
pub denom: String,
pub amount: String,
}
/// The current state of the mixnet contract.
#[cw_serde]
pub struct ContractState {
+1 -1
View File
@@ -19,7 +19,7 @@ use std::error::Error;
// `SELECT total_tickets, used_tickets FROM ecash_ticketbook WHERE expiration_date >= ?`, today_date
// then for each calculate the diff total_tickets - used_tickets and multiply the result by the size of the ticket
#[async_trait]
pub trait Storage: Clone + Send + Sync {
pub trait Storage: Send + Sync {
type StorageError: Error;
async fn close(&self);
@@ -13,6 +13,7 @@ use nym_api_requests::constants::MIN_BATCH_REDEMPTION_DELAY;
use nym_api_requests::ecash::models::{BatchRedeemTicketsBody, VerifyEcashTicketBody};
use nym_credentials_interface::Bandwidth;
use nym_credentials_interface::{ClientTicket, TicketType};
use nym_validator_client::coconut::EcashApiError;
use nym_validator_client::nym_api::EpochId;
use nym_validator_client::nyxd::contract_traits::{
EcashSigningClient, MultisigQueryClient, MultisigSigningClient, PagedMultisigQueryClient,
@@ -352,7 +353,9 @@ impl CredentialHandler {
}
Err(err) => {
error!("failed to send ticket {ticket_id} for verification to ecash signer '{client}': {err}. if we don't reach quorum, we'll retry later");
Ok(false)
Err(EcashTicketError::ApiFailure(EcashApiError::NymApi {
source: err,
}))
}
}
}
-1
View File
@@ -16,7 +16,6 @@ serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
strum = { workspace = true, features = ["derive"] }
time = { workspace = true, features = ["serde"] }
utoipa = { workspace = true }
rand = { workspace = true }
nym-compact-ecash = { path = "../nym_offline_compact_ecash" }
@@ -86,6 +86,7 @@ impl Display for AddressPolicyAction {
/// ```
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[cfg_attr(feature = "openapi", aliases(ExitPolicy))]
pub struct AddressPolicy {
/// A list of rules to apply to find out whether an address is
/// contained by this policy.
@@ -726,10 +727,10 @@ mod test {
let policy = AddressPolicy::parse_from_torrc(
r#"
ExitPolicy reject 1.2.3.4/32:*
ExitPolicy reject 1.2.3.5:*
ExitPolicy reject 1.2.3.5:*
ExitPolicy reject 1.2.3.6/16:*
ExitPolicy reject 1.2.3.6/16:123-456
ExitPolicy accept *:53
ExitPolicy reject 1.2.3.6/16:123-456
ExitPolicy accept *:53
ExitPolicy accept6 *6:119
ExitPolicy accept *4:120
ExitPolicy reject6 [FC00::]/7:*
+1
View File
@@ -11,6 +11,7 @@ license.workspace = true
[dependencies]
bincode = { workspace = true }
defguard_wireguard_rs = { workspace = true }
log = { workspace = true }
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"sqlite",
@@ -1,7 +0,0 @@
/*
* Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
* SPDX-License-Identifier: GPL-3.0-only
*/
ALTER TABLE message_store
ADD COLUMN timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP;
+39 -28
View File
@@ -2,11 +2,9 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::models::StoredMessage;
use time::OffsetDateTime;
use tracing::debug;
#[derive(Clone)]
pub struct InboxManager {
pub(crate) struct InboxManager {
connection_pool: sqlx::SqlitePool,
/// Maximum number of messages that can be obtained from the database per operation.
/// It is used to prevent out of memory errors in the case of client receiving a lot of data while
@@ -73,22 +71,44 @@ impl InboxManager {
// get 1 additional message to check whether there will be more to grab
// next time
let limit = self.retrieval_limit + 1;
let start_after = start_after.unwrap_or(-1);
let mut res: Vec<StoredMessage> = sqlx::query_as(
r#"
SELECT id, client_address_bs58, content, timestamp
FROM message_store
WHERE client_address_bs58 = ? AND id > ?
ORDER BY id ASC
LIMIT ?;
"#,
)
.bind(client_address_bs58)
.bind(start_after)
.bind(limit)
.fetch_all(&self.connection_pool)
.await?;
let mut res = if let Some(start_after) = start_after {
sqlx::query_as!(
StoredMessage,
r#"
SELECT
id as "id!",
client_address_bs58 as "client_address_bs58!",
content as "content!"
FROM message_store
WHERE client_address_bs58 = ? AND id > ?
ORDER BY id ASC
LIMIT ?;
"#,
client_address_bs58,
start_after,
limit
)
.fetch_all(&self.connection_pool)
.await?
} else {
sqlx::query_as!(
StoredMessage,
r#"
SELECT
id as "id!",
client_address_bs58 as "client_address_bs58!",
content as "content!"
FROM message_store
WHERE client_address_bs58 = ?
ORDER BY id ASC
LIMIT ?;
"#,
client_address_bs58,
limit
)
.fetch_all(&self.connection_pool)
.await?
};
if res.len() > self.retrieval_limit as usize {
res.truncate(self.retrieval_limit as usize);
@@ -126,13 +146,4 @@ impl InboxManager {
.await?;
Ok(())
}
pub async fn remove_stale(&self, cutoff: OffsetDateTime) -> Result<(), sqlx::Error> {
let affected = sqlx::query!("DELETE FROM message_store WHERE timestamp < ?", cutoff)
.execute(&self.connection_pool)
.await?
.rows_affected();
debug!("Removed {affected} stale messages");
Ok(())
}
}
+2 -2
View File
@@ -3,6 +3,7 @@
use bandwidth::BandwidthManager;
use clients::{ClientManager, ClientType};
use inboxes::InboxManager;
use models::{
Client, PersistedBandwidth, PersistedSharedKeys, RedemptionProposal, StoredMessage,
VerifiedTicket, WireguardPeer,
@@ -30,7 +31,6 @@ mod tickets;
mod wireguard_peers;
pub use error::GatewayStorageError;
pub use inboxes::InboxManager;
// note that clone here is fine as upon cloning the same underlying pool will be used
#[derive(Clone)]
@@ -53,7 +53,7 @@ impl GatewayStorage {
&self.shared_key_manager
}
pub fn inbox_manager(&self) -> &InboxManager {
pub(crate) fn inbox_manager(&self) -> &InboxManager {
&self.inbox_manager
}
-2
View File
@@ -48,13 +48,11 @@ impl TryFrom<PersistedSharedKeys> for SharedGatewayKey {
}
}
#[derive(FromRow)]
pub struct StoredMessage {
pub id: i64,
#[allow(dead_code)]
pub client_address_bs58: String,
pub content: Vec<u8>,
pub timestamp: OffsetDateTime,
}
#[derive(Debug, Clone, FromRow)]
-7
View File
@@ -15,7 +15,6 @@ async-trait = { workspace = true }
reqwest = { workspace = true, features = ["json"] }
http.workspace = true
url = { workspace = true }
once_cell = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
@@ -23,13 +22,7 @@ tracing = { workspace = true }
nym-bin-common = { path = "../bin-common" }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
hickory-resolver = { workspace = true, features = ["dns-over-https-rustls", "webpki-roots"] }
# for request timeout until https://github.com/seanmonstar/reqwest/issues/1135 is fixed
[target."cfg(target_arch = \"wasm32\")".dependencies.wasmtimer]
workspace = true
features = ["tokio"]
[dev-dependencies]
tokio = { workspace = true, features=["rt", "macros"] }
-177
View File
@@ -1,177 +0,0 @@
//! DNS resolver configuration for internal lookups.
//!
//! The resolver itself is the set combination of the google, cloudflare, and quad9 endpoints
//! supporting DoH and DoT.
//!
//! This resolver implements a fallback mechanism where, should the DNS-over-TLS resolution fail, a
//! followup resolution will be done using the hosts configured default (e.g. `/etc/resolve.conf` on
//! linux).
//!
//! Requires the `dns-over-https-rustls`, `webpki-roots` feature for the
//! `hickory-resolver` crate
#![deny(missing_docs)]
use crate::ClientBuilder;
use std::{net::SocketAddr, sync::Arc};
use hickory_resolver::lookup_ip::LookupIp;
use hickory_resolver::{
config::{LookupIpStrategy, NameServerConfigGroup, ResolverConfig, ResolverOpts},
error::ResolveError,
lookup_ip::LookupIpIntoIter,
TokioAsyncResolver,
};
use once_cell::sync::OnceCell;
use reqwest::dns::{Addrs, Name, Resolve, Resolving};
use tracing::warn;
impl ClientBuilder {
/// Override the DNS resolver implementation used by the underlying http client.
pub fn dns_resolver<R: Resolve + 'static>(mut self, resolver: Arc<R>) -> Self {
self.reqwest_client_builder = self.reqwest_client_builder.dns_resolver(resolver);
self
}
}
struct SocketAddrs {
iter: LookupIpIntoIter,
}
#[derive(Debug, thiserror::Error)]
#[error("hickory-dns resolver error: {hickory_error}")]
pub struct HickoryDnsError {
#[from]
hickory_error: ResolveError,
}
/// Wrapper around an `AsyncResolver`, which implements the `Resolve` trait.
#[derive(Debug, Default, Clone)]
pub struct HickoryDnsResolver {
/// Since we might not have been called in the context of a
/// Tokio Runtime in initialization, so we must delay the actual
/// construction of the resolver.
state: Arc<OnceCell<TokioAsyncResolver>>,
fallback: Arc<OnceCell<TokioAsyncResolver>>,
}
impl Resolve for HickoryDnsResolver {
fn resolve(&self, name: Name) -> Resolving {
let resolver = self.state.clone();
let fallback = self.fallback.clone();
Box::pin(async move {
let resolver = resolver.get_or_try_init(new_resolver)?;
// try the primary DNS resolver that we set up (DoH or DoT or whatever)
let lookup = match resolver.lookup_ip(name.as_str()).await {
Ok(res) => res,
Err(e) => {
// on failure use the fall back system configured DNS resolver
warn!("primary DNS failed w/ error {e}: using system fallback");
let resolver = fallback.get_or_try_init(new_resolver_system)?;
resolver.lookup_ip(name.as_str()).await?
}
};
let addrs: Addrs = Box::new(SocketAddrs {
iter: lookup.into_iter(),
});
Ok(addrs)
})
}
}
impl Iterator for SocketAddrs {
type Item = SocketAddr;
fn next(&mut self) -> Option<Self::Item> {
self.iter.next().map(|ip_addr| SocketAddr::new(ip_addr, 0))
}
}
impl HickoryDnsResolver {
/// Attempt to resolve a domain name to a set of ['IpAddr']s
pub async fn resolve_str(&self, name: &str) -> Result<LookupIp, HickoryDnsError> {
let resolver = self.state.get_or_try_init(new_resolver)?;
// try the primary DNS resolver that we set up (DoH or DoT or whatever)
let lookup = match resolver.lookup_ip(name).await {
Ok(res) => res,
Err(e) => {
// on failure use the fall back system configured DNS resolver
warn!("primary DNS failed w/ error {e}: using system fallback");
let resolver = self.fallback.get_or_try_init(new_resolver_system)?;
resolver.lookup_ip(name).await?
}
};
Ok(lookup)
}
}
/// Create a new resolver with a custom DoT based configuration. The options are overridden to look
/// up for both IPv4 and IPv6 addresses to work with "happy eyeballs" algorithm.
fn new_resolver() -> Result<TokioAsyncResolver, HickoryDnsError> {
let mut name_servers = NameServerConfigGroup::google_tls();
name_servers.merge(NameServerConfigGroup::google_https());
// name_servers.merge(NameServerConfigGroup::google_h3());
name_servers.merge(NameServerConfigGroup::quad9_tls());
name_servers.merge(NameServerConfigGroup::quad9_https());
name_servers.merge(NameServerConfigGroup::cloudflare_tls());
name_servers.merge(NameServerConfigGroup::cloudflare_https());
let config = ResolverConfig::from_parts(None, Vec::new(), name_servers);
let mut opts = ResolverOpts::default();
opts.ip_strategy = LookupIpStrategy::Ipv4AndIpv6;
// Would like to enable this when 0.25 stabilizes
// opts.server_ordering_strategy = ServerOrderingStrategy::RoundRobin;
Ok(TokioAsyncResolver::tokio(config, opts))
}
/// Create a new resolver with the default configuration, which reads from the system DNS config
/// (i.e. `/etc/resolve.conf` in unix). The options are overridden to look up for both IPv4 and IPv6
/// addresses to work with "happy eyeballs" algorithm.
fn new_resolver_system() -> Result<TokioAsyncResolver, HickoryDnsError> {
let (config, mut opts) = hickory_resolver::system_conf::read_system_conf()?;
opts.ip_strategy = LookupIpStrategy::Ipv4AndIpv6;
Ok(TokioAsyncResolver::tokio(config, opts))
}
#[cfg(test)]
mod test {
use super::*;
#[tokio::test]
async fn reqwest_hickory_doh() {
let resolver = HickoryDnsResolver::default();
let client = reqwest::ClientBuilder::new()
.dns_resolver(resolver.into())
.build()
.unwrap();
let resp = client
.get("http://ifconfig.me:80")
.send()
.await
.unwrap()
.bytes()
.await
.unwrap();
assert!(!resp.is_empty());
}
#[tokio::test]
async fn dns_lookup() -> Result<(), HickoryDnsError> {
let resolver = HickoryDnsResolver::default();
let domain = "ifconfig.me";
let addrs = resolver.resolve_str(domain).await?;
assert!(addrs.into_iter().next().is_some());
Ok(())
}
}
+199 -212
View File
@@ -6,23 +6,17 @@ use reqwest::header::HeaderValue;
use reqwest::{RequestBuilder, Response, StatusCode};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::time::Duration;
use thiserror::Error;
use tracing::{instrument, warn};
use url::Url;
#[cfg(not(target_arch = "wasm32"))]
use std::sync::Arc;
use std::{fmt::Display, time::Duration};
pub use reqwest::IntoUrl;
mod user_agent;
pub use user_agent::UserAgent;
#[cfg(not(target_arch = "wasm32"))]
mod dns;
#[cfg(not(target_arch = "wasm32"))]
pub use dns::HickoryDnsResolver;
mod user_agent;
// The timeout is relatively high as we are often making requests over the mixnet, where latency is
// high and chatty protocols take a while to complete.
@@ -92,18 +86,11 @@ impl ClientBuilder {
// TODO: or should we maybe default to https?
Self::new(alt)
} else {
#[cfg(target_arch = "wasm32")]
let reqwest_client_builder = reqwest::ClientBuilder::new();
#[cfg(not(target_arch = "wasm32"))]
let reqwest_client_builder =
reqwest::ClientBuilder::new().dns_resolver(Arc::new(HickoryDnsResolver::default()));
Ok(ClientBuilder {
url: url.into_url()?,
timeout: None,
custom_user_agent: false,
reqwest_client_builder,
reqwest_client_builder: reqwest::ClientBuilder::new(),
})
}
}
@@ -205,28 +192,6 @@ impl Client {
&self.base_url
}
pub fn create_request<B, K, V>(
&self,
method: reqwest::Method,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: Option<&B>,
) -> RequestBuilder
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
{
let url = sanitize_url(&self.base_url, path, params);
let mut request = self.reqwest_client.request(method.clone(), url);
if let Some(body) = json_body {
request = request.json(body);
}
request
}
pub fn create_get_request<K, V>(
&self,
path: PathSegments<'_>,
@@ -240,6 +205,38 @@ impl Client {
self.reqwest_client.get(url)
}
#[instrument(level = "debug", skip_all, fields(path=?path))]
async fn send_get_request<K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
) -> Result<Response, HttpClientError<E>>
where
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
tracing::trace!("Sending GET request");
let url = sanitize_url(&self.base_url, path, params);
#[cfg(target_arch = "wasm32")]
{
Ok(
wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client.get(url).send(),
)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??,
)
}
#[cfg(not(target_arch = "wasm32"))]
{
Ok(self.reqwest_client.get(url).send().await?)
}
}
pub fn create_post_request<B, K, V>(
&self,
path: PathSegments<'_>,
@@ -255,6 +252,36 @@ impl Client {
self.reqwest_client.post(url).json(json_body)
}
async fn send_post_request<B, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<Response, HttpClientError<E>>
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
let url = sanitize_url(&self.base_url, path, params);
#[cfg(target_arch = "wasm32")]
{
Ok(wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client.post(url).json(json_body).send(),
)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??)
}
#[cfg(not(target_arch = "wasm32"))]
{
Ok(self.reqwest_client.post(url).json(json_body).send().await?)
}
}
pub fn create_delete_request<K, V>(
&self,
path: PathSegments<'_>,
@@ -268,6 +295,35 @@ impl Client {
self.reqwest_client.delete(url)
}
pub async fn send_delete_request<K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
) -> Result<Response, HttpClientError<E>>
where
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
tracing::trace!("Sending DELETE request");
let url = sanitize_url(&self.base_url, path, params);
#[cfg(target_arch = "wasm32")]
{
Ok(wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client.delete(url).send(),
)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??)
}
#[cfg(not(target_arch = "wasm32"))]
{
Ok(self.reqwest_client.delete(url).send().await?)
}
}
pub fn create_patch_request<B, K, V>(
&self,
path: PathSegments<'_>,
@@ -283,87 +339,6 @@ impl Client {
self.reqwest_client.patch(url).json(json_body)
}
async fn send_request<B, K, V, E>(
&self,
method: reqwest::Method,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: Option<&B>,
) -> Result<Response, HttpClientError<E>>
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
let url = sanitize_url(&self.base_url, path, params);
let mut request = self.reqwest_client.request(method.clone(), url);
if let Some(body) = json_body {
request = request.json(body);
}
#[cfg(target_arch = "wasm32")]
{
Ok(
wasmtimer::tokio::timeout(self.request_timeout, request.send())
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??,
)
}
#[cfg(not(target_arch = "wasm32"))]
{
Ok(request.send().await?)
}
}
#[instrument(level = "debug", skip_all, fields(path=?path))]
async fn send_get_request<K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
) -> Result<Response, HttpClientError<E>>
where
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
self.send_request(reqwest::Method::GET, path, params, None::<&()>)
.await
}
async fn send_post_request<B, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<Response, HttpClientError<E>>
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
self.send_request(reqwest::Method::POST, path, params, Some(json_body))
.await
}
pub async fn send_delete_request<K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
) -> Result<Response, HttpClientError<E>>
where
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
self.send_request(reqwest::Method::DELETE, path, params, None::<&()>)
.await
}
pub async fn send_patch_request<B, K, V, E>(
&self,
path: PathSegments<'_>,
@@ -376,8 +351,27 @@ impl Client {
V: AsRef<str>,
E: Display,
{
self.send_request(reqwest::Method::PATCH, path, params, Some(json_body))
let url = sanitize_url(&self.base_url, path, params);
#[cfg(target_arch = "wasm32")]
{
Ok(wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client.patch(url).json(json_body).send(),
)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??)
}
#[cfg(not(target_arch = "wasm32"))]
{
Ok(self
.reqwest_client
.patch(url)
.json(json_body)
.send()
.await?)
}
}
#[instrument(level = "debug", skip_all)]
@@ -445,39 +439,6 @@ impl Client {
parse_response(res, true).await
}
async fn call_json_endpoint<B, T, S, E>(
&self,
method: reqwest::Method,
endpoint: S,
json_body: Option<&B>,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized,
for<'a> T: Deserialize<'a>,
E: Display + DeserializeOwned,
S: AsRef<str>,
{
let mut request = self
.reqwest_client
.request(method.clone(), self.base_url.join(endpoint.as_ref())?);
if let Some(body) = json_body {
request = request.json(body);
}
#[cfg(target_arch = "wasm32")]
let res = {
wasmtimer::tokio::timeout(self.request_timeout, request.send())
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??
};
#[cfg(not(target_arch = "wasm32"))]
let res = { request.send().await? };
parse_response(res, false).await
}
#[instrument(level = "debug", skip_all)]
pub async fn get_json_endpoint<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
where
@@ -485,8 +446,27 @@ impl Client {
E: Display + DeserializeOwned,
S: AsRef<str>,
{
self.call_json_endpoint(reqwest::Method::GET, endpoint, None::<&()>)
#[cfg(target_arch = "wasm32")]
let res = {
wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client
.get(self.base_url.join(endpoint.as_ref())?)
.send(),
)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??
};
#[cfg(not(target_arch = "wasm32"))]
let res = {
self.reqwest_client
.get(self.base_url.join(endpoint.as_ref())?)
.send()
.await?
};
parse_response(res, false).await
}
pub async fn post_json_endpoint<B, T, S, E>(
@@ -500,8 +480,29 @@ impl Client {
E: Display + DeserializeOwned,
S: AsRef<str>,
{
self.call_json_endpoint(reqwest::Method::POST, endpoint, Some(json_body))
#[cfg(target_arch = "wasm32")]
let res = {
wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client
.post(self.base_url.join(endpoint.as_ref())?)
.json(json_body)
.send(),
)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??
};
#[cfg(not(target_arch = "wasm32"))]
let res = {
self.reqwest_client
.post(self.base_url.join(endpoint.as_ref())?)
.json(json_body)
.send()
.await?
};
parse_response(res, true).await
}
pub async fn delete_json_endpoint<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
@@ -510,8 +511,27 @@ impl Client {
E: Display + DeserializeOwned,
S: AsRef<str>,
{
self.call_json_endpoint(reqwest::Method::DELETE, endpoint, None::<&()>)
#[cfg(target_arch = "wasm32")]
let res = {
wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client
.delete(self.base_url.join(endpoint.as_ref())?)
.send(),
)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??
};
#[cfg(not(target_arch = "wasm32"))]
let res = {
self.reqwest_client
.delete(self.base_url.join(endpoint.as_ref())?)
.send()
.await?
};
parse_response(res, false).await
}
pub async fn patch_json_endpoint<B, T, S, E>(
@@ -525,8 +545,29 @@ impl Client {
E: Display + DeserializeOwned,
S: AsRef<str>,
{
self.call_json_endpoint(reqwest::Method::PATCH, endpoint, Some(json_body))
#[cfg(target_arch = "wasm32")]
let res = {
wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client
.patch(self.base_url.join(endpoint.as_ref())?)
.json(json_body)
.send(),
)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??
};
#[cfg(not(target_arch = "wasm32"))]
let res = {
self.reqwest_client
.patch(self.base_url.join(endpoint.as_ref())?)
.json(json_body)
.send()
.await?
};
parse_response(res, true).await
}
}
@@ -571,19 +612,6 @@ pub trait ApiClient {
V: AsRef<str> + Sync,
E: Display + DeserializeOwned;
async fn patch_json<B, T, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized + Sync,
for<'a> T: Deserialize<'a>,
K: AsRef<str> + Sync,
V: AsRef<str> + Sync,
E: Display + DeserializeOwned;
/// `get` json data from the provided absolute endpoint, i.e. for example `"/api/v1/mixnodes?since=12345"`
async fn get_json_from<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
where
@@ -607,17 +635,6 @@ pub trait ApiClient {
for<'a> T: Deserialize<'a>,
E: Display + DeserializeOwned,
S: AsRef<str> + Sync + Send;
async fn patch_json_data_at<B, T, S, E>(
&self,
endpoint: S,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized + Sync,
for<'a> T: Deserialize<'a>,
E: Display + DeserializeOwned,
S: AsRef<str> + Sync + Send;
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -667,22 +684,6 @@ impl ApiClient for Client {
self.delete_json(path, params).await
}
async fn patch_json<B, T, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized + Sync,
for<'a> T: Deserialize<'a>,
K: AsRef<str> + Sync,
V: AsRef<str> + Sync,
E: Display + DeserializeOwned,
{
self.patch_json(path, params, json_body).await
}
async fn get_json_from<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
where
for<'a> T: Deserialize<'a>,
@@ -714,20 +715,6 @@ impl ApiClient for Client {
{
self.delete_json_endpoint(endpoint).await
}
async fn patch_json_data_at<B, T, S, E>(
&self,
endpoint: S,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized + Sync,
for<'a> T: Deserialize<'a>,
E: Display + DeserializeOwned,
S: AsRef<str> + Sync + Send,
{
self.patch_json_endpoint(endpoint, json_body).await
}
}
// utility function that should solve the double slash problem in API urls forever.
+1
View File
@@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize};
pub mod middleware;
#[derive(Debug, Clone)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub enum FormattedResponse<T> {
Json(Json<T>),
Yaml(Yaml<T>),
@@ -63,7 +63,6 @@ impl From<v6::request::StaticConnectRequest> for v7::request::StaticConnectReque
}
}
#[allow(deprecated)]
impl From<v6::request::DynamicConnectRequest> for v7::request::DynamicConnectRequest {
fn from(dynamic_connect_request: v6::request::DynamicConnectRequest) -> Self {
Self {
@@ -51,7 +51,6 @@ impl IpPacketRequest {
)
}
#[allow(deprecated)]
pub fn new_dynamic_connect_request(
reply_to: Recipient,
reply_to_hops: Option<u8>,
@@ -286,9 +285,6 @@ pub struct DynamicConnectRequest {
// The number of mix node hops that responses should take, in addition to the entry and exit
// node. Zero means only client -> entry -> exit -> client.
#[deprecated(
note = "clients can no longer control number of hops to use. this field is scheduled for removal in V8"
)]
pub reply_to_hops: Option<u8>,
// The average delay at each mix node, in milliseconds. Currently this is not supported by the
+31 -33
View File
@@ -2,9 +2,10 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::NetworkTestingError;
use crate::node::{NodeType, TestableNode};
use crate::node::TestableNode;
use crate::NodeId;
use nym_sphinx::message::NymMessage;
use nym_topology::node::RoutingNode;
use nym_topology::{gateway, mix};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
@@ -25,76 +26,73 @@ pub struct TestMessage<T = Empty> {
}
impl<T> TestMessage<T> {
pub fn new(tested_node: TestableNode, msg_id: u32, total_msgs: u32, ext: T) -> Self {
pub fn new<N: Into<TestableNode>>(node: N, msg_id: u32, total_msgs: u32, ext: T) -> Self {
TestMessage {
tested_node,
tested_node: node.into(),
msg_id,
total_msgs,
ext,
}
}
pub fn new_mix(node: &RoutingNode, msg_id: u32, total_msgs: u32, ext: T) -> Self {
Self::new(
TestableNode::new_routing(node, NodeType::Mixnode),
msg_id,
total_msgs,
ext,
)
pub fn new_mix(node: &mix::LegacyNode, msg_id: u32, total_msgs: u32, ext: T) -> Self {
Self::new(node, msg_id, total_msgs, ext)
}
pub fn new_gateway(node: &RoutingNode, msg_id: u32, total_msgs: u32, ext: T) -> Self {
Self::new(
TestableNode::new_routing(node, NodeType::Gateway),
msg_id,
total_msgs,
ext,
)
// pub fn new_gateway(node: &gateway::Node, msg_id: u32, total_msgs: u32, ext: T) -> Self {
// Self::new(node, msg_id, total_msgs, ext)
// }
pub fn new_serialized<N>(
node: N,
msg_id: u32,
total_msgs: u32,
ext: T,
) -> Result<Vec<u8>, NetworkTestingError>
where
N: Into<TestableNode>,
T: Serialize,
{
Self::new(node, msg_id, total_msgs, ext).as_bytes()
}
pub fn new_plaintexts(
node: TestableNode,
pub fn new_plaintexts<N>(
node: &N,
total_msgs: u32,
ext: T,
) -> Result<Vec<Vec<u8>>, NetworkTestingError>
where
for<'a> &'a N: Into<TestableNode>,
T: Serialize + Clone,
{
let mut msgs = Vec::with_capacity(total_msgs as usize);
for msg_id in 1..=total_msgs {
msgs.push(Self::new(node.clone(), msg_id, total_msgs, ext.clone()).as_bytes()?)
msgs.push(Self::new(node, msg_id, total_msgs, ext.clone()).as_bytes()?)
}
Ok(msgs)
}
pub fn mix_plaintexts(
node: &RoutingNode,
node: &mix::LegacyNode,
total_msgs: u32,
ext: T,
) -> Result<Vec<Vec<u8>>, NetworkTestingError>
where
T: Serialize + Clone,
{
Self::new_plaintexts(
TestableNode::new_routing(node, NodeType::Mixnode),
total_msgs,
ext,
)
Self::new_plaintexts(node, total_msgs, ext)
}
pub fn legacy_gateway_plaintexts(
node: &RoutingNode,
node: &gateway::LegacyNode,
node_id: NodeId,
total_msgs: u32,
ext: T,
) -> Result<Vec<Vec<u8>>, NetworkTestingError>
where
T: Serialize + Clone,
{
Self::new_plaintexts(
TestableNode::new_routing(node, NodeType::Gateway),
total_msgs,
ext,
)
Self::new_plaintexts(&(node, node_id), total_msgs, ext)
}
pub fn as_json_string(&self) -> Result<String, NetworkTestingError>
+33 -9
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::NodeId;
use nym_topology::node::RoutingNode;
use nym_topology::{gateway, mix};
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
@@ -24,14 +24,6 @@ impl TestableNode {
}
}
pub fn new_routing(routing_node: &RoutingNode, typ: NodeType) -> Self {
TestableNode::new(
routing_node.identity_key.to_base58_string(),
typ,
routing_node.node_id,
)
}
pub fn new_mixnode(encoded_identity: String, node_id: NodeId) -> Self {
TestableNode::new(encoded_identity, NodeType::Mixnode, node_id)
}
@@ -45,6 +37,38 @@ impl TestableNode {
}
}
impl<'a> From<&'a mix::LegacyNode> for TestableNode {
fn from(value: &'a mix::LegacyNode) -> Self {
TestableNode {
encoded_identity: value.identity_key.to_base58_string(),
typ: NodeType::Mixnode,
node_id: value.mix_id,
}
}
}
impl<'a> From<(&'a gateway::LegacyNode, NodeId)> for TestableNode {
fn from((gateway, node_id): (&'a gateway::LegacyNode, NodeId)) -> Self {
(&(gateway, node_id)).into()
}
}
impl<'a> From<&'a (gateway::LegacyNode, NodeId)> for TestableNode {
fn from((gateway, node_id): &'a (gateway::LegacyNode, NodeId)) -> Self {
(gateway, *node_id).into()
}
}
impl<'a, 'b> From<&'a (&'b gateway::LegacyNode, NodeId)> for TestableNode {
fn from((gateway, node_id): &'a (&'b gateway::LegacyNode, NodeId)) -> Self {
TestableNode {
encoded_identity: gateway.identity_key.to_base58_string(),
typ: NodeType::Gateway,
node_id: *node_id,
}
}
}
impl Display for TestableNode {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
+95 -41
View File
@@ -2,22 +2,21 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::NetworkTestingError;
use crate::Empty;
use crate::NodeId;
use crate::TestMessage;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::message::NymMessage;
use nym_sphinx::params::PacketSize;
use nym_sphinx::params::{PacketSize, DEFAULT_NUM_MIX_HOPS};
use nym_sphinx::preparer::{FragmentPreparer, PreparedFragment};
use nym_sphinx_params::PacketType;
use nym_topology::node::RoutingNode;
use nym_topology::{NymRouteProvider, NymTopology, Role};
use nym_topology::{gateway, mix, NymTopology};
use rand::{CryptoRng, Rng};
use serde::Serialize;
use std::sync::Arc;
use std::time::Duration;
pub use nym_topology::node::LegacyMixLayer;
pub struct NodeTester<R> {
rng: R,
@@ -39,6 +38,10 @@ pub struct NodeTester<R> {
/// Average delay an acknowledgement packet is going to get delay at a single mixnode.
average_ack_delay: Duration,
/// Number of mix hops each packet ('real' message, ack, reply) is expected to take.
/// Note that it does not include gateway hops.
num_mix_hops: u8,
// while acks are going to be ignored they still need to be constructed
// so that the gateway would be able to correctly process and forward the message
ack_key: Arc<AckKey>,
@@ -67,27 +70,41 @@ where
deterministic_route_selection,
average_packet_delay,
average_ack_delay,
num_mix_hops: DEFAULT_NUM_MIX_HOPS,
ack_key,
}
}
pub fn testable_mix_topology(&self, layer: LegacyMixLayer, node: &RoutingNode) -> NymTopology {
/// Allows setting non-default number of expected mix hops in the network.
#[allow(dead_code)]
pub fn with_mix_hops(mut self, hops: u8) -> Self {
self.num_mix_hops = hops;
self
}
pub fn testable_mix_topology(&self, node: &mix::LegacyNode) -> NymTopology {
let mut topology = self.base_topology.clone();
topology.set_testable_node(layer.into(), node.clone());
topology.set_mixes_in_layer(node.layer as u8, vec![node.clone()]);
topology
}
pub fn testable_gateway_topology(&self, node: &RoutingNode) -> NymTopology {
pub fn testable_gateway_topology(&self, gateway: &gateway::LegacyNode) -> NymTopology {
let mut topology = self.base_topology.clone();
topology.set_testable_node(Role::EntryGateway, node.clone());
topology.set_testable_node(Role::ExitGateway, node.clone());
topology.set_gateways(vec![gateway.clone()]);
topology
}
pub fn simple_mixnode_test_packets(
&mut self,
mix: &mix::LegacyNode,
test_packets: u32,
) -> Result<Vec<PreparedFragment>, NetworkTestingError> {
self.mixnode_test_packets(mix, Empty, test_packets, None)
}
pub fn mixnode_test_packets<T>(
&mut self,
mix: &RoutingNode,
legacy_mix_layer: LegacyMixLayer,
mix: &mix::LegacyNode,
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
@@ -95,9 +112,7 @@ where
where
T: Serialize + Clone,
{
let ephemeral_topology =
NymRouteProvider::from(self.testable_mix_topology(legacy_mix_layer, mix))
.with_ignore_egress_epoch_roles(true);
let ephemeral_topology = self.testable_mix_topology(mix);
let mut packets = Vec::with_capacity(test_packets as usize);
for plaintext in TestMessage::mix_plaintexts(mix, test_packets, msg_ext)? {
@@ -113,7 +128,7 @@ where
pub fn mixnodes_test_packets<T>(
&mut self,
nodes: &[(LegacyMixLayer, RoutingNode)],
nodes: &[mix::LegacyNode],
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
@@ -122,10 +137,9 @@ where
T: Serialize + Clone,
{
let mut packets = Vec::new();
for (layer, node) in nodes {
for node in nodes {
packets.append(&mut self.mixnode_test_packets(
node,
*layer,
msg_ext.clone(),
test_packets,
custom_recipient,
@@ -135,10 +149,9 @@ where
Ok(packets)
}
pub fn existing_identity_mixnode_test_packets<T>(
pub fn existing_mixnode_test_packets<T>(
&mut self,
encoded_mix_identity: String,
layer: LegacyMixLayer,
mix_id: NodeId,
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
@@ -146,30 +159,39 @@ where
where
T: Serialize + Clone,
{
let Ok(identity) = encoded_mix_identity.parse() else {
let Some(node) = self.base_topology.find_mix(mix_id) else {
return Err(NetworkTestingError::NonExistentMixnode { mix_id });
};
self.mixnode_test_packets(&node.clone(), msg_ext, test_packets, custom_recipient)
}
pub fn existing_identity_mixnode_test_packets<T>(
&mut self,
encoded_mix_identity: String,
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
) -> Result<Vec<PreparedFragment>, NetworkTestingError>
where
T: Serialize + Clone,
{
let Some(node) = self
.base_topology
.find_mix_by_identity(&encoded_mix_identity)
else {
return Err(NetworkTestingError::NonExistentMixnodeIdentity {
mix_identity: encoded_mix_identity,
});
};
let Some(node) = self.base_topology.find_node_by_identity(identity) else {
return Err(NetworkTestingError::NonExistentMixnodeIdentity {
mix_identity: encoded_mix_identity,
});
};
self.mixnode_test_packets(
&node.clone(),
layer,
msg_ext,
test_packets,
custom_recipient,
)
self.mixnode_test_packets(&node.clone(), msg_ext, test_packets, custom_recipient)
}
pub fn legacy_gateway_test_packets<T>(
&mut self,
gateway: &RoutingNode,
gateway: &gateway::LegacyNode,
node_id: NodeId,
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
@@ -177,11 +199,12 @@ where
where
T: Serialize + Clone,
{
let ephemeral_topology = NymRouteProvider::from(self.testable_gateway_topology(gateway))
.with_ignore_egress_epoch_roles(true);
let ephemeral_topology = self.testable_gateway_topology(gateway);
let mut packets = Vec::with_capacity(test_packets as usize);
for plaintext in TestMessage::legacy_gateway_plaintexts(gateway, test_packets, msg_ext)? {
for plaintext in
TestMessage::legacy_gateway_plaintexts(gateway, node_id, test_packets, msg_ext)?
{
packets.push(self.wrap_plaintext_data(
plaintext,
&ephemeral_topology,
@@ -192,10 +215,36 @@ where
Ok(packets)
}
pub fn existing_gateway_test_packets<T>(
&mut self,
node_id: NodeId,
encoded_gateway_identity: String,
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
) -> Result<Vec<PreparedFragment>, NetworkTestingError>
where
T: Serialize + Clone,
{
let Some(node) = self.base_topology.find_gateway(&encoded_gateway_identity) else {
return Err(NetworkTestingError::NonExistentGateway {
gateway_identity: encoded_gateway_identity,
});
};
self.legacy_gateway_test_packets(
&node.clone(),
node_id,
msg_ext,
test_packets,
custom_recipient,
)
}
pub fn wrap_plaintext_data(
&mut self,
plaintext: Vec<u8>,
topology: &NymRouteProvider,
topology: &NymTopology,
custom_recipient: Option<Recipient>,
) -> Result<PreparedFragment, NetworkTestingError> {
let message = NymMessage::new_plain(plaintext);
@@ -225,13 +274,14 @@ where
&address,
&address,
PacketType::Mix,
None,
)?)
}
pub fn create_test_packet<T>(
&mut self,
message: &TestMessage<T>,
topology: &NymRouteProvider,
topology: &NymTopology,
custom_recipient: Option<Recipient>,
) -> Result<PreparedFragment, NetworkTestingError>
where
@@ -257,6 +307,10 @@ impl<R: CryptoRng + Rng> FragmentPreparer for NodeTester<R> {
1
}
fn num_mix_hops(&self) -> u8 {
self.num_mix_hops
}
fn average_packet_delay(&self) -> Duration {
self.average_packet_delay
}
+1 -1
View File
@@ -63,4 +63,4 @@ par_signing = ["rayon"]
# but given it's not done very frequently, it shouldn't be too much of a problem
# furthermore, we can't and shouldn't dedicate the entire nym-api CPU just for verification,
# but this feature might potentially be desirable for clients.
par_verify = ["rayon"]
par_verify = ["rayon"]
@@ -4,10 +4,10 @@
use crate::ecash_group_parameters;
use crate::error::Result;
use crate::helpers::{g1_tuple_to_bytes, recover_g1_tuple};
use bls12_381::{G1Projective, Scalar};
use serde::{Deserialize, Serialize};
use subtle::Choice;
pub use bls12_381::{G1Projective, G2Projective, Scalar};
pub type SignerIndex = u64;
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
+1 -1
View File
@@ -36,7 +36,7 @@ pub mod common_types;
pub mod constants;
pub mod error;
mod helpers;
pub mod proofs;
mod proofs;
pub mod scheme;
pub mod tests;
mod traits;
@@ -8,10 +8,10 @@ use nym_sphinx_addressing::nodes::{
NymNodeRoutingAddress, NymNodeRoutingAddressError, MAX_NODE_ADDRESS_UNPADDED_LEN,
};
use nym_sphinx_params::packet_sizes::PacketSize;
use nym_sphinx_params::PacketType;
use nym_sphinx_params::{PacketType, DEFAULT_NUM_MIX_HOPS};
use nym_sphinx_types::delays::Delay;
use nym_sphinx_types::{NymPacket, NymPacketError, MIN_PACKET_SIZE};
use nym_topology::{NymRouteProvider, NymTopologyError};
use nym_topology::{NymTopology, NymTopologyError};
use rand::{CryptoRng, RngCore};
use std::time;
@@ -43,13 +43,14 @@ impl SurbAck {
ack_key: &AckKey,
marshaled_fragment_id: [u8; 5],
average_delay: time::Duration,
topology: &NymRouteProvider,
topology: &NymTopology,
packet_type: PacketType,
) -> Result<Self, NymTopologyError>
where
R: RngCore + CryptoRng,
{
let route = topology.random_route_to_egress(rng, recipient.gateway())?;
let route =
topology.random_route_to_gateway(rng, DEFAULT_NUM_MIX_HOPS, recipient.gateway())?;
let delays = nym_sphinx_routing::generate_hop_delays(average_delay, route.len());
let destination = recipient.as_sphinx_destination();
+2 -2
View File
@@ -131,8 +131,8 @@ impl Recipient {
&self.client_encryption_key
}
pub fn gateway(&self) -> NodeIdentity {
self.gateway
pub fn gateway(&self) -> &NodeIdentity {
&self.gateway
}
pub fn to_bytes(self) -> RecipientBytes {
@@ -6,9 +6,9 @@ use nym_crypto::{generic_array::typenum::Unsigned, Digest};
use nym_sphinx_addressing::clients::Recipient;
use nym_sphinx_addressing::nodes::{NymNodeRoutingAddress, MAX_NODE_ADDRESS_UNPADDED_LEN};
use nym_sphinx_params::packet_sizes::PacketSize;
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm};
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm, DEFAULT_NUM_MIX_HOPS};
use nym_sphinx_types::{NymPacket, SURBMaterial, SphinxError, SURB};
use nym_topology::{NymRouteProvider, NymTopologyError};
use nym_topology::{NymTopology, NymTopologyError};
use rand::{CryptoRng, RngCore};
use serde::de::{Error as SerdeError, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -89,12 +89,13 @@ impl ReplySurb {
rng: &mut R,
recipient: &Recipient,
average_delay: time::Duration,
topology: &NymRouteProvider,
topology: &NymTopology,
) -> Result<Self, NymTopologyError>
where
R: RngCore + CryptoRng,
{
let route = topology.random_route_to_egress(rng, recipient.gateway())?;
let route =
topology.random_route_to_gateway(rng, DEFAULT_NUM_MIX_HOPS, recipient.gateway())?;
let delays = nym_sphinx_routing::generate_hop_delays(average_delay, route.len());
let destination = recipient.as_sphinx_destination();
@@ -109,12 +110,15 @@ impl ReplySurb {
/// Returns the expected number of bytes the [`ReplySURB`] will take after serialization.
/// Useful for deserialization from a bytes stream.
pub fn serialized_len() -> usize {
pub fn serialized_len(mix_hops: u8) -> usize {
use nym_sphinx_types::{HEADER_SIZE, NODE_ADDRESS_LENGTH, PAYLOAD_KEY_SIZE};
// the SURB itself consists of SURB_header, first hop address and set of payload keys
// for each hop (3x mix + egress)
SurbEncryptionKeySize::USIZE + HEADER_SIZE + NODE_ADDRESS_LENGTH + 4 * PAYLOAD_KEY_SIZE
// (note extra 1 for the gateway)
SurbEncryptionKeySize::USIZE
+ HEADER_SIZE
+ NODE_ADDRESS_LENGTH
+ (1 + mix_hops as usize) * PAYLOAD_KEY_SIZE
}
pub fn encryption_key(&self) -> &SurbEncryptionKey {
@@ -169,7 +169,10 @@ impl RepliableMessage {
.collect()
}
pub fn try_from_bytes(bytes: &[u8]) -> Result<Self, InvalidReplyRequestError> {
pub fn try_from_bytes(
bytes: &[u8],
num_mix_hops: u8,
) -> Result<Self, InvalidReplyRequestError> {
if bytes.len() < SENDER_TAG_SIZE + 1 {
return Err(InvalidReplyRequestError::RequestTooShortToDeserialize);
}
@@ -177,8 +180,11 @@ impl RepliableMessage {
AnonymousSenderTag::from_bytes(bytes[..SENDER_TAG_SIZE].try_into().unwrap());
let content_tag = RepliableMessageContentTag::try_from(bytes[SENDER_TAG_SIZE])?;
let content =
RepliableMessageContent::try_from_bytes(&bytes[SENDER_TAG_SIZE + 1..], content_tag)?;
let content = RepliableMessageContent::try_from_bytes(
&bytes[SENDER_TAG_SIZE + 1..],
num_mix_hops,
content_tag,
)?;
Ok(RepliableMessage {
sender_tag,
@@ -186,20 +192,23 @@ impl RepliableMessage {
})
}
pub fn serialized_size(&self) -> usize {
pub fn serialized_size(&self, num_mix_hops: u8) -> usize {
let content_type_size = 1;
SENDER_TAG_SIZE + content_type_size + self.content.serialized_size()
SENDER_TAG_SIZE + content_type_size + self.content.serialized_size(num_mix_hops)
}
}
// this recovery code is shared between all variants containing reply surbs
fn recover_reply_surbs(bytes: &[u8]) -> Result<(Vec<ReplySurb>, usize), InvalidReplyRequestError> {
fn recover_reply_surbs(
bytes: &[u8],
num_mix_hops: u8,
) -> Result<(Vec<ReplySurb>, usize), InvalidReplyRequestError> {
let mut consumed = mem::size_of::<u32>();
if bytes.len() < consumed {
return Err(InvalidReplyRequestError::RequestTooShortToDeserialize);
}
let num_surbs = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
let surb_size = ReplySurb::serialized_len();
let surb_size = ReplySurb::serialized_len(num_mix_hops);
if bytes[consumed..].len() < num_surbs as usize * surb_size {
return Err(InvalidReplyRequestError::RequestTooShortToDeserialize);
}
@@ -298,13 +307,14 @@ impl RepliableMessageContent {
fn try_from_bytes(
bytes: &[u8],
num_mix_hops: u8,
tag: RepliableMessageContentTag,
) -> Result<Self, InvalidReplyRequestError> {
if bytes.is_empty() {
return Err(InvalidReplyRequestError::RequestTooShortToDeserialize);
}
let (reply_surbs, n) = recover_reply_surbs(bytes)?;
let (reply_surbs, n) = recover_reply_surbs(bytes, num_mix_hops)?;
match tag {
RepliableMessageContentTag::Data => Ok(RepliableMessageContent::Data {
@@ -330,7 +340,7 @@ impl RepliableMessageContent {
}
}
fn serialized_size(&self) -> usize {
fn serialized_size(&self, num_mix_hops: u8) -> usize {
match self {
RepliableMessageContent::Data {
message,
@@ -338,18 +348,19 @@ impl RepliableMessageContent {
} => {
let num_reply_surbs_tag = mem::size_of::<u32>();
num_reply_surbs_tag
+ reply_surbs.len() * ReplySurb::serialized_len()
+ reply_surbs.len() * ReplySurb::serialized_len(num_mix_hops)
+ message.len()
}
RepliableMessageContent::AdditionalSurbs { reply_surbs } => {
let num_reply_surbs_tag = mem::size_of::<u32>();
num_reply_surbs_tag + reply_surbs.len() * ReplySurb::serialized_len()
num_reply_surbs_tag + reply_surbs.len() * ReplySurb::serialized_len(num_mix_hops)
}
RepliableMessageContent::Heartbeat {
additional_reply_surbs,
} => {
let num_reply_surbs_tag = mem::size_of::<u32>();
num_reply_surbs_tag + additional_reply_surbs.len() * ReplySurb::serialized_len()
num_reply_surbs_tag
+ additional_reply_surbs.len() * ReplySurb::serialized_len(num_mix_hops)
}
}
}
@@ -567,11 +578,11 @@ mod tests {
}
}
pub(super) fn reply_surb(rng: &mut ChaCha20Rng) -> ReplySurb {
pub(super) fn reply_surb(rng: &mut ChaCha20Rng, num_mix_hops: u8) -> ReplySurb {
// due to gateway
const HOPS: u8 = 4;
let route = (0..HOPS).map(|_| node(rng)).collect();
let delays = (0..HOPS)
let num_hops = num_mix_hops + 1;
let route = (0..num_hops).map(|_| node(rng)).collect();
let delays = (0..num_hops)
.map(|_| Delay::new_from_nanos(rng.next_u64()))
.collect();
let mut destination_bytes = [0u8; 32];
@@ -594,40 +605,47 @@ mod tests {
}
}
pub(super) fn reply_surbs(rng: &mut ChaCha20Rng, n: usize) -> Vec<ReplySurb> {
pub(super) fn reply_surbs(
rng: &mut ChaCha20Rng,
num_mix_hops: u8,
n: usize,
) -> Vec<ReplySurb> {
let mut surbs = Vec::with_capacity(n);
for _ in 0..n {
surbs.push(reply_surb(rng))
surbs.push(reply_surb(rng, num_mix_hops))
}
surbs
}
pub(super) fn repliable_content_data(
rng: &mut ChaCha20Rng,
num_mix_hops: u8,
msg_len: usize,
surbs: usize,
) -> RepliableMessageContent {
RepliableMessageContent::Data {
message: random_vec_u8(rng, msg_len),
reply_surbs: reply_surbs(rng, surbs),
reply_surbs: reply_surbs(rng, num_mix_hops, surbs),
}
}
pub(super) fn repliable_content_surbs(
rng: &mut ChaCha20Rng,
num_mix_hops: u8,
surbs: usize,
) -> RepliableMessageContent {
RepliableMessageContent::AdditionalSurbs {
reply_surbs: reply_surbs(rng, surbs),
reply_surbs: reply_surbs(rng, num_mix_hops, surbs),
}
}
pub(super) fn repliable_content_heartbeat(
rng: &mut ChaCha20Rng,
num_mix_hops: u8,
surbs: usize,
) -> RepliableMessageContent {
RepliableMessageContent::Heartbeat {
additional_reply_surbs: reply_surbs(rng, surbs),
additional_reply_surbs: reply_surbs(rng, num_mix_hops, surbs),
}
}
@@ -658,54 +676,70 @@ mod tests {
#[test]
fn serialized_size_matches_actual_serialization() {
let mut rng = fixtures::test_rng();
let num_mix_hops = 3;
let data1 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_data(&mut rng, 10000, 0),
content: fixtures::repliable_content_data(&mut rng, num_mix_hops, 10000, 0),
};
assert_eq!(data1.serialized_size(), data1.into_bytes().len());
assert_eq!(
data1.serialized_size(num_mix_hops),
data1.into_bytes().len()
);
let data2 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_data(&mut rng, 10, 100),
content: fixtures::repliable_content_data(&mut rng, num_mix_hops, 10, 100),
};
assert_eq!(data2.serialized_size(), data2.into_bytes().len());
assert_eq!(
data2.serialized_size(num_mix_hops),
data2.into_bytes().len()
);
let data3 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_data(&mut rng, 100000, 1000),
content: fixtures::repliable_content_data(&mut rng, num_mix_hops, 100000, 1000),
};
assert_eq!(data3.serialized_size(), data3.into_bytes().len());
assert_eq!(
data3.serialized_size(num_mix_hops),
data3.into_bytes().len()
);
let additional_surbs1 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_surbs(&mut rng, 1),
content: fixtures::repliable_content_surbs(&mut rng, num_mix_hops, 1),
};
assert_eq!(
additional_surbs1.serialized_size(),
additional_surbs1.serialized_size(num_mix_hops),
additional_surbs1.into_bytes().len()
);
let additional_surbs2 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_surbs(&mut rng, 1000),
content: fixtures::repliable_content_surbs(&mut rng, num_mix_hops, 1000),
};
assert_eq!(
additional_surbs2.serialized_size(),
additional_surbs2.serialized_size(num_mix_hops),
additional_surbs2.into_bytes().len()
);
let heartbeat1 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_heartbeat(&mut rng, 1),
content: fixtures::repliable_content_heartbeat(&mut rng, num_mix_hops, 1),
};
assert_eq!(heartbeat1.serialized_size(), heartbeat1.into_bytes().len());
assert_eq!(
heartbeat1.serialized_size(num_mix_hops),
heartbeat1.into_bytes().len()
);
let heartbeat2 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_heartbeat(&mut rng, 1000),
content: fixtures::repliable_content_heartbeat(&mut rng, num_mix_hops, 1000),
};
assert_eq!(heartbeat2.serialized_size(), heartbeat2.into_bytes().len());
assert_eq!(
heartbeat2.serialized_size(num_mix_hops),
heartbeat2.into_bytes().len()
);
}
}
@@ -716,33 +750,49 @@ mod tests {
#[test]
fn serialized_size_matches_actual_serialization() {
let mut rng = fixtures::test_rng();
let num_mix_hops = 3;
let data1 = fixtures::repliable_content_data(&mut rng, 10000, 0);
assert_eq!(data1.serialized_size(), data1.into_bytes().len());
let data2 = fixtures::repliable_content_data(&mut rng, 10, 100);
assert_eq!(data2.serialized_size(), data2.into_bytes().len());
let data3 = fixtures::repliable_content_data(&mut rng, 100000, 1000);
assert_eq!(data3.serialized_size(), data3.into_bytes().len());
let additional_surbs1 = fixtures::repliable_content_surbs(&mut rng, 1);
let data1 = fixtures::repliable_content_data(&mut rng, num_mix_hops, 10000, 0);
assert_eq!(
additional_surbs1.serialized_size(),
data1.serialized_size(num_mix_hops),
data1.into_bytes().len()
);
let data2 = fixtures::repliable_content_data(&mut rng, num_mix_hops, 10, 100);
assert_eq!(
data2.serialized_size(num_mix_hops),
data2.into_bytes().len()
);
let data3 = fixtures::repliable_content_data(&mut rng, num_mix_hops, 100000, 1000);
assert_eq!(
data3.serialized_size(num_mix_hops),
data3.into_bytes().len()
);
let additional_surbs1 = fixtures::repliable_content_surbs(&mut rng, num_mix_hops, 1);
assert_eq!(
additional_surbs1.serialized_size(num_mix_hops),
additional_surbs1.into_bytes().len()
);
let additional_surbs2 = fixtures::repliable_content_surbs(&mut rng, 1000);
let additional_surbs2 = fixtures::repliable_content_surbs(&mut rng, num_mix_hops, 1000);
assert_eq!(
additional_surbs2.serialized_size(),
additional_surbs2.serialized_size(num_mix_hops),
additional_surbs2.into_bytes().len()
);
let heartbeat1 = fixtures::repliable_content_heartbeat(&mut rng, 1);
assert_eq!(heartbeat1.serialized_size(), heartbeat1.into_bytes().len());
let heartbeat1 = fixtures::repliable_content_heartbeat(&mut rng, num_mix_hops, 1);
assert_eq!(
heartbeat1.serialized_size(num_mix_hops),
heartbeat1.into_bytes().len()
);
let heartbeat2 = fixtures::repliable_content_heartbeat(&mut rng, 1000);
assert_eq!(heartbeat2.serialized_size(), heartbeat2.into_bytes().len());
let heartbeat2 = fixtures::repliable_content_heartbeat(&mut rng, num_mix_hops, 1000);
assert_eq!(
heartbeat2.serialized_size(num_mix_hops),
heartbeat2.into_bytes().len()
);
}
}
+17 -6
View File
@@ -69,11 +69,11 @@ pub mod monitoring {
}
}
pub fn fragment_sent(fragment: &Fragment, client_nonce: i32, destination: PublicKey) {
pub fn fragment_sent(fragment: &Fragment, client_nonce: i32, destination: PublicKey, hops: u8) {
if enabled() {
let id = fragment.fragment_identifier().set_id();
let mut entry = FRAGMENTS_SENT.entry(id).or_default();
let s = SentFragment::new(fragment.header(), now!(), client_nonce, destination);
let s = SentFragment::new(fragment.header(), now!(), client_nonce, destination, hops);
entry.push(s);
}
}
@@ -82,11 +82,16 @@ pub mod monitoring {
#[derive(Debug, Clone)]
pub struct FragmentMixParams {
destination: PublicKey,
hops: u8,
}
impl FragmentMixParams {
pub fn destination(&self) -> PublicKey {
self.destination
pub fn destination(&self) -> &PublicKey {
&self.destination
}
pub fn hops(&self) -> u8 {
self.hops
}
}
@@ -100,8 +105,14 @@ pub struct SentFragment {
}
impl SentFragment {
fn new(header: FragmentHeader, at: u64, client_nonce: i32, destination: PublicKey) -> Self {
let mixnet_params = FragmentMixParams { destination };
fn new(
header: FragmentHeader,
at: u64,
client_nonce: i32,
destination: PublicKey,
hops: u8,
) -> Self {
let mixnet_params = FragmentMixParams { destination, hops };
SentFragment {
header,
at,
+8 -5
View File
@@ -10,9 +10,11 @@ use nym_sphinx_addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx_chunking::fragment::COVER_FRAG_ID;
use nym_sphinx_forwarding::packet::MixPacket;
use nym_sphinx_params::packet_sizes::PacketSize;
use nym_sphinx_params::{PacketEncryptionAlgorithm, PacketHkdfAlgorithm, PacketType};
use nym_sphinx_params::{
PacketEncryptionAlgorithm, PacketHkdfAlgorithm, PacketType, DEFAULT_NUM_MIX_HOPS,
};
use nym_sphinx_types::NymPacket;
use nym_topology::{NymRouteProvider, NymTopologyError};
use nym_topology::{NymTopology, NymTopologyError};
use rand::{CryptoRng, RngCore};
use std::time;
@@ -34,7 +36,7 @@ pub enum CoverMessageError {
pub fn generate_loop_cover_surb_ack<R>(
rng: &mut R,
topology: &NymRouteProvider,
topology: &NymTopology,
ack_key: &AckKey,
full_address: &Recipient,
average_ack_delay: time::Duration,
@@ -57,7 +59,7 @@ where
#[allow(clippy::too_many_arguments)]
pub fn generate_loop_cover_packet<R>(
rng: &mut R,
topology: &NymRouteProvider,
topology: &NymTopology,
ack_key: &AckKey,
full_address: &Recipient,
average_ack_delay: time::Duration,
@@ -116,7 +118,8 @@ where
.chain(cover_content)
.collect();
let route = topology.random_route_to_egress(rng, full_address.gateway())?;
let route =
topology.random_route_to_gateway(rng, DEFAULT_NUM_MIX_HOPS, full_address.gateway())?;
let delays = nym_sphinx_routing::generate_hop_delays(average_packet_delay, route.len());
let destination = full_address.as_sphinx_destination();
+4
View File
@@ -16,6 +16,10 @@ pub mod packet_sizes;
pub mod packet_types;
pub mod packet_version;
// If somebody can provide an argument why it might be reasonable to have more than 255 mix hops,
// I will change this to [`usize`]
pub const DEFAULT_NUM_MIX_HOPS: u8 = 3;
// TODO: not entirely sure how to feel about those being defined here, ideally it'd be where [`Fragment`]
// is defined, but that'd introduce circular dependencies as the acknowledgements crate also needs
// access to that
+33 -1
View File
@@ -1,10 +1,19 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_sphinx_types::{delays, Delay};
use std::time::Duration;
use nym_sphinx_addressing::clients::Recipient;
use nym_sphinx_types::{delays, Delay, Node};
use thiserror::Error;
pub trait SphinxRouteMaker {
type Error;
fn sphinx_route(&mut self, hops: u8, destination: &Recipient)
-> Result<Vec<Node>, Self::Error>;
}
#[derive(Debug, Error, Clone, Copy)]
#[error("the route vector contains {available} nodes while {requested} hops are required")]
pub struct InvalidNumberOfHops {
@@ -12,6 +21,29 @@ pub struct InvalidNumberOfHops {
requested: u8,
}
// if one wants to provide a hardcoded route, they can
impl SphinxRouteMaker for Vec<Node> {
type Error = InvalidNumberOfHops;
fn sphinx_route(
&mut self,
hops: u8,
_destination: &Recipient,
) -> Result<Vec<Node>, InvalidNumberOfHops> {
// it's the responsibility of the caller to ensure the hardcoded route has correct number of hops
// and that it's final hop include the recipient's gateway.
if self.len() != hops as usize {
Err(InvalidNumberOfHops {
available: self.len(),
requested: hops,
})
} else {
Ok(self.clone())
}
}
}
pub fn generate_hop_delays(average_packet_delay: Duration, num_hops: usize) -> Vec<Delay> {
if average_packet_delay.is_zero() {
vec![nym_sphinx_types::Delay::new_from_millis(0); num_hops]
+11 -11
View File
@@ -149,7 +149,7 @@ impl NymMessage {
.collect()
}
fn try_from_bytes(bytes: &[u8]) -> Result<Self, NymMessageError> {
fn try_from_bytes(bytes: &[u8], num_mix_hops: u8) -> Result<Self, NymMessageError> {
if bytes.is_empty() {
return Err(NymMessageError::EmptyMessage);
}
@@ -158,7 +158,7 @@ impl NymMessage {
match typ_tag {
NymMessageType::Plain => Ok(NymMessage::Plain(bytes[1..].to_vec())),
NymMessageType::Repliable => Ok(NymMessage::Repliable(
RepliableMessage::try_from_bytes(&bytes[1..])?,
RepliableMessage::try_from_bytes(&bytes[1..], num_mix_hops)?,
)),
NymMessageType::Reply => Ok(NymMessage::Reply(ReplyMessage::try_from_bytes(
&bytes[1..],
@@ -166,10 +166,10 @@ impl NymMessage {
}
}
fn serialized_size(&self) -> usize {
fn serialized_size(&self, num_mix_hops: u8) -> usize {
let inner_size = match self {
NymMessage::Plain(msg) => msg.len(),
NymMessage::Repliable(msg) => msg.serialized_size(),
NymMessage::Repliable(msg) => msg.serialized_size(num_mix_hops),
NymMessage::Reply(msg) => msg.serialized_size(),
};
let message_type_size = 1;
@@ -207,9 +207,9 @@ impl NymMessage {
}
/// Determines the number of required packets of the provided size for the split message.
pub fn required_packets(&self, packet_size: PacketSize) -> usize {
pub fn required_packets(&self, packet_size: PacketSize, num_mix_hops: u8) -> usize {
let plaintext_per_packet = self.true_available_plaintext_per_packet(packet_size);
let serialized_len = self.serialized_size();
let serialized_len = self.serialized_size(num_mix_hops);
let (num_fragments, _) =
chunking::number_of_required_fragments(serialized_len, plaintext_per_packet);
@@ -279,11 +279,11 @@ impl PaddedMessage {
}
// reverse of NymMessage::pad_to_full_packet_lengths
pub fn remove_padding(self) -> Result<NymMessage, NymMessageError> {
pub fn remove_padding(self, num_mix_hops: u8) -> Result<NymMessage, NymMessageError> {
// we are looking for first occurrence of 1 in the tail and we get its index
if let Some(padding_end) = self.0.iter().rposition(|b| *b == 1) {
// and now we only take bytes until that point (but not including it)
NymMessage::try_from_bytes(&self.0[..padding_end])
NymMessage::try_from_bytes(&self.0[..padding_end], num_mix_hops)
} else {
Err(NymMessageError::InvalidMessagePadding)
}
@@ -304,7 +304,7 @@ mod tests {
fn serialized_size_matches_actual_serialization() {
// plain
let plain = NymMessage::new_plain(vec![1, 2, 3, 4, 5]);
assert_eq!(plain.serialized_size(), plain.into_bytes().len());
assert_eq!(plain.serialized_size(3), plain.into_bytes().len());
// a single variant for each repliable and reply is enough as they are more thoroughly tested
// internally
@@ -313,9 +313,9 @@ mod tests {
[42u8; 16].into(),
vec![],
));
assert_eq!(repliable.serialized_size(), repliable.into_bytes().len());
assert_eq!(repliable.serialized_size(3), repliable.into_bytes().len());
let reply = NymMessage::new_reply(ReplyMessage::new_data_message(vec![1, 2, 3, 4, 5]));
assert_eq!(reply.serialized_size(), reply.into_bytes().len());
assert_eq!(reply.serialized_size(3), reply.into_bytes().len());
}
}
+37 -16
View File
@@ -14,9 +14,9 @@ use nym_sphinx_anonymous_replies::reply_surb::ReplySurb;
use nym_sphinx_chunking::fragment::{Fragment, FragmentIdentifier};
use nym_sphinx_forwarding::packet::MixPacket;
use nym_sphinx_params::packet_sizes::PacketSize;
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm};
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm, DEFAULT_NUM_MIX_HOPS};
use nym_sphinx_types::{Delay, NymPacket};
use nym_topology::{NymRouteProvider, NymTopologyError};
use nym_topology::{NymTopology, NymTopologyError};
use rand::{CryptoRng, Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;
@@ -54,13 +54,14 @@ pub trait FragmentPreparer {
fn deterministic_route_selection(&self) -> bool;
fn rng(&mut self) -> &mut Self::Rng;
fn nonce(&self) -> i32;
fn num_mix_hops(&self) -> u8;
fn average_packet_delay(&self) -> Duration;
fn average_ack_delay(&self) -> Duration;
fn generate_reply_surbs(
&mut self,
amount: usize,
topology: &NymRouteProvider,
topology: &NymTopology,
reply_recipient: &Recipient,
) -> Result<Vec<ReplySurb>, NymTopologyError> {
let mut reply_surbs = Vec::with_capacity(amount);
@@ -78,7 +79,7 @@ pub trait FragmentPreparer {
&mut self,
recipient: &Recipient,
fragment_id: FragmentIdentifier,
topology: &NymRouteProvider,
topology: &NymTopology,
ack_key: &AckKey,
packet_type: PacketType,
) -> Result<SurbAck, NymTopologyError> {
@@ -108,7 +109,7 @@ pub trait FragmentPreparer {
fn prepare_reply_chunk_for_sending(
&mut self,
fragment: Fragment,
topology: &NymRouteProvider,
topology: &NymTopology,
ack_key: &AckKey,
reply_surb: ReplySurb,
packet_sender: &Recipient,
@@ -129,8 +130,9 @@ pub trait FragmentPreparer {
.expect("the message has been incorrectly fragmented");
// this is not going to be accurate by any means. but that's the best estimation we can do
let expected_forward_delay =
Delay::new_from_millis((self.average_packet_delay().as_millis() * 3) as u64);
let expected_forward_delay = Delay::new_from_millis(
(self.average_packet_delay().as_millis() * self.num_mix_hops() as u128) as u64,
);
let fragment_identifier = fragment.fragment_identifier();
@@ -188,11 +190,12 @@ pub trait FragmentPreparer {
fn prepare_chunk_for_sending(
&mut self,
fragment: Fragment,
topology: &NymRouteProvider,
topology: &NymTopology,
ack_key: &AckKey,
packet_sender: &Recipient,
packet_recipient: &Recipient,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<PreparedFragment, NymTopologyError> {
debug!("Preparing chunk for sending");
// each plain or repliable packet (i.e. not a reply) attaches an ephemeral public key so that the recipient
@@ -201,7 +204,8 @@ pub trait FragmentPreparer {
let fragment_header = fragment.header();
let destination = packet_recipient.gateway();
monitoring::fragment_sent(&fragment, self.nonce(), destination);
let hops = mix_hops.unwrap_or(self.num_mix_hops());
monitoring::fragment_sent(&fragment, self.nonce(), *destination, hops);
let non_reply_overhead = encryption::PUBLIC_KEY_SIZE;
let expected_plaintext = match packet_type {
@@ -236,16 +240,16 @@ pub trait FragmentPreparer {
};
// generate pseudorandom route for the packet
log::trace!("Preparing chunk for sending");
log::trace!("Preparing chunk for sending with {hops} mix hops");
let route = if self.deterministic_route_selection() {
log::trace!("using deterministic route selection");
let seed = fragment_header.seed().wrapping_mul(self.nonce());
let mut rng = ChaCha8Rng::seed_from_u64(seed as u64);
topology.random_route_to_egress(&mut rng, destination)?
topology.random_route_to_gateway(&mut rng, hops, destination)?
} else {
log::trace!("using pseudorandom route selection");
let mut rng = self.rng();
topology.random_route_to_egress(&mut rng, destination)?
topology.random_route_to_gateway(&mut rng, hops, destination)?
};
let destination = packet_recipient.as_sphinx_destination();
@@ -331,6 +335,10 @@ pub struct MessagePreparer<R> {
/// Average delay an acknowledgement packet is going to get delay at a single mixnode.
average_ack_delay: Duration,
/// Number of mix hops each packet ('real' message, ack, reply) is expected to take.
/// Note that it does not include gateway hops.
num_mix_hops: u8,
nonce: i32,
}
@@ -353,10 +361,17 @@ where
sender_address,
average_packet_delay,
average_ack_delay,
num_mix_hops: DEFAULT_NUM_MIX_HOPS,
nonce,
}
}
/// Allows setting non-default number of expected mix hops in the network.
pub fn with_mix_hops(mut self, hops: u8) -> Self {
self.num_mix_hops = hops;
self
}
/// Overwrites existing sender address with the provided value.
pub fn set_sender_address(&mut self, sender_address: Recipient) {
self.sender_address = sender_address;
@@ -365,7 +380,7 @@ where
pub fn generate_reply_surbs(
&mut self,
amount: usize,
topology: &NymRouteProvider,
topology: &NymTopology,
) -> Result<Vec<ReplySurb>, NymTopologyError> {
let mut reply_surbs = Vec::with_capacity(amount);
for _ in 0..amount {
@@ -384,7 +399,7 @@ where
pub fn prepare_reply_chunk_for_sending(
&mut self,
fragment: Fragment,
topology: &NymRouteProvider,
topology: &NymTopology,
ack_key: &AckKey,
reply_surb: ReplySurb,
packet_type: PacketType,
@@ -405,10 +420,11 @@ where
pub fn prepare_chunk_for_sending(
&mut self,
fragment: Fragment,
topology: &NymRouteProvider,
topology: &NymTopology,
ack_key: &AckKey,
packet_recipient: &Recipient,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<PreparedFragment, NymTopologyError> {
let sender = self.sender_address;
@@ -420,6 +436,7 @@ where
&sender,
packet_recipient,
packet_type,
mix_hops,
)
}
@@ -427,7 +444,7 @@ where
pub fn generate_surb_ack(
&mut self,
fragment_id: FragmentIdentifier,
topology: &NymRouteProvider,
topology: &NymTopology,
ack_key: &AckKey,
packet_type: PacketType,
) -> Result<SurbAck, NymTopologyError> {
@@ -466,6 +483,10 @@ impl<R: CryptoRng + Rng> FragmentPreparer for MessagePreparer<R> {
self.nonce
}
fn num_mix_hops(&self) -> u8 {
self.num_mix_hops
}
fn average_packet_delay(&self) -> Duration {
self.average_packet_delay
}
+129 -2
View File
@@ -14,6 +14,7 @@ use nym_sphinx_chunking::reconstruction::MessageReconstructor;
use nym_sphinx_chunking::ChunkingError;
use nym_sphinx_params::{
PacketEncryptionAlgorithm, PacketHkdfAlgorithm, ReplySurbEncryptionAlgorithm,
DEFAULT_NUM_MIX_HOPS,
};
use thiserror::Error;
@@ -78,6 +79,7 @@ pub enum MessageRecoveryError {
pub trait MessageReceiver {
fn new() -> Self;
fn reconstructor(&mut self) -> &mut MessageReconstructor;
fn num_mix_hops(&self) -> u8;
fn decrypt_raw_message<C>(
&self,
@@ -141,7 +143,7 @@ pub trait MessageReceiver {
fragment: Fragment,
) -> Result<Option<(NymMessage, Vec<i32>)>, MessageRecoveryError> {
if let Some((message, used_sets)) = self.reconstructor().insert_new_fragment(fragment) {
match PaddedMessage::new_reconstructed(message).remove_padding() {
match PaddedMessage::new_reconstructed(message).remove_padding(self.num_mix_hops()) {
Ok(message) => Ok(Some((message, used_sets))),
Err(err) => Err(MessageRecoveryError::MalformedReconstructedMessage {
source: err,
@@ -154,11 +156,28 @@ pub trait MessageReceiver {
}
}
#[derive(Clone, Default)]
#[derive(Clone)]
pub struct SphinxMessageReceiver {
/// High level public structure used to buffer all received data [`Fragment`]s and eventually
/// returning original messages that they encapsulate.
reconstructor: MessageReconstructor,
/// Number of mix hops each packet ('real' message, ack, reply) is expected to take.
/// Note that it does not include gateway hops.
num_mix_hops: u8,
}
impl SphinxMessageReceiver {
/// Allows setting non-default number of expected mix hops in the network.
// IMPORTANT NOTE: this is among others used to deserialize SURBs. Meaning that this is a
// global setting and currently always set to the default value. The implication is that it is
// not currently possible to have different number of hops for different SURB messages. So,
// don't try to use <3 mix hops for SURBs until this is refactored.
#[must_use]
pub fn with_mix_hops(mut self, hops: u8) -> Self {
self.num_mix_hops = hops;
self
}
}
impl MessageReceiver for SphinxMessageReceiver {
@@ -182,4 +201,112 @@ impl MessageReceiver for SphinxMessageReceiver {
fn reconstructor(&mut self) -> &mut MessageReconstructor {
&mut self.reconstructor
}
fn num_mix_hops(&self) -> u8 {
self.num_mix_hops
}
}
impl Default for SphinxMessageReceiver {
fn default() -> Self {
SphinxMessageReceiver {
reconstructor: Default::default(),
num_mix_hops: DEFAULT_NUM_MIX_HOPS,
}
}
}
#[cfg(test)]
mod message_receiver {
use super::*;
use nym_crypto::asymmetric::identity;
use nym_mixnet_contract_common::LegacyMixLayer;
use nym_topology::{gateway, mix, NymTopology};
use std::collections::BTreeMap;
// TODO: is it somehow maybe possible to move it to `topology` and have if conditionally
// available to other modules?
/// Returns a hardcoded, valid instance of [`NymTopology`] that is to be used in
/// tests requiring instance of topology.
#[allow(dead_code)]
fn topology_fixture() -> NymTopology {
let mut mixes = BTreeMap::new();
mixes.insert(
1,
vec![mix::LegacyNode {
mix_id: 123,
host: "10.20.30.40".parse().unwrap(),
mix_host: "10.20.30.40:1789".parse().unwrap(),
identity_key: identity::PublicKey::from_base58_string(
"3ebjp1Fb9hdcS1AR6AZihgeJiMHkB5jjJUsvqNnfQwU7",
)
.unwrap(),
sphinx_key: encryption::PublicKey::from_base58_string(
"B3GzG62aXAZNg14RoMCp3BhELNBrySLr2JqrwyfYFzRc",
)
.unwrap(),
layer: LegacyMixLayer::One,
version: "0.8.0-dev".into(),
}],
);
mixes.insert(
2,
vec![mix::LegacyNode {
mix_id: 234,
host: "11.21.31.41".parse().unwrap(),
mix_host: "11.21.31.41:1789".parse().unwrap(),
identity_key: identity::PublicKey::from_base58_string(
"D6YaMzLSY7mANtSQRKXsmMZpqgqiVkeiagKM4V4oFPFr",
)
.unwrap(),
sphinx_key: encryption::PublicKey::from_base58_string(
"5Z1VqYwM2xeKxd8H7fJpGWasNiDFijYBAee7MErkZ5QT",
)
.unwrap(),
layer: LegacyMixLayer::Two,
version: "0.8.0-dev".into(),
}],
);
mixes.insert(
3,
vec![mix::LegacyNode {
mix_id: 456,
host: "12.22.32.42".parse().unwrap(),
mix_host: "12.22.32.42:1789".parse().unwrap(),
identity_key: identity::PublicKey::from_base58_string(
"GkWDysw4AjESv1KiAiVn7JzzCMJeksxNSXVfr1PpX8wD",
)
.unwrap(),
sphinx_key: encryption::PublicKey::from_base58_string(
"9EyjhCggr2QEA2nakR88YHmXgpy92DWxoe2draDRkYof",
)
.unwrap(),
layer: LegacyMixLayer::Three,
version: "0.8.0-dev".into(),
}],
);
NymTopology::new(
// currently coco_nodes don't really exist so this is still to be determined
mixes,
vec![gateway::LegacyNode {
node_id: 789,
host: "1.2.3.4".parse().unwrap(),
mix_host: "1.2.3.4:1789".parse().unwrap(),
clients_ws_port: 9000,
clients_wss_port: None,
identity_key: identity::PublicKey::from_base58_string(
"FioFa8nMmPpQnYi7JyojoTuwGLeyNS8BF4ChPr29zUML",
)
.unwrap(),
sphinx_key: encryption::PublicKey::from_base58_string(
"EB42xvMFMD5rUCstE2CDazgQQJ22zLv8SPm1Luxni44c",
)
.unwrap(),
version: "0.8.0-dev".into(),
}],
)
}
}

Some files were not shown because too many files have changed in this diff Show More